Kafka 至 BigQuery 管道的效能特徵

本頁面說明從 Apache Kafka 讀取資料並寫入 BigQuery 的 Dataflow 串流作業效能特徵。這項工具會提供僅限地圖管道的基準測試結果,這類管道會執行每則訊息的轉換作業,但不會追蹤狀態或將串流中的元素分組。

許多資料整合工作負載 (包括 ETL、欄位驗證和結構定義對應) 都屬於僅限對應的類別。如果管道符合這個模式,您可以使用這些基準來評估 Dataflow 工作,並與效能良好的參考設定進行比較。

測試方法

基準測試是使用下列資源進行:

  • Managed Service for Apache Kafka 叢集。這些訊息是使用「串流資料產生器」範本產生。

    • 訊息傳送速率:每秒約 1,000,000 則訊息
    • 輸入負載:1 GiB/s
    • 訊息格式:隨機產生的 JSON 文字,具有固定結構定義
    • 郵件大小:每封郵件約 1 KiB
    • Kafka 分區:1000
  • 標準 BigQuery 資料表。

  • 使用 Apache Kafka 到 BigQuery 範本的 Dataflow 串流管道。這個管道會執行最少量的必要剖析和結構定義對應作業。未使用自訂使用者定義函式 (UDF)。

水平擴充功能穩定後,管道達到穩定狀態,管道可執行約一天,之後會收集及分析結果。

Dataflow 管道

這項基準測試使用僅含對應的管道,可執行簡單的對應作業,並轉換 JSON 訊息。我們已使用僅一次模式至少一次模式測試管道。「至少一次」處理作業可提供更高的處理量。不過,只有在可接受重複記錄,或下游接收器會處理重複資料時,才應使用這項功能。

工作設定

下表顯示 Dataflow 工作的設定方式。

設定
工作站機器類型 e2-standard-2
工作站機器 vCPU 2
工作站機器 RAM 8 GB
工作站永久磁碟 標準永久磁碟 (HDD),30 GB
工作站數量上限 120
Streaming Engine
自動水平調度資源
計費模式 以資源為準的計費方式
是否已啟用 Storage Write API?
Storage Write API 串流 400
Storage Write API 觸發頻率 5 秒
訊息格式 JSON
Kafka 驗證模式

應用程式預設憑證 (ADC)。

詳情請參閱「 Kafka 代理程式的驗證類型」。

建議串流管道使用 BigQuery Storage Write API。使用 Storage Write API 的「只傳送一次」模式時,您可以調整下列設定:

  • 寫入串流數量。為確保寫入階段有足夠的鍵值平行處理,請將 Storage Write API 串流數量設為大於工作站 CPU 數的值,同時遵循每串流輸送量建議

  • 觸發頻率:單一位數的秒數值適用於高輸送量管道。

詳情請參閱「從 Dataflow 寫入 BigQuery」。

此外,也應特別考量 Apache Kafka 分區數量。為確保讀取階段有足夠的鍵平行處理量,分割區數量至少應等於工作站 vCPU 總數。詳情請參閱「從 Apache Kafka 讀取資料到 Dataflow」。

基準結果

本節說明基準測試結果。

處理量和資源用量

下表顯示管道輸送量和資源用量的測試結果。

結果 僅一次 至少一次
每個工作站的輸入總處理量 平均值:15 MBps,n=3 平均值:18 MBps,n=3
所有工作站的平均 CPU 使用率 平均值:70%,n=3 平均值:75%,n=3
工作站節點數 平均值:63,n=3 平均值:53,n=3
每小時 Streaming Engine 運算單元 平均值:58,n=3 平均值:0,n=3

自動調度演算法可能會影響目標 CPU 使用率。如要提高或降低目標 CPU 使用率,可以設定自動調度資源範圍工作站使用率提示。提高使用率目標可降低成本,但也會導致尾端延遲變差,尤其是在負載量變化時。

延遲時間

下表顯示「僅處理一次」模式的管道延遲時間基準測試結果,但不包括輸入階段。

