Este documento descreve como ler dados do Apache Kafka para o Dataflow e inclui dicas de desempenho e práticas recomendadas.
Para a maioria dos exemplos de utilização, considere usar o conetor de E/S gerido para ler a partir do Kafka.
Se precisar de uma otimização do desempenho mais avançada, considere usar o conector KafkaIO. O conetor KafkaIO está disponível para
Java
ou através da utilização da
estrutura de pipelines multilingues
para Python
e Go.
Paralelismo
As secções seguintes descrevem como configurar o paralelismo ao ler a partir do Kafka.
Vista geral
O paralelismo é limitado por dois fatores: o
número máximo de trabalhadores
(max_num_workers) e o número de partições do Kafka. O Dataflow usa por predefinição uma ramificação de paralelismo de 4 x max_num_workers. No entanto, a distribuição é limitada pelo número de partições. Por exemplo, se estiverem disponíveis 100 vCPUs, mas o pipeline apenas ler a partir de 10 partições do Kafka, o paralelismo máximo é 10.
Para maximizar o paralelismo, recomendamos que tenha, pelo menos, 4 partições do Kafka.max_num_workers Se o seu trabalho usar o Runner v2, considere definir um paralelismo ainda mais elevado.
Um bom ponto de partida é ter partições iguais ao dobro do número de vCPUs de trabalho.
Redistribuir
Se não conseguir aumentar o número de partições, pode aumentar o paralelismo
chamando KafkaIO.Read.withRedistribute. Este método adiciona uma transformação Redistribute ao pipeline, que fornece uma sugestão ao Dataflow para redistribuir e paralelizar os dados de forma mais eficiente. Recomendamos vivamente que especifique o número ideal de fragmentos chamando KafkaIO.Read.withRedistributeNumKeys. A utilização apenas de KafkaIO.Read.withRedistribute pode gerar inúmeras chaves, o que leva a problemas de desempenho. Para mais informações, consulte o artigo
Identifique fases com elevado paralelismo.
A redistribuição dos dados adiciona alguma sobrecarga adicional para executar o passo de aleatorização. Para mais informações, consulte o artigo
Impedir a união.
Para minimizar o custo da reorganização da redistribuição, chame KafkaIO.Read.withOffsetDeduplication. Este modo
minimiza a quantidade de dados que têm de ser mantidos como parte da mistura aleatória,
ao mesmo tempo que oferece um processamento exatamente uma vez.
Se o processamento exatamente uma vez não for necessário, pode permitir duplicados chamando KafkaIO.Read.withAllowDuplicates.
A tabela seguinte resume as opções de redistribuição:
| Opção | Modo de processamento | Apache Beam | Configuração |
|---|---|---|---|
| Redistribua a entrada | Exatamente uma vez | v2.60 ou superior | KafkaIO.Read.withRedistribute() |
| Permitir duplicados | At-least-once | v2.60 ou superior | KafkaIO.Read.withRedistribute().withAllowDuplicates() |
| Remoção de duplicados de desvio | Exatamente uma vez | v2.69 ou superior | KafkaIO.Read.withRedistribute().withOffsetDeduplication() |
Desvio de carga
Tente garantir que a carga entre as partições é relativamente uniforme e não distorcida. Se a carga estiver desequilibrada, pode levar a uma má utilização dos trabalhadores. Os trabalhadores que leem a partir de partições com uma carga mais leve podem estar relativamente inativos, enquanto os trabalhadores que leem a partir de partições com uma carga pesada podem ficar para trás. O Dataflow fornece métricas para o trabalho em atraso por partição.
Se a carga estiver desequilibrada, o equilíbrio dinâmico do trabalho pode ajudar a distribuir o trabalho. Por exemplo, o Dataflow pode atribuir um trabalhador para ler a partir de várias partições de baixo volume e atribuir outro trabalhador para ler a partir de uma única partição de alto volume. No entanto, dois trabalhadores não podem ler a partir da mesma partição, pelo que uma partição com muita carga pode continuar a fazer com que o pipeline fique atrasado.
Práticas recomendadas
Esta secção contém recomendações para a leitura de dados do Kafka para o Dataflow.
Tópicos com volume baixo
Um cenário comum é ler a partir de muitos tópicos de baixo volume ao mesmo tempo, por exemplo, um tópico por cliente. A criação de tarefas do Dataflow separadas para cada tópico é ineficiente em termos de custos, porque cada tarefa requer, pelo menos, um trabalhador completo. Em alternativa, considere as seguintes opções:
Unir tópicos. Combine tópicos antes de serem carregados no Dataflow. A incorporação de alguns tópicos de volume elevado é muito mais eficiente do que a incorporação de muitos tópicos de volume baixo. Cada tópico de grande volume pode ser processado por uma única tarefa do Dataflow que utiliza totalmente os respetivos trabalhadores.
Ler vários tópicos. Se não conseguir combinar tópicos antes de os carregar para o Dataflow, considere criar um pipeline que leia a partir de vários tópicos. Esta abordagem permite que o Dataflow atribua vários tópicos ao mesmo trabalhador. Existem duas formas de implementar esta abordagem:
Passo de leitura único. Crie uma única instância do conetor
KafkaIOe configure-a para ler vários tópicos. Em seguida, filtre por nome do tópico para aplicar uma lógica diferente por tópico. Para ver um exemplo de código, consulte o artigo Leia a partir de vários tópicos. Considere esta opção se todos os seus tópicos estiverem localizados no mesmo cluster. Uma desvantagem é que os problemas com uma única origem ou transformação podem fazer com que todos os tópicos acumulem atrasos.Para exemplos de utilização mais avançados, transmita um conjunto de
KafkaSourceDescriptorobjetos que especifiquem os tópicos a partir dos quais ler. A utilização deKafkaSourceDescriptorpermite-lhe atualizar a lista de tópicos mais tarde, se necessário. Esta funcionalidade requer o Java com o Runner v2.Vários passos de leitura. Para ler a partir de tópicos localizados em diferentes clusters, o seu pipeline pode incluir várias instâncias
KafkaIO. Enquanto a tarefa está em execução, pode atualizar origens individuais através de mapeamentos de transformação. A definição de um novo tópico ou cluster só é suportada quando usa o Runner v2. A observabilidade é um potencial desafio com esta abordagem, porque tem de monitorizar cada transformação de leitura individual em vez de se basear em métricas ao nível do pipeline.
Confirmação de volta para o Kafka
Por predefinição, o conetor KafkaIO não usa deslocamentos do Kafka para acompanhar o progresso e não confirma novamente no Kafka. Se chamar
commitOffsetsInFinalize, o conetor faz o melhor esforço para confirmar novamente no Kafka depois de os registos serem confirmados no
Dataflow. Os registos comprometidos no Dataflow podem não ser totalmente processados. Por isso, se cancelar o pipeline, pode ser comprometido um desvio sem que os registos sejam totalmente processados.
Uma vez que a definição enable.auto.commit=True confirma os desvios assim que são lidos do Kafka sem processamento pelo Dataflow, não é recomendável usar esta opção.
A recomendação é definir enable.auto.commit=False e commitOffsetsInFinalize=True. Se definir
enable.auto.commit como True, os dados podem ser perdidos se o pipeline for interrompido
durante o tratamento. Os registos já confirmados no Kafka podem ser ignorados.
Marcas de água
Por predefinição, o conetor KafkaIO usa o tempo de processamento atual para atribuir
a marca de água de saída
e a data/hora do evento. Para alterar este comportamento, chame
withTimestampPolicyFactory e atribua um
TimestampPolicy. O Beam fornece implementações de TimestampPolicy que calculam a marca de água com base na hora de anexação do registo do Kafka ou na hora de criação da mensagem.
Considerações sobre o corredor
O conetor KafkaIO tem duas implementações subjacentes para leituras do Kafka, o ReadFromKafkaViaUnbounded mais antigo e o ReadFromKafkaViaSDF mais recente. O Dataflow escolhe automaticamente a melhor implementação para a sua tarefa com base no idioma do SDK e nos requisitos da tarefa. Evite pedir explicitamente uma implementação do executor ou do Kafka, a menos que precise de funcionalidades específicas disponíveis apenas nessa implementação. Para mais informações sobre como escolher um executor, consulte o artigo
Use o Dataflow Runner v2.
Se o seu pipeline usar withTopic ou withTopics,
a implementação mais antiga consulta o Kafka no momento da construção do pipeline para as
partições disponíveis. A máquina que cria o pipeline tem de ter autorização
para se ligar ao Kafka. Se receber um erro de autorização, verifique se tem autorizações para se ligar ao Kafka localmente. Pode evitar este problema usando withTopicPartitions, que não se liga ao Kafka no momento da criação do pipeline.
Implemente na produção
Quando implementa a sua solução em produção, recomendamos que use modelos flexíveis. Ao usar um modelo flexível, o pipeline é iniciado a partir de um ambiente consistente, o que pode ajudar a mitigar problemas de configuração local.
O registo de KafkaIO pode ser bastante detalhado. Considere reduzir o nível de registo na produção da seguinte forma:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Para mais informações, consulte o artigo Defina os níveis de registo do trabalhador do pipeline.
Configure a rede
Por predefinição, o Dataflow inicia instâncias na sua rede da nuvem virtual privada (VPC) predefinida. Consoante a configuração do Kafka, pode ter de configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte o artigo Especifique uma rede e uma sub-rede. Quando configurar a sua rede, crie regras de firewall que permitam que as máquinas de trabalho do Dataflow alcancem os agentes Kafka.
Se estiver a usar os VPC Service Controls, coloque o cluster Kafka dentro do perímetro dos VPC Service Controls ou, caso contrário, estenda os perímetros à VPN ou ao Cloud Interconnect autorizados.
Se o cluster do Kafka estiver implementado fora do Google Cloud, tem de criar uma ligação de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes compromissos:
- Estabeleça ligação através de um espaço de endereço RFC 1918 partilhado, utilizando uma das seguintes opções:
- Alcance o seu cluster Kafka alojado externamente através de endereços IP públicos, usando uma das seguintes opções:
- Internet pública
- Intercâmbio direto
- Intercâmbio por operadora
O Dedicated Interconnect é a melhor opção para um desempenho e uma fiabilidade previsíveis, mas a configuração pode demorar mais tempo porque terceiros têm de aprovisionar os novos circuitos. Com uma topologia baseada em IP público, pode começar rapidamente porque não é necessário fazer muito trabalho de rede.
As duas secções seguintes descrevem estas opções mais detalhadamente.
Espaço de endereço RFC 1918 partilhado
Tanto a interligação dedicada como a VPN IPsec dão-lhe acesso direto a endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a sua configuração do Kafka. Se estiver a usar uma topologia baseada em VPN, considere configurar uma VPN de débito elevado.
Por predefinição, o Dataflow inicia instâncias na sua rede VPC predefinida. Numa topologia de rede privada com
rotas definidas explicitamente no Cloud Router
que ligam sub-redes no Google Cloud a esse cluster do Kafka, precisa de
mais controlo sobre a localização das suas instâncias do Dataflow. Pode usar o Dataflow para configurar os network e os subnetwork
parâmetros de execução.
Certifique-se de que a sub-rede correspondente tem endereços IP suficientes disponíveis para o Dataflow iniciar instâncias quando tenta aumentar a escala. Além disso, quando criar uma rede separada para lançar as suas instâncias do Dataflow, certifique-se de que tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede predefinida já tem esta regra de firewall configurada.
Espaço de endereços IP públicos
Esta arquitetura usa o Transport Layer Security (TLS) para proteger o tráfego entre clientes externos e o Kafka, e usa tráfego não encriptado para a comunicação entre agentes. Quando o ouvinte do Kafka se associa a uma interface de rede usada para comunicação interna e externa, a configuração do ouvinte é simples. No entanto, em muitos cenários, os endereços anunciados externamente
dos agentes Kafka no cluster diferem das interfaces de rede internas
que o Kafka usa. Nesses cenários, pode usar a propriedade advertised.listeners:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Os clientes externos estabelecem ligação através da porta 9093 através de um canal "SSL" e os clientes internos estabelecem ligação através da porta 9092 através de um canal de texto simples. Quando especificar um endereço em advertised.listeners, use nomes DNS (kafkabroker-n.mydomain.com, neste exemplo) que sejam resolvidos para a mesma instância para tráfego externo e interno. A utilização de endereços IP públicos pode não funcionar, uma vez que os endereços podem não ser resolvidos para tráfego interno.
Sintonize o Kafka
As definições do cluster Kafka e do cliente Kafka podem ter um grande impacto no desempenho. Em particular, as seguintes definições podem ser demasiado baixas. Esta secção apresenta alguns pontos de partida sugeridos, mas deve experimentar estes valores para a sua carga de trabalho específica.
unboundedReaderMaxElements. A predefinição é 10 000. Um valor mais elevado, como 100 000, pode aumentar o tamanho dos pacotes, o que pode melhorar significativamente o desempenho se o seu pipeline incluir agregações. No entanto, um valor mais elevado também pode aumentar a latência. Para definir o valor, usesetUnboundedReaderMaxElements. Esta definição não se aplica ao Runner v2. Para o Runner v2, use a opção do serviço Dataflow.sdf_checkpoint_after_output_bytesunboundedReaderMaxReadTimeMs. A predefinição é 10 000 ms. Um valor mais elevado, como 20 000 ms, pode aumentar o tamanho do pacote, enquanto um valor mais baixo, como 5000 ms, pode reduzir a latência ou o atraso. Para definir o valor, usesetUnboundedReaderMaxReadTimeMs. Esta definição não se aplica ao Runner v2. Para o Runner v2, use a opção do serviço Dataflow.sdf_checkpoint_after_durationmax.poll.records. A predefinição é 500. Um valor mais elevado pode ter um melhor desempenho ao obter mais registos recebidos em conjunto, especialmente quando usa o Runner v2. Para definir o valor, chamewithConsumerConfigUpdates.fetch.max.bytes. A predefinição é 1 MB. Um valor mais elevado pode melhorar o débito reduzindo o número de pedidos, especialmente quando usa o Runner v2. No entanto, se o definir como demasiado elevado, pode aumentar a latência, embora o processamento a jusante seja mais provável de ser o principal obstáculo. Um valor inicial recomendado é de 100 MB. Para definir o valor, chamewithConsumerConfigUpdates.max.partition.fetch.bytes. A predefinição é 1 MB. Este parâmetro define a quantidade máxima de dados por partição que o servidor devolve. Aumentar o valor pode melhorar o débito reduzindo o número de pedidos, especialmente quando usa o Runner v2. No entanto, se o definir demasiado alto, pode aumentar a latência, embora o processamento a jusante seja mais provável de ser o principal gargalo. Um valor inicial recomendado é de 100 MB. Para definir o valor, chamewithConsumerConfigUpdates.consumerPollingTimeout. A predefinição é 2 segundos. Se o cliente consumidor expirar o tempo limite antes de poder ler quaisquer registos, experimente definir um valor mais elevado. Esta definição é mais relevante quando realiza leituras entre regiões ou leituras com uma rede lenta. Para definir o valor, chamewithConsumerPollingTimeout.
Certifique-se de que receive.buffer.bytes é suficientemente grande para processar o tamanho das mensagens. Se o valor for demasiado pequeno, os registos podem mostrar que os consumidores estão a ser recriados continuamente e a procurar um deslocamento específico.
Exemplos
Os exemplos de código seguintes mostram como criar pipelines do Dataflow que leem a partir do Kafka. Quando usar as Credenciais predefinidas da aplicação em conjunto com o processador de callback fornecido pelo Google Cloud Managed Service para Apache Kafka, é necessária a versão 3.7.0 ou superior do kafka-clients.
Leia a partir de um único tópico
Este exemplo usa o conetor de E/S gerido. Mostra como ler a partir de um tópico do Kafka e escrever os payloads das mensagens em ficheiros de texto.
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Python
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Leia a partir de vários tópicos
Este exemplo usa o conetor KafkaIO. Mostra como ler a partir de vários tópicos do Kafka e aplicar uma lógica de pipeline separada para cada tópico.
Para exemplos de utilização mais avançados, transmita dinamicamente um conjunto de objetos KafkaSourceDescriptor, para que possa atualizar a lista de tópicos a partir dos quais ler. Esta abordagem requer o Java com o Runner v2.
Java
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
Python
Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.
O que se segue?
- Escrever no Apache Kafka.
- Saiba mais sobre a E/S gerida.