Crie um pipeline do Dataflow com o Go

Esta página mostra como usar o SDK Apache Beam para Go para criar um programa que define um pipeline. Em seguida, executa o pipeline localmente e no serviço Dataflow. Para uma introdução ao pipeline WordCount, consulte o vídeo Como usar o WordCount no Apache Beam.

Antes de começar

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  4. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  12. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  18. Conceda funções à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Substitua PROJECT_ID pelo ID do seu projeto.
    • Substitua PROJECT_NUMBER pelo número do seu projeto. Para encontrar o número do projeto, consulte o artigo Identifique projetos ou use o comando gcloud projects describe.
    • Substitua SERVICE_ACCOUNT_ROLE por cada função individual.
  19. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Padrão).
    • Defina a localização do armazenamento para o seguinte: US (Estados Unidos).
    • Substitua BUCKET_NAME por um nome de contentor exclusivo. Não inclua informações confidenciais no nome do contentor, uma vez que o espaço de nomes do contentor é global e visível publicamente.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Copie o Google Cloud ID do projeto e o nome do contentor do Cloud Storage. Vai precisar destes valores mais tarde neste início rápido.
    • Configure o ambiente de programação

      O SDK Apache Beam é um modelo de programação de código aberto para pipelines de dados. Define um pipeline com um programa Apache Beam e, em seguida, escolhe um executor, como o Dataflow, para executar o pipeline.

      Recomendamos que use a versão mais recente do Go quando trabalhar com o SDK do Apache Beam para Go. Se não tiver a versão mais recente do Go instalada, use o guia de transferência e instalação do Go para transferir e instalar o Go para o seu sistema operativo específico.

      Para validar a versão do Go que tem instalada, execute o seguinte comando no seu terminal local:

      go version

      Execute o exemplo de contagem de palavras do Beam

      O SDK Apache Beam para Go inclui um wordcount exemplo de pipeline. O exemplo wordcount faz o seguinte:

      1. Lê um ficheiro de texto como entrada. Por predefinição, lê um ficheiro de texto localizado num contentor do Cloud Storage com o nome do recurso gs://dataflow-samples/shakespeare/kinglear.txt.
      2. Analisa cada linha em palavras.
      3. Faz uma contagem de frequência das palavras tokenizadas.

      Para executar a versão mais recente do exemplo do Beam wordcount na sua máquina local, siga estes passos:

      1. Use o comando git clone para clonar o repositório do GitHub apache/beam:

        git clone https://github.com/apache/beam.git
      2. Mude para o diretório beam/sdks/go:

        cd beam/sdks/go
      3. Use o seguinte comando para executar o pipeline:

        go run examples/wordcount/wordcount.go \
          --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output outputs

        O sinalizador input especifica o ficheiro a ler e o sinalizador output especifica o nome do ficheiro para a saída da contagem de frequência.

      Após a conclusão do pipeline, veja os resultados:

      more outputs*

      Para sair, prima q.

      Modifique o código da conduta

      O pipeline do Beam distingue entre palavras com letras maiúsculas e minúsculas.wordcount Os passos seguintes mostram como criar o seu próprio módulo Go, modificar o pipeline wordcount para que não seja sensível a maiúsculas e minúsculas, e executá-lo no Dataflow.

      Crie um módulo Go

      Para fazer alterações ao código do pipeline, siga estes passos.

      1. Crie um diretório para o seu módulo Go numa localização à sua escolha:

        mkdir wordcount
        cd wordcount
      2. Crie um módulo Go. Para este exemplo, use example/dataflow como o caminho do módulo.

        go mod init example/dataflow
      3. Transfira a cópia mais recente do código wordcount do repositório do GitHub do Apache Beam. Coloque este ficheiro no diretório wordcount que criou.

      4. Se estiver a usar um sistema operativo que não seja Linux, tem de obter o pacote Go unix. Este pacote é necessário para executar pipelines no serviço Dataflow.

        go get -u golang.org/x/sys/unix
      5. Certifique-se de que o ficheiro go.mod corresponde ao código-fonte do módulo:

        go mod tidy

      Execute o pipeline não modificado

      Valide se o pipeline wordcount não modificado é executado localmente.

      1. A partir do terminal, crie e execute o pipeline localmente:

         go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
             --output outputs
      2. Veja os resultados da saída:

         more outputs*
      3. Para sair, prima q.

      Altere o código da tubagem

      Para alterar o pipeline de modo a que não seja sensível a maiúsculas e minúsculas, modifique o código para aplicar a função strings.ToLower a todas as palavras.

      1. Num editor à sua escolha, abra o ficheiro wordcount.go.

      2. Examine o bloco init (os comentários foram removidos para maior clareza):

         func init() {
           register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
           register.Function2x1(formatFn)
           register.Emitter1[string]()
         }
        
      3. Adicione uma nova linha para registar a função strings.ToLower:

         func init() {
           register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
           register.Function2x1(formatFn)
           register.Emitter1[string]()
           register.Function1x1(strings.ToLower)
         }
        
      4. Examine a função CountWords:

         func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
           s = s.Scope("CountWords")
        
           // Convert lines of text into individual words.
           col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
        
           // Count the number of times each word occurs.
           return stats.Count(s, col)
         }
        
      5. Para escrever as palavras em minúsculas, adicione um elemento ParDo que aplica strings.ToLower a todas as palavras:

         func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
           s = s.Scope("CountWords")
        
           // Convert lines of text into individual words.
           col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
        
           // Map all letters to lowercase.
           lowercaseWords := beam.ParDo(s, strings.ToLower, col)
        
           // Count the number of times each word occurs.
           return stats.Count(s, lowercaseWords)
         }
        
      6. Guarde o ficheiro.

      Execute o pipeline atualizado localmente

      Execute o pipeline wordcount atualizado localmente e verifique se o resultado foi alterado.

      1. Crie e execute o pipeline wordcountmodificado:

         go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
             --output outputs
      2. Veja os resultados da saída do pipeline modificado. Todas as palavras devem estar em minúsculas.

         more outputs*
      3. Para sair, prima q.

      Execute o pipeline no serviço Dataflow

      Para executar o exemplo wordcount atualizado no serviço Dataflow, use o seguinte comando:

      go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://BUCKET_NAME/results/outputs \
          --runner dataflow \
          --project PROJECT_ID \
          --region DATAFLOW_REGION \
          --staging_location gs://BUCKET_NAME/binaries/

      Substitua o seguinte:

      • BUCKET_NAME: o nome do contentor do Cloud Storage.

      • PROJECT_ID: o Google Cloud ID do projeto.

      • DATAFLOW_REGION: a região onde quer implementar a tarefa do Dataflow. Por exemplo, europe-west1. Para ver uma lista de localizações disponíveis, consulte o artigo Localizações do Dataflow. A flag --region substitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

      Veja os resultados

      Pode ver uma lista das suas tarefas do Dataflow na Google Cloud consola. Na Google Cloud consola, aceda à página Tarefas do Dataflow.

      Aceder a Empregos

      A página Tarefas apresenta detalhes da sua tarefa wordcount, incluindo um estado de Em execução no início e, em seguida, Concluído.

      Quando executa um pipeline através do Dataflow, os resultados são armazenados num contentor do Cloud Storage. Veja os resultados da saída através daGoogle Cloud consola ou do terminal local.

      Consola

      Para ver os resultados na Google Cloud consola, aceda à página Recipientes do Cloud Storage.

      Aceda a Recipientes

      Na lista de contentores no seu projeto, clique no contentor de armazenamento que criou anteriormente. Os ficheiros de saída criados pela tarefa são apresentados no diretório results.

      Terminal

      Veja os resultados a partir do seu terminal ou através do Cloud Shell.

      1. Para listar os ficheiros de saída, use o comando gcloud storage ls:

        gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

        Substitua BUCKET_NAME pelo nome do contentor do Cloud Storage de saída especificado.

      2. Para ver os resultados nos ficheiros de saída, use o comando gcloud storage cat:

        gcloud storage cat gs://BUCKET_NAME/results/outputs*

      Limpar

      Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados nesta página, elimine o Google Cloud projeto com os recursos.

      1. Elimine o contentor:
        gcloud storage buckets delete BUCKET_NAME
      2. Se mantiver o projeto, revogue as funções que concedeu à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

        • roles/dataflow.admin
        • roles/dataflow.worker
        • roles/storage.objectAdmin
        gcloud projects remove-iam-policy-binding PROJECT_ID \
            --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
            --role=SERVICE_ACCOUNT_ROLE
      3. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

        gcloud auth application-default revoke
      4. Optional: Revoke credentials from the gcloud CLI.

        gcloud auth revoke

      O que se segue?