Kafka Connect 總覽

Kafka Connect 是 Kafka 開發人員整合資料的首選工具。 這個架構可將 Kafka 連接至資料庫、訊息佇列和檔案系統等外部系統。

Kafka Connect 提供精選的內建連接器外掛程式,並由 Google Cloud審查及維護。這些連接器外掛程式會自動修補及升級,簡化維護作業並確保相容性。Google Cloud 也提供內建的監控和記錄功能,可維護管道的健康狀態。

Kafka Connect API 是 Google Cloud Managed Service for Apache Kafka 服務的一部分。這些 API 可透過 managedkafka.googleapis.com 存取,並整合至 Google Cloud 控制台和用戶端程式庫。如要管理 Kafka Connect,可以使用 Google Cloud 控制台、gcloud CLI、Managed Kafka API、Cloud 用戶端程式庫或 Terraform。

Kafka Connect 用途

Kafka Connect 支援在 Managed Service for Apache Kafka 叢集與各種其他系統之間整合資料。以下是一些主要用途:

  • 將現有的 Kafka 部署作業遷移至 Managed Service for Apache Kafka。

  • 將 Managed Service for Apache Kafka 叢集複製到其他區域,以進行災害復原。

  • 將資料從 Managed Service for Apache Kafka 串流至 BigQuery、Cloud Storage、Pub/Sub。

Kafka Connect 術語

這些章節會討論某些重要的 Kafka Connect 元件。

連結叢集

Connect 叢集是 Kafka Connect 的分散式部署項目,內含預先封裝的連接器外掛程式和設定。每個 Connect 叢集都會與一個主要 Managed Service for Apache Kafka 叢集相關聯。這個主要叢集會儲存 Connect 叢集上執行的連接器狀態。

一般來說,主要 Managed Service for Apache Kafka 叢集也會做為所有來源連接器的目標,以及在相關聯 Connect 叢集上執行的所有接收器連接器的來源。

單一 Managed Service for Apache Kafka 叢集可以有多個 Connect 叢集。如果執行 MirrorMaker 2.0,Connect 叢集可以連線至非主要 Managed Service for Apache Kafka 叢集或自行管理的 Kafka 叢集,讀取或寫入主題資料。這個程序可讓不同叢集之間複製主題。

就資源模型而言,Connect 叢集是與 Managed Service for Apache Kafka 叢集不同的資源。

假設您有一個 Managed Service for Apache Kafka 叢集,用於儲存網站流量資料。您想將這項資料串流至 BigQuery 進行分析。您可以建立 Connect 叢集,並使用 BigQuery 接收器連接器,將 Kafka 主題的資料移至 BigQuery。這個 Connect 叢集會與您的 Managed Service for Apache Kafka 叢集建立關聯,做為主要叢集。

連接器外掛程式

用於建立連接器的軟體套件。可視為定義連接器邏輯的程式碼。

  • 連接器可以是來源或接收器連接器。來源連接器會將來源資料寫入 Managed Service for Apache Kafka 叢集。

  • 接收器連接器會將 Managed Service for Apache Kafka 叢集的資料寫入接收器。

Managed Service for Apache Kafka 支援多種內建連接器外掛程式,您可以設定這些外掛程式來建立連接器。這些連接器可與 Pub/Sub 或 BigQuery 等常見服務整合。這些連接器外掛程式如下:

  • BigQuery Sink 連接器外掛程式

  • Cloud Storage Sink 連接器外掛程式

  • Pub/Sub 來源連接器外掛程式

  • Pub/Sub 接收器連接器外掛程式

  • MirrorMaker 2.0 連接器外掛程式

連接器

連接器是特定 Connect 叢集內連接器外掛程式的執行個體。您可以透過同一個連接器外掛程式建立多個連接器,每個連接器都有各自的特定設定。設定範例包括不同的驗證詳細資料和作業設定。連接器會在 Connect 叢集中部署、設定及管理。您可以啟動、停止、暫停、重新啟動作業,以及更新作業設定。

我們將在下節討論連接器的元件。

轉換者

轉換器是 Kafka Connect 的重要元件,負責序列化和還原序列化。這些轉換器會在 Kafka 主題上找到的原始位元組線路格式 (例如 Avro 或 JSON 格式) 與 Kafka Connect 的內部結構化資料表示法之間轉換資料。

轉換者的角色

  • 對於 Sink 連接器,轉換器會將主題的線路格式資料還原序列化為 Kafka Connect 的內部結構化資料表示法,然後連接器會使用該表示法寫入目標系統。

  • 如果是來源連接器,轉換器會將 Kafka Connect 提供的內部結構化資料表示法,序列化為 Kafka 主題的指定線路格式。

這個內部格式可做為通用表示法,支援各種中繼處理步驟。這些步驟包括篩選器、述詞、轉換和轉換器等基本項目,全都以這個統一的內部格式運作。使用抽象內部格式時,這些中間步驟的邏輯不會受到特定輸入或輸出資料格式影響。

如果需要與資料互動,而不只是傳遞資料,就必須使用轉換器。具體來說,如果需要以精細且結構感知的方式執行中繼處理步驟 (例如述詞或轉換),就必須使用轉換器。

如果您只打算將位元組字串 (即使是 JSON) 從來源移至 Kafka,且不進行任何操作,則不需要轉換器。

在連接器設定中,如果您未指定鍵和值轉換器,連接器會使用預設的 ByteArrayConverter 值。org.apache.kafka.connect.converters.ByteArrayConverter 值不會對資料套用任何轉換,而是以原始格式傳遞資料。

支援的轉換器

