创建 Pub/Sub Sink 连接器

Pub/Sub 接收器连接器会将消息从 Kafka 主题流式传输到 Pub/Sub 主题。这样,您就可以将基于 Kafka 的应用与 Pub/Sub 集成,从而实现事件驱动型架构和实时数据处理。

准备工作

在创建 Pub/Sub Sink 连接器之前,请确保您已准备好以下各项:

所需的角色和权限

如需获得创建 Pub/Sub Sink 连接器所需的权限,请让管理员向您授予包含 Connect 集群的项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

这些预定义角色包含创建 Pub/Sub Sink 连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建 Pub/Sub 接收器连接器,您需要以下权限:

  • 在父级 Connect 集群上授予创建连接器的权限: managedkafka.connectors.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Managed Service for Apache Kafka 预定义角色

如果您的 Managed Service for Apache Kafka 集群与 Connect 集群位于同一项目中,则无需进一步授予权限。如果 Connect 集群位于其他项目中,请参阅在其他项目中创建 Connect 集群

授予向 Pub/Sub 主题发布消息的权限

Connect 集群服务账号(格式为 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com)需要有权将消息发布到 Pub/Sub 主题。为此,请向包含 Pub/Sub 主题的项目中的 Connect 集群服务账号授予 Pub/Sub Publisher 角色 (roles/pubsub.publisher)。

Pub/Sub 接收器连接器的工作原理

Pub/Sub 接收器连接器从一个或多个 Kafka 主题拉取消息,并将其发布到 Pub/Sub 主题。

下面详细介绍了 Pub/Sub 接收器连接器如何复制数据:

  • 连接器会使用源集群内一个或多个 Kafka 主题中的消息。

  • 连接器会将消息写入使用 cps.topic 配置属性指定的目标 Pub/Sub 主题 ID。这是必需属性。

  • 连接器还需要使用 cps.project 配置属性指定包含 Pub/Sub 主题的 Google Cloud 项目。这是一个必需的属性。

  • 连接器还可以选择使用通过 cps.endpoint 属性指定的自定义 Pub/Sub 端点。默认端点为 "pubsub.googleapis.com:443"

  • 为了优化性能,连接器会在将消息发布到 Pub/Sub 之前对其进行缓冲。您可以配置 maxBufferSizemaxBufferBytesmaxDelayThresholdMsmaxOutstandingRequestBytesmaxOutstandingMessages 来控制缓冲。

  • Kafka 记录包含三个组成部分:标头、键、值。 连接器使用键和值转换器将 Kafka 消息数据转换为 Pub/Sub 所需的格式。使用结构或映射值架构时,messageBodyName 属性用于指定要用作 Pub/Sub 消息正文的字段或键。

  • 通过将 metadata.publish 属性设置为 true,连接器可以将 Kafka 主题、分区、偏移量和时间戳作为消息属性包含在内。

  • 连接器可以使用 headers.publish 属性(设置为 true)将 Kafka 消息标头作为 Pub/Sub 消息属性包含在内。

  • 连接器可以使用 orderingKeySource 属性为 Pub/Sub 消息添加排序键。此值的选项包括 "none"(默认)、"key""partition"

  • tasks.max 属性用于控制连接器的并行级别。增加 tasks.max 可以提高吞吐量,但实际并行性受 Kafka 主题中分区数量的限制。

Pub/Sub 接收器连接器的属性

创建 Pub/Sub 接收器连接器时,您需要指定以下属性。

连接器名称

Connect 集群中连接器的唯一名称。 有关如何命名资源的指南,请参阅 Managed Service for Apache Kafka 资源命名指南

连接器插件类型

选择 Pub/Sub 接收器作为连接器插件类型。这决定了数据流的方向(从 Kafka Pub/Sub)以及所用的具体连接器实现。如果您不使用界面来配置连接器,还必须指定连接器类。

Kafka 主题

连接器从中消费消息的 Kafka 主题。 您可以指定一个或多个主题,也可以使用正则表达式来匹配多个主题。例如,topic.* 可匹配以“topic”开头的所有主题。这些主题必须存在于与您的 Connect 集群关联的 Managed Service for Apache Kafka 集群中。

Pub/Sub 主题

连接器将消息发布到的现有 Pub/Sub 主题。确保 Connect 集群服务账号对主题的项目具有 roles/pubsub.publisher 角色,如准备工作中所述。

配置

您可以在此部分中指定其他连接器专用配置属性。

由于 Kafka 主题中的数据可以采用各种格式(例如 Avro、JSON 或原始字节),因此配置的关键部分是指定转换器。转换器会将 Kafka 主题中使用的格式的数据转换为 Kafka Connect 的标准化内部格式。然后,Pub/Sub 接收器连接器会获取此内部数据,并将其转换为 Pub/Sub 所需的格式,然后再写入。

