將 Pub/Sub 連線至 Apache Kafka

本文說明如何使用 Pub/Sub Group Kafka Connector 整合 Apache Kafka 和 Pub/Sub。

關於 Pub/Sub Group Kafka Connector

Apache Kafka 是開放原始碼平台,可串流事件。這項技術通常用於分散式架構,可讓鬆耦合的元件之間進行通訊。Pub/Sub 是一項代管服務,可非同步傳送及接收訊息。與 Kafka 相同,您可以使用 Pub/Sub 在雲端架構中的元件之間通訊。

您可以使用 Pub/Sub Group Kafka Connector 整合這兩個系統。連接器 JAR 中封裝了下列連接器:

  • 接收器連接器會讀取一或多個 Kafka 主題的記錄,並發布至 Pub/Sub。
  • 來源連接器會從 Pub/Sub 主題讀取訊息,並發布至 Kafka。

以下列舉幾個可能使用 Pub/Sub Group Kafka Connector 的情境:

  • 您要將以 Kafka 為基礎的架構遷移至 Google Cloud。
  • 您有一個前端系統,可將事件儲存在Google Cloud外部的 Kafka 中,但您也使用 Google Cloud 執行部分後端服務,這些服務需要接收 Kafka 事件。
  • 從內部部署 Kafka 解決方案收集記錄,並傳送至Google Cloud 進行資料分析。
  • 您有一個使用 Google Cloud的前端系統,但您也使用 Kafka 在地端儲存資料。

連接器需要 Kafka Connect,這是用於在 Kafka 和其他系統之間串流資料的架構。如要使用連接器,您必須在 Kafka 叢集旁執行 Kafka Connect。

本文假設您已熟悉 Kafka 和 Pub/Sub。建議您先完成其中一個 Pub/Sub 快速入門導覽課程,再閱讀本文。

Pub/Sub 連接器不支援 IAM 與 Kafka Connect ACL 之間的任何整合。 Google Cloud

開始使用連接器

本節將逐步說明如何執行下列工作:

  1. 設定 Pub/Sub Group Kafka Connector。
  2. 將 Kafka 的事件傳送至 Pub/Sub。
  3. 將訊息從 Pub/Sub 傳送至 Kafka。

必要條件

安裝 Kafka

按照 Apache Kafka 快速入門導覽課程的說明,在本機電腦上安裝單一節點 Kafka。完成快速入門導覽課程中的下列步驟:

  1. 下載最新版 Kafka 並解壓縮。
  2. 啟動 Kafka 環境。
  3. 建立 Kafka 主題。

驗證

Pub/Sub Group Kafka Connector 必須向 Pub/Sub 進行驗證,才能傳送及接收 Pub/Sub 訊息。如要設定驗證,請按照下列步驟操作:

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. 安裝 Google Cloud CLI。

  3. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  4. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  5. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  6. 為使用者帳戶建立本機驗證憑證:

    gcloud auth application-default login

    如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI

  7. 將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    更改下列內容:

    • PROJECT_ID:專案 ID。
    • USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com
    • ROLE:授予使用者帳戶的 IAM 角色。
  8. 安裝 Google Cloud CLI。

  9. 若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI

  10. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  11. 建立或選取 Google Cloud 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Google Cloud 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Google Cloud 專案名稱。

    • 選取您建立的 Google Cloud 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Google Cloud 專案名稱。

  12. 為使用者帳戶建立本機驗證憑證:

    gcloud auth application-default login

    如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI

  13. 將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    更改下列內容:

    • PROJECT_ID:專案 ID。
    • USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com
    • ROLE:授予使用者帳戶的 IAM 角色。

下載連接器 JAR

將連接器 JAR 檔案下載到本機電腦。詳情請參閱 GitHub 讀我檔案中的「取得連接器」。

