Leia do Apache Kafka para o Dataflow

Este documento descreve como ler dados do Apache Kafka para o Dataflow e inclui dicas de desempenho e práticas recomendadas.

Para a maioria dos exemplos de utilização, considere usar o conetor de E/S gerido para ler a partir do Kafka.

Se precisar de uma otimização do desempenho mais avançada, considere usar o conector KafkaIO. O conetor KafkaIO está disponível para Java ou através da utilização da estrutura de pipelines multilingues para Python e Go.

Paralelismo

As secções seguintes descrevem como configurar o paralelismo ao ler a partir do Kafka.

Vista geral

O paralelismo é limitado por dois fatores: o número máximo de trabalhadores (max_num_workers) e o número de partições do Kafka. O Dataflow usa por predefinição uma ramificação de paralelismo de 4 x max_num_workers. No entanto, a distribuição é limitada pelo número de partições. Por exemplo, se estiverem disponíveis 100 vCPUs, mas o pipeline apenas ler a partir de 10 partições do Kafka, o paralelismo máximo é 10.

Para maximizar o paralelismo, recomendamos que tenha, pelo menos, 4 partições do Kafka.max_num_workers Se o seu trabalho usar o Runner v2, considere definir um paralelismo ainda mais elevado. Um bom ponto de partida é ter partições iguais ao dobro do número de vCPUs de trabalho.

Redistribuir

Se não conseguir aumentar o número de partições, pode aumentar o paralelismo chamando KafkaIO.Read.withRedistribute. Este método adiciona uma transformação Redistribute ao pipeline, que fornece uma sugestão ao Dataflow para redistribuir e paralelizar os dados de forma mais eficiente. Recomendamos vivamente que especifique o número ideal de fragmentos chamando KafkaIO.Read.withRedistributeNumKeys. A utilização apenas de KafkaIO.Read.withRedistribute pode gerar inúmeras chaves, o que leva a problemas de desempenho. Para mais informações, consulte o artigo Identifique fases com elevado paralelismo. A redistribuição dos dados adiciona alguma sobrecarga adicional para executar o passo de aleatorização. Para mais informações, consulte o artigo Impedir a união.

Para minimizar o custo da reorganização da redistribuição, chame KafkaIO.Read.withOffsetDeduplication. Este modo minimiza a quantidade de dados que têm de ser mantidos como parte da mistura aleatória, ao mesmo tempo que oferece um processamento exatamente uma vez.

Se o processamento exatamente uma vez não for necessário, pode permitir duplicados chamando KafkaIO.Read.withAllowDuplicates.

A tabela seguinte resume as opções de redistribuição:

Opção Modo de processamento Apache Beam Configuração
Redistribua a entrada Exatamente uma vez v2.60 ou superior KafkaIO.Read.withRedistribute()
Permitir duplicados At-least-once v2.60 ou superior KafkaIO.Read.withRedistribute().withAllowDuplicates()
Remoção de duplicados de desvio Exatamente uma vez v2.69 ou superior KafkaIO.Read.withRedistribute().withOffsetDeduplication()

Desvio de carga

Tente garantir que a carga entre as partições é relativamente uniforme e não distorcida. Se a carga estiver desequilibrada, pode levar a uma má utilização dos trabalhadores. Os trabalhadores que leem a partir de partições com uma carga mais leve podem estar relativamente inativos, enquanto os trabalhadores que leem a partir de partições com uma carga pesada podem ficar para trás. O Dataflow fornece métricas para o trabalho em atraso por partição.

Se a carga estiver desequilibrada, o equilíbrio dinâmico do trabalho pode ajudar a distribuir o trabalho. Por exemplo, o Dataflow pode atribuir um trabalhador para ler a partir de várias partições de baixo volume e atribuir outro trabalhador para ler a partir de uma única partição de alto volume. No entanto, dois trabalhadores não podem ler a partir da mesma partição, pelo que uma partição com muita carga pode continuar a fazer com que o pipeline fique atrasado.

Práticas recomendadas

Esta secção contém recomendações para a leitura de dados do Kafka para o Dataflow.

Tópicos com volume baixo

