MirrorMaker 2.0-Connector erstellen

MirrorMaker 2.0 ist ein Tool, mit dem Themen zwischen Kafka-Clustern repliziert werden. Sie können die folgenden MirrorMaker 2.0-Connectors erstellen:

  • MirrorMaker 2.0 (Quelle)

  • MirrorMaker 2.0 (Prüfpunkt)

  • MirrorMaker 2.0 (Heartbeat)

Der Quell-Connector von MirrorMaker 2.0 ist immer erforderlich, da er die Daten vom Quell- in die Zielcluster spiegelt. Außerdem werden ACLs synchronisiert. Die MirrorMaker 2.0-Connectors für Prüfpunkte und Heartbeats sind optional. Sie können die MirrorMaker 2.0-Checkpoint- und Heartbeat-Connectors auch ohne den Quellconnector erstellen.

Weitere Informationen zu diesen Connectors finden Sie in der Übersicht über Connectors.

Clusterrollen in MirrorMaker 2.0

Bei der Konfiguration von MirrorMaker 2.0 ist es wichtig, die verschiedenen Rollen von Kafka-Clustern zu verstehen:

  • Primärer Cluster:Im Kontext von Managed Service for Apache Kafka ist dies der Managed Service for Apache Kafka-Cluster, an den Ihr Kafka Connect-Cluster direkt angehängt ist. Der Connect-Cluster hostet die MirrorMaker 2.0-Connector-Instanz.

  • Sekundärer Cluster:Dies ist der andere Kafka-Cluster, der an der Replikation beteiligt ist. Das kann ein anderer Managed Service for Apache Kafka-Cluster oder ein externer Cluster sein. Beispiele sind selbstverwaltete Instanzen in Compute Engine, GKE, lokal oder in einer anderen Cloud.

  • Quellcluster:Dies ist der Kafka-Cluster, aus dem MirrorMaker 2.0 Daten repliziert.

  • Zielcluster:Dies ist der Kafka-Cluster, in den MirrorMaker 2.0 Daten repliziert.

Der primäre Cluster kann als Quelle oder Ziel dienen:

  • Wenn der primäre Cluster die Quelle ist, ist der sekundäre Cluster das Ziel. Die Daten fließen vom primären zum sekundären Cluster.

  • Wenn der primäre Cluster das Ziel ist, ist der sekundäre Cluster die Quelle. Die Daten fließen vom sekundären zum primären Cluster.

Um die Latenz für Schreibvorgänge zu minimieren, empfiehlt es sich, den Zielcluster als primären Cluster festzulegen und den Connect-Cluster in derselben Region wie den Zielcluster zu platzieren.

Sie müssen alle Attribute für den Connector richtig konfigurieren. Dazu gehören auch Eigenschaften zur Authentifizierung von Produzenten, die auf den sekundären Cluster ausgerichtet sind. Details zu potenziellen Problemen finden Sie unter MirrorMaker 2.0-Clientkonfiguration verbessern.

Hinweise

Führen Sie die folgenden Aufgaben aus, um einen MirrorMaker 2.0-Connector zu erstellen:

  • Erstellen Sie einen Managed Service for Apache Kafka-Cluster (primär). Dieser Cluster dient als ein Endpunkt Ihres MirrorMaker 2.0-Connectors.

  • Erstellen Sie einen sekundären Kafka-Cluster. Dieser Cluster dient als anderer Endpunkt. Das kann ein anderer Managed Service for Apache Kafka-Cluster oder ein externer oder selbstverwalteter Kafka-Cluster sein. Sie können mehrere Kafka-Cluster als sekundäre Kafka-Cluster eines Connect-Clusters konfigurieren.

  • Erstellen Sie einen Connect-Cluster, in dem Ihr MirrorMaker 2.0-Connector gehostet wird.

  • Achten Sie darauf, dass DNS-Domains von sekundären Kafka-Clustern konfiguriert sind.

  • Konfigurieren Sie Firewallregeln, damit die Private Service Connect-Schnittstelle sowohl den Quell- als auch den Ziel-Kafka-Cluster erreichen kann.

  • Wenn auf den Quell- oder Ziel-Kafka-Cluster über das Internet zugegriffen wird, konfigurieren Sie eine Cloud NAT, damit die Connect-Worker auf das Internet zugreifen können.

  • Wenn sekundäre Cluster externe oder selbstverwaltete Kafka-Cluster enthalten, müssen die erforderlichen Anmeldedaten als Secret-Ressourcen konfiguriert sein.

