Gravar dados do Kafka no BigQuery usando o Dataflow

Nesta página, mostramos como usar o Dataflow para ler dados do Google Cloud Managed Service para Apache Kafka e gravar os registros em uma tabela do BigQuery. Este tutorial usa o modelo do Apache Kafka para BigQuery para criar o job do Dataflow.

Visão geral

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. O Kafka é usado com frequência em arquiteturas distribuídas para permitir a comunicação entre componentes acoplado com flexibilidade. É possível usar o Dataflow para ler eventos do Kafka, processá-los e gravar os resultados em uma tabela do BigQuery para uma análise mais aprofundada.

O Managed Service para Apache Kafka é um serviço do Google Cloud Platform que ajuda você a executar clusters do Kafka seguros e escalonáveis.

Como ler eventos do Kafka no BigQuery
Arquitetura orientada a eventos usando o Apache Kafka

Permissões necessárias

A conta de serviço do worker do Dataflow precisa ter os seguintes papéis do gerenciamento de identidade e acesso (IAM):

  • Cliente Kafka gerenciado (roles/managedkafka.client)
  • Editor de dados do BigQuery (roles/bigquery.dataEditor)

Para mais informações, consulte Segurança e permissões do Dataflow.

Criar um cluster do Kafka

Nesta etapa, você vai criar um cluster do Managed Service para Apache Kafka. Para mais informações, consulte Criar um cluster do serviço gerenciado para Apache Kafka.

Console

  1. Acesse a página Serviço gerenciado para Apache Kafka > Clusters.

    Acessar Clusters

  2. Clique em Criar.

  3. Na caixa Nome do cluster, insira um nome para o cluster.

  4. Na lista Região, selecione um local para o cluster.

  5. Clique em Criar.

gcloud

Use o comando managed-kafka clusters create.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

Substitua:

  • CLUSTER: um nome para o cluster
  • REGION: a região em que você criou a sub-rede.
  • PROJECT_ID: ID do projeto;
  • SUBNET_NAME: a sub-rede em que você quer implantar o cluster.

A criação de um cluster geralmente leva de 20 a 30 minutos.

Criar um tópico do Kafka

Depois que o cluster do Managed Service para Apache Kafka for criado, crie um tópico.

Console

  1. Acesse a página Serviço gerenciado para Apache Kafka > Clusters.

    Acessar Clusters

  2. Clique no nome do cluster.

  3. Na página de detalhes do cluster, clique em Criar tópico.

  4. Na caixa Nome do tópico, insira um nome para o tópico.

  5. Clique em Criar.

gcloud

Use o comando managed-kafka topics create.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Substitua:

  • TOPIC_NAME: o nome do tópico a ser criado

Criar uma tabela do BigQuery

Nesta etapa, você cria uma tabela do BigQuery com o seguinte esquema:

Nome da coluna Tipo de dado
name STRING
customer_id INTEGER

Se você ainda não tiver um conjunto de dados do BigQuery, crie um primeiro. Para mais informações, consulte Criar conjuntos de dados. Em seguida, crie uma tabela vazia:

Console

  1. Acessar a página do BigQuery.

    Ir para o BigQuery

  2. No painel Explorer, expanda seu projeto e selecione um conjunto de dados.

  3. Na seção Informações do conjunto de dados, clique em Criar tabela.

  4. Na lista Criar tabela de, selecione Tabela vazia.

  5. Na caixa Tabela, insira o nome da tabela.

  6. Na seção Esquema, clique em Editar como texto.

  7. Cole a seguinte definição de esquema:

    name:STRING,
    customer_id:INTEGER
    
  8. Clique em Criar tabela.

gcloud

Use o comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Substitua:

  • PROJECT_ID: o ID do projeto
  • DATASET_NAME: o nome do conjunto de dados
  • TABLE_NAME: o nome da tabela a ser criada

Executar o job do Dataflow

Depois de criar o cluster do Kafka e a tabela do BigQuery, execute o modelo do Dataflow.

Console

Primeiro, extraia o endereço do servidor de inicialização do cluster:

  1. No Google Cloud console, acesse a página Clusters.

    Acessar Clusters

  2. Clique no nome do cluster.

  3. Clique na guia Configurações.

  4. Copie o endereço do servidor de inicialização em URL de inicialização.

Em seguida, execute o modelo para criar o job do Dataflow:

  1. Acesse a página Dataflow > Jobs.

    Acessar "Jobs"

  2. Clique em Criar job usando um modelo.

  3. No campo Nome do job, insira kafka-to-bq.

  4. Em Endpoint regional, selecione a região em que seu cluster do Serviço gerenciado para Apache Kafka está localizado.

  5. Selecione o modelo "Kafka para BigQuery".

  6. Insira os seguintes parâmetros de modelo:

    • Servidor de inicialização do Kafka: o endereço do servidor de inicialização
    • Tópico de origem do Kafka: o nome do tópico a ser lido
    • Modo de autenticação de origem do Kafka: APPLICATION_DEFAULT_CREDENTIALS
    • Formato de mensagem do Kafka: JSON
    • Estratégia de nome da tabela: SINGLE_TABLE_NAME
    • Tabela de saída do BigQuery: a tabela do BigQuery, formatada da seguinte maneira: PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. Em Fila de mensagens não entregues, marque Gravar erros no BigQuery.

  8. Insira um nome de tabela do BigQuery para a fila de mensagens não entregues, formatado da seguinte maneira: PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    Não crie essa tabela com antecedência. O pipeline cria.

  9. Cliquem em Executar job.

