Créer un connecteur MirrorMaker 2.0

Les connecteurs MirrorMaker 2.0 répliquent les données d'un cluster Kafka vers un autre cluster Kafka. Vous pouvez utiliser les connecteurs MirrorMaker 2.0 pour effectuer la reprise après sinistre entre les clusters Kafka, et pour obtenir une haute disponibilité et une tolérance aux pannes dans vos applications basées sur Kafka.

Un connecteur MirrorMaker 2.0 peut établir des connexions entre deux clusters Managed Service pour Apache Kafka, ou entre un cluster Managed Service pour Apache Kafka et un cluster Kafka externe ou autogéré.

Voici quelques exemples d'utilisation des connecteurs MirrorMaker 2.0 :

  • Migration de données. Transférez votre charge de travail Kafka vers un nouveau cluster Managed Service pour Apache Kafka.

  • Reprise après sinistre. Créez un cluster de sauvegarde pour assurer la continuité de l'activité en cas de défaillance.

  • Agrégation des données. Consolidez les données de plusieurs clusters Kafka dans un cluster Managed Service pour Apache Kafka centralisé afin d'effectuer des analyses.

Types de connecteurs MirrorMaker 2.0

Managed Service pour Apache Kafka fournit les types de connecteurs MirrorMaker 2.0 suivants.

Connecteur source MirrorMaker 2.0

Le connecteur source MirrorMaker 2.0 réplique les sujets et les données d'un cluster Kafka (la source) vers un autre cluster Kafka (la cible).

Utilisez ce connecteur pour les scénarios de migration suivants :

  • Répliquez ou migrez des données d'un cluster Kafka externe ou autogéré vers un cluster Managed Service pour Apache Kafka.

  • Répliquez ou migrez des données d'un cluster Managed Service pour Apache Kafka vers un cluster Kafka externe ou autogéré.

  • Répliquez les données Kafka dans plusieurs régions pour répondre aux exigences de reprise après sinistre et de haute disponibilité.

Pour la réplication de données de base entre des clusters Kafka, vous pouvez utiliser le connecteur source MirrorMaker 2.0 seul. Les autres connecteurs MirrorMaker 2.0 offrent des fonctionnalités supplémentaires pour la réplication des données.

Connecteur de point de contrôle MirrorMaker 2.0

Le connecteur de point de contrôle MirrorMaker 2.0 copie les décalages des clients d'un cluster Kafka vers un autre. Les décalages de consommateur indiquent le dernier message consommé avec succès dans une partition. La réplication des décalages garantit que les consommateurs du cluster cible peuvent reprendre le traitement au même point que le cluster source.

Ce connecteur permet les cas d'utilisation suivants :

  • Assurez-vous de réduire au minimum les temps d'arrêt lors du basculement du cluster source vers le cluster cible.

  • Permet un basculement fluide en fournissant un état de consommateur cohérent entre les clusters.

  • Conservez la progression des consommateurs lorsque vous déplacez des données vers le cluster cible.

Connecteur de pulsation MirrorMaker 2.0

Le connecteur de pulsation MirrorMaker 2.0 génère des messages de pulsation périodiques sur un cluster Kafka. Le connecteur écrit ces messages dans un sujet dédié du cluster, généralement nommé heartbeats.

Après avoir configuré un connecteur de pulsation MirrorMaker 2.0, vous pouvez utiliser un connecteur source MirrorMaker 2.0 pour répliquer le sujet heartbeats dans un cluster cible. En observant les pulsations répliquées, vous pouvez implémenter les cas d'utilisation suivants :

  • Surveillez l'état et les performances de la réplication des données entre les clusters.

  • Vérifiez la connexion et le flux de données entre les clusters, même lorsqu'aucune autre donnée n'est produite.

  • Configurez des alertes dans Cloud Monitoring pour être averti si la réplication du signal de présence s'arrête.

Utilisé seul, le connecteur Heartbeat ne surveille pas automatiquement la réplication. Vous devez répliquer le sujet heartbeats et observer les messages Heartbeat qui arrivent au cluster cible.

