Créer un connecteur MirrorMaker 2.0

MirrorMaker 2.0 est un outil qui réplique les sujets entre les clusters Kafka. Vous pouvez créer les connecteurs MirrorMaker 2.0 suivants :

  • Source MirrorMaker 2.0

  • Point de contrôle MirrorMaker 2.0

  • Pulsation MirrorMaker 2.0

Le connecteur source MirrorMaker 2.0 est toujours requis, car il reflète les données des clusters source vers les clusters cibles. Il synchronise également les LCA. Les connecteurs de point de contrôle et de pulsation MirrorMaker 2.0 sont facultatifs. Vous pouvez également créer les connecteurs Checkpoint et Heartbeat MirrorMaker 2.0 sans créer le connecteur Source.

Pour en savoir plus sur ces connecteurs, consultez Présentation des connecteurs.

Comprendre les rôles de cluster dans MirrorMaker 2.0

Lorsque vous configurez MirrorMaker 2.0, il est important de comprendre les différents rôles que jouent les clusters Kafka :

  • Cluster principal : dans le contexte de Managed Service pour Apache Kafka, il s'agit du cluster Managed Service pour Apache Kafka auquel votre cluster Kafka Connect est directement associé. Le cluster Connect héberge l'instance de connecteur MirrorMaker 2.0.

  • Cluster secondaire : il s'agit de l'autre cluster Kafka impliqué dans la réplication. Il peut s'agir d'un autre cluster Managed Service pour Apache Kafka ou d'un cluster externe. Par exemple, elles peuvent être autogérées sur Compute Engine, GKE, sur site ou dans un autre cloud.

  • Cluster source : il s'agit du cluster Kafka à partir duquel MirrorMaker 2.0 réplique les données.

  • Cluster cible : il s'agit du cluster Kafka vers lequel MirrorMaker 2.0 réplique les données.

Le cluster principal peut servir de source ou de cible :

  • Si le cluster principal est la source, le cluster secondaire est la cible. Les données circulent du cluster principal vers le cluster secondaire.

  • Si le cluster principal est la cible, le cluster secondaire est la source. Les données circulent du cluster secondaire vers le cluster principal.

Pour minimiser la latence des opérations d'écriture, nous vous recommandons de désigner le cluster cible comme cluster principal et de placer le cluster Connect dans la même région que le cluster cible.

Vous devez configurer correctement toutes les propriétés du connecteur. Elles incluent également les propriétés d'authentification du producteur qui sont dirigées vers le cluster secondaire. Pour en savoir plus sur les problèmes potentiels, consultez Améliorer la configuration du client MirrorMaker 2.0.

Avant de commencer

Pour créer un connecteur MirrorMaker 2.0, procédez comme suit :

  • Créez un cluster Managed Service pour Apache Kafka (principal). Ce cluster sert de point de terminaison pour votre connecteur MirrorMaker 2.0.

  • Créez un cluster Kafka secondaire. Ce cluster sert d'autre point de terminaison. Il peut s'agir d'un autre cluster Managed Service pour Apache Kafka ou d'un cluster Kafka externe ou autogéré. Vous pouvez configurer plusieurs clusters Kafka comme clusters Kafka secondaires d'un cluster Connect.

  • Créez un cluster Connect qui héberge votre connecteur MirrorMaker 2.0.

  • Assurez-vous que les domaines DNS des clusters Kafka secondaires sont configurés.

  • Configurez des règles de pare-feu pour permettre à l'interface Private Service Connect d'accéder aux clusters Kafka source et cible.

  • Si le cluster Kafka source ou cible est accessible via Internet, configurez un service Cloud NAT pour permettre aux nœuds de calcul Connect d'accéder à Internet.

  • Si les clusters secondaires incluent des clusters Kafka externes ou autogérés, assurez-vous que les identifiants requis sont configurés en tant que ressources secrètes.

Pour en savoir plus sur les exigences concernant le réseau, consultez Sous-réseau des nœuds de calcul.

Rôles et autorisations nécessaires

