Os conectores de origem do Pub/Sub transmitem mensagens do Pub/Sub para o Kafka. Isso permite integrar o Pub/Sub aos seus aplicativos e pipelines de dados baseados no Kafka.
O conector lê mensagens de uma assinatura do Pub/Sub, converte cada mensagem em um registro do Kafka e grava os registros em um tópico do Kafka. Por padrão, o conector cria registros do Kafka da seguinte forma:
- A chave de registro do Kafka é
null. - O valor do registro do Kafka são os dados da mensagem do Pub/Sub como bytes.
- Os cabeçalhos de registro do Kafka estão vazios.
No entanto, é possível configurar esse comportamento. Para mais informações, consulte Configurar o conector.
Antes de começar
Antes de criar um conector de origem do Pub/Sub, verifique se você tem o seguinte:
Um tópico do Pub/Sub com uma assinatura.
Um tópico do Kafka no cluster do Kafka.
Um cluster do Connect. Ao criar o cluster do Connect, defina o cluster do Managed Service para Apache Kafka como o cluster principal do Kafka.
Papéis e permissões necessárias
Para receber as permissões necessárias para criar um conector de origem do Pub/Sub,
peça ao administrador para conceder a você o
papel do IAM Editor do conector gerenciado do Kafka (roles/managedkafka.connectorEditor)
no projeto que contém o cluster do Connect.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Esse papel predefinido contém as permissões necessárias para criar um conector de origem do Pub/Sub. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:
Permissões necessárias
As seguintes permissões são necessárias para criar um conector de origem do Pub/Sub:
-
Conceda a permissão para criar um conector no cluster pai do Connect:
managedkafka.connectors.create
Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.
Para mais informações sobre a função de editor do conector gerenciado do Kafka, consulte Funções predefinidas do Serviço Gerenciado para Apache Kafka.
Se o cluster do Serviço gerenciado para Apache Kafka estiver no mesmo projeto que o cluster do Connect, não serão necessárias outras permissões. Se o cluster do Connect estiver em um projeto diferente, consulte Criar um cluster do Connect em um projeto diferente.
Conceder permissões de leitura do Pub/Sub
A conta de serviço do Managed Kafka precisa ter permissão para ler mensagens da assinatura do Pub/Sub. Conceda os seguintes papéis do IAM à conta de serviço no projeto que contém a assinatura do Pub/Sub:
- Assinante do Pub/Sub (
roles/pubsub.subscriber) - Leitor do Pub/Sub (
roles/pubsub.viewer)
A conta de serviço do Kafka gerenciado tem o seguinte formato:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com.
Substitua PROJECT_NUMBER pelo número do projeto.
Criar um conector de origem do Pub/Sub
Console
No console do Google Cloud , acesse a página Conectar clusters.
Clique no cluster do Connect em que você quer criar o conector.
Clique em Criar conector.
Para o nome do conector, insira uma string.
Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka.
Em Plug-in do conector, selecione Origem do Pub/Sub.
Na lista Assinatura do Cloud Pub/Sub, selecione uma assinatura do Pub/Sub. O conector extrai mensagens dessa assinatura. A assinatura é mostrada como um nome de recurso completo:
projects/{project}/subscriptions/{subscription}.Na lista Tópico do Kafka, selecione o tópico em que as mensagens são gravadas.
Opcional: na caixa Configurações, adicione propriedades de configuração ou edite as propriedades padrão. Para mais informações, consulte Configurar o conector.
Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização de tarefas.
Clique em Criar.
gcloud
Execute o comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILESubstitua:
CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.
LOCATION: o local do cluster do Connect.
CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector é criado.
CONFIG_FILE: o caminho para um arquivo de configuração YAML ou JSON.
Confira um exemplo de arquivo de configuração:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Substitua:
PROJECT_ID: o ID do projeto do Google Cloud em que a assinatura do Pub/Sub está localizada.
PUBSUB_SUBSCRIPTION_ID: o ID da assinatura do Pub/Sub de onde os dados serão extraídos.
KAFKA_TOPIC_ID: o ID do tópico do Kafka em que os dados são gravados.
As propriedades de configuração cps.project, cps.subscription e kafka.topic são obrigatórias. Para outras opções de configuração, consulte
Configurar o conector.
Terraform
É possível usar um recurso do Terraform para criar um conector.
Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.
Go
Antes de testar este exemplo, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.
Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Java
Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Python
Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Depois de criar um conector, é possível editar, excluir, pausar, interromper ou reiniciar.
Configurar o conector
Nesta seção, descrevemos algumas propriedades de configuração que podem ser definidas no conector.
Para uma lista completa das propriedades específicas desse conector, consulte Configurações do conector de origem do Pub/Sub.
Modo pull
O modo de extração especifica como o conector recupera mensagens do Pub/Sub. Estes são os modos compatíveis:
Modo pull (padrão). As mensagens são extraídas em lotes. Para ativar esse modo, defina
cps.streamingPull.enabled=false.. Para configurar o tamanho do lote, defina a propriedadecps.maxBatchSize.Para mais informações sobre o modo de extração, consulte API Pull.
Modo de pull de streaming. Permite a capacidade máxima e a menor latência ao recuperar mensagens do Pub/Sub. Para ativar esse modo, defina
cps.streamingPull.enabled=true.Para mais informações sobre o modo de extração por streaming, consulte API StreamingPull.
Se o streaming pull estiver ativado, você poderá ajustar o desempenho definindo as seguintes propriedades de configuração:
cps.streamingPull.flowControlBytes: o número máximo de bytes de mensagem pendentes por tarefa.cps.streamingPull.flowControlMessages: o número máximo de mensagens pendentes por tarefa.cps.streamingPull.maxAckExtensionMs: o período máximo em que o conector estende o prazo de inscrição, em milissegundos.cps.streamingPull.maxMsPerAckExtension: o período máximo em que o conector estende o prazo de inscrição por extensão, em milissegundos.cps.streamingPull.parallelStreams: o número de streams para extrair mensagens da assinatura.
Endpoint do Pub/Sub
Por padrão, o conector usa o endpoint global do Pub/Sub. Para especificar um endpoint, defina a propriedade cps.endpoint como o endereço do endpoint.
Para mais informações sobre endpoints, consulte
Endpoints do Pub/Sub.
Registros do Kafka
O conector de origem do Pub/Sub converte mensagens do Pub/Sub em registros do Kafka. As seções a seguir descrevem o processo de conversão.
Chave de registro
O conversor de chaves precisa ser org.apache.kafka.connect.storage.StringConverter.
Por padrão, as chaves de registro são
null.Para usar um atributo de mensagem do Pub/Sub como chave, defina
kafka.key.attributecomo o nome do atributo. Por exemplo,kafka.key.attribute=username.Para usar a chave de ordenação do Pub/Sub como chave, defina
kafka.key.attribute=orderingKey.
Gravar cabeçalhos
Por padrão, os cabeçalhos de registro ficam vazios.
Se kafka.record.headers for true, os atributos da mensagem do Pub/Sub serão gravados como cabeçalhos de registro. Para incluir a chave de ordenação, defina
cps.makeOrderingKeyAttribute=true.
Valor do registro
Se kafka.record.headers for true ou se a mensagem do Pub/Sub não tiver atributos personalizados, o valor do registro será os dados da mensagem, como uma matriz de bytes.
Defina o conversor de valores como
org.apache.kafka.connect.converters.ByteArrayConverter.
Caso contrário, se kafka.record.headers for false e a mensagem tiver pelo menos um
atributo personalizado, o conector vai gravar o valor do registro como um struct. Defina o conversor de valores como org.apache.kafka.connect.json.JsonConverter.
O struct contém os seguintes campos:
message: os dados da mensagem do Pub/Sub, em bytes.Um campo para cada atributo de mensagem do Pub/Sub. Para incluir a chave de ordenação, defina
cps.makeOrderingKeyAttribute=true.
Por exemplo, supondo que a mensagem tenha um atributo username:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Se value.converter.schemas.enable for true, o struct vai incluir a carga útil e o esquema:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Partições do Kafka
Por padrão, o conector grava em uma única partição no tópico. Para especificar
em quantas partições o conector grava, defina a propriedade kafka.partition.count. O valor não pode exceder a contagem de partições do tópico.
Para especificar como o conector atribui mensagens a partições, defina a propriedade kafka.partition.scheme. Para mais informações, consulte
Configurações do conector de origem do Pub/Sub.