Crea un conector de BigQuery Sink

Los conectores receptores de BigQuery te permiten transmitir datos de Kafka a BigQuery, lo que habilita la transferencia y el análisis de datos en tiempo real dentro de BigQuery. Un conector receptor de BigQuery consume registros de uno o más temas de Kafka y escribe los datos en una o más tablas dentro de un solo conjunto de datos de BigQuery.

Antes de comenzar

Antes de crear un conector de receptor de BigQuery, asegúrate de tener lo siguiente:

Roles y permisos requeridos

Para obtener los permisos que necesitas para crear un conector de BigQuery Sink, pídele a tu administrador que te otorgue el rol de IAM de Editor de conectores de Kafka administrados (roles/managedkafka.connectorEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear un conector de BigQuery Sink. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear un conector de BigQuery Sink:

  • Otorga el permiso para crear un conector en el clúster de Connect principal: managedkafka.connectors.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre el rol de Editor de Kafka Connector administrado, consulta Roles predefinidos de Managed Service for Apache Kafka.

Si tu clúster de Servicio administrado para Apache Kafka se encuentra en el mismo proyecto que el clúster de Connect, no se requieren más permisos. Si el clúster se encuentra en otro proyecto, consulta Crea un clúster de Connect en otro proyecto.

Otorga permisos para escribir en la tabla de BigQuery

La cuenta de servicio del clúster de Connect, que sigue el formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requiere permiso para escribir en la tabla de BigQuery. Para ello, otorga el rol de Editor de datos de BigQuery (roles/bigquery.dataEditor) a la cuenta de servicio del clúster de Connect en el proyecto que contiene la tabla de BigQuery.

Esquemas para un conector de receptor de BigQuery

El conector BigQuery Sink usa el convertidor de valores configurado (value.converter) para analizar los valores de los registros de Kafka en campos. Luego, escribe los campos en columnas con el mismo nombre en la tabla de BigQuery.

El conector requiere un esquema para funcionar. El esquema se puede proporcionar de las siguientes maneras:

En las siguientes secciones, se describen estas opciones.

Esquema basado en mensajes

En este modo, cada registro de Kafka incluye un esquema JSON. El conector usa el esquema para escribir los datos del registro como una fila de la tabla de BigQuery.

Para usar esquemas basados en mensajes, establece las siguientes propiedades en el conector:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=true

Ejemplo de valor de registro de Kafka:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "user",
        "type": "string",
        "optional": false
      },
      {
        "field": "age",
        "type": "int64",
        "optional": false
      }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}

Si la tabla de destino ya existe, el esquema de la tabla de BigQuery debe ser compatible con el esquema del mensaje incorporado. Si es autoCreateTables=true, el conector crea automáticamente la tabla de destino si es necesario. Para obtener más información, consulta Creación de tablas.

Si deseas que el conector actualice el esquema de la tabla de BigQuery a medida que cambian los esquemas de mensajes, establece allowNewBigQueryFields, allowSchemaUnionization o allowBigQueryRequiredFieldRelaxation en true.

Esquema basado en tablas

En este modo, los registros de Kafka contienen datos JSON sin un esquema explícito. El conector infiere el esquema de la tabla de destino.

Requisitos:

  • La tabla de BigQuery ya debe existir.
  • Los datos del registro de Kafka deben ser compatibles con el esquema de la tabla.
  • Este modo no admite actualizaciones dinámicas del esquema basadas en los mensajes entrantes.

Para usar esquemas basados en tablas, establece las siguientes propiedades en el conector:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=false
  • bigQueryPartitionDecorator=false

Si la tabla de BigQuery usa el particionamiento basado en el tiempo con particionamiento diario, bigQueryPartitionDecorator puede ser true. De lo contrario, establece esta propiedad en false.

Ejemplo de valor de registro de Kafka:

{
  "user": "userId",
  "age": 30
}

Registro de esquemas

