创建 MirrorMaker 2.0 连接器

MirrorMaker 2.0 是一种可在 Kafka 集群之间复制主题的工具。 您可以创建以下 MirrorMaker 2.0 连接器:

  • MirrorMaker 2.0 来源

  • MirrorMaker 2.0 检查点

  • MirrorMaker 2.0 检测信号

MirrorMaker 2.0 源连接器是必需的,因为它会将数据从源集群镜像到目标集群。它还会同步 ACL。 MirrorMaker 2.0 检查点连接器和检测信号连接器是可选的。 您也可以在不创建 Source 连接器的情况下创建 MirrorMaker 2.0 Checkpoint 和 Heartbeat 连接器。

如需详细了解这些连接器,请参阅连接器概览

了解 MirrorMaker 2.0 中的集群角色

配置 MirrorMaker 2.0 时,请务必了解 Kafka 集群所扮演的不同角色:

  • 主集群:在 Managed Service for Apache Kafka 的上下文中,这是 Kafka Connect 集群直接附加到的 Managed Service for Apache Kafka 集群。Connect 集群托管 MirrorMaker 2.0 连接器实例。

  • 辅助集群:这是参与复制的另一个 Kafka 集群。它可以是另一个 Managed Service for Apache Kafka 集群,也可以是外部集群。例如,在 Compute Engine、GKE、本地或其他云中自行管理的数据库。

  • 源集群:这是 MirrorMaker 2.0 从中复制数据的 Kafka 集群。

  • 目标集群:这是 MirrorMaker 2.0 将数据复制到的 Kafka 集群。

主集群可作为源或目标:

  • 如果主集群是,则次集群是目标。数据从主集群流向次要集群。

  • 如果主集群是目标,则次集群是。数据从次要集群流向主要集群。

为了最大限度地缩短写入操作的延迟时间,建议将目标集群指定为主集群,并将 Connect 集群与目标集群放在同一区域中。

您必须正确配置连接器的所有属性。这些属性还包括针对辅助集群的生产者身份验证属性。如需详细了解潜在问题,请参阅改进 MirrorMaker 2.0 客户端配置

准备工作

如需创建 MirrorMaker 2.0 连接器,请完成以下任务:

  • 创建 Managed Service for Apache Kafka 集群(主集群)。 此集群充当 MirrorMaker 2.0 连接器的一个端点。

  • 创建辅助 Kafka 集群。此集群充当另一个端点。 它可以是另一个 Managed Service for Apache Kafka 集群,也可以是外部或自行管理的 Kafka 集群。您可以将多个 Kafka 集群配置为 Connect 集群的辅助 Kafka 集群。

  • 创建托管 MirrorMaker 2.0 连接器的 Connect 集群

  • 确保已配置辅助 Kafka 集群的 DNS 网域。

  • 配置防火墙规则,以允许 Private Service Connect 接口同时访问源 Kafka 集群和目标 Kafka 集群。

  • 如果通过互联网访问源 Kafka 集群或目标 Kafka 集群,请配置 Cloud NAT 以允许 Connect 工作器访问互联网。

  • 如果辅助集群包含外部或自行管理的 Kafka 集群,请确保将所需凭据配置为 Secret 资源。

如需详细了解网络要求,请参阅工作器子网

所需的角色和权限

如需获得创建 MirrorMaker 2.0 连接器所需的权限,请让您的管理员为您授予项目的 Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建 MirrorMaker 2.0 连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建 MirrorMaker 2.0 连接器需要以下权限:

  • 在父级 Connect 集群上授予创建连接器的权限: managedkafka.connectors.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Managed Service for Apache Kafka 预定义角色

在其他项目中创建 MirrorMaker 2.0 连接器

如果您的主 Managed Service for Apache Kafka 集群与运行 MirrorMaker 2.0 连接器的 Connect 集群位于不同的项目中,请参阅在其他项目中创建 Connect 集群

连接到自行管理的辅助 Kafka 集群

