Crea un connettore BigQuery Sink

I connettori di sink BigQuery ti consentono di trasmettere i dati in streaming da Kafka a BigQuery, consentendo l'importazione e l'analisi dei dati in tempo reale all'interno di BigQuery. Un connettore di sink BigQuery utilizza i record di uno o più argomenti Kafka e scrive i dati in una o più tabelle all'interno di un singolo set di dati BigQuery.

Prima di iniziare

Prima di creare un connettore di sink BigQuery, assicurati di avere quanto segue:

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per creare un connettore BigQuery Sink, chiedi all'amministratore di concederti il ruolo IAM Editor connettore Kafka gestito (roles/managedkafka.connectorEditor) sul tuo progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un connettore BigQuery Sink. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un connettore di sink BigQuery sono necessarie le seguenti autorizzazioni:

  • Concedi l'autorizzazione per creare un connettore nel cluster di connessione principale: managedkafka.connectors.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per saperne di più sul ruolo Editor connettore Kafka gestito, vedi Ruoli predefiniti di Managed Service per Apache Kafka.

Se il cluster Managed Service per Apache Kafka si trova nello stesso progetto del cluster di connessione, non sono necessarie ulteriori autorizzazioni. Se il cluster si trova in un progetto diverso, consulta Crea un cluster Connect in un altro progetto.

Concedi le autorizzazioni per scrivere nella tabella BigQuery

Il account di servizio del cluster Connect, che segue il formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, richiede l'autorizzazione per scrivere nella tabella BigQuery. A questo scopo, concedi il ruolo Editor dati BigQuery (roles/bigquery.dataEditor) all'account di servizio del cluster Connect nel progetto contenente la tabella BigQuery.

Schemi per un connettore di sink BigQuery

Il connettore di sink BigQuery utilizza il convertitore di valori configurato (value.converter) per analizzare i valori dei record Kafka in campi. Quindi scrive i campi nelle colonne con lo stesso nome nella tabella BigQuery.

Per funzionare, il connettore richiede uno schema. Lo schema può essere fornito nei seguenti modi:

  • Schema basato sui messaggi: lo schema è incluso in ogni messaggio.
  • Schema basato su tabella: il connettore deduce lo schema del messaggio dallo schema della tabella BigQuery.
  • Registro di schema: il connettore legge lo schema da un registro di schema, ad esempio il registro di schema Managed Service per Apache Kafka (anteprima).

Le sezioni successive descrivono queste opzioni.

Schema basato sui messaggi

In questa modalità, ogni record Kafka include uno schema JSON. Il connettore utilizza lo schema per scrivere i dati del record come riga di una tabella BigQuery.

Per utilizzare gli schemi basati sui messaggi, imposta le seguenti proprietà sul connettore:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=true

Valore di esempio del record Kafka:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "user",
        "type": "string",
        "optional": false
      },
      {
        "field": "age",
        "type": "int64",
        "optional": false
      }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}

Se la tabella di destinazione esiste già, lo schema della tabella BigQuery deve essere compatibile con lo schema del messaggio incorporato. Se autoCreateTables=true, il connettore crea automaticamente la tabella di destinazione se necessario. Per saperne di più, vedi Creazione di tabelle.

Se vuoi che il connettore aggiorni lo schema della tabella BigQuery man mano che cambiano gli schemi dei messaggi, imposta allowNewBigQueryFields, allowSchemaUnionization o allowBigQueryRequiredFieldRelaxation su true.

Schema basato su tabella

In questa modalità, i record Kafka contengono dati JSON semplici senza uno schema esplicito. Il connettore deduce lo schema dalla tabella di destinazione.

Requisiti:

  • La tabella BigQuery deve già esistere.
  • I dati dei record Kafka devono essere compatibili con lo schema della tabella.
  • Questa modalità non supporta gli aggiornamenti dinamici dello schema in base ai messaggi in arrivo.

Per utilizzare gli schemi basati su tabelle, imposta le seguenti proprietà sul connettore:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=false
  • bigQueryPartitionDecorator=false

Se la tabella BigQuery utilizza il partizionamento basato sul tempo con il partizionamento giornaliero, bigQueryPartitionDecorator può essere true. In caso contrario, imposta questa proprietà su false.

Valore di esempio del record Kafka:

{
  "user": "userId",
  "age": 30
}

