Kafka 至 BigQuery 管道的效能特徵

本頁面說明從 Apache Kafka 讀取資料並寫入 BigQuery 的 Dataflow 串流作業效能特徵。這項工具會提供僅限地圖管道的基準測試結果,這類管道會執行每則訊息的轉換作業,但不會追蹤狀態或將串流中的元素分組。

許多資料整合工作負載 (包括 ETL、欄位驗證和結構定義對應) 都屬於僅限對應的類別。如果管道符合這個模式,您可以使用這些基準來評估 Dataflow 工作,並與效能良好的參考設定進行比較。

測試方法

基準測試是使用下列資源進行:

  • Managed Service for Apache Kafka 叢集。這些訊息是使用「串流資料產生器」範本產生。

    • 訊息傳送速率:每秒約 1,000,000 則訊息
    • 輸入負載:1 GiB/秒
    • 訊息格式:隨機產生的 JSON 文字,具有固定結構定義
    • 郵件大小:每封郵件約 1 KiB
    • Kafka 分區:1000
  • 標準 BigQuery 資料表。

  • 使用 Apache Kafka 到 BigQuery 範本的 Dataflow 串流管道。這個管道會執行最少量的必要剖析和結構定義對應作業。未使用自訂使用者定義函式 (UDF)。

水平擴充功能穩定後,管道達到穩定狀態,管道可執行約一天,之後會收集及分析結果。

Dataflow 管道

這項基準測試使用僅含對應的管道,可執行簡單的對應作業,並轉換 JSON 訊息。我們已使用僅一次模式至少一次模式測試管道。「至少一次」處理作業可提供更高的處理量。不過,只有在可接受重複記錄,或下游接收器會處理重複資料時,才應使用這項功能。

工作設定

下表顯示 Dataflow 工作的設定方式。

設定
工作站機器類型 e2-standard-2
工作站機器 vCPU 2
工作站機器 RAM 8 GB
工作站永久磁碟 標準永久磁碟 (HDD),30 GB
工作站數量上限 120
Streaming Engine
自動水平調度資源
計費模式 以資源為準的計費方式
是否已啟用 Storage Write API?
Storage Write API 串流 400
Storage Write API 觸發頻率 5 秒
訊息格式 JSON
Kafka 驗證模式

應用程式預設憑證 (ADC)。

詳情請參閱「 Kafka 代理程式的驗證類型」。

建議串流管道使用 BigQuery Storage Write API。使用 Storage Write API 的「只寫入一次」模式時,您可以調整下列設定:

  • 寫入串流數量。為確保寫入階段有足夠的鍵值平行處理,請將 Storage Write API 串流數量設為大於工作站 CPU 數的值,同時遵循每串流輸送量建議

  • 觸發頻率:單一位數的秒數值適用於高輸送量管道。

詳情請參閱「從 Dataflow 寫入 BigQuery」。

此外,也應特別考量 Apache Kafka 分區數量。為確保讀取階段有足夠的鍵平行處理量,分割區數量至少應等於工作站 vCPU 總數。詳情請參閱「從 Apache Kafka 讀取資料到 Dataflow」。

基準結果

本節說明基準測試結果。

處理量和資源用量

下表顯示管道輸送量和資源用量的測試結果。

結果 僅限一次 至少一次
每個工作站的輸入總處理量 平均值:15 MB/秒,n=3 平均值:18 MBps,n=3
所有工作站的平均 CPU 使用率 平均值:70%,n=3 平均值:75%,n=3
工作站節點數 平均值:63,n=3 平均值:53,n=3
每小時 Streaming Engine 運算單元 平均值:58,n=3 平均值:0,n=3

自動調度演算法可能會影響目標 CPU 使用率。如要提高或降低目標 CPU 使用率,可以設定自動調度資源範圍工作站使用率提示。提高使用率目標可降低成本,但也會導致尾端延遲變差,尤其是在負載變動時。

延遲時間

下表顯示「僅處理一次」模式的管道延遲時間基準測試結果,但不包括輸入階段。