Comprendre les rôles de cluster dans MirrorMaker 2.0

Lorsque vous configurez MirrorMaker 2.0, il est important de comprendre les différents rôles que jouent les clusters Kafka :

  • Cluster principal : dans le contexte de Managed Service pour Apache Kafka, il s'agit du cluster Managed Service pour Apache Kafka auquel votre cluster Kafka Connect est directement associé. Le cluster Connect héberge l'instance de connecteur MirrorMaker 2.0.

  • Cluster secondaire : il s'agit de l'autre cluster Kafka impliqué dans la réplication. Il peut s'agir d'un autre cluster Managed Service pour Apache Kafka ou d'un cluster externe. Par exemple, ils peuvent être autogérés sur Compute Engine, GKE, sur site ou dans un autre cloud.

  • Cluster source : il s'agit du cluster Kafka à partir duquel MirrorMaker 2.0 réplique les données.

  • Cluster cible : il s'agit du cluster Kafka vers lequel MirrorMaker 2.0 réplique les données.

Le cluster principal peut servir de source ou de cible :

  • Si le cluster principal est la source, le cluster secondaire est la cible. Les données circulent du cluster principal vers le cluster secondaire.

  • Si le cluster principal est la cible, le cluster secondaire est la source. Les données sont transférées du cluster secondaire vers le cluster principal.

Pour minimiser la latence des opérations d'écriture, nous vous recommandons de désigner le cluster cible comme cluster principal et de placer le cluster Connect dans la même région que le cluster cible.

Vous devez configurer correctement toutes les propriétés du connecteur. Elles incluent également les propriétés d'authentification du producteur qui sont dirigées vers le cluster secondaire. Pour en savoir plus sur les problèmes potentiels, consultez Améliorer la configuration du client MirrorMaker 2.0.

Avant de commencer

Pour créer un connecteur MirrorMaker 2.0, procédez comme suit :

  • Créez un cluster Managed Service pour Apache Kafka (principal). Ce cluster sert de point de terminaison pour votre connecteur MirrorMaker 2.0.

  • Créez un cluster Kafka secondaire. Ce cluster sert d'autre point de terminaison. Il peut s'agir d'un autre cluster Managed Service pour Apache Kafka ou d'un cluster Kafka externe ou autogéré. Vous pouvez configurer plusieurs clusters Kafka comme clusters Kafka secondaires d'un cluster Connect.

  • Créez un cluster Connect qui héberge votre connecteur MirrorMaker 2.0.

  • Assurez-vous que les domaines DNS des clusters Kafka secondaires sont configurés.

  • Configurez des règles de pare-feu pour permettre à l'interface Private Service Connect d'accéder aux clusters Kafka source et cible.

  • Si le cluster Kafka source ou cible est accessible via Internet, configurez un service Cloud NAT pour permettre aux nœuds de calcul Connect d'accéder à Internet.

  • Si les clusters secondaires incluent des clusters Kafka externes ou autogérés, assurez-vous que les identifiants requis sont configurés en tant que ressources secrètes.

Pour en savoir plus sur les exigences concernant le réseau, consultez Sous-réseau des nœuds de calcul.

Rôles et autorisations nécessaires