Registro di schema

In questa modalità, ogni record Kafka contiene dati Apache Avro e lo schema del messaggio è archiviato in un registro degli schemi.

Per utilizzare il connettore di sink BigQuery con un registro di schema, imposta le seguenti proprietà sul connettore:

  • value.converter=io.confluent.connect.avro.AvroConverter
  • value.converter.schema.registry.url=SCHEMA_REGISTRY_URL

Sostituisci SCHEMA_REGISTRY_URL con l'URL del registro dello schema.

Per utilizzare il connettore con il registro di schema Managed Service per Apache Kafka, imposta la seguente proprietà:

  • value.converter.bearer.auth.credentials.source=GCP

Per saperne di più, consulta Utilizzare Kafka Connect con il registro degli schemi.

Tabelle BigLake per Apache Iceberg in BigQuery

Il connettore BigQuery Sink supporta le tabelle BigLake per Apache Iceberg in BigQuery (di seguito, tabelle BigLake Iceberg in BigQuery) come destinazione sink.

Le tabelle BigLake Iceberg in BigQuery forniscono le basi per la creazione di lakehouse in formato aperto su Google Cloud. Le tabelle BigLake Iceberg in BigQuery offrono la stessa esperienza completamente gestita delle tabelle BigQuery, ma archiviano i dati in bucket di archiviazione di proprietà del cliente utilizzando Parquet per essere interoperabili con i formati di tabella aperti Apache Iceberg.

Per informazioni su come creare una tabella Apache Iceberg, vedi Crea una tabella Apache Iceberg.

Crea un connettore di sink BigQuery

Console

  1. Nella console Google Cloud , vai alla pagina Connetti cluster.

    Vai a Connetti cluster

  2. Fai clic sul cluster di connessione in cui vuoi creare il connettore.

  3. Fai clic su Crea connettore.

  4. Per il nome del connettore, inserisci una stringa.

    Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.

  5. Per Plug-in connettore, seleziona Sink BigQuery.

  6. Nella sezione Argomenti, specifica gli argomenti Kafka da cui leggere. Puoi specificare un elenco di argomenti o un'espressione regolare da confrontare con i nomi degli argomenti.

    • Opzione 1: scegli Seleziona un elenco di argomenti Kafka. Nell'elenco Argomenti Kafka, seleziona uno o più argomenti. Fai clic su OK.

    • Opzione 2: scegli Utilizza un'espressione regolare per gli argomenti. Nel campo Espressione regolare argomento, inserisci un'espressione regolare.

  7. Fai clic su Set di dati e specifica un set di dati BigQuery. Puoi scegliere un set di dati esistente o crearne uno nuovo.

  8. (Facoltativo) Nella casella Configurazioni, aggiungi le proprietà di configurazione o modifica le proprietà predefinite. Per saperne di più, consulta Configura il connettore.

  9. Seleziona la policy di riavvio attività. Per saperne di più, consulta le norme sul riavvio delle attività.

  10. Fai clic su Crea.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Sostituisci quanto segue:

    • CONNECTOR_ID: L'ID o il nome del connettore. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.

    • LOCATION: la località in cui crei il connettore. Deve essere la stessa località in cui hai creato il cluster di connessione.

    • CONNECT_CLUSTER_ID: l'ID del cluster Connect in cui viene creato il connettore.

    • CONFIG_FILE: il percorso del file di configurazione YAML per il connettore BigQuery Sink.

    Ecco un esempio di file di configurazione per il connettore BigQuery Sink:

    name: "BQ_SINK_CONNECTOR_ID"
    project: "GCP_PROJECT_ID"
    topics: "GMK_TOPIC_ID"
    tasks.max: 3
    connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    defaultDataset: "BQ_DATASET_ID"
    

    Sostituisci quanto segue:

    • BQ_SINK_CONNECTOR_ID: l'ID o il nome del connettore BigQuery Sink. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.

    • GCP_PROJECT_ID: l'ID del progetto Google Cloud in cui si trova il set di dati BigQuery.

    • GMK_TOPIC_ID: l'ID dell'argomento del servizio gestito per Apache Kafka da cui i dati vengono trasmessi al connettore di sink BigQuery.

    • BQ_DATASET_ID: l'ID del set di dati BigQuery che funge da sink per la pipeline.

  3. Terraform

    Puoi utilizzare una risorsa Terraform per creare un connettore.

    resource "google_managed_kafka_connector" "example-bigquery-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-bigquery-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "name"                           = "my-bigquery-sink-connector"
        "project"                        = data.google_project.default.project_id
        "topics"                         = "GMK_TOPIC_ID"
        "tasks.max"                      = "3"
        "connector.class"                = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "defaultDataset"                 = "BQ_DATASET_ID"
      }
    
      provider = google-beta
    }

    Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.

    Go

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createBigQuerySinkConnector creates a BigQuery Sink connector.
    func createBigQuerySinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, tasksMax, keyConverter, valueConverter, valueConverterSchemasEnable, defaultDataset string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "BQ_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// tasksMax := "3"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// defaultDataset := "BQ_DATASET_ID"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// BigQuery Sink sample connector configuration
    	config := map[string]string{
    		"name":                           connectorID,
    		"project":                        projectID,
    		"topics":                         topics,
    		"tasks.max":                      tasksMax,
    		"connector.class":                "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    		"key.converter":                  keyConverter,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"defaultDataset":                 defaultDataset,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created BigQuery sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateBigQuerySinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-bigquery-sink-connector";
        String bigqueryProjectId = "my-bigquery-project-id";
        String datasetName = "my-dataset";
        String kafkaTopicName = "kafka-topic";
        String maxTasks = "3";
        String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        createBigQuerySinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bigqueryProjectId,
            datasetName,
            kafkaTopicName,
            maxTasks,
            connectorClass,
            keyConverter,
            valueConverter,
            valueSchemasEnable);
      }
    
      public static void createBigQuerySinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bigqueryProjectId,
          String datasetName,
          String kafkaTopicName,
          String maxTasks,
          String connectorClass,
          String keyConverter,
          String valueConverter,
          String valueSchemasEnable)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("name", connectorId);
        configMap.put("project", bigqueryProjectId);
        configMap.put("topics", kafkaTopicName);
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("key.converter", keyConverter);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("defaultDataset", datasetName);
    
        Connector connector =
            Connector.newBuilder()
                .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
                .putAllConfigs(configMap)
                .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request =
              CreateConnectorRequest.newBuilder()
                  .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
                  .setConnectorId(connectorId)
                  .setConnector(connector)
                  .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "name": connector_id,
        "project": project_id,
        "topics": topics,
        "tasks.max": tasks_max,
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": key_converter,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "defaultDataset": default_dataset,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Dopo aver creato un connettore, puoi modificarlo, eliminarlo, metterlo in pausa, arrestarlo o riavviarlo.

