Criar um conector do MirrorMaker 2.0

O MirrorMaker 2.0 é uma ferramenta que replica tópicos entre clusters do Kafka. É possível criar estes conectores do MirrorMaker 2.0:

  • MirrorMaker 2.0 Source

  • MirrorMaker 2.0 Checkpoint

  • MirrorMaker 2.0 Heartbeat

O conector de origem do MirrorMaker 2.0 é sempre necessário porque espelha os dados dos clusters de origem para os de destino. Ele também sincroniza ACLs. Os conectores de ponto de verificação e pulsação do MirrorMaker 2.0 são opcionais. Também é possível criar os conectores de ponto de verificação e heartbeat do MirrorMaker 2.0 sem criar o conector de origem.

Para mais informações sobre esses conectores, consulte Visão geral dos conectores.

Entender as funções de cluster no MirrorMaker 2.0

Ao configurar o MirrorMaker 2.0, é importante entender as diferentes funções dos clusters do Kafka:

  • Cluster principal:no contexto do Serviço Gerenciado para Apache Kafka, é o cluster do Serviço Gerenciado para Apache Kafka ao qual seu cluster do Kafka Connect está diretamente anexado. O cluster do Connect hospeda a instância do conector do MirrorMaker 2.0.

  • Cluster secundário:é o outro cluster do Kafka envolvido na replicação. Pode ser outro cluster do Serviço gerenciado para Apache Kafka ou um cluster externo. Alguns exemplos são autogerenciados no Compute Engine, no GKE, em infraestruturas locais ou em outra nuvem.

  • Cluster de origem:é o cluster do Kafka de onde o MirrorMaker 2.0 replica dados.

  • Cluster de destino:é o cluster do Kafka para o qual o MirrorMaker 2.0 replica dados.

O cluster principal pode servir como origem ou destino:

  • Se o cluster principal for a origem, o secundário será o destino. Os dados fluem do cluster principal para o secundário.

  • Se o cluster principal for o destino, o secundário será a origem. Os dados fluem do cluster secundário para o principal.

Para minimizar a latência das operações de gravação, é recomendável designar o cluster de destino como o principal e colocar o cluster do Connect na mesma região que o cluster de destino.

É necessário configurar corretamente todas as propriedades do conector. Elas também incluem propriedades de autenticação do produtor direcionadas ao cluster secundário. Para detalhes sobre possíveis problemas, consulte Melhorar a configuração do cliente do MirrorMaker 2.0.

Antes de começar

Para criar um conector do MirrorMaker 2.0, conclua estas tarefas:

  • Crie um cluster do Serviço gerenciado para Apache Kafka (primário). Esse cluster serve como um endpoint do conector do MirrorMaker 2.0.

  • Crie um cluster secundário do Kafka. Esse cluster serve como o outro endpoint. Pode ser outro cluster do Serviço Gerenciado para Apache Kafka ou um cluster do Kafka externo ou autogerenciado. É possível configurar vários clusters do Kafka como clusters secundários de um cluster do Connect.

  • Crie um cluster do Connect que hospede seu conector do MirrorMaker 2.0.

  • Verifique se os domínios DNS dos clusters secundários do Kafka estão configurados.

  • Configure regras de firewall para permitir que a interface do Private Service Connect alcance os clusters de origem e destino do Kafka.

  • Se o cluster Kafka de origem ou de destino for acessado pela Internet, configure um Cloud NAT para permitir que os workers do Connect acessem a Internet.

  • Se os clusters secundários incluírem clusters do Kafka externos ou autogerenciados, verifique se as credenciais necessárias estão configuradas como recursos secretos.

Para mais informações sobre os requisitos de rede, consulte Sub-rede de trabalho.

Papéis e permissões necessárias

