Fazer streaming de mensagens do Pub/Sub usando o Dataflow e o Cloud Storage
O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em modos de stream (em tempo real) e em lote com a mesma confiabilidade e expressividade. Ele fornece um ambiente simplificado de desenvolvimento de pipeline usando o SDK do Apache Beam, que tem um conjunto avançado de primitivos de análise de sessões e janelas, além de um ecossistema de conectores de origem e de coletor. Este guia de início rápido mostra como usar o Dataflow para:
- ler mensagens publicadas em um tópico do Pub/Sub;
- organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora;
- gravar as mensagens no Cloud Storage.
Este guia de início rápido apresenta o uso do Dataflow em Java e Python. O SQL também é compatível. Este guia de início rápido também é oferecido como um tutorial do Google Cloud Ensina, que oferece credenciais temporárias para você começar.
Comece usando os modelos do Dataflow baseados na IU se não pretende fazer o processamento de dados personalizado.
Antes de começar
- Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
-
Instale a CLI do Google Cloud.
-
Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.
-
Para inicializar a gcloud CLI, execute o seguinte comando:
gcloud init -
Crie ou selecione um Google Cloud projeto.
Funções necessárias para selecionar ou criar um projeto
- Selecionar um projeto: não é necessário um papel específico do IAM para selecionar um projeto. Você pode escolher qualquer projeto em que tenha recebido um papel.
-
Criar um projeto: para criar um projeto, é necessário ter o papel de Criador de projetos
(
roles/resourcemanager.projectCreator), que contém a permissãoresourcemanager.projects.create. Saiba como conceder papéis.
-
Crie um projeto do Google Cloud :
gcloud projects create PROJECT_ID
Substitua
PROJECT_IDpor um nome para o projeto Google Cloud que você está criando. -
Selecione o projeto Google Cloud que você criou:
gcloud config set project PROJECT_ID
Substitua
PROJECT_IDpelo nome do projeto do Google Cloud .
-
Verifique se o faturamento está ativado para o projeto do Google Cloud .
Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager e Cloud Scheduler:
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador do Service Usage (
roles/serviceusage.serviceUsageAdmin), que contém a permissãoserviceusage.services.enable. Saiba como conceder papéis.gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configure a autenticação:
-
Verifique se você tem o papel do IAM de criação de contas de serviço
(
roles/iam.serviceAccountCreator) e o papel de administrador do IAM do projeto (roles/resourcemanager.projectIamAdmin). Saiba como conceder papéis. -
Crie a conta de serviço:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Substitua
SERVICE_ACCOUNT_NAMEpor um nome para a conta de serviço. -
Conceda papéis à conta de serviço. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Substitua:
SERVICE_ACCOUNT_NAME: o nome da conta de serviço.PROJECT_ID: o ID do projeto em que você criou a conta de serviçoROLE: o papel a ser concedido
-
Conceda ao principal o papel necessário para anexar a conta de serviço a outros recursos.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Substitua:
SERVICE_ACCOUNT_NAME: o nome da conta de serviço.PROJECT_ID: o ID do projeto em que você criou a conta de serviçoUSER_EMAIL: o endereço de e-mail de uma Conta do Google
-
Verifique se você tem o papel do IAM de criação de contas de serviço
(
-
Instale a CLI do Google Cloud.
-
Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.
-
Para inicializar a gcloud CLI, execute o seguinte comando:
gcloud init -
Crie ou selecione um Google Cloud projeto.
Funções necessárias para selecionar ou criar um projeto
- Selecionar um projeto: não é necessário um papel específico do IAM para selecionar um projeto. Você pode escolher qualquer projeto em que tenha recebido um papel.
-
Criar um projeto: para criar um projeto, é necessário ter o papel de Criador de projetos
(
roles/resourcemanager.projectCreator), que contém a permissãoresourcemanager.projects.create. Saiba como conceder papéis.
-
Crie um projeto do Google Cloud :
gcloud projects create PROJECT_ID
Substitua
PROJECT_IDpor um nome para o projeto Google Cloud que você está criando. -
Selecione o projeto Google Cloud que você criou:
gcloud config set project PROJECT_ID
Substitua
PROJECT_IDpelo nome do projeto do Google Cloud .
-
Verifique se o faturamento está ativado para o projeto do Google Cloud .
Ative as APIs Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager e Cloud Scheduler:
Funções necessárias para ativar APIs
Para ativar as APIs, é necessário ter o papel do IAM de administrador do Service Usage (
roles/serviceusage.serviceUsageAdmin), que contém a permissãoserviceusage.services.enable. Saiba como conceder papéis.gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configure a autenticação:
-
Verifique se você tem o papel do IAM de criação de contas de serviço
(
roles/iam.serviceAccountCreator) e o papel de administrador do IAM do projeto (roles/resourcemanager.projectIamAdmin). Saiba como conceder papéis. -
Crie a conta de serviço:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Substitua
SERVICE_ACCOUNT_NAMEpor um nome para a conta de serviço. -
Conceda papéis à conta de serviço. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Substitua:
SERVICE_ACCOUNT_NAME: o nome da conta de serviço.PROJECT_ID: o ID do projeto em que você criou a conta de serviçoROLE: o papel a ser concedido
-
Conceda ao principal o papel necessário para anexar a conta de serviço a outros recursos.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Substitua:
SERVICE_ACCOUNT_NAME: o nome da conta de serviço.PROJECT_ID: o ID do projeto em que você criou a conta de serviçoUSER_EMAIL: o endereço de e-mail de uma Conta do Google
-
Verifique se você tem o papel do IAM de criação de contas de serviço
(
-
Crie credenciais de autenticação local para sua conta de usuário:
gcloud auth application-default login
Se um erro de autenticação for retornado e você estiver usando um provedor de identidade (IdP) externo, confirme se você fez login na CLI gcloud com sua identidade federada.
Configurar seu projeto do Pub/Sub
-
Crie variáveis para o bucket, o projeto e a região. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos. Selecione uma região do Dataflow perto de onde você executa os comandos neste guia de início rápido. O valor da variável
REGIONprecisa ser um nome de região válido. Para mais informações sobre regiões e locais, consulte Locais do Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crie um bucket do Cloud Storage que pertença a este projeto:
gcloud storage buckets create gs://$BUCKET_NAME
-
Crie um tópico do Pub/Sub neste projeto:
gcloud pubsub topics create $TOPIC_ID
-
Crie um job do Cloud Scheduler neste projeto. O job publica uma mensagem em um tópico do Pub/Sub em intervalos de um minuto.
Esta etapa criará um aplicativo do App Engine para o projeto, se já não houver um.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Inicie o job.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Use os seguintes comandos para clonar o repositório do guia de início rápido e navegar até o diretório do código de amostra:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Fazer streaming de mensagens do Pub/Sub para o Cloud Storage
Exemplo de código
Este exemplo de código usa o Dataflow para:
- Leia as mensagens do Pub/Sub.
- organizar em janelas (ou agrupar) mensagens em intervalos de tamanho fixo por carimbos de data/hora de publicação.
Grave as mensagens em cada janela nos arquivos no Cloud Storage.
Java
Python
Iniciar o pipeline
Para iniciar o pipeline, execute o seguinte comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
O comando anterior é executado localmente e inicia um job do Dataflow executado na nuvem. Quando o comando retornar JOB_MESSAGE_DETAILED: Workers
have started successfully, saia do programa local usando Ctrl+C.
Observar o andamento do job e do pipeline
Observe o progresso do job no console do Dataflow.
Abra a visualização de detalhes do job para ver:
- a estrutura do job;
- os registros da tarefa;
- as métricas do cenário.
Talvez seja necessário aguardar alguns minutos para ver os arquivos de saída no Cloud Storage.
Como alternativa, use a linha de comando abaixo para verificar quais arquivos foram gravados.
gcloud storage ls gs://${BUCKET_NAME}/samples/
A saída será semelhante a esta:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1Limpar
Para evitar cobranças na conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud e os recursos.
Exclua o job do Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
No console do Dataflow, interrompa o job. Cancele o pipeline sem esvaziá-lo.
Exclua o tópico.
gcloud pubsub topics delete $TOPIC_ID
Exclua os arquivos criados pelo pipeline.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Remova o bucket do Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Exclua a conta de serviço:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Opcional: revogue as credenciais de autenticação que você criou e exclua o arquivo de credenciais local:
gcloud auth application-default revoke
-
Opcional: revogar credenciais da CLI gcloud.
gcloud auth revoke
A seguir
Se você quiser organizar em janelas as mensagens do Pub/Sub por um carimbo de data/hora personalizado, especifique esse carimbo como um atributo na mensagem do Pub/Sub e use-o com PubsubIO's
withTimestampAttribute.Confira os modelos do Dataflow de código aberto projetados para streaming.
Leia mais sobre como o Dataflow se integra ao Pub/Sub.
Confira este tutorial que lê do Pub/Sub e grava no BigQuery usando modelos Flex do Dataflow.
Para saber mais sobre janelas, consulte o exemplo Pipeline de jogos para dispositivos móveis do Apache Beam (em inglês).