Crie um pipeline de streaming de comércio eletrónico

Neste tutorial, vai criar um pipeline de streaming do Dataflow que transforma dados de comércio eletrónico de tópicos e subscrições do Pub/Sub e envia os dados para o BigQuery e o Bigtable. Este tutorial requer o Gradle.

O tutorial fornece uma aplicação de exemplo de comércio eletrónico integral que transmite dados de uma loja Web para o BigQuery e o Bigtable. A aplicação de exemplo ilustra exemplos de utilização comuns e práticas recomendadas para implementar a análise de dados de streaming e a inteligência artificial (IA) em tempo real. Use este tutorial para saber como responder dinamicamente às ações dos clientes para analisar e reagir a eventos em tempo real. Este tutorial descreve como armazenar, analisar e visualizar dados de eventos para obter mais estatísticas sobre o comportamento dos clientes.

A aplicação de exemplo está disponível no GitHub. Para executar este tutorial com o Terraform, siga os passos fornecidos com a aplicação de exemplo no GitHub.

Objetivos

  • Validar os dados recebidos e aplicar-lhes correções sempre que possível.
  • Analise os dados de fluxo de cliques para manter uma contagem do número de visualizações por produto num determinado período. Armazenar estas informações num armazenamento de baixa latência. A aplicação pode, em seguida, usar os dados para fornecer mensagens do tipo número de pessoas que viram este produto aos clientes no Website.
  • Use os dados de transações para informar a encomenda de inventário:

    • Analise os dados de transações para calcular o número total de vendas de cada artigo, tanto por loja como globalmente, durante um determinado período.
    • Analise os dados do inventário para calcular o inventário recebido de cada item.
    • Transmita estes dados aos sistemas de inventário de forma contínua para que possam ser usados para decisões de compra de inventário.
  • Validar os dados recebidos e aplicar-lhes correções sempre que possível. Escrever dados não corrigíveis numa fila de mensagens rejeitadas para análise e processamento adicionais. Crie uma métrica que represente a percentagem de dados recebidos que são enviados para a fila de mensagens rejeitadas disponível para monitorização e alertas.

  • Processar todos os dados recebidos num formato padrão e armazená-los num data warehouse para utilização em análises e visualizações futuras.

  • Desnormalizar os dados de transações para vendas em lojas, de modo que possam incluir informações como a latitude e a longitude da localização da loja. Forneça as informações da loja através de uma tabela de alteração lenta no BigQuery, usando o ID da loja como chave.

Dados

A aplicação processa os seguintes tipos de dados:

  • Dados de fluxo de cliques enviados por sistemas online para o Pub/Sub.
  • Dados de transações enviados por sistemas locais ou de software como serviço (SaaS) para o Pub/Sub.
  • Dados de inventário enviados por sistemas locais ou SaaS para o Pub/Sub.

Padrões de tarefas

A aplicação contém os seguintes padrões de tarefas comuns a pipelines criados com o SDK Apache Beam para Java:

Custos

Neste documento, usa os seguintes componentes faturáveis da Google Cloud Platform:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

Para gerar uma estimativa de custos com base na sua utilização prevista, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação sem custo financeiro.

Quando terminar as tarefas descritas neste documento, pode evitar a faturação contínua eliminando os recursos que criou. Para mais informações, consulte o artigo Limpe.

Antes de começar

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  4. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  12. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  18. Crie uma conta de serviço de worker gerida pelo utilizador para o novo pipeline e conceda as funções necessárias à conta de serviço.

    1. Para criar a conta de serviço, execute o comando gcloud iam service-accounts create:

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Conceda funções à conta de serviço. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      • roles/bigquery.jobUser
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Substitua SERVICE_ACCOUNT_ROLE por cada função individual.

    3. Conceda à sua Conta Google uma função que lhe permita criar chaves de acesso para a conta de serviço:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  19. Se necessário, transfira e instale o Gradle.

Crie as origens e os destinos de exemplo

Esta secção explica como criar o seguinte:

  • Um contentor do Cloud Storage para usar como localização de armazenamento temporário
  • Origens de dados de streaming com o Pub/Sub
  • Conjuntos de dados para carregar os dados no BigQuery
  • Uma instância do Bigtable

Crie um contentor do Cloud Storage

Comece por criar um contentor do Cloud Storage. Este contentor é usado como uma localização de armazenamento temporário pelo pipeline do Dataflow.

Use o comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Substitua o seguinte:

Crie tópicos e subscrições do Pub/Sub

Crie quatro tópicos do Pub/Sub e, em seguida, crie três subscrições.

Para criar os seus tópicos, execute o comando gcloud pubsub topics create uma vez para cada tópico. Para obter informações sobre como atribuir um nome a uma subscrição, consulte as diretrizes para atribuir um nome a um tópico ou a uma subscrição.