Para receber as permissões necessárias para criar um conector do MirrorMaker 2.0, peça ao administrador para conceder a você o papel do IAM Editor de conector gerenciado do Kafka (roles/managedkafka.connectorEditor) no seu projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um conector do MirrorMaker 2.0. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um conector do MirrorMaker 2.0:

  • Conceda a permissão para criar um conector no cluster pai do Connect: managedkafka.connectors.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Para mais informações sobre a função Editor de conector gerenciado do Kafka, consulte Funções predefinidas do Serviço Gerenciado para Apache Kafka.

Criar um conector do MirrorMaker 2.0 em um projeto diferente

Se o cluster principal do Serviço gerenciado para Apache Kafka estiver em um projeto diferente do cluster do Connect que executa o conector do MirrorMaker 2.0, consulte Criar um cluster do Connect em um projeto diferente.

Conectar-se a um cluster secundário autogerenciado do Kafka

Ao se conectar a um cluster secundário do Kafka autogerenciado, preste atenção à rede e à autenticação.

  • Rede:verifique se as configurações de rede VPC e as regras de firewall adequadas estão configuradas para permitir a conectividade entre a rede VPC do cluster do Connect e a rede que hospeda o cluster autogerenciado ou externo.

  • Para clusters em VPCs, consulte Criar e gerenciar redes VPC.

  • Para se conectar a ambientes locais ou de outras nuvens, considere soluções como o Cloud VPN ou o Cloud Interconnect. Consulte também orientações específicas para conexão com o Kafka local.

  • Autenticação e criptografia:seu cluster do Connect precisa se autenticar com o cluster autogerenciado ou externo (se necessário) e processar qualquer criptografia TLS. Para informações gerais sobre a autenticação do Kafka, consulte a documentação de segurança do Apache Kafka.

Usar o Secret Manager para credenciais

