Faça streaming do Pub/Sub para o BigQuery

Este tutorial usa o modelo Subscrição do Pub/Sub para o BigQuery para criar e executar uma tarefa de modelo do Dataflow com a Google Cloud consola ou a CLI Google Cloud. O tutorial explica um exemplo de pipeline de streaming que lê mensagens codificadas em JSON do Pub/Sub e as escreve numa tabela do BigQuery.

As pipelines de análise de streaming e integração de dados usam o Pub/Sub para carregar e distribuir dados. O Pub/Sub permite-lhe criar sistemas de produtores e consumidores de eventos, denominados publicadores e subscritores. Os publicadores enviam eventos para o serviço Pub/Sub de forma assíncrona, e o Pub/Sub envia os eventos para todos os serviços que precisam de reagir a eles.

O Dataflow é um serviço totalmente gerido para transformar e enriquecer dados nos modos de streaming (em tempo real) e em lote. Oferece um ambiente de desenvolvimento de pipeline simplificado que usa o SDK do Apache Beam para transformar os dados recebidos e, em seguida, gerar os dados transformados.

A vantagem deste fluxo de trabalho é que pode usar UDFs para transformar os dados das mensagens antes de serem escritos no BigQuery.

Antes de executar um pipeline do Dataflow para este cenário, considere se uma subscrição do Pub/Sub BigQuery com uma UDF cumpre os seus requisitos.

Objetivos

  • Crie um tópico Pub/Sub.
  • Crie um conjunto de dados do BigQuery com uma tabela e um esquema.
  • Use um modelo de streaming fornecido pela Google para fazer stream de dados da sua subscrição do Pub/Sub para o BigQuery através do Dataflow.

Custos

Neste documento, usa os seguintes componentes faturáveis da Google Cloud Platform:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

Para gerar uma estimativa de custos com base na sua utilização prevista, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.

Antes de começar

Esta secção mostra-lhe como selecionar um projeto, ativar APIs e conceder as funções adequadas à sua conta de utilizador e à conta de serviço do trabalhador.

Consola

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Para concluir os passos neste tutorial, a sua conta de utilizador tem de ter a função Utilizador da conta de serviço. A conta de serviço predefinida do Compute Engine tem de ter as seguintes funções: trabalhador do Dataflow, administrador do Dataflow, editor do Pub/Sub, administrador de objetos de armazenamento e editor de dados do BigQuery. Para adicionar as funções necessárias na Google Cloud consola:

    1. Na Google Cloud consola, aceda à página IAM.

      Aceda ao IAM
    2. Selecione o seu projeto.
    3. Na linha que contém a sua conta de utilizador, clique em Editar principal, e, de seguida, clique em Adicionar outra função.
    4. Na lista pendente, selecione a função Utilizador da conta de serviço.
    5. Na linha que contém a conta de serviço predefinida do Compute Engine, clique em Editar principal, e, de seguida, clique em Adicionar outra função.
    6. Na lista pendente, selecione a função Dataflow Worker.
    7. Repita o processo para as funções Administrador do Dataflow, Editor do Pub/Sub, Administrador de objetos de armazenamento e Editor de dados do BigQuery e, de seguida, clique em Guardar.

      Para mais informações sobre como conceder funções, consulte o artigo Conceda uma função de IAM através da consola.

gcloud

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  7. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  13. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  14. Conceda funções à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto.
    • PROJECT_NUMBER: o número do seu projeto. Para encontrar o número do projeto, use o comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: cada função individual.

Crie um contentor do Cloud Storage

Comece por criar um contentor do Cloud Storage através da Google Cloud consola ou da Google Cloud CLI. O pipeline do Dataflow usa este contentor como uma localização de armazenamento temporária.

Consola

  1. Na Google Cloud consola, aceda à página Recipientes do Cloud Storage.

    Aceda a Recipientes

  2. Clique em Criar.

  3. Na página Crie um contentor, em Atribua um nome ao contentor, introduza um nome que cumpra os requisitos de nomenclatura de contentores. Os nomes dos contentores do Cloud Storage têm de ser globalmente exclusivos. Não selecione as outras opções.

  4. Clique em Criar.

gcloud

Use o comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME

Substitua BUCKET_NAME por um nome para o seu contentor do Cloud Storage que cumpra os requisitos de nomenclatura de contentores. Os nomes dos contentores do Cloud Storage têm de ser globalmente exclusivos.

Crie um tópico e uma subscrição do Pub/Sub

Crie um tópico Pub/Sub e, em seguida, crie uma subscrição para esse tópico.

Consola

Para criar um tópico, conclua os seguintes passos.

  1. Na Google Cloud consola, aceda à página Tópicos do Pub/Sub.

    Aceder a Tópicos

  2. Clique em Criar tópico.

  3. No campo ID do tópico, introduza um ID para o tópico. Para informações sobre como atribuir um nome a um tópico, consulte as Diretrizes para atribuir um nome a um tópico ou a uma subscrição.

  4. Mantenha a opção Adicionar uma subscrição predefinida. Não selecione as outras opções.

  5. Clique em Criar.

  6. Na página de detalhes do tópico, o nome da subscrição criada é apresentado em ID da subscrição. Tome nota deste valor para os passos posteriores.

