Crie um pipeline do Dataflow com Python
Este documento mostra como usar o SDK Apache Beam para Python para criar um programa que define um pipeline. Em seguida, executa o pipeline através de um executor local direto ou de um executor baseado na nuvem, como o Dataflow. Para uma introdução ao pipeline WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.
Para seguir orientações passo a passo para esta tarefa diretamente na Google Cloud consola, clique em Orientar-me:
Antes de começar
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init -
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: Your project ID.USER_IDENTIFIER: The identifier for your user account. For example,myemail@example.com.ROLE: The IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init -
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: Your project ID.USER_IDENTIFIER: The identifier for your user account. For example,myemail@example.com.ROLE: The IAM role that you grant to your user account.
Conceda funções à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Substitua
PROJECT_IDpelo ID do seu projeto. - Substitua
PROJECT_NUMBERpelo número do seu projeto. Para encontrar o número do projeto, consulte o artigo Identifique projetos ou use o comandogcloud projects describe. - Substitua
SERVICE_ACCOUNT_ROLEpor cada função individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S(Padrão). -
Defina a localização do armazenamento para o seguinte:
US(Estados Unidos). -
Substitua
BUCKET_NAMEpor um nome de contentor exclusivo. Não inclua informações confidenciais no nome do contentor, uma vez que o espaço de nomes do contentor é global e visível publicamente. - Copie o Google Cloud ID do projeto e o nome do contentor do Cloud Storage. Vai precisar destes valores mais tarde neste documento.
- Verifique se tem o Python 3 e o
pipem execução no seu sistema:python --version python -m pip --version
- Se necessário, instale o Python 3 e, em seguida, configure um ambiente virtual Python: siga as instruções indicadas nas secções Instalar o Python e Configurar o venv da página Configurar um ambiente de desenvolvimento Python.
- Confirme que está no ambiente virtual do Python que criou na secção anterior.
Certifique-se de que o comando começa com
<env_name>, em queenv_nameé o nome do ambiente virtual. - Instale a versão mais recente do SDK Apache Beam para Python:
Aceita um ficheiro de texto como entrada.
Este ficheiro de texto está localizado num contentor do Cloud Storage com o nome do recurso
gs://dataflow-samples/shakespeare/kinglear.txt.- Analisa cada linha em palavras.
- Faz uma contagem de frequência das palavras tokenizadas.
- No seu terminal local, execute o exemplo
wordcount:python -m apache_beam.examples.wordcount \ --output outputs
- Veja o resultado da pipeline:
more outputs* - Para sair, prima q.
- Execute a pipeline:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Substitua o seguinte:
DATAFLOW_REGION: a região onde quer implementar a tarefa do Dataflow, por exemplo,europe-west1A flag
--regionsubstitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.BUCKET_NAME: o nome do contentor do Cloud Storage que copiou anteriormentePROJECT_ID: o Google Cloud ID do projeto que copiou anteriormente
- Na Google Cloud consola, aceda à página Tarefas do Dataflow.
A página Tarefas apresenta detalhes da sua tarefa
wordcount, incluindo o estado Em execução inicialmente e, em seguida, Concluído. - Aceda à página Contentores do Cloud Storage.
Na lista de contentores no seu projeto, clique no contentor de armazenamento que criou anteriormente.
No diretório
wordcount, são apresentados os ficheiros de saída criados pela tarefa.- Para listar os ficheiros de saída, use o comando
gcloud storage ls:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Para ver os resultados nos ficheiros de saída, use o comando
gcloud storage cat:gcloud storage cat gs://BUCKET_NAME/results/outputs*
- Na sua máquina local, transfira a cópia mais recente do
wordcountcódigo do repositório do GitHub do Apache Beam. - No terminal local, execute o pipeline:
python wordcount.py --output outputs
- Veja os resultados:
more outputs* - Para sair, prima q.
- Num editor à sua escolha, abra o ficheiro
wordcount.py. - Na função
run, examine os passos do pipeline:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Após
split, as linhas são divididas em palavras como strings. - Para converter as strings em minúsculas, modifique a linha após
split: Esta modificação mapeia a funçãocounts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lowerpara cada palavra. Esta linha é equivalente abeam.Map(lambda word: str.lower(word)). - Guarde o ficheiro e execute a tarefa
wordcountmodificada:python wordcount.py --output outputs
- Veja os resultados do pipeline modificado:
more outputs* - Para sair, prima q.
- Execute o pipeline modificado no serviço Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Substitua o seguinte:
DATAFLOW_REGION: a região onde quer implementar a tarefa do DataflowBUCKET_NAME: o nome do seu contentor do Cloud StoragePROJECT_ID: o ID do projeto Google Cloud
-
Elimine o contentor:
gcloud storage buckets delete BUCKET_NAME
Se mantiver o projeto, revogue as funções que concedeu à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
Configure o seu ambiente
Nesta secção, use a linha de comandos para configurar um ambiente virtual Python isolado para executar o seu projeto de pipeline usando venv. Este processo permite-lhe isolar as dependências de um projeto das dependências de outros projetos.
Se não tiver um pedido de comando disponível, pode usar a Cloud Shell. O Cloud Shell já tem o gestor de pacotes para o Python 3 instalado, pelo que pode avançar para a criação de um ambiente virtual.
Para instalar o Python e, em seguida, criar um ambiente virtual, siga estes passos:
Depois de concluir o início rápido, pode desativar o ambiente virtual executando
deactivate.Obtenha o SDK do Apache Beam
O SDK Apache Beam é um modelo de programação de código aberto para pipelines de dados. Define um pipeline com um programa Apache Beam e, em seguida, escolhe um executor, como o Dataflow, para executar o pipeline.
Para transferir e instalar o SDK do Apache Beam, siga estes passos:
pip install apache-beam[gcp]
Execute o pipeline localmente
Para ver como um pipeline é executado localmente, use um módulo Python pronto a usar para o
wordcountexemplo incluído no pacoteapache_beam.O exemplo de pipeline
wordcountfaz o seguinte:Para preparar o pipeline
wordcountlocalmente, siga estes passos:wordcount.pycódigo fonte no GitHub do Apache Beam.Execute o pipeline no serviço Dataflow
Nesta secção, execute o pipeline de exemplowordcountdo pacoteapache_beamno serviço Dataflow. Este exemplo especificaDataflowRunnercomo o parâmetro de--runner.Veja os resultados
Quando executa um pipeline através do Dataflow, os resultados são armazenados num contentor do Cloud Storage. Nesta secção, verifique se o pipeline está em execução através da Google Cloud consola ou do terminal local.
Google Cloud consola
Para ver os resultados na Google Cloud consola, siga estes passos:
Terminal local
Veja os resultados a partir do seu terminal ou através do Cloud Shell.
Substitua
BUCKET_NAMEpelo nome do contentor do Cloud Storage usado no programa de pipeline.Modifique o código da conduta
O pipelinewordcountnos exemplos anteriores distingue entre palavras com letras maiúsculas e minúsculas. Os passos seguintes mostram como modificar o pipeline para que o pipelinewordcountnão seja sensível a maiúsculas e minúsculas.Limpar
Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados nesta página, elimine o Google Cloud projeto com os recursos.
O que se segue?
-
Set the storage class to