In this tutorial, you create a Dataflow streaming pipeline that transforms ecommerce data from Pub/Sub topics and subscriptions and outputs the data to BigQuery and Bigtable. This tutorial requires Gradle.
The tutorial provides an end-to-end ecommerce sample application that streams data from a webstore to BigQuery and Bigtable. The sample application illustrates common use cases and best practices for implementing streaming data analytics and real-time artificial intelligence (AI). Use this tutorial to learn how to respond dynamically to customer actions by analyzing and reacting to events in real time. This tutorial describes how to store, analyze, and visualize event data to get more insight into customer behavior.
The sample application is available on GitHub. To run this tutorial using Terraform, follow the steps provided with the sample application on GitHub.
Create the example sources and sinks
This section explains how to create the following:
- A Cloud Storage bucket to use as a temporary storage location
- Streaming data sources using Pub/Sub
- Datasets to load the data into BigQuery
- A Bigtable instance
Create a Cloud Storage bucket
Begin by creating a Cloud Storage bucket. This bucket is used as a temporary storage location by the Dataflow pipeline.
Use the gcloud storage buckets create command:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Replace the following:
- BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
- LOCATION: the location for the bucket.
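As an optional check that isn't part of the original steps, you can confirm that the bucket exists and inspect its settings:
gcloud storage buckets describe gs://BUCKET_NAME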
Create Pub/Sub topics and subscriptions
Create four Pub/Sub topics and then create three subscriptions.
To create your topics, run the gcloud pubsub topics create command once for each topic. For information about how to name a topic, see Guidelines to name a topic or a subscription.
gcloud pubsub topics create TOPIC_NAME
Replace TOPIC_NAME with the following values, running the command four times, once for each topic:
- Clickstream-inbound
- Transactions-inbound
- Inventory-inbound
- Inventory-outbound
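If you want to verify this step, you can optionally list the topics in your project and confirm that all four appear:
gcloud pubsub topics list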
To create a subscription to your topic, run the gcloud pubsub subscriptions create command once for each subscription:
Create a Clickstream-inbound-sub subscription:
gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Create a Transactions-inbound-sub subscription:
gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Create an Inventory-inbound-sub subscription:
gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
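As an optional check, you can list the subscriptions in your project to confirm that all three were created and are attached to the expected topics:
gcloud pubsub subscriptions list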
Create BigQuery datasets and table
Create two BigQuery datasets and a table with the schema and test data that the pipeline needs.
Use the bq mk command to create the first dataset:
bq --location=US mk \
PROJECT_ID:Retail_Store
Create the second dataset.
bq --location=US mk \
PROJECT_ID:Retail_Store_Aggregations
Use the CREATE TABLE SQL statement to create a table with a schema and test data. The test data has one store with an ID value of 1. The slow update side input pattern uses this table:
bq query --use_legacy_sql=false \
'CREATE TABLE Retail_Store.Store_Locations (
  id INT64,
  city STRING,
  state STRING,
  zip INT64
);
INSERT INTO Retail_Store.Store_Locations
VALUES (1, "a_city", "a_state", 00000);'
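To optionally confirm that the table and its test row exist before moving on, you can query the table directly:
bq query --use_legacy_sql=false 'SELECT * FROM Retail_Store.Store_Locations'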
Create a Bigtable instance and table
Create a Bigtable instance and table. For more information about creating Bigtable instances, see Create an instance.
If needed, run the following command to install the cbt CLI:
gcloud components install cbt
Use the bigtable instances create command to create an instance:
gcloud bigtable instances create aggregate-tables \
--display-name=aggregate-tables \
--cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Replace CLUSTER_ZONE with the zone where the cluster runs.
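As an optional check, you can describe the instance to confirm that it was created:
gcloud bigtable instances describe aggregate-tables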
Use the cbt createtable command to create a table:
cbt -instance=aggregate-tables createtable PageView5MinAggregates
Use the following command to add a column family to the table:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
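You can optionally list the table to confirm that the pageViewAgg column family was added:
cbt -instance=aggregate-tables ls PageView5MinAggregates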
Run the pipeline
Use Gradle to run a streaming pipeline. To view the Java code that the pipeline is using, see RetailDataProcessingPipeline.java.
Use the git clone command to clone the GitHub repository:
git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Switch to the application directory:
cd dataflow-sample-applications/retail/retail-java-applications
To test the pipeline, in your shell or terminal, run the following command using Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
To run the pipeline, run the following command using Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \
--project=PROJECT_ID \
--tempLocation=gs://BUCKET_NAME/temp/ \
--runner=DataflowRunner \
--region=REGION \
--clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
--transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
--inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
--aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
--dataWarehouseOutputProject=PROJECT_ID \
--serviceAccount=retailpipeline@PROJECT_ID.iam.gserviceaccount.com"
See the pipeline source code on GitHub.
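After the pipeline launches, you can optionally confirm that the Dataflow job is running:
gcloud dataflow jobs list --region=REGION --status=active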
Create and run Cloud Scheduler jobs
Create and run three Cloud Scheduler jobs, one that publishes clickstream data, one for inventory data, and one for transaction data. This step generates sample data for the pipeline.
To create a Cloud Scheduler job for this tutorial, use the gcloud scheduler jobs create command. This step creates a publisher for clickstream data that publishes one message per minute.
gcloud scheduler jobs create pubsub clickstream \
--schedule="* * * * *" \
--location=LOCATION \
--topic="Clickstream-inbound" \
--message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
To start the Cloud Scheduler job, use the gcloud scheduler jobs run command:
gcloud scheduler jobs run --location=LOCATION clickstream
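As an optional check, you can describe the job to confirm its schedule and state:
gcloud scheduler jobs describe clickstream --location=LOCATION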
Create and run another similar publisher for inventory data that publishes one message every two minutes.
gcloud scheduler jobs create pubsub inventory \
--schedule="*/2 * * * *" \
--location=LOCATION \
--topic="Inventory-inbound" \
--message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Start the second Cloud Scheduler job.
gcloud scheduler jobs run --location=LOCATION inventory
Create and run a third publisher for transaction data that publishes one message every two minutes.
gcloud scheduler jobs create pubsub transactions \
--schedule="*/2 * * * *" \
--location=LOCATION \
--topic="Transactions-inbound" \
--message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Start the third Cloud Scheduler job.
gcloud scheduler jobs run --location=LOCATION transactions
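To optionally confirm that all three jobs were created and are enabled, list the jobs in the location:
gcloud scheduler jobs list --location=LOCATION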
View your results
View the data written to your BigQuery tables by running the following queries. While the pipeline is running, you can see new rows appended to the BigQuery tables every minute.
You might need to wait for the tables to populate with data.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
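The pipeline also writes aggregated page view data to Bigtable. To optionally inspect those rows after the pipeline has had a few minutes to produce aggregates, you can read from the PageView5MinAggregates table with the cbt CLI; the count=5 argument limits the output to five rows:
cbt -instance=aggregate-tables read PageView5MinAggregates count=5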