BigQuery-Sink-Connector erstellen

Mit BigQuery-Senken-Connectors können Sie Daten aus Kafka in BigQuery streamen und so Daten in Echtzeit in BigQuery aufnehmen und analysieren. Ein BigQuery-Senken-Connector liest Datensätze aus einem oder mehreren Kafka-Themen und schreibt die Daten in eine oder mehrere Tabellen in einem einzelnen BigQuery-Dataset.

Hinweise

Prüfen Sie vor dem Erstellen eines BigQuery-Senken-Connectors, ob Folgendes vorhanden ist:

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines BigQuery Sink-Connectors benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines BigQuery-Sink-Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines BigQuery-Senken-Connectors erforderlich:

  • Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster: managedkafka.connectors.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zur Rolle Managed Kafka Connector Editor finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Wenn sich Ihr Managed Service for Apache Kafka-Cluster im selben Projekt wie der Connect-Cluster befindet, sind keine weiteren Berechtigungen erforderlich. Wenn sich der Cluster in einem anderen Projekt befindet, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.

Berechtigungen zum Schreiben in die BigQuery-Tabelle gewähren

Das Dienstkonto des Connect-Clusters, das dem Format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com entspricht, benötigt die Berechtigung zum Schreiben in die BigQuery-Tabelle. Weisen Sie dazu dem Connect-Cluster-Dienstkonto im Projekt mit der BigQuery-Tabelle die Rolle BigQuery-Datenbearbeiter (roles/bigquery.dataEditor) zu.

Schemas für einen BigQuery-Senken-Connector

Der BigQuery-Senken-Connector verwendet den konfigurierten Wertkonverter (value.converter), um Kafka-Datensatzwerte in Felder zu parsen. Anschließend werden die Felder in Spalten mit demselben Namen in die BigQuery-Tabelle geschrieben.

Für den Connector ist ein Schema erforderlich. Das Schema kann auf folgende Weise bereitgestellt werden:

  • Nachrichtenbasiertes Schema: Das Schema ist Teil jeder Nachricht.
  • Tabellenbasiertes Schema: Der Connector leitet das Nachrichtenschema aus dem BigQuery-Tabellenschema ab.
  • Schema-Registry: Der Connector liest das Schema aus einer Schema-Registry, z. B. der Schema-Registry für Managed Service for Apache Kafka (Vorabversion).

In den nächsten Abschnitten werden diese Optionen beschrieben.

Nachrichtenbasiertes Schema

In diesem Modus enthält jeder Kafka-Datensatz ein JSON-Schema. Der Connector verwendet das Schema, um die Datensatzdaten als BigQuery-Tabellenzeile zu schreiben.

Wenn Sie nachrichtenbasierte Schemas verwenden möchten, legen Sie die folgenden Eigenschaften für den Connector fest:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=true

Beispiel für einen Kafka-Eintragswert:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "user",
        "type": "string",
        "optional": false
      },
      {
        "field": "age",
        "type": "int64",
        "optional": false
      }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}

Wenn die Zieltabelle bereits vorhanden ist, muss das BigQuery-Tabellenschema mit dem Schema der eingebetteten Nachricht kompatibel sein. Wenn autoCreateTables=true, wird die Zieltabelle bei Bedarf automatisch vom Connector erstellt. Weitere Informationen finden Sie unter Tabellen erstellen.

Wenn der Connector das BigQuery-Tabellenschema aktualisieren soll, wenn sich die Nachrichtenschemas ändern, legen Sie allowNewBigQueryFields, allowSchemaUnionization oder allowBigQueryRequiredFieldRelaxation auf true fest.

Tabellenbasiertes Schema

In diesem Modus enthalten die Kafka-Datensätze einfache JSON-Daten ohne explizites Schema. Der Connector leitet das Schema aus der Zieltabelle ab.

Voraussetzungen:

  • Die BigQuery-Tabelle muss bereits vorhanden sein.
  • Die Kafka-Eintragsdaten müssen mit dem Tabellenschema kompatibel sein.
  • Dieser Modus unterstützt keine dynamischen Schemaaktualisierungen basierend auf eingehenden Nachrichten.

Wenn Sie tabellenbasierte Schemas verwenden möchten, legen Sie die folgenden Attribute für den Connector fest:

  • value.converter=org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable=false
  • bigQueryPartitionDecorator=false

Wenn für die BigQuery-Tabelle zeitbasierte Partitionierung mit täglicher Partitionierung verwendet wird, kann bigQueryPartitionDecorator true sein. Andernfalls legen Sie false fest.

Beispiel für einen Kafka-Eintragswert:

