Crea una pipeline di flussi di e-commerce

In questo tutorial, crei una pipeline di streaming Dataflow che trasforma i dati di e-commerce da argomenti e sottoscrizioni Pub/Sub e li invia a BigQuery e Bigtable. Questo tutorial richiede Gradle.

Il tutorial fornisce un'applicazione di esempio di e-commerce end-to-end che trasmette dati da un negozio online a BigQuery e Bigtable. L'applicazione di esempio illustra i casi d'uso comuni e le best practice per l'implementazione dell'analisi dei dati di streaming e dell'intelligenza artificiale (AI) in tempo reale. Utilizza questo tutorial per scoprire come rispondere in modo dinamico alle azioni dei clienti per analizzare e reagire agli eventi in tempo reale. Questo tutorial descrive come archiviare, analizzare e visualizzare i dati sugli eventi per ottenere maggiori informazioni sul comportamento dei clienti.

L'applicazione di esempio è disponibile su GitHub. Per eseguire questo tutorial utilizzando Terraform, segui i passaggi forniti con l'applicazione di esempio su GitHub.

Obiettivi

  • Convalida i dati in entrata e applica le correzioni, se possibile.
  • Analizza i dati clickstream per tenere traccia del numero di visualizzazioni per prodotto in un determinato periodo di tempo. Memorizza queste informazioni in un archivio a bassa latenza. L'applicazione può quindi utilizzare i dati per fornire messaggi ai clienti sul sito web, ad esempio il numero di persone che hanno visualizzato questo prodotto.
  • Utilizza i dati sulle transazioni per informare l'ordine di inventario:

    • Analizza i dati sulle transazioni per calcolare il numero totale di vendite per ogni articolo, sia per negozio che a livello globale, per un determinato periodo.
    • Analizza i dati di inventario per calcolare l'inventario in entrata per ogni articolo.
    • Trasferisci questi dati ai sistemi di inventario in modo continuo, in modo che possano essere utilizzati per le decisioni di acquisto dell'inventario.
  • Convalida i dati in entrata e applica le correzioni, se possibile. Scrivi i dati non correggibili in una coda di messaggi non recapitabili per ulteriori analisi e l'elaborazione. Crea una metrica che rappresenti la percentuale di dati in entrata inviati alla coda dei messaggi non recapitabili disponibile per il monitoraggio e gli avvisi.

  • Elabora tutti i dati in entrata in un formato standard e archiviali in un data warehouse da utilizzare per analisi e visualizzazioni future.

  • Denormalizza i dati delle transazioni per le vendite in negozio in modo che possano includere informazioni come la latitudine e la longitudine della posizione del negozio. Fornisci le informazioni sul negozio tramite una tabella a variazione lenta in BigQuery, utilizzando l'ID negozio come chiave.

Dati

L'applicazione elabora i seguenti tipi di dati:

  • Dati clickstream inviati dai sistemi online a Pub/Sub.
  • Dati delle transazioni inviati da sistemi on-premise o Software as a Service (SaaS) a Pub/Sub.
  • Dati di inventario inviati da sistemi on-premise o SaaS a Pub/Sub.

Pattern delle attività

L'applicazione contiene i seguenti pattern di attività comuni alle pipeline create con l'SDK Apache Beam per Java:

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi utenti di Google Cloud potrebbero avere diritto a una prova senza costi.

Al termine delle attività descritte in questo documento, puoi evitare l'addebito di ulteriori costi eliminando le risorse che hai creato. Per saperne di più, consulta Esegui la pulizia.

