建立電子商務串流管道

在本教學課程中,您將建立 Dataflow 串流管道,轉換來自 Pub/Sub 主題和訂閱項目的電子商務資料,並將資料輸出至 BigQuery 和 Bigtable。本教學課程需要 Gradle

本教學課程提供端對端的電子商務範例應用程式,可將網路商店的資料串流至 BigQuery 和 Bigtable。這個範例應用程式說明瞭串流資料分析和即時人工智慧 (AI) 的常見用途和最佳做法。本教學課程將說明如何動態回應顧客動作,以便即時分析及回應事件。本教學課程說明如何儲存、分析及以圖表呈現事件資料,進一步瞭解消費者行為。

您可以在 GitHub 上找到範例應用程式。如要使用 Terraform 執行本教學課程,請按照 GitHub 範例應用程式提供的步驟操作。

目標

  • 驗證傳入的資料,並盡可能套用修正內容。
  • 分析點擊流資料,計算指定時間範圍內每個產品的瀏覽次數。將這項資訊儲存在低延遲儲存空間。應用程式接著就能使用這項資料,在網站上向顧客顯示瀏覽這項產品的人數訊息。
  • 使用交易資料輔助訂購商品:

    • 分析交易資料,計算特定期間內每個項目的銷售總數 (依商店和全球)。
    • 分析商品目錄資料,計算每個項目的進貨量。
    • 持續將這項資料傳送至庫存系統,以用於庫存購買決策。
  • 驗證傳入的資料,並盡可能套用修正內容。 將任何無法修正的資料寫入無效信件佇列,以進行額外分析和處理。建立指標,代表傳送至無法傳送郵件佇列的傳入資料百分比,以供監控和發出快訊。

  • 將所有傳入資料處理為標準格式,並儲存在資料倉儲中,以供日後分析和製作圖表。

  • 將店內銷售交易資料去正規化,以便納入商店位置的緯度和經度等資訊。透過 BigQuery 中緩慢變更的資料表提供商店資訊,並使用商店 ID 做為鍵。

資料

應用程式會處理下列類型的資料:

  • 線上系統傳送至 Pub/Sub 的點擊流資料。
  • 內部部署或軟體即服務 (SaaS) 系統傳送至 Pub/Sub 的交易資料。
  • 地端部署或 SaaS 系統傳送至 Pub/Sub 的股票資料。

工作模式

應用程式包含下列工作模式,這些模式常見於使用 Java 適用的 Apache Beam SDK 建構的管道:

費用

在本文件中,您會使用下列 Google Cloud的計費元件:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用期資格。

完成本文所述工作後,您可以刪除建立的資源,避免繼續計費,詳情請參閱「清除所用資源」。

事前準備

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. 安裝 Google Cloud CLI。

  3. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  4. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  5. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  6. 確認專案已啟用計費功能 Google Cloud

  7. 啟用 Compute Engine、Dataflow、Pub/Sub、BigQuery、Bigtable、Bigtable Admin 和 Cloud Scheduler API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  8. 為使用者帳戶建立本機驗證憑證:

    gcloud auth application-default login

    如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI

  9. 將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    更改下列內容:

    • PROJECT_ID:專案 ID。
    • USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com
    • ROLE:授予使用者帳戶的 IAM 角色。
  10. 安裝 Google Cloud CLI。

  11. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  12. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  13. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  14. 確認專案已啟用計費功能 Google Cloud

  15. 啟用 Compute Engine、Dataflow、Pub/Sub、BigQuery、Bigtable、Bigtable Admin 和 Cloud Scheduler API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  16. 為使用者帳戶建立本機驗證憑證:

    gcloud auth application-default login

    如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI

  17. 將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    更改下列內容:

    • PROJECT_ID:專案 ID。
    • USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com
    • ROLE:授予使用者帳戶的 IAM 角色。
  18. 為新管道建立使用者代管的 worker 服務帳戶,並將必要角色授予該服務帳戶。

    1. 如要建立服務帳戶,請執行 gcloud iam service-accounts create 指令:

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. 將角色授予服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      • roles/bigquery.jobUser
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE 替換為各個角色。

    3. 將可為服務帳戶建立存取權杖的角色授予 Google 帳戶:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  19. 視需要下載及安裝 Gradle

建立範例來源和接收器

本節說明如何建立下列項目:

  • Cloud Storage bucket,做為暫時儲存位置
  • 使用 Pub/Sub 的串流資料來源
  • 將資料載入 BigQuery 的資料集
  • Bigtable 執行個體

建立 Cloud Storage 值區

首先建立 Cloud Storage bucket。 這個 bucket 是 Dataflow 管道的暫時儲存位置。

使用 gcloud storage buckets create 指令:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

更改下列內容:

  • BUCKET_NAME:Cloud Storage bucket 的名稱,必須符合 bucket 命名規定。Cloud Storage bucket 名稱不得重複。
  • LOCATION:值區的位置

建立 Pub/Sub 主題和訂閱項目

建立四個 Pub/Sub 主題,然後建立三個訂閱項目。

