Esta página descreve como usar o Google Cloud Managed Service para Apache Kafka como origem ou destino num pipeline do Dataflow.
Pode usar uma das seguintes abordagens:
Requisitos
Ative as APIs Cloud Storage, Dataflow e Managed Service for Apache Kafka no seu projeto. Consulte o artigo Ativar APIs ou execute o seguinte comando da CLI do Google Cloud:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.comA conta de serviço do worker do Dataflow tem de ter a função de gestão de identidade e de acesso (IAM) do cliente Kafka gerido (
roles/managedkafka.client).As VMs do worker do Dataflow têm de ter acesso à rede ao servidor de arranque do Kafka. Para mais informações, consulte o artigo Configure a rede do serviço gerido para Apache Kafka.
Obtenha a morada do servidor de arranque
Para executar um pipeline que se liga a um cluster do Managed Service for Apache Kafka, obtenha primeiro o endereço do servidor de arranque do cluster. Precisa deste endereço quando configurar o pipeline.
Pode usar a Google Cloud consola ou a Google Cloud CLI, da seguinte forma:
Consola
Na Google Cloud consola, aceda à página Clusters.
Clique no nome do cluster.
Clique no separador Configurações.
Copie o endereço do servidor de arranque do URL de arranque.
gcloud
Use o comando managed-kafka clusters describe.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
Substitua o seguinte:
- CLUSTER_ID: o ID ou o nome do cluster
- LOCATION: a localização do cluster
Para mais informações, consulte o artigo Veja um cluster do Managed Service para Apache Kafka.
Use o Managed Service para Apache Kafka com um modelo do Dataflow
A Google oferece vários modelos do Dataflow que leem a partir do Apache Kafka:
Estes modelos podem ser usados com o Managed Service for Apache Kafka. Se um deles corresponder ao seu exemplo de utilização, considere usá-lo em vez de escrever código de pipeline personalizado.
Consola
Aceda à página Dataflow > Tarefas.
Clique em Criar tarefa a partir de modelo.
Em Nome do trabalho, introduza um nome para o trabalho.
No menu pendente do modelo Dataflow, selecione o modelo a executar.
Na caixa Servidor de arranque do Kafka, introduza o endereço do servidor de arranque.
Na caixa Tópico do Kafka, introduza o nome do tópico.
Para o modo de autenticação do Kafka, selecione APPLICATION_DEFAULT_CREDENTIALS.
Para o formato de mensagem Kafka, selecione o formato das mensagens do Apache Kafka.
Introduza outros parâmetros, conforme necessário. Os parâmetros suportados estão documentados para cada modelo.
Executar tarefa.
gcloud
Use o comando
gcloud dataflow jobs run.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
Substitua o seguinte:
- JOB_NAME: um nome para a tarefa
- TEMPLATE_FILE: a localização do ficheiro de modelo no armazenamento na nuvem
- REGION_NAME: a região onde quer implementar o seu trabalho
- PROJECT_NAME: o nome do seu projeto da Google Cloud Platform
- LOCATION: a localização do cluster
- CLUSTER_ID: o ID ou o nome do cluster
- TOPIC: o nome do tópico do Kafka
Use o Managed Service for Apache Kafka com um pipeline do Beam
Esta secção descreve como usar o SDK do Apache Beam para criar e executar um pipeline do Dataflow que se liga ao Managed Service para Apache Kafka.
Na maioria dos cenários, use a transformação de E/S gerida como origem ou destino do Kafka. Se precisar de uma otimização do desempenho mais avançada, considere usar o conector KafkaIO.
Para mais informações sobre as vantagens de usar a E/S gerida, consulte o artigo
E/S gerida do Dataflow.
Requisitos
Versão 3.6.0 ou posterior do cliente Kafka.
Versão 2.61.0 ou posterior do SDK do Apache Beam.
A máquina onde inicia a tarefa do Dataflow tem de ter acesso à rede ao servidor de arranque do Apache Kafka. Por exemplo, inicie a tarefa a partir de uma instância do Compute Engine que possa aceder à VPC onde o cluster está acessível.
O principal que cria a tarefa tem de ter as seguintes funções do IAM:
- Cliente Kafka gerido (
roles/managedkafka.client) para aceder ao cluster do Apache Kafka. - Utilizador da conta de serviço (
roles/iam.serviceAccountUser) para atuar como conta de serviço do trabalhador do Dataflow. - Administrador de armazenamento (
roles/storage.admin) para carregar ficheiros de tarefas para o Cloud Storage. - Administrador do Dataflow (
roles/dataflow.admin) para criar a tarefa.
Se iniciar a tarefa a partir de uma instância do Compute Engine, pode conceder estas funções a uma conta de serviço associada à VM. Para mais informações, consulte o artigo Crie uma VM que use uma conta de serviço gerida pelo utilizador.
Também pode usar as credenciais padrão da aplicação (ADC) com a representação da conta de serviço quando cria a tarefa.
- Cliente Kafka gerido (
Configure a E/S gerida
Se o seu pipeline usar o Managed I/O para Apache Kafka, defina as seguintes opções de configuração para autenticar com o Managed Service para Apache Kafka:
"security.protocol":"SASL_SSL""sasl.mechanism":"OAUTHBEARER""sasl.login.callback.handler.class":"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler""sasl.jaas.config":"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
Os exemplos seguintes mostram como configurar a E/S gerida para o Managed Service para Apache Kafka:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
Configure o conetor KafkaIO
Os exemplos seguintes mostram como configurar o conetor KafkaIO para o
Managed Service for Apache Kafka:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
O que se segue?
- Saiba mais sobre o Managed Service para Apache Kafka.
- Escreva dados do Managed Service para Apache Kafka no BigQuery.
- Ler do Apache Kafka para o Dataflow.
- Escrever do Dataflow para o Apache Kafka.