Créer un connecteur BigQuery Sink

Les connecteurs de récepteur BigQuery vous permettent de diffuser des données depuis Kafka vers BigQuery, ce qui permet d'ingérer et d'analyser des données en temps réel dans BigQuery. Un connecteur de récepteur BigQuery consomme des enregistrements provenant d'un ou de plusieurs sujets Kafka, et écrit les données dans une ou plusieurs tables d'un même ensemble de données BigQuery.

Avant de commencer

Avant de créer un connecteur de récepteur BigQuery, assurez-vous de disposer des éléments suivants :

Rôles et autorisations nécessaires

Pour obtenir les autorisations nécessaires pour créer un connecteur BigQuery Sink, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de connecteur Kafka géré (roles/managedkafka.connectorEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer un connecteur BigQuery Sink. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un connecteur BigQuery Sink :

  • Accordez l'autorisation de créer un connecteur sur le cluster Connect parent : managedkafka.connectors.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur le rôle Éditeur de connecteurs Kafka gérés, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Si votre cluster Managed Service pour Apache Kafka se trouve dans le même projet que le cluster Connect, aucune autre autorisation n'est requise. Si le cluster se trouve dans un autre projet, consultez Créer un cluster Connect dans un autre projet.

Accorder des autorisations pour écrire dans la table BigQuery

Le compte de service du cluster Connect, qui suit le format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, doit être autorisé à écrire dans la table BigQuery. Pour ce faire, attribuez le rôle Éditeur de données BigQuery (roles/bigquery.dataEditor) au compte de service du cluster Connect dans le projet contenant la table BigQuery.

Schémas pour un connecteur de récepteur BigQuery

Le connecteur BigQuery Sink utilise le convertisseur de valeurs configuré (value.converter) pour analyser les valeurs d'enregistrement Kafka en champs. Il écrit ensuite les champs dans les colonnes du même nom dans la table BigQuery.

Le connecteur nécessite un schéma pour fonctionner. Le schéma peut être fourni de différentes manières :

  • Schéma basé sur les messages : le schéma est inclus dans chaque message.
  • Schéma basé sur une table : le connecteur déduit le schéma du message à partir du schéma de la table BigQuery.
  • Registre de schémas : le connecteur lit le schéma à partir d'un registre de schémas, tel que le registre de schémas Managed Service pour Apache Kafka (aperçu).

Les sections suivantes décrivent ces options.

Schéma basé sur les messages

Dans ce mode, chaque enregistrement Kafka inclut un schéma JSON. Le connecteur utilise le schéma pour écrire les données d'enregistrement sous forme de ligne de table BigQuery.

Pour utiliser des schémas basés sur des messages, définissez les propriétés suivantes sur le connecteur :

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=true

Exemple de valeur d'enregistrement Kafka :

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "user",
        "type": "string",
        "optional": false
      },
      {
        "field": "age",
        "type": "int64",
        "optional": false
      }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}

Si la table de destination existe déjà, le schéma de la table BigQuery doit être compatible avec le schéma du message intégré. Si la valeur est autoCreateTables=true, le connecteur crée automatiquement la table de destination si nécessaire. Pour en savoir plus, consultez Créer des tables.

Si vous souhaitez que le connecteur mette à jour le schéma de table BigQuery à mesure que les schémas de message changent, définissez allowNewBigQueryFields, allowSchemaUnionization ou allowBigQueryRequiredFieldRelaxation sur true.

Schéma basé sur des tableaux

Dans ce mode, les enregistrements Kafka contiennent des données JSON brutes sans schéma explicite. Le connecteur déduit le schéma de la table de destination.

Conditions requises :

  • La table BigQuery doit déjà exister.
  • Les données d'enregistrement Kafka doivent être compatibles avec le schéma de la table.
  • Ce mode n'est pas compatible avec les mises à jour dynamiques du schéma en fonction des messages entrants.

Pour utiliser des schémas basés sur des tableaux, définissez les propriétés suivantes sur le connecteur :

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=false
  • bigQueryPartitionDecorator=false

Si la table BigQuery utilise le partitionnement temporel avec un partitionnement quotidien, bigQueryPartitionDecorator peut être true. Sinon, définissez cette propriété sur false.

Exemple de valeur d'enregistrement Kafka :

{
  "user": "userId",
  "age": 30
}

Registre de schémas