Pour obtenir les autorisations nécessaires pour créer un connecteur, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de connecteurs Kafka gérés (roles/managedkafka.connectorEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer un connecteur. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un connecteur :

  • Créez un connecteur : managedkafka.connectors.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Créer un connecteur MirrorMaker 2.0 dans un autre projet

Si votre cluster Managed Service pour Apache Kafka principal réside dans un projet différent de celui du cluster Connect exécutant le connecteur MirrorMaker 2.0, consultez Créer un cluster Connect dans un autre projet.

Se connecter à un cluster Kafka secondaire autogéré

Lorsque vous vous connectez à un cluster Kafka secondaire autogéré, faites attention à la mise en réseau et à l'authentification.

  • Mise en réseau : assurez-vous que les paramètres réseau VPC et les règles de pare-feu appropriés sont configurés pour permettre la connectivité entre le réseau VPC du cluster Connect et le réseau hébergeant le cluster autogéré ou externe.

  • Pour les clusters dans les VPC, consultez Créer et gérer des réseaux VPC.

  • Pour vous connecter à des environnements sur site ou à d'autres environnements cloud, envisagez d'utiliser des solutions telles que Cloud VPN ou Cloud Interconnect. Consultez également les conseils spécifiques pour se connecter à Kafka sur site.

  • Authentification et chiffrement : votre cluster Connect doit s'authentifier auprès du cluster autogéré ou externe (si nécessaire) et gérer tout chiffrement TLS. Pour obtenir des informations générales sur l'authentification Kafka, consultez la documentation sur la sécurité d'Apache Kafka.

Utiliser Secret Manager pour les identifiants

Les clusters Connect s'intègrent directement à Secret Manager. Stockez toutes les valeurs de configuration sensibles telles que les mots de passe, ainsi que le contenu du truststore et du keystore requis pour vous connecter au cluster autogéré ou externe en tant que secrets dans Secret Manager.

  • Les secrets accordés au compte de service du cluster Connect sont automatiquement montés en tant que fichiers dans l'environnement d'exécution du connecteur, sous le répertoire /var/secrets/.

  • Le nom du fichier suit le modèle {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Vous devez utiliser le nom du projet, et non son numéro.

  • La façon dont vous référencez un secret dépend de la propriété Kafka qui attend le mot de passe secret ou le chemin d'accès à un fichier :

    • Pour les mots de passe, utilisez la propriété Kafka DirectoryConfigProvider. Spécifiez la valeur au format ${directory:/var/secrets}:{SECRET_FILENAME}. Exemple : password=${directory:/var/secrets}:my-project-db-password-1

    • Pour les chemins de fichiers, spécifiez le chemin direct vers le fichier secret installé. Exemple : ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

Pour en savoir plus sur l'octroi d'accès et la configuration de secrets lors de la création d'un cluster Connect, consultez Configurer des secrets Secret Manager.

Fonctionnement d'un connecteur source MirrorMaker

Un connecteur source MirrorMaker extrait les données d'un ou de plusieurs sujets Kafka dans un cluster source et réplique ces données, ainsi que les LCA, dans les sujets d'un cluster cible.

Voici une explication détaillée de la façon dont le connecteur MirrorMaker Source réplique les données :

  • Le connecteur consomme les messages des sujets Kafka spécifiés dans le cluster source. Spécifiez les thèmes à répliquer à l'aide de la propriété de configuration topics, qui accepte les noms de thèmes séparés par une virgule ou une seule expression régulière de style Java. Par exemple, topic-a,topic-b ou my-prefix-.*.

  • Le connecteur peut également ignorer la réplication de sujets spécifiques que vous spécifiez à l'aide de la propriété topics.exclude. Les exclusions remplacent les inclusions.

  • Le connecteur écrit les messages consommés dans le cluster cible.

  • Le connecteur nécessite les informations de connexion du cluster source et cible, telles que source.cluster.bootstrap.servers et target.cluster.bootstrap.servers.

  • Le connecteur nécessite également des alias pour les clusters source et de destination, comme spécifié par source.cluster.alias et target.cluster.alias. Par défaut, les thèmes répliqués sont automatiquement renommés à l'aide de l'alias source. Par exemple, un sujet nommé orders provenant d'une source avec l'alias primary devient primary.orders dans la cible.

  • Les LCA associés aux sujets répliqués sont également synchronisés du cluster source vers le cluster cible. Vous pouvez désactiver cette fonctionnalité à l'aide de la propriété sync.topic.acls.enabled.

  • Si les clusters source et cible l'exigent, vous devez fournir les informations d'authentification pour vous y connecter dans la configuration. Vous devez configurer des propriétés telles que security.protocol, sasl.mechanism et sasl.jaas.config, préfixées par source.cluster. pour la source et target.cluster. pour la cible.

  • Le connecteur s'appuie sur des thèmes internes. Vous devrez peut-être configurer des propriétés associées, telles que offset-syncs.topic.replication.factor.

  • Le connecteur utilise les convertisseurs d'enregistrements Kafka key.converter, value.converter et header.converter. Pour la réplication directe, ces valeurs sont souvent définies par défaut sur org.apache.kafka.connect.converters.ByteArrayConverter, qui n'effectue aucune conversion (transmission).

  • La propriété tasks.max contrôle le niveau de parallélisme du connecteur. L'augmentation de tasks.max peut potentiellement améliorer le débit, mais le parallélisme effectif est souvent limité par le nombre de partitions dans les sujets Kafka sources répliqués.

Propriétés d'un connecteur MirrorMaker 2.0

Lorsque vous créez ou mettez à jour un connecteur MirrorMaker 2.0, spécifiez les propriétés suivantes :

Nom du connecteur

Nom ou ID du connecteur. Pour obtenir des instructions sur la façon de nommer la ressource, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom est immuable.

Type de connecteur

Le type de connecteur doit être l'un des suivants :

Cluster Kafka principal

Cluster Managed Service pour Apache Kafka. Le système remplit automatiquement ce champ.

  • Utiliser le cluster Kafka principal comme cluster cible : sélectionnez cette option pour transférer les données d'un autre cluster Kafka vers le cluster Managed Service pour Apache Kafka principal.

  • Utiliser le cluster Kafka principal comme cluster source : sélectionnez cette option pour déplacer les données du cluster Managed Service pour Apache Kafka principal vers un autre cluster Kafka.

Cluster cible ou source

Cluster Kafka secondaire qui constitue l'autre extrémité du pipeline.

  • Cluster Managed Service pour Apache Kafka : sélectionnez un cluster dans le menu déroulant.

  • Cluster Kafka autogéré ou externe : saisissez l'adresse d'amorçage au format hostname:port_number. Par exemple : kafka-test:9092.

Noms de sujets ou expressions régulières

Sujets à répliquer. Spécifiez des noms individuels (topic1, topic2) ou utilisez une expression régulière (topic.*). Cette propriété est requise pour le connecteur source MirrorMaker 2.0. La valeur par défaut est .*.

Noms ou expressions régulières de groupes de consommateurs

Groupes de consommateurs à répliquer. Spécifiez des noms individuels (group1, group2) ou utilisez une expression régulière (group.*). Cette propriété est requise pour le connecteur MirrorMaker 2.0 Checkpoint. La valeur par défaut est .*.

Configuration

Cette section vous permet de spécifier des propriétés de configuration supplémentaires et spécifiques au connecteur MirrorMaker 2.0.

Étant donné que les données des thèmes Kafka peuvent être dans différents formats (Avro, JSON ou octets bruts, par exemple), une partie essentielle de la configuration consiste à spécifier des convertisseurs. Les convertisseurs traduisent les données du format utilisé dans vos sujets Kafka au format interne standardisé de Kafka Connect.

Pour obtenir des informations plus générales sur le rôle des convertisseurs dans Kafka Connect, les types de convertisseurs compatibles et les options de configuration courantes, consultez Convertisseurs.

Voici quelques configurations courantes pour tous les connecteurs MirrorMaker 2.0 :

  • source.cluster.alias : alias du cluster source.

  • target.cluster.alias : alias du cluster cible.

Configurations utilisées pour exclure des ressources spécifiques lors de la réplication des données :

  • topics.exclude : thèmes exclus. Accepte les noms de sujets et les expressions régulières séparés par une virgule. Les exclusions sont prioritaires par rapport aux inclusions. Utilisé pour le connecteur source MirrorMaker 2.0. La valeur par défaut est mm2.*.internal,.*.replica,__.*.

  • groups.exclude : exclure des groupes. Compatible avec les ID de groupe et les expressions régulières séparés par une virgule. Les exclusions sont prioritaires par rapport aux inclusions. Utilisé pour le connecteur de point de contrôle MirrorMaker 2.0. La valeur par défaut est console-consumer-.*,connect-.*,__.*.

Les configurations d'authentification sont requises pour les connecteurs MirrorMaker 2.0.

Si le cluster Kafka source ou cible est un cluster Managed Service pour Apache Kafka, le cluster Connect utilise OAuthBearer pour s'authentifier auprès de celui-ci. Les configurations d'authentification sont préconfigurées. Vous n'avez donc pas besoin de les configurer manuellement.

Pour le cluster Kafka autogéré ou sur site, les configurations d'authentification dépendent du mécanisme d'authentification pris en charge par le cluster Kafka. Voici un exemple de configuration d'authentification pour une configuration de cluster Kafka source :

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Voici un exemple de configuration d'authentification pour un cluster Kafka cible :

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Les propriétés de configuration disponibles dépendent du connecteur spécifique. Consultez la version du connecteur MirrorMaker 2.0 compatible pour voir quels exemples de configuration sont acceptés. Consultez les documents suivants :

Conversion des enregistrements Kafka

Kafka Connect utilise org.apache.kafka.connect.converters.ByteArrayConverter comme convertisseur par défaut pour la clé et la valeur, ce qui fournit une option de transfert qui n'effectue aucune conversion.

Vous pouvez configurer header.converter, key.converter et value.converter pour utiliser d'autres convertisseurs.

Nombre de tâches

La valeur tasks.max configure le nombre maximal de tâches que Kafka Connect utilise pour exécuter les connecteurs MirrorMaker. Il contrôle le niveau de parallélisme d'un connecteur. L'augmentation du nombre de tâches peut accroître le débit, mais est limitée par des facteurs tels que le nombre de partitions de sujet Kafka.

Créer un connecteur source MirrorMaker 2.0

Avant de créer un connecteur, consultez la documentation sur les propriétés des connecteurs.

Console

  1. Dans la console Google Cloud , accédez à la page Connecter des clusters.

    Accéder à Connect Clusters

  2. Cliquez sur le cluster Connect dans lequel vous souhaitez créer le connecteur.

    La page Connecter les détails du cluster s'affiche.

  3. Cliquez sur Créer un connecteur.

    La page Créer un connecteur Kafka s'affiche.

  4. Saisissez une chaîne pour le nom du connecteur.

    Pour savoir comment nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.

  5. Pour Connector plugin (Plug-in de connecteur), sélectionnez "MirrorMaker 2.0 Source".

  6. Pour Cluster Kafka principal, choisissez l'une des options suivantes :

    • Utiliser le cluster Kafka principal comme cluster source : pour déplacer les données du cluster Managed Service pour Apache Kafka.
    • Utiliser le cluster Kafka principal comme cluster cible : pour déplacer les données vers le cluster Managed Service pour Apache Kafka.
  7. Pour Cluster cible ou Cluster source, choisissez l'une des options suivantes :

    • Cluster Managed Service pour Apache Kafka : sélectionnez une option dans le menu.
    • Cluster Kafka autogéré ou externe : saisissez l'adresse d'amorçage au format hostname:port_number.
  8. Saisissez les noms de sujets ou expressions régulières de sujets (séparés par une virgule).

  9. Vérifiez et ajustez les configurations, y compris les paramètres de sécurité requis.

    Pour en savoir plus sur la configuration et l'authentification, consultez Configuration.

  10. Sélectionnez la règle de redémarrage des tâches. Pour en savoir plus, consultez la section Règles de redémarrage des tâches.

  11. Cliquez sur Créer.

gcloud

  1. Dans la console Google Cloud , activez Cloud Shell.

    Activer Cloud Shell

    En bas de la console Google Cloud , une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la commande gcloud managed-kafka connectors create :

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Remplacez les éléments suivants :

    • CONNECTOR_ID : ID ou nom du connecteur. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

    • LOCATION : emplacement dans lequel vous créez le connecteur. Il doit s'agir du même emplacement que celui où vous avez créé le cluster Connect.

    • CONNECT_CLUSTER_ID : ID du cluster Connect où le connecteur est créé.

    • CONFIG_FILE : chemin d'accès au fichier de configuration YAML du connecteur.

    Voici un exemple de fichier de configuration pour le connecteur MirrorMaker 2.0 Source :

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    

Terraform

Vous pouvez utiliser une ressource Terraform pour créer un connecteur.

# A single MirrorMaker 2 Source Connector to replicate from one source to one target.
resource "google_managed_kafka_connector" "default" {
  project         = data.google_project.default.project_id
  connector_id    = "mm2-source-to-target-connector-id"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    "name"                 = "mm2-source-to-target-connector-id"
    "tasks.max"            = "3"
    "source.cluster.alias" = "source"
    "target.cluster.alias" = "target"
    "topics"               = ".*" # Replicate all topics from the source
    # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
    # for one or more Kafka brokers in the source/target cluster.
    "source.cluster.bootstrap.servers" = "source_cluster_dns"
    "target.cluster.bootstrap.servers" = "target_cluster_dns"
    # You can define an exclusion policy for topics as follows:
    # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
    "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
  }

  provider = google-beta
}

Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application(ADC, Application Default Credentials). Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "mm2-source-to-target-connector-id"
	// sourceBootstrapServers := "source_cluster_dns"
	// targetBootstrapServers := "target_cluster_dns"
	// tasksMax := "3"
	// sourceClusterAlias := "source"
	// targetClusterAlias := "target"
	// topics := ".*"
	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	config := map[string]string{
		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
		"name":                 connectorID,
		"tasks.max":            tasksMax,
		"source.cluster.alias": sourceClusterAlias,
		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
		// Replicate all topics from the source
		"topics": topics,
		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
		// the source/target cluster.
		// For example: "kafka-broker:9092"
		"source.cluster.bootstrap.servers": sourceBootstrapServers,
		"target.cluster.bootstrap.servers": targetBootstrapServers,
		// You can define an exclusion policy for topics as follows:
		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
		"topics.exclude": topicsExclude,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java Managed Service pour Apache Kafka.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateMirrorMaker2SourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String maxTasks = "3";
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-mirrormaker2-connector";
    String sourceClusterBootstrapServers = "my-source-cluster:9092";
    String targetClusterBootstrapServers = "my-target-cluster:9092";
    String sourceClusterAlias = "source";
    String targetClusterAlias = "target"; // This is usually the primary cluster.
    String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
    String topics = ".*";
    // You can define an exclusion policy for topics as follows:
    // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    String topicsExclude = "mm2.*.internal,.*.replica,__.*";
    createMirrorMaker2SourceConnector(
        projectId,
        region,
        maxTasks,
        connectClusterId,
        connectorId,
        sourceClusterBootstrapServers,
        targetClusterBootstrapServers,
        sourceClusterAlias,
        targetClusterAlias,
        connectorClass,
        topics,
        topicsExclude);
  }

  public static void createMirrorMaker2SourceConnector(
      String projectId,
      String region,
      String maxTasks,
      String connectClusterId,
      String connectorId,
      String sourceClusterBootstrapServers,
      String targetClusterBootstrapServers,
      String sourceClusterAlias,
      String targetClusterAlias,
      String connectorClass,
      String topics,
      String topicsExclude)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("tasks.max", maxTasks);
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("source.cluster.alias", sourceClusterAlias);
    configMap.put("target.cluster.alias", targetClusterAlias);
    configMap.put("topics", topics);
    configMap.put("topics.exclude", topicsExclude);
    configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
    configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "source.cluster.alias": source_cluster_alias,
    "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
    # Replicate all topics from the source
    "topics": topics,
    # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    # the source/target cluster.
    # For example: "kafka-broker:9092"
    "source.cluster.bootstrap.servers": source_bootstrap_servers,
    "target.cluster.bootstrap.servers": target_bootstrap_servers,
    # You can define an exclusion policy for topics as follows:
    # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    "topics.exclude": topics_exclude,
}

connector = Connector()
# The name of the connector.
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.