Um cenário comum é ler a partir de muitos tópicos de baixo volume ao mesmo tempo, por exemplo, um tópico por cliente. A criação de tarefas do Dataflow separadas para cada tópico é ineficiente em termos de custos, porque cada tarefa requer, pelo menos, um trabalhador completo. Em alternativa, considere as seguintes opções:

  • Unir tópicos. Combine tópicos antes de serem carregados no Dataflow. A incorporação de alguns tópicos de volume elevado é muito mais eficiente do que a incorporação de muitos tópicos de volume baixo. Cada tópico de grande volume pode ser processado por uma única tarefa do Dataflow que utiliza totalmente os respetivos trabalhadores.

  • Ler vários tópicos. Se não conseguir combinar tópicos antes de os carregar para o Dataflow, considere criar um pipeline que leia a partir de vários tópicos. Esta abordagem permite que o Dataflow atribua vários tópicos ao mesmo trabalhador. Existem duas formas de implementar esta abordagem:

    • Passo de leitura único. Crie uma única instância do conetor KafkaIO e configure-a para ler vários tópicos. Em seguida, filtre por nome do tópico para aplicar uma lógica diferente por tópico. Para ver um exemplo de código, consulte o artigo Leia a partir de vários tópicos. Considere esta opção se todos os seus tópicos estiverem localizados no mesmo cluster. Uma desvantagem é que os problemas com uma única origem ou transformação podem fazer com que todos os tópicos acumulem atrasos.

      Para exemplos de utilização mais avançados, transmita um conjunto de KafkaSourceDescriptor objetos que especifiquem os tópicos a partir dos quais ler. A utilização de KafkaSourceDescriptor permite-lhe atualizar a lista de tópicos mais tarde, se necessário. Esta funcionalidade requer o Java com o Runner v2.

    • Vários passos de leitura. Para ler a partir de tópicos localizados em diferentes clusters, o seu pipeline pode incluir várias instâncias KafkaIO. Enquanto a tarefa está em execução, pode atualizar origens individuais através de mapeamentos de transformação. A definição de um novo tópico ou cluster só é suportada quando usa o Runner v2. A observabilidade é um potencial desafio com esta abordagem, porque tem de monitorizar cada transformação de leitura individual em vez de se basear em métricas ao nível do pipeline.

Confirmação de volta para o Kafka

Por predefinição, o conetor KafkaIO não usa deslocamentos do Kafka para acompanhar o progresso e não confirma novamente no Kafka. Se chamar commitOffsetsInFinalize, o conetor faz o melhor esforço para confirmar novamente no Kafka depois de os registos serem confirmados no Dataflow. Os registos comprometidos no Dataflow podem não ser totalmente processados. Por isso, se cancelar o pipeline, pode ser comprometido um desvio sem que os registos sejam totalmente processados.

Uma vez que a definição enable.auto.commit=True confirma os desvios assim que são lidos do Kafka sem processamento pelo Dataflow, não é recomendável usar esta opção. A recomendação é definir enable.auto.commit=False e commitOffsetsInFinalize=True. Se definir enable.auto.commit como True, os dados podem ser perdidos se o pipeline for interrompido durante o tratamento. Os registos já confirmados no Kafka podem ser ignorados.

Marcas de água

Por predefinição, o conetor KafkaIO usa o tempo de processamento atual para atribuir a marca de água de saída e a data/hora do evento. Para alterar este comportamento, chame withTimestampPolicyFactory e atribua um TimestampPolicy. O Beam fornece implementações de TimestampPolicy que calculam a marca de água com base na hora de anexação do registo do Kafka ou na hora de criação da mensagem.

Considerações sobre o corredor

O conetor KafkaIO tem duas implementações subjacentes para leituras do Kafka, o ReadFromKafkaViaUnbounded mais antigo e o ReadFromKafkaViaSDF mais recente. O Dataflow escolhe automaticamente a melhor implementação para a sua tarefa com base no idioma do SDK e nos requisitos da tarefa. Evite pedir explicitamente uma implementação do executor ou do Kafka, a menos que precise de funcionalidades específicas disponíveis apenas nessa implementação. Para mais informações sobre como escolher um executor, consulte o artigo Use o Dataflow Runner v2.