Prima di iniziare

  1. Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. Installa Google Cloud CLI.

  3. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  4. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  5. Crea o seleziona un Google Cloud progetto.

    Ruoli richiesti per selezionare o creare un progetto

    • Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
    • Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto (roles/resourcemanager.projectCreator), che contiene l'autorizzazione resourcemanager.projects.create. Scopri come concedere i ruoli.
    • Creare un progetto Google Cloud :

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del progetto Google Cloud .

  6. Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .

  7. Abilita le API Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin e Cloud Scheduler:

    Ruoli richiesti per abilitare le API

    Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  8. Crea credenziali di autenticazione locali per il tuo account utente:

    gcloud auth application-default login

    Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.

  9. Concedi ruoli al tuo account utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • USER_IDENTIFIER: l'identificatore del tuo account utente . Ad esempio: myemail@example.com.
    • ROLE: il ruolo IAM che concedi al tuo account utente.
  10. Installa Google Cloud CLI.

  11. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  12. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  13. Crea o seleziona un Google Cloud progetto.

    Ruoli richiesti per selezionare o creare un progetto

    • Seleziona un progetto: la selezione di un progetto non richiede un ruolo IAM specifico. Puoi selezionare qualsiasi progetto per il quale ti è stato concesso un ruolo.
    • Crea un progetto: per creare un progetto, devi disporre del ruolo Autore progetto (roles/resourcemanager.projectCreator), che contiene l'autorizzazione resourcemanager.projects.create. Scopri come concedere i ruoli.
    • Creare un progetto Google Cloud :

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del progetto Google Cloud .

  14. Verifica che la fatturazione sia abilitata per il tuo progetto Google Cloud .

  15. Abilita le API Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin e Cloud Scheduler:

    Ruoli richiesti per abilitare le API

    Per abilitare le API, devi disporre del ruolo IAM Amministratore utilizzo dei servizi (roles/serviceusage.serviceUsageAdmin), che include l'autorizzazione serviceusage.services.enable. Scopri come concedere i ruoli.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  16. Crea credenziali di autenticazione locali per il tuo account utente:

    gcloud auth application-default login

    Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.

  17. Concedi ruoli al tuo account utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • USER_IDENTIFIER: l'identificatore del tuo account utente . Ad esempio: myemail@example.com.
    • ROLE: il ruolo IAM che concedi al tuo account utente.
  18. Crea un account di servizio worker gestito dall'utente per la nuova pipeline e concedi i ruoli necessari alaccount di serviziot.

    1. Per creare il account di servizio, esegui il comando gcloud iam service-accounts create:

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Concedi ruoli al account di servizio. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      • roles/bigquery.jobUser
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Sostituisci SERVICE_ACCOUNT_ROLE con ogni singolo ruolo.

    3. Concedi al tuo Account Google un ruolo che ti consenta di creare token di accesso per il account di servizio:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  19. Se necessario, scarica e installa Gradle.

Crea le origini e i sink di esempio

Questa sezione spiega come creare:

  • Un bucket Cloud Storage da utilizzare come posizione di archiviazione temporanea
  • Origini dei flussi di dati che utilizzano Pub/Sub
  • Set di dati in cui caricare i dati in BigQuery
  • Un'istanza Bigtable

Crea un bucket Cloud Storage

Inizia creando un bucket Cloud Storage. Questo bucket viene utilizzato come posizione di archiviazione temporanea dalla pipeline Dataflow.

Utilizza il comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Sostituisci quanto segue:

Crea argomenti e sottoscrizioni Pub/Sub

Crea quattro argomenti Pub/Sub e poi tre sottoscrizioni.

Per creare gli argomenti, esegui il comando gcloud pubsub topics create una volta per ogni argomento. Per informazioni su come denominare un abbonamento, vedi Linee guida per assegnare un nome a un argomento o a un abbonamento.

gcloud pubsub topics create TOPIC_NAME

Sostituisci TOPIC_NAME con i seguenti valori, eseguendo il comando quattro volte, una volta per ogni argomento:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

Per creare una sottoscrizione all'argomento, esegui il comando gcloud pubsub subscriptions create una volta per ogni sottoscrizione:

  1. Crea un abbonamento a Clickstream-inbound-sub:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Crea un abbonamento a Transactions-inbound-sub:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Crea un abbonamento a Inventory-inbound-sub:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

Crea set di dati e tabella BigQuery

Crea un set di dati BigQuery e una tabella partizionata con lo schema appropriato per l'argomento Pub/Sub.

  1. Utilizza il comando bq mk per creare il primo set di dati.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Crea il secondo set di dati.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Utilizza l'istruzione SQL CREATE TABLE per creare una tabella con uno schema e dati di test. I dati di test hanno un datastore con un valore ID pari a 1. Il pattern di input aggiuntivi di aggiornamento lento utilizza questa tabella.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Crea un'istanza e una tabella Bigtable

