本文概要說明 Kafka Connect 連接器在Google Cloud中的用途。瞭解何時該使用哪種連接器類型,以便管理及整合資料串流。
這些連接器會使用 Kafka Connect 架構,將 Apache Kafka 與其他應用程式整合。這些代理程式會擷取資料,並在 Kafka 叢集和應用程式之間複製資料。可用的連結器類型包括:
MirrorMaker 2.0 連接器
來源連接器
查核點連接器
心跳聲連接器
BigQuery Sink 連接器
Cloud Storage 接收器連接器
Pub/Sub 來源連接器
Pub/Sub 接收器連接器
MirrorMaker 2.0 連接器專為 Kafka 叢集之間的資料複製和災難復原而設計。這項工具可將資料從一個 Kafka 叢集鏡像到另一個叢集,實現高可用性和容錯能力。
MirrorMaker 2.0 連接器可在 Managed Service for Apache Kafka 叢集之間,以及 Managed Service for Apache Kafka 叢集與自行管理的 Kafka 叢集之間建立連線。
其他 Sink 和 Source 連接器則用於整合 Kafka 與各種Google Cloud 服務。這些連接器可在 Managed Service for Apache Kafka 叢集和 Google Cloud 服務 (例如 BigQuery、Cloud Storage 或 Pub/Sub) 之間轉移資料。
事前準備
探索及建立連結器之前,請先瞭解下列事項並確認符合必備條件:
熟悉 Kafka Connect 和 Connect 叢集。您必須先建立 Connect 叢集,才能部署連接器。
如果是接收器和來源連接器,則需瞭解 BigQuery 表格、Cloud Storage 值區,或 Pub/Sub 主題和訂閱項目,視您設定的連接器類型而定。
熟悉 YAML 或 JSON 設定檔,因為連接器是使用這些格式設定。
MirrorMaker 2.0 的使用時機
在下列情況下使用 MirrorMaker 2.0 連接器:
遷移資料:將 Kafka 工作負載移至新的 Managed Service for Apache Kafka 叢集。
災難復原:建立備份叢集,確保業務持續運作,以防發生故障。
匯總資料:將多個 Kafka 叢集的資料整合至中央 Managed Service for Apache Kafka 叢集,以供分析。
MirrorMaker 2.0 主要功能
- 複製所有必要元件,包括主題、資料、設定、含位移的消費者群組和 ACL。
- 在目標叢集中維持相同的分割區配置,簡化應用程式的轉換作業。
- 自動偵測及複製新主題和分割區,盡量減少手動設定。
- 提供端對端複製延遲等重要指標,方便您追蹤複製程序的健康狀態和效能。
- 即使資料量龐大,也能確保運作穩定,並可水平擴充,因應日益增加的工作負載。
- 使用內部主題進行位移同步、檢查點和心跳。這些主題具有可設定的複寫因數 (例如
offset.syncs.topic.replication.factor),可確保高可用性和容錯能力。
使用 MirrorMaker 2.0 來源連接器
MirrorMaker 2.0 來源連接器會將某個 Kafka 叢集 (來源) 的主題和資料,複製到另一個 Kafka 叢集 (目標)。
| 來源 | 目標 |
|---|---|
| Managed Service for Apache Kafka 叢集 | Managed Service for Apache Kafka 叢集 |
| Managed Service for Apache Kafka 叢集 | 外部或自行管理的 Kafka 叢集 |
| 外部或自行管理的 Kafka 叢集 | Managed Service for Apache Kafka 叢集 |
MirrorMaker 2.0 來源連接器支援下列遷移情境:
將外部或自行管理的 Kafka 叢集資料複製或遷移至 Managed Service for Apache Kafka 叢集
將資料從 Managed Service for Apache Kafka 叢集複製或遷移至外部或自行管理的 Kafka 叢集。
跨區域複製 Kafka 資料,以滿足災難復原和高可用性需求。
使用 MirrorMaker 2.0 查核點連接器
您可以選擇是否使用 MirrorMaker 2.0 查核點連接器。 這會複製消費者偏移,指出最後成功消費的訊息。這個程序可確保目標叢集上的消費者能從與來源叢集相同的位置繼續處理資料。
MirrorMaker 2.0 來源連接器不需要這個連接器即可運作。只有在從來源叢集切換至目標叢集時,需要同步處理 ConsumerGroup 狀態以盡量縮短停機時間,才需要這個連接器。如果只需要來源資料副本,則不需要這個連接器。
在下列情況下,請使用 MirrorMaker 2.0 查核點連接器:
災難復原,確保叢集間的消費者狀態一致,並啟用無縫容錯移轉。
在重要情境中保留消費者進度。
使用 MirrorMaker 2.0 Heartbeat 連接器
MirrorMaker 2.0 活動訊號連接器是選用元件,可定期在來源 Kafka 叢集上產生活動訊號訊息。連接器會將這些訊息寫入專屬主題,通常名為 heartbeats。
您可以設定 MirrorMaker 2.0 來源連接器,將 heartbeats 主題複製到目標叢集。觀察目標叢集上複製的主題,即可監控主題複製流程的狀態和成效。即使沒有產生或複製其他資料,您仍可透過這項功能驗證叢集之間的連線和資料流。
單獨部署 Heartbeat 連接器不會自動監控複寫健康狀態。如要使用這項功能進行監控,您必須複製 heartbeats 主題,然後在目標叢集上觀察該主題的存在和及時性,或使用會消耗這些心跳的監控工具。
MirrorMaker 2.0 來源連接器不需要 Heartbeat 連接器即可運作。在下列情況下,請使用 MirrorMaker 2.0 Heartbeat 連接器:
監控 MirrorMaker 2 複寫的健康狀態和狀態。
使用產生的心跳訊號和可用指標,在 Cloud Monitoring 中設定快訊,以便在複寫或心跳訊號停止時收到通知。
使用 Sink 連接器
接收器連接器會將 Kafka 主題的資料匯出至其他系統。
使用 BigQuery Sink 連接器
BigQuery 接收器連接器會將 Kafka 主題的資料串流至 BigQuery 資料表。
在下列用途中,使用 BigQuery Sink 連接器:
資料倉儲,可將串流資料載入 BigQuery,用於分析和報表。
在 BigQuery 資料表中填入資料,以支援即時資訊主頁。
使用 Cloud Storage Sink 連接器
Cloud Storage 接收器連接器會將 Kafka 主題的資料串流至 Cloud Storage bucket。
在下列用途中,使用 Cloud Storage Sink 連接器:
資料湖泊擷取作業,將 Kafka 資料儲存在資料湖泊中,以供長期封存和批次處理。
封存資料以符合法規要求。
使用 Pub/Sub 接收器連接器
Pub/Sub 接收器連接器會將 Kafka 主題的訊息,串流至 Pub/Sub 主題。
在下列情況下,請使用 Pub/Sub 接收器連接器:
服務整合,可將資料從 Kafka 傳送至其他 Google Cloud 服務或應用程式,並從 Pub/Sub 取用資料。
根據處理後的資料觸發即時通知或動作。
使用來源連接器
來源連接器會將其他系統的資料匯入 Kafka 主題。
使用 Pub/Sub 來源連接器
Pub/Sub 來源連接器會將 Pub/Sub 訂閱項目的訊息,串流至 Kafka 主題。
在下列情況下,請使用 Pub/Sub 來源連接器:
即時擷取資料,從雲端服務或其他應用程式擷取資料,並發布至 Pub/Sub,然後傳送至 Kafka 進行串流處理。
事件導向架構,根據發布至 Pub/Sub 的事件觸發以 Kafka 為基礎的處理程序。
任務重新啟動政策
您可以設定連接器的工作重新啟動政策,決定發生失敗時的行為。連結器支援下列政策:
永不重新啟動。連接器不會重新啟動失敗的工作。這項政策是預設行為。這項功能有助於偵錯,或在發生錯誤後需要手動介入的情況下使用。
以指數輪詢方式重新啟動。連接器會在延遲一段時間 (稱為退避期) 後,重新啟動失敗的任務。每次後續失敗時,延遲時間會呈指數增加。建議大多數正式環境工作負載採用這項政策。
如果使用指數輪詢政策,請同時設定最短和最長輪詢時間。輪詢時間下限應大於 60 秒,輪詢時間上限應小於 7200 秒。
轉換和述詞
Kafka Connect 支援預設的 Kafka 轉換和述詞。
您可以在連接器設定中指定設定。舉例來說,如要設定接收器連接器,使其忽略含有 DoNotProcess 標頭鍵的訊息,請在連接器中新增下列設定:
transforms=dropMessage
transforms.dropMessage.type=org.apache.kafka.connect.transforms.Filter
transforms.dropMessage.predicate=hasKey
predicates=hasKey
predicates.hasKey.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasKey.name=DoNotProcess
這項設定會執行下列操作:
設定名為
hasKey的述詞,類型為org.apache.kafka.connect.transforms.predicates.HasHeaderKey。這個述詞會比對含有以DoNotProcess為鍵的標頭的所有郵件。設定名為
dropMessage的轉換,類型為org.apache.kafka.connect.transforms.Filter。這項轉換會捨棄所有符合所設述詞的訊息。將轉換連結至述詞
hasKey。這可確保只有含有DoNotProcess標頭鍵的訊息會遭到轉換作業捨棄。