階段端對端總延遲時間 (不含輸入階段) 僅一次
第 50 個百分位數 平均值:1,200 毫秒,n=3
P95 平均值:3,000 毫秒,n=3
第 99 個百分位數 平均值:5,400 毫秒,n=3

這項測試測量了三個長期執行的測試,每個階段的端對端延遲時間 (job/streaming_engine/stage_end_to_end_latencies 指標)。這項指標會測量 Streaming Engine 在每個管道階段花費的時間。這包括管道的所有內部步驟,例如:

  • 隨機排序訊息並加入佇列以供處理
  • 實際處理時間,例如將訊息轉換為列物件
  • 寫入永久狀態,以及排隊寫入永久狀態所花費的時間

由於指標限制,系統不會回報輸入階段延遲時間。 因此不會計入總數。

這裡顯示的基準代表基準線。延遲時間對管道複雜度非常敏感。自訂 UDF、額外轉換和複雜的視窗邏輯都可能增加延遲時間。

估算費用

如要估算自有類似管道的基準費用,請使用 Google Cloud Platform Pricing Calculator,並採用以資源為準的計費方式,步驟如下:

  1. 開啟價格計算機
  2. 按一下「新增至估算值」
  3. 選取「Dataflow」。
  4. 在「服務類型」部分,選取「Dataflow Classic」。
  5. 選取「進階設定」即可查看所有選項。
  6. 選擇執行工作的位置。
  7. 在「Job type」(工作類型) 區段選取「Streaming」(串流)。
  8. 選取「啟用 Streaming Engine」
  9. 輸入工作執行時數、工作站節點、工作站機器和永久磁碟儲存空間的相關資訊。
  10. 輸入預估的 Streaming Engine 運算單元數量。

資源用量和費用大致會隨著輸入輸送量線性擴展,但對於只有少數工作站的小型工作,總費用主要取決於固定費用。您可以根據基準測試結果,推斷工作站節點數量和資源耗用量,做為起點。

舉例來說,假設您以「完全一次」模式執行僅含對應作業的管道,輸入資料速率為 100 MiB/s。根據 1 GiB/s 管道的基準測試結果,您可以估算資源需求,如下所示:

  • 資源調度係數:(100 MiB/秒) / (1 GiB/秒) = 0.1
  • 預估工作站節點數:63 個工作站 × 0.1 = 6.3 個工作站
  • 預計每小時的 Streaming Engine 運算單元數:58 × 0.1 = 每小時 5.8 個單元

這個值僅供初步估算。實際輸送量和成本可能會因機器類型、訊息大小分配、使用者程式碼、彙整類型、鍵值平行處理和視窗大小等因素而有顯著差異。詳情請參閱「Dataflow 成本最佳化的最佳做法」。

執行測試管道

本節顯示用於執行僅限地圖管道的 gcloud dataflow flex-template run 指令。

僅一次模式

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400

至少傳送一次模式

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --additional-experiments=streaming_mode_at_least_once \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true

更改下列內容:

  • JOB_NAME:Dataflow 工作名稱
  • PROJECT_ID:專案 ID
  • KAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 叢集的啟動位址
  • KAFKA_TOPIC:Kafka 主題的名稱
  • BQ_DATASET:BigQuery 資料集名稱
  • BQ_TABLE_NAME:BigQuery 資料表的名稱

產生測試資料

如要產生測試資料,請使用下列指令執行串流資料產生器範本

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --max-workers=140 \
  --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON

更改下列內容:

  • JOB_NAME:Dataflow 工作名稱
  • PROJECT_ID:專案 ID
  • SCHEMA_LOCATION:Cloud Storage 中結構定義檔案的路徑
  • KAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 叢集的啟動位址
  • KAFKA_TOPIC:Kafka 主題的名稱

串流資料產生器範本會使用 JSON 資料產生器檔案定義訊息結構定義。基準測試使用的訊息結構定義類似於下列內容:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

後續步驟