建立 Pub/Sub 來源連接器

Pub/Sub 來源連接器會將訊息從 Pub/Sub 串流至 Kafka。 這樣一來,您就能將 Pub/Sub 與 Kafka 型應用程式和資料管道整合。

這個連接器會從 Pub/Sub 訂閱項目讀取訊息,將每則訊息轉換為 Kafka 記錄,然後將記錄寫入 Kafka 主題。根據預設,連接器會建立下列 Kafka 記錄:

  • Kafka 記錄鍵為 null
  • Kafka 記錄值是 Pub/Sub 訊息資料 (以位元組為單位)。
  • Kafka 記錄標頭為空白。

不過,您可以設定這項行為。詳情請參閱「設定連接器」。

事前準備

建立 Pub/Sub 來源連接器前,請確認您具備下列項目:

必要角色和權限

如要取得建立 Pub/Sub 來源連接器所需的權限,請要求管理員在包含 Connect 叢集的專案中,授予您「Managed Kafka Connector 編輯者 」(roles/managedkafka.connectorEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備建立 Pub/Sub 來源連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要建立 Pub/Sub 來源連接器,您必須具備下列權限:

  • 在上層 Connect 叢集上授予建立連接器的權限: managedkafka.connectors.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 Managed Kafka Connector 編輯者角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。

如果 Managed Service for Apache Kafka 叢集與 Connect 叢集位於同一個專案,則不需要其他權限。如果 Connect 叢集位於其他專案,請參閱「在其他專案中建立 Connect 叢集」。

授予從 Pub/Sub 讀取的權限

代管 Kafka 服務帳戶必須具備從 Pub/Sub 訂閱項目讀取訊息的權限。在包含 Pub/Sub 訂閱項目的專案中,將下列 IAM 角色授予服務帳戶:

  • Pub/Sub 訂閱者 (roles/pubsub.subscriber)
  • Pub/Sub 檢視器 (roles/pubsub.viewer)

代管型 Kafka 服務帳戶的格式如下: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com。 將 PROJECT_NUMBER 替換為專案編號。

建立 Pub/Sub 來源連接器

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要建立連接器的 Connect 叢集。

  3. 按一下「Create connector」(建立連接器)。

  4. 輸入連接器名稱字串。

    如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南

  5. 在「連接器外掛程式」部分,選取「Pub/Sub 來源」

  6. 在「Cloud Pub/Sub subscription」(Cloud Pub/Sub 訂閱項目) 清單中,選取 Pub/Sub 訂閱項目。連接器會從這個訂閱項目提取訊息。訂閱項目會以完整資源名稱顯示:projects/{project}/subscriptions/{subscription}

  7. 在「Kafka 主題」清單中,選取要寫入訊息的 Kafka 主題。

  8. 選用:在「設定」方塊中,新增設定屬性或編輯預設屬性。詳情請參閱「設定連接器」。

  9. 選取「任務重新啟動政策」。詳情請參閱「工作重新啟動政策」。

  10. 點選「建立」

gcloud

  1. 執行 gcloud managed-kafka connectors create 指令:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    更改下列內容:

    • CONNECTOR_ID:連接器的 ID 或名稱。 如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。

    • LOCATION:Connect 叢集的位置。

    • CONNECT_CLUSTER_ID:建立連接器的 Connect 叢集 ID。

    • CONFIG_FILE:YAML 或 JSON 設定檔的路徑。

以下是設定檔的範例:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

更改下列內容:

  • PROJECT_ID:Pub/Sub 訂閱項目所在的專案 ID。 Google Cloud

  • PUBSUB_SUBSCRIPTION_ID:要從中擷取資料的 Pub/Sub 訂閱項目 ID。

  • KAFKA_TOPIC_ID:寫入資料的 Kafka 主題 ID。

cps.projectcps.subscriptionkafka.topic 設定屬性為必填。如需其他設定選項,請參閱「設定連接器」。

Terraform

您可以使用 Terraform 資源建立連接器

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。

Go

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

建立連接器後,您可以編輯、刪除、暫停、停止或重新啟動連接器。

設定連接器

本節說明您可以在連接器上設定的部分設定屬性。

如需這個連接器的專屬屬性完整清單,請參閱 Pub/Sub 來源連接器設定

提取模式

提取模式會指定連接器擷取 Pub/Sub 訊息的方式。支援的模式如下:

  • 提取模式 (預設)。系統會分批提取訊息。如要啟用這項模式,請設定 cps.streamingPull.enabled=false.。如要設定批次大小,請設定 cps.maxBatchSize 屬性。

    如要進一步瞭解提取模式,請參閱「提取 API」。

  • 串流提取模式。從 Pub/Sub 擷取訊息時,可達到最大總處理量和最低延遲時間。如要啟用這項模式,請設定 cps.streamingPull.enabled=true

    如要進一步瞭解串流提取模式,請參閱「StreamingPull API」。

    如果啟用串流提取功能,您可以設定下列設定屬性來調整效能:

    • cps.streamingPull.flowControlBytes:每個工作未處理的訊息位元組數上限。
    • cps.streamingPull.flowControlMessages:每個工作未處理訊息的數量上限。
    • cps.streamingPull.maxAckExtensionMs:連接器延長訂閱期限的時間上限 (以毫秒為單位)。
    • cps.streamingPull.maxMsPerAckExtension:連接器每次延長訂閱期限的時間上限,單位為毫秒。
    • cps.streamingPull.parallelStreams:要從訂閱項目提取訊息的串流數量。

Pub/Sub 端點

連接器預設會使用全域 Pub/Sub 端點。如要指定端點,請將 cps.endpoint 屬性設為端點位址。如要進一步瞭解端點,請參閱「Pub/Sub 端點」。

Kafka 記錄

Pub/Sub 來源連接器會將 Pub/Sub 訊息轉換為 Kafka 記錄。以下各節說明轉換程序。

記錄鍵

金鑰轉換器必須為 org.apache.kafka.connect.storage.StringConverter

  • 根據預設,記錄鍵為 null

  • 如要使用 Pub/Sub 訊息屬性做為鍵,請將 kafka.key.attribute 設為屬性名稱。例如:kafka.key.attribute=username

  • 如要使用 Pub/Sub 排序鍵做為鍵,請設定 kafka.key.attribute=orderingKey

記錄標頭

根據預設,記錄標頭為空白。

如果 kafka.record.headerstrue,Pub/Sub 訊息屬性會寫入為記錄標頭。如要加入排序鍵,請設定 cps.makeOrderingKeyAttribute=true

記錄價值

如果 kafka.record.headerstrue,或 Pub/Sub 訊息沒有自訂屬性,記錄值就是訊息資料 (位元組陣列)。將值轉換器設為 org.apache.kafka.connect.converters.ByteArrayConverter

否則,如果 kafka.record.headersfalse,且訊息至少有一個自訂屬性,連接器會將記錄值寫為 struct。將值轉換器設為 org.apache.kafka.connect.json.JsonConverter

struct 包含下列欄位:

  • message:Pub/Sub 訊息資料 (以位元組表示)。

  • 每個 Pub/Sub 訊息屬性的欄位。如要加入排序鍵,請設定 cps.makeOrderingKeyAttribute=true

舉例來說,假設訊息具有 username 屬性:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

如果 value.converter.schemas.enabletrue,則 struct 會同時包含酬載和結構定義:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Kafka 分區

根據預設,連接器會寫入主題中的單一分區。如要指定連接器寫入的分區數量,請設定 kafka.partition.count 屬性。值不得超過主題的分割區計數

如要指定連接器將郵件指派給分割區的方式,請設定 kafka.partition.scheme 屬性。詳情請參閱「Pub/Sub 來源連接器設定」。

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。