Neste tutorial, vai criar um pipeline de streaming do Dataflow que transforma dados de comércio eletrónico de tópicos e subscrições do Pub/Sub e envia os dados para o BigQuery e o Bigtable. Este tutorial requer o Gradle.
O tutorial fornece uma aplicação de exemplo de comércio eletrónico integral que transmite dados de uma loja Web para o BigQuery e o Bigtable. A aplicação de exemplo ilustra exemplos de utilização comuns e práticas recomendadas para implementar a análise de dados de streaming e a inteligência artificial (IA) em tempo real. Use este tutorial para saber como responder dinamicamente às ações dos clientes para analisar e reagir a eventos em tempo real. Este tutorial descreve como armazenar, analisar e visualizar dados de eventos para obter mais estatísticas sobre o comportamento dos clientes.
A aplicação de exemplo está disponível no GitHub. Para executar este tutorial com o Terraform, siga os passos fornecidos com a aplicação de exemplo no GitHub.
Objetivos
- Validar os dados recebidos e aplicar-lhes correções sempre que possível.
- Analise os dados de fluxo de cliques para manter uma contagem do número de visualizações por produto num determinado período. Armazenar estas informações num armazenamento de baixa latência. A aplicação pode, em seguida, usar os dados para fornecer mensagens do tipo número de pessoas que viram este produto aos clientes no Website.
Use os dados de transações para informar a encomenda de inventário:
- Analise os dados de transações para calcular o número total de vendas de cada artigo, tanto por loja como globalmente, durante um determinado período.
- Analise os dados do inventário para calcular o inventário recebido de cada item.
- Transmita estes dados aos sistemas de inventário de forma contínua para que possam ser usados para decisões de compra de inventário.
Validar os dados recebidos e aplicar-lhes correções sempre que possível. Escrever dados não corrigíveis numa fila de mensagens rejeitadas para análise e processamento adicionais. Crie uma métrica que represente a percentagem de dados recebidos que são enviados para a fila de mensagens rejeitadas disponível para monitorização e alertas.
Processar todos os dados recebidos num formato padrão e armazená-los num data warehouse para utilização em análises e visualizações futuras.
Desnormalizar os dados de transações para vendas em lojas, de modo que possam incluir informações como a latitude e a longitude da localização da loja. Forneça as informações da loja através de uma tabela de alteração lenta no BigQuery, usando o ID da loja como chave.
Dados
A aplicação processa os seguintes tipos de dados:
- Dados de fluxo de cliques enviados por sistemas online para o Pub/Sub.
- Dados de transações enviados por sistemas locais ou de software como serviço (SaaS) para o Pub/Sub.
- Dados de inventário enviados por sistemas locais ou SaaS para o Pub/Sub.
Padrões de tarefas
A aplicação contém os seguintes padrões de tarefas comuns a pipelines criados com o SDK Apache Beam para Java:
- Esquemas do Apache Beam para trabalhar com dados estruturados
JsonToRowpara converter dados JSON- O gerador de código
AutoValuepara gerar objetos Java simples (POJOs) - Colocar dados não processáveis em fila para análise posterior
- Transformações de validação de dados em série
DoFn.StartBundlepara agrupar chamadas em micro lotes para serviços externos- Padrões de entrada lateral
Custos
Neste documento, usa os seguintes componentes faturáveis da Google Cloud Platform:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Para gerar uma estimativa de custos com base na sua utilização prevista,
use a calculadora de preços.
Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.
Antes de começar
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init -
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: Your project ID.USER_IDENTIFIER: The identifier for your user account. For example,myemail@example.com.ROLE: The IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init -
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID: Your project ID.USER_IDENTIFIER: The identifier for your user account. For example,myemail@example.com.ROLE: The IAM role that you grant to your user account.
Crie uma conta de serviço de worker gerida pelo utilizador para o novo pipeline e conceda as funções necessárias à conta de serviço.
Para criar a conta de serviço, execute o comando
gcloud iam service-accounts create:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Conceda funções à conta de serviço. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:
roles/dataflow.adminroles/dataflow.workerroles/pubsub.editorroles/bigquery.dataEditorroles/bigtable.adminroles/bigquery.jobUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Substitua
SERVICE_ACCOUNT_ROLEpor cada função individual.Conceda à sua Conta Google uma função que lhe permita criar chaves de acesso para a conta de serviço:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Se necessário, transfira e instale o Gradle.
Crie as origens e os destinos de exemplo
Esta secção explica como criar o seguinte:
- Um contentor do Cloud Storage para usar como localização de armazenamento temporário
- Origens de dados de streaming com o Pub/Sub
- Conjuntos de dados para carregar os dados no BigQuery
- Uma instância do Bigtable
Crie um contentor do Cloud Storage
Comece por criar um contentor do Cloud Storage. Este contentor é usado como uma localização de armazenamento temporário pelo pipeline do Dataflow.
Use o comando
gcloud storage buckets create:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Substitua o seguinte:
- BUCKET_NAME: um nome para o seu contentor do Cloud Storage que cumpre os requisitos de nomenclatura de contentores. Os nomes dos contentores do Cloud Storage têm de ser globalmente exclusivos.
- LOCATION: a localização do contentor.
Crie tópicos e subscrições do Pub/Sub
Crie quatro tópicos do Pub/Sub e, em seguida, crie três subscrições.
Para criar os seus tópicos, execute o comando
gcloud pubsub topics create
uma vez para cada tópico. Para obter informações sobre como atribuir um nome a uma subscrição, consulte as
diretrizes para atribuir um nome a um tópico ou a uma subscrição.
gcloud pubsub topics create TOPIC_NAME
Substitua TOPIC_NAME pelos seguintes valores, executando o comando quatro vezes, uma vez para cada tópico:
Clickstream-inboundTransactions-inboundInventory-inboundInventory-outbound
Para criar uma subscrição para o seu tópico, execute o comando
gcloud pubsub subscriptions create
uma vez para cada subscrição:
Crie uma subscrição
Clickstream-inbound-sub:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-subCrie uma subscrição
Transactions-inbound-sub:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-subCrie uma subscrição do
Inventory-inbound-sub:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Crie conjuntos de dados e tabelas do BigQuery
Crie um conjunto de dados do BigQuery e uma tabela particionada com o esquema adequado para o seu tópico do Pub/Sub.
Use o comando
bq mkpara criar o primeiro conjunto de dados.bq --location=US mk \ PROJECT_ID:Retail_StoreCrie o segundo conjunto de dados.
bq --location=US mk \ PROJECT_ID:Retail_Store_AggregationsUse a declaração SQL CREATE TABLE para criar uma tabela com um esquema e dados de teste. Os dados de teste têm uma loja com um valor de ID de
1. O padrão de entrada lateral de atualização lenta usa esta tabela.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Crie uma instância e uma tabela do Bigtable
Crie uma instância e uma tabela do Bigtable. Para mais informações sobre como criar instâncias do Bigtable, consulte o artigo Crie uma instância.
Se necessário, execute o seguinte comando para instalar a CLI
cbt:gcloud components install cbtUse o comando
bigtable instances createpara criar uma instância:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1Substitua CLUSTER_ZONE pela zona onde o cluster é executado.
Use o comando
cbt createtablepara criar uma tabela:cbt -instance=aggregate-tables createtable PageView5MinAggregatesUse o seguinte comando para adicionar uma família de colunas à tabela:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Execute o pipeline
Use o Gradle para executar um pipeline de streaming. Para ver o código Java que o pipeline está a usar, consulte RetailDataProcessingPipeline.java.
Use o comando
git clonepara clonar o repositório do GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.gitMude para o diretório da aplicação:
cd dataflow-sample-applications/retail/retail-java-applicationsPara testar o pipeline, na shell ou no terminal, execute o seguinte comando com o Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasksPara executar o pipeline, execute o seguinte comando com o Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID \ --serviceAccount=retailpipeline.PROJECT_ID.iam.gserviceaccount.com"
Consulte o código fonte do pipeline no GitHub.
Crie e execute tarefas do Cloud Scheduler
Crie e execute três tarefas do Cloud Scheduler, uma que publica dados de fluxo de cliques, uma para dados de inventário e uma para dados de transações. Este passo gera dados de exemplo para o pipeline.
Para criar uma tarefa do Cloud Scheduler para este tutorial, use o comando
gcloud scheduler jobs create. Este passo cria um publicador para dados de fluxo de cliques que publica uma mensagem por minuto.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'Para iniciar a tarefa do Cloud Scheduler, use o comando
gcloud scheduler jobs run.gcloud scheduler jobs run --location=LOCATION clickstreamCrie e execute outro publicador semelhante para dados de inventário que publica uma mensagem a cada dois minutos.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'Inicie a segunda tarefa do Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION inventoryCrie e execute um terceiro publicador para dados de transações que publica uma mensagem a cada dois minutos.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'Inicie a terceira tarefa do Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION transactions
Veja os resultados
Ver os dados escritos nas tabelas do BigQuery. Verifique os resultados no BigQuery executando as seguintes consultas. Enquanto este pipeline estiver em execução, pode ver novas linhas anexadas às tabelas do BigQuery a cada minuto.
Pode ter de aguardar que as tabelas sejam preenchidas com dados.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Limpar
Para evitar incorrer em custos na sua conta do Google Cloud pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.
Elimine o projeto
A forma mais fácil de eliminar a faturação é eliminar o Google Cloud projeto que criou para o tutorial.
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Elimine os recursos individuais
Se quiser reutilizar o projeto, elimine os recursos que criou para o tutorial.
Limpe os recursos do projeto da Google Cloud Platform
Para eliminar as tarefas do Cloud Scheduler, use o comando
gcloud scheduler jobs delete.gcloud scheduler jobs delete transactions --location=LOCATIONgcloud scheduler jobs delete inventory --location=LOCATIONgcloud scheduler jobs delete clickstream --location=LOCATIONPara eliminar as subscrições e os tópicos do Pub/Sub, use os comandos
gcloud pubsub subscriptions deleteegcloud pubsub topics delete.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAMEPara eliminar a tabela do BigQuery, use o comando
bq rm.bq rm -f -t PROJECT_ID:Retail_Store.Store_LocationsElimine os conjuntos de dados do BigQuery. O conjunto de dados por si só não incorre em custos.
bq rm -r -f -d PROJECT_ID:Retail_Storebq rm -r -f -d PROJECT_ID:Retail_Store_AggregationsPara eliminar a instância do Bigtable, use o comando
cbt deleteinstance. O contentor por si só não incorre em custos.cbt deleteinstance aggregate-tablesPara eliminar o contentor do Cloud Storage e os respetivos objetos, use o comando
gcloud storage rm. O contentor por si só não incorre em custos.gcloud storage rm gs://BUCKET_NAME --recursive
Revogue as credenciais
Revogue as funções que concedeu à conta de serviço do trabalhador gerida pelo utilizador. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:
roles/dataflow.adminroles/dataflow.workerroles/pubsub.editorroles/bigquery.dataEditorroles/bigtable.adminroles/bigquery.jobUser
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
O que se segue?
- Veja a aplicação de exemplo no GitHub.
- Leia a publicação no blogue relacionada Saiba mais sobre os padrões de feixes com o processamento de fluxo de cliques dos dados do Gestor de Etiquetas da Google.
- Leia acerca da utilização do Pub/Sub para criar e usar tópicos e para usar subscrições.
- Leia acerca da utilização do BigQuery para criar conjuntos de dados.
- Explore arquiteturas de referência, diagramas e práticas recomendadas sobre o Google Cloud. Consulte o nosso Centro de arquitetura na nuvem.