Crie um pipeline do Dataflow com Python

Este documento mostra como usar o SDK Apache Beam para Python para criar um programa que define um pipeline. Em seguida, executa o pipeline através de um executor local direto ou de um executor baseado na nuvem, como o Dataflow. Para uma introdução ao pipeline WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.


Para seguir orientações passo a passo para esta tarefa diretamente na Google Cloud consola, clique em Orientar-me:

Visita guiada


Antes de começar

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  4. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  12. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  18. Conceda funções à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Substitua PROJECT_ID pelo ID do seu projeto.
    • Substitua PROJECT_NUMBER pelo número do seu projeto. Para encontrar o número do projeto, consulte o artigo Identifique projetos ou use o comando gcloud projects describe.
    • Substitua SERVICE_ACCOUNT_ROLE por cada função individual.
  19. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Padrão).
    • Defina a localização do armazenamento para o seguinte: US (Estados Unidos).
    • Substitua BUCKET_NAME por um nome de contentor exclusivo. Não inclua informações confidenciais no nome do contentor, uma vez que o espaço de nomes do contentor é global e visível publicamente.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Copie o Google Cloud ID do projeto e o nome do contentor do Cloud Storage. Vai precisar destes valores mais tarde neste documento.
    • Configure o seu ambiente

      Nesta secção, use a linha de comandos para configurar um ambiente virtual Python isolado para executar o seu projeto de pipeline usando venv. Este processo permite-lhe isolar as dependências de um projeto das dependências de outros projetos.

      Se não tiver um pedido de comando disponível, pode usar a Cloud Shell. O Cloud Shell já tem o gestor de pacotes para o Python 3 instalado, pelo que pode avançar para a criação de um ambiente virtual.

      Para instalar o Python e, em seguida, criar um ambiente virtual, siga estes passos:

      1. Verifique se tem o Python 3 e o pip em execução no seu sistema:
        python --version
        python -m pip --version
      2. Se necessário, instale o Python 3 e, em seguida, configure um ambiente virtual Python: siga as instruções indicadas nas secções Instalar o Python e Configurar o venv da página Configurar um ambiente de desenvolvimento Python.

      Depois de concluir o início rápido, pode desativar o ambiente virtual executando deactivate.

      Obtenha o SDK do Apache Beam

      O SDK Apache Beam é um modelo de programação de código aberto para pipelines de dados. Define um pipeline com um programa Apache Beam e, em seguida, escolhe um executor, como o Dataflow, para executar o pipeline.

      Para transferir e instalar o SDK do Apache Beam, siga estes passos:

      1. Confirme que está no ambiente virtual do Python que criou na secção anterior. Certifique-se de que o comando começa com <env_name>, em que env_name é o nome do ambiente virtual.
      2. Instale a versão mais recente do SDK Apache Beam para Python:
      3. pip install apache-beam[gcp]

      Execute o pipeline localmente

      Para ver como um pipeline é executado localmente, use um módulo Python pronto a usar para o wordcount exemplo incluído no pacote apache_beam.

      O exemplo de pipeline wordcount faz o seguinte:

      1. Aceita um ficheiro de texto como entrada.

        Este ficheiro de texto está localizado num contentor do Cloud Storage com o nome do recurso gs://dataflow-samples/shakespeare/kinglear.txt.

      2. Analisa cada linha em palavras.
      3. Faz uma contagem de frequência das palavras tokenizadas.

      Para preparar o pipeline wordcount localmente, siga estes passos:

      1. No seu terminal local, execute o exemplo wordcount:
        python -m apache_beam.examples.wordcount \
          --output outputs
      2. Veja o resultado da pipeline:
        more outputs*
      3. Para sair, prima q.
      A execução do pipeline localmente permite-lhe testar e depurar o seu programa Apache Beam. Pode ver o wordcount.pycódigo fonte no GitHub do Apache Beam.

      Execute o pipeline no serviço Dataflow

      Nesta secção, execute o pipeline de exemplo wordcount do pacote apache_beam no serviço Dataflow. Este exemplo especifica DataflowRunner como o parâmetro de --runner.
      • Execute a pipeline:
        python -m apache_beam.examples.wordcount \
            --region DATAFLOW_REGION \
            --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://BUCKET_NAME/results/outputs \
            --runner DataflowRunner \
            --project PROJECT_ID \
            --temp_location gs://BUCKET_NAME/tmp/

        Substitua o seguinte:

        • DATAFLOW_REGION: a região onde quer implementar a tarefa do Dataflow, por exemplo, europe-west1

          A flag --region substitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

        • BUCKET_NAME: o nome do contentor do Cloud Storage que copiou anteriormente
        • PROJECT_ID: o Google Cloud ID do projeto que copiou anteriormente

      Veja os resultados

      Quando executa um pipeline através do Dataflow, os resultados são armazenados num contentor do Cloud Storage. Nesta secção, verifique se o pipeline está em execução através da Google Cloud consola ou do terminal local.

      Google Cloud consola

      Para ver os resultados na Google Cloud consola, siga estes passos:

      1. Na Google Cloud consola, aceda à página Tarefas do Dataflow.

        Aceder a Empregos

        A página Tarefas apresenta detalhes da sua tarefa wordcount, incluindo o estado Em execução inicialmente e, em seguida, Concluído.

      2. Aceda à página Contentores do Cloud Storage.

        Aceder a Recipientes

      3. Na lista de contentores no seu projeto, clique no contentor de armazenamento que criou anteriormente.

        No diretório wordcount, são apresentados os ficheiros de saída criados pela tarefa.

      Terminal local

      Veja os resultados a partir do seu terminal ou através do Cloud Shell.

      1. Para listar os ficheiros de saída, use o comando gcloud storage ls:
        gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
      2. Substitua BUCKET_NAME pelo nome do contentor do Cloud Storage usado no programa de pipeline.

      3. Para ver os resultados nos ficheiros de saída, use o comando gcloud storage cat:
        gcloud storage cat gs://BUCKET_NAME/results/outputs*

      Modifique o código da conduta

      O pipeline wordcount nos exemplos anteriores distingue entre palavras com letras maiúsculas e minúsculas. Os passos seguintes mostram como modificar o pipeline para que o pipeline wordcount não seja sensível a maiúsculas e minúsculas.
      1. Na sua máquina local, transfira a cópia mais recente do wordcount código do repositório do GitHub do Apache Beam.
      2. No terminal local, execute o pipeline:
        python wordcount.py --output outputs
      3. Veja os resultados:
        more outputs*
      4. Para sair, prima q.
      5. Num editor à sua escolha, abra o ficheiro wordcount.py.
      6. Na função run, examine os passos do pipeline:
        counts = (
                lines
                | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
                | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                | 'GroupAndSum' >> beam.CombinePerKey(sum))

        Após split, as linhas são divididas em palavras como strings.

      7. Para converter as strings em minúsculas, modifique a linha após split:
        counts = (
                lines
                | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
                | 'lowercase' >> beam.Map(str.lower)
                | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
        Esta modificação mapeia a função str.lower para cada palavra. Esta linha é equivalente a beam.Map(lambda word: str.lower(word)).
      8. Guarde o ficheiro e execute a tarefa wordcount modificada:
        python wordcount.py --output outputs
      9. Veja os resultados do pipeline modificado:
        more outputs*
      10. Para sair, prima q.
      11. Execute o pipeline modificado no serviço Dataflow:
        python wordcount.py \
            --region DATAFLOW_REGION \
            --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://BUCKET_NAME/results/outputs \
            --runner DataflowRunner \
            --project PROJECT_ID \
            --temp_location gs://BUCKET_NAME/tmp/

        Substitua o seguinte:

        • DATAFLOW_REGION: a região onde quer implementar a tarefa do Dataflow
        • BUCKET_NAME: o nome do seu contentor do Cloud Storage
        • PROJECT_ID: o ID do projeto Google Cloud

      Limpar

      Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados nesta página, elimine o Google Cloud projeto com os recursos.

      1. Elimine o contentor:
        gcloud storage buckets delete BUCKET_NAME
      2. Se mantiver o projeto, revogue as funções que concedeu à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

        • roles/dataflow.admin
        • roles/dataflow.worker
        • roles/storage.objectAdmin
        gcloud projects remove-iam-policy-binding PROJECT_ID \
            --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
            --role=SERVICE_ACCOUNT_ROLE
      3. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

        gcloud auth application-default revoke
      4. Optional: Revoke credentials from the gcloud CLI.

        gcloud auth revoke

      O que se segue?