Dans ce mode, chaque enregistrement Kafka contient des données Apache Avro, et le schéma du message est stocké dans un registre de schémas.

Pour utiliser le connecteur BigQuery Sink avec un registre de schémas, définissez les propriétés suivantes sur le connecteur :

  • value.converter=io.confluent.connect.avro.AvroConverter
  • value.converter.schema.registry.url=SCHEMA_REGISTRY_URL

Remplacez SCHEMA_REGISTRY_URL par l'URL du registre de schémas.

Pour utiliser le connecteur avec le registre de schémas Managed Service pour Apache Kafka, définissez la propriété suivante :

  • value.converter.bearer.auth.credentials.source=GCP

Pour en savoir plus, consultez Utiliser Kafka Connect avec le registre de schémas.

Tables BigLake pour Apache Iceberg dans BigQuery

Le connecteur BigQuery Sink est compatible avec les tables BigLake pour Apache Iceberg dans BigQuery (ci-après, tables BigLake Iceberg dans BigQuery) en tant que cible de récepteur.

Les tables BigLake Iceberg dans BigQuery constituent la base de la création de lakehouses au format ouvert sur Google Cloud. Les tables BigLake Iceberg dans BigQuery offrent la même expérience entièrement gérée que les tables BigQuery, mais stockent les données dans des buckets de stockage détenus par le client, en passant par Parquet afin d'assurer l'interopérabilité avec les formats de table ouverts Apache Iceberg.

Pour savoir comment créer une table Apache Iceberg, consultez Créer une table Apache Iceberg.

Créer un connecteur de récepteur BigQuery

Console

  1. Dans la console Google Cloud , accédez à la page Connecter des clusters.

    Accéder à Connect Clusters

  2. Cliquez sur le cluster Connect dans lequel vous souhaitez créer le connecteur.

  3. Cliquez sur Créer un connecteur.

  4. Saisissez une chaîne pour le nom du connecteur.

    Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.

  5. Pour Plug-in de connecteur, sélectionnez Récepteur BigQuery.

  6. Dans la section Sujets, spécifiez les sujets Kafka à lire. Vous pouvez spécifier une liste de thèmes ou une expression régulière à comparer aux noms de thèmes.

    • Option 1 : Sélectionnez Sélectionner une liste de sujets Kafka. Dans la liste Sujets Kafka, sélectionnez un ou plusieurs sujets. Cliquez sur OK.

    • Option 2 : Sélectionnez Utiliser une expression régulière de sujet. Dans le champ Expression régulière du thème, saisissez une expression régulière.

  7. Cliquez sur Ensemble de données et spécifiez un ensemble de données BigQuery. Vous pouvez choisir un ensemble de données existant ou en créer un.

  8. Facultatif : Dans la zone Configurations, ajoutez des propriétés de configuration ou modifiez les propriétés par défaut. Pour en savoir plus, consultez Configurer le connecteur.

  9. Sélectionnez la règle de redémarrage des tâches. Pour en savoir plus, consultez la section Règles de redémarrage des tâches.

  10. Cliquez sur Créer.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka connectors create :

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Remplacez les éléments suivants :

    • CONNECTOR_ID : ID ou nom du connecteur. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

    • LOCATION : emplacement où vous créez le connecteur. Il doit s'agir du même emplacement que celui où vous avez créé le cluster Connect.

    • CONNECT_CLUSTER_ID : ID du cluster Connect dans lequel le connecteur est créé.

    • CONFIG_FILE : chemin d'accès au fichier de configuration YAML pour le connecteur BigQuery Sink.

    Voici un exemple de fichier de configuration pour le connecteur BigQuery Sink :

    name: "BQ_SINK_CONNECTOR_ID"
    project: "GCP_PROJECT_ID"
    topics: "GMK_TOPIC_ID"
    tasks.max: 3
    connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    defaultDataset: "BQ_DATASET_ID"
    

    Remplacez les éléments suivants :

    • BQ_SINK_CONNECTOR_ID : ID ou nom du connecteur BigQuery Sink. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

    • GCP_PROJECT_ID : ID du projet Google Clouddans lequel réside votre ensemble de données BigQuery.

    • GMK_TOPIC_ID : ID du sujet Managed Service pour Apache Kafka à partir duquel les données sont transférées vers le connecteur BigQuery Sink.

    • BQ_DATASET_ID : ID de l'ensemble de données BigQuery qui sert de récepteur pour le pipeline.

  3. Terraform

    Vous pouvez utiliser une ressource Terraform pour créer un connecteur.

    resource "google_managed_kafka_connector" "example-bigquery-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-bigquery-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "name"                           = "my-bigquery-sink-connector"
        "project"                        = data.google_project.default.project_id
        "topics"                         = "GMK_TOPIC_ID"
        "tasks.max"                      = "3"
        "connector.class"                = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "defaultDataset"                 = "BQ_DATASET_ID"
      }
    
      provider = google-beta
    }

    Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

    Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createBigQuerySinkConnector creates a BigQuery Sink connector.
    func createBigQuerySinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, tasksMax, keyConverter, valueConverter, valueConverterSchemasEnable, defaultDataset string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "BQ_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// tasksMax := "3"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// defaultDataset := "BQ_DATASET_ID"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// BigQuery Sink sample connector configuration
    	config := map[string]string{
    		"name":                           connectorID,
    		"project":                        projectID,
    		"topics":                         topics,
    		"tasks.max":                      tasksMax,
    		"connector.class":                "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    		"key.converter":                  keyConverter,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"defaultDataset":                 defaultDataset,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created BigQuery sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateBigQuerySinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-bigquery-sink-connector";
        String bigqueryProjectId = "my-bigquery-project-id";
        String datasetName = "my-dataset";
        String kafkaTopicName = "kafka-topic";
        String maxTasks = "3";
        String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        createBigQuerySinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bigqueryProjectId,
            datasetName,
            kafkaTopicName,
            maxTasks,
            connectorClass,
            keyConverter,
            valueConverter,
            valueSchemasEnable);
      }
    
      public static void createBigQuerySinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bigqueryProjectId,
          String datasetName,
          String kafkaTopicName,
          String maxTasks,
          String connectorClass,
          String keyConverter,
          String valueConverter,
          String valueSchemasEnable)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("name", connectorId);
        configMap.put("project", bigqueryProjectId);
        configMap.put("topics", kafkaTopicName);
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("key.converter", keyConverter);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("defaultDataset", datasetName);
    
        Connector connector =
            Connector.newBuilder()
                .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
                .putAllConfigs(configMap)
                .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request =
              CreateConnectorRequest.newBuilder()
                  .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
                  .setConnectorId(connectorId)
                  .setConnector(connector)
                  .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "name": connector_id,
        "project": project_id,
        "topics": topics,
        "tasks.max": tasks_max,
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": key_converter,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "defaultDataset": default_dataset,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Une fois que vous avez créé un connecteur, vous pouvez le modifier, le supprimer, le suspendre, l'arrêter ou le redémarrer.