{
  "user": "userId",
  "age": 30
}

Schema-Registry

In diesem Modus enthält jeder Kafka-Datensatz Apache Avro-Daten und das Nachrichtenschema wird in einer Schemaregistrierung gespeichert.

Wenn Sie den BigQuery-Senken-Connector mit einer Schemaregistrierung verwenden möchten, legen Sie die folgenden Eigenschaften für den Connector fest:

  • value.converter=io.confluent.connect.avro.AvroConverter
  • value.converter.schema.registry.url=SCHEMA_REGISTRY_URL

Ersetzen Sie SCHEMA_REGISTRY_URL durch die URL der Schemaregistry.

Wenn Sie den Connector mit der Schema-Registry von Managed Service for Apache Kafka verwenden möchten, legen Sie die folgende Eigenschaft fest:

  • value.converter.bearer.auth.credentials.source=GCP

Weitere Informationen finden Sie unter Kafka Connect mit Schemaregister verwenden.

BigLake-Tabellen für Apache Iceberg in BigQuery

Der BigQuery-Sink-Connector unterstützt BigLake-Tabellen für Apache Iceberg in BigQuery (im Folgenden BigLake Iceberg-Tabellen in BigQuery) als Sink-Ziel.

BigLake-Iceberg-Tabellen in BigQuery bilden die Grundlage für die Erstellung von Lakehouses im offenen Format auf Google Cloud. BigLake-Iceberg-Tabellen in BigQuery bieten dieselbe vollständig verwaltete Umgebung wie BigQuery-Tabellen, speichern Daten aber in kundeneigenen Speicher-Buckets mit Parquet, um mit offenen Tabellenformaten von Apache Iceberg kompatibel zu sein.

Informationen zum Erstellen einer Apache Iceberg-Tabelle finden Sie unter Apache Iceberg-Tabelle erstellen.

BigQuery-Senken-Connector erstellen

Console

  1. Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.

    Zu „Cluster verbinden“

  2. Klicken Sie auf den Connect-Cluster, in dem Sie den Connector erstellen möchten.

  3. Klicken Sie auf Connector erstellen.

  4. Geben Sie für den Connectornamen einen String ein.

    Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.

  5. Wählen Sie für Connector-Plug-in die Option BigQuery-Senke aus.

  6. Geben Sie im Bereich Themen die Kafka-Themen an, aus denen gelesen werden soll. Sie können eine Liste von Themen oder einen regulären Ausdruck angeben, der mit den Namen der Themen abgeglichen werden soll.

    • Option 1: Wählen Sie Liste mit Kafka-Themen auswählen aus. Wählen Sie in der Liste Kafka-Themen ein oder mehrere Themen aus. Klicken Sie auf OK.

    • Option 2: Wählen Sie Thema-Regex verwenden aus. Geben Sie im Feld Thema-Regex einen regulären Ausdruck ein.

  7. Klicken Sie auf Dataset und geben Sie ein BigQuery-Dataset an. Sie können ein vorhandenes Dataset auswählen oder ein neues erstellen.

  8. Optional: Fügen Sie im Feld Konfigurationen Konfigurationseigenschaften hinzu oder bearbeiten Sie die Standardeigenschaften. Weitere Informationen finden Sie unter Connector konfigurieren.

  9. Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.

  10. Klicken Sie auf Erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka connectors create aus:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ersetzen Sie Folgendes:

    • CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.

    • LOCATION: Der Ort, an dem Sie den Connector erstellen. Dies muss derselbe Standort sein, an dem Sie den Connect-Cluster erstellt haben.

    • CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.

    • CONFIG_FILE: Der Pfad zur YAML-Konfigurationsdatei für den BigQuery Sink-Connector.

    Hier ist ein Beispiel für eine Konfigurationsdatei für den BigQuery-Sink-Connector:

    name: "BQ_SINK_CONNECTOR_ID"
    project: "GCP_PROJECT_ID"
    topics: "GMK_TOPIC_ID"
    tasks.max: 3
    connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    defaultDataset: "BQ_DATASET_ID"
    

    Ersetzen Sie Folgendes:

    • BQ_SINK_CONNECTOR_ID: Die ID oder der Name des BigQuery Sink-Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.

    • GCP_PROJECT_ID: Die ID des Google Cloud-Projekts, in dem sich Ihr BigQuery-Dataset befindet.

    • GMK_TOPIC_ID: Die ID des Managed Service for Apache Kafka-Themas, aus dem die Daten in den BigQuery-Senkenconnector fließen.

    • BQ_DATASET_ID: Die ID des BigQuery-Datasets, das als Senke für die Pipeline dient.

  3. Terraform

    Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.

    resource "google_managed_kafka_connector" "example-bigquery-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-bigquery-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "name"                           = "my-bigquery-sink-connector"
        "project"                        = data.google_project.default.project_id
        "topics"                         = "GMK_TOPIC_ID"
        "tasks.max"                      = "3"
        "connector.class"                = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "defaultDataset"                 = "BQ_DATASET_ID"
      }
    
      provider = google-beta
    }

    Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

    Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createBigQuerySinkConnector creates a BigQuery Sink connector.
    func createBigQuerySinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, tasksMax, keyConverter, valueConverter, valueConverterSchemasEnable, defaultDataset string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "BQ_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// tasksMax := "3"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// defaultDataset := "BQ_DATASET_ID"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// BigQuery Sink sample connector configuration
    	config := map[string]string{
    		"name":                           connectorID,
    		"project":                        projectID,
    		"topics":                         topics,
    		"tasks.max":                      tasksMax,
    		"connector.class":                "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    		"key.converter":                  keyConverter,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"defaultDataset":                 defaultDataset,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created BigQuery sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateBigQuerySinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-bigquery-sink-connector";
        String bigqueryProjectId = "my-bigquery-project-id";
        String datasetName = "my-dataset";
        String kafkaTopicName = "kafka-topic";
        String maxTasks = "3";
        String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        createBigQuerySinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bigqueryProjectId,
            datasetName,
            kafkaTopicName,
            maxTasks,
            connectorClass,
            keyConverter,
            valueConverter,
            valueSchemasEnable);
      }
    
      public static void createBigQuerySinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bigqueryProjectId,
          String datasetName,
          String kafkaTopicName,
          String maxTasks,
          String connectorClass,
          String keyConverter,
          String valueConverter,
          String valueSchemasEnable)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("name", connectorId);
        configMap.put("project", bigqueryProjectId);
        configMap.put("topics", kafkaTopicName);
        configMap.put("tasks.max", maxTasks);
        configMap.put("connector.class", connectorClass);
        configMap.put("key.converter", keyConverter);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("defaultDataset", datasetName);
    
        Connector connector =
            Connector.newBuilder()
                .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
                .putAllConfigs(configMap)
                .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request =
              CreateConnectorRequest.newBuilder()
                  .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
                  .setConnectorId(connectorId)
                  .setConnector(connector)
                  .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "name": connector_id,
        "project": project_id,
        "topics": topics,
        "tasks.max": tasks_max,
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": key_converter,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "defaultDataset": default_dataset,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.