如需详细了解转换器在 Kafka Connect 中的作用、支持的转换器类型和常见配置选项,请参阅转换器

以下是 Pub/Sub 接收器连接器特有的一些配置:

  • cps.project:指定包含 Pub/Sub 主题的 Google Cloud 项目 ID。

  • cps.topic:指定要将数据发布到的 Pub/Sub 主题。

  • cps.endpoint:指定要使用的 Pub/Sub 端点。

如需查看此连接器特有的可用配置属性的列表,请参阅 Pub/Sub Sink 连接器配置

创建 Pub/Sub 接收器连接器

在创建连接器之前,请查看Pub/Sub 接收器连接器的属性的相关文档。

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击要为其创建连接器的 Connect 集群。

    系统会显示连接集群详情页面。

  3. 点击创建连接器

    系统会显示创建 Kafka 连接器页面。

  4. 对于连接器名称,请输入一个字符串。

    有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南

  5. 对于连接器插件,选择 Pub/Sub Sink

  6. 主题下,选择选择一个 Kafka 主题列表使用主题正则表达式。然后,选择或输入此连接器从中消费消息的 Kafka 主题。这些主题位于您的关联 Kafka 集群中。

  7. 选择 Cloud Pub/Sub 主题部分,选择此连接器将消息发布到的 Pub/Sub 主题。主题以完整资源名称格式显示:projects/{project}/topics/{topic}

  8. (可选)在配置部分中配置其他设置。您可以在此处指定 tasks.maxkey.convertervalue.converter 等属性,如上一部分中所述。

  9. 选择任务重启政策。如需了解详情,请参阅任务重启政策

  10. 点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka connectors create 命令:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    替换以下内容:

    • CONNECTOR_ID:连接器的 ID 或名称。 有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。连接器的名称不可变。

    • LOCATION:您创建连接器的位置。此位置必须与您创建 Connect 集群的位置相同。

    • CONNECT_CLUSTER_ID:创建连接器的 Connect 集群的 ID。

    • CONFIG_FILE:BigQuery Sink 连接器的 YAML 配置文件路径。

    以下是 Pub/Sub Sink 连接器的配置文件示例:

    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
    

    替换以下内容:

    • CPS_SINK_CONNECTOR_ID:Pub/Sub Sink 连接器的 ID 或名称。如需了解如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。 连接器的名称不可变。

    • GMK_TOPIC_ID:Managed Service for Apache Kafka 主题的 ID,Pub/Sub 接收器连接器从中读取数据。

    • CPS_TOPIC_ID:发布数据的 Pub/Sub 主题的 ID。

    • GCP_PROJECT_ID:Pub/Sub 主题所在的 Google Cloud项目的 ID。

  3. Terraform

    您可以使用 Terraform 资源创建连接器

    resource "google_managed_kafka_connector" "example-pubsub-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-pubsub-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class" = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
        "name"            = "my-pubsub-sink-connector"
        "tasks.max"       = "3"
        "topics"          = "TOPIC_NAME"
        "cps.topic"       = "CPS_TOPIC_NAME"
        "cps.project"     = "CPS_PROJECT_NAME"
        "value.converter" = "org.apache.kafka.connect.storage.StringConverter"
        "key.converter"   = "org.apache.kafka.connect.storage.StringConverter"
      }
    
      provider = google-beta
    }

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createPubSubSinkConnector creates a Pub/Sub Sink connector.
    func createPubSubSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, valueConverter, keyConverter, cpsTopic, cpsProject, tasksMax string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "CPS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// valueConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// cpsTopic := "CPS_TOPIC_ID"
    	// cpsProject := "GCP_PROJECT_ID"
    	// tasksMax := "3"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// Pub/Sub Sink sample connector configuration
    	config := map[string]string{
    		"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    		"name":            connectorID,
    		"tasks.max":       tasksMax,
    		"topics":          topics,
    		"value.converter": valueConverter,
    		"key.converter":   keyConverter,
    		"cps.topic":       cpsTopic,
    		"cps.project":     cpsProject,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Pub/Sub sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreatePubSubSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-pubsub-sink-connector";
        String pubsubProjectId = "my-pubsub-project-id";
        String pubsubTopicName = "my-pubsub-topic";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
        String maxTasks = "3";
        String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createPubSubSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            pubsubProjectId,
            pubsubTopicName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            valueConverter,
            keyConverter);
      }
    
      public static void createPubSubSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String pubsubProjectId,
          String pubsubTopicName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String valueConverter,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("value.converter", valueConverter);
        configMap.put("key.converter", keyConverter);
        configMap.put("cps.topic", pubsubTopicName);
        configMap.put("cps.project", pubsubProjectId);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "topics": topics,
        "value.converter": value_converter,
        "key.converter": key_converter,
        "cps.topic": cps_topic,
        "cps.project": cps_project,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

创建连接器后,您可以修改、删除、暂停、停止或重启连接器。

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。