Weitere Informationen zu den Netzwerkanforderungen finden Sie unter Worker-Subnetz.

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines MirrorMaker 2.0-Connectors benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines MirrorMaker 2.0-Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines MirrorMaker 2.0-Connectors erforderlich:

  • Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster: managedkafka.connectors.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zur Rolle Managed Kafka Connector Editor finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

MirrorMaker 2.0-Connector in einem anderen Projekt erstellen

Wenn sich Ihr primärer Managed Service for Apache Kafka-Cluster in einem anderen Projekt als der Connect-Cluster befindet, in dem der MirrorMaker 2.0-Connector ausgeführt wird, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.

Verbindung zu einem selbstverwalteten sekundären Kafka-Cluster herstellen

Achten Sie beim Herstellen einer Verbindung zu einem sekundären Kafka-Cluster, der selbst verwaltet wird, auf die Netzwerk- und Authentifizierungseinstellungen.

  • Netzwerk:Achten Sie darauf, dass die richtigen VPC-Netzwerkeinstellungen und Firewallregeln konfiguriert sind, um die Verbindung zwischen dem VPC-Netzwerk des Connect-Clusters und dem Netzwerk, in dem der selbstverwaltete oder externe Cluster gehostet wird, zu ermöglichen.

  • Informationen zu Clustern in VPCs finden Sie unter VPC-Netzwerke erstellen und verwalten.

  • Für die Verbindung zu lokalen oder anderen Cloud-Umgebungen sollten Sie Lösungen wie Cloud VPN oder Cloud Interconnect in Betracht ziehen. Weitere Informationen zum Herstellen einer Verbindung zu lokalem Kafka

  • Authentifizierung und Verschlüsselung:Ihr Connect-Cluster muss sich beim selbstverwalteten oder externen Cluster authentifizieren (falls erforderlich) und die TLS-Verschlüsselung verarbeiten. Allgemeine Informationen zur Kafka-Authentifizierung finden Sie in der Dokumentation zur Apache Kafka-Sicherheit.

Secret Manager für Anmeldedaten verwenden

