Crie um pipeline do Dataflow com o Go
Esta página mostra como usar o SDK Apache Beam para Go para criar um programa que define um pipeline. Em seguida, executa o pipeline localmente e no serviço Dataflow. Para uma introdução ao pipeline WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.
Antes de começar
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init -
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: Your project ID.USER_IDENTIFIER: The identifier for your user account. For example,myemail@example.com.ROLE: The IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init -
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: Your project ID.USER_IDENTIFIER: The identifier for your user account. For example,myemail@example.com.ROLE: The IAM role that you grant to your user account.
Conceda funções à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Substitua
PROJECT_IDpelo ID do seu projeto. - Substitua
PROJECT_NUMBERpelo número do seu projeto. Para encontrar o número do projeto, consulte o artigo Identifique projetos ou use o comandogcloud projects describe. - Substitua
SERVICE_ACCOUNT_ROLEpor cada função individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S(Padrão). -
Defina a localização do armazenamento para o seguinte:
US(Estados Unidos). -
Substitua
BUCKET_NAMEpor um nome de contentor exclusivo. Não inclua informações confidenciais no nome do contentor, uma vez que o espaço de nomes do contentor é global e visível publicamente. - Copie o Google Cloud ID do projeto e o nome do contentor do Cloud Storage. Vai precisar destes valores mais tarde neste início rápido.
- Lê um ficheiro de texto como entrada. Por predefinição, lê um ficheiro de texto localizado num contentor do Cloud Storage com o nome do recurso
gs://dataflow-samples/shakespeare/kinglear.txt. - Analisa cada linha em palavras.
- Faz uma contagem de frequência das palavras tokenizadas.
Use o comando
git clonepara clonar o repositório do GitHubapache/beam:git clone https://github.com/apache/beam.gitMude para o diretório
beam/sdks/go:cd beam/sdks/goUse o seguinte comando para executar o pipeline:
go run examples/wordcount/wordcount.go \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsO sinalizador
inputespecifica o ficheiro a ler e o sinalizadoroutputespecifica o nome do ficheiro para a saída da contagem de frequência.Crie um diretório para o seu módulo Go numa localização à sua escolha:
mkdir wordcountcd wordcountCrie um módulo Go. Para este exemplo, use
example/dataflowcomo o caminho do módulo.go mod init example/dataflowTransfira a cópia mais recente do código
wordcountdo repositório do GitHub do Apache Beam. Coloque este ficheiro no diretóriowordcountque criou.Se estiver a usar um sistema operativo que não seja Linux, tem de obter o pacote Go
unix. Este pacote é necessário para executar pipelines no serviço Dataflow.go get -u golang.org/x/sys/unixCertifique-se de que o ficheiro
go.modcorresponde ao código-fonte do módulo:go mod tidyA partir do terminal, crie e execute o pipeline localmente:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsVeja os resultados da saída:
more outputs*Para sair, prima q.
Num editor à sua escolha, abra o ficheiro
wordcount.go.Examine o bloco
init(os comentários foram removidos para maior clareza):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }Adicione uma nova linha para registar a função
strings.ToLower:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }Examine a função
CountWords:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }Para escrever as palavras em minúsculas, adicione um elemento ParDo que aplica
strings.ToLowera todas as palavras:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }Guarde o ficheiro.
Crie e execute o pipeline
wordcountmodificado:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsVeja os resultados da saída do pipeline modificado. Todas as palavras devem estar em minúsculas.
more outputs*Para sair, prima q.
BUCKET_NAME: o nome do contentor do Cloud Storage.PROJECT_ID: o Google Cloud ID do projeto.DATAFLOW_REGION: a região onde quer implementar a tarefa do Dataflow. Por exemplo,europe-west1. Para ver uma lista de localizações disponíveis, consulte o artigo Localizações do Dataflow. A flag--regionsubstitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.Para listar os ficheiros de saída, use o comando
gcloud storage ls:gcloud storage ls gs://BUCKET_NAME/results/outputs* --longSubstitua
BUCKET_NAMEpelo nome do contentor do Cloud Storage de saída especificado.Para ver os resultados nos ficheiros de saída, use o comando
gcloud storage cat:gcloud storage cat gs://BUCKET_NAME/results/outputs*-
Elimine o contentor:
gcloud storage buckets delete BUCKET_NAME
Se mantiver o projeto, revogue as funções que concedeu à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
Configure o ambiente de programação
O SDK Apache Beam é um modelo de programação de código aberto para pipelines de dados. Define um pipeline com um programa Apache Beam e, em seguida, escolhe um executor, como o Dataflow, para executar o pipeline.
Recomendamos que use a versão mais recente do Go quando trabalhar com o SDK do Apache Beam para Go. Se não tiver a versão mais recente do Go instalada, use o guia de transferência e instalação do Go para transferir e instalar o Go para o seu sistema operativo específico.
Para validar a versão do Go que tem instalada, execute o seguinte comando no seu terminal local:
go versionExecute o exemplo de contagem de palavras do Beam
O SDK Apache Beam para Go inclui um
wordcountexemplo de pipeline. O exemplowordcountfaz o seguinte:Para executar a versão mais recente do exemplo do Beam
wordcountna sua máquina local, siga estes passos:Após a conclusão do pipeline, veja os resultados:
more outputs*Para sair, prima q.
Modifique o código da conduta
O pipeline do Beam distingue entre palavras com letras maiúsculas e minúsculas.
wordcountOs passos seguintes mostram como criar o seu próprio módulo Go, modificar o pipelinewordcountpara que não seja sensível a maiúsculas e minúsculas, e executá-lo no Dataflow.Crie um módulo Go
Para fazer alterações ao código do pipeline, siga estes passos.
Execute o pipeline não modificado
Valide se o pipeline
wordcountnão modificado é executado localmente.Altere o código da tubagem
Para alterar o pipeline de modo a que não seja sensível a maiúsculas e minúsculas, modifique o código para aplicar a função
strings.ToLowera todas as palavras.Execute o pipeline atualizado localmente
Execute o pipeline
wordcountatualizado localmente e verifique se o resultado foi alterado.Execute o pipeline no serviço Dataflow
Para executar o exemplo
wordcountatualizado no serviço Dataflow, use o seguinte comando:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner dataflow \ --project PROJECT_ID \ --region DATAFLOW_REGION \ --staging_location gs://BUCKET_NAME/binaries/Substitua o seguinte:
Veja os resultados
Pode ver uma lista das suas tarefas do Dataflow na Google Cloud consola. Na Google Cloud consola, aceda à página Tarefas do Dataflow.
A página Tarefas apresenta detalhes da sua tarefa
wordcount, incluindo um estado de Em execução no início e, em seguida, Concluído.Quando executa um pipeline através do Dataflow, os resultados são armazenados num contentor do Cloud Storage. Veja os resultados da saída através daGoogle Cloud consola ou do terminal local.
Consola
Para ver os resultados na Google Cloud consola, aceda à página Recipientes do Cloud Storage.
Na lista de contentores no seu projeto, clique no contentor de armazenamento que criou anteriormente. Os ficheiros de saída criados pela tarefa são apresentados no diretório
results.Terminal
Veja os resultados a partir do seu terminal ou através do Cloud Shell.
Limpar
Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados nesta página, elimine o Google Cloud projeto com os recursos.
O que se segue?
-
Set the storage class to