A tabela a seguir lista os tipos de conector do Kafka Connect que o Serviço Gerenciado para Apache Kafka aceita. É possível usar esses conectores para integrar o Apache Kafka aos seus aplicativos e a outros serviços do Google Cloud.
| Conector | Descrição | Casos de uso |
|---|---|---|
| MirrorMaker 2.0 | Replica tópicos e dados de um cluster do Kafka para outro. | Replicação de dados, recuperação de desastres, migração de dados |
| Coletor do BigQuery | Transmite dados de tópicos do Kafka para uma tabela do BigQuery. | Armazenamento em data warehouse, análise |
| Coletor do Cloud Storage | Transmite dados de tópicos do Kafka para um bucket do Cloud Storage. | Ingestão de data lake, arquivamento de dados |
| Coletor do Pub/Sub | Transmite dados de tópicos do Kafka para um tópico do Pub/Sub. | Integração de serviços, notificações em tempo real |
| Origem do Pub/Sub | Transmite mensagens de uma assinatura do Pub/Sub para um tópico do Kafka. | Ingestão de dados em tempo real, arquiteturas orientadas a eventos |
Conversões
Os conversores são responsáveis pela serialização e desserialização dos dados de registros do Kafka. Eles fazem a tradução entre o formato de byte bruto encontrado em tópicos do Kafka e a representação de dados estruturados interna usada pelo Kafka Connect.
Para conectores de coletor, os conversores desserializam dados do formato de fio do tópico para o formato de dados interno do Kafka Connect, que o conector usa para gravar no sistema de destino.
Para conectores de origem, os conversores serializam os dados do formato de dados interno do Kafka Connect para o formato de transmissão especificado do tópico do Kafka.
Os conversores garantem que o conector leia ou grave os registros do Kafka em um formato compatível com o sistema externo.
Ao configurar um conector, defina as seguintes propriedades:
Conversor de chave (
key.converter): o conversor a ser usado ao serializar e desserializar chaves de registro do Kafka.Conversor de valor (
value.converter): o conversor a ser usado ao serializar e desserializar valores de registro do Kafka.
Se você não especificar um conversor, o tipo padrão será
org.apache.kafka.connect.converters.ByteArrayConverter, que transmite
os dados no formato de byte bruto.
Conversores compatíveis
O Serviço Gerenciado para Apache Kafka oferece suporte aos seguintes conversores integrados:
| Converter | Formato |
|---|---|
io.confluent.connect.avro.AvroConverter |
Apache Avro |
org.apache.kafka.connect.converters.BooleanConverter |
Booleano |
org.apache.kafka.connect.converters.ByteArrayConverter |
Matriz de bytes O tipo de conversor padrão. Preserva o conteúdo exato das mensagens em dois sistemas. |
org.apache.kafka.connect.converters.DoubleConverter |
Duplo |
org.apache.kafka.connect.converters.FloatConverter |
Ponto flutuante |
org.apache.kafka.connect.converters.IntegerConverter |
Número inteiro |
org.apache.kafka.connect.json.JsonConverter |
JSON Para dados JSON sem um esquema, também defina
|
org.apache.kafka.connect.converters.LongConverter |
Longo |
org.apache.kafka.connect.converters.ShortConverter |
Curta |
org.apache.kafka.connect.storage.StringConverter |
String |
A escolha do conversor depende do tipo de conector e dos dados que você está armazenando no Kafka. Para mais informações, consulte a documentação do conector específico.
Tarefas
Um conector transfere dados criando uma ou mais tarefas que operam em
paralelo. Para definir um limite superior de quantas tarefas um conector cria, defina
a propriedade de configuração tasks.max do conector. O conector pode criar
menos tarefas do que esse valor.
Aumentar o valor de tasks.max pode melhorar a capacidade de processamento, mas também aumentar o consumo de recursos (CPU e memória). O valor ideal depende da carga de trabalho e dos recursos alocados aos workers do cluster do Connect. Para conectores de
gravador, o número de partições de tópicos do Kafka também pode afetar o paralelismo.
Política de reinicialização da tarefa
É possível definir uma política de reinicialização de tarefas de um conector, que determina o comportamento quando ocorre uma falha. Os conectores são compatíveis com as seguintes políticas:
Nunca reiniciar. O conector não reinicia tarefas com falha. Essa política é o comportamento padrão. Isso é útil para depuração ou em situações em que é necessária intervenção manual após um erro.
Reinicie com espera exponencial. O conector reinicia uma tarefa com falha após um atraso (chamado de período de backoff). O atraso aumenta exponencialmente com cada falha subsequente. Essa política é recomendada para a maioria das cargas de trabalho de produção.
Se você usar a política de espera exponencial, defina também valores para a espera mínima e máxima. A espera mínima precisa ser maior que 60 segundos, e a espera máxima precisa ser menor que 7.200 segundos.
Transformações e predicados
O Serviço Gerenciado para Apache Kafka é compatível com as transformações e os predicados padrão do Kafka Connect.
Com as transformações, é possível modificar mensagens individuais antes que elas sejam enviadas ao Serviço gerenciado para Apache Kafka (para conectores de origem) ou ao sistema externo (para conectores de coletor). Você pode usar uma transformação para mascarar dados sensíveis, adicionar carimbos de data/hora ou renomear campos.
Com os predicados, é possível filtrar dados com base em condições específicas, determinando a quais mensagens uma transformação se aplica com base nas propriedades da mensagem.
Por exemplo, para configurar um conector de gravador para ignorar mensagens que contêm uma chave de cabeçalho DoNotProcess, adicione a seguinte configuração:
transforms=dropMessage
transforms.dropMessage.type=org.apache.kafka.connect.transforms.Filter
transforms.dropMessage.predicate=hasKey
predicates=hasKey
predicates.hasKey.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasKey.name=DoNotProcess
Essa configuração faz o seguinte:
Configura um predicado chamado
hasKeydo tipoorg.apache.kafka.connect.transforms.predicates.HasHeaderKey. Esse predicado corresponde a todas as mensagens que contêm um cabeçalho com a chaveDoNotProcess.Configura uma transformação chamada
dropMessagedo tipoorg.apache.kafka.connect.transforms.Filter. Essa transformação descarta todas as mensagens que correspondem ao predicado configurado.Vincula a transformação ao predicado
hasKey. Isso garante que apenas mensagens com a chave de cabeçalhoDoNotProcesspresente sejam descartadas pela transformação.