MirrorMaker 2.0 è uno strumento che replica gli argomenti tra i cluster Kafka. Puoi creare i seguenti connettori MirrorMaker 2.0:
Origine MirrorMaker 2.0
Checkpoint MirrorMaker 2.0
Heartbeat MirrorMaker 2.0
Il connettore di origine MirrorMaker 2.0 è sempre necessario perché esegue il mirroring dei dati dai cluster di origine a quelli di destinazione. Sincronizza anche le ACL. I connettori di checkpoint e heartbeat MirrorMaker 2.0 sono facoltativi. Puoi anche creare i connettori MirrorMaker 2.0 Checkpoint e Heartbeat senza creare il connettore di origine.
Per saperne di più su questi connettori, consulta Panoramica dei connettori.
Informazioni sui ruoli del cluster in MirrorMaker 2.0
Quando configuri MirrorMaker 2.0, è importante comprendere i diversi ruoli svolti dai cluster Kafka:
Cluster primario:nel contesto di Managed Service per Apache Kafka, si tratta del cluster Managed Service per Apache Kafka a cui è collegato direttamente il cluster Kafka Connect. Il cluster di connessione ospita l'istanza del connettore MirrorMaker 2.0.
Cluster secondario:si tratta dell'altro cluster Kafka coinvolto nella replica. Può essere un altro cluster Managed Service per Apache Kafka o un cluster esterno. Alcuni esempi sono self-managed su Compute Engine, GKE, on-premise o in un altro cloud.
Cluster di origine:questo è il cluster Kafka da cui MirrorMaker 2.0 replica i dati.
Cluster di destinazione:questo è il cluster Kafka a cui MirrorMaker 2.0 replica i dati.
Il cluster principale può fungere da origine o da destinazione:
Se il cluster primario è l'origine, il cluster secondario è la destinazione. I dati vengono trasferiti dal cluster primario a quello secondario.
Se il cluster primario è la destinazione, il cluster secondario è l'origine. I dati vengono trasferiti dal cluster secondario al cluster primario.
Per ridurre al minimo la latenza per le operazioni di scrittura, ti consigliamo di designare il cluster di destinazione come cluster primario e di inserire il cluster di connessione nella stessa regione del cluster di destinazione.
Devi configurare correttamente tutte le proprietà del connettore. Queste includono anche le proprietà di autenticazione del produttore indirizzate al cluster secondario. Per informazioni dettagliate sui potenziali problemi, vedi Migliorare la configurazione del client MirrorMaker 2.0.
Prima di iniziare
Per creare un connettore MirrorMaker 2.0, completa queste attività:
Crea un cluster Managed Service per Apache Kafka (primario). Questo cluster funge da endpoint del connettore MirrorMaker 2.0.
Crea un cluster Kafka secondario. Questo cluster funge da altro endpoint. Può trattarsi di un altro cluster Managed Service per Apache Kafka o di un cluster Kafka esterno o autogestito. Puoi configurare più cluster Kafka come cluster Kafka secondari di un cluster di connessione.
Crea un cluster di connessione che ospita il connettore MirrorMaker 2.0.
Assicurati che i domini DNS dei cluster Kafka secondari siano configurati.
Configura le regole firewall per consentire all'interfaccia Private Service Connect di raggiungere i cluster Kafka di origine e di destinazione.
Se si accede al cluster Kafka di origine o di destinazione tramite internet, configura Cloud NAT per consentire ai worker di Connect di accedere a internet.
Se i cluster secondari includono cluster Kafka esterni o autogestiti, assicurati che le credenziali richieste siano configurate come risorse secret.
Per saperne di più sui requisiti di networking, consulta Subnet worker.
Ruoli e autorizzazioni richiesti
Per ottenere le autorizzazioni
necessarie per creare un connettore MirrorMaker 2.0,
chiedi all'amministratore di concederti il
ruolo IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor)
nel progetto.
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un connettore MirrorMaker 2.0. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:
Autorizzazioni obbligatorie
Per creare un connettore MirrorMaker 2.0 sono necessarie le seguenti autorizzazioni:
-
Concedi l'autorizzazione per creare un connettore nel cluster di connessione principale:
managedkafka.connectors.create
Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.
Per maggiori informazioni sul ruolo Editor connettore Kafka gestito, consulta la sezione Ruoli predefiniti di Managed Service per Apache Kafka.
Crea un connettore MirrorMaker 2.0 in un altro progetto
Se il cluster Managed Service per Apache Kafka principale si trova in un progetto diverso dal cluster di connessione che esegue il connettore MirrorMaker 2.0, consulta Creare un cluster di connessione in un progetto diverso.
Connettersi a un cluster Kafka secondario autogestito
Quando ti connetti a un cluster Kafka secondario autogestito, presta attenzione al networking e all'autenticazione.
Networking:assicurati che le impostazioni di rete VPC e le regole firewall siano configurate per consentire la connettività tra la rete VPC del cluster Connect e la rete che ospita il cluster autogestito o esterno.
Per i cluster all'interno dei VPC, consulta Crea e gestisci le reti VPC.
Per la connessione a ambienti on-premise o ad altri cloud, valuta soluzioni come Cloud VPN o Cloud Interconnect. Consulta anche le indicazioni specifiche per la connessione a Kafka on-premise.
Autenticazione e crittografia:il cluster Connect deve autenticarsi con il cluster autogestito o esterno (se necessario) e gestire qualsiasi crittografia TLS. Per informazioni generali sull'autenticazione Kafka, consulta la documentazione sulla sicurezza di Apache Kafka.
Utilizzare Secret Manager per le credenziali
I cluster di connessione si integrano direttamente con Secret Manager. Archivia tutti i valori di configurazione sensibili, come password e contenuti di truststore e keystore necessari per connettersi al cluster autogestito o esterno come secret in Secret Manager.
I secret concessi al account di servizio del cluster Connect vengono montati automaticamente come file nell'ambiente di runtime del connettore nella directory
/var/secrets/.Il nome del file segue il pattern
{PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Devi utilizzare il nome del progetto, non il numero.Il modo in cui fai riferimento a un secret dipende dal fatto che la proprietà Kafka si aspetti la password del secret o il percorso di un file:
Per le password, utilizza la proprietà
DirectoryConfigProviderdi Kafka. Specifica il valore nel formato${directory:/var/secrets}:{SECRET_FILENAME}. Esempio:password=${directory:/var/secrets}:my-project-db-password-1Per i percorsi dei file, specifica il percorso diretto del file secret montato. Esempio:
ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3
Per ulteriori dettagli sulla concessione dell'accesso e sulla configurazione dei secret durante la creazione del cluster Connect, consulta Configurare i secret di Secret Manager.
Come funziona un connettore di origine MirrorMaker
Un connettore di origine MirrorMaker estrae i dati da uno o più argomenti Kafka in un cluster di origine e li replica, insieme agli ACL, negli argomenti di un cluster di destinazione.
Ecco una suddivisione dettagliata di come il connettore di origine MirrorMaker replica i dati:
Il connettore utilizza i messaggi degli argomenti Kafka specificati all'interno del cluster di origine. Specifica gli argomenti da replicare utilizzando la proprietà di configurazione
topics, che accetta nomi di argomenti separati da virgole o una singola espressione regolare in stile Java. Ad esempio,topic-a,topic-bomy-prefix-.*.Il connettore può anche saltare la replica di argomenti specifici che specifichi utilizzando la proprietà
topics.exclude; le esclusioni hanno la precedenza sulle inclusioni.Il connettore scrive i messaggi utilizzati nel cluster di destinazione.
Il connettore richiede i dettagli di connessione del cluster di origine e di destinazione, ad esempio
source.cluster.bootstrap.serversetarget.cluster.bootstrap.servers.Il connettore richiede anche alias per i cluster di origine e di destinazione come specificato da
source.cluster.aliasetarget.cluster.alias. Per impostazione predefinita, gli argomenti replicati vengono rinominati automaticamente utilizzando l'alias di origine. Ad esempio, un argomento denominatoordersda una sorgente con aliasprimarydiventaprimary.ordersnella destinazione.Anche gli ACL associati agli argomenti replicati vengono sincronizzati dal cluster di origine a quello di destinazione. Questa funzionalità può essere disattivata utilizzando la proprietà
sync.topic.acls.enabled.Se richiesto dai cluster, i dettagli di autenticazione per la connessione ai cluster di origine e di destinazione devono essere forniti nella configurazione. Devi configurare proprietà come
security.protocol,sasl.mechanismesasl.jaas.config, con il prefissosource.cluster.per l'origine etarget.cluster.per la destinazione.Il connettore si basa su argomenti interni. Potresti dover configurare proprietà correlate, ad esempio
offset-syncs.topic.replication.factor.Il connettore utilizza i convertitori di record Kafka
key.converter,value.convertereheader.converter. Per la replica diretta, questi valori predefiniti sono spessoorg.apache.kafka.connect.converters.ByteArrayConverter, che non esegue alcuna conversione (pass-through).La proprietà
tasks.maxcontrolla il livello di parallelismo per il connettore. L'aumento ditasks.maxpuò potenzialmente migliorare la velocità effettiva, ma il parallelismo effettivo è spesso limitato dal numero di partizioni negli argomenti Kafka di origine replicati.
Proprietà di un connettore MirrorMaker 2.0
Quando crei o aggiorni un connettore MirrorMaker 2.0, specifica queste proprietà:
Nome connettore
Il nome o l'ID del connettore. Per le linee guida su come assegnare un nome alla risorsa, consulta le linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome è immutabile.
Tipo di connettore
Il tipo di connettore deve essere uno dei seguenti:
Cluster Kafka principale
Il cluster Managed Service per Apache Kafka. Il sistema compila automaticamente questo campo.
Utilizza il cluster Kafka principale come cluster di destinazione:seleziona questa opzione per spostare i dati da un altro cluster Kafka al cluster Managed Service per Apache Kafka principale.
Utilizza il cluster Kafka principale come cluster di origine:seleziona questa opzione per spostare i dati dal cluster Managed Service per Apache Kafka principale a un altro cluster Kafka.
Cluster di destinazione o di origine
Il cluster Kafka secondario che costituisce l'altra estremità della pipeline.
Cluster Managed Service per Apache Kafka:seleziona un cluster dal menu a discesa.
Cluster Kafka autogestito o esterno:inserisci l'indirizzo di bootstrap nel formato
hostname:port_number. Ad esempiokafka-test:9092.
Nomi degli argomenti o espressioni regolari
Gli argomenti da replicare. Specifica singoli nomi (topic1, topic2) o utilizza un'espressione regolare (topic.*). Questa proprietà è obbligatoria per il connettore di origine MirrorMaker 2.0. Il valore predefinito è .*
Nomi dei gruppi di consumer o espressioni regolari
I gruppi di consumer da replicare. Specifica i singoli nomi (group1, group2) o utilizza un'espressione regolare (group.*). Questa proprietà è obbligatoria per il connettore di checkpoint MirrorMaker 2.0. Il valore predefinito è .*
Configurazione
Questa sezione ti consente di specificare proprietà di configurazione aggiuntive e specifiche del connettore per il connettore MirrorMaker 2.0.
Poiché i dati negli argomenti Kafka possono essere in vari formati come Avro, JSON o byte non elaborati, una parte fondamentale della configurazione consiste nello specificare i convertitori. I convertitori traducono i dati dal formato utilizzato negli argomenti Kafka nel formato interno standardizzato di Kafka Connect.
Per informazioni più generali sul ruolo dei convertitori in Kafka Connect, sui tipi di convertitori supportati e sulle opzioni di configurazione comuni, consulta la sezione Convertitori.
Alcune configurazioni comuni per tutti i connettori MirrorMaker 2.0 includono:
source.cluster.alias: alias del cluster di origine.target.cluster.alias: alias per il cluster di destinazione.
Configurazioni utilizzate per escludere risorse specifiche durante la replica dei dati:
topics.exclude: argomenti esclusi. Supporta nomi di argomenti separati da virgole e regex. Le esclusioni hanno la precedenza sulle inclusioni. Utilizzato per il connettore di origine MirrorMaker 2.0. Il valore predefinito èmm2.*.internal,.*.replica,__.*groups.exclude: escludi gruppi. Supporta ID gruppo e regex separati da virgole. Le esclusioni hanno la precedenza sulle inclusioni. Utilizzato per il connettore di checkpoint MirrorMaker 2.0. Il valore predefinito èconsole-consumer-.*,connect-.*,__.*
Le configurazioni di autenticazione sono obbligatorie per i connettori MirrorMaker 2.0.
Se il cluster Kafka di origine o di destinazione è un cluster Managed Service per Apache Kafka, il cluster di connessione utilizza OAuthBearer per l'autenticazione. Le configurazioni di autenticazione sono preconfigurate, quindi non devi configurarle manualmente.
Per il cluster Kafka autogestito o on-premise, le configurazioni di autenticazione dipendono dal meccanismo di autenticazione supportato dal cluster Kafka. Un esempio di configurazione dell'autenticazione per una configurazione del cluster Kafka di origine è il seguente:
source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Un esempio di configurazione di autenticazione per un cluster Kafka di destinazione configurazione è il seguente:
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Le proprietà di configurazione disponibili dipendono dal connettore specifico. Controlla la versione del connettore MirrorMaker 2.0 supportata per vedere quali esempi di configurazione sono supportati. Consulta i seguenti documenti:
Proprietà di configurazione di MirrorMaker 2.0 comuni a tutti i connettori MirrorMaker 2.0
Proprietà di configurazione specifiche dell'origine MirrorMaker 2.0
Proprietà di configurazione specifiche del checkpoint MirrorMaker 2.0
Proprietà di configurazione specifiche per l'heartbeat di MirrorMaker 2.0
Conversione dei record Kafka
Kafka Connect utilizza org.apache.kafka.connect.converters.ByteArrayConverter
come convertitore predefinito per chiave e valore, che fornisce un'opzione
pass-through che non esegue alcuna conversione.
Puoi configurare header.converter, key.converter e value.converter per
utilizzare altri convertitori.
Numero di attività
Il valore tasks.max configura il numero massimo di attività che Kafka Connect utilizza per eseguire
i connettori MirrorMaker. Controlla il livello di parallelismo per un connettore.
L'aumento del numero di attività può aumentare la velocità effettiva, ma è limitato da fattori come il numero di partizioni dell'argomento Kafka.
Crea un connettore di origine MirrorMaker 2.0
Prima di creare un connettore, esamina la documentazione relativa alle proprietà del connettore.
Console
Nella console Google Cloud , vai alla pagina Connetti cluster.
Fai clic sul cluster di connessione in cui vuoi creare il connettore.
Viene visualizzata la pagina Dettagli cluster di connessione.
Fai clic su Crea connettore.
Viene visualizzata la pagina Crea connettore Kafka.
Per Nome connettore, inserisci una stringa.
Per saperne di più su come assegnare un nome a un connettore, consulta le linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.
Per Connector plugin (Plug-in connettore), seleziona "MirrorMaker 2.0 Source" (Origine MirrorMaker 2.0).
Per Cluster Kafka principale, scegli una delle seguenti opzioni:
- Utilizza il cluster Kafka principale come cluster di origine: per spostare i dati dal cluster Managed Service per Apache Kafka.
- Utilizza il cluster Kafka principale come cluster di destinazione: per spostare i dati nel cluster Managed Service per Apache Kafka.
Per Cluster di destinazione o Cluster di origine, scegli una delle seguenti opzioni:
- Cluster Managed Service per Apache Kafka: seleziona questa opzione dal menu.
- Cluster Kafka autogestito o esterno: inserisci l'indirizzo
bootstrap nel formato
hostname:port_number.
Inserisci i nomi o le regex degli argomenti separati da virgole.
Controlla e modifica le configurazioni, incluse le impostazioni di sicurezza richieste.
Per saperne di più su configurazione e autenticazione, consulta Configurazione.
Seleziona la policy di riavvio attività. Per saperne di più, consulta le norme sul riavvio delle attività.
Fai clic su Crea.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Esegui il comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILESostituisci quanto segue:
CONNECTOR_ID: L'ID o il nome del connettore. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.
LOCATION: la località in cui crei il connettore. Deve essere la stessa località in cui hai creato il cluster di connessione.
CONNECT_CLUSTER_ID: l'ID del cluster Connect in cui viene creato il connettore.
CONFIG_FILE: il percorso del file di configurazione YAML per il connettore BigQuery Sink.
Ecco un esempio di file di configurazione per il connettore di origine MirrorMaker 2.0:
connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector" name: "MM2_CONNECTOR_ID" source.cluster.alias: "source" target.cluster.alias: "target" topics: "GMK_TOPIC_NAME" source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS" target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS" offset-syncs.topic.replication.factor: "1" source.cluster.security.protocol: "SASL_SSL" source.cluster.sasl.mechanism: "OAUTHBEARER" source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; target.cluster.security.protocol: "SASL_SSL" target.cluster.sasl.mechanism: "OAUTHBEARER" target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler" target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Terraform
Puoi utilizzare una risorsa Terraform per creare un connettore.
Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.
Go
Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Java
Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Python
Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Passaggi successivi
Risolvere i problemi relativi a un connettore MirrorMaker 2.0
Mettere in pausa, riprendere, interrompere e riavviare un connettore