gcloud pubsub topics create TOPIC_NAME

Substitua TOPIC_NAME pelos seguintes valores, executando o comando quatro vezes, uma vez para cada tópico:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

Para criar uma subscrição para o seu tópico, execute o comando gcloud pubsub subscriptions create uma vez para cada subscrição:

  1. Crie uma subscrição Clickstream-inbound-sub:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Crie uma subscrição Transactions-inbound-sub:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Crie uma subscrição do Inventory-inbound-sub:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

Crie conjuntos de dados e tabelas do BigQuery

Crie um conjunto de dados do BigQuery e uma tabela particionada com o esquema adequado para o seu tópico do Pub/Sub.

  1. Use o comando bq mk para criar o primeiro conjunto de dados.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Crie o segundo conjunto de dados.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Use a declaração SQL CREATE TABLE para criar uma tabela com um esquema e dados de teste. Os dados de teste têm uma loja com um valor de ID de 1. O padrão de entrada lateral de atualização lenta usa esta tabela.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Crie uma instância e uma tabela do Bigtable

Crie uma instância e uma tabela do Bigtable. Para mais informações sobre como criar instâncias do Bigtable, consulte o artigo Crie uma instância.

  1. Se necessário, execute o seguinte comando para instalar a CLI cbt:

    gcloud components install cbt
    
  2. Use o comando bigtable instances create para criar uma instância:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Substitua CLUSTER_ZONE pela zona onde o cluster é executado.

  3. Use o comando cbt createtable para criar uma tabela:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Use o seguinte comando para adicionar uma família de colunas à tabela:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

Execute o pipeline

Use o Gradle para executar um pipeline de streaming. Para ver o código Java que o pipeline está a usar, consulte RetailDataProcessingPipeline.java.

  1. Use o comando git clone para clonar o repositório do GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Mude para o diretório da aplicação:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. Para testar o pipeline, na shell ou no terminal, execute o seguinte comando com o Gradle:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. Para executar o pipeline, execute o seguinte comando com o Gradle:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID \
    --serviceAccount=retailpipeline.PROJECT_ID.iam.gserviceaccount.com"
    

Consulte o código fonte do pipeline no GitHub.

Crie e execute tarefas do Cloud Scheduler

Crie e execute três tarefas do Cloud Scheduler, uma que publica dados de fluxo de cliques, uma para dados de inventário e uma para dados de transações. Este passo gera dados de exemplo para o pipeline.

  1. Para criar uma tarefa do Cloud Scheduler para este tutorial, use o comando gcloud scheduler jobs create. Este passo cria um publicador para dados de fluxo de cliques que publica uma mensagem por minuto.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Para iniciar a tarefa do Cloud Scheduler, use o comando gcloud scheduler jobs run.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Crie e execute outro publicador semelhante para dados de inventário que publica uma mensagem a cada dois minutos.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Inicie a segunda tarefa do Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Crie e execute um terceiro publicador para dados de transações que publica uma mensagem a cada dois minutos.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Inicie a terceira tarefa do Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION transactions
    

Veja os resultados

Ver os dados escritos nas tabelas do BigQuery. Verifique os resultados no BigQuery executando as seguintes consultas. Enquanto este pipeline estiver em execução, pode ver novas linhas anexadas às tabelas do BigQuery a cada minuto.

Pode ter de aguardar que as tabelas sejam preenchidas com dados.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

Limpar

Para evitar incorrer em custos na sua conta do Google Cloud pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

Elimine o projeto

A forma mais fácil de eliminar a faturação é eliminar o Google Cloud projeto que criou para o tutorial.

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Elimine os recursos individuais

Se quiser reutilizar o projeto, elimine os recursos que criou para o tutorial.

Limpe os recursos do projeto da Google Cloud Platform

  1. Para eliminar as tarefas do Cloud Scheduler, use o comando gcloud scheduler jobs delete.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Para eliminar as subscrições e os tópicos do Pub/Sub, use os comandos gcloud pubsub subscriptions delete e gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. Para eliminar a tabela do BigQuery, use o comando bq rm.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. Elimine os conjuntos de dados do BigQuery. O conjunto de dados por si só não incorre em custos.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Para eliminar a instância do Bigtable, use o comando cbt deleteinstance. O contentor por si só não incorre em custos.

    cbt deleteinstance aggregate-tables
    
  6. Para eliminar o contentor do Cloud Storage e os respetivos objetos, use o comando gcloud storage rm. O contentor por si só não incorre em custos.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revogue as credenciais

  1. Revogue as funções que concedeu à conta de serviço do trabalhador gerida pelo utilizador. Execute o seguinte comando uma vez para cada uma das seguintes funções do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    • roles/bigquery.jobUser
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

O que se segue?