階段端對端總延遲時間 (不含輸入階段) 僅限一次
第 50 個百分位數 平均值:1,200 毫秒,n=3
P95 平均值:3,000 毫秒,n=3
第 99 個百分位數 平均值:5,400 毫秒,n=3

這項測試測量了三個長期執行的測試,每個階段的端對端延遲時間 (job/streaming_engine/stage_end_to_end_latencies 指標)。這項指標會測量 Streaming Engine 在每個管道階段花費的時間。這包括管道的所有內部步驟,例如:

  • 隨機排序訊息並加入佇列以供處理
  • 實際處理時間,例如將訊息轉換為列物件
  • 寫入永久狀態,以及排隊寫入永久狀態所花費的時間

由於指標限制,系統不會回報輸入階段延遲時間。 因此不會計入總數。

這裡顯示的基準代表基準線。延遲時間對管道複雜度非常敏感。自訂 UDF、額外轉換和複雜的視窗邏輯都可能增加延遲時間。

估算費用

如要估算自有類似管道的基準費用,請使用以資源為準的計費方式,並按照下列步驟操作:Google Cloud

  1. 開啟 Pricing Calculator
  2. 按一下「新增至估算值」
  3. 選取「Dataflow」。
  4. 在「服務類型」部分,選取「Dataflow Classic」。
  5. 選取「進階設定」即可查看所有選項。
  6. 選擇執行工作的位置。
  7. 在「Job type」(工作類型) 區段選取「Streaming」(串流)。
  8. 選取「啟用 Streaming Engine」
  9. 輸入工作執行時數、工作站節點、工作站機器和永久磁碟儲存空間的相關資訊。
  10. 輸入預估的 Streaming Engine 運算單元數量。

資源用量和費用大致會隨著輸入輸送量線性擴展,但對於只有少數工作站的小型工作,總費用主要取決於固定費用。您可以根據基準測試結果,推斷工作站節點數量和資源耗用量,做為起點。

舉例來說,假設您以「完全一次」模式執行僅含對應作業的管道,輸入資料速率為 100 MiB/s。根據 1 GiB/s 管道的基準測試結果,您可以估算資源需求,如下所示:

  • 資源調度係數:(100 MiB/秒) / (1 GiB/秒) = 0.1
  • 預估工作站節點數:63 個工作站 × 0.1 = 6.3 個工作站
  • 預計每小時的 Streaming Engine 運算單元數:58 × 0.1 = 每小時 5.8 個單元

這個值僅供初步估算。實際輸送量和費用可能會因機器類型、訊息大小分配、使用者程式碼、彙整類型、鍵值平行處理和視窗大小等因素而有顯著差異。詳情請參閱「Dataflow 成本最佳化的最佳做法」。

執行測試管道

本節顯示用於執行僅限地圖管道的 gcloud dataflow flex-template run 指令。

「僅限一次」模式

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400

至少傳送一次模式

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --additional-experiments=streaming_mode_at_least_once \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true

更改下列內容:

  • JOB_NAME:Dataflow 工作名稱
  • PROJECT_ID:專案 ID
  • KAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 叢集的啟動位址
  • KAFKA_TOPIC:Kafka 主題的名稱
  • BQ_DATASET:BigQuery 資料集名稱
  • BQ_TABLE_NAME:BigQuery 資料表的名稱

產生測試資料

如要產生測試資料,請使用下列指令執行串流資料產生器範本

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --max-workers=140 \
  --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON

更改下列內容:

  • JOB_NAME:Dataflow 工作名稱
  • PROJECT_ID:專案 ID
  • SCHEMA_LOCATION:Cloud Storage 中結構定義檔案的路徑
  • KAFKA_BOOTSTRAP_ADDRESS:Apache Kafka 叢集的啟動位址
  • KAFKA_TOPIC:Kafka 主題的名稱

串流資料產生器範本會使用 JSON 資料產生器檔案定義訊息結構定義。基準測試使用的訊息結構定義類似於下列內容:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

後續步驟