複製連接器設定檔

  1. 複製或下載連接器的 GitHub 存放區

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 目錄的內容複製到 Kafka 安裝目錄的 config 子目錄。

    cp config/* [path to Kafka installation]/config/
    

這些檔案包含連接器的設定

更新 Kafka Connect 設定

  1. 前往包含您下載的 Kafka Connect 二進位檔的目錄。
  2. 在 Kafka Connect 二進位目錄中,透過文字編輯器開啟名為 config/connect-standalone.properties 的檔案。
  3. 如果 plugin.path property 已註解排除,請取消註解。
  4. 更新 plugin.path property,加入連接器 JAR 的路徑。

    範例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 屬性設為本機檔案名稱。在獨立模式下,Kafka 會使用這個檔案儲存偏移資料。

    範例:

    offset.storage.file.filename=/tmp/connect.offsets
    

將 Kafka 的事件轉送至 Pub/Sub

本節說明如何啟動接收器連接器、將事件發布至 Kafka,然後從 Pub/Sub 讀取轉送的訊息。

  1. 使用 Google Cloud CLI 建立含有訂閱項目的 Pub/Sub 主題。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    更改下列內容:

    • PUBSUB_TOPIC:要接收來自 Kafka 訊息的 Pub/Sub 主題名稱。
    • PUBSUB_SUBSCRIPTION:主題的 Pub/Sub 訂閱項目名稱。
  2. 使用文字編輯器開啟 /config/cps-sink-connector.properties 檔案。為下列屬性新增值,這些屬性在註解中標示為 "TODO"

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    更改下列內容:

    • KAFKA_TOPICS:以半形逗號分隔的 Kafka 主題清單,用於讀取資料。
    • PROJECT_ID:包含 Pub/Sub 主題的 Google Cloud 專案。
    • PUBSUB_TOPIC:接收 Kafka 訊息的 Pub/Sub 主題。
  3. 在 Kafka 目錄中執行下列指令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. 請按照 Apache Kafka 快速入門導覽課程中的步驟,將一些事件寫入 Kafka 主題。

  5. 使用 gcloud CLI 從 Pub/Sub 讀取事件。

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

將 Pub/Sub 中的訊息轉送至 Kafka

本節說明如何啟動來源連接器、將訊息發布至 Pub/Sub,以及從 Kafka 讀取轉送的訊息。

  1. 使用 gcloud CLI 建立含有訂閱項目的 Pub/Sub 主題。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    更改下列內容:

    • PUBSUB_TOPIC:Pub/Sub 主題的名稱。
    • PUBSUB_SUBSCRIPTION:Pub/Sub 訂閱項目的名稱。
  2. 使用文字編輯器開啟名為 /config/cps-source-connector.properties 的檔案。為下列屬性新增值,這些屬性在註解中標示為 "TODO"

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    更改下列內容:

    • KAFKA_TOPIC:用於接收 Pub/Sub 訊息的 Kafka 主題。
    • PROJECT_ID:包含 Pub/Sub 主題的 Google Cloud 專案。
    • PUBSUB_TOPIC:Pub/Sub 主題。
  3. 在 Kafka 目錄中執行下列指令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. 使用 gcloud CLI 將訊息發布至 Pub/Sub。

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. 從 Kafka 讀取訊息。請按照 Apache Kafka 快速入門導覽課程中的步驟,從 Kafka 主題讀取訊息。

簡訊轉換

Kafka 記錄包含鍵和值,兩者都是可變長度的位元組陣列。Kafka 記錄也可以選擇性地包含標頭,也就是鍵/值組合。Pub/Sub 訊息主要包含兩個部分:訊息內文和零或多個鍵值屬性。

Kafka Connect 會使用轉換器,將鍵和值序列化為 Kafka,以及從 Kafka 序列化。如要控管序列化作業,請在連接器設定檔中設定下列屬性:

  • key.converter:用於序列化記錄鍵的轉換器。
  • value.converter:用於序列化記錄值的轉換器。

Pub/Sub 訊息的主體是 ByteString 物件,因此最有效率的轉換方式是直接複製酬載。因此,我們建議盡可能使用可產生原始資料類型 (整數、浮點數、字串或位元組結構定義) 的轉換器,避免對同一訊息主體進行還原序列化和重新序列化。

從 Kafka 轉換至 Pub/Sub

接收器連接器會將 Kafka 記錄轉換為 Pub/Sub 訊息,轉換方式如下:

  • Kafka 記錄鍵會以名為 "key" 的屬性形式,儲存在 Pub/Sub 訊息中。
  • 根據預設,連接器會捨棄 Kafka 記錄中的所有標頭。不過,如果您將 headers.publish 設定選項設為 true,連接器會將標頭寫入為 Pub/Sub 屬性。如果任何標頭超出 Pub/Sub 的訊息屬性限制,連接器就會略過這些標頭。
  • 如果是整數、浮點數、字串和位元組結構定義,連接器會直接將 Kafka 記錄值的位元組傳遞至 Pub/Sub 訊息主體。
  • 如果是結構體結構定義,連接器會將每個欄位寫入為 Pub/Sub 訊息的屬性。舉例來說,如果欄位為 { "id"=123 },產生的 Pub/Sub 訊息就會有 "id"="123" 屬性。欄位值一律會轉換為字串。地圖和結構類型不支援做為結構內的欄位類型。
  • 如果是對應結構定義,連接器會將每個鍵/值組合寫入為 Pub/Sub 訊息的屬性。舉例來說,如果對應項目是 {"alice"=1,"bob"=2},產生的 Pub/Sub 訊息就會有 "alice"="1""bob"="2" 這兩項屬性。鍵和值會轉換為字串。

結構體和對應結構定義有一些額外行為:

  • 您可以設定 messageBodyName 設定屬性,指定特定結構體欄位或對應鍵做為訊息主體。欄位或金鑰的值會以 ByteString 形式儲存在訊息內文中。如果未設定 messageBodyName,則結構體和對應項架構的訊息內文會是空白。

  • 如果是陣列值,連接器只支援原始陣列類型。陣列中的值序列會串連成單一 ByteString 物件。

從 Pub/Sub 轉換為 Kafka

來源連接器會將 Pub/Sub 訊息轉換為 Kafka 記錄,轉換方式如下:

  • Kafka 記錄鍵:預設鍵為 null。您也可以設定 kafka.key.attribute 配置選項,指定要用做鍵的 Pub/Sub 訊息屬性。在這種情況下,連接器會尋找具有該名稱的屬性,並將記錄鍵設為屬性值。如果沒有指定屬性,記錄鍵會設為 null

  • Kafka 記錄值。連接器會依下列方式寫入記錄值:

    • 如果 Pub/Sub 訊息沒有自訂屬性,連接器會使用 value.converter 指定的轉換器,將 Pub/Sub 訊息主體直接寫入 Kafka 記錄值,做為 byte[] 型別。

    • 如果 Pub/Sub 訊息具有自訂屬性,且 kafka.record.headersfalse,連接器會將 struct 寫入記錄值。這個結構體包含每個屬性的一個欄位,以及一個名為 "message" 的欄位,其值為 Pub/Sub 訊息主體 (以位元組形式儲存):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      在這種情況下,您必須使用與 struct 結構定義相容的 value.converter,例如 org.apache.kafka.connect.json.JsonConverter

    • 如果 Pub/Sub 訊息含有自訂屬性,且 kafka.record.headerstrue,連接器會將屬性寫入 Kafka 記錄標頭。它會使用 value.converter 指定的轉換器,將 Pub/Sub 訊息主體直接寫入 Kafka 記錄值,做為 byte[] 型別。

  • Kafka 記錄標頭。根據預設,除非您將 kafka.record.headers 設為 true,否則標頭會是空白。

設定選項

除了 Kafka Connect API 提供的設定外,Pub/Sub Group Kafka Connector 也支援接收器和來源設定,如「 Pub/Sub 連接器設定」一文所述。

取得支援

如需協助,請建立支援單。 如有一般問題或要進行討論,請在 GitHub 存放區中建立問題。

後續步驟