這個版本 Google Cloud 支援下列內建轉換器:

  • org.apache.kafka.connect.converters.ByteArrayConverter:將資料轉換為位元組陣列,或從位元組陣列轉換資料。這是預設轉換器。並透過連接器傳遞資料,做為原始基礎位元組。

  • org.apache.kafka.connect.json.JsonConverter:將資料轉換為 JSON 格式,或從 JSON 格式轉換資料。

  • org.apache.kafka.connect.storage.StringConverter:將資料轉換為字串格式,或從字串格式轉換資料。

  • org.apache.kafka.connect.converters.ByteArrayConverter:將資料轉換為位元組陣列,或從位元組陣列轉換資料。

  • org.apache.kafka.connect.converters.DoubleConverter:將資料轉換為 Double 格式,或從 Double 格式轉換資料。

  • org.apache.kafka.connect.converters.FloatConverter:將資料轉換為 Float 格式,或從 Float 格式轉換資料。

  • org.apache.kafka.connect.converters.IntegerConverter:將資料轉換為整數格式,或從整數格式轉換資料。

  • org.apache.kafka.connect.converters.LongConverter:將資料轉換為 Long 格式,或從 Long 格式轉換資料。

  • org.apache.kafka.connect.converters.ShortConverter:將資料轉換為簡短格式,或從簡短格式轉換資料。

  • org.apache.kafka.connect.converters.BooleanConverter:將資料轉換為布林值格式,或從布林值格式轉換資料。

  • io.confluent.connect.avro.AvroConverter:將資料轉換為 Apache Avro 格式,或從該格式轉換資料。

在這個版本中,Kafka Connect 不支援使用 Schema Registry 針對遠端結構定義進行驗證。

如要瞭解各個連接器的偏好轉換器,請參閱特定連接器的說明文件。

預設轉換器設定

所有支援的連接器預設的鍵和值轉換器為 org.apache.kafka.connect.json.JsonConverter

設定連線器時,您需要為 Kafka 訊息的鍵和值指定適當的轉換器。舉例來說,如果您處理的是 JSON 資料,請使用 JsonConverter。如果資料採用字串格式,請使用 StringConverter

常見的設定包括:

  • tasks.max:這個連接器可建立的任務數量上限。 這項設定可控制連接器的平行處理量。增加工作數量可提高總處理量,但也會增加資源耗用量 (CPU 和記憶體)。最佳值取決於工作負載、分配給 Connect 叢集工作站的資源,以及 Kafka 主題分割區的數量 (適用於接收器連接器)。

  • value.converter:用於序列化訊息的,然後再將訊息傳送至 Cloud Storage bucket。常見的轉換器包括:

    • org.apache.kafka.connect.json.JsonConverter:適用於 JSON 資料。使用這個轉換器搭配純 JSON (不含結構定義) 時,通常需要設定 value.converter.schemas.enable=false

    • org.apache.kafka.connect.converters.ByteArrayConverter:確保兩個系統中的訊息內容完全一致。

    • org.apache.kafka.connect.storage.StringConverter:適用於純文字字串。

  • key.converter:用於序列化訊息的轉換器。適用與 value.converter 相同的轉換器選項。如果訊息沒有金鑰,通常可以使用 org.apache.kafka.connect.storage.StringConverter

  • value.converter.schemas.enable:如果是接收器連接器,使用 org.apache.kafka.connect.json.JsonConverter 時將此設定為 true,會指示 Kafka Connect 尋找並使用內嵌在傳入 Kafka 訊息中的結構定義。如果設為 false (預設值),Kafka Connect 預期資料為純 JSON,不含內嵌結構定義。

轉換 (選用)

轉換外掛程式可在資料管道中控管或補充資料。轉換功能可讓您在個別訊息傳送至 Managed Service for Apache Kafka (適用於來源連接器) 或外部系統 (適用於接收器連接器) 前進行修改。您可能會使用轉換來遮蓋私密資料、新增時間戳記或重新命名欄位。

述詞 (選用)

述詞可根據特定條件篩選資料。述詞可做為套用轉換的篩選器,根據訊息屬性判斷轉換作業要套用至哪些訊息。

在 Google Cloud中管理 Kafka Connect

有了 Kafka Connect,您就能專注於部署連接器, Google Cloud 則會處理基礎架構和作業複雜度。以下列出 Google Cloud 自動執行的作業,以及您可以設定的項目:

Kafka Connect 服務會自動執行下列作業:

  • 佈建 Kafka Connect worker:建立 Connect 叢集時,Kafka Connect 服務會自動在 Kubernetes 中佈建 worker 叢集。

  • 網路:Kafka Connect 服務會設定網路,以便工作人員、Managed Service for Apache Kafka 代理程式和外部系統之間進行通訊。在某些情況下,您可能需要變更現有的網路設定。

  • 可用區復原能力:Kafka Connect 服務會將工作站分散到至少三個可用區,確保可用區發生中斷時,資料處理作業仍可繼續進行。

  • 驗證:Kafka Connect 服務也會設定與 Kafka 代理程式的驗證,確保連線安全無虞。

  • 發布和升級:Kafka Connect 服務會管理工作人員設定變更、版本升級和安全性修補程式,確保部署項目一律為最新狀態。

在 Kafka Connect 服務中,您可以執行下列設定:

  • 容量和網路限制:定義資源限制和網路設定,以提升效能和成本效益。

  • 監控和記錄:存取連結器的記錄和指標,監控效能及排解問題。

  • 連接器生命週期管理:視需要暫停、繼續、重新啟動或停止連接器,以管理資料管道。

限制

  • Kafka Connect 服務只支援將 Managed Service for Apache Kafka 叢集做為主要 Kafka 叢集。主要叢集是 Kafka Connect 叢集寫入中繼資料的叢集。

  • 這項服務不支援將自訂連接器外掛程式上傳至 Kafka Connect 叢集。

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。