gcloud

Use o comando dataflow flex-template run.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

Substitua as seguintes variáveis:

  • LOCATION: a região em que o Serviço gerenciado para Apache Kafka está localizado.
  • PROJECT_ID: o nome do seu projeto do Google Cloud Platform
  • CLUSTER_ID: o nome do cluster
  • TOPIC: o nome do tópico do Kafka
  • DATASET_NAME: o nome do conjunto de dados
  • TABLE_NAME: o nome da tabela
  • ERROR_TABLE_NAME: um nome de tabela do BigQuery para a fila de mensagens não entregues

Não crie a tabela para a fila de mensagens inativas com antecedência. O pipeline cria isso.

Enviar mensagens para o Kafka

Depois que o job do Dataflow for iniciado, você poderá enviar mensagens para o Kafka, e o pipeline as gravará no BigQuery.

  1. Crie uma VM na mesma sub-rede do cluster do Kafka e instale as ferramentas de linha de comando do Kafka. Para instruções detalhadas, consulte Configurar uma máquina cliente em Publicar e consumir mensagens com a CLI.

  2. Execute o comando a seguir para gravar mensagens no tópico do Kafka:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    Substitua as seguintes variáveis:

    • TOPIC: o nome do tópico do Kafka
    • CLUSTER_ID: o nome do cluster.
    • LOCATION: a região em que o cluster está localizado.
    • PROJECT_ID: o nome do seu projeto do Google Cloud Platform
  3. No prompt, insira as seguintes linhas de texto para enviar mensagens ao Kafka:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

Usar uma fila de mensagens inativas

Enquanto o job estiver em execução, o pipeline poderá não gravar mensagens individuais no BigQuery. Possíveis erros incluem:

  • Erros de serialização, incluindo JSON formatado incorretamente.
  • Digite erros de conversão, causados por uma incompatibilidade no esquema da tabela e nos dados JSON.
  • Campos extras nos dados JSON que não estão no esquema da tabela.

Esses erros não causam falha no job e não aparecem como erros no registro de jobs do Dataflow. Em vez disso, o pipeline usa uma fila de mensagens inativas para processar esses tipos de erro.

Para ativar a fila de mensagens não entregues ao executar o modelo, defina os seguintes parâmetros de modelo:

  • useBigQueryDLQ: true
  • outputDeadletterTable: um nome de tabela totalmente qualificado do BigQuery, por exemplo, my-project:dataset1.errors

O pipeline cria a tabela automaticamente. Se ocorrer um erro ao processar uma mensagem do Kafka, o pipeline vai gravar uma entrada de erro na tabela.

Exemplo de mensagens de erro:

Tipo de erro Dados do evento errorMessage
Erro de serialização "Hello world" Falha ao serializar json para a linha da tabela: "Hello world"
Erro de conversão de tipo {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Campo desconhecido {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Trabalhar com tipos de dados do BigQuery

Internamente, o conector de E/S do Kafka converte payloads de mensagens JSON em objetos TableRow do Apache Beam e traduz os valores do campo TableRow em tipos do BigQuery.

A tabela a seguir mostra representações JSON dos tipos de dados do BigQuery.

Tipo do BigQuery Representação JSON
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Especifique a região geográfica usando texto conhecido (WKT) ou GeoJSON, formatado como uma string. Para mais informações, consulte Como carregar dados geoespaciais.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

Use o método Date.toJSON do JavaScript para formatar o valor.

Dados estruturados

Se as mensagens JSON seguirem um esquema consistente, é possível representar objetos JSON usando o tipo de dados STRUCT no BigQuery.

No exemplo a seguir, o campo answers é um objeto JSON com dois subcampos, a e b:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

A instrução SQL a seguir cria uma tabela do BigQuery com um esquema compatível:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

A tabela terá esta aparência:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Dados semiestruturados

Se as mensagens JSON não seguirem um esquema rigoroso, armazene-as no BigQuery como um tipo de dados JSON. Ao armazenar dados JSON como um tipo JSON, não é necessário definir o esquema de antemão. Após a ingestão de dados, é possível consultar os dados usando os operadores de acesso a campo (notação por pontos) e de matriz no GoogleSQL. Para mais informações, consulte Como trabalhar com dados JSON no GoogleSQL.

Usar uma UDF para transformar os dados

Este tutorial pressupõe que as mensagens do Kafka estão formatadas como JSON e que o esquema da tabela do BigQuery corresponde aos dados JSON, sem transformações aplicadas aos dados.

Também é possível fornecer uma função JavaScript definida pelo usuário (UDF) que transforma os dados antes que eles sejam gravados no BigQuery. A UDF também pode realizar outros processamentos, como filtragem, remoção de informações de identificação pessoal (PII) ou enriquecimento dos dados com campos adicionais.

Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

A seguir