Connect-Cluster lassen sich direkt in Secret Manager einbinden. Speichern Sie alle vertraulichen Konfigurationswerte wie Passwörter sowie Truststore- und Keystore-Inhalte, die für die Verbindung zum selbstverwalteten oder externen Cluster erforderlich sind, als Secrets im Secret Manager.

  • Secrets, die dem Dienstkonto des Connect-Clusters gewährt werden, werden automatisch als Dateien in der Laufzeitumgebung des Connectors im Verzeichnis /var/secrets/ bereitgestellt.

  • Der Dateiname entspricht dem Muster {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Sie müssen den Namen des Projekts und nicht die Nummer des Projekts verwenden.

  • Wie Sie auf ein Secret verweisen, hängt davon ab, ob für die Kafka-Property das password des Secrets oder der path zu einer Datei erwartet wird:

    • Verwenden Sie für Passwörter die Kafka-Eigenschaft DirectoryConfigProvider. Geben Sie den Wert im Format ${directory:/var/secrets}:{SECRET_FILENAME} an. Beispiel: password=${directory:/var/secrets}:my-project-db-password-1

    • Geben Sie für Dateipfade den direkten Pfad zur bereitgestellten Secret-Datei an. Beispiel: ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

Weitere Informationen zum Gewähren des Zugriffs und Konfigurieren von Secrets beim Erstellen von Connect-Clustern finden Sie unter Secret Manager-Secrets konfigurieren.

Funktionsweise eines MirrorMaker-Quellconnectors

Ein MirrorMaker-Quell-Connector ruft Daten aus einem oder mehreren Kafka-Themen in einem Quellcluster ab und repliziert diese Daten zusammen mit ACLs in Themen in einem Zielcluster.

Hier eine detaillierte Aufschlüsselung der Replikation von Daten durch den MirrorMaker-Quellconnector:

  • Der Connector ruft Nachrichten aus angegebenen Kafka-Themen im Quellcluster ab. Geben Sie die zu replizierenden Themen mit der Konfigurationseigenschaft topics an. Diese akzeptiert kommagetrennte Themennamen oder einen einzelnen regulären Ausdruck im Java-Stil. Beispiel: topic-a,topic-b oder my-prefix-.*.

  • Der Connector kann auch die Replikation bestimmter Themen überspringen, die Sie mit der Property topics.exclude angeben. Ausschlüsse haben dabei Vorrang vor Einschlüssen.

  • Der Connector schreibt die empfangenen Nachrichten in den Zielcluster.

  • Für den Connector sind die Verbindungsdetails des Quell- und Zielclusters erforderlich, z. B. source.cluster.bootstrap.servers und target.cluster.bootstrap.servers.

  • Der Connector erfordert auch Aliase für die Quell- und Zielcluster, wie in source.cluster.alias und target.cluster.alias angegeben. Standardmäßig werden replizierte Themen automatisch mit dem Quellalias umbenannt. Beispiel: Ein Thema mit dem Namen orders aus einer Quelle mit dem Alias primary wird im Ziel zu primary.orders.

  • ACLs, die den replizierten Themen zugeordnet sind, werden ebenfalls vom Quell- zum Zielcluster synchronisiert. Dies kann mit der sync.topic.acls.enabled-Property deaktiviert werden.

  • Authentifizierungsdetails für die Verbindung zu Quell- und Zielclustern müssen in der Konfiguration angegeben werden, wenn dies von den Clustern erforderlich ist. Sie müssen Attribute wie security.protocol, sasl.mechanism und sasl.jaas.config konfigurieren, die mit source.cluster. für die Quelle und target.cluster. für das Ziel beginnen.

  • Der Connector verwendet interne Themen. Möglicherweise müssen Sie Eigenschaften konfigurieren, die sich auf diese beziehen, z. B. offset-syncs.topic.replication.factor.

  • Der Connector verwendet die Kafka-Datensatzkonverter key.converter, value.converter und header.converter. Bei der direkten Replikation ist häufig org.apache.kafka.connect.converters.ByteArrayConverter die Standardeinstellung, wodurch keine Konvertierung erfolgt (Pass-Through).

  • Mit der Eigenschaft tasks.max wird die Parallelität des Connectors gesteuert. Durch Erhöhen von tasks.max kann der Durchsatz möglicherweise verbessert werden. Die tatsächliche Parallelität wird jedoch häufig durch die Anzahl der Partitionen in den replizierten Kafka-Quellthemen begrenzt.

Eigenschaften eines MirrorMaker 2.0-Connectors

Geben Sie beim Erstellen oder Aktualisieren eines MirrorMaker 2.0-Connectors die folgenden Eigenschaften an:

Connector-Name

Der Name oder die ID des Connectors. Hinweise zum Benennen der Ressource finden Sie unter Richtlinien zum Benennen einer Managed Service for Apache Kafka-Ressource. Der Name ist unveränderlich.

Connector-Typ

Der Connectortyp muss einer der folgenden sein:

Primärer Kafka-Cluster

Der Managed Service for Apache Kafka-Cluster. Dieses Feld wird automatisch ausgefüllt.

  • Primären Kafka-Cluster als Zielcluster verwenden:Wählen Sie diese Option aus, um Daten aus einem anderen Kafka-Cluster in den primären Managed Service for Apache Kafka-Cluster zu verschieben.

  • Primären Kafka-Cluster als Quellcluster verwenden:Wählen Sie diese Option aus, um Daten aus dem primären Managed Service for Apache Kafka-Cluster in einen anderen Kafka-Cluster zu verschieben.

Ziel- oder Quellcluster

Der sekundäre Kafka-Cluster, der das andere Ende der Pipeline bildet.

  • Managed Service for Apache Kafka-Cluster:Wählen Sie ein Cluster aus dem Drop-down-Menü aus.

  • Selbstverwalteter oder externer Kafka-Cluster:Geben Sie die Bootstrap-Adresse im Format hostname:port_number ein. Beispiel: kafka-test:9092.

Themennamen oder reguläre Ausdrücke

Die zu replizierenden Themen. Geben Sie einzelne Namen an (topic1, topic2) oder verwenden Sie einen regulären Ausdruck (topic.*). Diese Eigenschaft ist für den MirrorMaker 2.0-Quellconnector erforderlich. Der Standardwert ist .*.

Namen von Nutzergruppen oder reguläre Ausdrücke

Die zu replizierenden Verbrauchergruppen. Geben Sie einzelne Namen an (group1, group2) oder verwenden Sie einen regulären Ausdruck (group.*). Diese Eigenschaft ist für den MirrorMaker 2.0-Checkpoint-Connector erforderlich. Der Standardwert ist .*.

Konfiguration

In diesem Abschnitt können Sie zusätzliche, connectorspezifische Konfigurationseigenschaften für den MirrorMaker 2.0-Connector angeben.

Da Daten in Kafka-Themen in verschiedenen Formaten wie Avro, JSON oder Rohbytes vorliegen können, ist die Angabe von Konvertern ein wichtiger Teil der Konfiguration. Mit Converters werden Daten aus dem Format, das in Ihren Kafka-Themen verwendet wird, in das standardisierte interne Format von Kafka Connect übersetzt.

Allgemeinere Informationen zur Rolle von Converters in Kafka Connect, zu unterstützten Converter-Typen und zu gängigen Konfigurationsoptionen finden Sie unter Converters.

Einige gängige Konfigurationen für alle MirrorMaker 2.0-Connectors sind:

  • source.cluster.alias: Alias für den Quellcluster.

  • target.cluster.alias: Alias für den Zielcluster.

Konfigurationen zum Ausschließen bestimmter Ressourcen beim Replizieren von Daten:

  • topics.exclude: Ausgeschlossene Themen. Unterstützt durch Kommas getrennte Themennamen und reguläre Ausdrücke. Ausschlüsse haben Vorrang vor Einschlüssen. Wird für den MirrorMaker 2.0-Quellconnector verwendet. Der Standardwert ist mm2.*.internal,.*.replica,__.*.

  • groups.exclude: Gruppen ausschließen. Unterstützt durch Komma getrennte Gruppen-IDs und reguläre Ausdrücke. Ausschlüsse haben Vorrang vor Einschlüssen. Wird für den Prüfpunkt-Connector von MirrorMaker 2.0 verwendet. Der Standardwert ist console-consumer-.*,connect-.*,__.*.

Für MirrorMaker 2.0-Connectors sind Authentifizierungskonfigurationen erforderlich.

Wenn der Kafka-Quell- oder ‑Zielcluster ein Managed Service for Apache Kafka-Cluster ist, verwendet der Connect-Cluster OAuthBearer zur Authentifizierung. Authentifizierungskonfigurationen sind vorkonfiguriert, sodass Sie die Konfigurationen nicht manuell einrichten müssen.

Bei selbstverwalteten oder lokalen Kafka-Clustern hängen die Authentifizierungskonfigurationen vom Authentifizierungsmechanismus ab, den der Kafka-Cluster unterstützt. Eine Beispielkonfiguration für die Authentifizierung für eine Kafka-Quellclusterkonfiguration sieht so aus:

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Eine Beispielkonfiguration für die Authentifizierung für einen Ziel-Kafka-Cluster sieht so aus:

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Die verfügbaren Konfigurationseigenschaften hängen vom jeweiligen Connector ab. Sehen Sie nach, welche Konfigurationsbeispiele für die unterstützte Version des MirrorMaker 2.0-Connectors verfügbar sind. Weitere Informationen finden Sie in den folgenden Dokumenten:

Konvertierung von Kafka-Einträgen

Kafka Connect verwendet org.apache.kafka.connect.converters.ByteArrayConverter als Standardkonverter für Schlüssel und Werte. Dies ist eine Pass-Through-Option, bei der keine Konvertierung erfolgt.

Sie können header.converter, key.converter und value.converter so konfigurieren, dass andere Konverter verwendet werden.

Taskanzahl

Mit dem Wert tasks.max wird die maximale Anzahl von Tasks konfiguriert, die Kafka Connect zum Ausführen von MirrorMaker-Connectors verwendet. Damit wird der Grad der Parallelität für einen Connector gesteuert. Wenn Sie die Anzahl der Aufgaben erhöhen, kann der Durchsatz steigen. Dies ist jedoch durch Faktoren wie die Anzahl der Kafka-Themenpartitionen begrenzt.

MirrorMaker 2.0-Quell-Connector erstellen

Bevor Sie einen Connector erstellen, sollten Sie die Dokumentation zu Connector-Eigenschaften lesen.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.

    Zu „Cluster verbinden“

  2. Klicken Sie auf den Connect-Cluster, in dem Sie den Connector erstellen möchten.

    Die Seite Clusterdetails verbinden wird angezeigt.

  3. Klicken Sie auf Connector erstellen.

    Die Seite Kafka-Connector erstellen wird angezeigt.

  4. Geben Sie für Connector-Name einen String ein.

    Weitere Informationen zum Benennen eines Connectors finden Sie unter Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.

  5. Wählen Sie für Connector-Plug‑in die Option „MirrorMaker 2.0-Quelle“ aus.

  6. Wählen Sie für Primärer Kafka-Cluster eine der folgenden Optionen aus:

    • Primären Kafka-Cluster als Quellcluster verwenden: Zum Verschieben von Daten aus dem Managed Service for Apache Kafka-Cluster.
    • Primären Kafka-Cluster als Zielcluster verwenden: Damit werden Daten in den Managed Service for Apache Kafka-Cluster verschoben.
  7. Wählen Sie für Zielcluster oder Quellcluster eine der folgenden Optionen aus:

    • Managed Service for Apache Kafka-Cluster: Wählen Sie aus dem Menü aus.
    • Selbstverwalteter oder externer Kafka-Cluster: Geben Sie die Bootstrap-Adresse im Format hostname:port_number ein.
  8. Geben Sie die kommagetrennten Themennamen oder das Themen-Regex ein.

  9. Prüfen und passen Sie die Konfigurationen an, einschließlich der erforderlichen Sicherheitseinstellungen.

    Weitere Informationen zur Konfiguration und Authentifizierung finden Sie unter Konfiguration.

  10. Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.

  11. Klicken Sie auf Erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka connectors create aus:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ersetzen Sie Folgendes:

    • CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.

    • LOCATION: Der Ort, an dem Sie den Connector erstellen. Dies muss derselbe Standort sein, an dem Sie den Connect-Cluster erstellt haben.

    • CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.

    • CONFIG_FILE: Der Pfad zur YAML-Konfigurationsdatei für den BigQuery Sink-Connector.

    Hier sehen Sie ein Beispiel für eine Konfigurationsdatei für den MirrorMaker 2.0-Quellconnector:

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  3. Terraform

    Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.

    # A single MirrorMaker 2 Source Connector to replicate from one source to one target.
    resource "google_managed_kafka_connector" "default" {
      project         = data.google_project.default.project_id
      connector_id    = "mm2-source-to-target-connector-id"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
        "name"                 = "mm2-source-to-target-connector-id"
        "tasks.max"            = "3"
        "source.cluster.alias" = "source"
        "target.cluster.alias" = "target"
        "topics"               = ".*" # Replicate all topics from the source
        # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
        # for one or more Kafka brokers in the source/target cluster.
        "source.cluster.bootstrap.servers" = "source_cluster_dns"
        "target.cluster.bootstrap.servers" = "target_cluster_dns"
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
        "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
      }
    
      provider = google-beta
    }

    Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

    Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
    func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "mm2-source-to-target-connector-id"
    	// sourceBootstrapServers := "source_cluster_dns"
    	// targetBootstrapServers := "target_cluster_dns"
    	// tasksMax := "3"
    	// sourceClusterAlias := "source"
    	// targetClusterAlias := "target"
    	// topics := ".*"
    	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    		"name":                 connectorID,
    		"tasks.max":            tasksMax,
    		"source.cluster.alias": sourceClusterAlias,
    		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
    		// Replicate all topics from the source
    		"topics": topics,
    		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    		// the source/target cluster.
    		// For example: "kafka-broker:9092"
    		"source.cluster.bootstrap.servers": sourceBootstrapServers,
    		"target.cluster.bootstrap.servers": targetBootstrapServers,
    		// You can define an exclusion policy for topics as follows:
    		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    		"topics.exclude": topicsExclude,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateMirrorMaker2SourceConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String maxTasks = "3";
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-mirrormaker2-connector";
        String sourceClusterBootstrapServers = "my-source-cluster:9092";
        String targetClusterBootstrapServers = "my-target-cluster:9092";
        String sourceClusterAlias = "source";
        String targetClusterAlias = "target"; // This is usually the primary cluster.
        String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
        String topics = ".*";
        // You can define an exclusion policy for topics as follows:
        // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        String topicsExclude = "mm2.*.internal,.*.replica,__.*";
        createMirrorMaker2SourceConnector(
            projectId,
            region,
            maxTasks,
            connectClusterId,
            connectorId,
            sourceClusterBootstrapServers,
            targetClusterBootstrapServers,
            sourceClusterAlias,
            targetClusterAlias,
            connectorClass,
            topics,
            topicsExclude);
      }
    
      public static void createMirrorMaker2SourceConnector(
          String projectId,
          String region,
          String maxTasks,
          String connectClusterId,
          String connectorId,
          String sourceClusterBootstrapServers,
          String targetClusterBootstrapServers,
          String sourceClusterAlias,
          String targetClusterAlias,
          String connectorClass,
          String topics,
          String topicsExclude)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("source.cluster.alias", sourceClusterAlias);
        configMap.put("target.cluster.alias", targetClusterAlias);
        configMap.put("topics", topics);
        configMap.put("topics.exclude", topicsExclude);
        configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
        configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "source.cluster.alias": source_cluster_alias,
        "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
        # Replicate all topics from the source
        "topics": topics,
        # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
        # the source/target cluster.
        # For example: "kafka-broker:9092"
        "source.cluster.bootstrap.servers": source_bootstrap_servers,
        "target.cluster.bootstrap.servers": target_bootstrap_servers,
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        "topics.exclude": topics_exclude,
    }
    
    connector = Connector()
    # The name of the connector.
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.