Criar um conector do MirrorMaker 2.0

Os conectores do MirrorMaker 2.0 replicam dados de um cluster do Kafka para outro. É possível usar os conectores do MirrorMaker 2.0 para fazer recuperação de desastres entre clusters do Kafka e alcançar alta disponibilidade e tolerância a falhas nos aplicativos baseados no Kafka.

Um conector do MirrorMaker 2.0 pode estabelecer conexões entre dois clusters do Serviço Gerenciado para Apache Kafka ou entre um cluster do Serviço Gerenciado para Apache Kafka e um cluster do Kafka externo ou autogerenciado.

Os casos de uso dos conectores do MirrorMaker 2.0 incluem:

  • Migração de dados. Mova sua carga de trabalho do Kafka para um novo cluster do Serviço Gerenciado para Apache Kafka.

  • Recuperação de desastres. Crie um cluster de backup para garantir a continuidade dos negócios em caso de falhas.

  • Agregação de dados. Consolide dados de vários clusters do Kafka em um cluster central do Serviço Gerenciado para Apache Kafka para realizar análises.

Tipos de conector do MirrorMaker 2.0

O Serviço Gerenciado para Apache Kafka oferece os seguintes tipos de conectores do MirrorMaker 2.0.

Conector de origem do MirrorMaker 2.0

O conector de origem do MirrorMaker 2.0 replica tópicos e dados de um cluster do Kafka (a origem) para outro (o destino).

Use esse conector nos seguintes cenários de migração:

  • Replique ou migre dados de um cluster do Kafka externo ou autogerenciado para um cluster do Serviço Gerenciado para Apache Kafka.

  • Replique ou migre dados de um cluster do Serviço Gerenciado para Apache Kafka para um cluster externo ou autogerenciado do Kafka.

  • Replique dados do Kafka entre regiões para atender aos requisitos de recuperação de desastres e alta disponibilidade.

Para replicação básica de dados entre clusters do Kafka, use o conector de origem do MirrorMaker 2.0. Os outros conectores do MirrorMaker 2.0 oferecem funcionalidades adicionais para replicação de dados.

Conector de ponto de verificação do MirrorMaker 2.0

O conector de ponto de verificação do MirrorMaker 2.0 copia os deslocamentos de consumidores de um cluster do Kafka para outro. Os offsets do consumidor indicam a última mensagem consumida em uma partição. A replicação dos deslocamentos garante que os consumidores no cluster de destino possam retomar o processamento do mesmo ponto que o cluster de origem.

Esse conector permite os seguintes casos de uso:

  • Garantir um tempo de inatividade mínimo durante uma troca do cluster de origem para o de destino.

  • Permita um failover sem problemas fornecendo um estado consistente do consumidor em todos os clusters.

  • Preserve o progresso do consumidor ao mover dados para o cluster de destino.

Conector de sinal de funcionamento do MirrorMaker 2.0

O conector de sinal de funcionamento do MirrorMaker 2.0 gera mensagens periódicas de sinal de funcionamento em um cluster do Kafka. O conector grava essas mensagens em um tópico dedicado no cluster, normalmente chamado de heartbeats.

Depois de configurar um conector de sinal de funcionamento do MirrorMaker 2.0, use um conector de origem do MirrorMaker 2.0 para replicar o tópico heartbeats em um cluster de destino. Ao observar os heartbeats replicados, é possível implementar os seguintes casos de uso:

  • Monitore o status e a performance da replicação de dados entre os clusters.

  • Verifique a conexão e o fluxo de dados entre clusters mesmo quando nenhum outro dado está sendo produzido.

  • Configure alertas no Cloud Monitoring para notificar você se a replicação do heartbeat parar.

Usado por si só, o conector de pulsação não monitora automaticamente a replicação. É necessário replicar o tópico heartbeats e observar as mensagens de pulsação chegando ao cluster de destino.

Entender os papéis de cluster no MirrorMaker 2.0