Crea un'istanza e una tabella Bigtable. Per saperne di più sulla creazione di istanze Bigtable, consulta Crea un'istanza.

  1. Se necessario, esegui il seguente comando per installare la CLI cbt:

    gcloud components install cbt
    
  2. Utilizza il comando bigtable instances create per creare un'istanza:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Sostituisci CLUSTER_ZONE con la zona in cui viene eseguito il cluster.

  3. Utilizza il comando cbt createtable per creare una tabella:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Utilizza il seguente comando per aggiungere una famiglia di colonne alla tabella:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

esegui la pipeline.

Utilizza Gradle per eseguire una pipeline di streaming. Per visualizzare il codice Java utilizzato dalla pipeline, vedi RetailDataProcessingPipeline.java.

  1. Utilizza il comando git clone per clonare il repository GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Passa alla directory dell'applicazione:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. Per testare la pipeline, esegui questo comando utilizzando Gradle nella shell o nel terminale:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. Per eseguire la pipeline, esegui il comando seguente utilizzando Gradle:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID \
    --serviceAccount=retailpipeline.PROJECT_ID.iam.gserviceaccount.com"
    

Visualizza il codice sorgente della pipeline su GitHub.

Crea ed esegui job Cloud Scheduler

Crea ed esegui tre job Cloud Scheduler, uno che pubblica i dati clickstream, uno per i dati di inventario e uno per i dati delle transazioni. Questo passaggio genera dati di esempio per la pipeline.

  1. Per creare un job Cloud Scheduler per questo tutorial, utilizza il comando gcloud scheduler jobs create. Questo passaggio crea un publisher per i dati clickstream che pubblica un messaggio al minuto.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Per avviare il job Cloud Scheduler, utilizza il comando gcloud scheduler jobs run.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Crea ed esegui un altro publisher simile per i dati dell'inventario che pubblica un messaggio ogni due minuti.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Avvia il secondo job Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Crea ed esegui un terzo publisher per i dati sulle transazioni che pubblica un messaggio ogni due minuti.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Avvia il terzo job Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION transactions
    

Visualizza i tuoi risultati

Visualizza i dati scritti nelle tabelle BigQuery. Controlla i risultati in BigQuery eseguendo le seguenti query. Mentre la pipeline è in esecuzione, puoi vedere le nuove righe aggiunte alle tabelle BigQuery ogni minuto.

Potresti dover attendere il completamento delle tabelle con i dati.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

Esegui la pulizia

Per evitare che al tuo Account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per il tutorial.

  1. Nella console Google Cloud , vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona quello che vuoi eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.

Elimina le singole risorse

Se vuoi riutilizzare il progetto, elimina le risorse che hai creato per il tutorial.

Libera spazio per le risorse del progetto Google Cloud

  1. Per eliminare i job Cloud Scheduler, utilizza il comando gcloud scheduler jobs delete.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Per eliminare gli argomenti e le sottoscrizioni Pub/Sub, utilizza i comandi gcloud pubsub subscriptions delete e gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. Per eliminare la tabella BigQuery, utilizza il comando bq rm.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. Elimina i set di dati BigQuery. Il solo set di dati non comporta costi.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Per eliminare l'istanza Bigtable, utilizza il comando cbt deleteinstance. Il solo bucket non comporta alcun addebito.

    cbt deleteinstance aggregate-tables
    
  6. Per eliminare il bucket Cloud Storage e i relativi oggetti, utilizza il comando gcloud storage rm. Il solo bucket non comporta alcun addebito.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revocare le credenziali

  1. Revoca i ruoli che hai concesso al account di servizio worker gestito dall'utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    • roles/bigquery.jobUser
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. (Facoltativo) Revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  3. (Facoltativo) Revoca le credenziali da gcloud CLI.

    gcloud auth revoke

Passaggi successivi