Pour obtenir les autorisations nécessaires pour créer un connecteur MirrorMaker 2.0, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de connecteur Kafka géré (roles/managedkafka.connectorEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer un connecteur MirrorMaker 2.0. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un connecteur MirrorMaker 2.0 :

  • Accordez l'autorisation de créer un connecteur sur le cluster Connect parent : managedkafka.connectors.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur le rôle Éditeur de connecteurs Kafka gérés, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Créer un connecteur MirrorMaker 2.0 dans un autre projet

Si votre cluster Managed Service pour Apache Kafka principal réside dans un projet différent de celui du cluster Connect exécutant le connecteur MirrorMaker 2.0, consultez Créer un cluster Connect dans un autre projet.

Se connecter à un cluster Kafka secondaire autogéré

Lorsque vous vous connectez à un cluster Kafka secondaire autogéré, faites attention à la mise en réseau et à l'authentification.

  • Mise en réseau : assurez-vous que les paramètres du réseau VPC et les règles de pare-feu appropriés sont configurés pour permettre la connectivité entre le réseau VPC du cluster Connect et le réseau hébergeant le cluster autogéré ou externe.

  • Pour les clusters dans les VPC, consultez Créer et gérer des réseaux VPC.

  • Pour vous connecter à des environnements sur site ou à d'autres environnements cloud, envisagez d'utiliser des solutions telles que Cloud VPN ou Cloud Interconnect. Consultez également les conseils spécifiques pour se connecter à Kafka sur site.

  • Authentification et chiffrement : votre cluster Connect doit s'authentifier auprès du cluster autogéré ou externe (si nécessaire) et gérer tout chiffrement TLS. Pour obtenir des informations générales sur l'authentification Kafka, consultez la documentation sur la sécurité d'Apache Kafka.

Utiliser Secret Manager pour les identifiants

Les clusters Connect s'intègrent directement à Secret Manager. Stockez toutes les valeurs de configuration sensibles telles que les mots de passe, ainsi que le contenu du truststore et du keystore requis pour vous connecter au cluster autogéré ou externe en tant que secrets dans Secret Manager.

  • Les secrets accordés au compte de service du cluster Connect sont automatiquement montés en tant que fichiers dans l'environnement d'exécution du connecteur, sous le répertoire /var/secrets/.

  • Le nom du fichier suit le modèle {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Vous devez utiliser le nom du projet, et non son numéro.

  • La façon dont vous référencez un secret dépend de ce que la propriété Kafka attend : le mot de passe secret ou le chemin d'accès à un fichier.

    • Pour les mots de passe, utilisez la propriété Kafka DirectoryConfigProvider. Spécifiez la valeur au format ${directory:/var/secrets}:{SECRET_FILENAME}. Exemple : password=${directory:/var/secrets}:my-project-db-password-1

    • Pour les chemins de fichiers, spécifiez le chemin direct vers le fichier secret installé. Exemple : ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

Pour en savoir plus sur l'octroi d'accès et la configuration de secrets lors de la création d'un cluster Connect, consultez Configurer des secrets Secret Manager.

Fonctionnement d'un connecteur source MirrorMaker

Un connecteur source MirrorMaker extrait les données d'un ou de plusieurs sujets Kafka dans un cluster source et les réplique, ainsi que les LCA, dans les sujets d'un cluster cible.

Voici une explication détaillée de la façon dont le connecteur MirrorMaker Source réplique les données :

  • Le connecteur consomme les messages des sujets Kafka spécifiés dans le cluster source. Spécifiez les sujets à répliquer à l'aide de la propriété de configuration topics, qui accepte les noms de sujets séparés par une virgule ou une seule expression régulière de style Java. Par exemple, topic-a,topic-b ou my-prefix-.*.

  • Le connecteur peut également ignorer la réplication de sujets spécifiques que vous spécifiez à l'aide de la propriété topics.exclude. Les exclusions remplacent les inclusions.

  • Le connecteur écrit les messages consommés dans le cluster cible.

  • Le connecteur nécessite les informations de connexion du cluster source et cible, telles que source.cluster.bootstrap.servers et target.cluster.bootstrap.servers.

  • Le connecteur nécessite également des alias pour les clusters source et cible, comme spécifié par source.cluster.alias et target.cluster.alias. Par défaut, les thèmes répliqués sont automatiquement renommés à l'aide de l'alias source. Par exemple, un sujet nommé orders provenant d'une source avec l'alias primary devient primary.orders dans la cible.

  • Les LCA associés aux sujets répliqués sont également synchronisés du cluster source vers le cluster cible. Vous pouvez désactiver cette fonctionnalité à l'aide de la propriété sync.topic.acls.enabled.

  • Si les clusters source et cible l'exigent, vous devez fournir les informations d'authentification pour vous y connecter dans la configuration. Vous devez configurer des propriétés telles que security.protocol, sasl.mechanism et sasl.jaas.config, préfixées par source.cluster. pour la source et target.cluster. pour la cible.

  • Le connecteur s'appuie sur des thèmes internes. Vous devrez peut-être configurer des propriétés associées, telles que offset-syncs.topic.replication.factor.

  • Le connecteur utilise les convertisseurs d'enregistrements Kafka key.converter, value.converter et header.converter. Pour la réplication directe, ces valeurs sont souvent définies par défaut sur org.apache.kafka.connect.converters.ByteArrayConverter, ce qui n'effectue aucune conversion (transmission directe).

  • La propriété tasks.max contrôle le niveau de parallélisme du connecteur. L'augmentation de tasks.max peut potentiellement améliorer le débit, mais le parallélisme effectif est souvent limité par le nombre de partitions dans les sujets Kafka sources répliqués.

Propriétés d'un connecteur MirrorMaker 2.0

Lorsque vous créez ou mettez à jour un connecteur MirrorMaker 2.0, spécifiez les propriétés suivantes :

Nom du connecteur

Nom ou ID du connecteur. Pour obtenir des instructions sur la façon de nommer la ressource, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom est immuable.

Type de connecteur

Le type de connecteur doit être l'un des suivants :

Cluster Kafka principal

Cluster Managed Service pour Apache Kafka. Le système remplit automatiquement ce champ.

  • Utiliser le cluster Kafka principal comme cluster cible : sélectionnez cette option pour transférer les données d'un autre cluster Kafka vers le cluster Managed Service pour Apache Kafka principal.

  • Utiliser le cluster Kafka principal comme cluster source : sélectionnez cette option pour déplacer les données du cluster Managed Service pour Apache Kafka principal vers un autre cluster Kafka.

Cluster cible ou source

Cluster Kafka secondaire qui constitue l'autre extrémité du pipeline.

  • Cluster Managed Service pour Apache Kafka : sélectionnez un cluster dans le menu déroulant.

  • Cluster Kafka autogéré ou externe : saisissez l'adresse d'amorçage au format hostname:port_number. Par exemple : kafka-test:9092.

Noms de sujets ou expressions régulières

Thèmes à répliquer. Spécifiez des noms individuels (topic1, topic2) ou utilisez une expression régulière (topic.*). Cette propriété est requise pour le connecteur source MirrorMaker 2.0. La valeur par défaut est .*.

Noms ou expressions régulières de groupes de consommateurs

Groupes de consommateurs à répliquer. Spécifiez des noms individuels (group1, group2) ou utilisez une expression régulière (group.*). Cette propriété est requise pour le connecteur MirrorMaker 2.0 Checkpoint. La valeur par défaut est .*.

Configuration

Cette section vous permet de spécifier des propriétés de configuration supplémentaires et spécifiques au connecteur MirrorMaker 2.0.

Étant donné que les données des thèmes Kafka peuvent être dans différents formats (Avro, JSON ou octets bruts, par exemple), une partie essentielle de la configuration consiste à spécifier des convertisseurs. Les convertisseurs traduisent les données du format utilisé dans vos sujets Kafka au format interne standardisé de Kafka Connect.

Pour obtenir des informations plus générales sur le rôle des convertisseurs dans Kafka Connect, les types de convertisseurs compatibles et les options de configuration courantes, consultez Convertisseurs.

Voici quelques configurations courantes pour tous les connecteurs MirrorMaker 2.0 :

  • source.cluster.alias : alias du cluster source.

  • target.cluster.alias : alias du cluster cible.

Configurations utilisées pour exclure des ressources spécifiques lors de la réplication des données :

  • topics.exclude : thèmes exclus. Accepte les noms de sujets et les expressions régulières séparés par une virgule. Les exclusions sont prioritaires par rapport aux inclusions. Utilisé pour le connecteur source MirrorMaker 2.0. La valeur par défaut est mm2.*.internal,.*.replica,__.*.

  • groups.exclude : exclure des groupes. Compatible avec les ID de groupe et les expressions régulières séparés par une virgule. Les exclusions sont prioritaires par rapport aux inclusions. Utilisé pour le connecteur de point de contrôle MirrorMaker 2.0. La valeur par défaut est console-consumer-.*,connect-.*,__.*.

Les configurations d'authentification sont requises pour les connecteurs MirrorMaker 2.0.

Si le cluster Kafka source ou cible est un cluster Managed Service pour Apache Kafka, le cluster Connect utilise OAuthBearer pour s'authentifier auprès de celui-ci. Les configurations d'authentification sont préconfigurées. Vous n'avez donc pas besoin de les configurer manuellement.

Pour le cluster Kafka autogéré ou sur site, les configurations d'authentification dépendent du mécanisme d'authentification compatible avec le cluster Kafka. Voici un exemple de configuration d'authentification pour une configuration de cluster Kafka source :

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Voici un exemple de configuration d'authentification pour un cluster Kafka cible :

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Les propriétés de configuration disponibles dépendent du connecteur spécifique. Consultez la version du connecteur MirrorMaker 2.0 compatible pour voir quels exemples de configuration sont acceptés. Consultez les documents suivants :

Conversion des enregistrements Kafka

Kafka Connect utilise org.apache.kafka.connect.converters.ByteArrayConverter comme convertisseur par défaut pour la clé et la valeur, ce qui fournit une option de transfert qui n'effectue aucune conversion.

Vous pouvez configurer header.converter, key.converter et value.converter pour utiliser d'autres convertisseurs.

Nombre de tâches

La valeur tasks.max configure le nombre maximal de tâches que Kafka Connect utilise pour exécuter les connecteurs MirrorMaker. Il contrôle le niveau de parallélisme d'un connecteur. L'augmentation du nombre de tâches peut accroître le débit, mais est limitée par des facteurs tels que le nombre de partitions de sujet Kafka.

Créer un connecteur source MirrorMaker 2.0

Avant de créer un connecteur, consultez la documentation sur les propriétés des connecteurs.

Console

  1. Dans la console Google Cloud , accédez à la page Connecter des clusters.

    Accéder à Connect Clusters

  2. Cliquez sur le cluster Connect dans lequel vous souhaitez créer le connecteur.

    La page Détails du cluster Connect s'affiche.

  3. Cliquez sur Créer un connecteur.

    La page Créer un connecteur Kafka s'affiche.

  4. Saisissez une chaîne pour le nom du connecteur.

    Pour en savoir plus sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.

  5. Pour Connector plugin (Plug-in de connecteur), sélectionnez "MirrorMaker 2.0 Source".

  6. Pour Cluster Kafka principal, choisissez l'une des options suivantes :

    • Utiliser le cluster Kafka principal comme cluster source : pour déplacer les données du cluster Managed Service pour Apache Kafka.
    • Utiliser le cluster Kafka principal comme cluster cible : pour transférer les données vers le cluster Managed Service pour Apache Kafka.
  7. Pour Cluster cible ou Cluster source, choisissez l'une des options suivantes :

    • Cluster Managed Service pour Apache Kafka : sélectionnez cette option dans le menu.
    • Cluster Kafka autogéré ou externe : saisissez l'adresse d'amorçage au format hostname:port_number.
  8. Saisissez les noms de sujets ou expressions régulières de sujets (séparés par une virgule).

  9. Vérifiez et ajustez les configurations, y compris les paramètres de sécurité requis.

    Pour en savoir plus sur la configuration et l'authentification, consultez Configuration.

  10. Sélectionnez la règle de redémarrage des tâches. Pour en savoir plus, consultez la section Règles de redémarrage des tâches.

  11. Cliquez sur Créer.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka connectors create :

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Remplacez les éléments suivants :

    • CONNECTOR_ID : ID ou nom du connecteur. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

    • LOCATION : emplacement où vous créez le connecteur. Il doit s'agir du même emplacement que celui où vous avez créé le cluster Connect.

    • CONNECT_CLUSTER_ID : ID du cluster Connect où le connecteur est créé.

    • CONFIG_FILE : chemin d'accès au fichier de configuration YAML pour le connecteur BigQuery Sink.

    Voici un exemple de fichier de configuration pour le connecteur MirrorMaker 2.0 Source :

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  3. Terraform

    Vous pouvez utiliser une ressource Terraform pour créer un connecteur.

    # A single MirrorMaker 2 Source Connector to replicate from one source to one target.
    resource "google_managed_kafka_connector" "default" {
      project         = data.google_project.default.project_id
      connector_id    = "mm2-source-to-target-connector-id"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
        "name"                 = "mm2-source-to-target-connector-id"
        "tasks.max"            = "3"
        "source.cluster.alias" = "source"
        "target.cluster.alias" = "target"
        "topics"               = ".*" # Replicate all topics from the source
        # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
        # for one or more Kafka brokers in the source/target cluster.
        "source.cluster.bootstrap.servers" = "source_cluster_dns"
        "target.cluster.bootstrap.servers" = "target_cluster_dns"
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
        "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
      }
    
      provider = google-beta
    }

    Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

    Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
    func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "mm2-source-to-target-connector-id"
    	// sourceBootstrapServers := "source_cluster_dns"
    	// targetBootstrapServers := "target_cluster_dns"
    	// tasksMax := "3"
    	// sourceClusterAlias := "source"
    	// targetClusterAlias := "target"
    	// topics := ".*"
    	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    		"name":                 connectorID,
    		"tasks.max":            tasksMax,
    		"source.cluster.alias": sourceClusterAlias,
    		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
    		// Replicate all topics from the source
    		"topics": topics,
    		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    		// the source/target cluster.
    		// For example: "kafka-broker:9092"
    		"source.cluster.bootstrap.servers": sourceBootstrapServers,
    		"target.cluster.bootstrap.servers": targetBootstrapServers,
    		// You can define an exclusion policy for topics as follows:
    		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    		"topics.exclude": topicsExclude,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateMirrorMaker2SourceConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String maxTasks = "3";
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-mirrormaker2-connector";
        String sourceClusterBootstrapServers = "my-source-cluster:9092";
        String targetClusterBootstrapServers = "my-target-cluster:9092";
        String sourceClusterAlias = "source";
        String targetClusterAlias = "target"; // This is usually the primary cluster.
        String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
        String topics = ".*";
        // You can define an exclusion policy for topics as follows:
        // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        String topicsExclude = "mm2.*.internal,.*.replica,__.*";
        createMirrorMaker2SourceConnector(
            projectId,
            region,
            maxTasks,
            connectClusterId,
            connectorId,
            sourceClusterBootstrapServers,
            targetClusterBootstrapServers,
            sourceClusterAlias,
            targetClusterAlias,
            connectorClass,
            topics,
            topicsExclude);
      }
    
      public static void createMirrorMaker2SourceConnector(
          String projectId,
          String region,
          String maxTasks,
          String connectClusterId,
          String connectorId,
          String sourceClusterBootstrapServers,
          String targetClusterBootstrapServers,
          String sourceClusterAlias,
          String targetClusterAlias,
          String connectorClass,
          String topics,
          String topicsExclude)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("source.cluster.alias", sourceClusterAlias);
        configMap.put("target.cluster.alias", targetClusterAlias);
        configMap.put("topics", topics);
        configMap.put("topics.exclude", topicsExclude);
        configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
        configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "source.cluster.alias": source_cluster_alias,
        "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
        # Replicate all topics from the source
        "topics": topics,
        # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
        # the source/target cluster.
        # For example: "kafka-broker:9092"
        "source.cluster.bootstrap.servers": source_bootstrap_servers,
        "target.cluster.bootstrap.servers": target_bootstrap_servers,
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        "topics.exclude": topics_exclude,
    }
    
    connector = Connector()
    # The name of the connector.
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.