Pub/Sub 到 BigQuery 管道的效能特徵

本頁面說明從 Pub/Sub 讀取資料並寫入 BigQuery 的 Dataflow 串流工作效能特徵。這項工具會提供兩種串流管道的基準測試結果:

  • 僅對應 (單一訊息轉換):這類管道會執行單一訊息轉換,不會追蹤狀態或將元素分組到串流中。例如 ETL、欄位驗證和結構定義對應。

  • 時間範圍匯總 (GroupByKey):執行有狀態作業,並依據鍵和時間範圍分組資料的管道。例如計算事件、計算總和,以及收集使用者工作階段的記錄。

串流資料整合的大多數工作負載都屬於這兩類。如果您的管道遵循類似模式,可以使用這些基準評估 Dataflow 工作,並與效能良好的參考設定進行比較。

測試方法

基準測試是使用下列資源進行:

  • 預先佈建的 Pub/Sub 主題,具有穩定的輸入負載。 這些訊息是使用「串流資料產生器」範本產生。

    • 訊息傳送速率:每秒約 1,000,000 則訊息
    • 輸入負載:1 GiB/秒
    • 訊息格式:隨機產生的 JSON 文字,具有固定結構定義
    • 郵件大小:每封郵件約 1 KiB
  • 標準 BigQuery 資料表。

  • Pub/Sub 到 BigQuery 範本為基礎的 Dataflow 串流管道。這些管道會執行最低要求的剖析和結構定義對應作業。未使用自訂使用者定義函式 (UDF)。

水平擴充功能穩定下來,且管道達到穩定狀態後,管道會執行約一天,然後收集及分析結果。

Dataflow 管道

我們測試了兩種管道變體:

僅限地圖的管道。這個管道會對 JSON 訊息執行簡單的對應和轉換作業。這項測試使用「Pub/Sub 到 BigQuery」範本,且未經過修改。

  • 語意:我們已使用僅一次模式至少一次模式測試管道。「至少一次」處理作業可提供更高的處理量。不過,只有在可接受重複記錄,或下游接收器會處理重複資料時,才應使用這項功能。

以時間範圍為準的匯總管道。這個管道會依固定大小時間範圍內的特定鍵將訊息分組,並將匯總記錄寫入 BigQuery。在本測試中,我們使用以「Pub/Sub 到 BigQuery」範本為基礎的自訂 Apache Beam 管道。

  • 匯總邏輯:針對每個固定且不重疊的 1 分鐘時間範圍,系統會收集具有相同鍵的訊息,並以單一匯總記錄的形式寫入 BigQuery。這種匯總方式通常用於記錄處理程序,可將相關事件 (例如使用者活動) 合併為單一記錄,以供後續分析。

  • 索引鍵平行處理:基準測試使用 1,000,000 個均勻分布的索引鍵。

  • 語意:管道已使用僅一次模式進行測試。匯總作業需要「僅限一次」語意,才能確保正確性,並防止在群組和時間範圍內重複計算。

工作設定

下表顯示 Dataflow 工作的設定方式。

設定 僅限一次對應 僅限地圖,至少一次 時間區間匯總 (僅處理一次)
工作站機器類型 n1-standard-2 n1-standard-2 n1-standard-2
工作站機器 vCPU 2 2 2
工作站機器 RAM 7.5 GiB 7.5 GiB 7.5 GiB
工作站永久磁碟 標準永久磁碟 (HDD),30 GB 標準永久磁碟 (HDD),30 GB 標準永久磁碟 (HDD),30 GB
初始工作站 70 30 180
工作站數量上限 100 100 250
Streaming Engine
自動水平調度資源
計費模式 以資源為準的計費方式 以資源為準的計費方式 以資源為準的計費方式
是否已啟用 Storage Write API?
Storage Write API 串流 200 不適用 500
Storage Write API 觸發頻率 5 秒 不適用 5 秒

建議串流管道使用 BigQuery Storage Write API。使用 Storage Write API 的「只傳送一次」模式時,您可以調整下列設定:

  • 寫入串流數量。為確保寫入階段有足夠的鍵值平行處理,請將 Storage Write API 串流數量設為大於工作站 CPU 數的值,同時維持合理的 BigQuery 寫入串流輸送量

  • 觸發頻率:單一位數的秒數值適用於高輸送量管道。

詳情請參閱「從 Dataflow 寫入 BigQuery」。

基準結果

本節說明基準測試結果。

處理量和資源用量

下表顯示管道輸送量和資源用量的測試結果。

結果 僅限地圖,僅一次 僅限地圖,至少一次 時間區間匯總 (僅處理一次)
每個工作站的輸入總處理量 平均值:17 MB/秒,n=3 平均值:21 MBps,n=3 平均值:6 MBps,n=3
所有工作站的平均 CPU 使用率 平均值:65%,n=3 平均值:69%,n=3 平均值:80%,n=3
工作站節點數 平均值:57,n=3 平均值:48,n=3 平均值:169,n=3
每小時 Streaming Engine 運算單元 平均值:125,n=3 平均值:46,n=3 平均值:354,n=3

自動調度演算法可能會影響目標 CPU 使用率。如要提高或降低目標 CPU 使用率,可以設定自動調度資源範圍工作站使用率提示。提高使用率目標可降低成本,但也會導致尾端延遲變差,尤其是在負載變動時。