连接到自行管理的辅助 Kafka 集群时,请注意网络和身份验证。

  • 网络:确保已配置适当的 VPC 网络设置和防火墙规则,以允许在 Connect 集群 VPC 网络与托管自管理集群或外部集群的网络之间建立连接。

  • 对于 VPC 中的集群,请参阅创建和管理 VPC 网络

  • 如需连接到本地或其他云环境,请考虑 Cloud VPNCloud Interconnect 等解决方案。另请参阅有关连接到本地 Kafka 的具体指南。

  • 身份验证和加密:您的 Connect 集群必须与自行管理的集群或外部集群(如果需要)进行身份验证,并处理任何 TLS 加密。如需了解有关 Kafka 身份验证的一般信息,请参阅 Apache Kafka 安全性文档

使用 Secret Manager 管理凭据

Connect 集群可直接与 Secret Manager 集成。将所有敏感配置值(例如密码,以及连接到自行管理的集群或外部集群所需的信任库和密钥库内容)存储为 Secret Manager 中的 Secret。

  • 授予给 Connect 集群服务账号的 Secret 会自动装载为连接器运行时环境中的文件,位于 /var/secrets/ 目录下。

  • 文件名遵循 {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION} 模式。您必须使用项目名称,而不是项目编号

  • 引用 Secret 的方式取决于 Kafka 属性是需要 Secret 密码还是需要文件的路径

    • 对于密码,请使用 Kafka DirectoryConfigProvider 属性。 以 ${directory:/var/secrets}:{SECRET_FILENAME} 格式指定值。 示例:password=${directory:/var/secrets}:my-project-db-password-1

    • 对于文件路径,请指定已装载的 Secret 文件的直接路径。 示例:ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

如需详细了解如何在创建 Connect 集群期间授予访问权限和配置 Secret,请参阅配置 Secret Manager Secret

MirrorMaker Source 连接器的工作原理

MirrorMaker 源连接器会从源集群中的一个或多个 Kafka 主题拉取数据,并将这些数据连同 ACL 复制到目标集群中的主题。

下面详细介绍了 MirrorMaker Source 连接器如何复制数据:

  • 连接器会从源集群中指定的 Kafka 主题使用消息。使用 topics 配置属性指定要复制的主题,该属性接受以英文逗号分隔的主题名称或单个 Java 样式的正则表达式。例如 topic-a,topic-bmy-prefix-.*

  • 连接器还可以使用 topics.exclude 属性跳过复制您指定的特定主题;排除项会替换包含项。

  • 连接器会将已消费的消息写入目标集群。

  • 连接器需要源集群和目标集群的连接详细信息,例如 source.cluster.bootstrap.serverstarget.cluster.bootstrap.servers

  • 连接器还需要源集群和目标集群的别名,如 source.cluster.aliastarget.cluster.alias 中所指定。默认情况下,复制的主题会使用源别名自动重命名。例如,别名为 primary 的来源中的主题 orders 在目标中会变为 primary.orders

  • 与复制的主题相关联的 ACL 也会从源集群同步到目标集群。您可以使用 sync.topic.acls.enabled 属性停用此功能。

  • 如果源集群和目标集群需要身份验证,则必须在配置中提供连接到这两个集群的身份验证详细信息。您必须配置 security.protocolsasl.mechanismsasl.jaas.config 等属性,并为来源添加 source.cluster. 前缀,为目标添加 target.cluster. 前缀。

  • 连接器依赖于内部主题。您可能需要配置与这些相关的属性,例如 offset-syncs.topic.replication.factor

  • 连接器使用 Kafka 记录转换器 key.convertervalue.converterheader.converter。对于直接复制,这些参数通常默认为 org.apache.kafka.connect.converters.ByteArrayConverter,即不执行任何转换(直通)。

  • tasks.max 属性用于控制连接器的并行级别。增加 tasks.max 可能会提高吞吐量,但有效并行性通常会受到正在复制的源 Kafka 主题中的分区数量的限制。

MirrorMaker 2.0 连接器的属性

创建或更新 MirrorMaker 2.0 连接器时,请指定以下属性:

连接器名称

连接器的名称或 ID。如需了解有关如何命名资源的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。 该名称不可变。

连接器类型

连接器类型必须是以下类型之一:

Kafka 主集群

Managed Service for Apache Kafka 集群。系统会自动填充此字段。

  • 将 Kafka 主集群用作目标集群:选择此选项可将数据从其他 Kafka 集群移至主 Managed Service for Apache Kafka 集群。

  • 将 Kafka 主集群用作源集群:选择此选项可将数据从主 Managed Service for Apache Kafka 集群迁移到另一个 Kafka 集群。

目标集群或源集群

