Crea un connettore MirrorMaker 2.0

I connettori MirrorMaker 2.0 replicano i dati da un cluster Kafka a un altro. Puoi utilizzare i connettori MirrorMaker 2.0 per eseguire il disaster recovery tra cluster Kafka e per ottenere alta affidabilità e tolleranza agli errori nelle tue applicazioni basate su Kafka.

Un connettore MirrorMaker 2.0 può stabilire connessioni tra due cluster Managed Service per Apache Kafka oppure tra un cluster Managed Service per Apache Kafka e un cluster Kafka esterno o autogestito.

I casi d'uso dei connettori MirrorMaker 2.0 includono:

  • Migrazione dei dati. Sposta il tuo workload Kafka in un nuovo cluster Managed Service per Apache Kafka.

  • Disaster recovery. Crea un cluster di backup per garantire la continuità operativa in caso di errori.

  • Aggregazione dei dati. Consolida i dati di più cluster Kafka in un cluster Managed Service per Apache Kafka centrale per eseguire l'analisi.

Tipi di connettori MirrorMaker 2.0

Managed Service per Apache Kafka fornisce i seguenti tipi di connettori MirrorMaker 2.0.

Connettore di origine MirrorMaker 2.0

Il connettore di origine MirrorMaker 2.0 replica gli argomenti e i dati da un cluster Kafka (l'origine) a un altro cluster Kafka (la destinazione).

Utilizza questo connettore per i seguenti scenari di migrazione:

  • Replica o esegui la migrazione dei dati da un cluster Kafka esterno o autogestito in un cluster Managed Service per Apache Kafka.

  • Replica o esegui la migrazione dei dati da un cluster Managed Service per Apache Kafka a un cluster Kafka esterno o autogestito.

  • Replica i dati Kafka tra le regioni per soddisfare i requisiti di ripristino di emergenza e alta disponibilità.

Per la replica di base dei dati tra cluster Kafka, puoi utilizzare il connettore di origine MirrorMaker 2.0 da solo. Gli altri connettori MirrorMaker 2.0 forniscono funzionalità aggiuntive per la replica dei dati.

Connettore di checkpoint MirrorMaker 2.0

Il connettore di checkpoint MirrorMaker 2.0 copia gli offset dei consumer da un cluster Kafka a un altro. Gli offset del consumer indicano l'ultimo messaggio utilizzato correttamente all'interno di una partizione. La replica degli offset garantisce che i consumer del cluster di destinazione possano riprendere l'elaborazione dallo stesso punto del cluster di origine.

Questo connettore consente i seguenti casi d'uso:

  • Garantisci tempi di inattività minimi durante il passaggio dal cluster di origine al cluster di destinazione.

  • Consente il failover senza interruzioni fornendo uno stato del consumer coerente nei cluster.

  • Conserva l'avanzamento del consumer quando sposti i dati nel cluster di destinazione.

Connettore heartbeat MirrorMaker 2.0

Il connettore di heartbeat MirrorMaker 2.0 genera messaggi heartbeat periodici su un cluster Kafka. Il connettore scrive questi messaggi in un argomento dedicato nel cluster, in genere denominato heartbeats.

Dopo aver configurato un connettore di heartbeat MirrorMaker 2.0, puoi utilizzare un connettore di origine MirrorMaker 2.0 per replicare l'argomento heartbeats in un cluster di destinazione. Osservando i battiti replicati, puoi implementare i seguenti casi d'uso:

  • Monitora lo stato e le prestazioni della replica dei dati tra i cluster.

  • Verifica la connessione e il flusso di dati tra i cluster anche quando non vengono prodotti altri dati.

  • Configura gli avvisi in Cloud Monitoring per ricevere una notifica se la replica del battito cardiaco si interrompe.

Utilizzato da solo, il connettore Heartbeat non monitora automaticamente la replica; devi replicare l'argomento heartbeats e osservare i messaggi heartbeat che arrivano al cluster di destinazione.

Informazioni sui ruoli del cluster in MirrorMaker 2.0

Quando configuri MirrorMaker 2.0, è importante comprendere i diversi ruoli svolti dai cluster Kafka:

  • Cluster primario:nel contesto di Managed Service per Apache Kafka, si tratta del cluster Managed Service per Apache Kafka a cui è collegato direttamente il cluster Kafka Connect. Il cluster di connessione ospita l'istanza del connettore MirrorMaker 2.0.

  • Cluster secondario:si tratta dell'altro cluster Kafka coinvolto nella replica. Può essere un altro cluster Managed Service per Apache Kafka o un cluster esterno. Alcuni esempi sono self-managed su Compute Engine, GKE, on-premise o in un altro cloud.

  • Cluster di origine:questo è il cluster Kafka da cui MirrorMaker 2.0 replica i dati.

  • Cluster di destinazione:questo è il cluster Kafka a cui MirrorMaker 2.0 replica i dati.

Il cluster principale può fungere da origine o da destinazione:

  • Se il cluster primario è l'origine, il cluster secondario è la destinazione. I dati vengono trasferiti dal cluster primario al cluster secondario.

  • Se il cluster primario è la destinazione, il cluster secondario è l'origine. I dati vengono trasferiti dal cluster secondario al cluster primario.

Per ridurre al minimo la latenza per le operazioni di scrittura, ti consigliamo di designare il cluster di destinazione come cluster principale e di inserire il cluster di connessione nella stessa regione del cluster di destinazione.

Devi configurare correttamente tutte le proprietà del connettore. Queste includono anche le proprietà di autenticazione del produttore indirizzate al cluster secondario. Per informazioni dettagliate sui potenziali problemi, vedi Migliorare la configurazione del client MirrorMaker 2.0.

Prima di iniziare

Per creare un connettore MirrorMaker 2.0, completa queste attività:

  • Crea un cluster Managed Service per Apache Kafka (primario). Questo cluster funge da endpoint del connettore MirrorMaker 2.0.

  • Crea un cluster Kafka secondario. Questo cluster funge da altro endpoint. Può trattarsi di un altro cluster Managed Service per Apache Kafka o di un cluster Kafka esterno o autogestito. Puoi configurare più cluster Kafka come cluster Kafka secondari di un cluster di connessione.

  • Crea un cluster di connessione che ospita il connettore MirrorMaker 2.0.

  • Assicurati che i domini DNS dei cluster Kafka secondari siano configurati.

  • Configura le regole firewall per consentire all'interfaccia Private Service Connect di raggiungere i cluster Kafka di origine e di destinazione.

  • Se si accede al cluster Kafka di origine o di destinazione tramite internet, configura Cloud NAT per consentire ai worker di Connect di accedere a internet.

  • Se i cluster secondari includono cluster Kafka esterni o autogestiti, assicurati che le credenziali richieste siano configurate come risorse secret.

Per saperne di più sui requisiti di networking, consulta Subnet worker.

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per creare un connettore, chiedi all'amministratore di concederti il ruolo IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un connettore. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un connettore sono necessarie le seguenti autorizzazioni:

  • Crea un connettore: managedkafka.connectors.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Crea un connettore MirrorMaker 2.0 in un altro progetto

Se il cluster Managed Service per Apache Kafka principale si trova in un progetto diverso dal cluster di connessione che esegue il connettore MirrorMaker 2.0, consulta Creare un cluster di connessione in un progetto diverso.

Connettersi a un cluster Kafka secondario autogestito

Quando ti connetti a un cluster Kafka secondario autogestito, presta attenzione al networking e all'autenticazione.

  • Networking:assicurati che le impostazioni di rete VPC e le regole firewall siano configurate per consentire la connettività tra la rete VPC del cluster Connect e la rete che ospita il cluster autogestito o esterno.

  • Per i cluster all'interno dei VPC, consulta Crea e gestisci le reti VPC.

  • Per la connessione a ambienti on-premise o in altri cloud, valuta soluzioni come Cloud VPN o Cloud Interconnect. Consulta anche le indicazioni specifiche per la connessione a Kafka on-premise.

  • Autenticazione e crittografia:il cluster Connect deve autenticarsi con il cluster autogestito o esterno (se necessario) e gestire qualsiasi crittografia TLS. Per informazioni generali sull'autenticazione Kafka, consulta la documentazione sulla sicurezza di Apache Kafka.

Utilizzare Secret Manager per le credenziali

I cluster di connessione si integrano direttamente con Secret Manager. Archivia tutti i valori di configurazione sensibili, come password e contenuti di truststore e keystore necessari per connettersi al cluster autogestito o esterno come secret in Secret Manager.

  • I secret concessi al account di servizio del cluster di connessione vengono montati automaticamente come file nell'ambiente di runtime del connettore nella directory /var/secrets/.

  • Il nome del file segue il pattern {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Devi utilizzare il nome del progetto, non il numero del progetto.

  • Il modo in cui fai riferimento a un secret dipende dal fatto che la proprietà Kafka si aspetti la password del secret o il percorso di un file:

    • Per le password, utilizza la proprietà Kafka DirectoryConfigProvider. Specifica il valore nel formato ${directory:/var/secrets}:{SECRET_FILENAME}. Esempio: password=${directory:/var/secrets}:my-project-db-password-1

    • Per i percorsi dei file, specifica il percorso diretto del file secret montato. Esempio: ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

Per maggiori dettagli sulla concessione dell'accesso e sulla configurazione dei secret durante la creazione del cluster Connect, consulta Configurare i secret di Secret Manager.

Come funziona un connettore di origine MirrorMaker

Un connettore di origine MirrorMaker estrae i dati da uno o più argomenti Kafka in un cluster di origine e li replica, insieme agli ACL, negli argomenti di un cluster di destinazione.

Ecco una suddivisione dettagliata di come il connettore di origine MirrorMaker replica i dati:

  • Il connettore utilizza i messaggi degli argomenti Kafka specificati all'interno del cluster di origine. Specifica gli argomenti da replicare utilizzando la proprietà di configurazione topics, che accetta nomi di argomenti separati da virgole o una singola espressione regolare in stile Java. Ad esempio, topic-a,topic-b o my-prefix-.*.

  • Il connettore può anche saltare la replica di argomenti specifici che specifichi utilizzando la proprietà topics.exclude; le esclusioni hanno la precedenza sulle inclusioni.

  • Il connettore scrive i messaggi utilizzati nel cluster di destinazione.

  • Il connettore richiede i dettagli di connessione del cluster di origine e di destinazione, ad esempio source.cluster.bootstrap.servers e target.cluster.bootstrap.servers.

  • Il connettore richiede anche alias per i cluster di origine e di destinazione come specificato da source.cluster.alias e target.cluster.alias. Per impostazione predefinita, gli argomenti replicati vengono rinominati automaticamente utilizzando l'alias di origine. Ad esempio, un argomento denominato orders da una sorgente con alias primary diventa primary.orders nella destinazione.

  • Anche gli ACL associati agli argomenti replicati vengono sincronizzati dal cluster di origine a quello di destinazione. Questa funzionalità può essere disattivata utilizzando la proprietà sync.topic.acls.enabled.

  • Se richiesto dai cluster, i dettagli di autenticazione per la connessione ai cluster di origine e di destinazione devono essere forniti nella configurazione. Devi configurare proprietà come security.protocol, sasl.mechanism e sasl.jaas.config, con il prefisso source.cluster. per l'origine e target.cluster. per la destinazione.

  • Il connettore si basa su argomenti interni. Potresti dover configurare proprietà correlate, ad esempio offset-syncs.topic.replication.factor.

  • Il connettore utilizza i convertitori di record Kafka key.converter, value.converter e header.converter. Per la replica diretta, questi valori predefiniti sono spesso org.apache.kafka.connect.converters.ByteArrayConverter, che non esegue alcuna conversione (pass-through).

  • La proprietà tasks.max controlla il livello di parallelismo per il connettore. L'aumento di tasks.max può potenzialmente migliorare il throughput, ma il parallelismo effettivo è spesso limitato dal numero di partizioni negli argomenti Kafka di origine replicati.

Proprietà di un connettore MirrorMaker 2.0

Quando crei o aggiorni un connettore MirrorMaker 2.0, specifica queste proprietà:

Nome connettore

Il nome o l'ID del connettore. Per le linee guida su come assegnare un nome alla risorsa, consulta le linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome è immutabile.

Tipo di connettore

Il tipo di connettore deve essere uno dei seguenti:

Cluster Kafka principale

Il cluster Managed Service per Apache Kafka. Il sistema compila automaticamente questo campo.

  • Utilizza il cluster Kafka principale come cluster di destinazione:seleziona questa opzione per spostare i dati da un altro cluster Kafka al cluster Managed Service per Apache Kafka principale.

  • Utilizza il cluster Kafka principale come cluster di origine:seleziona questa opzione per spostare i dati dal cluster Managed Service per Apache Kafka principale a un altro cluster Kafka.

Cluster di destinazione o di origine

Il cluster Kafka secondario che costituisce l'altra estremità della pipeline.

  • Cluster Managed Service per Apache Kafka:seleziona un cluster dal menu a discesa.

  • Cluster Kafka autogestito o esterno: inserisci l'indirizzo di bootstrap nel formato hostname:port_number. Ad esempio kafka-test:9092.

Nomi degli argomenti o espressioni regolari

Gli argomenti da replicare. Specifica singoli nomi (topic1, topic2) o utilizza un'espressione regolare (topic.*). Questa proprietà è obbligatoria per il connettore di origine MirrorMaker 2.0. Il valore predefinito è .*

Nomi dei gruppi di consumer o espressioni regolari

I gruppi di consumer da replicare. Specifica i singoli nomi (group1, group2) o utilizza un'espressione regolare (group.*). Questa proprietà è obbligatoria per il connettore di checkpoint MirrorMaker 2.0. Il valore predefinito è .*

Configurazione

Questa sezione ti consente di specificare proprietà di configurazione aggiuntive e specifiche del connettore per il connettore MirrorMaker 2.0.

Poiché i dati negli argomenti Kafka possono essere in vari formati come Avro, JSON o byte non elaborati, una parte fondamentale della configurazione prevede la specifica dei convertitori. I convertitori traducono i dati dal formato utilizzato negli argomenti Kafka nel formato interno standardizzato di Kafka Connect.

Per informazioni più generali sul ruolo dei convertitori in Kafka Connect, sui tipi di convertitori supportati e sulle opzioni di configurazione comuni, consulta la sezione Convertitori.

Alcune configurazioni comuni per tutti i connettori MirrorMaker 2.0 includono:

  • source.cluster.alias: alias del cluster di origine.

  • target.cluster.alias: alias per il cluster di destinazione.

Configurazioni utilizzate per escludere risorse specifiche durante la replica dei dati:

  • topics.exclude: argomenti esclusi. Supporta nomi degli argomenti e regex separati da virgole. Le esclusioni hanno la precedenza sulle inclusioni. Utilizzato per il connettore di origine MirrorMaker 2.0. Il valore predefinito è mm2.*.internal,.*.replica,__.*

  • groups.exclude: escludi gruppi. Supporta ID gruppo e regex separati da virgole. Le esclusioni hanno la precedenza sulle inclusioni. Utilizzato per il connettore di checkpoint MirrorMaker 2.0. Il valore predefinito è console-consumer-.*,connect-.*,__.*

Le configurazioni di autenticazione sono obbligatorie per i connettori MirrorMaker 2.0.

Se il cluster Kafka di origine o di destinazione è un cluster Managed Service per Apache Kafka, il cluster di connessione utilizza OAuthBearer per l'autenticazione. Le configurazioni di autenticazione sono preconfigurate, quindi non devi configurarle manualmente.

Per il cluster Kafka autogestito o on-premise, le configurazioni di autenticazione dipendono dal meccanismo di autenticazione supportato dal cluster Kafka. Un esempio di configurazione dell'autenticazione per una configurazione del cluster Kafka di origine è il seguente:

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Un esempio di configurazione dell'autenticazione per un cluster Kafka di destinazione è il seguente:

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Le proprietà di configurazione disponibili dipendono dal connettore specifico. Controlla la versione del connettore MirrorMaker 2.0 supportata per vedere quali esempi di configurazione sono supportati. Consulta i seguenti documenti:

Conversione dei record Kafka

Kafka Connect utilizza org.apache.kafka.connect.converters.ByteArrayConverter come convertitore predefinito per la chiave e il valore, che fornisce un'opzione pass-through che non esegue alcuna conversione.

Puoi configurare header.converter, key.converter e value.converter per utilizzare altri convertitori.

Numero di attività

Il valore tasks.max configura il numero massimo di attività che Kafka Connect utilizza per eseguire i connettori MirrorMaker. Controlla il livello di parallelismo per un connettore. L'aumento del numero di attività può aumentare la velocità effettiva, ma è limitato da fattori come il numero di partizioni dell'argomento Kafka.

Crea un connettore di origine MirrorMaker 2.0

Prima di creare un connettore, esamina la documentazione relativa alle proprietà del connettore.

Console

  1. Nella console Google Cloud , vai alla pagina Connetti cluster.

    Vai a Connetti cluster

  2. Fai clic sul cluster di connessione in cui vuoi creare il connettore.

    Viene visualizzata la pagina Dettagli cluster di connessione.

  3. Fai clic su Crea connettore.

    Viene visualizzata la pagina Crea connettore Kafka.

  4. Per Nome connettore, inserisci una stringa.

    Per saperne di più su come assegnare un nome a un connettore, consulta le linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.

  5. Per Connector plugin (Plug-in connettore), seleziona "MirrorMaker 2.0 Source" (Origine MirrorMaker 2.0).

  6. Per Cluster Kafka principale, scegli una delle seguenti opzioni:

    • Utilizza il cluster Kafka principale come cluster di origine: per spostare i dati dal cluster Managed Service per Apache Kafka.
    • Utilizza il cluster Kafka principale come cluster di destinazione: per spostare i dati nel cluster Managed Service per Apache Kafka.
  7. Per Cluster di destinazione o Cluster di origine, scegli una delle seguenti opzioni:

    • Cluster Managed Service per Apache Kafka: seleziona questa opzione dal menu.
    • Cluster Kafka autogestito o esterno: inserisci l'indirizzo bootstrap nel formato hostname:port_number.
  8. Inserisci i nomi o le regex degli argomenti separati da virgole.

  9. Controlla e modifica le configurazioni, incluse le impostazioni di sicurezza richieste.

    Per saperne di più su configurazione e autenticazione, consulta Configurazione.

  10. Seleziona la policy di riavvio attività. Per saperne di più, consulta le norme sul riavvio delle attività.

  11. Fai clic su Crea.

gcloud

  1. Nella console Google Cloud , attiva Cloud Shell.

    Attiva Cloud Shell

    Nella parte inferiore della console Google Cloud viene avviata una sessione di Cloud Shell e viene visualizzato un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già installata e con valori già impostati per il progetto corrente. L'inizializzazione della sessione può richiedere alcuni secondi.

  2. Esegui il comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Sostituisci quanto segue:

    • CONNECTOR_ID: l'ID o il nome del connettore. Per maggiori informazioni su come assegnare un nome a un connettore, consulta le linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.

    • LOCATION: la località in cui crei il connettore. Deve essere la stessa località in cui hai creato il cluster di connessione.

    • CONNECT_CLUSTER_ID: l'ID del cluster Connect in cui viene creato il connettore.

    • CONFIG_FILE: il percorso del file di configurazione YAML per il connettore.

    Ecco un esempio di file di configurazione per il connettore di origine MirrorMaker 2.0:

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    

Terraform

Puoi utilizzare una risorsa Terraform per creare un connettore.

# A single MirrorMaker 2 Source Connector to replicate from one source to one target.
resource "google_managed_kafka_connector" "default" {
  project         = data.google_project.default.project_id
  connector_id    = "mm2-source-to-target-connector-id"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    "name"                 = "mm2-source-to-target-connector-id"
    "tasks.max"            = "3"
    "source.cluster.alias" = "source"
    "target.cluster.alias" = "target"
    "topics"               = ".*" # Replicate all topics from the source
    # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
    # for one or more Kafka brokers in the source/target cluster.
    "source.cluster.bootstrap.servers" = "source_cluster_dns"
    "target.cluster.bootstrap.servers" = "target_cluster_dns"
    # You can define an exclusion policy for topics as follows:
    # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
    "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
  }

  provider = google-beta
}

Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "mm2-source-to-target-connector-id"
	// sourceBootstrapServers := "source_cluster_dns"
	// targetBootstrapServers := "target_cluster_dns"
	// tasksMax := "3"
	// sourceClusterAlias := "source"
	// targetClusterAlias := "target"
	// topics := ".*"
	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	config := map[string]string{
		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
		"name":                 connectorID,
		"tasks.max":            tasksMax,
		"source.cluster.alias": sourceClusterAlias,
		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
		// Replicate all topics from the source
		"topics": topics,
		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
		// the source/target cluster.
		// For example: "kafka-broker:9092"
		"source.cluster.bootstrap.servers": sourceBootstrapServers,
		"target.cluster.bootstrap.servers": targetBootstrapServers,
		// You can define an exclusion policy for topics as follows:
		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
		"topics.exclude": topicsExclude,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateMirrorMaker2SourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String maxTasks = "3";
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-mirrormaker2-connector";
    String sourceClusterBootstrapServers = "my-source-cluster:9092";
    String targetClusterBootstrapServers = "my-target-cluster:9092";
    String sourceClusterAlias = "source";
    String targetClusterAlias = "target"; // This is usually the primary cluster.
    String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
    String topics = ".*";
    // You can define an exclusion policy for topics as follows:
    // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    String topicsExclude = "mm2.*.internal,.*.replica,__.*";
    createMirrorMaker2SourceConnector(
        projectId,
        region,
        maxTasks,
        connectClusterId,
        connectorId,
        sourceClusterBootstrapServers,
        targetClusterBootstrapServers,
        sourceClusterAlias,
        targetClusterAlias,
        connectorClass,
        topics,
        topicsExclude);
  }

  public static void createMirrorMaker2SourceConnector(
      String projectId,
      String region,
      String maxTasks,
      String connectClusterId,
      String connectorId,
      String sourceClusterBootstrapServers,
      String targetClusterBootstrapServers,
      String sourceClusterAlias,
      String targetClusterAlias,
      String connectorClass,
      String topics,
      String topicsExclude)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("tasks.max", maxTasks);
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("source.cluster.alias", sourceClusterAlias);
    configMap.put("target.cluster.alias", targetClusterAlias);
    configMap.put("topics", topics);
    configMap.put("topics.exclude", topicsExclude);
    configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
    configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "source.cluster.alias": source_cluster_alias,
    "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
    # Replicate all topics from the source
    "topics": topics,
    # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    # the source/target cluster.
    # For example: "kafka-broker:9092"
    "source.cluster.bootstrap.servers": source_bootstrap_servers,
    "target.cluster.bootstrap.servers": target_bootstrap_servers,
    # You can define an exclusion policy for topics as follows:
    # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    "topics.exclude": topics_exclude,
}

connector = Connector()
# The name of the connector.
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.