Configurer le connecteur

Cette section décrit certaines propriétés de configuration que vous pouvez définir sur le connecteur. Pour obtenir la liste complète des propriétés spécifiques à ce connecteur, consultez Configurations du connecteur BigQuery Sink.

Nom du tableau

Par défaut, le connecteur utilise le nom du sujet comme nom de table BigQuery. Pour utiliser un autre nom de table, définissez la propriété topic2TableMap au format suivant :

topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...

Création de tableaux

Le connecteur de récepteur BigQuery peut créer les tables de destination si elles n'existent pas.

  • Si la valeur est autoCreateTables=true, le connecteur tente de créer les tables BigQuery qui n'existent pas. Il s'agit du comportement par défaut.

  • Si la valeur est autoCreateTables=false, le connecteur ne crée aucune table. Si une table de destination n'existe pas, une erreur se produit.

Lorsque autoCreateTables est défini sur true, vous pouvez utiliser les propriétés de configuration suivantes pour contrôler plus précisément la façon dont le connecteur crée et configure les nouvelles tables :

  • allBQFieldsNullable
  • clusteringPartitionFieldNames
  • convertDoubleSpecialValues
  • partitionExpirationMs
  • sanitizeFieldNames
  • sanitizeTopics
  • timestampPartitionFieldName

Pour en savoir plus sur ces propriétés, consultez Configurations du connecteur BigQuery Sink.

Métadonnées Kafka

Vous pouvez mapper des données supplémentaires de Kafka, telles que des informations sur les métadonnées et les clés, dans la table BigQuery en configurant respectivement les champs kafkaDataFieldName et kafkaKeyFieldName. Les informations de métadonnées incluent, par exemple, le sujet, la partition et le décalage Kafka, ainsi que l'heure d'insertion.

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.