建立 MirrorMaker 2.0 連接器

MirrorMaker 2.0 是一種工具,可在 Kafka 叢集之間複製主題。 您可以建立下列 MirrorMaker 2.0 連接器:

  • MirrorMaker 2.0 來源

  • MirrorMaker 2.0 查核點

  • MirrorMaker 2.0 活動訊號

MirrorMaker 2.0 來源連接器一律為必要項目,因為它會將資料從來源叢集鏡像複製到目標叢集。也會同步處理 ACL。 MirrorMaker 2.0 檢查點和活動訊號連接器為選用項目。 您也可以建立 MirrorMaker 2.0 檢查點和活動訊號連接器,而不必建立來源連接器。

如要進一步瞭解這些連接器,請參閱「連接器總覽」。

瞭解 MirrorMaker 2.0 中的叢集角色

設定 MirrorMaker 2.0 時,請務必瞭解 Kafka 叢集扮演的不同角色:

  • 主要叢集:在 Managed Service for Apache Kafka 的環境中,這是指 Kafka Connect 叢集直接連結的 Managed Service for Apache Kafka 叢集。Connect 叢集會代管 MirrorMaker 2.0 連接器例項。

  • 次要叢集:這是參與複製作業的其他 Kafka 叢集。可以是另一個 Managed Service for Apache Kafka 叢集,也可以是外部叢集。例如在 Compute Engine、GKE、內部部署環境或其他雲端自行管理。

  • 來源叢集:這是 MirrorMaker 2.0 從中複製資料的 Kafka 叢集。

  • 目標叢集:這是 MirrorMaker 2.0 要將資料複製到的 Kafka 叢集。

主要叢集可做為來源或目標:

  • 如果主要叢集是來源,次要叢集就是目標。資料會主要叢集流向次要叢集。

  • 如果主要叢集是目標,次要叢集就是來源。資料會次要叢集流向主要叢集。

為盡量減少寫入作業的延遲時間,建議將目標叢集指定為主要叢集,並將 Connect 叢集放在與目標叢集相同的區域。

您必須正確設定連接器的所有屬性。這些屬性也包括針對次要叢集設定的生產端驗證屬性。如要瞭解潛在問題的詳細資料,請參閱「改善 MirrorMaker 2.0 用戶端設定」。

事前準備

如要建立 MirrorMaker 2.0 連接器,請完成下列工作:

  • 建立 Managed Service for Apache Kafka 叢集 (主要)。這個叢集是 MirrorMaker 2.0 連接器的其中一個端點。

  • 建立次要 Kafka 叢集。這個叢集是另一個端點。 可以是另一個 Managed Service for Apache Kafka 叢集,也可以是外部或自行管理的 Kafka 叢集。您可以將多個 Kafka 叢集設定為 Connect 叢集的次要 Kafka 叢集。

  • 建立 Connect 叢集,用於代管 MirrorMaker 2.0 連接器。

  • 確認已設定次要 Kafka 叢集的 DNS 網域。

  • 設定防火牆規則,允許 Private Service Connect 介面連線至來源和目標 Kafka 叢集。

  • 如果來源或目標 Kafka 叢集是透過網際網路存取,請設定 Cloud NAT,允許 Connect 工作站存取網際網路。

  • 如果次要叢集包含外部或自行管理的 Kafka 叢集,請務必將必要憑證設定為密鑰資源。

如要進一步瞭解網路需求,請參閱「工作人員子網路」。

必要角色和權限

