本頁面說明從 Apache Kafka 讀取資料並寫入 BigQuery 的 Dataflow 串流作業效能特徵。這項工具會提供僅限地圖管道的基準測試結果,這類管道會執行每則訊息的轉換作業,但不會追蹤狀態或將串流中的元素分組。
許多資料整合工作負載 (包括 ETL、欄位驗證和結構定義對應) 都屬於僅限對應的類別。如果管道符合這個模式,您可以使用這些基準來評估 Dataflow 工作,並與效能良好的參考設定進行比較。
測試方法
基準測試是使用下列資源進行:
Managed Service for Apache Kafka 叢集。這些訊息是使用「串流資料產生器」範本產生。
- 訊息傳送速率:每秒約 1,000,000 則訊息
- 輸入負載:1 GiB/s
- 訊息格式:隨機產生的 JSON 文字,具有固定結構定義
- 郵件大小:每封郵件約 1 KiB
- Kafka 分區:1000
標準 BigQuery 資料表。
使用 Apache Kafka 到 BigQuery 範本的 Dataflow 串流管道。這個管道會執行最少量的必要剖析和結構定義對應作業。未使用自訂使用者定義函式 (UDF)。
水平擴充功能穩定後,管道達到穩定狀態,管道可執行約一天,之後會收集及分析結果。
Dataflow 管道
這項基準測試使用僅含對應的管道,可執行簡單的對應作業,並轉換 JSON 訊息。我們已使用僅一次模式和至少一次模式測試管道。「至少一次」處理作業可提供更高的處理量。不過,只有在可接受重複記錄,或下游接收器會處理重複資料時,才應使用這項功能。
工作設定
下表顯示 Dataflow 工作的設定方式。
| 設定 | 值 |
|---|---|
| 工作站機器類型 | e2-standard-2 |
| 工作站機器 vCPU | 2 |
| 工作站機器 RAM | 8 GB |
| 工作站永久磁碟 | 標準永久磁碟 (HDD),30 GB |
| 工作站數量上限 | 120 |
| Streaming Engine | 是 |
| 自動水平調度資源 | 是 |
| 計費模式 | 以資源為準的計費方式 |
| 是否已啟用 Storage Write API? | 是 |
| Storage Write API 串流 | 400 |
| Storage Write API 觸發頻率 | 5 秒 |
| 訊息格式 | JSON |
| Kafka 驗證模式 |
應用程式預設憑證 (ADC)。 詳情請參閱「 Kafka 代理程式的驗證類型」。 |
建議串流管道使用 BigQuery Storage Write API。使用 Storage Write API 的「只傳送一次」模式時,您可以調整下列設定:
寫入串流數量。為確保寫入階段有足夠的鍵值平行處理,請將 Storage Write API 串流數量設為大於工作站 CPU 數的值,同時遵循每串流輸送量建議。
觸發頻率:單一位數的秒數值適用於高輸送量管道。
詳情請參閱「從 Dataflow 寫入 BigQuery」。
此外,也應特別考量 Apache Kafka 分區數量。為確保讀取階段有足夠的鍵平行處理量,分割區數量至少應等於工作站 vCPU 總數。詳情請參閱「從 Apache Kafka 讀取資料到 Dataflow」。
基準結果
本節說明基準測試結果。
處理量和資源用量
下表顯示管道輸送量和資源用量的測試結果。
| 結果 | 僅一次 | 至少一次 |
|---|---|---|
| 每個工作站的輸入總處理量 | 平均值:15 MBps,n=3 | 平均值:18 MBps,n=3 |
| 所有工作站的平均 CPU 使用率 | 平均值:70%,n=3 | 平均值:75%,n=3 |
| 工作站節點數 | 平均值:63,n=3 | 平均值:53,n=3 |
| 每小時 Streaming Engine 運算單元 | 平均值:58,n=3 | 平均值:0,n=3 |
自動調度演算法可能會影響目標 CPU 使用率。如要提高或降低目標 CPU 使用率,可以設定自動調度資源範圍或工作站使用率提示。提高使用率目標可降低成本,但也會導致尾端延遲變差,尤其是在負載量變化時。
延遲時間
下表顯示「僅處理一次」模式的管道延遲時間基準測試結果,但不包括輸入階段。
| 階段端對端總延遲時間 (不含輸入階段) | 僅一次 |
|---|---|
| 第 50 個百分位數 | 平均值:1,200 毫秒,n=3 |
| P95 | 平均值:3,000 毫秒,n=3 |
| 第 99 個百分位數 | 平均值:5,400 毫秒,n=3 |
這項測試測量了三個長期執行的測試,每個階段的端對端延遲時間 (job/streaming_engine/stage_end_to_end_latencies 指標)。這項指標會測量 Streaming Engine 在每個管道階段花費的時間。這包括管道的所有內部步驟,例如:
- 隨機排序訊息並加入佇列以供處理
- 實際處理時間,例如將訊息轉換為列物件
- 寫入永久狀態,以及排隊寫入永久狀態所花費的時間
由於指標限制,系統不會回報輸入階段延遲時間。 因此不會計入總數。
這裡顯示的基準代表基準線。延遲時間對管道複雜度非常敏感。自訂 UDF、額外轉換和複雜的視窗邏輯都可能增加延遲時間。
估算費用
如要估算自有類似管道的基準費用,請使用 Google Cloud Platform Pricing Calculator,並採用以資源為準的計費方式,步驟如下:
- 開啟價格計算機。
- 按一下「新增至估算值」。
- 選取「Dataflow」。
- 在「服務類型」部分,選取「Dataflow Classic」。
- 選取「進階設定」即可查看所有選項。
- 選擇執行工作的位置。
- 在「Job type」(工作類型) 區段選取「Streaming」(串流)。
- 選取「啟用 Streaming Engine」。
- 輸入工作執行時數、工作站節點、工作站機器和永久磁碟儲存空間的相關資訊。
- 輸入預估的 Streaming Engine 運算單元數量。
資源用量和費用大致會隨著輸入輸送量線性擴展,但對於只有少數工作站的小型工作,總費用主要取決於固定費用。您可以根據基準測試結果,推斷工作站節點數量和資源耗用量,做為起點。
舉例來說,假設您以「完全一次」模式執行僅含對應作業的管道,輸入資料速率為 100 MiB/s。根據 1 GiB/s 管道的基準測試結果,您可以估算資源需求,如下所示:
- 資源調度係數:(100 MiB/秒) / (1 GiB/秒) = 0.1
- 預估工作站節點數:63 個工作站 × 0.1 = 6.3 個工作站
- 預計每小時的 Streaming Engine 運算單元數:58 × 0.1 = 每小時 5.8 個單元
這個值僅供初步估算。實際輸送量和成本可能會因機器類型、訊息大小分配、使用者程式碼、彙整類型、鍵值平行處理和視窗大小等因素而有顯著差異。詳情請參閱「Dataflow 成本最佳化的最佳做法」。
執行測試管道
本節顯示用於執行僅限地圖管道的
gcloud dataflow flex-template run
指令。
僅一次模式
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400
至少傳送一次模式
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--additional-experiments=streaming_mode_at_least_once \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true
更改下列內容:
JOB_NAME:Dataflow 工作名稱PROJECT_ID:專案 IDKAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 叢集的啟動位址KAFKA_TOPIC:Kafka 主題的名稱BQ_DATASET:BigQuery 資料集名稱BQ_TABLE_NAME:BigQuery 資料表的名稱
產生測試資料
如要產生測試資料,請使用下列指令執行串流資料產生器範本:
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--max-workers=140 \
--parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON
更改下列內容:
JOB_NAME:Dataflow 工作名稱PROJECT_ID:專案 IDSCHEMA_LOCATION:Cloud Storage 中結構定義檔案的路徑KAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 叢集的啟動位址KAFKA_TOPIC:Kafka 主題的名稱
串流資料產生器範本會使用 JSON 資料產生器檔案定義訊息結構定義。基準測試使用的訊息結構定義類似於下列內容:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
後續步驟
- 使用 Dataflow 工作監控介面
- Dataflow 成本最佳化最佳做法
- 疑難排解串流工作緩慢或停滯的問題
- 從 Apache Kafka 讀取資料到 Dataflow
- 從 Dataflow 寫入 BigQuery