Create an ecommerce streaming pipeline

In this tutorial, you create a Dataflow streaming pipeline that transforms ecommerce data from Pub/Sub topics and subscriptions and outputs the data to BigQuery and Bigtable. This tutorial requires Gradle.

The tutorial provides an end-to-end ecommerce sample application that streams data from a webstore to BigQuery and Bigtable. The sample application illustrates common use cases and best practices for implementing streaming data analytics and real-time artificial intelligence (AI). Use this tutorial to learn how to analyze and react to events in real time so that you can respond dynamically to customer actions. This tutorial also describes how to store, analyze, and visualize event data to get more insight into customer behavior.

The sample application is available on GitHub. To run this tutorial using Terraform, follow the steps provided with the sample application on GitHub.

Create the example sources and sinks

This section explains how to create the following:

  • A Cloud Storage bucket to use as a temporary storage location
  • Streaming data sources using Pub/Sub
  • Datasets to load the data into BigQuery
  • A Bigtable instance

Create a Cloud Storage bucket

Begin by creating a Cloud Storage bucket. This bucket is used as a temporary storage location by the Dataflow pipeline.

Use the gcloud storage buckets create command:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Replace the following:

  • BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
  • LOCATION: the location for the bucket.
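
For example, with a hypothetical bucket name and the us-central1 location, the commands might look like the following; the second command confirms that the bucket exists:

# Hypothetical bucket name; replace with your own globally unique name
gcloud storage buckets create gs://my-retail-pipeline-temp --location=us-central1
gcloud storage buckets describe gs://my-retail-pipeline-temp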

Create Pub/Sub topics and subscriptions

Create four Pub/Sub topics and then create three subscriptions.

To create your topics, run the gcloud pubsub topics create command once for each topic. For information about how to name a topic, see Guidelines to name a topic or a subscription.

gcloud pubsub topics create TOPIC_NAME

Replace TOPIC_NAME with the following values, running the command four times, once for each topic:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound
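
If you prefer, the following shell loop runs the same command once for each of the four topic names listed above:

# Create the four topics used in this tutorial
for topic in Clickstream-inbound Transactions-inbound Inventory-inbound Inventory-outbound; do
  gcloud pubsub topics create "$topic"
done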

To create subscriptions to your topics, run the gcloud pubsub subscriptions create command once for each subscription:

  1. Create a Clickstream-inbound-sub subscription:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Create a Transactions-inbound-sub subscription:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Create an Inventory-inbound-sub subscription:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
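
To confirm that the topics and subscriptions exist before you continue, you can list them:

# List the Pub/Sub resources in the current project
gcloud pubsub topics list
gcloud pubsub subscriptions list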
    

Create BigQuery datasets and table

Create two BigQuery datasets, and then create a table with test data for the slow update side input pattern.

  1. Use the bq mk command to create the first dataset.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Create the second dataset.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Use the CREATE TABLE SQL statement to create a table with a schema and test data. The test data has one store with an ID value of 1. The slow update side input pattern uses this table.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state", 00000);'
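
To verify the datasets and the test data, you can list the datasets in your project and query the new table. The query returns the single row with an id value of 1 that you inserted:

# List the datasets in your project, then read back the test row
bq ls --project_id=PROJECT_ID
bq query --use_legacy_sql=false 'SELECT * FROM Retail_Store.Store_Locations'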
    

Create a Bigtable instance and table

Create a Bigtable instance and table. For more information about creating Bigtable instances, see Create an instance.

  1. If needed, run the following command to install the cbt CLI:

    gcloud components install cbt
    
  2. Use the gcloud bigtable instances create command to create an instance:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Replace CLUSTER_ZONE with the zone where the cluster runs.

  3. Use the cbt createtable command to create a table:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Use the following command to add a column family to the table:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
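
To verify the table and its column family, you can list them with the cbt CLI:

# List tables in the instance, then list column families in the new table
cbt -instance=aggregate-tables ls
cbt -instance=aggregate-tables ls PageView5MinAggregates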
    

Run the pipeline

Use Gradle to run a streaming pipeline. To view the Java code that the pipeline is using, see RetailDataProcessingPipeline.java.

  1. Use the git clone command to clone the GitHub repository:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Switch to the application directory:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. To test the pipeline, in your shell or terminal, run the following command using Gradle:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. To run the pipeline, run the following command using Gradle:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID \
    --serviceAccount=retailpipeline@PROJECT_ID.iam.gserviceaccount.com"
    

See the pipeline source code on GitHub.
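
After the job is submitted, you can confirm that it's running. The following command lists active Dataflow jobs in the region that you specified:

# List active Dataflow jobs in the pipeline's region
gcloud dataflow jobs list --region=REGION --status=active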

Create and run Cloud Scheduler jobs

Create and run three Cloud Scheduler jobs, one that publishes clickstream data, one for inventory data, and one for transaction data. This step generates sample data for the pipeline.

  1. To create a Cloud Scheduler job for this tutorial, use the gcloud scheduler jobs create pubsub command. This step creates a publisher for clickstream data that publishes one message per minute.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. To start the Cloud Scheduler job, use the gcloud scheduler jobs run command.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Create and run another similar publisher for inventory data that publishes one message every two minutes.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Start the second Cloud Scheduler job.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Create and run a third publisher for transaction data that publishes one message every two minutes.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Start the third Cloud Scheduler job.

    gcloud scheduler jobs run --location=LOCATION transactions
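
To confirm that all three Cloud Scheduler jobs were created, list them:

# List the Cloud Scheduler jobs in the location you used
gcloud scheduler jobs list --location=LOCATION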
    

View your results

To view the data written to your BigQuery tables, run the following queries. While the pipeline is running, you can see new rows appended to the tables every minute.

You might need to wait for the tables to populate with data.

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.Retail_Store.clean_inventory_data`'
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.Retail_Store.clean_transaction_data`'
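
The pipeline also writes page view aggregates to the Bigtable table that you created earlier. As a quick check, you can read a few rows with the cbt CLI; the count argument limits the number of rows returned:

# Read up to five rows from the aggregates table
cbt -instance=aggregate-tables read PageView5MinAggregates count=5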