Pub/Sub 來源連接器會將訊息從 Pub/Sub 串流至 Kafka。 這樣一來,您就能將 Pub/Sub 與 Kafka 型應用程式和資料管道整合。
這個連接器會從 Pub/Sub 訂閱項目讀取訊息,將每則訊息轉換為 Kafka 記錄,然後將記錄寫入 Kafka 主題。根據預設,連接器會建立下列 Kafka 記錄:
- Kafka 記錄鍵為
null。 - Kafka 記錄值是 Pub/Sub 訊息資料 (以位元組為單位)。
- Kafka 記錄標頭為空白。
不過,您可以設定這項行為。詳情請參閱「設定連接器」。
事前準備
建立 Pub/Sub 來源連接器前,請確認您具備下列項目:
具有訂閱項目的 Pub/Sub 主題。
Kafka 叢集中的 Kafka 主題。
連結叢集。 建立 Connect 叢集時,請將 Managed Service for Apache Kafka 叢集設為主要 Kafka 叢集。
必要角色和權限
如要取得建立 Pub/Sub 來源連接器所需的權限,請要求管理員在包含 Connect 叢集的專案中,授予您「Managed Kafka Connector 編輯者 」(roles/managedkafka.connectorEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
這個預先定義的角色具備建立 Pub/Sub 來源連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:
所需權限
如要建立 Pub/Sub 來源連接器,您必須具備下列權限:
-
在上層 Connect 叢集上授予建立連接器的權限:
managedkafka.connectors.create
如要進一步瞭解 Managed Kafka Connector 編輯者角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。
如果 Managed Service for Apache Kafka 叢集與 Connect 叢集位於同一個專案,則不需要其他權限。如果 Connect 叢集位於其他專案,請參閱「在其他專案中建立 Connect 叢集」。
授予從 Pub/Sub 讀取的權限
代管 Kafka 服務帳戶必須具備從 Pub/Sub 訂閱項目讀取訊息的權限。在包含 Pub/Sub 訂閱項目的專案中,將下列 IAM 角色授予服務帳戶:
- Pub/Sub 訂閱者 (
roles/pubsub.subscriber) - Pub/Sub 檢視器 (
roles/pubsub.viewer)
代管型 Kafka 服務帳戶的格式如下:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com。
將 PROJECT_NUMBER 替換為專案編號。
建立 Pub/Sub 來源連接器
控制台
前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。
按一下要建立連接器的 Connect 叢集。
按一下「Create connector」(建立連接器)。
輸入連接器名稱字串。
如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。
在「連接器外掛程式」部分,選取「Pub/Sub 來源」。
在「Cloud Pub/Sub subscription」(Cloud Pub/Sub 訂閱項目) 清單中,選取 Pub/Sub 訂閱項目。連接器會從這個訂閱項目提取訊息。訂閱項目會以完整資源名稱顯示:
projects/{project}/subscriptions/{subscription}。在「Kafka 主題」清單中,選取要寫入訊息的 Kafka 主題。
選用:在「設定」方塊中,新增設定屬性或編輯預設屬性。詳情請參閱「設定連接器」。
選取「任務重新啟動政策」。詳情請參閱「工作重新啟動政策」。
點選「建立」。
gcloud
執行
gcloud managed-kafka connectors create指令:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE更改下列內容:
CONNECTOR_ID:連接器的 ID 或名稱。 如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。
LOCATION:Connect 叢集的位置。
CONNECT_CLUSTER_ID:建立連接器的 Connect 叢集 ID。
CONFIG_FILE:YAML 或 JSON 設定檔的路徑。
以下是設定檔的範例:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
更改下列內容:
PROJECT_ID:Pub/Sub 訂閱項目所在的專案 ID。 Google Cloud
PUBSUB_SUBSCRIPTION_ID:要從中擷取資料的 Pub/Sub 訂閱項目 ID。
KAFKA_TOPIC_ID:寫入資料的 Kafka 主題 ID。
cps.project、cps.subscription 和 kafka.topic 設定屬性為必填。如需其他設定選項,請參閱「設定連接器」。
Terraform
您可以使用 Terraform 資源建立連接器。
如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。
Go
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。
Java
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。
Python
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。
建立連接器後,您可以編輯、刪除、暫停、停止或重新啟動連接器。
設定連接器
本節說明您可以在連接器上設定的部分設定屬性。
如需這個連接器的專屬屬性完整清單,請參閱 Pub/Sub 來源連接器設定。
提取模式
提取模式會指定連接器擷取 Pub/Sub 訊息的方式。支援的模式如下:
提取模式 (預設)。系統會分批提取訊息。如要啟用這項模式,請設定
cps.streamingPull.enabled=false.。如要設定批次大小,請設定cps.maxBatchSize屬性。如要進一步瞭解提取模式,請參閱「提取 API」。
串流提取模式。從 Pub/Sub 擷取訊息時,可達到最大總處理量和最低延遲時間。如要啟用這項模式,請設定
cps.streamingPull.enabled=true。如要進一步瞭解串流提取模式,請參閱「StreamingPull API」。
如果啟用串流提取功能,您可以設定下列設定屬性來調整效能:
cps.streamingPull.flowControlBytes:每個工作未處理的訊息位元組數上限。cps.streamingPull.flowControlMessages:每個工作未處理訊息的數量上限。cps.streamingPull.maxAckExtensionMs:連接器延長訂閱期限的時間上限 (以毫秒為單位)。cps.streamingPull.maxMsPerAckExtension:連接器每次延長訂閱期限的時間上限,單位為毫秒。cps.streamingPull.parallelStreams:要從訂閱項目提取訊息的串流數量。
Pub/Sub 端點
連接器預設會使用全域 Pub/Sub 端點。如要指定端點,請將 cps.endpoint 屬性設為端點位址。如要進一步瞭解端點,請參閱「Pub/Sub 端點」。
Kafka 記錄
Pub/Sub 來源連接器會將 Pub/Sub 訊息轉換為 Kafka 記錄。以下各節說明轉換程序。
記錄鍵
金鑰轉換器必須為 org.apache.kafka.connect.storage.StringConverter。
根據預設,記錄鍵為
null。如要使用 Pub/Sub 訊息屬性做為鍵,請將
kafka.key.attribute設為屬性名稱。例如:kafka.key.attribute=username。如要使用 Pub/Sub 排序鍵做為鍵,請設定
kafka.key.attribute=orderingKey。
記錄標頭
根據預設,記錄標頭為空白。
如果 kafka.record.headers 為 true,Pub/Sub 訊息屬性會寫入為記錄標頭。如要加入排序鍵,請設定 cps.makeOrderingKeyAttribute=true。
記錄價值
如果 kafka.record.headers 為 true,或 Pub/Sub 訊息沒有自訂屬性,記錄值就是訊息資料 (位元組陣列)。將值轉換器設為 org.apache.kafka.connect.converters.ByteArrayConverter。
否則,如果 kafka.record.headers 為 false,且訊息至少有一個自訂屬性,連接器會將記錄值寫為 struct。將值轉換器設為 org.apache.kafka.connect.json.JsonConverter。
struct 包含下列欄位:
message:Pub/Sub 訊息資料 (以位元組表示)。每個 Pub/Sub 訊息屬性的欄位。如要加入排序鍵,請設定
cps.makeOrderingKeyAttribute=true。
舉例來說,假設訊息具有 username 屬性:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
如果 value.converter.schemas.enable 是 true,則 struct 會同時包含酬載和結構定義:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Kafka 分區
根據預設,連接器會寫入主題中的單一分區。如要指定連接器寫入的分區數量,請設定 kafka.partition.count 屬性。值不得超過主題的分割區計數。
如要指定連接器將郵件指派給分割區的方式,請設定 kafka.partition.scheme 屬性。詳情請參閱「Pub/Sub 來源連接器設定」。