构成流水线另一端的辅助 Kafka 集群。

  • Managed Service for Apache Kafka 集群:从下拉菜单中选择一个集群。

  • 自行管理的 Kafka 集群或外部 Kafka 集群:以 hostname:port_number 格式输入引导地址。例如:kafka-test:9092

主题名称或正则表达式

要复制的主题。指定单个名称(topic1、topic2)或使用正则表达式 (topic.*)。MirrorMaker 2.0 Source 连接器需要此属性。默认值为 .*

使用方群组名称或正则表达式

要复制的使用方群组。指定单个名称(group1、group2)或使用正则表达式 (group.*)。MirrorMaker 2.0 检查点连接器需要此属性。默认值为 .*

配置

您可以在此部分中为 MirrorMaker 2.0 连接器指定其他连接器专用配置属性。

由于 Kafka 主题中的数据可以采用各种格式(例如 Avro、JSON 或原始字节),因此配置的关键部分是指定转换器。转换器会将 Kafka 主题中使用的格式的数据转换为 Kafka Connect 的标准化内部格式。

如需详细了解转换器在 Kafka Connect 中的作用、支持的转换器类型和常见配置选项,请参阅转换器

所有 MirrorMaker 2.0 连接器的一些常见配置包括:

  • source.cluster.alias:源集群的别名。

  • target.cluster.alias:目标集群的别名。

用于在复制数据时排除特定资源的配置:

  • topics.exclude:排除的主题。支持以英文逗号分隔的主题名称和正则表达式。排除项的优先级高于包含项。 用于 MirrorMaker 2.0 Source 连接器。 默认值为 mm2.*.internal,.*.replica,__.*

  • groups.exclude:排除群组。支持以英文逗号分隔的群组 ID 和正则表达式。排除项的优先级高于包含项。 用于 MirrorMaker 2.0 检查点连接器。 默认值为 console-consumer-.*,connect-.*,__.*

MirrorMaker 2.0 连接器需要身份验证配置。

如果源 Kafka 集群或目标 Kafka 集群是 Managed Service for Apache Kafka 集群,则 Connect 集群会使用 OAuthBearer 与其进行身份验证。身份验证配置已预先配置,因此您无需手动设置配置。

对于自行管理的 Kafka 集群或本地 Kafka 集群,身份验证配置取决于 Kafka 集群支持的身份验证机制。源 Kafka 集群配置的身份验证配置示例如下所示:

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

目标 Kafka 集群配置的身份验证配置示例如下所示:

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

可用的配置属性取决于具体连接器。查看支持的 MirrorMaker 2.0 连接器版本,了解支持哪些配置示例。请参阅以下文档:

Kafka 记录转换

Kafka Connect 使用 org.apache.kafka.connect.converters.ByteArrayConverter 作为键和值的默认转换器,该转换器提供不进行转换的直通选项。

您可以配置 header.converterkey.convertervalue.converter 以使用其他转换器。

任务计数

tasks.max 值用于配置 Kafka Connect 用于运行 MirrorMaker 连接器的任务数上限。它控制连接器的并行级别。 增加任务数量可能会提高吞吐量,但会受到 Kafka 主题分区数量等因素的限制。

创建 MirrorMaker 2.0 来源连接器

