Crea un conector de MirrorMaker 2.0

MirrorMaker 2.0 es una herramienta que replica temas entre clústeres de Kafka. Puedes crear los siguientes conectores de MirrorMaker 2.0:

  • Fuente de MirrorMaker 2.0

  • Punto de control de MirrorMaker 2.0

  • Señal de monitoreo de funcionamiento de MirrorMaker 2.0

El conector de origen de MirrorMaker 2.0 siempre es necesario, ya que refleja los datos de los clústeres de origen a los de destino. También sincroniza las LCA. Los conectores de punto de control y señal de monitoreo de funcionamiento de MirrorMaker 2.0 son opcionales. También puedes crear los conectores de Checkpoint y Heartbeat de MirrorMaker 2.0 sin crear el conector de Source.

Para obtener más información sobre estos conectores, consulta la Descripción general de los conectores.

Comprende los roles de clúster en MirrorMaker 2.0

Cuando configures MirrorMaker 2.0, es importante que comprendas los diferentes roles que desempeñan los clústeres de Kafka:

  • Clúster principal: En el contexto de Managed Service para Apache Kafka, es el clúster de Managed Service para Apache Kafka al que se adjunta directamente tu clúster de Kafka Connect. El clúster de Connect aloja la instancia del conector de MirrorMaker 2.0.

  • Clúster secundario: Es el otro clúster de Kafka que participa en la replicación. Puede ser otro clúster de Managed Service para Apache Kafka o un clúster externo. Algunos ejemplos son la autoadministración en Compute Engine, GKE, de forma local o en otra nube.

  • Clúster de origen: Es el clúster de Kafka desde el que MirrorMaker 2.0 replica los datos.

  • Clúster de destino: Es el clúster de Kafka al que MirrorMaker 2.0 replica los datos.

El clúster principal puede servir como fuente o destino:

  • Si el clúster principal es la fuente, el clúster secundario es el destino. Los datos fluyen desde el clúster principal hacia el secundario.

  • Si el clúster principal es el destino, el clúster secundario es la fuente. Los datos fluyen desde el clúster secundario hacia el clúster principal.

Para minimizar la latencia de las operaciones de escritura, se recomienda designar el clúster de destino como el clúster principal y colocar el clúster de Connect en la misma región que el clúster de destino.

Debes configurar correctamente todas las propiedades del conector. También incluyen propiedades de autenticación del productor que se dirigen al clúster secundario. Para obtener detalles sobre los posibles problemas, consulta Mejora la configuración del cliente de MirrorMaker 2.0.

Antes de comenzar

Para crear un conector de MirrorMaker 2.0, completa estas tareas:

  • Crea un clúster de Managed Service para Apache Kafka (principal). Este clúster funciona como un extremo de tu conector de MirrorMaker 2.0.

  • Crea un clúster secundario de Kafka. Este clúster funciona como el otro extremo. Puede ser otro clúster de Managed Service para Apache Kafka o un clúster de Kafka externo o autoadministrado. Puedes configurar varios clústeres de Kafka como clústeres secundarios de un clúster de Connect.

  • Crea un clúster de Connect que aloje tu conector de MirrorMaker 2.0.

  • Asegúrate de que estén configurados los dominios DNS de los clústeres secundarios de Kafka.

  • Configura reglas de firewall para permitir que la interfaz de Private Service Connect llegue a los clústeres de Kafka de origen y destino.

  • Si se accede al clúster de Kafka de origen o de destino a través de Internet, configura una instancia de Cloud NAT para permitir que los trabajadores de Connect accedan a Internet.

  • Si los clústeres secundarios incluyen clústeres de Kafka externos o autoadministrados, asegúrate de que las credenciales requeridas estén configuradas como recursos secretos.

Para obtener más información sobre los requisitos de redes, consulta Subred del trabajador.

Roles y permisos requeridos

