Los conectores receptores de BigQuery te permiten transmitir datos de Kafka a BigQuery, lo que habilita la transferencia y el análisis de datos en tiempo real dentro de BigQuery. Un conector receptor de BigQuery consume registros de uno o más temas de Kafka y escribe los datos en una o más tablas dentro de un solo conjunto de datos de BigQuery.
Antes de comenzar
Antes de crear un conector de receptor de BigQuery, asegúrate de tener lo siguiente:
Crea un clúster de Managed Service para Apache Kafka para tu clúster de Connect. Este clúster es el clúster principal de Kafka asociado con el clúster de Connect. Este clúster también es el clúster de origen que forma un extremo de la canalización del conector de receptor de BigQuery.
Crea un clúster de Connect para alojar tu conector de receptor de BigQuery.
Crea un conjunto de datos de BigQuery para almacenar los datos transmitidos desde Kafka.
Crea y configura un tema de Kafka en el clúster de origen. Los datos se transfieren de este tema de Kafka al conjunto de datos de BigQuery de destino.
Roles y permisos requeridos
Para obtener los permisos que necesitas para crear un conector de BigQuery Sink, pídele a tu administrador que te otorgue el rol de IAM de Editor de conectores de Kafka administrados (roles/managedkafka.connectorEditor) en tu proyecto.
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
Este rol predefinido contiene los permisos necesarios para crear un conector de BigQuery Sink. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:
Permisos necesarios
Se requieren los siguientes permisos para crear un conector de BigQuery Sink:
-
Otorga el permiso para crear un conector en el clúster de Connect principal:
managedkafka.connectors.create
También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.
Para obtener más información sobre el rol de Editor de Kafka Connector administrado, consulta Roles predefinidos de Managed Service for Apache Kafka.
Si tu clúster de Servicio administrado para Apache Kafka se encuentra en el mismo proyecto que el clúster de Connect, no se requieren más permisos. Si el clúster se encuentra en otro proyecto, consulta Crea un clúster de Connect en otro proyecto.
Otorga permisos para escribir en la tabla de BigQuery
La cuenta de servicio del clúster de Connect, que sigue el formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requiere permiso para escribir en la tabla de BigQuery. Para ello, otorga el rol de Editor de datos de BigQuery (roles/bigquery.dataEditor) a la cuenta de servicio del clúster de Connect en el proyecto que contiene la tabla de BigQuery.
Esquemas para un conector de receptor de BigQuery
El conector BigQuery Sink usa el convertidor de valores configurado (value.converter) para analizar los valores de los registros de Kafka en campos. Luego, escribe los campos en columnas con el mismo nombre en la tabla de BigQuery.
El conector requiere un esquema para funcionar. El esquema se puede proporcionar de las siguientes maneras:
- Esquema basado en mensajes: El esquema se incluye como parte de cada mensaje.
- Esquema basado en tablas: El conector infiere el esquema del mensaje a partir del esquema de la tabla de BigQuery.
- Registro de esquemas: El conector lee el esquema de un registro de esquemas, como el registro de esquemas de Managed Service para Apache Kafka (versión preliminar).
En las siguientes secciones, se describen estas opciones.
Esquema basado en mensajes
En este modo, cada registro de Kafka incluye un esquema JSON. El conector usa el esquema para escribir los datos del registro como una fila de la tabla de BigQuery.
Para usar esquemas basados en mensajes, establece las siguientes propiedades en el conector:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true
Ejemplo de valor de registro de Kafka:
{
"schema": {
"type": "struct",
"fields": [
{
"field": "user",
"type": "string",
"optional": false
},
{
"field": "age",
"type": "int64",
"optional": false
}
]
},
"payload": {
"user": "userId",
"age": 30
}
}
Si la tabla de destino ya existe, el esquema de la tabla de BigQuery debe ser compatible con el esquema del mensaje incorporado. Si es autoCreateTables=true, el conector crea automáticamente la tabla de destino si es necesario. Para obtener más información, consulta Creación de tablas.
Si deseas que el conector actualice el esquema de la tabla de BigQuery a medida que cambian los esquemas de mensajes, establece allowNewBigQueryFields, allowSchemaUnionization o allowBigQueryRequiredFieldRelaxation en true.
Esquema basado en tablas
En este modo, los registros de Kafka contienen datos JSON sin un esquema explícito. El conector infiere el esquema de la tabla de destino.
Requisitos:
- La tabla de BigQuery ya debe existir.
- Los datos del registro de Kafka deben ser compatibles con el esquema de la tabla.
- Este modo no admite actualizaciones dinámicas del esquema basadas en los mensajes entrantes.
Para usar esquemas basados en tablas, establece las siguientes propiedades en el conector:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsebigQueryPartitionDecorator=false
Si la tabla de BigQuery usa el particionamiento basado en el tiempo con particionamiento diario, bigQueryPartitionDecorator puede ser true. De lo contrario, establece esta propiedad en false.
Ejemplo de valor de registro de Kafka:
{
"user": "userId",
"age": 30
}
Registro de esquemas
En este modo, cada registro de Kafka contiene datos de Apache Avro, y el esquema del mensaje se almacena en un registro de esquemas.
Para usar el conector de receptor de BigQuery con un registro de esquemas, establece las siguientes propiedades en el conector:
value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url=SCHEMA_REGISTRY_URL
Reemplaza SCHEMA_REGISTRY_URL por la URL del registro de esquemas.
Para usar el conector con el registro de esquemas de Managed Service for Apache Kafka, establece la siguiente propiedad:
value.converter.bearer.auth.credentials.source=GCP
Para obtener más información, consulta Usa Kafka Connect con el registro de esquemas.
Tablas de BigLake para Apache Iceberg en BigQuery
El conector de receptor de BigQuery admite tablas de BigLake para Apache Iceberg en BigQuery (en adelante, tablas de BigLake Iceberg en BigQuery) como destino del receptor.
Las tablas de BigLake Iceberg en BigQuery proporcionan la base para compilar lakehouses de formato abierto en Google Cloud. Las tablas de BigLake Iceberg en BigQuery ofrecen la misma experiencia completamente administrada que las tablas de BigQuery, pero almacenan datos en buckets de almacenamiento que pertenecen al cliente con Parquet para ser interoperables con los formatos de tablas abiertas de Apache Iceberg.
Para obtener información sobre cómo crear una tabla de Apache Iceberg, consulta Crea una tabla de Apache Iceberg.
Crea un conector de BigQuery Sink
Console
En la consola de Google Cloud , ve a la página Connect Clusters.
Haz clic en el clúster de Connect en el que deseas crear el conector.
Haz clic en Crear conector.
Para el nombre del conector, ingresa una cadena.
Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.
En Complemento del conector, selecciona Receptor de BigQuery.
En la sección Topics, especifica los temas de Kafka desde los que se leerá. Puedes especificar una lista de temas o una expresión regular para que coincida con los nombres de los temas.
Opción 1: Elige Seleccionar una lista de temas de Kafka. En la lista Temas de Kafka, selecciona uno o más temas. Haz clic en OK.
Opción 2: Elige Usar una regex del tema. En el campo Expresión regular del tema, ingresa una expresión regular.
Haz clic en Conjunto de datos y especifica un conjunto de datos de BigQuery. Puedes elegir un conjunto de datos existente o crear uno nuevo.
Opcional: En el cuadro Configurations, agrega propiedades de configuración o edita las propiedades predeterminadas. Para obtener más información, consulta Configura el conector.
Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.
Haz clic en Crear.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Ejecuta el comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEReemplaza lo siguiente:
CONNECTOR_ID: Es el ID o el nombre del conector. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.
LOCATION: Es la ubicación en la que creas el conector. Debe ser la misma ubicación en la que creaste el clúster de Connect.
CONNECT_CLUSTER_ID: Es el ID del clúster de Connect en el que se crea el conector.
CONFIG_FILE: Es la ruta de acceso al archivo de configuración YAML del conector de BigQuery Sink.
A continuación, se muestra un ejemplo de un archivo de configuración para el conector BigQuery Sink:
name: "BQ_SINK_CONNECTOR_ID" project: "GCP_PROJECT_ID" topics: "GMK_TOPIC_ID" tasks.max: 3 connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" key.converter: "org.apache.kafka.connect.storage.StringConverter" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" defaultDataset: "BQ_DATASET_ID"Reemplaza lo siguiente:
BQ_SINK_CONNECTOR_ID: Es el ID o el nombre del conector de receptor de BigQuery. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.
GCP_PROJECT_ID: Es el ID del proyecto Google Clouden el que reside tu conjunto de datos de BigQuery.
GMK_TOPIC_ID: Es el ID del tema de Managed Service for Apache Kafka desde el que fluyen los datos hacia el conector de receptor de BigQuery.
BQ_DATASET_ID: Es el ID del conjunto de datos de BigQuery que actúa como receptor de la canalización.
Terraform
Puedes usar un recurso de Terraform para crear un conector.
Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.
Go
Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.
Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Java
Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.
Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Python
Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.
Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.
Configura el conector
En esta sección, se describen algunas propiedades de configuración que puedes establecer en el conector. Para obtener una lista completa de las propiedades específicas de este conector, consulta Configuraciones del conector BigQuery Sink.
Nombre de la tabla
De forma predeterminada, el conector usa el nombre del tema como el nombre de la tabla de BigQuery. Para usar un nombre de tabla diferente, configura la propiedad topic2TableMap con el siguiente formato:
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
Creación de tablas
El conector de receptor de BigQuery puede crear las tablas de destino si no existen.
Si es
autoCreateTables=true, el conector intenta crear las tablas de BigQuery que no existen. Este es el comportamiento predeterminado.Si es
autoCreateTables=false, el conector no crea ninguna tabla. Si no existe una tabla de destino, se produce un error.
Cuando autoCreateTables es true, puedes usar las siguientes propiedades de configuración para tener un control más detallado sobre cómo el conector crea y configura tablas nuevas:
allBQFieldsNullableclusteringPartitionFieldNamesconvertDoubleSpecialValuespartitionExpirationMssanitizeFieldNamessanitizeTopicstimestampPartitionFieldName
Para obtener información sobre estas propiedades, consulta Configuraciones del conector de BigQuery Sink.
Metadatos de Kafka
Puedes asignar datos adicionales de Kafka, como información de metadatos y de claves, a la tabla de BigQuery configurando los campos kafkaDataFieldName y kafkaKeyFieldName, respectivamente. Entre los ejemplos de información de metadatos, se incluyen el tema, la partición, el desplazamiento y la hora de inserción de Kafka.