本頁面說明從 Pub/Sub 讀取資料並寫入 BigQuery 的 Dataflow 串流工作效能特徵。這項工具會提供兩種串流管道的基準測試結果:
僅對應 (單一訊息轉換):這類管道會執行單一訊息轉換,不會追蹤狀態或將元素分組到串流中。例如 ETL、欄位驗證和結構定義對應。
時間範圍匯總 (
GroupByKey):執行有狀態作業,並依據鍵和時間範圍分組資料的管道。例如計算事件、計算總和,以及收集使用者工作階段的記錄。
串流資料整合的大多數工作負載都屬於這兩類。如果您的管道遵循類似模式,可以使用這些基準評估 Dataflow 工作,並與效能良好的參考設定進行比較。
測試方法
基準測試是使用下列資源進行:
預先佈建的 Pub/Sub 主題,具有穩定的輸入負載。 這些訊息是使用「串流資料產生器」範本產生。
- 訊息傳送速率:每秒約 1,000,000 則訊息
- 輸入負載:1 GiB/秒
- 訊息格式:隨機產生的 JSON 文字,具有固定結構定義
- 郵件大小:每封郵件約 1 KiB
標準 BigQuery 資料表。
以 Pub/Sub 到 BigQuery 範本為基礎的 Dataflow 串流管道。這些管道會執行最低要求的剖析和結構定義對應作業。未使用自訂使用者定義函式 (UDF)。
水平擴充功能穩定下來,且管道達到穩定狀態後,管道會執行約一天,然後收集及分析結果。
Dataflow 管道
我們測試了兩種管道變體:
僅限地圖的管道。這個管道會對 JSON 訊息執行簡單的對應和轉換作業。這項測試使用「Pub/Sub 到 BigQuery」範本,且未經過修改。
以時間範圍為準的匯總管道。這個管道會依固定大小時間範圍內的特定鍵將訊息分組,並將匯總記錄寫入 BigQuery。在本測試中,我們使用以「Pub/Sub 到 BigQuery」範本為基礎的自訂 Apache Beam 管道。
匯總邏輯:針對每個固定且不重疊的 1 分鐘時間範圍,系統會收集具有相同鍵的訊息,並以單一匯總記錄的形式寫入 BigQuery。這種匯總方式通常用於記錄處理程序,可將相關事件 (例如使用者活動) 合併為單一記錄,以供後續分析。
索引鍵平行處理:基準測試使用 1,000,000 個均勻分布的索引鍵。
語意:管道已使用僅一次模式進行測試。匯總作業需要「僅限一次」語意,才能確保正確性,並防止在群組和時間範圍內重複計算。
工作設定
下表顯示 Dataflow 工作的設定方式。
| 設定 | 僅限一次對應 | 僅限地圖,至少一次 | 時間區間匯總 (僅處理一次) |
|---|---|---|---|
| 工作站機器類型 | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| 工作站機器 vCPU | 2 | 2 | 2 |
| 工作站機器 RAM | 7.5 GiB | 7.5 GiB | 7.5 GiB |
| 工作站永久磁碟 | 標準永久磁碟 (HDD),30 GB | 標準永久磁碟 (HDD),30 GB | 標準永久磁碟 (HDD),30 GB |
| 初始工作站 | 70 | 30 | 180 |
| 工作站數量上限 | 100 | 100 | 250 |
| Streaming Engine | 是 | 是 | 是 |
| 自動水平調度資源 | 是 | 是 | 是 |
| 計費模式 | 以資源為準的計費方式 | 以資源為準的計費方式 | 以資源為準的計費方式 |
| 是否已啟用 Storage Write API? | 是 | 是 | 是 |
| Storage Write API 串流 | 200 | 不適用 | 500 |
| Storage Write API 觸發頻率 | 5 秒 | 不適用 | 5 秒 |
建議串流管道使用 BigQuery Storage Write API。使用 Storage Write API 的「只傳送一次」模式時,您可以調整下列設定:
寫入串流數量。為確保寫入階段有足夠的鍵值平行處理,請將 Storage Write API 串流數量設為大於工作站 CPU 數的值,同時維持合理的 BigQuery 寫入串流輸送量。
觸發頻率:單一位數的秒數值適用於高輸送量管道。
詳情請參閱「從 Dataflow 寫入 BigQuery」。
基準結果
本節說明基準測試結果。
處理量和資源用量
下表顯示管道輸送量和資源用量的測試結果。
| 結果 | 僅限地圖,僅一次 | 僅限地圖,至少一次 | 時間區間匯總 (僅處理一次) |
|---|---|---|---|
| 每個工作站的輸入總處理量 | 平均值:17 MB/秒,n=3 | 平均值:21 MBps,n=3 | 平均值:6 MBps,n=3 |
| 所有工作站的平均 CPU 使用率 | 平均值:65%,n=3 | 平均值:69%,n=3 | 平均值:80%,n=3 |
| 工作站節點數 | 平均值:57,n=3 | 平均值:48,n=3 | 平均值:169,n=3 |
| 每小時 Streaming Engine 運算單元 | 平均值:125,n=3 | 平均值:46,n=3 | 平均值:354,n=3 |
自動調度演算法可能會影響目標 CPU 使用率。如要提高或降低目標 CPU 使用率,可以設定自動調度資源範圍或工作站使用率提示。提高使用率目標可降低成本,但也會導致尾端延遲變差,尤其是在負載變動時。
對於時間區間匯總管道,匯總類型、時間區間大小和鍵平行處理程序可能會對資源用量造成重大影響。
延遲時間
下表顯示管道延遲的基準測試結果。
| 階段端對端總延遲時間 | 僅限一次對應 | 僅限地圖,至少一次 | 時間區間匯總 (僅處理一次) |
|---|---|---|---|
| 第 50 個百分位數 | 平均值:800 毫秒,n=3 | 平均值:160 毫秒,n=3 | 平均值:3,400 毫秒,n=3 |
| P95 | 平均值:2,000 毫秒,n=3 | 平均值:250 毫秒,n=3 | 平均值:13,000 毫秒,n=3 |
| 第 99 個百分位數 | 平均值:2,800 毫秒,n=3 | 平均值:410 毫秒,n=3 | 平均值:25,000 毫秒,n=3 |
這項測試測量了三個長期執行的測試中,每個階段的端對端延遲時間 (job/streaming_engine/stage_end_to_end_latencies 指標)。這項指標會測量 Streaming Engine 在每個管道階段花費的時間。包括管道的所有內部步驟,例如:
- 隨機排序訊息並加入佇列以供處理
- 實際處理時間,例如將訊息轉換為資料列物件
- 寫入永久狀態,以及排隊寫入永久狀態所花費的時間
另一個延遲指標是資料更新間隔。不過,資料即時性會受到多項因素影響,例如使用者定義的視窗和來源的上游延遲。系統延遲時間可提供更客觀的基準,用於評估管道在負載下的內部處理效率和健康狀態。
每次執行測試時,系統會測量約一天的資料,並捨棄初始啟動期間的資料,以反映穩定狀態下的效能。結果會顯示造成額外延遲的兩個因素:
「僅限一次」模式。如要達到僅一次語意,必須進行確定性隨機排序和持續狀態查詢,才能進行重複資料刪除。「至少一次」模式會略過這些步驟,因此執行速度明顯較快。
時段匯總。訊息必須經過完整隨機排序、緩衝處理及寫入持續性狀態,才能關閉視窗,這會增加端對端延遲時間。
這裡顯示的基準代表基準線。延遲對管道複雜度非常敏感。自訂 UDF、額外轉換和複雜的視窗化邏輯都可能增加延遲時間。與狀態繁重的作業 (例如將元素收集到清單中) 相比,簡單且高度縮減的彙整作業 (例如總和和計數) 往往會導致較低的延遲。
估算費用
如要使用 Resource-based billing 估算類似管道的基準費用,請按照下列步驟使用 Google Cloud Platform Pricing Calculator:
- 開啟 Pricing Calculator。
- 按一下「新增至估算值」。
- 選取「Dataflow」。
- 在「服務類型」部分,選取「Dataflow Classic」。
- 選取「進階設定」即可查看所有選項。
- 選擇執行工作的位置。
- 在「工作類型」中選取「串流」。
- 選取「啟用 Streaming Engine」。
- 輸入作業執行時數、工作站節點、工作站機器和 Persistent Disk 儲存空間的資訊。
- 輸入預估的 Streaming Engine 運算單元數量。
資源用量和費用大致會隨著輸入輸送量線性擴展,但如果是只有少數 worker 的小型工作,總費用主要會是固定費用。您可以根據基準測試結果,推斷工作站節點數量和資源耗用量,做為起點。
舉例來說,假設您以「完全一次」模式執行僅含對應作業的管道,輸入資料速率為 100 MiB/s。根據 1 GiB/s 管道的基準測試結果,您可以估算資源需求,如下所示:
- 縮放比例:(100 MiB/秒) / (1 GiB/秒) = 0.1
- 預計工作站節點數:57 個工作站 × 0.1 = 5.7 個工作站
- 預計每小時的 Streaming Engine 運算單元數:125 × 0.1 = 每小時 12.5 個單元
這個值僅供初步估算,實際輸送量和費用可能會因機器類型、訊息大小分配、使用者程式碼、彙整類型、鍵值平行處理和視窗大小等因素而有顯著差異。詳情請參閱「Dataflow 成本最佳化的最佳做法」。
執行測試管道
本節顯示用於執行僅含地圖管道的
gcloud dataflow flex-template run
指令。
僅限一次模式
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
至少傳送一次模式
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
更改下列內容:
JOB_ID:Dataflow 工作 IDPROJECT_ID:專案 IDSUBSCRIPTION_NAME:Pub/Sub 訂閱項目名稱DATASET:BigQuery 資料集名稱TABLE_NAME:BigQuery 資料表的名稱
產生測試資料
如要產生測試資料,請使用下列指令執行串流資料產生器範本:
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
更改下列內容:
JOB_ID:Dataflow 工作 IDPROJECT_ID:專案 IDTOPIC_NAME:Pub/Sub 主題名稱SCHEMA_LOCATION:Cloud Storage 中結構定義檔案的路徑
串流資料產生器範本會使用 JSON 資料產生器檔案定義訊息結構定義。基準測試使用的訊息結構定義類似於下列內容:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
後續步驟
- 使用 Dataflow 工作監控介面
- Dataflow 成本最佳化最佳做法
- 疑難排解串流工作緩慢或停滯的問題
- 從 Pub/Sub 讀取資料至 Dataflow
- 從 Dataflow 寫入 BigQuery