gcloud

Para criar um tópico, execute o comando gcloud pubsub topics create. Para obter informações sobre como atribuir um nome a uma subscrição, consulte as diretrizes para atribuir um nome a um tópico ou a uma subscrição.

gcloud pubsub topics create TOPIC_ID

Substitua TOPIC_ID por um nome para o seu tópico Pub/Sub.

Para criar uma subscrição do seu tópico, execute o comando gcloud pubsub subscriptions create:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Substitua SUBSCRIPTION_ID por um nome para a sua subscrição do Pub/Sub.

Crie uma tabela do BigQuery

Neste passo, cria uma tabela do BigQuery com o seguinte esquema:

Nome da coluna Tipo de dados
name STRING
customer_id INTEGER

Se ainda não tiver um conjunto de dados do BigQuery, crie um primeiro. Para mais informações, consulte o artigo Crie conjuntos de dados. Em seguida, crie uma nova tabela vazia:

Consola

  1. Aceda à página do BigQuery.

    Aceda ao BigQuery

  2. No painel Explorador, expanda o seu projeto e, de seguida, selecione um conjunto de dados.

  3. Na secção de informações do conjunto de dados, clique em Criar tabela.

  4. Na lista Criar tabela a partir de, selecione Tabela vazia.

  5. Na caixa Tabela, introduza o nome da tabela.

  6. Na secção Esquema, clique em Editar como texto.

  7. Cole a seguinte definição do esquema:

    name:STRING,
    customer_id:INTEGER
    
  8. Clique em Criar tabela.

gcloud

Use o comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Substitua o seguinte:

  • PROJECT_ID: o ID do seu projeto
  • DATASET_NAME: o nome do conjunto de dados
  • TABLE_NAME: o nome da tabela a criar

Execute o pipeline

Execute um pipeline de streaming através do modelo de subscrição do Pub/Sub para o BigQuery fornecido pela Google. O pipeline recebe dados recebidos do tópico do Pub/Sub e envia os dados para o seu conjunto de dados do BigQuery.

Consola

  1. Na Google Cloud consola, aceda à página Tarefas do Dataflow.

    Aceder a Empregos

  2. Clique em Criar tarefa a partir de modelo.

  3. Introduza um Nome do trabalho para o seu trabalho do Dataflow.

  4. Para Ponto final regional, selecione uma região para a tarefa do Dataflow.

  5. Para Modelo do Dataflow, selecione o modelo Subscrição do Pub/Sub para BigQuery.

  6. Para Tabela de resultados do BigQuery, selecione Procurar e selecione a tabela do BigQuery.

  7. Na lista Subscrição de entrada do Pub/Sub, selecione a subscrição do Pub/Sub.

  8. Para Localização temporária, introduza o seguinte:

    gs://BUCKET_NAME/temp/
    

    Substitua BUCKET_NAME pelo nome do seu contentor do Cloud Storage. A pasta temp armazena ficheiros temporários para as tarefas do Dataflow.

  9. Clique em Executar tarefa.

gcloud

Para executar o modelo na shell ou no terminal, use o comando gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

Substitua as seguintes variáveis:

  • JOB_NAME. um nome para a tarefa
  • DATAFLOW_REGION: uma região para a tarefa
  • PROJECT_ID: o nome do seu projeto da Google Cloud Platform
  • SUBSCRIPTION_ID: o nome da sua subscrição do Pub/Sub
  • DATASET_NAME: o nome do seu conjunto de dados do BigQuery
  • TABLE_NAME: o nome da sua tabela do BigQuery

Publique mensagens no Pub/Sub

Depois de a tarefa do Dataflow ser iniciada, pode publicar mensagens no Pub/Sub, e o pipeline escreve-as no BigQuery.

Consola

  1. Na Google Cloud consola, aceda à página Pub/Sub > Tópicos.

    Aceder a Tópicos

  2. Na lista de tópicos, clique no nome do tópico.

  3. Clique em Mensagens.

  4. Clique em Publicar mensagens.

  5. Para Número de mensagens, introduza 10.

  6. Para Corpo da mensagem, introduza {"name": "Alice", "customer_id": 1}.

  7. Clique em Publicar.

gcloud

Para publicar mensagens no seu tópico, use o comando gcloud pubsub topics publish.

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

Substitua TOPIC_ID pelo nome do seu tópico.

Veja os resultados

Veja os dados escritos na tabela do BigQuery. Pode demorar até 1 minuto para que os dados comecem a ser apresentados na tabela.

Consola

  1. Na Google Cloud consola, aceda à página BigQuery.
    Aceda à página do BigQuery

  2. No editor de consultas, execute a seguinte consulta:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    Substitua as seguintes variáveis:

    • PROJECT_ID: o nome do seu projeto da Google Cloud Platform
    • DATASET_NAME: o nome do seu conjunto de dados do BigQuery
    • TABLE_NAME: o nome da sua tabela do BigQuery