Configura il connettore

Questa sezione descrive alcune proprietà di configurazione che puoi impostare sul connettore. Per un elenco completo delle proprietà specifiche di questo connettore, consulta Configurazioni del connettore di sink BigQuery.

Nome tabella

Per impostazione predefinita, il connettore utilizza il nome dell'argomento come nome della tabella BigQuery. Per utilizzare un nome di tabella diverso, imposta la proprietà topic2TableMap con il seguente formato:

topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...

Creazione della tabella

Il connettore di sink BigQuery può creare le tabelle di destinazione se non esistono.

  • Se autoCreateTables=true, il connettore tenta di creare le tabelle BigQuery che non esistono. Questa è l'impostazione predefinita.

  • Se autoCreateTables=false, il connettore non crea tabelle. Se una tabella di destinazione non esiste, si verifica un errore.

Quando autoCreateTables è true, puoi utilizzare le seguenti proprietà di configurazione per un controllo più granulare su come il connettore crea e configura le nuove tabelle:

  • allBQFieldsNullable
  • clusteringPartitionFieldNames
  • convertDoubleSpecialValues
  • partitionExpirationMs
  • sanitizeFieldNames
  • sanitizeTopics
  • timestampPartitionFieldName

Per informazioni su queste proprietà, vedi Configurazioni del connettore BigQuery Sink.

Metadati Kafka

Puoi mappare ulteriori dati da Kafka, come informazioni sui metadati e informazioni sulle chiavi, nella tabella BigQuery configurando i campi kafkaDataFieldName e kafkaKeyFieldName rispettivamente. Esempi di informazioni sui metadati includono l'argomento Kafka, la partizione, l'offset e l'ora di inserimento.

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.