En este modo, cada registro de Kafka contiene datos de Apache Avro, y el esquema del mensaje se almacena en un registro de esquemas.

Para usar el conector de receptor de BigQuery con un registro de esquemas, establece las siguientes propiedades en el conector:

  • value.converter=io.confluent.connect.avro.AvroConverter
  • value.converter.schema.registry.url=SCHEMA_REGISTRY_URL

Reemplaza SCHEMA_REGISTRY_URL por la URL del registro de esquemas.

Para usar el conector con el registro de esquemas de Managed Service for Apache Kafka, establece la siguiente propiedad:

  • value.converter.bearer.auth.credentials.source=GCP

Para obtener más información, consulta Usa Kafka Connect con el registro de esquemas.

Tablas de BigLake para Apache Iceberg en BigQuery

El conector de receptor de BigQuery admite tablas de BigLake para Apache Iceberg en BigQuery (en adelante, tablas de BigLake Iceberg en BigQuery) como destino del receptor.

Las tablas de BigLake Iceberg en BigQuery proporcionan la base para compilar lakehouses de formato abierto en Google Cloud. Las tablas de BigLake Iceberg en BigQuery ofrecen la misma experiencia completamente administrada que las tablas de BigQuery, pero almacenan datos en buckets de almacenamiento que pertenecen al cliente con Parquet para ser interoperables con los formatos de tablas abiertas de Apache Iceberg.

Para obtener información sobre cómo crear una tabla de Apache Iceberg, consulta Crea una tabla de Apache Iceberg.

Crea un conector de BigQuery Sink

Console

  1. En la consola de Google Cloud , ve a la página Connect Clusters.

    Ir a Connect Clusters

  2. Haz clic en el clúster de Connect en el que deseas crear el conector.

  3. Haz clic en Crear conector.

  4. Para el nombre del conector, ingresa una cadena.

    Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.

  5. En Complemento del conector, selecciona Receptor de BigQuery.

  6. En la sección Topics, especifica los temas de Kafka desde los que se leerá. Puedes especificar una lista de temas o una expresión regular para que coincida con los nombres de los temas.

    • Opción 1: Elige Seleccionar una lista de temas de Kafka. En la lista Temas de Kafka, selecciona uno o más temas. Haz clic en OK.

    • Opción 2: Elige Usar una regex del tema. En el campo Expresión regular del tema, ingresa una expresión regular.

  7. Haz clic en Conjunto de datos y especifica un conjunto de datos de BigQuery. Puedes elegir un conjunto de datos existente o crear uno nuevo.

  8. Opcional: En el cuadro Configurations, agrega propiedades de configuración o edita las propiedades predeterminadas. Para obtener más información, consulta Configura el conector.

  9. Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.

  10. Haz clic en Crear.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Reemplaza lo siguiente:

    A continuación, se muestra un ejemplo de un archivo de configuración para el conector BigQuery Sink:

    name: "BQ_SINK_CONNECTOR_ID"
    project: "GCP_PROJECT_ID"
    topics: "GMK_TOPIC_ID"
    tasks.max: 3
    connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    defaultDataset: "BQ_DATASET_ID"
    

    Reemplaza lo siguiente:

    • BQ_SINK_CONNECTOR_ID: Es el ID o el nombre del conector de receptor de BigQuery. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.

    • GCP_PROJECT_ID: Es el ID del proyecto Google Clouden el que reside tu conjunto de datos de BigQuery.

    • GMK_TOPIC_ID: Es el ID del tema de Managed Service for Apache Kafka desde el que fluyen los datos hacia el conector de receptor de BigQuery.

    • BQ_DATASET_ID: Es el ID del conjunto de datos de BigQuery que actúa como receptor de la canalización.

  3. Terraform

    Puedes usar un recurso de Terraform para crear un conector.

    resource "google_managed_kafka_connector" "example-bigquery-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-bigquery-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "name"                           = "my-bigquery-sink-connector"
        "project"                        = data.google_project.default.project_id
        "topics"                         = "GMK_TOPIC_ID"
        "tasks.max"                      = "3"
        "connector.class"                = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "defaultDataset"                 = "BQ_DATASET_ID"
      }
    
      provider = google-beta
    }

    Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.

    Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createBigQuerySinkConnector creates a BigQuery Sink connector.
    func createBigQuerySinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, tasksMax, keyConverter, valueConverter, valueConverterSchemasEnable, defaultDataset string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "BQ_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// tasksMax := "3"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// defaultDataset := "BQ_DATASET_ID"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// BigQuery Sink sample connector configuration
    	config := map[string]string{
    		"name":                           connectorID,
    		"project":                        projectID,
    		"topics":                         topics,
    		"tasks.max":                      tasksMax,
    		"connector.class":                "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    		"key.converter":                  keyConverter,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"defaultDataset":                 defaultDataset,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created BigQuery sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateBigQuerySinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-bigquery-sink-connector";
        String bigqueryProjectId = "my-bigquery-project-id";
        String datasetName = "my-dataset";
        String kafkaTopicName = "kafka-topic";
        String maxTasks = "3";
        String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        createBigQuerySinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bigqueryProjectId,
            datasetName,
            kafkaTopicName,
            maxTasks,
            connectorClass,
            keyConverter,
            valueConverter,
            valueSchemasEnable);
      }
    
      public static void createBigQuerySinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bigqueryProjectId,
          String datasetName,
          String kafkaTopicName,
          String maxTasks,
          String connectorClass,
          String keyConverter,
          String valueConverter,
          String valueSchemasEnable)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("name", connectorId);
        configMap.put("project", bigqueryProjectId);
        configMap.put("topics", kafkaTopicName);
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("key.converter", keyConverter);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("defaultDataset", datasetName);
    
        Connector connector =
            Connector.newBuilder()
                .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
                .putAllConfigs(configMap)
                .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request =
              CreateConnectorRequest.newBuilder()
                  .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
                  .setConnectorId(connectorId)
                  .setConnector(connector)
                  .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "name": connector_id,
        "project": project_id,
        "topics": topics,
        "tasks.max": tasks_max,
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": key_converter,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "defaultDataset": default_dataset,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.