對於時間區間匯總管道,匯總類型、時間區間大小和鍵平行處理程序可能會對資源用量造成重大影響。

延遲時間

下表顯示管道延遲的基準測試結果。

階段端對端總延遲時間 僅限一次對應 僅限地圖,至少一次 時間區間匯總 (僅處理一次)
第 50 個百分位數 平均值:800 毫秒,n=3 平均值:160 毫秒,n=3 平均值:3,400 毫秒,n=3
P95 平均值:2,000 毫秒,n=3 平均值:250 毫秒,n=3 平均值:13,000 毫秒,n=3
第 99 個百分位數 平均值:2,800 毫秒,n=3 平均值:410 毫秒,n=3 平均值:25,000 毫秒,n=3

這項測試測量了三個長期執行的測試中,每個階段的端對端延遲時間 (job/streaming_engine/stage_end_to_end_latencies 指標)。這項指標會測量 Streaming Engine 在每個管道階段花費的時間。包括管道的所有內部步驟,例如:

  • 隨機排序訊息並加入佇列以供處理
  • 實際處理時間,例如將訊息轉換為資料列物件
  • 寫入永久狀態,以及排隊寫入永久狀態所花費的時間

另一個延遲指標是資料更新間隔。不過,資料即時性會受到多項因素影響,例如使用者定義的視窗和來源的上游延遲。系統延遲時間可提供更客觀的基準,用於評估管道在負載下的內部處理效率和健康狀態。

每次執行測試時,系統會測量約一天的資料,並捨棄初始啟動期間的資料,以反映穩定狀態下的效能。結果會顯示造成額外延遲的兩個因素:

  • 「僅限一次」模式。如要達到僅一次語意,必須進行確定性隨機排序和持續狀態查詢,才能進行重複資料刪除。「至少一次」模式會略過這些步驟,因此執行速度明顯較快。

  • 時段匯總。訊息必須經過完整隨機排序、緩衝處理及寫入持續性狀態,才能關閉視窗,這會增加端對端延遲時間。

這裡顯示的基準代表基準線。延遲對管道複雜度非常敏感。自訂 UDF、額外轉換和複雜的視窗化邏輯都可能增加延遲時間。與狀態繁重的作業 (例如將元素收集到清單中) 相比,簡單且高度縮減的彙整作業 (例如總和和計數) 往往會導致較低的延遲。

估算費用

如要使用 Resource-based billing 估算類似管道的基準費用,請按照下列步驟使用 Google Cloud Platform Pricing Calculator

  1. 開啟 Pricing Calculator
  2. 按一下「新增至估算值」
  3. 選取「Dataflow」。
  4. 在「服務類型」部分,選取「Dataflow Classic」。
  5. 選取「進階設定」即可查看所有選項。
  6. 選擇執行工作的位置。
  7. 在「工作類型」中選取「串流」。
  8. 選取「啟用 Streaming Engine」
  9. 輸入作業執行時數、工作站節點、工作站機器和 Persistent Disk 儲存空間的資訊。
  10. 輸入預估的 Streaming Engine 運算單元數量。

資源用量和費用大致會隨著輸入輸送量線性擴展,但如果是只有少數 worker 的小型工作,總費用主要會是固定費用。您可以根據基準測試結果,推斷工作站節點數量和資源耗用量,做為起點。

舉例來說,假設您以「完全一次」模式執行僅含對應作業的管道,輸入資料速率為 100 MiB/s。根據 1 GiB/s 管道的基準測試結果,您可以估算資源需求,如下所示:

  • 縮放比例:(100 MiB/秒) / (1 GiB/秒) = 0.1
  • 預計工作站節點數:57 個工作站 × 0.1 = 5.7 個工作站
  • 預計每小時的 Streaming Engine 運算單元數:125 × 0.1 = 每小時 12.5 個單元

這個值僅供初步估算,實際輸送量和費用可能會因機器類型、訊息大小分配、使用者程式碼、彙整類型、鍵值平行處理和視窗大小等因素而有顯著差異。詳情請參閱「Dataflow 成本最佳化的最佳做法」。

執行測試管道

本節顯示用於執行僅含地圖管道的 gcloud dataflow flex-template run 指令。

僅限一次模式

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5

至少傳送一次模式

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 30 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
  --additional-experiments streaming_mode_at_least_once

更改下列內容:

  • JOB_ID:Dataflow 工作 ID
  • PROJECT_ID:專案 ID
  • SUBSCRIPTION_NAME:Pub/Sub 訂閱項目名稱
  • DATASET:BigQuery 資料集名稱
  • TABLE_NAME:BigQuery 資料表的名稱

產生測試資料

如要產生測試資料,請使用下列指令執行串流資料產生器範本

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION

更改下列內容:

  • JOB_ID:Dataflow 工作 ID
  • PROJECT_ID:專案 ID
  • TOPIC_NAME:Pub/Sub 主題名稱
  • SCHEMA_LOCATION:Cloud Storage 中結構定義檔案的路徑

串流資料產生器範本會使用 JSON 資料產生器檔案定義訊息結構定義。基準測試使用的訊息結構定義類似於下列內容:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

後續步驟