Escribe datos de Kafka en BigQuery con Dataflow

En esta página, se muestra cómo usar Dataflow para leer datos del Servicio administrado de Google Cloud para Apache Kafka y escribir los registros en una tabla de BigQuery. En este instructivo, se usa la plantilla de Apache Kafka a BigQuery para crear el trabajo de Dataflow.

Descripción general

Apache Kafka es una plataforma de código abierto para eventos de transmisión. Kafka se suele usar en arquitecturas distribuidas para permitir la comunicación entre componentes con acoplamiento bajo. Puedes usar Dataflow para leer eventos de Kafka, procesarlos y escribir los resultados en una tabla de BigQuery para su posterior análisis.

El servicio administrado para Apache Kafka es un servicio de Google Cloud Platform que te ayuda a ejecutar clústeres de Kafka seguros y escalables.

Lee eventos de Kafka en BigQuery
Arquitectura basada en eventos con Apache Kafka

Permisos necesarios

La cuenta de servicio del trabajador de Dataflow debe tener los siguientes roles de Identity and Access Management (IAM):

  • Cliente de Kafka administrado (roles/managedkafka.client)
  • Editor de datos de BigQuery (roles/bigquery.dataEditor)

Para obtener más información, consulta Seguridad y permisos de Dataflow.

Crea un clúster de Kafka

En este paso, crearás un clúster de Managed Service para Apache Kafka. Para obtener más información, consulta Crea un clúster de Managed Service para Apache Kafka.

Console

  1. Ve a la página Clusters de Managed Service for Apache Kafka >.

    Ir a los clústeres

  2. Haz clic en Crear.

  3. En el cuadro Nombre del clúster, ingresa un nombre para el clúster.

  4. En la lista Región, selecciona una ubicación para el clúster.

  5. Haz clic en Crear.

gcloud

Usa el comando managed-kafka clusters create:

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

Reemplaza lo siguiente:

  • CLUSTER: Un nombre para el clúster
  • REGION: La región en la que creaste la subred
  • PROJECT_ID: El ID de tu proyecto
  • SUBNET_NAME: Es la subred en la que deseas implementar el clúster.

Por lo general, la creación de un clúster tarda entre 20 y 30 minutos.

Crea un tema de Kafka

Después de crear el clúster de Managed Service para Apache Kafka, crea un tema.

Console

  1. Ve a la página Clusters de Managed Service for Apache Kafka >.

    Ir a los clústeres

  2. Haz clic en el nombre del clúster.

  3. En la página de detalles del clúster, haz clic en Crear tema.

  4. En el cuadro Nombre del tema, ingresa un nombre para el tema.

  5. Haz clic en Crear.

gcloud

Usa el comando managed-kafka topics create:

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Reemplaza lo siguiente:

  • TOPIC_NAME: El nombre del tema que se creará

Crea una tabla de BigQuery

En este paso, crearás una tabla de BigQuery con el siguiente esquema:

Nombre de la columna Tipo de datos
name STRING
customer_id INTEGER

Si aún no tienes un conjunto de datos de BigQuery, primero crea uno. Para obtener más información, consulta Crea conjuntos de datos. Luego, crea una tabla vacía nueva:

Console

  1. Ve a la página de BigQuery.

    Ir a BigQuery

  2. En el panel Explorador, expande tu proyecto y, luego, elige un conjunto de datos.

  3. En la sección Información del conjunto de datos, haz clic en Crear tabla.

  4. En la lista Crear tabla desde, selecciona Tabla vacía.

  5. En el cuadro Tabla, ingresa el nombre de la tabla.

  6. En la sección Schema, haz clic en Edit as text.

  7. Pega la siguiente definición del esquema:

    name:STRING,
    customer_id:INTEGER
    
  8. Haz clic en Crear tabla.

gcloud

Usa el comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto.
  • DATASET_NAME: El nombre del conjunto de datos.
  • TABLE_NAME: el nombre de la tabla que se creará

Ejecuta el trabajo de Dataflow:

Después de crear el clúster de Kafka y la tabla de BigQuery, ejecuta la plantilla de Dataflow.

Console

Primero, obtén la dirección del servidor de arranque del clúster:

  1. En la consola de Google Cloud , ve a la página Clústeres.

    Ir a los clústeres

  2. Haz clic en el nombre del clúster.

  3. Haz clic en la pestaña Configurations.

  4. Copia la dirección del servidor de arranque de Bootstrap URL.

A continuación, ejecuta la plantilla para crear el trabajo de Dataflow:

  1. Ve a la página Trabajos de Dataflow >.

    Ir a Trabajos

  2. Haz clic en Crear trabajo a partir de una plantilla.

  3. En el campo Nombre del trabajo, ingresa kafka-to-bq.

  4. En Extremo regional, selecciona la región en la que se encuentra tu clúster de Servicio administrado para Apache Kafka.

  5. Selecciona la plantilla “Kafka a BigQuery”.

  6. Ingresa los siguientes parámetros de plantilla:

    • Servidor de arranque de Kafka: La dirección del servidor de arranque
    • Tema de Kafka de origen: Es el nombre del tema que se leerá.
    • Modo de autenticación de origen de Kafka: APPLICATION_DEFAULT_CREDENTIALS
    • Formato del mensaje de Kafka: JSON
    • Estrategia de nombres de tablas: SINGLE_TABLE_NAME
    • Tabla de salida de BigQuery: Es la tabla de BigQuery, con el siguiente formato: PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. En Cola de mensajes no entregados, marca la casilla de verificación Escribir errores en BigQuery.

  8. Ingresa un nombre de tabla de BigQuery para la cola de mensajes no entregados, con el siguiente formato: PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    No crees esta tabla con anticipación. La canalización lo crea.

  9. Haga clic en Ejecutar trabajo.

