Criar um pipeline do Dataflow usando Go

Nesta página, mostramos como usar o SDK do Apache Beam para Go a fim de criar um programa que define um pipeline. Em seguida, você executa o pipeline localmente e no serviço do Dataflow. Para uma introdução ao pipeline do WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.

Antes de começar

  1. Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.

  3. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

  4. Para inicializar a gcloud CLI, execute o seguinte comando:

    gcloud init
  5. Crie ou selecione um Google Cloud projeto.

    Funções necessárias para selecionar ou criar um projeto

    • Selecionar um projeto: não é necessário um papel específico do IAM para selecionar um projeto. Você pode escolher qualquer projeto em que tenha recebido um papel.
    • Criar um projeto: para criar um projeto, é necessário ter o papel de Criador de projetos (roles/resourcemanager.projectCreator), que contém a permissão resourcemanager.projects.create. Saiba como conceder papéis.
    • Crie um projeto do Google Cloud :

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto Google Cloud que você está criando.

    • Selecione o projeto Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud .

  6. Verifique se o faturamento está ativado para o projeto do Google Cloud .

  7. Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON e Cloud Resource Manager:

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador do Service Usage (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder papéis.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  8. Crie credenciais de autenticação local para sua conta de usuário:

    gcloud auth application-default login

    Se um erro de autenticação for retornado e você estiver usando um provedor de identidade (IdP) externo, confirme se você fez login na CLI gcloud com sua identidade federada.

  9. Atribua papéis à sua conta de usuário. Execute o seguinte comando uma vez para cada um dos seguintes papéis do IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • USER_IDENTIFIER: o identificador da sua conta de usuário . Por exemplo, myemail@example.com.
    • ROLE: o papel do IAM concedido à sua conta de usuário.
  10. Instale a CLI do Google Cloud.

  11. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

  12. Para inicializar a gcloud CLI, execute o seguinte comando:

    gcloud init
  13. Crie ou selecione um Google Cloud projeto.

    Funções necessárias para selecionar ou criar um projeto

    • Selecionar um projeto: não é necessário um papel específico do IAM para selecionar um projeto. Você pode escolher qualquer projeto em que tenha recebido um papel.
    • Criar um projeto: para criar um projeto, é necessário ter o papel de Criador de projetos (roles/resourcemanager.projectCreator), que contém a permissão resourcemanager.projects.create. Saiba como conceder papéis.
    • Crie um projeto do Google Cloud :

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto Google Cloud que você está criando.

    • Selecione o projeto Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud .

  14. Verifique se o faturamento está ativado para o projeto do Google Cloud .

  15. Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON e Cloud Resource Manager:

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador do Service Usage (roles/serviceusage.serviceUsageAdmin), que contém a permissão serviceusage.services.enable. Saiba como conceder papéis.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  16. Crie credenciais de autenticação local para sua conta de usuário:

    gcloud auth application-default login

    Se um erro de autenticação for retornado e você estiver usando um provedor de identidade (IdP) externo, confirme se você fez login na CLI gcloud com sua identidade federada.

  17. Atribua papéis à sua conta de usuário. Execute o seguinte comando uma vez para cada um dos seguintes papéis do IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • USER_IDENTIFIER: o identificador da sua conta de usuário . Por exemplo, myemail@example.com.
    • ROLE: o papel do IAM concedido à sua conta de usuário.
  18. Conceda papéis à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua PROJECT_NUMBER pelo número do projeto. Para encontrar o número do projeto, consulte Identificar projetos ou use o comando gcloud projects describe.
    • Substitua SERVICE_ACCOUNT_ROLE por cada papel individual.
  19. Crie um bucket do Cloud Storage e configure-o da seguinte maneira:
    • Defina a classe de armazenamento como S (Standard).
    • Defina o local de armazenamento como o seguinte: US (Estados Unidos).
    • Substitua BUCKET_NAME por um nome de bucket exclusivo. Não inclua informações sensíveis no nome do bucket já que o namespace dele é global e visível para o público.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  20. Copie o ID do projeto Google Cloud e o nome do bucket do Cloud Storage. Você precisará desses valores posteriormente neste guia de início rápido.

Configurar o ambiente de desenvolvimento

O SDK do Apache Beam é um modelo de programação de código aberto para pipelines de dados. Defina um pipeline com um programa do Apache Beam e escolha um executor, como o Dataflow, para executar o pipeline.

Recomendamos que você use a versão mais recente do Go ao trabalhar com o SDK do Apache Beam para Go. Se você não tiver a versão mais recente do Go instalada, use o Guia de download e instalação para fazer o download e instalar o Go no seu sistema operacional específico.

Para verificar a versão do Go instalada, execute o seguinte comando no seu terminal local:

go version

Executar o exemplo de contagem de palavras do Beam

O SDK do Apache Beam para Go inclui um exemplo de pipeline wordcount (em inglês). O exemplo wordcount faz o seguinte:

  1. Lê um arquivo de texto como entrada. Por padrão, ele lê um arquivo de texto localizado em um bucket do Cloud Storage com o nome de recurso gs://dataflow-samples/shakespeare/kinglear.txt.
  2. Analisa cada linha na forma de palavras.
  3. Realiza uma contagem de frequência com base nas palavras tokenizadas.

Para executar a versão mais recente do exemplo wordcount do Beam na máquina local, siga estas etapas:

  1. Use o comando git clone para clonar o repositório do GitHub apache/beam:

    git clone https://github.com/apache/beam.git
  2. Alterne para o diretório beam/sdks/go:

    cd beam/sdks/go
  3. Use o comando a seguir para executar o pipeline:

    go run examples/wordcount/wordcount.go \
      --input gs://dataflow-samples/shakespeare/kinglear.txt \
      --output outputs

    A flag input especifica o arquivo a ser lido, e a flag output especifica o nome do arquivo da saída da contagem de frequência.

Depois que o pipeline for concluído, veja os resultados da saída:

more outputs*

Para sair, pressione q.

Modificar o código do pipeline

O pipeline wordcount do Beam diferencia palavras maiúsculas e minúsculas. As etapas a seguir mostram como criar seu próprio módulo Go, modificar o pipeline wordcount para que ele não diferencie maiúsculas de minúsculas e executá-lo no Dataflow.

Criar um módulo Go

Para fazer alterações no código do pipeline, siga estas etapas.

  1. Crie um diretório para seu módulo Go em um local de sua escolha:

    mkdir wordcount
    cd wordcount
  2. Crie um módulo Go. Neste exemplo, use example/dataflow como o caminho do módulo.

    go mod init example/dataflow
  3. Faça o download da cópia mais recente do código wordcount no repositório do Apache Beam no GitHub. Coloque esse arquivo no diretório wordcount que você criou.

  4. Se estiver usando um sistema operacional não Linux, será necessário conseguir o pacote unix do Go. Esse pacote é necessário para executar pipelines no serviço do Dataflow.

    go get -u golang.org/x/sys/unix
  5. Verifique se o arquivo go.mod corresponde ao código-fonte do módulo:

    go mod tidy

Execute o pipeline não modificado

Verifique se o pipeline wordcount não modificado é executado localmente.

  1. No terminal, crie e execute o pipeline localmente:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. Veja os resultados da saída:

     more outputs*
  3. Para sair, pressione q.

Alterar o código do pipeline

Para alterar o pipeline para que ele não diferencie maiúsculas de minúsculas, modifique o código para aplicar a função strings.ToLower a todas as palavras.

  1. Em um editor de sua escolha, abra o arquivo wordcount.go.

  2. Examine o bloco init (os comentários foram removidos para maior clareza):

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. Adicione uma nova linha para registrar a função strings.ToLower:

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. Examine a função CountWords:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. Para colocar as palavras em minúsculas, adicione uma ParDo que aplica strings.ToLower a cada palavra:

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. Salve o arquivo.

Executar o pipeline atualizado localmente

Execute o pipeline wordcount atualizado localmente e verifique se a saída mudou.

  1. Crie e execute o pipeline wordcount modificado:

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. Veja os resultados de saída do pipeline modificado. Todas as palavras precisam estar em letras minúsculas.

     more outputs*
  3. Para sair, pressione q.

Executar o pipeline no serviço do Dataflow

Para executar o exemplo wordcount atualizado no serviço Dataflow, use o seguinte comando:

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

Substitua:

  • BUCKET_NAME: o nome do bucket do Cloud Storage.

  • PROJECT_ID: o ID do projeto do Google Cloud .

  • DATAFLOW_REGION: a região onde você quer implantar o job do Dataflow. Por exemplo, europe-west1. Para uma lista de locais disponíveis, consulte Locais do Dataflow. A sinalização --region substitui a região padrão definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

Ver os resultados

É possível ver uma lista dos jobs do Dataflow no console doGoogle Cloud . No console Google Cloud , acesse a página Jobs do Dataflow.

Acessar "Jobs"

A página Jobs exibe detalhes do job do wordcount, incluindo o status Em execução primeiro e, depois, Finalizado.

Quando você executa um pipeline usando o Dataflow, os resultados são armazenados em um bucket do Cloud Storage. Visualize os resultados de saída usando o console doGoogle Cloud ou o terminal local.

Console

Para ver os resultados no console do Google Cloud , acesse a página Buckets do Cloud Storage.

Acessar buckets

Na lista de buckets do projeto, clique no bucket de armazenamento que você criou anteriormente. Os arquivos de saída criados pelo job são exibidos no diretório results.

Terminal

Acesse os resultados no seu terminal ou usando o Cloud Shell.

  1. Para listar os arquivos de saída, use o comando gcloud storage ls:

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

    Substitua BUCKET_NAME pelo nome do bucket de saída especificado do Cloud Storage.

  2. Para acessar os resultados nos arquivos de saída, use o comando gcloud storage cat:

    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Limpar

Para evitar cobranças na conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud e os recursos.

  1. No console do Google Cloud , acesse a página Buckets do Cloud Storage.

    Acessar buckets

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir e siga as instruções.
  4. Se você mantiver o projeto, revogue os papéis concedidos à conta de serviço padrão do Compute Engine. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Opcional: revogue as credenciais de autenticação que você criou e exclua o arquivo de credenciais local:

    gcloud auth application-default revoke
  6. Opcional: revogar credenciais da CLI gcloud.

    gcloud auth revoke

A seguir