Use o Dataflow com o Managed Service para Apache Kafka

Esta página descreve como usar o Google Cloud Managed Service para Apache Kafka como origem ou destino num pipeline do Dataflow.

Pode usar uma das seguintes abordagens:

Requisitos

  • Ative as APIs Cloud Storage, Dataflow e Managed Service for Apache Kafka no seu projeto. Consulte o artigo Ativar APIs ou execute o seguinte comando da CLI do Google Cloud:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • A conta de serviço do worker do Dataflow tem de ter a função de gestão de identidade e de acesso (IAM) do cliente Kafka gerido (roles/managedkafka.client).

  • As VMs do worker do Dataflow têm de ter acesso à rede ao servidor de arranque do Kafka. Para mais informações, consulte o artigo Configure a rede do serviço gerido para Apache Kafka.

Obtenha a morada do servidor de arranque

Para executar um pipeline que se liga a um cluster do Managed Service for Apache Kafka, obtenha primeiro o endereço do servidor de arranque do cluster. Precisa deste endereço quando configurar o pipeline.

Pode usar a Google Cloud consola ou a Google Cloud CLI, da seguinte forma:

Consola

  1. Na Google Cloud consola, aceda à página Clusters.

    Aceda a Clusters

  2. Clique no nome do cluster.

  3. Clique no separador Configurações.

  4. Copie o endereço do servidor de arranque do URL de arranque.

gcloud

Use o comando managed-kafka clusters describe.

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

Substitua o seguinte:

  • CLUSTER_ID: o ID ou o nome do cluster
  • LOCATION: a localização do cluster

Para mais informações, consulte o artigo Veja um cluster do Managed Service para Apache Kafka.

Use o Managed Service para Apache Kafka com um modelo do Dataflow

A Google oferece vários modelos do Dataflow que leem a partir do Apache Kafka:

Estes modelos podem ser usados com o Managed Service for Apache Kafka. Se um deles corresponder ao seu exemplo de utilização, considere usá-lo em vez de escrever código de pipeline personalizado.

Consola

  1. Aceda à página Dataflow > Tarefas.

    Aceda a Empregos

  2. Clique em Criar tarefa a partir de modelo.

  3. Em Nome do trabalho, introduza um nome para o trabalho.

  4. No menu pendente do modelo Dataflow, selecione o modelo a executar.

  5. Na caixa Servidor de arranque do Kafka, introduza o endereço do servidor de arranque.

  6. Na caixa Tópico do Kafka, introduza o nome do tópico.

  7. Para o modo de autenticação do Kafka, selecione APPLICATION_DEFAULT_CREDENTIALS.

  8. Para o formato de mensagem Kafka, selecione o formato das mensagens do Apache Kafka.

  9. Introduza outros parâmetros, conforme necessário. Os parâmetros suportados estão documentados para cada modelo.

  10. Executar tarefa.

gcloud

Use o comando gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

Substitua o seguinte:

  • JOB_NAME: um nome para a tarefa
  • TEMPLATE_FILE: a localização do ficheiro de modelo no armazenamento na nuvem
  • REGION_NAME: a região onde quer implementar o seu trabalho
  • PROJECT_NAME: o nome do seu projeto da Google Cloud Platform
  • LOCATION: a localização do cluster
  • CLUSTER_ID: o ID ou o nome do cluster
  • TOPIC: o nome do tópico do Kafka

Use o Managed Service for Apache Kafka com um pipeline do Beam

Esta secção descreve como usar o SDK do Apache Beam para criar e executar um pipeline do Dataflow que se liga ao Managed Service para Apache Kafka.

Na maioria dos cenários, use a transformação de E/S gerida como origem ou destino do Kafka. Se precisar de uma otimização do desempenho mais avançada, considere usar o conector KafkaIO. Para mais informações sobre as vantagens de usar a E/S gerida, consulte o artigo E/S gerida do Dataflow.

Requisitos

  • Versão 3.6.0 ou posterior do cliente Kafka.

  • Versão 2.61.0 ou posterior do SDK do Apache Beam.

  • A máquina onde inicia a tarefa do Dataflow tem de ter acesso à rede ao servidor de arranque do Apache Kafka. Por exemplo, inicie a tarefa a partir de uma instância do Compute Engine que possa aceder à VPC onde o cluster está acessível.

  • O principal que cria a tarefa tem de ter as seguintes funções do IAM:

    • Cliente Kafka gerido (roles/managedkafka.client) para aceder ao cluster do Apache Kafka.
    • Utilizador da conta de serviço (roles/iam.serviceAccountUser) para atuar como conta de serviço do trabalhador do Dataflow.
    • Administrador de armazenamento (roles/storage.admin) para carregar ficheiros de tarefas para o Cloud Storage.
    • Administrador do Dataflow (roles/dataflow.admin) para criar a tarefa.

    Se iniciar a tarefa a partir de uma instância do Compute Engine, pode conceder estas funções a uma conta de serviço associada à VM. Para mais informações, consulte o artigo Crie uma VM que use uma conta de serviço gerida pelo utilizador.

    Também pode usar as credenciais padrão da aplicação (ADC) com a representação da conta de serviço quando cria a tarefa.

Configure a E/S gerida

Se o seu pipeline usar o Managed I/O para Apache Kafka, defina as seguintes opções de configuração para autenticar com o Managed Service para Apache Kafka:

  • "security.protocol": "SASL_SSL"
  • "sasl.mechanism": "OAUTHBEARER"
  • "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

Os exemplos seguintes mostram como configurar a E/S gerida para o Managed Service para Apache Kafka:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

Configure o conetor KafkaIO

Os exemplos seguintes mostram como configurar o conetor KafkaIO para o Managed Service for Apache Kafka:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

O que se segue?