Os clusters do Connect se integram diretamente ao Secret Manager. Armazene todos os valores de configuração sensíveis, como senhas e conteúdo de truststore e keystore necessários para se conectar ao cluster autogerenciado ou externo, como secrets no Secret Manager.

  • Os secrets concedidos à conta de serviço do cluster do Connect são montados automaticamente como arquivos no ambiente de execução do conector no diretório /var/secrets/.

  • O nome de arquivo segue o padrão {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Use o nome do projeto, não o número.

  • A forma de referenciar um secret depende se a propriedade do Kafka espera a senha ou o caminho de um arquivo:

    • Para senhas, use a propriedade DirectoryConfigProvider do Kafka. Especifique o valor no formato ${directory:/var/secrets}:{SECRET_FILENAME}. Exemplo: password=${directory:/var/secrets}:my-project-db-password-1

    • Para caminhos de arquivo, especifique o caminho direto para o arquivo de secret montado. Exemplo: ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

Para mais detalhes sobre como conceder acesso e configurar secrets durante a criação do cluster do Connect, consulte Configurar secrets do Secret Manager.

Como funciona um conector de origem do MirrorMaker

Um conector de origem do MirrorMaker extrai dados de um ou mais tópicos do Kafka em um cluster de origem e replica esses dados, junto com ACLs, para tópicos em um cluster de destino.

Confira um detalhamento de como o conector de origem do MirrorMaker replica dados:

  • O conector consome mensagens de tópicos especificados do Kafka no cluster de origem. Especifique os tópicos a serem replicados usando a propriedade de configuração topics, que aceita nomes de tópicos separados por vírgula ou uma única expressão regular no estilo Java. Por exemplo, topic-a,topic-b ou my-prefix-.*.

  • O conector também pode pular a replicação de tópicos específicos que você especificar usando a propriedade topics.exclude. As exclusões substituem as inclusões.

  • O conector grava as mensagens consumidas no cluster de destino.

  • O conector exige os detalhes de conexão do cluster de origem e de destino, como source.cluster.bootstrap.servers e target.cluster.bootstrap.servers.

  • O conector também exige aliases para os clusters de origem e de destino, conforme especificado por source.cluster.alias e target.cluster.alias. Por padrão, os tópicos replicados são renomeados automaticamente usando o alias de origem. Por exemplo, um tópico chamado orders de uma fonte com o alias primary se torna primary.orders no destino.

  • As ACLs associadas aos tópicos replicados também são sincronizadas do cluster de origem para o de destino. Isso pode ser desativado usando a propriedade sync.topic.acls.enabled.

  • Os detalhes de autenticação para se conectar aos clusters de origem e de destino precisam ser fornecidos na configuração, se exigido pelos clusters. É necessário configurar propriedades como security.protocol, sasl.mechanism e sasl.jaas.config, com o prefixo source.cluster. para a origem e target.cluster. para o destino.

  • O conector depende de tópicos internos. Talvez seja necessário configurar propriedades relacionadas a eles, como offset-syncs.topic.replication.factor.

  • O conector usa os conversores de registros do Kafka key.converter, value.converter e header.converter. Para replicação direta, geralmente o padrão é org.apache.kafka.connect.converters.ByteArrayConverter, que não realiza conversão (passagem direta).

  • A propriedade tasks.max controla o nível de paralelismo do conector. Aumentar tasks.max pode melhorar a capacidade, mas o paralelismo efetivo geralmente é limitado pelo número de partições nos tópicos de origem do Kafka que estão sendo replicados.

Propriedades de um conector do MirrorMaker 2.0

Ao criar ou atualizar um conector do MirrorMaker 2.0, especifique estas propriedades:

Nome do conector

O nome ou ID do conector. Para conferir as diretrizes de nomeação de recursos, consulte Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome é imutável.

Tipo de conector

O tipo de conector precisa ser um dos seguintes:

Cluster principal do Kafka

O cluster do Serviço gerenciado para Apache Kafka. O sistema preenche esse campo automaticamente.

  • Usar o cluster principal do Kafka como cluster de destino:selecione essa opção para mover dados de outro cluster do Kafka para o cluster principal do Serviço Gerenciado para Apache Kafka.

  • Usar o cluster principal do Kafka como cluster de origem:selecione essa opção para mover dados do cluster principal do Serviço Gerenciado para Apache Kafka para outro cluster do Kafka.

Cluster de destino ou de origem

O cluster secundário do Kafka que forma a outra extremidade do pipeline.

  • Cluster do Serviço gerenciado para Apache Kafka:selecione um cluster no menu suspenso.

  • Cluster do Kafka autogerenciado ou externo:insira o endereço de inicialização no formato hostname:port_number. Por exemplo: kafka-test:9092.

Nomes de tópicos ou expressões regulares

Os tópicos a serem replicados. Especifique nomes individuais (topic1, topic2) ou use uma expressão regular (topic.*). Essa propriedade é obrigatória para o conector de origem do MirrorMaker 2.0. O valor padrão é .*.

Nomes de grupos de consumidores ou expressões regulares

Os grupos de consumidores a serem replicados. Especifique nomes individuais (group1, group2) ou use uma expressão regular (group.*). Essa propriedade é obrigatória para o conector de ponto de verificação do MirrorMaker 2.0. O valor padrão é .*.

Configuração

Nesta seção, você pode especificar outras propriedades de configuração específicas do conector para o MirrorMaker 2.0.

Como os dados nos tópicos do Kafka podem estar em vários formatos, como Avro, JSON ou bytes brutos, uma parte fundamental da configuração envolve especificar conversores. Os conversores traduzem dados do formato usado nos tópicos do Kafka para o formato interno padronizado do Kafka Connect.

Para mais informações gerais sobre a função dos conversores no Kafka Connect, os tipos de conversores compatíveis e as opções de configuração comuns, consulte Conversores.

Algumas configurações comuns para todos os conectores do MirrorMaker 2.0 incluem:

  • source.cluster.alias: alias do cluster de origem.

  • target.cluster.alias: alias do cluster de destino.

Configurações usadas para excluir recursos específicos ao replicar dados:

  • topics.exclude: tópicos excluídos. Aceita nomes de tópicos e expressões regulares separados por vírgulas. As exclusões têm prioridade sobre as inclusões. Usado para o conector de origem do MirrorMaker 2.0. O valor padrão é mm2.*.internal,.*.replica,__.*.

  • groups.exclude: exclui grupos. Aceita IDs de grupo e expressões regulares separados por vírgula. As exclusões têm prioridade sobre as inclusões. Usado para o conector de ponto de verificação do MirrorMaker 2.0. O valor padrão é console-consumer-.*,connect-.*,__.*.

As configurações de autenticação são necessárias para os conectores do MirrorMaker 2.0.

Se o cluster Kafka de origem ou destino for um cluster do serviço gerenciado para Apache Kafka, o cluster Connect usará OAuthBearer para autenticar. As configurações de autenticação são pré-configuradas para que você não precise fazer isso manualmente.

Para o cluster do Kafka autogerenciado ou local, as configurações de autenticação dependem do mecanismo compatível com o cluster. Um exemplo de configuração de autenticação para uma configuração de cluster Kafka de origem é assim:

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Um exemplo de configuração de autenticação para um cluster Kafka de destino é assim:

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

As propriedades de configuração disponíveis dependem do conector específico. Verifique a versão do conector do MirrorMaker 2.0 compatível para saber quais exemplos de configuração são aceitos. Consulte os seguintes documentos:

Conversão de registros do Kafka

O Kafka Connect usa org.apache.kafka.connect.converters.ByteArrayConverter como o conversor padrão para chave e valor, que oferece uma opção de transferência sem conversão.

É possível configurar header.converter, key.converter e value.converter para usar outros conversores.

Contagem de tarefas

O valor tasks.max configura o número máximo de tarefas que o Kafka Connect usa para executar conectores do MirrorMaker. Ele controla o nível de paralelismo de um conector. Aumentar a contagem de tarefas pode aumentar a capacidade, mas é limitado por fatores como o número de partições de tópicos do Kafka.

Criar um conector de origem do MirrorMaker 2.0

Antes de criar um conector, consulte a documentação sobre propriedades do conector.

Console

  1. No console do Google Cloud , acesse a página Conectar clusters.

    Acessar o Connect Clusters

  2. Clique no cluster do Connect em que você quer criar o conector.

    A página Detalhes do cluster de conexão é exibida.

  3. Clique em Criar conector.

    A página Criar conector do Kafka é exibida.

  4. Em Nome do conector, insira uma string.

    Para mais informações sobre como nomear um conector, consulte Diretrizes de nomenclatura de recursos do serviço gerenciado para Apache Kafka.

  5. Em Plug-in do conector, selecione "Origem do MirrorMaker 2.0".

  6. Em Cluster principal do Kafka, escolha uma das seguintes opções:

    • Usar o cluster principal do Kafka como cluster de origem: para mover dados do cluster do Serviço Gerenciado para Apache Kafka.
    • Usar o cluster principal do Kafka como cluster de destino: para mover dados para o cluster do Serviço Gerenciado para Apache Kafka.
  7. Em Cluster de destino ou Cluster de origem, escolha uma das seguintes opções:

    • Cluster do Serviço gerenciado para Apache Kafka: selecione no menu.
    • Cluster do Kafka autogerenciado ou externo: insira o endereço de inicialização no formato hostname:port_number.
  8. Insira os Nomes de tópicos ou regex de tópicos separados por vírgulas.

  9. Revise e ajuste as Configurações, incluindo as configurações de segurança necessárias.

    Para mais informações sobre configuração e autenticação, consulte Configuração.

  10. Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização de tarefas.

  11. Clique em Criar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Substitua:

    • CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.

    • LOCATION: o local em que você cria o conector. Precisa ser o mesmo local em que você criou o cluster do Connect.

    • CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector é criado.

    • CONFIG_FILE: o caminho para o arquivo de configuração YAML do conector de coletor do BigQuery.

    Confira um exemplo de arquivo de configuração para o conector de origem do MirrorMaker 2.0:

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  3. Terraform

    É possível usar um recurso do Terraform para criar um conector.

    # A single MirrorMaker 2 Source Connector to replicate from one source to one target.
    resource "google_managed_kafka_connector" "default" {
      project         = data.google_project.default.project_id
      connector_id    = "mm2-source-to-target-connector-id"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
        "name"                 = "mm2-source-to-target-connector-id"
        "tasks.max"            = "3"
        "source.cluster.alias" = "source"
        "target.cluster.alias" = "target"
        "topics"               = ".*" # Replicate all topics from the source
        # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
        # for one or more Kafka brokers in the source/target cluster.
        "source.cluster.bootstrap.servers" = "source_cluster_dns"
        "target.cluster.bootstrap.servers" = "target_cluster_dns"
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
        "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
      }
    
      provider = google-beta
    }

    Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

    Go

    Antes de testar este exemplo, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

    Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
    func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "mm2-source-to-target-connector-id"
    	// sourceBootstrapServers := "source_cluster_dns"
    	// targetBootstrapServers := "target_cluster_dns"
    	// tasksMax := "3"
    	// sourceClusterAlias := "source"
    	// targetClusterAlias := "target"
    	// topics := ".*"
    	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    		"name":                 connectorID,
    		"tasks.max":            tasksMax,
    		"source.cluster.alias": sourceClusterAlias,
    		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
    		// Replicate all topics from the source
    		"topics": topics,
    		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    		// the source/target cluster.
    		// For example: "kafka-broker:9092"
    		"source.cluster.bootstrap.servers": sourceBootstrapServers,
    		"target.cluster.bootstrap.servers": targetBootstrapServers,
    		// You can define an exclusion policy for topics as follows:
    		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
    		"topics.exclude": topicsExclude,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateMirrorMaker2SourceConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String maxTasks = "3";
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-mirrormaker2-connector";
        String sourceClusterBootstrapServers = "my-source-cluster:9092";
        String targetClusterBootstrapServers = "my-target-cluster:9092";
        String sourceClusterAlias = "source";
        String targetClusterAlias = "target"; // This is usually the primary cluster.
        String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
        String topics = ".*";
        // You can define an exclusion policy for topics as follows:
        // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        String topicsExclude = "mm2.*.internal,.*.replica,__.*";
        createMirrorMaker2SourceConnector(
            projectId,
            region,
            maxTasks,
            connectClusterId,
            connectorId,
            sourceClusterBootstrapServers,
            targetClusterBootstrapServers,
            sourceClusterAlias,
            targetClusterAlias,
            connectorClass,
            topics,
            topicsExclude);
      }
    
      public static void createMirrorMaker2SourceConnector(
          String projectId,
          String region,
          String maxTasks,
          String connectClusterId,
          String connectorId,
          String sourceClusterBootstrapServers,
          String targetClusterBootstrapServers,
          String sourceClusterAlias,
          String targetClusterAlias,
          String connectorClass,
          String topics,
          String topicsExclude)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("source.cluster.alias", sourceClusterAlias);
        configMap.put("target.cluster.alias", targetClusterAlias);
        configMap.put("topics", topics);
        configMap.put("topics.exclude", topicsExclude);
        configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
        configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

    Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "source.cluster.alias": source_cluster_alias,
        "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
        # Replicate all topics from the source
        "topics": topics,
        # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
        # the source/target cluster.
        # For example: "kafka-broker:9092"
        "source.cluster.bootstrap.servers": source_bootstrap_servers,
        "target.cluster.bootstrap.servers": target_bootstrap_servers,
        # You can define an exclusion policy for topics as follows:
        # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
        "topics.exclude": topics_exclude,
    }
    
    connector = Connector()
    # The name of the connector.
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.