gcloud

Verifique os resultados no BigQuery executando a seguinte consulta:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

Substitua as seguintes variáveis:

  • PROJECT_ID: o nome do seu projeto da Google Cloud Platform
  • DATASET_NAME: o nome do seu conjunto de dados do BigQuery
  • TABLE_NAME: o nome da sua tabela do BigQuery

Use uma FDU para transformar os dados

Este tutorial pressupõe que as mensagens do Pub/Sub estão formatadas como JSON e que o esquema da tabela do BigQuery corresponde aos dados JSON.

Opcionalmente, pode fornecer uma função definida pelo utilizador (FDU) em JavaScript que transforma os dados antes de serem escritos no BigQuery. A FDU pode realizar processamento adicional, como filtrar, remover informações de identificação pessoal (IIP) ou enriquecer os dados com campos adicionais.

Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Use uma tabela de mensagens rejeitadas

Enquanto a tarefa está em execução, o pipeline pode não conseguir escrever mensagens individuais no BigQuery. Os possíveis erros incluem:

  • Erros de serialização, incluindo JSON com formato incorreto.
  • Erros de conversão de tipos, causados por uma incompatibilidade no esquema de tabela e nos dados JSON.
  • Campos adicionais nos dados JSON que não estão presentes no esquema de tabela.

O pipeline escreve estes erros numa tabela de mensagens rejeitadas no BigQuery. Por predefinição, o pipeline cria automaticamente uma tabela de mensagens rejeitadas denominada TABLE_NAME_error_records, onde TABLE_NAME é o nome da tabela de saída. Para usar um nome diferente, defina o parâmetro do modelo outputDeadletterTable.

Limpar

Para evitar incorrer em custos na sua conta do Google Cloud pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

Elimine o projeto

A forma mais fácil de eliminar a faturação é eliminar o Google Cloud projeto que criou para o tutorial.

Consola

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimine os recursos individuais

Se quiser reutilizar o projeto mais tarde, pode mantê-lo, mas eliminar os recursos que criou durante o tutorial.

Pare a pipeline do Dataflow

Consola

  1. Na Google Cloud consola, aceda à página Tarefas do Dataflow.

    Aceder a Empregos

  2. Clique na tarefa que quer parar.

    Para parar uma tarefa, o respetivo estado tem de ser em execução.

  3. Na página de detalhes da tarefa, clique em Parar.

  4. Clique em Cancelar.

  5. Para confirmar a sua escolha, clique em Parar tarefa.

gcloud

Para cancelar a tarefa do Dataflow, use o comando gcloud dataflow jobs.

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Limpe os recursos do projeto da Google Cloud Platform

Consola

  1. Elimine o tópico e a subscrição Pub/Sub.

    1. Aceda à página Tópicos do Pub/Sub na Google Cloud consola.

      Aceder a Tópicos

    2. Selecione o tópico que criou.

    3. Clique em Eliminar para eliminar o tópico permanentemente.

    4. Aceda à página Subscrições do Pub/Sub na Google Cloud consola.

      Aceda a Subscrições

    5. Selecione a subscrição criada com o seu tópico.

    6. Clique em Eliminar para eliminar a subscrição permanentemente.

  2. Elimine a tabela e o conjunto de dados do BigQuery.

    1. Na Google Cloud consola, aceda à página BigQuery.

      Aceda ao BigQuery

    2. No painel Explorador, expanda o seu projeto.

    3. Junto ao conjunto de dados que quer eliminar, clique em Ver ações e, de seguida, clique em eliminar.

  3. Elimine o contentor do Cloud Storage.

    1. Na Google Cloud consola, aceda à página Recipientes do Cloud Storage.

      Aceda a Recipientes

    2. Selecione o contentor que quer eliminar, clique em Eliminar e, de seguida, siga as instruções.

gcloud

  1. Para eliminar a subscrição e o tópico do Pub/Sub, use os comandos gcloud pubsub subscriptions delete e gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. Para eliminar a tabela do BigQuery, use o comando bq rm.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. Elimine o conjunto de dados do BigQuery. O conjunto de dados por si só não incorre em custos.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. Para eliminar o contentor do Cloud Storage e os respetivos objetos, use o comando gcloud storage rm. O contentor sozinho não incorre em cobranças.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revogue as credenciais

Consola

Se mantiver o projeto, revogue as funções que concedeu à conta de serviço predefinida do Compute Engine.

  1. Na Google Cloud consola, aceda à página IAM.

Aceda ao IAM

  1. Selecione um projeto, uma pasta ou uma organização.

  2. Procure a linha que contém o principal cujo acesso quer revogar. Nessa linha, clique em Editar principal.

  3. Clique no botão Eliminar para cada função que quer revogar e, de seguida, clique em Guardar.

gcloud

  • Se mantiver o projeto, revogue as funções que concedeu à conta de serviço predefinida do Compute Engine. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

O que se segue?