Para obtener los permisos que necesitas para crear un conector de MirrorMaker 2.0, pídele a tu administrador que te otorgue el rol de IAM de editor de conectores de Kafka administrados (roles/managedkafka.connectorEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear un conector de MirrorMaker 2.0. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear un conector de MirrorMaker 2.0:

  • Otorga el permiso para crear un conector en el clúster de Connect principal: managedkafka.connectors.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre el rol de Editor de conectores de Kafka administrados, consulta Roles predefinidos de Managed Service for Apache Kafka.

Crea un conector de MirrorMaker 2.0 en otro proyecto

Si tu clúster principal de Managed Service para Apache Kafka reside en un proyecto diferente del clúster de Connect que ejecuta el conector de MirrorMaker 2.0, consulta Crea un clúster de Connect en un proyecto diferente.

Conéctate a un clúster secundario de Kafka autoadministrado

Cuando te conectes a un clúster secundario de Kafka autoadministrado, presta atención a la red y la autenticación.

  • Herramientas de redes: Asegúrate de que la configuración de la red de VPC y las reglas de firewall sean correctas para permitir la conectividad entre la red de VPC del clúster de Connect y la red que aloja el clúster externo o autoadministrado.

  • Para los clústeres dentro de las VPC, consulta Crea y administra redes de VPC.

  • Para conectarte a entornos locales o de otras nubes, considera soluciones como Cloud VPN o Cloud Interconnect. Consulta también la guía específica para conectarte a Kafka local.

  • Autenticación y encriptación: Tu clúster de Connect debe autenticarse con el clúster externo o autoadministrado (si es necesario) y controlar cualquier encriptación TLS. Para obtener información general sobre la autenticación de Kafka, consulta la documentación de seguridad de Apache Kafka.

Usa Secret Manager para las credenciales

Los clústeres de Connect se integran directamente con Secret Manager. Almacena todos los valores de configuración sensibles, como contraseñas y el contenido de los almacenes de confianza y de claves necesarios para conectarte al clúster externo o autoadministrado, como secretos en Secret Manager.

  • Los secretos otorgados a la cuenta de servicio del clúster de Connect se activan automáticamente como archivos dentro del entorno de ejecución del conector en el directorio /var/secrets/.

  • El nombre del archivo sigue el patrón {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Debes usar el nombre del proyecto, no el número.

  • La forma en que haces referencia a un secreto depende de si la propiedad de Kafka espera la contraseña secreta o la ruta de acceso a un archivo:

    • Para las contraseñas, usa la propiedad DirectoryConfigProvider de Kafka. Especifica el valor en el formato ${directory:/var/secrets}:{SECRET_FILENAME}. Ejemplo: password=${directory:/var/secrets}:my-project-db-password-1

    • Para las rutas de acceso a archivos, especifica la ruta directa al archivo secreto activado. Ejemplo: ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

Para obtener más detalles sobre cómo otorgar acceso y configurar secretos durante la creación del clúster de Connect, consulta Configura secretos de Secret Manager.

Cómo funciona un conector de fuente de MirrorMaker

Un conector de origen de MirrorMaker extrae datos de uno o más temas de Kafka en un clúster de origen y replica esos datos, junto con las ACL, en temas de un clúster de destino.

A continuación, se incluye un desglose detallado de cómo el conector de origen de MirrorMaker replica los datos:

  • El conector consume mensajes de temas de Kafka especificados dentro del clúster de origen. Especifica los temas que se replicarán con la propiedad de configuración topics, que acepta nombres de temas separados por comas o una sola expresión regular de estilo Java. Por ejemplo, topic-a,topic-b o my-prefix-.*.

  • El conector también puede omitir la replicación de temas específicos que especifiques con la propiedad topics.exclude. Las exclusiones anulan las inclusiones.

  • El conector escribe los mensajes consumidos en el clúster de destino.

  • El conector requiere los detalles de conexión del clúster de origen y destino, como source.cluster.bootstrap.servers y target.cluster.bootstrap.servers.

  • El conector también requiere alias para los clústeres de origen y destino, como se especifica en source.cluster.alias y target.cluster.alias. De forma predeterminada, los temas replicados se renombran automáticamente con el alias de la fuente. Por ejemplo, un tema llamado orders de una fuente con el alias primary se convierte en primary.orders en el destino.

  • Las ACL asociadas con los temas replicados también se sincronizan del clúster de origen al de destino. Esto se puede inhabilitar con la propiedad sync.topic.acls.enabled.

  • Si los clústeres lo requieren, se deben proporcionar los detalles de autenticación para conectarse a los clústeres de origen y destino en la configuración. Debes configurar propiedades como security.protocol, sasl.mechanism y sasl.jaas.config, con el prefijo source.cluster. para la fuente y target.cluster. para el destino.

  • El conector depende de temas internos. Es posible que debas configurar propiedades relacionadas con estos, como offset-syncs.topic.replication.factor.

  • El conector usa los convertidores de registros de Kafka key.converter, value.converter y header.converter. Para la replicación directa, estos parámetros suelen establecerse de forma predeterminada en org.apache.kafka.connect.converters.ByteArrayConverter, que no realiza ninguna conversión (transferencia directa).

  • La propiedad tasks.max controla el nivel de paralelismo del conector. Aumentar tasks.max puede mejorar la capacidad de procesamiento, pero el paralelismo efectivo suele estar limitado por la cantidad de particiones en los temas de Kafka de origen que se replican.

Propiedades de un conector de MirrorMaker 2.0

Cuando crees o actualices un conector de MirrorMaker 2.0, especifica estas propiedades:

Nombre del conector

Es el nombre o ID del conector. Si necesitas ayuda para asignarle un nombre al recurso, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre es inmutable.

Tipo de conector

El tipo de conector debe ser uno de los siguientes:

Clúster principal de Kafka

Es el clúster de Managed Service para Apache Kafka. El sistema completa automáticamente este campo.

  • Usar el clúster principal de Kafka como clúster de destino: Selecciona esta opción para mover datos de otro clúster de Kafka al clúster principal de Managed Service para Apache Kafka.

  • Usar el clúster principal de Kafka como clúster de origen: Selecciona esta opción para transferir datos del clúster principal de Managed Service para Apache Kafka a otro clúster de Kafka.

Clúster de origen o destino

Es el clúster secundario de Kafka que forma el otro extremo de la canalización.

  • Clúster de Managed Service para Apache Kafka: Selecciona un clúster en el menú desplegable.

  • Clúster de Kafka externo o autoadministrado: Ingresa la dirección de inicio en el formato hostname:port_number. Por ejemplo: kafka-test:9092.

Nombres de temas o expresiones regulares

Son los temas que se replicarán. Especifica nombres individuales (topic1, topic2) o usa una expresión regular (topic.*). Esta propiedad es obligatoria para el conector de origen de MirrorMaker 2.0. El valor predeterminado es .*.

Nombres de grupos de consumidores o expresiones regulares

Son los grupos de consumidores que se replicarán. Especifica nombres individuales (grupo1, grupo2) o usa una expresión regular (group.*). Esta propiedad es obligatoria para el conector de Checkpoint de MirrorMaker 2.0. El valor predeterminado es .*.

Configuración

En esta sección, puedes especificar propiedades de configuración adicionales y específicas del conector para el conector de MirrorMaker 2.0.

Dado que los datos de los temas de Kafka pueden estar en varios formatos, como Avro, JSON o bytes sin procesar, una parte clave de la configuración implica especificar convertidores. Los convertidores traducen los datos del formato que se usa en tus temas de Kafka al formato interno estandarizado de Kafka Connect.

Para obtener información más general sobre el rol de los convertidores en Kafka Connect, los tipos de convertidores admitidos y las opciones de configuración comunes, consulta Convertidores.

Algunos parámetros de configuración comunes para todos los conectores de MirrorMaker 2.0 incluyen los siguientes:

  • source.cluster.alias: Es el alias del clúster de origen.

  • target.cluster.alias: Es el alias del clúster de destino.

Son los parámetros de configuración que se usan para excluir recursos específicos cuando se replican datos:

  • topics.exclude: Son los temas excluidos. Admite nombres de temas y regex separados por comas. Las exclusiones tienen prioridad sobre las inclusiones. Se usa para el conector de fuente de MirrorMaker 2.0. El valor predeterminado es mm2.*.internal,.*.replica,__.*

  • groups.exclude: Excluye grupos. Admite IDs de grupo y expresiones regulares separados por comas. Las exclusiones tienen prioridad sobre las inclusiones. Se usa para el conector de punto de control de MirrorMaker 2.0. El valor predeterminado es console-consumer-.*,connect-.*,__.*

Se requieren parámetros de configuración de autenticación para los conectores de MirrorMaker 2.0.

Si el clúster de Kafka de origen o destino es un clúster de Managed Service for Apache Kafka, el clúster de Connect usa OAuthBearer para autenticarse con él. Los parámetros de configuración de autenticación están preconfigurados, por lo que no es necesario que los configures manualmente.

En el caso del clúster de Kafka autoadministrado o local, las configuraciones de autenticación dependen del mecanismo de autenticación que admite el clúster de Kafka. Un ejemplo de configuración de autenticación para un clúster de Kafka de origen se ve de la siguiente manera:

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Un ejemplo de configuración de autenticación para un clúster de Kafka de destino se ve de la siguiente manera:

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Las propiedades de configuración disponibles dependen del conector específico. Consulta la versión del conector de MirrorMaker 2.0 compatible para ver qué ejemplos de configuración se admiten. Consulta los siguientes documentos:

Conversión de registros de Kafka

Kafka Connect usa org.apache.kafka.connect.converters.ByteArrayConverter como el convertidor predeterminado para la clave y el valor, lo que proporciona una opción de transferencia que no realiza ninguna conversión.

Puedes configurar header.converter, key.converter y value.converter para usar otros convertidores.

Recuento de tareas

El valor de tasks.max configura la cantidad máxima de tareas que Kafka Connect usa para ejecutar conectores de MirrorMaker. Controla el nivel de paralelismo de un conector. Aumentar el recuento de tareas puede incrementar el procesamiento, pero está limitado por factores como la cantidad de particiones del tema de Kafka.

Crea un conector de fuente de MirrorMaker 2.0

Antes de crear un conector, revisa la documentación sobre las propiedades del conector.

Console

  1. En la consola de Google Cloud , ve a la página Connect Clusters.

    Ir a Connect Clusters

  2. Haz clic en el clúster de Connect en el que deseas crear el conector.

    Se mostrará la página Detalles de conexión del clúster.

  3. Haz clic en Crear conector.

    Aparecerá la página Create Kafka Connector.

  4. En Nombre del conector, ingresa una cadena.

    Para obtener más información sobre cómo asignar un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.

  5. En Complemento del conector, selecciona "Fuente de MirrorMaker 2.0".

  6. En Clúster principal de Kafka, elige una de las siguientes opciones:

    • Usar el clúster principal de Kafka como clúster de origen: Para transferir datos desde el clúster de Managed Service para Apache Kafka
    • Usar el clúster principal de Kafka como clúster de destino: Para transferir datos al clúster de Managed Service para Apache Kafka
  7. En Clúster de destino o Clúster de origen, elige una de las siguientes opciones:

    • Clúster de Managed Service para Apache Kafka: Selecciona esta opción en el menú.
    • Clúster de Kafka autoadministrado o externo: Ingresa la dirección de inicio en el formato hostname:port_number.
  8. Ingresa los nombres de temas o regex de temas separados por comas.

  9. Revisa y ajusta la configuración, incluidos los parámetros de configuración de seguridad obligatorios.

    Para obtener más información sobre la configuración y la autenticación, consulta Configuración.

  10. Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.

  11. Haz clic en Crear.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Reemplaza lo siguiente:

    A continuación, se muestra un ejemplo de un archivo de configuración para el conector de origen de MirrorMaker 2.0:

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  3. Terraform

    Puedes usar un recurso de Terraform para crear un conector.

    # A single MirrorMaker 2 Source Connector to replicate from one source to one target.
    resource "google_managed_kafka_connector" "default" {
      project         = data.google_project.default.project_id
      connector_id    = "mm2-source-to-target-connector-id"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
        "name"                 = "mm2-source-to-target-connector-id"
        "tasks.max"            = "3"
        "source.cluster.alias" = "source"
        "target.cluster.alias" = "target"
        "topics"               = ".*" # Replicate all topics from the source
        # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
        # for one or more Kafka brokers in the source/target cluster.
        "source.cluster.bootstrap.servers" = "source_cluster_dns"
        "target.cluster.bootstrap.servers" = "target_cluster_dns"
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
        "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
      }
    
      provider = google-beta
    }

    Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.

    Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
    func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "mm2-source-to-target-connector-id"
    	// sourceBootstrapServers := "source_cluster_dns"
    	// targetBootstrapServers := "target_cluster_dns"
    	// tasksMax := "3"
    	// sourceClusterAlias := "source"
    	// targetClusterAlias := "target"
    	// topics := ".*"
    	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    		"name":                 connectorID,
    		"tasks.max":            tasksMax,
    		"source.cluster.alias": sourceClusterAlias,
    		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
    		// Replicate all topics from the source
    		"topics": topics,
    		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    		// the source/target cluster.
    		// For example: "kafka-broker:9092"
    		"source.cluster.bootstrap.servers": sourceBootstrapServers,
    		"target.cluster.bootstrap.servers": targetBootstrapServers,
    		// You can define an exclusion policy for topics as follows:
    		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    		"topics.exclude": topicsExclude,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateMirrorMaker2SourceConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String maxTasks = "3";
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-mirrormaker2-connector";
        String sourceClusterBootstrapServers = "my-source-cluster:9092";
        String targetClusterBootstrapServers = "my-target-cluster:9092";
        String sourceClusterAlias = "source";
        String targetClusterAlias = "target"; // This is usually the primary cluster.
        String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
        String topics = ".*";
        // You can define an exclusion policy for topics as follows:
        // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        String topicsExclude = "mm2.*.internal,.*.replica,__.*";
        createMirrorMaker2SourceConnector(
            projectId,
            region,
            maxTasks,
            connectClusterId,
            connectorId,
            sourceClusterBootstrapServers,
            targetClusterBootstrapServers,
            sourceClusterAlias,
            targetClusterAlias,
            connectorClass,
            topics,
            topicsExclude);
      }
    
      public static void createMirrorMaker2SourceConnector(
          String projectId,
          String region,
          String maxTasks,
          String connectClusterId,
          String connectorId,
          String sourceClusterBootstrapServers,
          String targetClusterBootstrapServers,
          String sourceClusterAlias,
          String targetClusterAlias,
          String connectorClass,
          String topics,
          String topicsExclude)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("source.cluster.alias", sourceClusterAlias);
        configMap.put("target.cluster.alias", targetClusterAlias);
        configMap.put("topics", topics);
        configMap.put("topics.exclude", topicsExclude);
        configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
        configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "source.cluster.alias": source_cluster_alias,
        "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
        # Replicate all topics from the source
        "topics": topics,
        # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
        # the source/target cluster.
        # For example: "kafka-broker:9092"
        "source.cluster.bootstrap.servers": source_bootstrap_servers,
        "target.cluster.bootstrap.servers": target_bootstrap_servers,
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        "topics.exclude": topics_exclude,
    }
    
    connector = Connector()
    # The name of the connector.
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.