本文說明如何使用 Pub/Sub Group Kafka Connector 整合 Apache Kafka 和 Pub/Sub。
關於 Pub/Sub Group Kafka Connector
Apache Kafka 是開放原始碼平台,可串流事件。這項技術通常用於分散式架構,可讓鬆耦合的元件之間進行通訊。Pub/Sub 是一項代管服務,可非同步傳送及接收訊息。與 Kafka 相同,您可以使用 Pub/Sub 在雲端架構中的元件之間通訊。
您可以使用 Pub/Sub Group Kafka Connector 整合這兩個系統。連接器 JAR 中封裝了下列連接器:
- 接收器連接器會讀取一或多個 Kafka 主題的記錄,並發布至 Pub/Sub。
- 來源連接器會從 Pub/Sub 主題讀取訊息,並發布至 Kafka。
以下列舉幾個可能使用 Pub/Sub Group Kafka Connector 的情境:
- 您要將以 Kafka 為基礎的架構遷移至 Google Cloud。
- 您有一個前端系統,可將事件儲存在Google Cloud外部的 Kafka 中,但您也使用 Google Cloud 執行部分後端服務,這些服務需要接收 Kafka 事件。
- 從內部部署 Kafka 解決方案收集記錄,並傳送至Google Cloud 進行資料分析。
- 您有一個使用 Google Cloud的前端系統,但您也使用 Kafka 在地端儲存資料。
連接器需要 Kafka Connect,這是用於在 Kafka 和其他系統之間串流資料的架構。如要使用連接器,您必須在 Kafka 叢集旁執行 Kafka Connect。
本文假設您已熟悉 Kafka 和 Pub/Sub。建議您先完成其中一個 Pub/Sub 快速入門導覽課程,再閱讀本文。
Pub/Sub 連接器不支援 IAM 與 Kafka Connect ACL 之間的任何整合。 Google Cloud
開始使用連接器
本節將逐步說明如何執行下列工作:- 設定 Pub/Sub Group Kafka Connector。
- 將 Kafka 的事件傳送至 Pub/Sub。
- 將訊息從 Pub/Sub 傳送至 Kafka。
必要條件
安裝 Kafka
按照 Apache Kafka 快速入門導覽課程的說明,在本機電腦上安裝單一節點 Kafka。完成快速入門導覽課程中的下列步驟:
- 下載最新版 Kafka 並解壓縮。
- 啟動 Kafka 環境。
- 建立 Kafka 主題。
驗證
Pub/Sub Group Kafka Connector 必須向 Pub/Sub 進行驗證,才能傳送及接收 Pub/Sub 訊息。如要設定驗證,請按照下列步驟操作:
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
-
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
執行下列指令,初始化 gcloud CLI:
gcloud init -
選取或建立專案所需的角色
- 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
-
建立專案:如要建立專案,您需要具備專案建立者角色 (
roles/resourcemanager.projectCreator),其中包含resourcemanager.projects.create權限。瞭解如何授予角色。
-
建立 Google Cloud 專案:
gcloud projects create PROJECT_ID
將
PROJECT_ID替換為您要建立的 Google Cloud 專案名稱。 -
選取您建立的 Google Cloud 專案:
gcloud config set project PROJECT_ID
將
PROJECT_ID替換為 Google Cloud 專案名稱。
-
為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
-
將角色授予使用者帳戶。針對下列每個 IAM 角色,執行一次下列指令:
roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
更改下列內容:
PROJECT_ID:專案 ID。USER_IDENTIFIER:使用者帳戶的 ID。 例如:myemail@example.com。ROLE:授予使用者帳戶的 IAM 角色。
下載連接器 JAR
將連接器 JAR 檔案下載到本機電腦。詳情請參閱 GitHub 讀我檔案中的「取得連接器」。
複製連接器設定檔
複製或下載連接器的 GitHub 存放區。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector將
config目錄的內容複製到 Kafka 安裝目錄的config子目錄。cp config/* [path to Kafka installation]/config/
這些檔案包含連接器的設定。
更新 Kafka Connect 設定
- 前往包含您下載的 Kafka Connect 二進位檔的目錄。
- 在 Kafka Connect 二進位目錄中,透過文字編輯器開啟名為
config/connect-standalone.properties的檔案。 - 如果
plugin.path property已註解排除,請取消註解。 更新
plugin.path property,加入連接器 JAR 的路徑。範例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar將
offset.storage.file.filename屬性設為本機檔案名稱。在獨立模式下,Kafka 會使用這個檔案儲存偏移資料。範例:
offset.storage.file.filename=/tmp/connect.offsets
將 Kafka 的事件轉送至 Pub/Sub
本節說明如何啟動接收器連接器、將事件發布至 Kafka,然後從 Pub/Sub 讀取轉送的訊息。
使用 Google Cloud CLI 建立含有訂閱項目的 Pub/Sub 主題。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
更改下列內容:
- PUBSUB_TOPIC:要接收來自 Kafka 訊息的 Pub/Sub 主題名稱。
- PUBSUB_SUBSCRIPTION:主題的 Pub/Sub 訂閱項目名稱。
使用文字編輯器開啟
/config/cps-sink-connector.properties檔案。為下列屬性新增值,這些屬性在註解中標示為"TODO":topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
更改下列內容:
- KAFKA_TOPICS:以半形逗號分隔的 Kafka 主題清單,用於讀取資料。
- PROJECT_ID:包含 Pub/Sub 主題的 Google Cloud 專案。
- PUBSUB_TOPIC:接收 Kafka 訊息的 Pub/Sub 主題。
在 Kafka 目錄中執行下列指令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties請按照 Apache Kafka 快速入門導覽課程中的步驟,將一些事件寫入 Kafka 主題。
使用 gcloud CLI 從 Pub/Sub 讀取事件。
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
將 Pub/Sub 中的訊息轉送至 Kafka
本節說明如何啟動來源連接器、將訊息發布至 Pub/Sub,以及從 Kafka 讀取轉送的訊息。
使用 gcloud CLI 建立含有訂閱項目的 Pub/Sub 主題。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
更改下列內容:
- PUBSUB_TOPIC:Pub/Sub 主題的名稱。
- PUBSUB_SUBSCRIPTION:Pub/Sub 訂閱項目的名稱。
使用文字編輯器開啟名為
/config/cps-source-connector.properties的檔案。為下列屬性新增值,這些屬性在註解中標示為"TODO":kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
更改下列內容:
- KAFKA_TOPIC:用於接收 Pub/Sub 訊息的 Kafka 主題。
- PROJECT_ID:包含 Pub/Sub 主題的 Google Cloud 專案。
- PUBSUB_TOPIC:Pub/Sub 主題。
在 Kafka 目錄中執行下列指令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties使用 gcloud CLI 將訊息發布至 Pub/Sub。
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
從 Kafka 讀取訊息。請按照 Apache Kafka 快速入門導覽課程中的步驟,從 Kafka 主題讀取訊息。
簡訊轉換
Kafka 記錄包含鍵和值,兩者都是可變長度的位元組陣列。Kafka 記錄也可以選擇性地包含標頭,也就是鍵/值組合。Pub/Sub 訊息主要包含兩個部分:訊息內文和零或多個鍵值屬性。
Kafka Connect 會使用轉換器,將鍵和值序列化為 Kafka,以及從 Kafka 序列化。如要控管序列化作業,請在連接器設定檔中設定下列屬性:
key.converter:用於序列化記錄鍵的轉換器。value.converter:用於序列化記錄值的轉換器。
Pub/Sub 訊息的主體是 ByteString 物件,因此最有效率的轉換方式是直接複製酬載。因此,我們建議盡可能使用可產生原始資料類型 (整數、浮點數、字串或位元組結構定義) 的轉換器,避免對同一訊息主體進行還原序列化和重新序列化。
從 Kafka 轉換至 Pub/Sub
接收器連接器會將 Kafka 記錄轉換為 Pub/Sub 訊息,轉換方式如下:
- Kafka 記錄鍵會以名為
"key"的屬性形式,儲存在 Pub/Sub 訊息中。 - 根據預設,連接器會捨棄 Kafka 記錄中的所有標頭。不過,如果您將
headers.publish設定選項設為true,連接器會將標頭寫入為 Pub/Sub 屬性。如果任何標頭超出 Pub/Sub 的訊息屬性限制,連接器就會略過這些標頭。 - 如果是整數、浮點數、字串和位元組結構定義,連接器會直接將 Kafka 記錄值的位元組傳遞至 Pub/Sub 訊息主體。
- 如果是結構體結構定義,連接器會將每個欄位寫入為 Pub/Sub 訊息的屬性。舉例來說,如果欄位為
{ "id"=123 },產生的 Pub/Sub 訊息就會有"id"="123"屬性。欄位值一律會轉換為字串。地圖和結構類型不支援做為結構內的欄位類型。 - 如果是對應結構定義,連接器會將每個鍵/值組合寫入為 Pub/Sub 訊息的屬性。舉例來說,如果對應項目是
{"alice"=1,"bob"=2},產生的 Pub/Sub 訊息就會有"alice"="1"和"bob"="2"這兩項屬性。鍵和值會轉換為字串。
結構體和對應結構定義有一些額外行為:
您可以設定
messageBodyName設定屬性,指定特定結構體欄位或對應鍵做為訊息主體。欄位或金鑰的值會以ByteString形式儲存在訊息內文中。如果未設定messageBodyName,則結構體和對應項架構的訊息內文會是空白。如果是陣列值,連接器只支援原始陣列類型。陣列中的值序列會串連成單一
ByteString物件。
從 Pub/Sub 轉換為 Kafka
來源連接器會將 Pub/Sub 訊息轉換為 Kafka 記錄,轉換方式如下:
Kafka 記錄鍵:預設鍵為
null。您也可以設定kafka.key.attribute配置選項,指定要用做鍵的 Pub/Sub 訊息屬性。在這種情況下,連接器會尋找具有該名稱的屬性,並將記錄鍵設為屬性值。如果沒有指定屬性,記錄鍵會設為null。Kafka 記錄值。連接器會依下列方式寫入記錄值:
如果 Pub/Sub 訊息沒有自訂屬性,連接器會使用
value.converter指定的轉換器,將 Pub/Sub 訊息主體直接寫入 Kafka 記錄值,做為byte[]型別。如果 Pub/Sub 訊息具有自訂屬性,且
kafka.record.headers為false,連接器會將 struct 寫入記錄值。這個結構體包含每個屬性的一個欄位,以及一個名為"message"的欄位,其值為 Pub/Sub 訊息主體 (以位元組形式儲存):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }在這種情況下,您必須使用與
struct結構定義相容的value.converter,例如org.apache.kafka.connect.json.JsonConverter。如果 Pub/Sub 訊息含有自訂屬性,且
kafka.record.headers為true,連接器會將屬性寫入 Kafka 記錄標頭。它會使用value.converter指定的轉換器,將 Pub/Sub 訊息主體直接寫入 Kafka 記錄值,做為byte[]型別。
Kafka 記錄標頭。根據預設,除非您將
kafka.record.headers設為true,否則標頭會是空白。
設定選項
除了 Kafka Connect API 提供的設定外,Pub/Sub Group Kafka Connector 也支援接收器和來源設定,如「 Pub/Sub 連接器設定」一文所述。
取得支援
如需協助,請建立支援單。 如有一般問題或要進行討論,請在 GitHub 存放區中建立問題。
後續步驟
- 瞭解 Kafka 和 Pub/Sub 的差異。
- 進一步瞭解 Pub/Sub Group Kafka Connector。
- 請參閱 Pub/Sub Group Kafka Connector 的 GitHub 存放區。