gcloud

Usa el comando dataflow flex-template run:

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

Reemplaza las siguientes variables:

  • LOCATION: Es la región en la que se encuentra tu servicio administrado para Apache Kafka.
  • PROJECT_ID: El nombre de tu proyecto de Google Cloud Platform
  • CLUSTER_ID: Es el nombre del clúster.
  • TOPIC: Es el nombre del tema de Kafka.
  • DATASET_NAME: El nombre del conjunto de datos.
  • TABLE_NAME: El nombre de la tabla.
  • ERROR_TABLE_NAME: Es el nombre de una tabla de BigQuery para la cola de mensajes no entregados.

No crees la tabla para la cola de mensajes no entregados con anticipación. La canalización lo crea.

Envía mensajes a Kafka

Después de que se inicie el trabajo de Dataflow, puedes enviar mensajes a Kafka, y la canalización los escribirá en BigQuery.

  1. Crea una VM en la misma subred que el clúster de Kafka y, luego, instala las herramientas de línea de comandos de Kafka. Para obtener instrucciones detalladas, consulta Configura una máquina cliente en Publica y consume mensajes con la CLI.

  2. Ejecuta el siguiente comando para escribir mensajes en el tema de Kafka:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    Reemplaza las siguientes variables:

    • TOPIC: Es el nombre del tema de Kafka.
    • CLUSTER_ID: es el nombre del clúster.
    • LOCATION: Es la región en la que se encuentra el clúster.
    • PROJECT_ID: El nombre de tu proyecto de Google Cloud Platform
  3. En el mensaje, ingresa las siguientes líneas de texto para enviar mensajes a Kafka:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

Usa una cola de mensajes no entregados

Mientras se ejecuta el trabajo, es posible que la canalización no pueda escribir mensajes individuales en BigQuery. Los errores posibles incluyen los siguientes:

  • Errores de serialización, incluido el JSON con formato incorrecto
  • Errores de conversión de tipo, causados por una falta de coincidencia en el esquema de la tabla y los datos JSON.
  • Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla.

Estos errores no provocan la falla del trabajo y no aparecen como errores en el registro del trabajo de Dataflow. En su lugar, la canalización usa una cola de mensajes no entregados para controlar estos tipos de errores.

Para habilitar la cola de mensajes no entregados cuando ejecutes la plantilla, establece los siguientes parámetros de plantilla:

  • useBigQueryDLQ: true
  • outputDeadletterTable: Es el nombre completo de una tabla de BigQuery, por ejemplo, my-project:dataset1.errors.

La canalización crea la tabla automáticamente. Si se produce un error al procesar un mensaje de Kafka, la canalización escribe una entrada de error en la tabla.

Ejemplos de mensajes de error:

Tipo de error Datos de eventos errorMessage
Error de serialización "Hello world" No se pudo serializar el JSON en la fila de la tabla: "Hello world"
Error de conversión de tipo {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Campo desconocido {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Trabaja con tipos de datos de BigQuery

Internamente, el conector de E/S de Kafka convierte las cargas útiles de los mensajes JSON en objetos TableRow de Apache Beam y traduce los valores de los campos TableRow en tipos de BigQuery.

En la siguiente tabla, se muestran las representaciones JSON de los tipos de datos de BigQuery.

Tipo de BigQuery Representación JSON
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Especifica la ubicación geográfica con texto conocido (WKT) o GeoJSON, con formato de cadena. Para obtener más información, consulta Carga datos geoespaciales.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

Usa el método Date.toJSON de JavaScript para dar formato al valor.

Datos estructurados

Si tus mensajes JSON siguen un esquema coherente, puedes representar objetos JSON con el tipo de datos STRUCT en BigQuery.

En el siguiente ejemplo, el campo answers es un objeto JSON con dos subcampos, a y b:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

La siguiente instrucción de SQL crea una tabla de BigQuery con un esquema compatible:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

La tabla resultante se ve de la siguiente manera:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Datos semiestructurados

Si tus mensajes JSON no siguen un esquema estricto, considera almacenarlos en BigQuery como un tipo de datos JSON. Si almacenas datos JSON como un tipo JSON, no necesitas definir el esquema por adelantado. Después de la transferencia de datos, puedes consultarlos con los operadores de acceso a campos (notación de puntos) y acceso a arrays en GoogleSQL. Para obtener más información, consulta Trabaja con datos JSON en GoogleSQL.

Usar una UDF para transformar los datos

En este instructivo, se supone que los mensajes de Kafka tienen formato JSON y que el esquema de la tabla de BigQuery coincide con los datos JSON, sin que se apliquen transformaciones a los datos.

De forma opcional, puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos antes de que se escriban en BigQuery. La UDF también puede realizar un procesamiento adicional, como filtrar, quitar información de identificación personal (PII) o enriquecer los datos con campos adicionales.

Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

¿Qué sigue?