Configura el conector

En esta sección, se describen algunas propiedades de configuración que puedes establecer en el conector. Para obtener una lista completa de las propiedades específicas de este conector, consulta Configuraciones del conector BigQuery Sink.

Nombre de la tabla

De forma predeterminada, el conector usa el nombre del tema como el nombre de la tabla de BigQuery. Para usar un nombre de tabla diferente, configura la propiedad topic2TableMap con el siguiente formato:

topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...

Creación de tablas

El conector de receptor de BigQuery puede crear las tablas de destino si no existen.

  • Si es autoCreateTables=true, el conector intenta crear las tablas de BigQuery que no existen. Este es el comportamiento predeterminado.

  • Si es autoCreateTables=false, el conector no crea ninguna tabla. Si no existe una tabla de destino, se produce un error.

Cuando autoCreateTables es true, puedes usar las siguientes propiedades de configuración para tener un control más detallado sobre cómo el conector crea y configura tablas nuevas:

  • allBQFieldsNullable
  • clusteringPartitionFieldNames
  • convertDoubleSpecialValues
  • partitionExpirationMs
  • sanitizeFieldNames
  • sanitizeTopics
  • timestampPartitionFieldName

Para obtener información sobre estas propiedades, consulta Configuraciones del conector de BigQuery Sink.

Metadatos de Kafka

Puedes asignar datos adicionales de Kafka, como información de metadatos y de claves, a la tabla de BigQuery configurando los campos kafkaDataFieldName y kafkaKeyFieldName, respectivamente. Entre los ejemplos de información de metadatos, se incluyen el tema, la partición, el desplazamiento y la hora de inserción de Kafka.

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.