如要取得建立 MirrorMaker 2.0 連接器所需的權限,請要求管理員授予您專案的受管理 Kafka 連接器編輯者 (roles/managedkafka.connectorEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備建立 MirrorMaker 2.0 連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要建立 MirrorMaker 2.0 連接器,您必須具備下列權限:

  • 在上層 Connect 叢集上授予建立連接器的權限: managedkafka.connectors.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解「Managed Kafka Connector Editor」角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。

在其他專案中建立 MirrorMaker 2.0 連接器

如果主要 Managed Service for Apache Kafka 叢集與執行 MirrorMaker 2.0 連接器的 Connect 叢集位於不同專案,請參閱「在不同專案中建立 Connect 叢集」。

連線至自行管理的次要 Kafka 叢集

連線至自行管理的次要 Kafka 叢集時,請注意網路和驗證。

  • 網路:請確認已設定適當的虛擬私有雲網路設定和防火牆規則,允許 Connect 叢集虛擬私有雲網路與代管自管或外部叢集的網路之間建立連線。

  • 如要瞭解虛擬私有雲內的叢集,請參閱「建立及管理虛擬私有雲網路」。

  • 如要連線至地端部署或其他雲端環境,請考慮使用 Cloud VPNCloud Interconnect 等解決方案。另請參閱連結至內部部署 Kafka 的具體指引。

  • 驗證和加密:Connect 叢集必須向自行管理或外部叢集進行驗證 (如有需要),並處理所有 TLS 加密。如需 Kafka 驗證的一般資訊,請參閱 Apache Kafka 安全性說明文件

使用 Secret Manager 管理憑證

Connect 叢集會直接與 Secret Manager 整合。將所有敏感設定值 (例如密碼,以及連線至自行管理或外部叢集所需的信任儲存區和金鑰儲存區內容) 儲存在 Secret Manager 中做為密鑰。

  • 授予 Connect 叢集服務帳戶的密鑰會自動掛接為連接器執行階段環境中的檔案,位於 /var/secrets/ 目錄下。

  • 檔案名稱的格式為 {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}。請務必使用專案名稱,而非專案編號

  • 參照密鑰的方式取決於 Kafka 屬性預期是密鑰密碼還是檔案路徑

    • 如為密碼,請使用 Kafka DirectoryConfigProvider 屬性。 以 ${directory:/var/secrets}:{SECRET_FILENAME} 格式指定值。 範例:password=${directory:/var/secrets}:my-project-db-password-1

    • 如果是檔案路徑,請指定已掛接密鑰檔案的直接路徑。 範例:ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

如要進一步瞭解如何在建立 Connect 叢集時授予存取權及設定密鑰,請參閱「設定 Secret Manager 密鑰」。

MirrorMaker 來源連接器的運作方式

MirrorMaker 來源連接器會從來源叢集的一或多個 Kafka 主題中提取資料,並將這些資料連同存取控制清單,複製到目標叢集的主題。

以下詳細說明 MirrorMaker 來源連接器如何複製資料:

  • 連接器會取用來源叢集內指定 Kafka 主題的訊息。使用 topics 設定屬性指定要複製的主題,這個屬性接受以半形逗號分隔的主題名稱,或單一 Java 式規則運算式。例如:topic-a,topic-bmy-prefix-.*

  • 您也可以使用 topics.exclude 屬性,讓連接器略過您指定的特定主題,排除條件會覆寫納入條件。

  • 連接器會將已取用的訊息寫入目標叢集。

  • 連接器需要來源和目標叢集連線詳細資料,例如 source.cluster.bootstrap.serverstarget.cluster.bootstrap.servers

  • 連接器也需要來源和目標叢集的別名,如 source.cluster.aliastarget.cluster.alias 所指定。根據預設,系統會使用來源別名自動重新命名複製的主題。舉例來說,別名為 primary 的來源中,名為 orders 的主題在目標中會變成 primary.orders

  • 與複製主題相關聯的 ACL 也會從來源叢集同步至目標叢集。您可以使用 sync.topic.acls.enabled 屬性停用這項功能。

  • 如叢集需要驗證,您必須在設定中提供連線至來源和目標叢集的驗證詳細資料。您必須設定 security.protocolsasl.mechanismsasl.jaas.config 等屬性,來源的前置字串為 source.cluster.,目標的前置字串為 target.cluster.

  • 連接器會依賴內部主題。您可能需要設定與這些項目相關的屬性,例如 offset-syncs.topic.replication.factor

  • 連接器會使用 Kafka 記錄轉換器 key.convertervalue.converterheader.converter。如果是直接複製,這些通常會預設為 org.apache.kafka.connect.converters.ByteArrayConverter,不會執行任何轉換 (直通)。

  • tasks.max 屬性可控制連接器的平行處理層級。增加 tasks.max 可能可以提高輸送量,但有效平行處理通常會受到複製來源 Kafka 主題中的分割區數量限制。

MirrorMaker 2.0 連接器的屬性

建立或更新 MirrorMaker 2.0 連接器時,請指定下列屬性:

連接器名稱

連接器的名稱或 ID。如要查看資源命名準則,請參閱「Managed Service for Apache Kafka 資源命名指南」。名稱無法變更。

連接器類型

連接器類型必須是下列其中一項:

主要 Kafka 叢集

Managed Service for Apache Kafka 叢集。系統會自動填入這個欄位。

  • 將主要 Kafka 叢集設為目標叢集:選取這個選項,將其他 Kafka 叢集的資料移至主要的 Managed Service for Apache Kafka 叢集。

  • 將主要 Kafka 叢集設為來源叢集:選取這個選項,將資料從主要 Managed Service for Apache Kafka 叢集移至其他 Kafka 叢集。

目標或來源叢集

構成管道另一端的次要 Kafka 叢集。

  • Managed Service for Apache Kafka 叢集:從下拉式選單中選取叢集。

  • 外部或自行管理的 Kafka 叢集:hostname:port_number 格式輸入啟動位址。例如:kafka-test:9092

主題名稱或規則運算式

要複製的主題。指定個別名稱 (topic1、topic2) 或使用規則運算式 (topic.*)。MirrorMaker 2.0 來源連接器必須具備這項屬性。預設值為 .*

消費者群組名稱或規則運算式

要複製的消費者群組。指定個別名稱 (group1、group2) 或使用規則運算式 (group.*)。MirrorMaker 2.0 檢查點連接器必須使用這項屬性。預設值為 .*

設定

您可以在這個部分為 MirrorMaker 2.0 連接器指定其他連接器專屬的設定屬性。

由於 Kafka 主題中的資料格式可能不一,例如 Avro、JSON 或原始位元組,因此設定的關鍵部分是指定轉換器。轉換器會將 Kafka 主題中使用的格式資料,轉換為 Kafka Connect 的標準內部格式。

如要進一步瞭解 Kafka Connect 中轉換器的角色、支援的轉換器類型和常見設定選項,請參閱「轉換器」。

所有 MirrorMaker 2.0 連接器的常見設定包括:

  • source.cluster.alias:來源叢集的別名。

  • target.cluster.alias:目標叢集的別名。

複製資料時用來排除特定資源的設定:

  • topics.exclude:已排除的主題。支援以半形逗號分隔的主題名稱和規則運算式。排除條件的優先順序高於納入條件。 用於 MirrorMaker 2.0 來源連接器。 預設值為 mm2.*.internal,.*.replica,__.*

  • groups.exclude:排除群組。支援以半形逗號分隔的群組 ID 和規則運算式。排除條件的優先順序高於納入條件。 用於 MirrorMaker 2.0 查核點連接器。 預設值為 console-consumer-.*,connect-.*,__.*

MirrorMaker 2.0 連接器必須通過驗證。

如果來源或目標 Kafka 叢集是 Managed Service for Apache Kafka 叢集,Connect 叢集會使用 OAuthBearer 向該叢集進行驗證。驗證設定已預先設定,因此您不必手動設定。

如果是自行管理或地端部署的 Kafka 叢集,驗證設定取決於 Kafka 叢集支援的驗證機制。來源 Kafka 叢集設定的驗證設定範例如下:

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

目標 Kafka 叢集設定的驗證設定範例如下:

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

可用的設定屬性取決於特定連接器。請查看支援的 MirrorMaker 2.0 連接器版本,瞭解支援哪些設定範例。請參閱下列文件:

轉換 Kafka 記錄

Kafka Connect 會使用 org.apache.kafka.connect.converters.ByteArrayConverter 做為鍵和值的預設轉換器,提供不執行轉換的直通選項。

您可以設定 header.converterkey.convertervalue.converter,使用其他轉換器。

任務數量

tasks.max 值會設定 Kafka Connect 用來執行 MirrorMaker 連接器的最大工作數。這項設定可控制連接器的平行處理級別。 增加工作數量可能會提高輸送量,但會受到 Kafka 主題分區數量等因素限制。

建立 MirrorMaker 2.0 來源連接器

建立連接器前,請先參閱連接器屬性的說明文件。

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要建立連接器的 Connect 叢集。

    系統會顯示「Connect cluster details」(連結叢集詳細資料) 頁面。

  3. 按一下「建立連接器」

    系統會顯示「Create Kafka Connector」(建立 Kafka 連接器) 頁面。

  4. 在「Connector name」(連接器名稱) 中輸入字串。

    如要進一步瞭解如何命名連接器,請參閱「Managed Service for Apache Kafka 資源命名指南」。

  5. 在「連接器外掛程式」部分,選取「MirrorMaker 2.0 來源」。

  6. 在「Primary Kafka cluster」(主要 Kafka 叢集) 中,選擇下列其中一個選項:

    • 將主要 Kafka 叢集設為來源叢集:從 Managed Service for Apache Kafka 叢集遷移資料。
    • 將主要 Kafka 叢集設為目標叢集:將資料移至 Managed Service for Apache Kafka 叢集。
  7. 在「目標叢集」或「來源叢集」部分,選擇下列其中一個選項:

    • Managed Service for Apache Kafka 叢集:從選單中選取。
    • 外部或自行管理的 Kafka 叢集:以 hostname:port_number 格式輸入啟動位址。
  8. 輸入主題名稱或主題規則運算式 (以半形逗號分隔)

  9. 查看並調整「設定」,包括必要的安全性設定。

    如要進一步瞭解設定和驗證,請參閱「設定」。

  10. 選取「任務重新啟動政策」。詳情請參閱「工作重新啟動政策」。

  11. 點選「建立」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud managed-kafka connectors create 指令:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    更改下列內容:

    • CONNECTOR_ID:連接器的 ID 或名稱。 如要查看連線器命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 連接器名稱無法變更。

    • LOCATION:建立連接器的位置。這個位置必須與您建立 Connect 叢集的位置相同。

    • CONNECT_CLUSTER_ID:建立連接器的 Connect 叢集 ID。

    • CONFIG_FILE:BigQuery Sink 連接器的 YAML 設定檔路徑。

    以下是 MirrorMaker 2.0 來源連接器的設定檔範例:

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  3. Terraform

    您可以使用 Terraform 資源建立連接器

    # A single MirrorMaker 2 Source Connector to replicate from one source to one target.
    resource "google_managed_kafka_connector" "default" {
      project         = data.google_project.default.project_id
      connector_id    = "mm2-source-to-target-connector-id"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
        "name"                 = "mm2-source-to-target-connector-id"
        "tasks.max"            = "3"
        "source.cluster.alias" = "source"
        "target.cluster.alias" = "target"
        "topics"               = ".*" # Replicate all topics from the source
        # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
        # for one or more Kafka brokers in the source/target cluster.
        "source.cluster.bootstrap.servers" = "source_cluster_dns"
        "target.cluster.bootstrap.servers" = "target_cluster_dns"
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
        "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
      }
    
      provider = google-beta
    }

    如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
    func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "mm2-source-to-target-connector-id"
    	// sourceBootstrapServers := "source_cluster_dns"
    	// targetBootstrapServers := "target_cluster_dns"
    	// tasksMax := "3"
    	// sourceClusterAlias := "source"
    	// targetClusterAlias := "target"
    	// topics := ".*"
    	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    		"name":                 connectorID,
    		"tasks.max":            tasksMax,
    		"source.cluster.alias": sourceClusterAlias,
    		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
    		// Replicate all topics from the source
    		"topics": topics,
    		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    		// the source/target cluster.
    		// For example: "kafka-broker:9092"
    		"source.cluster.bootstrap.servers": sourceBootstrapServers,
    		"target.cluster.bootstrap.servers": targetBootstrapServers,
    		// You can define an exclusion policy for topics as follows:
    		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    		"topics.exclude": topicsExclude,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateMirrorMaker2SourceConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String maxTasks = "3";
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-mirrormaker2-connector";
        String sourceClusterBootstrapServers = "my-source-cluster:9092";
        String targetClusterBootstrapServers = "my-target-cluster:9092";
        String sourceClusterAlias = "source";
        String targetClusterAlias = "target"; // This is usually the primary cluster.
        String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
        String topics = ".*";
        // You can define an exclusion policy for topics as follows:
        // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        String topicsExclude = "mm2.*.internal,.*.replica,__.*";
        createMirrorMaker2SourceConnector(
            projectId,
            region,
            maxTasks,
            connectClusterId,
            connectorId,
            sourceClusterBootstrapServers,
            targetClusterBootstrapServers,
            sourceClusterAlias,
            targetClusterAlias,
            connectorClass,
            topics,
            topicsExclude);
      }
    
      public static void createMirrorMaker2SourceConnector(
          String projectId,
          String region,
          String maxTasks,
          String connectClusterId,
          String connectorId,
          String sourceClusterBootstrapServers,
          String targetClusterBootstrapServers,
          String sourceClusterAlias,
          String targetClusterAlias,
          String connectorClass,
          String topics,
          String topicsExclude)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("source.cluster.alias", sourceClusterAlias);
        configMap.put("target.cluster.alias", targetClusterAlias);
        configMap.put("topics", topics);
        configMap.put("topics.exclude", topicsExclude);
        configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
        configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "source.cluster.alias": source_cluster_alias,
        "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
        # Replicate all topics from the source
        "topics": topics,
        # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
        # the source/target cluster.
        # For example: "kafka-broker:9092"
        "source.cluster.bootstrap.servers": source_bootstrap_servers,
        "target.cluster.bootstrap.servers": target_bootstrap_servers,
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        "topics.exclude": topics_exclude,
    }
    
    connector = Connector()
    # The name of the connector.
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。