Se o seu pipeline usar withTopic ou withTopics, a implementação mais antiga consulta o Kafka no momento da construção do pipeline para as partições disponíveis. A máquina que cria o pipeline tem de ter autorização para se ligar ao Kafka. Se receber um erro de autorização, verifique se tem autorizações para se ligar ao Kafka localmente. Pode evitar este problema usando withTopicPartitions, que não se liga ao Kafka no momento da criação do pipeline.

Implemente na produção

Quando implementa a sua solução em produção, recomendamos que use modelos flexíveis. Ao usar um modelo flexível, o pipeline é iniciado a partir de um ambiente consistente, o que pode ajudar a mitigar problemas de configuração local.

O registo de KafkaIO pode ser bastante detalhado. Considere reduzir o nível de registo na produção da seguinte forma:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Para mais informações, consulte o artigo Defina os níveis de registo do trabalhador do pipeline.

Configure a rede

Por predefinição, o Dataflow inicia instâncias na sua rede da nuvem virtual privada (VPC) predefinida. Consoante a configuração do Kafka, pode ter de configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte o artigo Especifique uma rede e uma sub-rede. Quando configurar a sua rede, crie regras de firewall que permitam que as máquinas de trabalho do Dataflow alcancem os agentes Kafka.

Se estiver a usar os VPC Service Controls, coloque o cluster Kafka dentro do perímetro dos VPC Service Controls ou, caso contrário, estenda os perímetros à VPN ou ao Cloud Interconnect autorizados.

Se o cluster do Kafka estiver implementado fora do Google Cloud, tem de criar uma ligação de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes compromissos:

O Dedicated Interconnect é a melhor opção para um desempenho e uma fiabilidade previsíveis, mas a configuração pode demorar mais tempo porque terceiros têm de aprovisionar os novos circuitos. Com uma topologia baseada em IP público, pode começar rapidamente porque não é necessário fazer muito trabalho de rede.

As duas secções seguintes descrevem estas opções mais detalhadamente.

Espaço de endereço RFC 1918 partilhado

Tanto a interligação dedicada como a VPN IPsec dão-lhe acesso direto a endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a sua configuração do Kafka. Se estiver a usar uma topologia baseada em VPN, considere configurar uma VPN de débito elevado.

Por predefinição, o Dataflow inicia instâncias na sua rede VPC predefinida. Numa topologia de rede privada com rotas definidas explicitamente no Cloud Router que ligam sub-redes no Google Cloud a esse cluster do Kafka, precisa de mais controlo sobre a localização das suas instâncias do Dataflow. Pode usar o Dataflow para configurar os network e os subnetwork parâmetros de execução.

Certifique-se de que a sub-rede correspondente tem endereços IP suficientes disponíveis para o Dataflow iniciar instâncias quando tenta aumentar a escala. Além disso, quando criar uma rede separada para lançar as suas instâncias do Dataflow, certifique-se de que tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede predefinida já tem esta regra de firewall configurada.

Espaço de endereços IP públicos

Esta arquitetura usa o Transport Layer Security (TLS) para proteger o tráfego entre clientes externos e o Kafka, e usa tráfego não encriptado para a comunicação entre agentes. Quando o ouvinte do Kafka se associa a uma interface de rede usada para comunicação interna e externa, a configuração do ouvinte é simples. No entanto, em muitos cenários, os endereços anunciados externamente dos agentes Kafka no cluster diferem das interfaces de rede internas que o Kafka usa. Nesses cenários, pode usar a propriedade advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Os clientes externos estabelecem ligação através da porta 9093 através de um canal "SSL" e os clientes internos estabelecem ligação através da porta 9092 através de um canal de texto simples. Quando especificar um endereço em advertised.listeners, use nomes DNS (kafkabroker-n.mydomain.com, neste exemplo) que sejam resolvidos para a mesma instância para tráfego externo e interno. A utilização de endereços IP públicos pode não funcionar, uma vez que os endereços podem não ser resolvidos para tráfego interno.

Sintonize o Kafka