如要建立主題,請為每個主題執行一次 gcloud pubsub topics create 指令。如要瞭解如何命名訂閱項目,請參閱「主題或訂閱項目命名規範」。

gcloud pubsub topics create TOPIC_NAME

TOPIC_NAME 替換為下列值,並為每個主題執行一次指令 (共四次):

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

如要為主題建立訂閱項目,請針對每個訂閱項目執行一次 gcloud pubsub subscriptions create 指令:

  1. 建立Clickstream-inbound-sub訂閱項目:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. 建立Transactions-inbound-sub訂閱項目:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. 建立Inventory-inbound-sub訂閱項目:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

建立 BigQuery 資料集和資料表

建立 BigQuery 資料集和已分割的資料表,並為 Pub/Sub 主題設定適當的結構定義。

  1. 使用 bq mk 指令建立第一個資料集。

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. 建立第二個資料集。

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. 使用 CREATE TABLE SQL 陳述式建立含有結構定義和測試資料的資料表。測試資料有一個 ID 值為 1 的商店。這個表格用於更新緩慢的側邊輸入模式。

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

建立 Bigtable 執行個體和資料表

建立 Bigtable 執行個體和資料表。如要進一步瞭解如何建立 Bigtable 執行個體,請參閱「建立執行個體」。

  1. 如有需要,請執行下列指令安裝 cbt CLI

    gcloud components install cbt
    
  2. 使用 bigtable instances create 指令建立執行個體:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    CLUSTER_ZONE 替換為叢集執行的可用區

  3. 使用 cbt createtable 指令建立資料表:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. 使用下列指令將資料欄系列新增至資料表:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

執行管道

使用 Gradle 執行串流管道。 如要查看管線使用的 Java 程式碼,請參閱 RetailDataProcessingPipeline.java

  1. 使用 git clone 指令複製 GitHub 存放區:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. 切換至應用程式目錄:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. 如要測試管道,請在殼層或終端機中,使用 Gradle 執行下列指令:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. 如要執行管道,請使用 Gradle 執行下列指令:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID \
    --serviceAccount=retailpipeline.PROJECT_ID.iam.gserviceaccount.com"
    

請參閱 GitHub 上的管道原始碼

建立及執行 Cloud Scheduler 工作

建立並執行三項 Cloud Scheduler 工作,分別用於發布點擊流資料、目錄資料和交易資料。這個步驟會為管道產生範例資料。

  1. 如要為本教學課程建立 Cloud Scheduler 工作,請使用 gcloud scheduler jobs create 指令。這個步驟會為點擊串流資料建立發布者,每分鐘發布一則訊息。

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. 如要啟動 Cloud Scheduler 工作,請使用 gcloud scheduler jobs run 指令。

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. 建立並執行另一個類似的發布商,發布商品目錄資料,每兩分鐘發布一則訊息。

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. 啟動第二個 Cloud Scheduler 工作。

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. 建立並執行第三個發布者,用於發布交易資料,每兩分鐘發布一則訊息。

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. 啟動第三個 Cloud Scheduler 工作。

    gcloud scheduler jobs run --location=LOCATION transactions
    

查看結果

查看寫入 BigQuery 資料表的資料。執行下列查詢,在 BigQuery 中查看結果。這個管道執行時,您會看到每分鐘都有新資料列附加至 BigQuery 資料表。

您可能需要等待資料填入資料表。

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

清除所用資源

為避免因為本教學課程所用資源,導致系統向 Google Cloud 收取費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。

刪除專案

如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。 Google Cloud

  1. 前往 Google Cloud 控制台的「Manage resources」(管理資源) 頁面。

    前往「Manage resources」(管理資源)

  2. 在專案清單中選取要刪除的專案,然後點選「Delete」(刪除)
  3. 在對話方塊中輸入專案 ID,然後按一下 [Shut down] (關閉) 以刪除專案。

刪除個別資源

如要重複使用專案,請刪除您為本教學課程建立的資源。

清理 Google Cloud 專案資源

  1. 如要刪除 Cloud Scheduler 工作,請使用 gcloud scheduler jobs delete 指令。

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. 如要刪除 Pub/Sub 訂閱項目和主題,請使用 gcloud pubsub subscriptions deletegcloud pubsub topics delete 指令。

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. 如要刪除 BigQuery 資料表,請使用 bq rm 指令。

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. 刪除 BigQuery 資料集。單獨使用資料集不會產生任何費用。

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. 如要刪除 Bigtable 執行個體,請使用 cbt deleteinstance 指令。單獨使用值區不會產生任何費用。

    cbt deleteinstance aggregate-tables
    
  6. 如要刪除 Cloud Storage bucket 和其中的物件,請使用 gcloud storage rm 指令。單獨使用值區不會產生任何費用。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

撤銷憑證

  1. 撤銷您授予使用者管理的 Worker 服務帳戶的角色。 針對下列每個 IAM 角色執行一次下列指令:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    • roles/bigquery.jobUser
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. 選用:撤銷您建立的驗證憑證,並刪除本機憑證檔案。

    gcloud auth application-default revoke
  3. 選用:從 gcloud CLI 撤銷憑證。

    gcloud auth revoke

後續步驟