Connector konfigurieren

In diesem Abschnitt werden einige Konfigurationseigenschaften beschrieben, die Sie für den Connector festlegen können. Eine vollständige Liste der Eigenschaften, die für diesen Connector spezifisch sind, finden Sie unter BigQuery Sink Connector-Konfigurationen.

Tabellenname

Standardmäßig verwendet der Connector den Namen des Themas als Namen der BigQuery-Tabelle. Wenn Sie einen anderen Tabellennamen verwenden möchten, legen Sie die Eigenschaft topic2TableMap mit dem folgenden Format fest:

topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...

Tabellenerstellung

Der BigQuery-Senken-Connector kann die Zieltabelle erstellen, wenn sie nicht vorhanden ist.

  • Wenn autoCreateTables=true, versucht der Connector, alle BigQuery-Tabellen zu erstellen, die nicht vorhanden sind. Das ist das Standardverhalten.

  • Wenn autoCreateTables=false, werden vom Connector keine Tabellen erstellt. Wenn eine Zieltabelle nicht vorhanden ist, tritt ein Fehler auf.

Wenn autoCreateTables true ist, können Sie die folgenden Konfigurationseigenschaften verwenden, um genauer zu steuern, wie der Connector neue Tabellen erstellt und konfiguriert:

  • allBQFieldsNullable
  • clusteringPartitionFieldNames
  • convertDoubleSpecialValues
  • partitionExpirationMs
  • sanitizeFieldNames
  • sanitizeTopics
  • timestampPartitionFieldName

Weitere Informationen zu diesen Attributen finden Sie unter BigQuery Sink Connector-Konfigurationen.

Kafka-Metadaten

Sie können zusätzliche Daten aus Kafka, z. B. Metadaten und Schlüsselinformationen, der BigQuery-Tabelle zuordnen, indem Sie die Felder kafkaDataFieldName und kafkaKeyFieldName entsprechend konfigurieren. Beispiele für Metadaten sind das Kafka-Thema, die Partition, der Offset und die Einfügezeit.

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.