Os conectores de coletor do Pub/Sub transmitem mensagens de tópicos do Kafka para tópicos do Pub/Sub. Isso permite integrar seus aplicativos baseados no Kafka ao Pub/Sub, facilitando arquiteturas orientadas a eventos e o processamento de dados em tempo real.
Antes de começar
Antes de criar um conector de coletor do Pub/Sub, verifique se você tem o seguinte:
Crie um cluster do Serviço gerenciado para Apache Kafka para seu cluster do Connect. Esse é o cluster principal do Kafka associado ao cluster do Connect. Esse também é o cluster de origem que forma uma extremidade do pipeline do conector.
Crie um cluster do Connect para hospedar o conector de coletor do Pub/Sub.
Crie e configure um tópico do Kafka no cluster de origem. Os dados são movidos desse tópico do Kafka para o tópico de destino do Pub/Sub.
Papéis e permissões necessárias
Para receber as permissões necessárias para criar um conector de gravador do Pub/Sub, peça ao administrador para conceder a você os seguintes papéis do IAM no projeto que contém o cluster do Connect:
-
Editor de conector Kafka gerenciado (
roles/managedkafka.connectorEditor) -
Pub/Sub:
Publicador do Pub/Sub (
roles/pubsub.publisher)
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Esses papéis predefinidos contêm as permissões necessárias para criar um conector de gravador do Pub/Sub. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:
Permissões necessárias
As seguintes permissões são necessárias para criar um conector de coletor do Pub/Sub:
-
Conceda a permissão para criar um conector no cluster pai do Connect:
managedkafka.connectors.create
Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.
Para mais informações sobre o papel Editor do conector gerenciado do Kafka, consulte Papéis predefinidos do Serviço Gerenciado para Apache Kafka.
Se o cluster do Serviço gerenciado para Apache Kafka estiver no mesmo projeto que o cluster do Connect, não serão necessárias outras permissões. Se o cluster do Connect estiver em um projeto diferente, consulte Criar um cluster do Connect em um projeto diferente.
Conceder permissões para publicar no tópico do Pub/Sub
A conta de serviço do cluster do Connect, que segue o formato
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com,
precisa da permissão para publicar mensagens no
tópico do Pub/Sub. Para isso, conceda o
papel de editor do Pub/Sub (roles/pubsub.publisher)
à conta de serviço do cluster do Connect no projeto que contém o
tópico do Pub/Sub.
Como funciona um conector de coletor do Pub/Sub
Um conector de coletor do Pub/Sub extrai mensagens de um ou mais tópicos do Kafka e as publica em um tópico do Pub/Sub.
Confira um detalhamento de como o conector de coletor do Pub/Sub copia dados:
O conector consome mensagens de um ou mais tópicos do Kafka no cluster de origem.
O conector grava mensagens no ID do tópico de destino do Pub/Sub especificado usando a propriedade de configuração
cps.topic. Essa é uma propriedade obrigatória.O conector também exige que o projeto Google Cloud que contém o tópico do Pub/Sub seja especificado usando a propriedade de configuração
cps.project. Essa é uma propriedade obrigatória.O conector também pode usar um endpoint personalizado do Pub/Sub especificado com a propriedade
cps.endpoint. O endpoint padrão é"pubsub.googleapis.com:443".Para otimizar a performance, o conector armazena mensagens em buffer antes de publicá-las no Pub/Sub. É possível configurar
maxBufferSize,maxBufferBytes,maxDelayThresholdMs,maxOutstandingRequestBytesemaxOutstandingMessagespara controlar o buffer.Um registro do Kafka tem três componentes: cabeçalhos, chaves e valores. O conector usa conversores de chave e valor para transformar os dados de mensagens do Kafka no formato esperado pelo Pub/Sub. Ao usar esquemas de valor de struct ou mapa, a propriedade
messageBodyNameespecifica o campo ou a chave a ser usada como o corpo da mensagem do Pub/Sub.O conector pode incluir o tópico, a partição, o deslocamento e o carimbo de data/hora do Kafka como atributos de mensagem usando a propriedade
metadata.publishdefinida comotrue.O conector pode incluir cabeçalhos de mensagens do Kafka como atributos de mensagens do Pub/Sub usando a propriedade
headers.publishdefinida comotrue.O conector pode incluir uma chave de ordenação para mensagens do Pub/Sub usando a propriedade
orderingKeySource. As opções de valor incluem"none"(padrão),"key"e"partition".A propriedade
tasks.maxcontrola o nível de paralelismo do conector. Aumentartasks.maxpode melhorar a capacidade de transferência, mas o paralelismo real é limitado pelo número de partições nos tópicos do Kafka.
Propriedades de um conector de coletor do Pub/Sub
Ao criar um conector de coletor do Pub/Sub, você precisa especificar as seguintes propriedades.
Nome do conector
Um nome exclusivo para o conector no cluster do Connect. Para conferir as diretrizes de nomeação de recursos, acesse Diretrizes de nomeação de recursos do serviço gerenciado para Apache Kafka.
Tipo de plug-in do conector
Selecione Coletor do Pub/Sub como o tipo de plug-in do conector. Isso determina a direção do fluxo de dados, que é do Kafka para o Pub/Sub, e a implementação específica do conector usada. Se você não usar a interface do usuário para configurar o conector, também precisará especificar a classe dele.
Tópicos do Kafka
Os tópicos do Kafka de que o conector consome mensagens.
É possível especificar um ou mais temas ou usar uma expressão regular para corresponder a vários temas. Por exemplo, topic.* para corresponder a todos os tópicos que começam
com "tópico". Esses tópicos precisam existir no cluster do Serviço gerenciado para Apache Kafka associado ao seu cluster do Connect.
Tópico do Pub/Sub
O tópico do Pub/Sub em que o conector
publica mensagens. Verifique se a conta de serviço do cluster do Connect tem
o papel roles/pubsub.publisher no projeto do tópico, conforme
descrito em Antes de começar.
Configuração
Nesta seção, você pode especificar outras propriedades de configuração específicas do conector.
Como os dados nos tópicos do Kafka podem estar em vários formatos, como Avro, JSON ou bytes brutos, uma parte fundamental da configuração envolve especificar conversores. Os conversores traduzem dados do formato usado nos tópicos do Kafka para o formato interno padronizado do Kafka Connect. Em seguida, o conector de coletor do Pub/Sub usa esses dados internos e os transforma no formato exigido pelo Pub/Sub antes de gravar.
Para mais informações gerais sobre a função dos conversores no Kafka Connect, tipos de conversores compatíveis e opções de configuração comuns, consulte conversores.
Confira algumas configurações específicas do conector de coletor do Pub/Sub:
cps.project: especifica o ID do projeto Google Cloud que contém o tópico do Pub/Sub.cps.topic: especifica o tópico do Pub/Sub em que os dados são publicados.cps.endpoint: especifica o endpoint do Pub/Sub a ser usado.
Para uma lista das propriedades de configuração disponíveis específicas desse conector, consulte as configurações do conector de gravador do Pub/Sub.
Criar um conector de coletor do Pub/Sub
Antes de criar um conector, revise a documentação sobre Propriedades de um conector de coletor do Pub/Sub.
Console
No console do Google Cloud , acesse a página Conectar clusters.
Clique no cluster do Connect para o qual você quer criar o conector.
A página Detalhes do cluster de conexão é exibida.
Clique em Criar conector.
A página Criar conector do Kafka é exibida.
Para o nome do conector, insira uma string.
Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka.
Em Plug-in do conector, selecione Coletor do Pub/Sub.
Em Tópicos, escolha Selecionar uma lista de tópicos do Kafka ou Usar uma regex de tópico. Em seguida, selecione ou insira os tópicos do Kafka de que esse conector consome mensagens. Esses tópicos estão no cluster do Kafka associado.
Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub em que esse conector publica mensagens. O assunto é mostrado no formato de nome de recurso completo:
projects/{project}/topics/{topic}.(Opcional) Configure outras opções na seção Configurações. É aqui que você especifica propriedades como
tasks.max,key.converterevalue.converter, conforme discutido na seção anterior.Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização de tarefas.
Clique em Criar.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Execute o comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILESubstitua:
CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.
LOCATION: o local em que você cria o conector. Precisa ser o mesmo local em que você criou o cluster do Connect.
CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector foi criado.
CONFIG_FILE: o caminho para o arquivo de configuração YAML do conector de coletor do BigQuery.
Confira um exemplo de arquivo de configuração para o conector de gravador do Pub/Sub:
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector" name: "CPS_SINK_CONNECTOR_ID" tasks.max: "1" topics: "GMK_TOPIC_ID" value.converter: "org.apache.kafka.connect.storage.StringConverter" key.converter: "org.apache.kafka.connect.storage.StringConverter" cps.topic: "CPS_TOPIC_ID" cps.project: "GCP_PROJECT_ID"Substitua:
CPS_SINK_CONNECTOR_ID: o ID ou nome do conector de coletor do Pub/Sub. Para conferir as diretrizes de nomeação de um conector, acesse Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.
GMK_TOPIC_ID: o ID do tópico do Serviço Gerenciado para Apache Kafka de que os dados são lidos pelo conector de coletor do Pub/Sub.
CPS_TOPIC_ID: o ID do tópico do Pub/Sub em que os dados são publicados.
GCP_PROJECT_ID: o ID do projeto Google Cloud em que o tópico do Pub/Sub está localizado.
Terraform
É possível usar um recurso do Terraform para criar um conector.
Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.
Go
Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.
Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Java
Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Python
Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Depois de criar um conector, é possível editar, excluir, pausar, interromper ou reiniciar.