Ao configurar o MirrorMaker 2.0, é importante entender as diferentes funções dos clusters do Kafka:

  • Cluster principal:no contexto do Serviço Gerenciado para Apache Kafka, é o cluster do Serviço Gerenciado para Apache Kafka ao qual seu cluster do Kafka Connect está diretamente conectado. O cluster do Connect hospeda a instância do conector do MirrorMaker 2.0.

  • Cluster secundário:é o outro cluster do Kafka envolvido na replicação. Pode ser outro cluster do Serviço Gerenciado para Apache Kafka ou um cluster externo. Alguns exemplos são autogerenciados no Compute Engine, no GKE, localmente ou em outra nuvem.

  • Cluster de origem:é o cluster do Kafka de onde o MirrorMaker 2.0 replica dados.

  • Cluster de destino:é o cluster do Kafka para o qual o MirrorMaker 2.0 replica dados.

O cluster principal pode servir como origem ou destino:

  • Se o cluster principal for a origem, o secundário será o destino. Os dados fluem do cluster principal para o secundário.

  • Se o cluster principal for o destino, o secundário será a origem. Os dados fluem do cluster secundário para o principal.

Para minimizar a latência das operações de gravação, é recomendável designar o cluster de destino como o principal e colocar o cluster do Connect na mesma região que o cluster de destino.

É necessário configurar corretamente todas as propriedades do conector. Elas também incluem propriedades de autenticação do produtor direcionadas ao cluster secundário. Para detalhes sobre possíveis problemas, consulte Melhorar a configuração do cliente do MirrorMaker 2.0.

Antes de começar

Para criar um conector do MirrorMaker 2.0, conclua estas tarefas:

  • Crie um cluster do Serviço Gerenciado para Apache Kafka (primário). Esse cluster serve como um endpoint do conector do MirrorMaker 2.0.

  • Crie um cluster secundário do Kafka. Esse cluster serve como o outro endpoint. Pode ser outro cluster do Serviço Gerenciado para Apache Kafka ou um cluster do Kafka externo ou autogerenciado. É possível configurar vários clusters do Kafka como secundários de um cluster do Connect.

  • Crie um cluster do Connect que hospede seu conector do MirrorMaker 2.0.

  • Verifique se os domínios DNS dos clusters secundários do Kafka estão configurados.

  • Configure regras de firewall para permitir que a interface do Private Service Connect alcance os clusters de origem e destino do Kafka.

  • Se o cluster Kafka de origem ou de destino for acessado pela Internet, configure um Cloud NAT para permitir que os workers do Connect acessem a Internet.

  • Se os clusters secundários incluírem clusters externos ou autogerenciados do Kafka, verifique se as credenciais necessárias estão configuradas como recursos secretos.

Para mais informações sobre os requisitos de rede, consulte Sub-rede de trabalho.

Papéis e permissões necessárias