在创建连接器之前,请查看连接器属性的相关文档。

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击要创建连接器的 Connect 集群。

    系统会显示连接集群详情页面。

  3. 点击创建连接器

    系统会显示创建 Kafka 连接器页面。

  4. 对于连接器名称,请输入一个字符串。

    如需详细了解如何为连接器命名,请参阅 Managed Service for Apache Kafka 资源命名指南

  5. 对于连接器插件,请选择“MirrorMaker 2.0 源”。

  6. 对于主 Kafka 集群,请选择以下任一选项:

    • 将 Kafka 主集群用作源集群:用于从 Managed Service for Apache Kafka 集群迁移数据。
    • 将 Kafka 主集群用作目标集群:将数据迁移到 Managed Service for Apache Kafka 集群。
  7. 对于目标集群源集群,请选择以下选项之一:

    • Managed Service for Apache Kafka 集群:从菜单中选择。
    • 自行管理的 Kafka 集群或外部 Kafka 集群:以 hostname:port_number 格式输入引导地址。
  8. 输入以英文逗号分隔的主题名称或主题正则表达式

  9. 查看并调整配置,包括必需的安全设置。

    如需详细了解配置和身份验证,请参阅配置

  10. 选择任务重启政策。如需了解详情,请参阅任务重启政策

  11. 点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka connectors create 命令:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    替换以下内容:

    • CONNECTOR_ID:连接器的 ID 或名称。 有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。连接器的名称不可变。

    • LOCATION:您创建连接器的位置。此位置必须与您创建 Connect 集群的位置相同。

    • CONNECT_CLUSTER_ID:创建连接器的 Connect 集群的 ID。

    • CONFIG_FILE:BigQuery Sink 连接器的 YAML 配置文件路径。

    以下是 MirrorMaker 2.0 源连接器的配置文件示例:

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  3. Terraform

    您可以使用 Terraform 资源创建连接器

    # A single MirrorMaker 2 Source Connector to replicate from one source to one target.
    resource "google_managed_kafka_connector" "default" {
      project         = data.google_project.default.project_id
      connector_id    = "mm2-source-to-target-connector-id"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
        "name"                 = "mm2-source-to-target-connector-id"
        "tasks.max"            = "3"
        "source.cluster.alias" = "source"
        "target.cluster.alias" = "target"
        "topics"               = ".*" # Replicate all topics from the source
        # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
        # for one or more Kafka brokers in the source/target cluster.
        "source.cluster.bootstrap.servers" = "source_cluster_dns"
        "target.cluster.bootstrap.servers" = "target_cluster_dns"
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
        "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
      }
    
      provider = google-beta
    }

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
    func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "mm2-source-to-target-connector-id"
    	// sourceBootstrapServers := "source_cluster_dns"
    	// targetBootstrapServers := "target_cluster_dns"
    	// tasksMax := "3"
    	// sourceClusterAlias := "source"
    	// targetClusterAlias := "target"
    	// topics := ".*"
    	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    		"name":                 connectorID,
    		"tasks.max":            tasksMax,
    		"source.cluster.alias": sourceClusterAlias,
    		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
    		// Replicate all topics from the source
    		"topics": topics,
    		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    		// the source/target cluster.
    		// For example: "kafka-broker:9092"
    		"source.cluster.bootstrap.servers": sourceBootstrapServers,
    		"target.cluster.bootstrap.servers": targetBootstrapServers,
    		// You can define an exclusion policy for topics as follows:
    		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    		"topics.exclude": topicsExclude,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateMirrorMaker2SourceConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String maxTasks = "3";
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-mirrormaker2-connector";
        String sourceClusterBootstrapServers = "my-source-cluster:9092";
        String targetClusterBootstrapServers = "my-target-cluster:9092";
        String sourceClusterAlias = "source";
        String targetClusterAlias = "target"; // This is usually the primary cluster.
        String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
        String topics = ".*";
        // You can define an exclusion policy for topics as follows:
        // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        String topicsExclude = "mm2.*.internal,.*.replica,__.*";
        createMirrorMaker2SourceConnector(
            projectId,
            region,
            maxTasks,
            connectClusterId,
            connectorId,
            sourceClusterBootstrapServers,
            targetClusterBootstrapServers,
            sourceClusterAlias,
            targetClusterAlias,
            connectorClass,
            topics,
            topicsExclude);
      }
    
      public static void createMirrorMaker2SourceConnector(
          String projectId,
          String region,
          String maxTasks,
          String connectClusterId,
          String connectorId,
          String sourceClusterBootstrapServers,
          String targetClusterBootstrapServers,
          String sourceClusterAlias,
          String targetClusterAlias,
          String connectorClass,
          String topics,
          String topicsExclude)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("source.cluster.alias", sourceClusterAlias);
        configMap.put("target.cluster.alias", targetClusterAlias);
        configMap.put("topics", topics);
        configMap.put("topics.exclude", topicsExclude);
        configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
        configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "source.cluster.alias": source_cluster_alias,
        "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
        # Replicate all topics from the source
        "topics": topics,
        # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
        # the source/target cluster.
        # For example: "kafka-broker:9092"
        "source.cluster.bootstrap.servers": source_bootstrap_servers,
        "target.cluster.bootstrap.servers": target_bootstrap_servers,
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        "topics.exclude": topics_exclude,
    }
    
    connector = Connector()
    # The name of the connector.
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。