创建 Pub/Sub Source 连接器

Pub/Sub 源连接器会将消息从 Pub/Sub 流式传输到 Kafka。 这样一来,您就可以将 Pub/Sub 与基于 Kafka 的应用和数据流水线集成。

连接器从 Pub/Sub 订阅中读取消息,将每条消息转换为 Kafka 记录,并将记录写入 Kafka 主题。默认情况下,连接器会按如下方式创建 Kafka 记录:

  • Kafka 记录键为 null
  • Kafka 记录值是 Pub/Sub 消息数据(以字节为单位)。
  • Kafka 记录标头为空。

不过,您可以配置此行为。如需了解详情,请参阅配置连接器

准备工作

在创建 Pub/Sub 来源连接器之前,请确保您已准备好以下各项:

所需的角色和权限

如需获得创建 Pub/Sub Source 连接器所需的权限,请让您的管理员为您授予包含 Connect 集群的项目的 Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建 Pub/Sub Source 连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能创建 Pub/Sub 来源连接器:

  • 在父级 Connect 集群上授予创建连接器的权限: managedkafka.connectors.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Managed Service for Apache Kafka 预定义角色

如果您的 Managed Service for Apache Kafka 集群与 Connect 集群位于同一项目中,则无需进一步授予权限。如果 Connect 集群位于其他项目中,请参阅在其他项目中创建 Connect 集群

授予从 Pub/Sub 读取数据的权限

Managed Kafka 服务账号必须有权从 Pub/Sub 订阅中读取消息。向包含 Pub/Sub 订阅的项目中的服务账号授予以下 IAM 角色:

  • Pub/Sub Subscriber (roles/pubsub.subscriber)
  • Pub/Sub Viewer (roles/pubsub.viewer)

受管 Kafka 服务账号采用以下格式: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com。 将 PROJECT_NUMBER 替换为项目编号。

创建 Pub/Sub 源连接器

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击要创建连接器的 Connect 集群。

  3. 点击创建连接器

  4. 对于连接器名称,请输入一个字符串。

    有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南

  5. 对于连接器插件,选择 Pub/Sub Source

  6. Cloud Pub/Sub 订阅列表中,选择一个 Pub/Sub 订阅。连接器从相应订阅中拉取消息。订阅以完整资源名称的形式显示:projects/{project}/subscriptions/{subscription}

  7. Kafka 主题列表中,选择写入消息的 Kafka 主题。

  8. 可选:在配置框中,添加配置属性或修改默认属性。如需了解详情,请参阅配置连接器

  9. 选择任务重启政策。如需了解详情,请参阅任务重启政策

  10. 点击创建

gcloud

  1. 运行 gcloud managed-kafka connectors create 命令:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    替换以下内容:

    • CONNECTOR_ID:连接器的 ID 或名称。 有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。连接器的名称不可变。

    • LOCATION:Connect 集群的位置。

    • CONNECT_CLUSTER_ID:创建连接器的 Connect 集群的 ID。

    • CONFIG_FILE:YAML 或 JSON 配置文件的路径。

以下是一个配置文件示例:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

替换以下内容:

  • PROJECT_ID:Pub/Sub 订阅所在的 Google Cloud项目的 ID。

  • PUBSUB_SUBSCRIPTION_ID:要从中拉取数据的 Pub/Sub 订阅的 ID。

  • KAFKA_TOPIC_ID:写入数据的 Kafka 主题的 ID。

必须提供 cps.projectcps.subscriptionkafka.topic 配置属性。如需了解其他配置选项,请参阅配置连接器

Terraform

您可以使用 Terraform 资源创建连接器

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

Go

在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

创建连接器后,您可以修改、删除、暂停、停止或重启连接器。

配置连接器

本部分介绍了一些可在连接器上设置的配置属性。

如需查看此连接器特有的属性的完整列表,请参阅 Pub/Sub Source 连接器配置

拉取模式

拉取模式用于指定连接器检索 Pub/Sub 消息的方式。支持以下模式:

  • 拉取模式(默认)。消息以批次形式拉取。如需启用此模式,请设置 cps.streamingPull.enabled=false.。如需配置批次大小,请设置 cps.maxBatchSize 属性。

    如需详细了解拉取模式,请参阅 Pull API

  • Streaming Pull 模式。在从 Pub/Sub 检索消息时实现最大吞吐量和最低延迟时间。如需启用此模式,请设置 cps.streamingPull.enabled=true

    如需详细了解流式拉取模式,请参阅 StreamingPull API

    如果启用了流式拉取,您可以通过设置以下配置属性来调整性能:

    • cps.streamingPull.flowControlBytes:每个任务的未处理消息字节数上限。
    • cps.streamingPull.flowControlMessages:每个任务的未处理消息数上限。
    • cps.streamingPull.maxAckExtensionMs:连接器延长订阅截止时间的最长时间(以毫秒为单位)。
    • cps.streamingPull.maxMsPerAckExtension:连接器每次延长订阅截止时间的最长时间(以毫秒为单位)。
    • cps.streamingPull.parallelStreams:从订阅中拉取消息的流数量。

Pub/Sub 端点

默认情况下,连接器使用全球 Pub/Sub 端点。如需指定端点,请将 cps.endpoint 属性设置为端点地址。如需详细了解端点,请参阅 Pub/Sub 端点

Kafka 记录

Pub/Sub 源连接器会将 Pub/Sub 消息转换为 Kafka 记录。以下部分介绍了转换流程。

记录密钥

密钥转换器必须是 org.apache.kafka.connect.storage.StringConverter

  • 默认情况下,记录键为 null

  • 如需使用 Pub/Sub 消息属性作为键,请将 kafka.key.attribute 设置为相应属性的名称。例如 kafka.key.attribute=username

  • 如需使用 Pub/Sub 排序键作为键,请设置 kafka.key.attribute=orderingKey

记录标头

默认情况下,记录标题为空。

如果 kafka.record.headerstrue,Pub/Sub 消息特性将作为记录标头写入。如需添加排序键,请设置 cps.makeOrderingKeyAttribute=true

记录值

如果 kafka.record.headerstrue,或者 Pub/Sub 消息没有自定义属性,则记录值为消息数据(以字节数组的形式)。将值转换器设置为 org.apache.kafka.connect.converters.ByteArrayConverter

否则,如果 kafka.record.headersfalse 且消息至少包含一个自定义属性,连接器会将记录值写入为 struct。将值转换器设置为 org.apache.kafka.connect.json.JsonConverter

struct 包含以下字段:

  • message:Pub/Sub 消息数据(以字节为单位)。

  • 每个 Pub/Sub 消息属性的字段。如需包含排序键,请设置 cps.makeOrderingKeyAttribute=true

例如,假设消息具有 username 属性:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

如果 value.converter.schemas.enabletrue,则 struct 同时包含载荷和架构:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Kafka 分区

默认情况下,连接器会写入主题中的单个分区。如需指定连接器写入的分区数量,请设置 kafka.partition.count 属性。该值不得超过主题的分区数

如需指定连接器如何将消息分配给分区,请设置 kafka.partition.scheme 属性。如需了解详情,请参阅 Pub/Sub Source 连接器配置

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。