As definições do cluster Kafka e do cliente Kafka podem ter um grande impacto no desempenho. Em particular, as seguintes definições podem ser demasiado baixas. Esta secção apresenta alguns pontos de partida sugeridos, mas deve experimentar estes valores para a sua carga de trabalho específica.

  • unboundedReaderMaxElements. A predefinição é 10 000. Um valor mais elevado, como 100 000, pode aumentar o tamanho dos pacotes, o que pode melhorar significativamente o desempenho se o seu pipeline incluir agregações. No entanto, um valor mais elevado também pode aumentar a latência. Para definir o valor, use setUnboundedReaderMaxElements. Esta definição não se aplica ao Runner v2. Para o Runner v2, use a opção do serviço Dataflow.sdf_checkpoint_after_output_bytes

  • unboundedReaderMaxReadTimeMs. A predefinição é 10 000 ms. Um valor mais elevado, como 20 000 ms, pode aumentar o tamanho do pacote, enquanto um valor mais baixo, como 5000 ms, pode reduzir a latência ou o atraso. Para definir o valor, use setUnboundedReaderMaxReadTimeMs. Esta definição não se aplica ao Runner v2. Para o Runner v2, use a opção do serviço Dataflow.sdf_checkpoint_after_duration

  • max.poll.records. A predefinição é 500. Um valor mais elevado pode ter um melhor desempenho ao obter mais registos recebidos em conjunto, especialmente quando usa o Runner v2. Para definir o valor, chame withConsumerConfigUpdates.

  • fetch.max.bytes. A predefinição é 1 MB. Um valor mais elevado pode melhorar o débito reduzindo o número de pedidos, especialmente quando usa o Runner v2. No entanto, se o definir como demasiado elevado, pode aumentar a latência, embora o processamento a jusante seja mais provável de ser o principal obstáculo. Um valor inicial recomendado é de 100 MB. Para definir o valor, chame withConsumerConfigUpdates.

  • max.partition.fetch.bytes. A predefinição é 1 MB. Este parâmetro define a quantidade máxima de dados por partição que o servidor devolve. Aumentar o valor pode melhorar o débito reduzindo o número de pedidos, especialmente quando usa o Runner v2. No entanto, se o definir demasiado alto, pode aumentar a latência, embora o processamento a jusante seja mais provável de ser o principal gargalo. Um valor inicial recomendado é de 100 MB. Para definir o valor, chame withConsumerConfigUpdates.

  • consumerPollingTimeout. A predefinição é 2 segundos. Se o cliente consumidor expirar o tempo limite antes de poder ler quaisquer registos, experimente definir um valor mais elevado. Esta definição é mais relevante quando realiza leituras entre regiões ou leituras com uma rede lenta. Para definir o valor, chame withConsumerPollingTimeout.

Certifique-se de que receive.buffer.bytes é suficientemente grande para processar o tamanho das mensagens. Se o valor for demasiado pequeno, os registos podem mostrar que os consumidores estão a ser recriados continuamente e a procurar um deslocamento específico.

Exemplos

Os exemplos de código seguintes mostram como criar pipelines do Dataflow que leem a partir do Kafka. Quando usar as Credenciais predefinidas da aplicação em conjunto com o processador de callback fornecido pelo Google Cloud Managed Service para Apache Kafka, é necessária a versão 3.7.0 ou superior do kafka-clients.

Leia a partir de um único tópico

Este exemplo usa o conetor de E/S gerido. Mostra como ler a partir de um tópico do Kafka e escrever os payloads das mensagens em ficheiros de texto.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | beam.managed.Read(
                beam.managed.KAFKA,
                config={
                  "bootstrap_servers": options.bootstrap_server,
                  "topic": options.topic,
                  "data_format": "RAW",
                  "auto_offset_reset_config": "earliest",
                  # The max_read_time_seconds parameter is intended for testing.
                  # Avoid using this parameter in production.
                  "max_read_time_seconds": 5
                }
            )
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Leia a partir de vários tópicos

Este exemplo usa o conetor KafkaIO. Mostra como ler a partir de vários tópicos do Kafka e aplicar uma lógica de pipeline separada para cada tópico.

Para exemplos de utilização mais avançados, transmita dinamicamente um conjunto de objetos KafkaSourceDescriptor, para que possa atualizar a lista de tópicos a partir dos quais ler. Esta abordagem requer o Java com o Runner v2.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))

O que se segue?