處理 Bigtable 變更串流

本教學課程說明如何將資料管道部署至 Dataflow,即時串流來自 Bigtable 資料表變更串流的資料庫異動。管道的輸出內容會寫入 Cloud Storage 上的一系列檔案。

我們提供音樂收聽應用程式的範例資料集。在本教學課程中,您會追蹤聆聽的歌曲,然後在一段時間內列出前五名。

本教學課程適用於熟悉程式碼編寫,以及將資料管道部署至 Google Cloud的技術使用者。

目標

本教學課程說明如何執行下列操作:

  • 建立已啟用變更串流的 Bigtable 資料表。
  • 在 Dataflow 上部署管道,轉換並輸出變更串流。
  • 查看資料管道的結果。

費用

在本文件中,您會使用下列 Google Cloud的計費元件:

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用期資格。

完成本文所述工作後,您可以刪除建立的資源,避免繼續計費,詳情請參閱「清除所用資源」。

事前準備

    登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。

    安裝 Google Cloud CLI。 完成後,執行下列指令來初始化 Google Cloud CLI:

    gcloud init

    若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

    建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

    確認專案已啟用計費功能 Google Cloud

    啟用 Dataflow、Cloud Bigtable API、Cloud Bigtable Admin API 和 Cloud Storage API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    安裝 Google Cloud CLI。 完成後,執行下列指令來初始化 Google Cloud CLI:

    gcloud init

    若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

    建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

    確認專案已啟用計費功能 Google Cloud

    啟用 Dataflow、Cloud Bigtable API、Cloud Bigtable Admin API 和 Cloud Storage API:

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  1. 更新並安裝 cbt CLI 。
    gcloud components update
    gcloud components install cbt

準備環境

取得程式碼

複製包含範例程式碼的存放區。如果先前已下載這個存放區,請提取最新版本。

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

建立值區

  • 建立 Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    BUCKET_NAME 替換為符合值區命名規定的值區名稱。
  • 建立 Bigtable 執行個體

    您可以使用現有執行個體進行本教學課程,也可以在您附近的區域建立執行個體,並採用預設設定。

    建立資料表

    這個範例應用程式會追蹤使用者收聽的歌曲,並將收聽事件儲存在 Bigtable 中。建立啟用變更串流的資料表,其中包含一個資料欄系列 (cf) 和一個資料欄 (song),並使用使用者 ID 做為資料列索引鍵。

    建立資料表。

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    更改下列內容:

    • PROJECT_ID:您使用的專案 ID
    • BIGTABLE_INSTANCE_ID:要包含新資料表的執行個體 ID

    啟動管道

    這個管道會執行下列動作,轉換變更串流:

    1. 讀取變更串流
    2. 取得歌曲名稱
    3. 將歌曲聆聽事件分組到 N 秒的時間範圍
    4. 計算前五首歌曲
    5. 輸出結果

    執行管道。

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    請將 BIGTABLE_REGION 改成 Bigtable 執行個體所在的區域 ID,例如 us-east5

    瞭解管道

    管道中的下列程式碼片段可協助您瞭解執行的程式碼。

    讀取變更串流

    這個範例中的程式碼會使用特定 Bigtable 執行個體和資料表的參數,設定來源串流。

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    取得歌曲名稱

    當系統偵測到有人聆聽歌曲時,會將歌曲名稱寫入資料欄系列 cf 和資料欄限定詞 song,因此程式碼會從變更串流突變中擷取值,並輸出至管道的下一個步驟。

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    計算前五名歌曲

    您可以使用內建的 Beam 函式 CountTop.of,取得目前視窗中排名前五的歌曲。

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    輸出結果

    這個管道會將結果寫入標準輸出內容和檔案。如果是檔案,系統會將寫入作業分組,每組 10 個元素或 1 分鐘的區段。

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    查看管道

    1. 前往 Google Cloud 控制台的「Dataflow」頁面。

      前往 Dataflow

    2. 按一下名稱開頭為 song-rank 的工作。

    3. 按一下畫面底部的「顯示」,開啟記錄面板。

    4. 按一下「Worker logs」(工作人員記錄),監控變更串流的輸出記錄。

    串流寫入

    使用 cbt CLI ,將多位使用者的歌曲收聽次數寫入 song-rank 資料表。這項功能會在幾分鐘內寫入資料,模擬一段時間內串流播放歌曲的收聽次數。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    查看輸出內容

    讀取 Cloud Storage 中的輸出內容,查看最熱門的歌曲。

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    輸出內容範例:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    清除所用資源

    為避免因為本教學課程所用資源,導致系統向 Google Cloud 收取費用,請刪除含有相關資源的專案,或者保留專案但刪除個別資源。

    刪除專案

      刪除 Google Cloud 專案:

      gcloud projects delete PROJECT_ID

    刪除個別資源

    1. 刪除 bucket 和檔案。

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. 停用資料表的變更串流。

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. 刪除資料表 song-rank

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. 停止變更串流管道。

      1. 列出工作以取得工作 ID。

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. 取消工作。

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        JOB_ID 換成上一個指令顯示的工作 ID。

    後續步驟