创建 Connect 集群

Connect 集群为连接器提供了一个环境,有助于将数据从现有 Kafka 部署迁移到 Google Cloud Managed Service for Apache Kafka 集群,或将数据从 Managed Service for Apache Kafka 集群迁移到其他 Google Cloud 服务或其他 Kafka 集群。辅助 Kafka 集群可以是另一个 Google Cloud Managed Service for Apache Kafka 集群,也可以是自行管理的集群或本地集群。

准备工作

确保您已创建 Managed Service for Apache Kafka 集群。 您需要知道 Connect 集群将要附加到的 Managed Service for Apache Kafka 集群的名称。

每个 Connect 集群都与一个 Managed Service for Apache Kafka 集群相关联。此集群存储在 Connect 集群上运行的连接器的状态。

创建 Connect 集群所需的角色和权限

如需获得创建 Connect 集群所需的权限,请让您的管理员为您授予项目的 Managed Kafka Connect Cluster Editor (roles/managedkafka.connectClusterEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建 Connect 集群所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要拥有以下权限才能创建 Connect 集群:

  • 在指定位置授予创建 Connect 集群的权限: managedkafka.connectClusters.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解此角色,请参阅 Managed Service for Apache Kafka 预定义角色

必需的 ACL 主账号

默认情况下,如果未配置 ACL,Managed Service for Apache Kafka 集群允许 Connect 集群访问资源。为此,请将 allow.everyone.if.no.acl.found 设置为 true(默认设置)。

不过,如果 Managed Service for Apache Kafka 集群已配置 ACL,则 Connect 集群不会自动获得对资源的读取和写入权限。您必须手动授予这些权限。

在 ACL 中用作主账号的 Connect 集群服务账号采用以下格式:User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com

如果您已在 Kafka 集群上配置 ACL,请使用以下命令向 Connect 集群授予对主题的读取和写入权限,以及对消费者群组的读取权限:

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

如需详细了解这些命令,请参阅为精细的访问权限控制配置 Apache Kafka ACL

在其他项目中创建 Connect 集群

创建 Connect 集群时,该集群会与同一项目中的 Managed Service for Apache Kafka 集群共享同一服务代理。如果此 Managed Service for Apache Kafka 集群被指定为附加到 Connect 集群的 Kafka 主集群,则无需其他权限。

服务代理的格式为 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com。项目编号是指包含 Connect 集群和 Managed Service for Apache Kafka 集群的项目。

如果您的 Connect 集群位于项目 A 中,而关联的 Managed Service for Apache Kafka 集群位于项目 B 中,请按以下步骤操作:

  1. 确保已为项目 A 和项目 B 启用 Managed Kafka API。

    启用该 API

  2. 确定项目 A 中 Connect 集群的服务代理。

    服务代理的格式为 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com

  3. 在项目 B 中,向 Connect 集群的服务账号授予 Managed Kafka Client 角色 (roles/managedkafka.client)。

    此角色可授予连接到 Managed Service for Apache Kafka 集群并执行读取和写入数据等操作所需的权限。

    如需详细了解如何授予该角色,请参阅创建服务代理并授予角色

授予权限时,请务必遵循最小权限原则。仅授予必要的权限,以确保安全并防止未经授权的访问。

Connect 集群的属性

本部分介绍了 Connect 集群的属性。

Connect 集群名称

您要创建的 Connect 集群的名称。有关如何命名 Connect 集群的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。集群的名称不可变。

Kafka 主集群

与您的 Connect 集群关联的 Managed Service for Apache Kafka 集群。此关联集群(主集群)存储在 Connect 集群上运行的连接器的状态。一般来说,主 Managed Service for Apache Kafka 集群也是在 Connect 集群上运行的所有源连接器的目标位置,以及所有接收器连接器的输入位置。

单个 Managed Service for Apache Kafka 集群可以有多个 Connect 集群。如果您选择其他项目中的 Managed Service for Apache Kafka 集群,请确保已配置适当的权限

创建 Connect 集群后,您将无法更新为其他 Kafka 集群。

区域同位带来的延迟和网络费用优势

将 Managed Service for Apache Kafka 和 Connect 集群放置在同一区域可减少延迟和网络费用。例如,假设您的 Managed Service for Apache Kafka 集群位于 region-a 中,并且您正在使用接收器连接器将数据从该 Managed Service for Apache Kafka 集群(来源)写入同样位于 region-a 中的 BigQuery 表(接收器)。如果您在 region-a 中部署 Connect 集群,此部署选择可最大限度地减少 BigQuery 写入操作的延迟,并消除 Managed Service for Apache Kafka 集群与 Connect 集群之间的区域间网络传输费用。

多系统延迟时间和费用注意事项

Kafka Connect 使用连接器在系统之间移动数据。连接器的一侧始终与 Managed Service for Apache Kafka 集群交互。单个 Kafka Connect 集群可以运行多个连接器,每个连接器都可以充当来源(从系统拉取数据)或接收器(将数据推送到系统)。

虽然与 Managed Service for Apache Kafka 集群位于同一区域的 Connect 集群受益于两者之间较低的通信延迟,但每个连接器还会与另一个系统(例如 BigQuery 表或其他 Kafka 集群)进行交互。即使 Connect 集群和 Managed Service for Apache Kafka 集群位于同一位置,该其他系统也可能位于其他区域。这会导致延迟时间和费用增加。整个流水线的延迟时间取决于所有三个系统的位置:Managed Service for Apache Kafka 集群、Connect 集群以及源系统或接收器系统。

例如,如果您的 Managed Service for Apache Kafka 集群位于 region-a 中,Connect 集群位于 region-b 中,并且您正在使用 Cloud Storage 连接器来连接 region-c 中的存储桶,则您需要支付两次网络跃点费用(从 region-aregion-b,然后再从 region-bregion-c,或者反过来,具体取决于连接器方向)。

在规划 Connect 集群放置位置时,请仔细考虑所有涉及的区域,以优化延迟时间和费用。

容量配置

容量配置要求您为 Connect 集群配置 vCPU 数量和每个 vCPU 的内存量。您可以在创建 Connect 集群后更新其容量。以下是容量配置的属性:

  • vCPU:分配给 Connect 集群的 vCPU 数量。最小值是 3 个 vCPU。

  • 内存:为每个 vCPU 分配的内存量。您必须为每个 vCPU 预配 1 GiB 到 8 GiB 之间的内存。创建集群后,可以在这些限制范围内增加或减少内存量。

    例如,如果您创建了一个具有 6 个 vCPU 的集群,则可为该集群分配的最小内存为 6 GiB(每个 vCPU 1 GiB),最大内存为 48 GiB(每个 vCPU 8 GiB)。

分配给 Connect 集群中每个工作器的 vCPU 和内存对集群的性能、容量和费用有很大影响。下面详细介绍了 vCPU 和内存如何影响 Connect 集群。

vCPU 数量

  • Kafka Connect 会将连接器的工作划分为多个任务。每个任务都可以并行处理数据。vCPU 越多,可以同时运行的任务就越多,从而实现更高的吞吐量。

  • 增加 vCPU 会增加 Connect 集群的费用。

内存

  • Kafka Connect 使用内存来缓冲在连接器和 Managed Service for Apache Kafka 之间流动的数据。更大的内存可实现更大的缓冲区。大内存可以提高吞吐量,尤其是在处理大量数据流时。处理非常大的消息或记录的连接器需要足够的内存来处理这些消息或记录,以免遇到 OutOfMemoryError 异常。

  • 内存越多,Connect 集群的费用就越高。

  • 如果您使用繁重的转换逻辑,则需要分配更多内存。

您的目标是为 Connect 集群选择合适的容量配置。为此,您必须了解 Connect 集群可以处理的吞吐量。

工作器(主要)子网

工作器子网(也称为主要子网)用于将 VPC 网络连接到 Connect 集群。此子网可让集群工作器访问使用方网络中来源和接收器的端点,例如 Managed Service for Apache Kafka 集群或自托管 Kafka 集群。

以下是配置工作器子网的一些要求:

  • 必须提供工作器子网。

  • 子网必须与 Connect 集群位于同一区域。

  • 子网必须与主 Kafka 集群已连接子网列表中的某个子网位于同一父 VPC 中。

  • 子网 CIDR 范围的大小不得小于 /22(1024 个地址)。

系统会使用 Private Service Connect 接口,在工作器子网中为集群工作器分配 IP 地址。工作器可以访问子网的 VPC 网络中可访问的任何网络目的地,但需满足以下要求:

  • 端点不得位于 172.16.0.0/14 CIDR 范围内。此范围预留给 Managed Service for Apache Kafka Connect 内部使用。
  • 防火墙规则必须允许相应流量。请参阅配置网络连接的安全性
  • 对于互联网流量,您必须配置 Cloud NAT。例如,MirrorMaker 连接器需要 Cloud NAT 才能从可通过互联网访问的 Kafka 集群复制数据。
  • 如需访问与工作器子网 VPC 不同的 VPC 中的 Private Service Connect 端点,您必须确保使用受支持的使用方配置(例如 NCC)。如需了解详情,请参阅通过端点访问已发布服务的简介

可解析的 DNS 域名

可解析的 DNS 网域(也称为 DNS 网域名)可让租户 VPC 能够使用使用方 VPC 网络中的 DNS 地址。这样一来,Connect 集群便可将 DNS 名称解析为 IP 地址,从而与其他服务(包括 MirrorMaker 连接器的其他 Kafka 集群)进行通信。

对于可解析的 DNS 网域,您可以选择 Managed Service for Apache Kafka 集群。 您无需为主要的 Managed Service for Apache Kafka 集群配置 DNS 域名,因为其引导地址会自动包含在可解析的 DNS 域名列表中。

不过,您也可以手动指定 DNS 网域,如果您选择外部 Kafka 集群,则必须这样做。系统会自动添加主 Managed Service for Apache Kafka 集群的 DNS 域名。其他 Kafka 集群仍需要配置 DNS 网域。

Secret Manager 资源

指定要加载到工作器中的 Secret Manager。 这些密文会安全地存储在 Secret Manager 中,并提供给您的 Connect 集群。

您可以选择在连接器配置中使用 Secret Manager。例如,您可以将密钥文件加载到 Connect 集群中,并让连接器读取该文件。Secret Manager 作为文件装载在工作器中。

连接的集群直接与 Secret Manager 集成。 您必须使用 Secret Manager 来存储和管理您的 Secret。

指定密钥的格式为:projects/{PROJECT_ID}/secrets/{SECRET_NAME}/versions/{VERSION_ID}

  • PROJECT_ID:Secret Manager Secret 所在项目的 ID。

  • SECRET_NAME:Secret Manager 中的密文名称。

  • VERSION_ID:密文的特定版本号。这是一个数字,例如“1”“2”“3”。

您可以将最多 32 个 Secret 加载到单个 Connect 集群中。

确保运行 Connect 工作人员的服务代理对您要使用的 Secret 具有 secretmanager.secretAccessor 角色(Secret Manager Secret Accessor)。此角色允许 Connect 集群从 Secret Manager 中检索 Secret 值。

标签

标签是有助于您进行组织和标识的键值对。它们可帮助您整理 Connect 集群。您可以为每个 Connect 集群附加标签,然后根据标签过滤资源。标签示例包括 environment:prodapplication:web-app

创建 Connect 集群

在创建集群之前,请查看 Connect 集群属性的相关文档。

创建 Connect 集群需要 20 到 30 分钟。

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击创建

    创建 Connect 集群页面随即会打开。

  3. Connect 集群名称中,输入一个字符串。

    如需详细了解如何命名 Connect 集群,请参阅 Managed Service for Apache Kafka 资源命名指南

  4. 对于主 Kafka 集群,请从菜单中选择一个 Managed Service for Apache Kafka 集群。

    如需详细了解此 Managed Service for Apache Kafka 集群执行的功能,请参阅主 Kafka 集群

  5. 对于位置,从区域菜单中选择一个受支持的位置,或保留默认值。

    如需详细了解如何选择合适的位置,请参阅主 Kafka 集群

  6. 对于容量配置,请输入 vCPU内存的值,或保留默认值。

    对于 vCPUs,请输入集群的虚拟 CPU 数量。

    对于内存,请输入每个 CPU 的内存量(以 GiB 为单位)。如果每个 CPU 的内存大于 8 GiB,系统会显示错误消息。

    如需详细了解如何确定 Managed Service for Apache Kafka 集群的规模,请参阅容量配置

  7. 对于网络配置,从网络菜单中选择或保留主 Managed Service for Apache Kafka 集群的网络。

  8. 对于工作器子网,请从菜单中选择或保留子网。

    系统会自动填充子网 URI 路径字段。如需了解详情,请参阅工作器子网

  9. 对于可解析的 DNS 域名,系统会自动将主 Kafka 集群的 DNS 域名添加为可解析的 DNS 域名。

    如需添加其他 DNS 网域,请展开相应部分。

  10. 点击添加 DNS 网域

    从菜单中选择一个 Kafka 集群。

    系统会自动填充 DNS 域名。您还可以输入外部 Kafka 集群的 DNS 域名。

    点击完成

  11. 对于 Secret Manager 资源,请根据需要展开相应部分。

  12. 点击添加 Secret 资源

  13. 密文菜单中选择一个密文,然后从密文版本菜单中选择一个版本。您还可以创建新的 Secret

    确保运行 Connect worker 的服务代理对您要使用的 Secret 具有 Secret Manager Secret Accessor 角色。如需详细了解 Secret Manager,请参阅 Secret Manager 资源

  14. 点击完成

  15. 如果您需要添加更多 Secret,请点击添加 Secret 资源

  16. 对于标签,请展开相应部分(如有需要)。

    如需整理项目,您可为资源添加键值对形式的任意标签。

    点击添加标签,以添加不同的环境、服务、所有者、团队等。

  17. 点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka connect-clusters create 命令:

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    替换以下内容:

    • CONNECT_CLUSTER_ID:Connect 集群的 ID 或名称。有关如何命名 Connect 集群的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。 Connect 集群的名称不可变。

    • LOCATION:您在其中创建 Connect 集群的位置。必须是受支持的 Google Cloud地区。 创建后,您无法更改 Connect 集群的位置。如需获取可用位置列表,请参阅 Managed Service for Apache Kafka 位置。 如需详细了解位置建议,请参阅主 Kafka 集群

    • CPU:Connect 集群的 vCPU 数量。最小值为 3 个 vCPU。请参阅 vCPU 数量

    • MEMORY:Connect 集群的内存量。使用“MB”“MiB”“GB”“GiB”“TB”或“TiB”单位。例如,“3GiB”。您必须为每个 vCPU 预配 1 GiB 到 8 GiB 之间的内存。 请参阅内存

    • WORKER_SUBNET:Connect 集群的工作子网。

      子网的格式为 projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID

      工作器子网必须与 Connect 集群位于同一区域。

    • PROJECT_ID:(可选)Google Cloud 项目的 ID。如果未提供,则使用当前项目。

    • KAFKA_CLUSTER:与 Connect 集群关联的主 Managed Service for Apache Kafka 集群的 ID 或完全限定名称。请参阅 Kafka 集群。Kafka 集群的格式为 projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID

      创建 Connect 集群后,您将无法更新为其他 Kafka 集群。

    • SECRET:(可选)要加载到工作器中的 Secret。 必须提供 Secret Manager 中的确切 Secret 版本,不支持别名。一个集群最多可加载 32 个 Secret。 格式:projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME:(可选)要对 Connect 集群公开的子网中的 DNS 域名。Connect 集群可以使用域名访问资源,而无需依赖 IP 地址。请参阅 DNS 对等互连

    • LABELS:(可选)要与集群关联的标签。如需详细了解标签的格式,请参阅标签。要添加的标签键值对列表。键必须以小写字符开头,并且只能包含连字符 (-)、下划线 (_)、小写字符和数字。值只能包含连字符 (-)、下划线 (_)、小写字符和数字。

    • CONFIG_FILE:(可选)包含从集群或连接器默认值替换的配置的 JSON 或 YAML 文件的路径。此文件还支持内嵌 JSON 或 YAML。

    • --async:(可选)立即返回结果,而无需等待正在进行的操作完成。使用 --async 标志,您可以在后台创建集群的同时继续执行其他任务。如果不使用此标志,系统会等待操作完成,然后再返回响应。您必须等到集群完全更新后才能继续执行其他任务。

    您会收到类似如下所示的响应:

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    存储 OPERATION_ID 以跟踪进度。例如,此处的值为 operation-1753590328249-63ae19098cc06-64300a0a-06512d02

  3. Terraform

    您可以使用 Terraform 资源创建 Connect 集群

    resource "google_managed_kafka_connect_cluster" "default" {
      provider           = google-beta
      project            = data.google_project.default.project_id
      connect_cluster_id = "my-connect-cluster-id"
      location           = "us-central1"
      kafka_cluster      = google_managed_kafka_cluster.default.id
      capacity_config {
        vcpu_count   = 12
        memory_bytes = 12884901888 # 12 GiB
      }
      gcp_config {
        access_config {
          network_configs {
            primary_subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)
    
    	// Capacity configuration with 12 vCPU and 12 GiB RAM
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   12,
    		MemoryBytes: 12884901888, // 12 GiB in bytes
    	}
    
    	// Optionally, you can also specify accessible subnets and resolvable DNS
    	// domains as part of your network configuration. For example:
    	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
    	// 	{
    	// 		PrimarySubnet:      primarySubnet,
    	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
    	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
    	// 	},
    	// }
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		KafkaCluster:   kafkaCluster,
    		CapacityConfig: capacityConfig,
    	}
    
    	req := &managedkafkapb.CreateConnectClusterRequest{
    		Parent:           locationPath,
    		ConnectClusterId: clusterID,
    		ConnectCluster:   connectCluster,
    	}
    	op, err := client.CreateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
    import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
    import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
        int cpu = 12;
        long memoryBytes = 12884901888L; // 12 GiB
        createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
      }
    
      public static void createConnectCluster(
          String projectId,
          String region,
          String clusterId,
          String subnet,
          String kafkaCluster,
          int cpu,
          long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
            .setMemoryBytes(memoryBytes).build();
        ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
            .setPrimarySubnet(subnet)
            .build();
        // Optionally, you can also specify additional accessible subnets and resolvable
        // DNS domains as part of your network configuration. For example:
        // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
        // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
        ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
            .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setKafkaCluster(kafkaCluster)
            .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
            .create(settingsBuilder.build())) {
          CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setConnectClusterId(clusterId)
              .setConnectCluster(connectCluster)
              .build();
    
          // The duration of this operation can vary considerably, typically taking
          // between 10-30 minutes.
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .createConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(), operation.isDone(), future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone
          // = True)
          ConnectCluster response = future.get();
          System.out.printf("Created connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
              e.getMessage());
          throw e;
        }
      }
    }

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig
    
    # TODO(developer): Update with your values.
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # kafka_cluster_id = "my-kafka-cluster"
    # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 12
    # memory_bytes = 12884901888  # 12 GiB
    
    connect_client = ManagedKafkaConnectClient()
    kafka_client = managedkafka_v1.ManagedKafkaClient()
    
    parent = connect_client.common_location_path(project_id, region)
    kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    connect_cluster.kafka_cluster = kafka_cluster_path
    connect_cluster.capacity_config.vcpu_count = cpu
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
    # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
    # For example:
    # connect_cluster.gcp_config.access_config.network_configs = [
    #     ConnectNetworkConfig(
    #         primary_subnet=primary_subnet,
    #         additional_subnets=additional_subnets,
    #         dns_domain_names=dns_domain_names,
    #     )
    # ]
    
    request = CreateConnectClusterRequest(
        parent=parent,
        connect_cluster_id=connect_cluster_id,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.create_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # Creating a Connect cluster can take 10-40 minutes.
        response = operation.result(timeout=3000)
        print("Created Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

监控集群创建操作

只有在运行 gcloud CLI 创建 Connect 集群后,您才能运行以下命令。

  • 创建 Connect 集群通常需要 20-30 分钟。为了跟踪集群创建的进度,gcloud managed-kafka connect-clusters create 命令使用长时间运行的操作 (LRO),您可以使用以下命令监控该操作:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    替换以下内容:

    • OPERATION_ID 替换为上一部分中的操作 ID 值。
    • LOCATION 替换为上一部分中的位置值。

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。