Pub/Sub 接收器連接器會將 Kafka 主題的訊息串流至 Pub/Sub 主題。這項功能可讓您將以 Kafka 為基礎的應用程式與 Pub/Sub 整合,進而簡化事件驅動架構和即時資料處理作業。
事前準備
建立 Pub/Sub 接收器連接器前,請確認您具備下列項目:
為 Connect 叢集建立 Managed Service for Apache Kafka 叢集。這是與 Connect 叢集相關聯的主要 Kafka 叢集。這也是構成連接器管道一端的來源叢集。
建立 Connect 叢集,用於代管 Pub/Sub 接收器連接器。
在來源叢集中建立及設定 Kafka 主題。資料會從這個 Kafka 主題移至目的地 Pub/Sub 主題。
必要角色和權限
如要取得建立 Pub/Sub Sink 連接器所需的權限,請要求管理員在包含 Connect 叢集的專案中,授予您下列 IAM 角色:
-
代管 Kafka 連接器編輯者 (
roles/managedkafka.connectorEditor) -
Pub/Sub:
Pub/Sub 發布者 (
roles/pubsub.publisher)
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
這些預先定義的角色具備建立 Pub/Sub 接收器連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:
所需權限
如要建立 Pub/Sub 接收器連接器,您必須具備下列權限:
-
在上層 Connect 叢集上授予建立連接器的權限:
managedkafka.connectors.create
如要進一步瞭解「Managed Kafka Connector Editor」角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。
如果 Managed Service for Apache Kafka 叢集與 Connect 叢集位於同一個專案,則不需要其他權限。如果 Connect 叢集位於其他專案,請參閱「在其他專案中建立 Connect 叢集」。
授予發布至 Pub/Sub 主題的權限
Connect 叢集服務帳戶 (格式為 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com) 需要有權發布訊息至 Pub/Sub 主題。如要這麼做,請在包含 Pub/Sub 主題的專案中,將 Pub/Sub 發布者角色 (roles/pubsub.publisher) 授予 Connect 叢集服務帳戶。
Pub/Sub 接收器連接器的運作方式
Pub/Sub 接收器連接器會從一或多個 Kafka 主題提取訊息,並發布至 Pub/Sub 主題。
以下詳細說明 Pub/Sub Sink 連接器如何複製資料:
連接器會取用來源叢集內一或多個 Kafka 主題的訊息。
連結器會將訊息寫入
cps.topic設定屬性指定的目標 Pub/Sub 主題 ID。這是必填屬性。連結器也需要使用
cps.project設定屬性,指定包含 Pub/Sub 主題的 Google Cloud 專案。這是必要屬性。連接器也可以選擇使用自訂 Pub/Sub 端點,方法是使用
cps.endpoint屬性指定端點。預設端點為"pubsub.googleapis.com:443"。為提升效能,連接器會先緩衝處理訊息,再發布至 Pub/Sub。您可以設定
maxBufferSize、maxBufferBytes、maxDelayThresholdMs、maxOutstandingRequestBytes和maxOutstandingMessages來控制緩衝。Kafka 記錄包含三個元件:標頭、鍵、值。 連接器會使用鍵和值轉換器,將 Kafka 訊息資料轉換為 Pub/Sub 預期的格式。使用結構體或對應值結構定義時,
messageBodyName屬性會指定要用做 Pub/Sub 訊息內文的欄位或鍵。只要將
metadata.publish屬性設為true,連接器就能將 Kafka 主題、分割區、偏移和時間戳記納入訊息屬性。連接器可以將 Kafka 訊息標頭納入 Pub/Sub 訊息屬性,方法是將
headers.publish屬性設為true。連接器可以使用
orderingKeySource屬性,為 Pub/Sub 訊息加入排序鍵。可能的值包括"none"(預設值)、"key"和"partition"。tasks.max屬性可控制連接器的平行處理層級。增加tasks.max可提高輸送量,但實際的平行處理量會受 Kafka 主題中的分區數量限制。
Pub/Sub 接收器連接器的屬性
建立 Pub/Sub 接收器連接器時,您需要指定下列屬性。
連接器名稱
Connect 叢集內連接器的專屬名稱。 如要查看資源命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。
連接器外掛程式類型
選取「Pub/Sub Sink」做為連接器外掛程式類型。這會決定資料流的方向 (從 Kafka 到 Pub/Sub),以及使用的特定連接器實作方式。如果您未使用使用者介面設定連接器,則必須另外指定連接器類別。
Kafka 主題
連接器從中取用訊息的 Kafka 主題。
您可以指定一或多個主題,也可以使用規則運算式比對多個主題。例如,topic.* 可比對所有以「topic」開頭的主題。這些主題必須位於與 Connect 叢集相關聯的 Managed Service for Apache Kafka 叢集內。
Pub/Sub 主題
現有的 Pub/Sub 主題,連接器會將訊息發布至該主題。如「事前準備」一文所述,請確保 Connect 叢集服務帳戶在主題的專案中具備 roles/pubsub.publisher 角色。
設定
您可以在這個部分指定其他連接器專屬的設定屬性。
由於 Kafka 主題中的資料格式可能不一,例如 Avro、JSON 或原始位元組,因此設定的關鍵部分是指定轉換器。轉換器會將 Kafka 主題中使用的格式資料,轉換為 Kafka Connect 的標準內部格式。接著,Pub/Sub 接收器連接器會取得這項內部資料,並轉換為 Pub/Sub 要求的格式,然後寫入資料。
如要進一步瞭解轉換器在 Kafka Connect 中的角色、支援的轉換器類型和常見設定選項,請參閱轉換器。
以下是 Pub/Sub 接收器連接器的專屬設定:
cps.project:指定包含 Pub/Sub 主題的 Google Cloud 專案 ID。cps.topic:指定要將資料發布至哪個 Pub/Sub 主題。cps.endpoint:指定要使用的 Pub/Sub 端點。
如要查看這個連接器可用的特定設定屬性清單,請參閱「Pub/Sub Sink 連接器設定」。
建立 Pub/Sub 接收器連接器
建立連接器前,請先參閱 Pub/Sub 接收器連接器屬性的說明文件。
控制台
前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。
按一下要建立連接器的 Connect 叢集。
系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。
按一下「Create connector」(建立連接器)。
系統會顯示「建立 Kafka 連接器」頁面。
輸入連接器名稱字串。
如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。
在「連接器外掛程式」部分,選取「Pub/Sub 接收器」。
在「主題」下方,選擇「選取 Kafka 主題清單」或「使用主題規則運算式」。然後選取或輸入這個連接器要取用訊息的 Kafka 主題。這些主題位於相關聯的 Kafka 叢集中。
在「Select a Cloud Pub/Sub topic」(選取 Cloud Pub/Sub 主題) 中,選擇這個連接器要將訊息發布至哪個 Pub/Sub 主題。主題會以完整資源名稱格式顯示:
projects/{project}/topics/{topic}。(選用) 在「設定」部分調整其他設定。您可以在這裡指定
tasks.max、key.converter和value.converter等屬性,如上一節所述。選取「任務重新啟動政策」。詳情請參閱「工作重新啟動政策」。
點選「建立」。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
執行
gcloud managed-kafka connectors create指令:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE更改下列內容:
CONNECTOR_ID:連接器的 ID 或名稱。 如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。
LOCATION:建立連接器的位置。這個位置必須與您建立 Connect 叢集的位置相同。
CONNECT_CLUSTER_ID:建立連接器的 Connect 叢集 ID。
CONFIG_FILE:BigQuery Sink 連接器的 YAML 設定檔路徑。
以下是 Pub/Sub 接收器連接器的設定檔範例:
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector" name: "CPS_SINK_CONNECTOR_ID" tasks.max: "1" topics: "GMK_TOPIC_ID" value.converter: "org.apache.kafka.connect.storage.StringConverter" key.converter: "org.apache.kafka.connect.storage.StringConverter" cps.topic: "CPS_TOPIC_ID" cps.project: "GCP_PROJECT_ID"更改下列內容:
CPS_SINK_CONNECTOR_ID:Pub/Sub Sink 連接器的 ID 或名稱。如要查看連接器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。
GMK_TOPIC_ID:Pub/Sub Sink 連接器從中讀取資料的 Managed Service for Apache Kafka 主題 ID。
CPS_TOPIC_ID:資料發布至其中的 Pub/Sub 主題 ID。
GCP_PROJECT_ID:Pub/Sub 主題所在的專案 ID。 Google Cloud
Terraform
您可以使用 Terraform 資源建立連接器。
如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。
Go
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。
Java
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。
Python
在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件。
如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。
建立連接器後,您可以編輯、刪除、暫停、停止或重新啟動連接器。