Criar um pipeline do Dataflow usando Python

Este documento mostra como usar o SDK do Apache Beam para Python na criação de um programa que defina um pipeline. Em seguida, execute o pipeline usando um executor local direto ou um executor baseado na nuvem, como o Dataflow. Para uma introdução ao pipeline do WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.


Para seguir as instruções detalhadas desta tarefa diretamente no console do Google Cloud , clique em Orientação:

Orientações


Antes de começar

  1. Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.

  3. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

  4. Para inicializar a gcloud CLI, execute o seguinte comando:

    gcloud init
  5. Crie ou selecione um Google Cloud projeto.

    Funções necessárias para selecionar ou criar um projeto

    • Selecionar um projeto: não é necessário um papel específico do IAM para selecionar um projeto. Você pode escolher qualquer projeto em que tenha recebido um papel.
    • Criar um projeto: para criar um projeto, é necessário ter o papel de Criador de projetos (roles/resourcemanager.projectCreator), que contém a permissão resourcemanager.projects.create. Saiba como conceder papéis.
    • Crie um projeto do Google Cloud :

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto Google Cloud que você está criando.

    • Selecione o projeto Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud .

  6. Verifique se o faturamento está ativado para o projeto do Google Cloud .

  7. Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager:

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador do Service Usage (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder papéis.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. Crie credenciais de autenticação local para sua conta de usuário:

    gcloud auth application-default login

    Se um erro de autenticação for retornado e você estiver usando um provedor de identidade (IdP) externo, confirme se você fez login na CLI gcloud com sua identidade federada.

  9. Atribua papéis à sua conta de usuário. Execute o seguinte comando uma vez para cada um dos seguintes papéis do IAM: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • USER_IDENTIFIER: o identificador da sua conta de usuário . Por exemplo, myemail@example.com.
    • ROLE: o papel do IAM concedido à sua conta de usuário.
  10. Instale a CLI do Google Cloud.

  11. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

  12. Para inicializar a gcloud CLI, execute o seguinte comando:

    gcloud init
  13. Crie ou selecione um Google Cloud projeto.

    Funções necessárias para selecionar ou criar um projeto

    • Selecionar um projeto: não é necessário um papel específico do IAM para selecionar um projeto. Você pode escolher qualquer projeto em que tenha recebido um papel.
    • Criar um projeto: para criar um projeto, é necessário ter o papel de Criador de projetos (roles/resourcemanager.projectCreator), que contém a permissão resourcemanager.projects.create. Saiba como conceder papéis.
    • Crie um projeto do Google Cloud :

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto Google Cloud que você está criando.

    • Selecione o projeto Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud .

  14. Verifique se o faturamento está ativado para o projeto do Google Cloud .

  15. Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager:

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador do Service Usage (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder papéis.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  16. Crie credenciais de autenticação local para sua conta de usuário:

    gcloud auth application-default login

    Se um erro de autenticação for retornado e você estiver usando um provedor de identidade (IdP) externo, confirme se você fez login na CLI gcloud com sua identidade federada.

  17. Atribua papéis à sua conta de usuário. Execute o seguinte comando uma vez para cada um dos seguintes papéis do IAM: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • USER_IDENTIFIER: o identificador da sua conta de usuário . Por exemplo, myemail@example.com.
    • ROLE: o papel do IAM concedido à sua conta de usuário.
  18. Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua PROJECT_NUMBER pelo número do projeto. Para encontrar o número do projeto, consulte Identificar projetos ou use o comando gcloud projects describe.
    • Substitua SERVICE_ACCOUNT_ROLE por cada papel individual.
  19. Crie um bucket do Cloud Storage e configure-o da seguinte maneira:
    • Defina a classe de armazenamento como S (Standard).
    • Defina o local de armazenamento como o seguinte: US (Estados Unidos).
    • Substitua BUCKET_NAME por um nome de bucket exclusivo. Não inclua informações sensíveis no nome do bucket já que o namespace dele é global e visível para o público.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  20. Copie o ID do projeto Google Cloud e o nome do bucket do Cloud Storage. Você precisará desses valores posteriormente neste documento.

Configurar o ambiente

Nesta seção, use o prompt de comando para configurar um ambiente virtual Python isolado e executar seu projeto de pipeline usando venv. Esse processo permite isolar as dependências de um projeto das dependências de outros projetos.

Caso você não tenha um prompt de comando disponível, use o Cloud Shell. O Cloud Shell já tem o gerenciador de pacotes do Python 3 instalado, portanto, você pode pular para a criação de um ambiente virtual.

Para instalar o Python e criar um ambiente virtual, siga estas etapas:

  1. Verifique se o Python 3 e o pip estão em execução no sistema:
    python --version
    python -m pip --version
  2. Se necessário, instale o Python 3 e, em seguida, configure um ambiente virtual do Python: siga as instruções fornecidas nas seções Como instalar o Python e Como configurar o venv do Como configurar uma página do ambiente de desenvolvimento em Python.

Depois de concluir o guia de início rápido, execute deactivate para desativar o ambiente virtual.

Instale o SDK do Apache Beam

O SDK do Apache Beam é um modelo de programação de código aberto para pipelines de dados. Defina um pipeline com um programa do Apache Beam e escolha um executor, como o Dataflow, para executar o pipeline.

Para fazer o download e instalar o SDK do Apache Beam, siga estas etapas:

  1. Verifique se você está no ambiente virtual do Python criado na seção anterior. Verifique se o prompt começa com <env_name>, em que env_name é o nome do ambiente virtual.
  2. Instale a versão mais recente do SDK do Apache Beam para Python:
  3. pip install apache-beam[gcp]

Execute o pipeline localmente

Para ver como um pipeline é executado localmente, use um módulo Python pronto para o exemplo wordcount incluído no pacote apache_beam.

O exemplo de pipeline wordcount faz o seguinte:

  1. Usa um arquivo de texto como entrada.

    Este arquivo de texto está localizado em um bucket do Cloud Storage com o nome do recurso gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Analisa cada linha na forma de palavras.
  3. Realiza uma contagem de frequência com base nas palavras tokenizadas.

Para preparar o pipeline wordcount localmente, siga estas etapas:

  1. No terminal local, execute o exemplo wordcount:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Veja a saída do pipeline:
    more outputs*
  3. Para sair, pressione q.
Executar o pipeline localmente permite testar e depurar o programa Apache Beam. O código-fonte do wordcount.py pode ser visualizado no GitHub do Apache Beam.

Executar o pipeline no serviço do Dataflow

Nesta seção, execute o pipeline de exemplo wordcount do pacote apache_beam no serviço do Dataflow. Este exemplo especifica DataflowRunner como o parâmetro para --runner.
  • Execute o canal:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Substitua:

    • DATAFLOW_REGION: a região onde você quer implantar o job do Dataflow, por exemplo, europe-west1

      A sinalização --region substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

    • BUCKET_NAME: o nome do bucket do Cloud Storage que você copiou anteriormente
    • PROJECT_ID: o ID do projeto Google Cloud que você copiou anteriormente.

Ver os resultados

Quando você executa um pipeline usando o Dataflow, os resultados são armazenados em um bucket do Cloud Storage. Nesta seção, verifique se o pipeline está em execução usando o console Google Cloud ou o terminal local.

Console doGoogle Cloud

Para ver os resultados no console do Google Cloud , siga estas etapas:

  1. No console Google Cloud , acesse a página Jobs do Dataflow.

    Acessar "Jobs"

    A página Jobs exibe detalhes do job do wordcount, incluindo o status Em execução primeiro e depois Finalizado.

  2. Acesse a página Buckets do Cloud Storage:

    Acessar buckets

  3. Na lista de buckets do projeto, clique no bucket de armazenamento que você criou anteriormente.

    No diretório wordcount, os arquivos de saída criados pelo seu job são exibidos.

Terminal local

Acesse os resultados no seu terminal ou usando o Cloud Shell.

  1. Para listar os arquivos de saída, use o comando gcloud storage ls:
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. Substitua BUCKET_NAME pelo nome do bucket do Cloud Storage usado no programa de pipeline.

  3. Para acessar os resultados nos arquivos de saída, use o comando gcloud storage cat:
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Modificar o código do pipeline

Nos exemplos anteriores, o pipeline wordcount diferencia letras maiúsculas e minúsculas. Nas etapas a seguir, mostramos como modificar o pipeline para que o wordcount não diferencie maiúsculas de minúsculas.
  1. Na máquina local, faça o download da cópia mais recente do código wordcount no repositório do Apache Beam no GitHub.
  2. No terminal local, execute o pipeline:
    python wordcount.py --output outputs
  3. Ver os resultados:
    more outputs*
  4. Para sair, pressione q.
  5. Em um editor de sua escolha, abra o arquivo wordcount.py.
  6. Dentro da função run, examine as etapas do pipeline:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Depois de split, as linhas são divididas em palavras como strings.

  7. Para usar letras minúsculas em strings, modifique a linha após split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Essa modificação mapeia a função str.lower em todas as palavras. Essa linha é equivalente a beam.Map(lambda word: str.lower(word)).
  8. Salve o arquivo e execute o job wordcount modificado:
    python wordcount.py --output outputs
  9. Veja os resultados do pipeline modificado:
    more outputs*
  10. Para sair, pressione q.
  11. Execute o pipeline modificado no serviço do Dataflow:
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Substitua:

    • DATAFLOW_REGION: a região onde você quer implantar o job do Dataflow
    • BUCKET_NAME: seu nome do bucket do Cloud Storage
    • PROJECT_ID: o ID do projeto do Google Cloud

Limpar

Para evitar cobranças na conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud e os recursos.

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir e siga as instruções.
  4. Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Opcional: revogue as credenciais de autenticação que você criou e exclua o arquivo de credenciais local:

    gcloud auth application-default revoke
  6. Opcional: revogar credenciais da CLI gcloud.

    gcloud auth revoke

A seguir