在本教學課程中,您將建立 Dataflow 串流管道,轉換來自 Pub/Sub 主題和訂閱項目的電子商務資料,並將資料輸出至 BigQuery 和 Bigtable。本教學課程需要 Gradle。
本教學課程提供端對端的電子商務範例應用程式,可將網路商店的資料串流至 BigQuery 和 Bigtable。這個範例應用程式說明瞭串流資料分析和即時人工智慧 (AI) 的常見用途和最佳做法。本教學課程將說明如何動態回應顧客動作,以便即時分析及回應事件。本教學課程說明如何儲存、分析及以圖表呈現事件資料,進一步瞭解消費者行為。
您可以在 GitHub 上找到範例應用程式。如要使用 Terraform 執行本教學課程,請按照 GitHub 範例應用程式提供的步驟操作。
目標
- 驗證傳入的資料,並盡可能套用修正內容。
- 分析點擊流資料,計算指定時間範圍內每個產品的瀏覽次數。將這項資訊儲存在低延遲儲存空間。應用程式接著就能使用這項資料,在網站上向顧客顯示瀏覽這項產品的人數訊息。
使用交易資料輔助訂購商品:
- 分析交易資料,計算特定期間內每個項目的銷售總數 (依商店和全球)。
- 分析商品目錄資料,計算每個項目的進貨量。
- 持續將這項資料傳送至庫存系統,以用於庫存購買決策。
驗證傳入的資料,並盡可能套用修正內容。 將任何無法修正的資料寫入無效信件佇列,以進行額外分析和處理。建立指標,代表傳送至無法傳送郵件佇列的傳入資料百分比,以供監控和發出快訊。
將所有傳入資料處理為標準格式,並儲存在資料倉儲中,以供日後分析和製作圖表。
將店內銷售交易資料去正規化,以便納入商店位置的緯度和經度等資訊。透過 BigQuery 中緩慢變更的資料表提供商店資訊,並使用商店 ID 做為鍵。
資料
應用程式會處理下列類型的資料:
- 線上系統傳送至 Pub/Sub 的點擊流資料。
- 內部部署或軟體即服務 (SaaS) 系統傳送至 Pub/Sub 的交易資料。
- 地端部署或 SaaS 系統傳送至 Pub/Sub 的股票資料。
工作模式
應用程式包含下列工作模式,這些模式常見於使用 Java 適用的 Apache Beam SDK 建構的管道:
費用
在本文件中,您會使用下列 Google Cloud的計費元件:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
如要根據預測用量估算費用,請使用 Pricing Calculator。
完成本文所述工作後,您可以刪除建立的資源,避免繼續計費,詳情請參閱「清除所用資源」。
事前準備
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Compute Engine、Dataflow、Pub/Sub、BigQuery、Bigtable、Bigtable Admin 和 Cloud Scheduler API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
啟用 Compute Engine、Dataflow、Pub/Sub、BigQuery、Bigtable、Bigtable Admin 和 Cloud Scheduler API:
啟用 API 時所需的角色
如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (
roles/serviceusage.serviceUsageAdmin),其中包含serviceusage.services.enable權限。瞭解如何授予角色。gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
為新管道建立使用者代管的 worker 服務帳戶,並將必要角色授予該服務帳戶。
如要建立服務帳戶,請執行
gcloud iam service-accounts create指令:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
將角色授予服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/dataflow.adminroles/dataflow.workerroles/pubsub.editorroles/bigquery.dataEditorroles/bigtable.adminroles/bigquery.jobUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
將
SERVICE_ACCOUNT_ROLE替換為各個角色。將可為服務帳戶建立存取權杖的角色授予 Google 帳戶:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- 視需要下載及安裝 Gradle。
建立範例來源和接收器
本節說明如何建立下列項目:
- Cloud Storage bucket,做為暫時儲存位置
- 使用 Pub/Sub 的串流資料來源
- 將資料載入 BigQuery 的資料集
- Bigtable 執行個體
建立 Cloud Storage 值區
首先建立 Cloud Storage bucket。 這個 bucket 是 Dataflow 管道的暫時儲存位置。
使用 gcloud storage buckets create 指令:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
更改下列內容:
- BUCKET_NAME:Cloud Storage bucket 的名稱,必須符合 bucket 命名規定。Cloud Storage bucket 名稱不得重複。
- LOCATION:值區的位置。
建立 Pub/Sub 主題和訂閱項目
建立四個 Pub/Sub 主題,然後建立三個訂閱項目。
如要建立主題,請為每個主題執行一次 gcloud pubsub topics create 指令。如要瞭解如何命名訂閱項目,請參閱「主題或訂閱項目命名規範」。
gcloud pubsub topics create TOPIC_NAME
將 TOPIC_NAME 替換為下列值,並為每個主題執行一次指令 (共四次):
Clickstream-inboundTransactions-inboundInventory-inboundInventory-outbound
如要為主題建立訂閱項目,請針對每個訂閱項目執行一次 gcloud pubsub subscriptions create 指令:
建立
Clickstream-inbound-sub訂閱項目:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub建立
Transactions-inbound-sub訂閱項目:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub建立
Inventory-inbound-sub訂閱項目:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
建立 BigQuery 資料集和資料表
建立 BigQuery 資料集和已分割的資料表,並為 Pub/Sub 主題設定適當的結構定義。
使用
bq mk指令建立第一個資料集。bq --location=US mk \ PROJECT_ID:Retail_Store建立第二個資料集。
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations使用 CREATE TABLE SQL 陳述式建立含有結構定義和測試資料的資料表。測試資料有一個 ID 值為
1的商店。這個表格用於更新緩慢的側邊輸入模式。bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
建立 Bigtable 執行個體和資料表
建立 Bigtable 執行個體和資料表。如要進一步瞭解如何建立 Bigtable 執行個體,請參閱「建立執行個體」。
如有需要,請執行下列指令安裝
cbtCLI:gcloud components install cbt使用
bigtable instances create指令建立執行個體:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1將 CLUSTER_ZONE 替換為叢集執行的可用區。
使用
cbt createtable指令建立資料表:cbt -instance=aggregate-tables createtable PageView5MinAggregates使用下列指令將資料欄系列新增至資料表:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
執行管道
使用 Gradle 執行串流管道。 如要查看管線使用的 Java 程式碼,請參閱 RetailDataProcessingPipeline.java。
使用
git clone指令複製 GitHub 存放區:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git切換至應用程式目錄:
cd dataflow-sample-applications/retail/retail-java-applications如要測試管道,請在殼層或終端機中,使用 Gradle 執行下列指令:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks如要執行管道,請使用 Gradle 執行下列指令:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID \ --serviceAccount=retailpipeline.PROJECT_ID.iam.gserviceaccount.com"
請參閱 GitHub 上的管道原始碼。
建立及執行 Cloud Scheduler 工作
建立並執行三項 Cloud Scheduler 工作,分別用於發布點擊流資料、目錄資料和交易資料。這個步驟會為管道產生範例資料。
如要為本教學課程建立 Cloud Scheduler 工作,請使用
gcloud scheduler jobs create指令。這個步驟會為點擊串流資料建立發布者,每分鐘發布一則訊息。gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'如要啟動 Cloud Scheduler 工作,請使用
gcloud scheduler jobs run指令。gcloud scheduler jobs run --location=LOCATION clickstream建立並執行另一個類似的發布商,發布商品目錄資料,每兩分鐘發布一則訊息。
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'啟動第二個 Cloud Scheduler 工作。
gcloud scheduler jobs run --location=LOCATION inventory建立並執行第三個發布者,用於發布交易資料,每兩分鐘發布一則訊息。
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'啟動第三個 Cloud Scheduler 工作。
gcloud scheduler jobs run --location=LOCATION transactions
查看結果
查看寫入 BigQuery 資料表的資料。執行下列查詢,在 BigQuery 中查看結果。這個管道執行時,您會看到每分鐘都有新資料列附加至 BigQuery 資料表。
您可能需要等待資料填入資料表。
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
清除所用資源
為避免因為本教學課程所用資源,導致系統向 Google Cloud 收取費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。
刪除專案
如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。 Google Cloud
- 前往 Google Cloud 控制台的「Manage resources」(管理資源) 頁面。
- 在專案清單中選取要刪除的專案,然後點選「Delete」(刪除)。
- 在對話方塊中輸入專案 ID,然後按一下 [Shut down] (關閉) 以刪除專案。
刪除個別資源
如要重複使用專案,請刪除您為本教學課程建立的資源。
清理 Google Cloud 專案資源
如要刪除 Cloud Scheduler 工作,請使用
gcloud scheduler jobs delete指令。gcloud scheduler jobs delete transactions --location=LOCATIONgcloud scheduler jobs delete inventory --location=LOCATIONgcloud scheduler jobs delete clickstream --location=LOCATION如要刪除 Pub/Sub 訂閱項目和主題,請使用
gcloud pubsub subscriptions delete和gcloud pubsub topics delete指令。gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME如要刪除 BigQuery 資料表,請使用
bq rm指令。bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations刪除 BigQuery 資料集。單獨使用資料集不會產生任何費用。
bq rm -r -f -d PROJECT_ID:Retail_Storebq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations如要刪除 Bigtable 執行個體,請使用
cbt deleteinstance指令。單獨使用值區不會產生任何費用。cbt deleteinstance aggregate-tables如要刪除 Cloud Storage bucket 和其中的物件,請使用
gcloud storage rm指令。單獨使用值區不會產生任何費用。gcloud storage rm gs://BUCKET_NAME --recursive
撤銷憑證
撤銷您授予使用者管理的 Worker 服務帳戶的角色。 針對下列每個 IAM 角色執行一次下列指令:
roles/dataflow.adminroles/dataflow.workerroles/pubsub.editorroles/bigquery.dataEditorroles/bigtable.adminroles/bigquery.jobUser
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
選用:撤銷您建立的驗證憑證,並刪除本機憑證檔案。
gcloud auth application-default revoke
-
選用:從 gcloud CLI 撤銷憑證。
gcloud auth revoke
後續步驟
- 查看 GitHub 上的範例應用程式。
- 請參閱相關網誌文章: 瞭解 Beam 模式:使用 Google 代碼管理工具資料的點擊流處理程序。
- 請參閱這篇文章,瞭解如何使用 Pub/Sub 建立及使用主題,以及如何使用訂閱項目。
- 請參閱這篇文章,瞭解如何使用 BigQuery 建立資料集。
- 查看 Google Cloud 的參考架構、圖表和最佳做法。歡迎瀏覽我們的 Cloud Architecture Center。