O MirrorMaker 2.0 é uma ferramenta que replica tópicos entre clusters do Kafka. É possível criar estes conectores do MirrorMaker 2.0:
MirrorMaker 2.0 Source
MirrorMaker 2.0 Checkpoint
MirrorMaker 2.0 Heartbeat
O conector de origem do MirrorMaker 2.0 é sempre necessário porque espelha os dados dos clusters de origem para os de destino. Ele também sincroniza ACLs. Os conectores de ponto de verificação e pulsação do MirrorMaker 2.0 são opcionais. Também é possível criar os conectores de ponto de verificação e heartbeat do MirrorMaker 2.0 sem criar o conector de origem.
Para mais informações sobre esses conectores, consulte Visão geral dos conectores.
Entender as funções de cluster no MirrorMaker 2.0
Ao configurar o MirrorMaker 2.0, é importante entender as diferentes funções dos clusters do Kafka:
Cluster principal:no contexto do Serviço Gerenciado para Apache Kafka, é o cluster do Serviço Gerenciado para Apache Kafka ao qual seu cluster do Kafka Connect está diretamente anexado. O cluster do Connect hospeda a instância do conector do MirrorMaker 2.0.
Cluster secundário:é o outro cluster do Kafka envolvido na replicação. Pode ser outro cluster do Serviço gerenciado para Apache Kafka ou um cluster externo. Alguns exemplos são autogerenciados no Compute Engine, no GKE, em infraestruturas locais ou em outra nuvem.
Cluster de origem:é o cluster do Kafka de onde o MirrorMaker 2.0 replica dados.
Cluster de destino:é o cluster do Kafka para o qual o MirrorMaker 2.0 replica dados.
O cluster principal pode servir como origem ou destino:
Se o cluster principal for a origem, o secundário será o destino. Os dados fluem do cluster principal para o secundário.
Se o cluster principal for o destino, o secundário será a origem. Os dados fluem do cluster secundário para o principal.
Para minimizar a latência das operações de gravação, é recomendável designar o cluster de destino como o principal e colocar o cluster do Connect na mesma região que o cluster de destino.
É necessário configurar corretamente todas as propriedades do conector. Elas também incluem propriedades de autenticação do produtor direcionadas ao cluster secundário. Para detalhes sobre possíveis problemas, consulte Melhorar a configuração do cliente do MirrorMaker 2.0.
Antes de começar
Para criar um conector do MirrorMaker 2.0, conclua estas tarefas:
Crie um cluster do Serviço gerenciado para Apache Kafka (primário). Esse cluster serve como um endpoint do conector do MirrorMaker 2.0.
Crie um cluster secundário do Kafka. Esse cluster serve como o outro endpoint. Pode ser outro cluster do Serviço Gerenciado para Apache Kafka ou um cluster do Kafka externo ou autogerenciado. É possível configurar vários clusters do Kafka como clusters secundários de um cluster do Connect.
Crie um cluster do Connect que hospede seu conector do MirrorMaker 2.0.
Verifique se os domínios DNS dos clusters secundários do Kafka estão configurados.
Configure regras de firewall para permitir que a interface do Private Service Connect alcance os clusters de origem e destino do Kafka.
Se o cluster Kafka de origem ou de destino for acessado pela Internet, configure um Cloud NAT para permitir que os workers do Connect acessem a Internet.
Se os clusters secundários incluírem clusters do Kafka externos ou autogerenciados, verifique se as credenciais necessárias estão configuradas como recursos secretos.
Para mais informações sobre os requisitos de rede, consulte Sub-rede de trabalho.
Papéis e permissões necessárias
Para receber as permissões necessárias para criar um conector do MirrorMaker 2.0,
peça ao administrador para conceder a você o papel do IAM
Editor de conector gerenciado do Kafka (roles/managedkafka.connectorEditor)
no seu projeto.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Esse papel predefinido contém as permissões necessárias para criar um conector do MirrorMaker 2.0. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:
Permissões necessárias
As seguintes permissões são necessárias para criar um conector do MirrorMaker 2.0:
-
Conceda a permissão para criar um conector no cluster pai do Connect:
managedkafka.connectors.create
Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.
Para mais informações sobre a função Editor de conector gerenciado do Kafka, consulte Funções predefinidas do Serviço Gerenciado para Apache Kafka.
Criar um conector do MirrorMaker 2.0 em um projeto diferente
Se o cluster principal do Serviço gerenciado para Apache Kafka estiver em um projeto diferente do cluster do Connect que executa o conector do MirrorMaker 2.0, consulte Criar um cluster do Connect em um projeto diferente.
Conectar-se a um cluster secundário autogerenciado do Kafka
Ao se conectar a um cluster secundário do Kafka autogerenciado, preste atenção à rede e à autenticação.
Rede:verifique se as configurações de rede VPC e as regras de firewall adequadas estão configuradas para permitir a conectividade entre a rede VPC do cluster do Connect e a rede que hospeda o cluster autogerenciado ou externo.
Para clusters em VPCs, consulte Criar e gerenciar redes VPC.
Para se conectar a ambientes locais ou de outras nuvens, considere soluções como o Cloud VPN ou o Cloud Interconnect. Consulte também orientações específicas para conexão com o Kafka local.
Autenticação e criptografia:seu cluster do Connect precisa se autenticar com o cluster autogerenciado ou externo (se necessário) e processar qualquer criptografia TLS. Para informações gerais sobre a autenticação do Kafka, consulte a documentação de segurança do Apache Kafka.
Usar o Secret Manager para credenciais
Os clusters do Connect se integram diretamente ao Secret Manager. Armazene todos os valores de configuração sensíveis, como senhas e conteúdo de truststore e keystore necessários para se conectar ao cluster autogerenciado ou externo, como secrets no Secret Manager.
Os secrets concedidos à conta de serviço do cluster do Connect são montados automaticamente como arquivos no ambiente de execução do conector no diretório
/var/secrets/.O nome de arquivo segue o padrão
{PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Use o nome do projeto, não o número.A forma de referenciar um secret depende se a propriedade do Kafka espera a senha ou o caminho de um arquivo:
Para senhas, use a propriedade
DirectoryConfigProviderdo Kafka. Especifique o valor no formato${directory:/var/secrets}:{SECRET_FILENAME}. Exemplo:password=${directory:/var/secrets}:my-project-db-password-1Para caminhos de arquivo, especifique o caminho direto para o arquivo de secret montado. Exemplo:
ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3
Para mais detalhes sobre como conceder acesso e configurar secrets durante a criação do cluster do Connect, consulte Configurar secrets do Secret Manager.
Como funciona um conector de origem do MirrorMaker
Um conector de origem do MirrorMaker extrai dados de um ou mais tópicos do Kafka em um cluster de origem e replica esses dados, junto com ACLs, para tópicos em um cluster de destino.
Confira um detalhamento de como o conector de origem do MirrorMaker replica dados:
O conector consome mensagens de tópicos especificados do Kafka no cluster de origem. Especifique os tópicos a serem replicados usando a propriedade de configuração
topics, que aceita nomes de tópicos separados por vírgula ou uma única expressão regular no estilo Java. Por exemplo,topic-a,topic-boumy-prefix-.*.O conector também pode pular a replicação de tópicos específicos que você especificar usando a propriedade
topics.exclude. As exclusões substituem as inclusões.O conector grava as mensagens consumidas no cluster de destino.
O conector exige os detalhes de conexão do cluster de origem e de destino, como
source.cluster.bootstrap.serversetarget.cluster.bootstrap.servers.O conector também exige aliases para os clusters de origem e de destino, conforme especificado por
source.cluster.aliasetarget.cluster.alias. Por padrão, os tópicos replicados são renomeados automaticamente usando o alias de origem. Por exemplo, um tópico chamadoordersde uma fonte com o aliasprimaryse tornaprimary.ordersno destino.As ACLs associadas aos tópicos replicados também são sincronizadas do cluster de origem para o de destino. Isso pode ser desativado usando a propriedade
sync.topic.acls.enabled.Os detalhes de autenticação para se conectar aos clusters de origem e de destino precisam ser fornecidos na configuração, se exigido pelos clusters. É necessário configurar propriedades como
security.protocol,sasl.mechanismesasl.jaas.config, com o prefixosource.cluster.para a origem etarget.cluster.para o destino.O conector depende de tópicos internos. Talvez seja necessário configurar propriedades relacionadas a eles, como
offset-syncs.topic.replication.factor.O conector usa os conversores de registros do Kafka
key.converter,value.convertereheader.converter. Para replicação direta, geralmente o padrão éorg.apache.kafka.connect.converters.ByteArrayConverter, que não realiza conversão (passagem direta).A propriedade
tasks.maxcontrola o nível de paralelismo do conector. Aumentartasks.maxpode melhorar a capacidade, mas o paralelismo efetivo geralmente é limitado pelo número de partições nos tópicos de origem do Kafka que estão sendo replicados.
Propriedades de um conector do MirrorMaker 2.0
Ao criar ou atualizar um conector do MirrorMaker 2.0, especifique estas propriedades:
Nome do conector
O nome ou ID do conector. Para conferir as diretrizes de nomeação de recursos, consulte Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome é imutável.
Tipo de conector
O tipo de conector precisa ser um dos seguintes:
Cluster principal do Kafka
O cluster do Serviço gerenciado para Apache Kafka. O sistema preenche esse campo automaticamente.
Usar o cluster principal do Kafka como cluster de destino:selecione essa opção para mover dados de outro cluster do Kafka para o cluster principal do Serviço Gerenciado para Apache Kafka.
Usar o cluster principal do Kafka como cluster de origem:selecione essa opção para mover dados do cluster principal do Serviço Gerenciado para Apache Kafka para outro cluster do Kafka.
Cluster de destino ou de origem
O cluster secundário do Kafka que forma a outra extremidade do pipeline.
Cluster do Serviço gerenciado para Apache Kafka:selecione um cluster no menu suspenso.
Cluster do Kafka autogerenciado ou externo:insira o endereço de inicialização no formato
hostname:port_number. Por exemplo:kafka-test:9092.
Nomes de tópicos ou expressões regulares
Os tópicos a serem replicados. Especifique nomes individuais (topic1, topic2) ou use uma expressão regular (topic.*). Essa propriedade é obrigatória para o conector de origem do MirrorMaker 2.0. O valor padrão é .*.
Nomes de grupos de consumidores ou expressões regulares
Os grupos de consumidores a serem replicados. Especifique nomes individuais (group1, group2) ou use uma expressão regular (group.*). Essa propriedade é obrigatória para o conector de ponto de verificação do MirrorMaker 2.0. O valor padrão é .*.
Configuração
Nesta seção, você pode especificar outras propriedades de configuração específicas do conector para o MirrorMaker 2.0.
Como os dados nos tópicos do Kafka podem estar em vários formatos, como Avro, JSON ou bytes brutos, uma parte fundamental da configuração envolve especificar conversores. Os conversores traduzem dados do formato usado nos tópicos do Kafka para o formato interno padronizado do Kafka Connect.
Para mais informações gerais sobre a função dos conversores no Kafka Connect, os tipos de conversores compatíveis e as opções de configuração comuns, consulte Conversores.
Algumas configurações comuns para todos os conectores do MirrorMaker 2.0 incluem:
source.cluster.alias: alias do cluster de origem.target.cluster.alias: alias do cluster de destino.
Configurações usadas para excluir recursos específicos ao replicar dados:
topics.exclude: tópicos excluídos. Aceita nomes de tópicos e expressões regulares separados por vírgulas. As exclusões têm prioridade sobre as inclusões. Usado para o conector de origem do MirrorMaker 2.0. O valor padrão émm2.*.internal,.*.replica,__.*.groups.exclude: exclui grupos. Aceita IDs de grupo e expressões regulares separados por vírgula. As exclusões têm prioridade sobre as inclusões. Usado para o conector de ponto de verificação do MirrorMaker 2.0. O valor padrão éconsole-consumer-.*,connect-.*,__.*.
As configurações de autenticação são necessárias para os conectores do MirrorMaker 2.0.
Se o cluster Kafka de origem ou destino for um cluster do serviço gerenciado para Apache Kafka, o cluster Connect usará OAuthBearer para autenticar. As configurações de autenticação são pré-configuradas para que você não precise fazer isso manualmente.
Para o cluster do Kafka autogerenciado ou local, as configurações de autenticação dependem do mecanismo compatível com o cluster. Um exemplo de configuração de autenticação para uma configuração de cluster Kafka de origem é assim:
source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Um exemplo de configuração de autenticação para um cluster Kafka de destino é assim:
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
As propriedades de configuração disponíveis dependem do conector específico. Verifique a versão do conector do MirrorMaker 2.0 compatível para saber quais exemplos de configuração são aceitos. Consulte os seguintes documentos:
Propriedades de configuração do MirrorMaker 2.0 comuns a todos os conectores do MirrorMaker 2.0
Propriedades de configuração específicas da origem do MirrorMaker 2.0
Propriedades de configuração específicas do ponto de verificação do MirrorMaker 2.0
Propriedades de configuração específicas do MirrorMaker 2.0 Heartbeat
Conversão de registros do Kafka
O Kafka Connect usa org.apache.kafka.connect.converters.ByteArrayConverter
como o conversor padrão para chave e valor, que oferece uma opção
de transferência sem conversão.
É possível configurar header.converter, key.converter e value.converter para usar outros conversores.
Contagem de tarefas
O valor tasks.max configura o número máximo de tarefas que o Kafka Connect usa para executar
conectores do MirrorMaker. Ele controla o nível de paralelismo de um conector.
Aumentar a contagem de tarefas pode aumentar a capacidade, mas é limitado por fatores como o número de partições de tópicos do Kafka.
Criar um conector de origem do MirrorMaker 2.0
Antes de criar um conector, consulte a documentação sobre propriedades do conector.
Console
No console do Google Cloud , acesse a página Conectar clusters.
Clique no cluster do Connect em que você quer criar o conector.
A página Detalhes do cluster de conexão é exibida.
Clique em Criar conector.
A página Criar conector do Kafka é exibida.
Em Nome do conector, insira uma string.
Para mais informações sobre como nomear um conector, consulte Diretrizes de nomenclatura de recursos do serviço gerenciado para Apache Kafka.
Em Plug-in do conector, selecione "Origem do MirrorMaker 2.0".
Em Cluster principal do Kafka, escolha uma das seguintes opções:
- Usar o cluster principal do Kafka como cluster de origem: para mover dados do cluster do Serviço Gerenciado para Apache Kafka.
- Usar o cluster principal do Kafka como cluster de destino: para mover dados para o cluster do Serviço Gerenciado para Apache Kafka.
Em Cluster de destino ou Cluster de origem, escolha uma das seguintes opções:
- Cluster do Serviço gerenciado para Apache Kafka: selecione no menu.
- Cluster do Kafka autogerenciado ou externo: insira o endereço de inicialização no formato
hostname:port_number.
Insira os Nomes de tópicos ou regex de tópicos separados por vírgulas.
Revise e ajuste as Configurações, incluindo as configurações de segurança necessárias.
Para mais informações sobre configuração e autenticação, consulte Configuração.
Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização de tarefas.
Clique em Criar.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Execute o comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILESubstitua:
CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.
LOCATION: o local em que você cria o conector. Precisa ser o mesmo local em que você criou o cluster do Connect.
CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector é criado.
CONFIG_FILE: o caminho para o arquivo de configuração YAML do conector de coletor do BigQuery.
Confira um exemplo de arquivo de configuração para o conector de origem do MirrorMaker 2.0:
connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector" name: "MM2_CONNECTOR_ID" source.cluster.alias: "source" target.cluster.alias: "target" topics: "GMK_TOPIC_NAME" source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS" target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS" offset-syncs.topic.replication.factor: "1" source.cluster.security.protocol: "SASL_SSL" source.cluster.sasl.mechanism: "OAUTHBEARER" source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; target.cluster.security.protocol: "SASL_SSL" target.cluster.sasl.mechanism: "OAUTHBEARER" target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler" target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Terraform
É possível usar um recurso do Terraform para criar um conector.
Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.
Go
Antes de testar este exemplo, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.
Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Java
Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Python
Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.