Para receber as permissões necessárias para criar um conector, peça ao administrador para conceder a você o papel do IAM de Editor de conector gerenciado do Kafka (roles/managedkafka.connectorEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um conector. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um conector:

  • Crie um conector: managedkafka.connectors.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Criar um conector do MirrorMaker 2.0 em um projeto diferente

Se o cluster principal do Serviço gerenciado para Apache Kafka estiver em um projeto diferente do cluster do Connect que executa o conector do MirrorMaker 2.0, consulte Criar um cluster do Connect em outro projeto.

Conectar-se a um cluster secundário autogerenciado do Kafka

Ao se conectar a um cluster secundário do Kafka autogerenciado, preste atenção à rede e à autenticação.

  • Rede:verifique se as configurações de rede VPC e as regras de firewall adequadas estão configuradas para permitir a conectividade entre a rede VPC do cluster do Connect e a rede que hospeda o cluster autogerenciado ou externo.

  • Para clusters em VPCs, consulte Criar e gerenciar redes VPC.

  • Para se conectar a ambientes locais ou de outras nuvens, considere soluções como o Cloud VPN ou o Cloud Interconnect. Consulte também orientações específicas para conectar-se ao Kafka local.

  • Autenticação e criptografia:seu cluster do Connect precisa se autenticar com o cluster autogerenciado ou externo (se necessário) e processar qualquer criptografia TLS. Para informações gerais sobre a autenticação do Kafka, consulte a documentação de segurança do Apache Kafka.

Usar o Secret Manager para credenciais

Os clusters do Connect se integram diretamente ao Secret Manager. Armazene todos os valores de configuração sensíveis, como senhas e conteúdo de truststore e keystore necessários para se conectar ao cluster autogerenciado ou externo, como secrets no Secret Manager.

  • Os secrets concedidos à conta de serviço do cluster do Connect são montados automaticamente como arquivos no ambiente de execução do conector no diretório /var/secrets/.

  • O nome do arquivo segue o padrão {PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Use o nome do projeto, não o número.

  • A forma de referenciar um secret depende de a propriedade do Kafka esperar a senha do secret ou o caminho para um arquivo:

    • Para senhas, use a propriedade DirectoryConfigProvider do Kafka. Especifique o valor no formato ${directory:/var/secrets}:{SECRET_FILENAME}. Exemplo: password=${directory:/var/secrets}:my-project-db-password-1

    • Para caminhos de arquivo, especifique o caminho direto para o arquivo de secret montado. Exemplo: ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3

Para mais detalhes sobre como conceder acesso e configurar secrets durante a criação do cluster do Connect, consulte Configurar secrets do Secret Manager.

Como funciona um conector de origem do MirrorMaker

Um conector de origem do MirrorMaker extrai dados de um ou mais tópicos do Kafka em um cluster de origem e replica esses dados, junto com ACLs, para tópicos em um cluster de destino.

Confira um detalhamento de como o conector de origem do MirrorMaker replica dados:

  • O conector consome mensagens de tópicos especificados do Kafka no cluster de origem. Especifique os tópicos a serem replicados usando a propriedade de configuração topics, que aceita nomes de tópicos separados por vírgula ou uma única expressão regular no estilo Java. Por exemplo, topic-a,topic-b ou my-prefix-.*.

  • O conector também pode pular a replicação de tópicos específicos que você especificar usando a propriedade topics.exclude. As exclusões substituem as inclusões.

  • O conector grava as mensagens consumidas no cluster de destino.

  • O conector exige os detalhes de conexão do cluster de origem e de destino, como source.cluster.bootstrap.servers e target.cluster.bootstrap.servers.

  • O conector também exige aliases para os clusters de origem e de destino, conforme especificado por source.cluster.alias e target.cluster.alias. Por padrão, os tópicos replicados são renomeados automaticamente usando o alias de origem. Por exemplo, um tópico chamado orders de uma fonte com o alias primary se torna primary.orders no destino.

  • As ACLs associadas aos tópicos replicados também são sincronizadas do cluster de origem para o de destino. Isso pode ser desativado usando a propriedade sync.topic.acls.enabled.

  • Os detalhes de autenticação para se conectar aos clusters de origem e de destino precisam ser fornecidos na configuração, se exigido pelos clusters. É necessário configurar propriedades como security.protocol, sasl.mechanism e sasl.jaas.config, com o prefixo source.cluster. para a origem e target.cluster. para o destino.

  • O conector depende de tópicos internos. Talvez seja necessário configurar propriedades relacionadas a eles, como offset-syncs.topic.replication.factor.

  • O conector usa conversores de registros do Kafka key.converter, value.converter e header.converter. Para replicação direta, geralmente o padrão é org.apache.kafka.connect.converters.ByteArrayConverter, que não realiza conversão (passagem direta).

  • A propriedade tasks.max controla o nível de paralelismo do conector. Aumentar tasks.max pode melhorar a capacidade de processamento, mas o paralelismo efetivo geralmente é limitado pelo número de partições nos tópicos de origem do Kafka que estão sendo replicados.

Propriedades de um conector do MirrorMaker 2.0

Ao criar ou atualizar um conector do MirrorMaker 2.0, especifique estas propriedades:

Nome do conector

O nome ou ID do conector. Para conferir as diretrizes de nomeação de recursos, consulte Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome é imutável.

Tipo de conector

O tipo de conector precisa ser um dos seguintes:

Cluster principal do Kafka

O cluster do Serviço gerenciado para Apache Kafka. O sistema preenche esse campo automaticamente.

  • Usar o cluster principal do Kafka como cluster de destino:selecione essa opção para mover dados de outro cluster do Kafka para o cluster principal do Serviço Gerenciado para Apache Kafka.

  • Usar o cluster principal do Kafka como cluster de origem:selecione essa opção para mover dados do cluster principal do Serviço Gerenciado para Apache Kafka para outro cluster do Kafka.

Cluster de destino ou de origem

O cluster secundário do Kafka que forma a outra extremidade do pipeline.

  • Cluster do Serviço Gerenciado para Apache Kafka:selecione um cluster no menu suspenso.

  • Cluster do Kafka autogerenciado ou externo:insira o endereço de inicialização no formato hostname:port_number. Por exemplo: kafka-test:9092.

Nomes de tópicos ou expressões regulares

Os tópicos a serem replicados. Especifique nomes individuais (topic1, topic2) ou use uma expressão regular (topic.*). Essa propriedade é obrigatória para o conector de origem do MirrorMaker 2.0. O valor padrão é .*.

Nomes de grupos de consumidores ou expressões regulares

Os grupos de consumidores a serem replicados. Especifique nomes individuais (group1, group2) ou use uma expressão regular (group.*). Essa propriedade é obrigatória para o conector de ponto de verificação do MirrorMaker 2.0. O valor padrão é .*.

Configuração

Nesta seção, você pode especificar outras propriedades de configuração específicas do conector para o MirrorMaker 2.0.

Como os dados nos tópicos do Kafka podem estar em vários formatos, como Avro, JSON ou bytes brutos, uma parte fundamental da configuração envolve especificar conversores. Os conversores traduzem dados do formato usado nos tópicos do Kafka para o formato interno padronizado do Kafka Connect.

Para mais informações gerais sobre a função dos conversores no Kafka Connect, os tipos de conversores compatíveis e as opções de configuração comuns, consulte Conversores.

Algumas configurações comuns para todos os conectores do MirrorMaker 2.0 incluem:

  • source.cluster.alias: alias do cluster de origem.

  • target.cluster.alias: alias do cluster de destino.

Configurações usadas para excluir recursos específicos ao replicar dados:

  • topics.exclude: tópicos excluídos. Aceita nomes de tópicos e expressões regulares separados por vírgulas. As exclusões têm precedência sobre as inclusões. Usado para o conector de origem do MirrorMaker 2.0. O valor padrão é mm2.*.internal,.*.replica,__.*.

  • groups.exclude: exclui grupos. Aceita IDs de grupo e expressões regulares separados por vírgula. As exclusões têm precedência sobre as inclusões. Usado para o conector de ponto de verificação do MirrorMaker 2.0. O valor padrão é console-consumer-.*,connect-.*,__.*.

As configurações de autenticação são necessárias para os conectores do MirrorMaker 2.0.

Se o cluster Kafka de origem ou destino for um cluster do Serviço Gerenciado para Apache Kafka, o cluster do Connect usará OAuthBearer para autenticar. As configurações de autenticação são pré-configuradas para que você não precise fazer isso manualmente.

Para o cluster do Kafka autogerenciado ou local, as configurações de autenticação dependem do mecanismo compatível com o cluster. Um exemplo de configuração de autenticação para uma configuração de cluster Kafka de origem é assim:

source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

Um exemplo de configuração de autenticação para um cluster Kafka de destino é assim:

target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

As propriedades de configuração disponíveis dependem do conector específico. Verifique a versão do conector do MirrorMaker 2.0 compatível para saber quais exemplos de configuração são aceitos. Consulte os seguintes documentos:

Conversão de registros do Kafka

O Kafka Connect usa org.apache.kafka.connect.converters.ByteArrayConverter como o conversor padrão para chave e valor, que oferece uma opção de passagem que não faz conversão.

É possível configurar header.converter, key.converter e value.converter para usar outros conversores.

Contagem de tarefas

O valor tasks.max configura o número máximo de tarefas que o Kafka Connect usa para executar conectores do MirrorMaker. Ele controla o nível de paralelismo de um conector. Aumentar a contagem de tarefas pode aumentar a capacidade, mas é limitado por fatores como o número de partições de tópicos do Kafka.

Criar um conector de origem do MirrorMaker 2.0

Antes de criar um conector, consulte a documentação sobre propriedades do conector.

Console

  1. No console do Google Cloud , acesse a página Conectar clusters.

    Acessar o Connect Clusters

  2. Clique no cluster do Connect em que você quer criar o conector.

    A página Detalhes do cluster de conexão é exibida.

  3. Clique em Criar conector.

    A página Criar conector do Kafka é exibida.

  4. Em Nome do conector, insira uma string.

    Para mais informações sobre como nomear um conector, consulte Diretrizes de nomenclatura de um recurso do serviço gerenciado para Apache Kafka.

  5. Em Plug-in do conector, selecione "Origem do MirrorMaker 2.0".

  6. Em Cluster principal do Kafka, escolha uma das seguintes opções:

    • Usar o cluster principal do Kafka como cluster de origem: para mover dados do cluster do Serviço Gerenciado para Apache Kafka.
    • Usar o cluster principal do Kafka como cluster de destino: para mover dados para o cluster do Serviço gerenciado para Apache Kafka.
  7. Em Cluster de destino ou Cluster de origem, escolha uma das seguintes opções:

    • Cluster do Serviço Gerenciado para Apache Kafka: selecione no menu.
    • Cluster do Kafka autogerenciado ou externo: insira o endereço de inicialização no formato hostname:port_number.
  8. Insira os Nomes de tópicos ou regex de tópicos separados por vírgulas.

  9. Revise e ajuste as Configurações, incluindo as configurações de segurança necessárias.

    Para mais informações sobre configuração e autenticação, consulte Configuração.

  10. Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização de tarefas.

  11. Clique em Criar.

gcloud

  1. No console do Google Cloud , ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do console Google Cloud , uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a CLI do Google Cloud já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Execute o comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Substitua:

    • CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.

    • LOCATION: o local em que você cria o conector. Precisa ser o mesmo local em que você criou o cluster do Connect.

    • CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector é criado.

    • CONFIG_FILE: o caminho para o arquivo de configuração YAML do conector.

    Confira um exemplo de arquivo de configuração para o conector de origem do MirrorMaker 2.0:

    connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    name: "MM2_CONNECTOR_ID"
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    topics: "GMK_TOPIC_NAME"
    source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS"
    target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS"
    offset-syncs.topic.replication.factor: "1"
    source.cluster.security.protocol: "SASL_SSL"
    source.cluster.sasl.mechanism: "OAUTHBEARER"
    source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    target.cluster.security.protocol: "SASL_SSL"
    target.cluster.sasl.mechanism: "OAUTHBEARER"
    target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
    target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    

Terraform

É possível usar um recurso do Terraform para criar um conector.

# A single MirrorMaker 2 Source Connector to replicate from one source to one target.
resource "google_managed_kafka_connector" "default" {
  project         = data.google_project.default.project_id
  connector_id    = "mm2-source-to-target-connector-id"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"      = "org.apache.kafka.connect.mirror.MirrorSourceConnector"
    "name"                 = "mm2-source-to-target-connector-id"
    "tasks.max"            = "3"
    "source.cluster.alias" = "source"
    "target.cluster.alias" = "target"
    "topics"               = ".*" # Replicate all topics from the source
    # The value for bootstrap.servers is a comma-separated list of hostname:port pairs
    # for one or more Kafka brokers in the source/target cluster.
    "source.cluster.bootstrap.servers" = "source_cluster_dns"
    "target.cluster.bootstrap.servers" = "target_cluster_dns"
    # You can define an exclusion policy for topics as follows:
    # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics,.
    "topics.exclude" = "mm2.*\\.internal,.*\\.replica,__.*"
  }

  provider = google-beta
}

Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

Go

Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createMirrorMaker2SourceConnector creates a MirrorMaker 2.0 Source connector.
func createMirrorMaker2SourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, sourceBootstrapServers, targetBootstrapServers, tasksMax, sourceClusterAlias, targetClusterAlias, topics, topicsExclude string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "mm2-source-to-target-connector-id"
	// sourceBootstrapServers := "source_cluster_dns"
	// targetBootstrapServers := "target_cluster_dns"
	// tasksMax := "3"
	// sourceClusterAlias := "source"
	// targetClusterAlias := "target"
	// topics := ".*"
	// topicsExclude := "mm2.*.internal,.*.replica,__.*"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	config := map[string]string{
		"connector.class":      "org.apache.kafka.connect.mirror.MirrorSourceConnector",
		"name":                 connectorID,
		"tasks.max":            tasksMax,
		"source.cluster.alias": sourceClusterAlias,
		"target.cluster.alias": targetClusterAlias, // This is usually the primary cluster.
		// Replicate all topics from the source
		"topics": topics,
		// The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
		// the source/target cluster.
		// For example: "kafka-broker:9092"
		"source.cluster.bootstrap.servers": sourceBootstrapServers,
		"target.cluster.bootstrap.servers": targetBootstrapServers,
		// You can define an exclusion policy for topics as follows:
		// To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
		// topicsExclude := "mm2.*.internal,.*.replica,__.*"
		"topics.exclude": topicsExclude,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created MirrorMaker 2.0 Source connector: %s\n", resp.Name)
	return nil
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateMirrorMaker2SourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String maxTasks = "3";
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-mirrormaker2-connector";
    String sourceClusterBootstrapServers = "my-source-cluster:9092";
    String targetClusterBootstrapServers = "my-target-cluster:9092";
    String sourceClusterAlias = "source";
    String targetClusterAlias = "target"; // This is usually the primary cluster.
    String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
    String topics = ".*";
    // You can define an exclusion policy for topics as follows:
    // To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    String topicsExclude = "mm2.*.internal,.*.replica,__.*";
    createMirrorMaker2SourceConnector(
        projectId,
        region,
        maxTasks,
        connectClusterId,
        connectorId,
        sourceClusterBootstrapServers,
        targetClusterBootstrapServers,
        sourceClusterAlias,
        targetClusterAlias,
        connectorClass,
        topics,
        topicsExclude);
  }

  public static void createMirrorMaker2SourceConnector(
      String projectId,
      String region,
      String maxTasks,
      String connectClusterId,
      String connectorId,
      String sourceClusterBootstrapServers,
      String targetClusterBootstrapServers,
      String sourceClusterAlias,
      String targetClusterAlias,
      String connectorClass,
      String topics,
      String topicsExclude)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("tasks.max", maxTasks);
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("source.cluster.alias", sourceClusterAlias);
    configMap.put("target.cluster.alias", targetClusterAlias);
    configMap.put("topics", topics);
    configMap.put("topics.exclude", topicsExclude);
    configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
    configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.

Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "source.cluster.alias": source_cluster_alias,
    "target.cluster.alias": target_cluster_alias,  # This is usually the primary cluster.
    # Replicate all topics from the source
    "topics": topics,
    # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
    # the source/target cluster.
    # For example: "kafka-broker:9092"
    "source.cluster.bootstrap.servers": source_bootstrap_servers,
    "target.cluster.bootstrap.servers": target_bootstrap_servers,
    # You can define an exclusion policy for topics as follows:
    # To exclude internal MirrorMaker 2 topics, internal topics and replicated topics.
    "topics.exclude": topics_exclude,
}

connector = Connector()
# The name of the connector.
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.