Mit BigQuery-Senken-Connectors können Sie Daten aus Kafka in BigQuery streamen und so Daten in Echtzeit in BigQuery aufnehmen und analysieren. Ein BigQuery-Senken-Connector liest Datensätze aus einem oder mehreren Kafka-Themen und schreibt die Daten in eine oder mehrere Tabellen in einem einzelnen BigQuery-Dataset.
Hinweise
Prüfen Sie vor dem Erstellen eines BigQuery-Senken-Connectors, ob Folgendes vorhanden ist:
Erstellen Sie einen Managed Service for Apache Kafka-Cluster für Ihren Connect-Cluster. Dieser Cluster ist der primäre Kafka-Cluster, der mit dem Connect-Cluster verknüpft ist. Dieser Cluster ist auch der Quellcluster, der ein Ende der BigQuery-Senken-Connector-Pipeline bildet.
Erstellen Sie einen Connect-Cluster, um Ihren BigQuery-Senken-Connector zu hosten.
Erstellen Sie ein BigQuery-Dataset zum Speichern der aus Kafka gestreamten Daten.
Erstellen und konfigurieren Sie ein Kafka-Thema im Quellcluster. Daten werden aus diesem Kafka-Thema in das BigQuery-Zieldataset übertragen.
Erforderliche Rollen und Berechtigungen
Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines BigQuery Sink-Connectors benötigen.
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines BigQuery-Sink-Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:
Erforderliche Berechtigungen
Die folgenden Berechtigungen sind zum Erstellen eines BigQuery-Senken-Connectors erforderlich:
-
Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster:
managedkafka.connectors.create
Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.
Weitere Informationen zur Rolle Managed Kafka Connector Editor finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.
Wenn sich Ihr Managed Service for Apache Kafka-Cluster im selben Projekt wie der Connect-Cluster befindet, sind keine weiteren Berechtigungen erforderlich. Wenn sich der Cluster in einem anderen Projekt befindet, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.
Berechtigungen zum Schreiben in die BigQuery-Tabelle gewähren
Das Dienstkonto des Connect-Clusters, das dem Format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com entspricht, benötigt die Berechtigung zum Schreiben in die BigQuery-Tabelle. Weisen Sie dazu dem Connect-Cluster-Dienstkonto im Projekt mit der BigQuery-Tabelle die Rolle BigQuery-Datenbearbeiter (roles/bigquery.dataEditor) zu.
Schemas für einen BigQuery-Senken-Connector
Der BigQuery-Senken-Connector verwendet den konfigurierten Wertkonverter (value.converter), um Kafka-Datensatzwerte in Felder zu parsen. Anschließend werden die Felder in Spalten mit demselben Namen in die BigQuery-Tabelle geschrieben.
Für den Connector ist ein Schema erforderlich. Das Schema kann auf folgende Weise bereitgestellt werden:
- Nachrichtenbasiertes Schema: Das Schema ist Teil jeder Nachricht.
- Tabellenbasiertes Schema: Der Connector leitet das Nachrichtenschema aus dem BigQuery-Tabellenschema ab.
- Schema-Registry: Der Connector liest das Schema aus einer Schema-Registry, z. B. der Schema-Registry für Managed Service for Apache Kafka (Vorabversion).
In den nächsten Abschnitten werden diese Optionen beschrieben.
Nachrichtenbasiertes Schema
In diesem Modus enthält jeder Kafka-Datensatz ein JSON-Schema. Der Connector verwendet das Schema, um die Datensatzdaten als BigQuery-Tabellenzeile zu schreiben.
Wenn Sie nachrichtenbasierte Schemas verwenden möchten, legen Sie die folgenden Eigenschaften für den Connector fest:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true
Beispiel für einen Kafka-Eintragswert:
{
"schema": {
"type": "struct",
"fields": [
{
"field": "user",
"type": "string",
"optional": false
},
{
"field": "age",
"type": "int64",
"optional": false
}
]
},
"payload": {
"user": "userId",
"age": 30
}
}
Wenn die Zieltabelle bereits vorhanden ist, muss das BigQuery-Tabellenschema mit dem Schema der eingebetteten Nachricht kompatibel sein. Wenn autoCreateTables=true, wird die Zieltabelle bei Bedarf automatisch vom Connector erstellt. Weitere Informationen finden Sie unter Tabellen erstellen.
Wenn der Connector das BigQuery-Tabellenschema aktualisieren soll, wenn sich die Nachrichtenschemas ändern, legen Sie allowNewBigQueryFields, allowSchemaUnionization oder allowBigQueryRequiredFieldRelaxation auf true fest.
Tabellenbasiertes Schema
In diesem Modus enthalten die Kafka-Datensätze einfache JSON-Daten ohne explizites Schema. Der Connector leitet das Schema aus der Zieltabelle ab.
Voraussetzungen:
- Die BigQuery-Tabelle muss bereits vorhanden sein.
- Die Kafka-Eintragsdaten müssen mit dem Tabellenschema kompatibel sein.
- Dieser Modus unterstützt keine dynamischen Schemaaktualisierungen basierend auf eingehenden Nachrichten.
Wenn Sie tabellenbasierte Schemas verwenden möchten, legen Sie die folgenden Attribute für den Connector fest:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsebigQueryPartitionDecorator=false
Wenn für die BigQuery-Tabelle zeitbasierte Partitionierung mit täglicher Partitionierung verwendet wird, kann bigQueryPartitionDecorator true sein. Andernfalls legen Sie false fest.
Beispiel für einen Kafka-Eintragswert:
{
"user": "userId",
"age": 30
}
Schema-Registry
In diesem Modus enthält jeder Kafka-Datensatz Apache Avro-Daten und das Nachrichtenschema wird in einer Schemaregistrierung gespeichert.
Wenn Sie den BigQuery-Senken-Connector mit einer Schemaregistrierung verwenden möchten, legen Sie die folgenden Eigenschaften für den Connector fest:
value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url=SCHEMA_REGISTRY_URL
Ersetzen Sie SCHEMA_REGISTRY_URL durch die URL der Schemaregistry.
Wenn Sie den Connector mit der Schema-Registry von Managed Service for Apache Kafka verwenden möchten, legen Sie die folgende Eigenschaft fest:
value.converter.bearer.auth.credentials.source=GCP
Weitere Informationen finden Sie unter Kafka Connect mit Schemaregister verwenden.
BigLake-Tabellen für Apache Iceberg in BigQuery
Der BigQuery-Sink-Connector unterstützt BigLake-Tabellen für Apache Iceberg in BigQuery (im Folgenden BigLake Iceberg-Tabellen in BigQuery) als Sink-Ziel.
BigLake-Iceberg-Tabellen in BigQuery bilden die Grundlage für die Erstellung von Lakehouses im offenen Format auf Google Cloud. BigLake-Iceberg-Tabellen in BigQuery bieten dieselbe vollständig verwaltete Umgebung wie BigQuery-Tabellen, speichern Daten aber in kundeneigenen Speicher-Buckets mit Parquet, um mit offenen Tabellenformaten von Apache Iceberg kompatibel zu sein.
Informationen zum Erstellen einer Apache Iceberg-Tabelle finden Sie unter Apache Iceberg-Tabelle erstellen.
BigQuery-Senken-Connector erstellen
Console
Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.
Klicken Sie auf den Connect-Cluster, in dem Sie den Connector erstellen möchten.
Klicken Sie auf Connector erstellen.
Geben Sie für den Connectornamen einen String ein.
Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.
Wählen Sie für Connector-Plug-in die Option BigQuery-Senke aus.
Geben Sie im Bereich Themen die Kafka-Themen an, aus denen gelesen werden soll. Sie können eine Liste von Themen oder einen regulären Ausdruck angeben, der mit den Namen der Themen abgeglichen werden soll.
Option 1: Wählen Sie Liste mit Kafka-Themen auswählen aus. Wählen Sie in der Liste Kafka-Themen ein oder mehrere Themen aus. Klicken Sie auf OK.
Option 2: Wählen Sie Thema-Regex verwenden aus. Geben Sie im Feld Thema-Regex einen regulären Ausdruck ein.
Klicken Sie auf Dataset und geben Sie ein BigQuery-Dataset an. Sie können ein vorhandenes Dataset auswählen oder ein neues erstellen.
Optional: Fügen Sie im Feld Konfigurationen Konfigurationseigenschaften hinzu oder bearbeiten Sie die Standardeigenschaften. Weitere Informationen finden Sie unter Connector konfigurieren.
Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.
Klicken Sie auf Erstellen.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl
gcloud managed-kafka connectors createaus:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEErsetzen Sie Folgendes:
CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.
LOCATION: Der Ort, an dem Sie den Connector erstellen. Dies muss derselbe Standort sein, an dem Sie den Connect-Cluster erstellt haben.
CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.
CONFIG_FILE: Der Pfad zur YAML-Konfigurationsdatei für den BigQuery Sink-Connector.
Hier ist ein Beispiel für eine Konfigurationsdatei für den BigQuery-Sink-Connector:
name: "BQ_SINK_CONNECTOR_ID" project: "GCP_PROJECT_ID" topics: "GMK_TOPIC_ID" tasks.max: 3 connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" key.converter: "org.apache.kafka.connect.storage.StringConverter" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" defaultDataset: "BQ_DATASET_ID"Ersetzen Sie Folgendes:
BQ_SINK_CONNECTOR_ID: Die ID oder der Name des BigQuery Sink-Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.
GCP_PROJECT_ID: Die ID des Google Cloud-Projekts, in dem sich Ihr BigQuery-Dataset befindet.
GMK_TOPIC_ID: Die ID des Managed Service for Apache Kafka-Themas, aus dem die Daten in den BigQuery-Senkenconnector fließen.
BQ_DATASET_ID: Die ID des BigQuery-Datasets, das als Senke für die Pipeline dient.
Terraform
Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.
Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.
Go
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.
Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Java
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Python
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.
Connector konfigurieren
In diesem Abschnitt werden einige Konfigurationseigenschaften beschrieben, die Sie für den Connector festlegen können. Eine vollständige Liste der Eigenschaften, die für diesen Connector spezifisch sind, finden Sie unter BigQuery Sink Connector-Konfigurationen.
Tabellenname
Standardmäßig verwendet der Connector den Namen des Themas als Namen der BigQuery-Tabelle. Wenn Sie einen anderen Tabellennamen verwenden möchten, legen Sie die Eigenschaft topic2TableMap mit dem folgenden Format fest:
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
Tabellenerstellung
Der BigQuery-Senken-Connector kann die Zieltabelle erstellen, wenn sie nicht vorhanden ist.
Wenn
autoCreateTables=true, versucht der Connector, alle BigQuery-Tabellen zu erstellen, die nicht vorhanden sind. Das ist das Standardverhalten.Wenn
autoCreateTables=false, werden vom Connector keine Tabellen erstellt. Wenn eine Zieltabelle nicht vorhanden ist, tritt ein Fehler auf.
Wenn autoCreateTables true ist, können Sie die folgenden Konfigurationseigenschaften verwenden, um genauer zu steuern, wie der Connector neue Tabellen erstellt und konfiguriert:
allBQFieldsNullableclusteringPartitionFieldNamesconvertDoubleSpecialValuespartitionExpirationMssanitizeFieldNamessanitizeTopicstimestampPartitionFieldName
Weitere Informationen zu diesen Attributen finden Sie unter BigQuery Sink Connector-Konfigurationen.
Kafka-Metadaten
Sie können zusätzliche Daten aus Kafka, z. B. Metadaten und Schlüsselinformationen, der BigQuery-Tabelle zuordnen, indem Sie die Felder kafkaDataFieldName und kafkaKeyFieldName entsprechend konfigurieren. Beispiele für Metadaten sind das Kafka-Thema, die Partition, der Offset und die Einfügezeit.