Auf dieser Seite wird beschrieben, wie Sie mit Dataflow Daten aus Google Cloud Managed Service for Apache Kafka lesen und die Datensätze in eine BigQuery-Tabelle schreiben. In dieser Anleitung wird die Vorlage „Apache Kafka für BigQuery“ verwendet, um den Dataflow-Job zu erstellen.
Übersicht
Apache Kafka ist eine Open-Source-Plattform für Streamingereignisse. Kafka wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Sie können mit Dataflow Ereignisse aus Kafka lesen, verarbeiten und die Ergebnisse zur weiteren Analyse in eine BigQuery-Tabelle schreiben.
Managed Service for Apache Kafka ist ein Google Cloud-Dienst, mit dem Sie sichere und skalierbare Kafka-Cluster ausführen können.
Erforderliche Berechtigungen
Das Dataflow-Worker-Dienstkonto muss die folgenden IAM-Rollen (Identity and Access Management) haben:
- Managed Kafka Client (
roles/managedkafka.client) - BigQuery Datenmitbearbeiter (
roles/bigquery.dataEditor)
Weitere Informationen finden Sie unter Sicherheit und Berechtigungen in Dataflow.
Kafka-Cluster erstellen
In diesem Schritt erstellen Sie einen Managed Service for Apache Kafka-Cluster. Weitere Informationen finden Sie unter Managed Service for Apache Kafka-Cluster erstellen.
Konsole
Rufen Sie die Seite Managed Service for Apache Kafka > Cluster auf.
Klicken Sie auf Erstellen.
Geben Sie im Feld Clustername einen Namen für den Cluster ein.
Wählen Sie in der Liste Region einen Standort für den Cluster aus.
Klicken Sie auf Erstellen.
gcloud
Führen Sie den managed-kafka clusters create-Befehl aus.
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
Ersetzen Sie Folgendes:
CLUSTER: ein Name für den ClusterREGION: Die Region, in der Sie das Subnetz erstellt habenPROJECT_ID: Ihre Projekt-ID.SUBNET_NAME: Das Subnetz, in dem Sie den Cluster bereitstellen möchten.
Das Erstellen eines Clusters dauert in der Regel 20 bis 30 Minuten.
Kafka-Thema erstellen
Nachdem der Managed Service for Apache Kafka-Cluster erstellt wurde, erstellen Sie ein Thema.
Konsole
Rufen Sie die Seite Managed Service for Apache Kafka > Cluster auf.
Klicken Sie auf den Namen des Clusters.
Klicken Sie auf der Seite mit den Clusterdetails auf Thema erstellen.
Geben Sie im Feld Themenname einen Namen für das Thema ein.
Klicken Sie auf Erstellen.
gcloud
Führen Sie den managed-kafka topics create-Befehl aus.
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
Ersetzen Sie Folgendes:
TOPIC_NAME: Der Name des zu erstellenden Themas.
BigQuery-Tabelle erstellen
In diesem Schritt erstellen Sie eine BigQuery-Tabelle mit dem folgenden Schema:
| Spaltenname | Datentyp |
|---|---|
name |
STRING |
customer_id |
INTEGER |
Wenn Sie noch kein BigQuery-Dataset haben, erstellen Sie zuerst eines. Weitere Informationen finden Sie unter Datasets erstellen. Erstellen Sie dann eine neue leere Tabelle:
Console
Rufen Sie die Seite BigQuery auf.
Maximieren Sie im Bereich Explorer Ihr Projekt und wählen Sie dann ein Dataset aus.
Klicken Sie im Abschnitt Dataset-Informationen auf Tabelle erstellen.
Wählen Sie in der Liste Tabelle erstellen aus die Option Leere Tabelle aus.
Geben Sie im Feld Tabelle den Namen der Tabelle ein.
Klicken Sie im Abschnitt Schema auf Als Text bearbeiten.
Fügen Sie die folgende Schemadefinition ein:
name:STRING, customer_id:INTEGERKlicken Sie auf Tabelle erstellen.
gcloud
Führen Sie den Befehl bq mk aus.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Ersetzen Sie Folgendes:
PROJECT_ID: Ihre Projekt-IDDATASET_NAME: der Name des DatasetsTABLE_NAME: Der Name der Tabelle, die erstellt werden soll
Führen Sie den Dataflow-Job aus:
Führen Sie nach dem Erstellen des Kafka-Clusters und der BigQuery-Tabelle die Dataflow-Vorlage aus.
Console
Rufen Sie zuerst die Bootstrap-Serveradresse des Clusters ab:
Rufen Sie in der Google Cloud Console die Seite Cluster auf.
Klicken Sie auf den Clusternamen.
Klicken Sie auf den Tab Konfigurationen.
Kopieren Sie die Bootstrap-Serveradresse aus Bootstrap-URL.
Führen Sie als Nächstes die Vorlage aus, um den Dataflow-Job zu erstellen:
Rufen Sie die Seite Dataflow > Jobs auf.
Klicken Sie auf Job aus Vorlage erstellen.
Geben Sie im Feld Jobname
kafka-to-bqein.Wählen Sie für Regionaler Endpunkt die Region aus, in der sich Ihr Managed Service for Apache Kafka-Cluster befindet.
Wählen Sie die Vorlage "Kafka zu BigQuery" aus.
Geben Sie die folgenden Vorlagenparameter ein:
- Kafka-Bootstrap-Server: die Bootstrap-Serveradresse
- Quell-Kafka-Thema: Der Name des Themas, das gelesen werden soll.
- Modus der Kafka-Quellauthentifizierung:
APPLICATION_DEFAULT_CREDENTIALS - Kafka-Nachrichtenformat:
JSON - Strategie für Tabellennamen:
SINGLE_TABLE_NAME - BigQuery-Ausgabetabelle: Die BigQuery-Tabelle im folgenden Format:
PROJECT_ID:DATASET_NAME.TABLE_NAME
Klicken Sie unter Warteschlange für unzustellbare Nachrichten das Kästchen Schreibfehler in BigQuery an.
Geben Sie einen BigQuery-Tabellennamen für die Dead-Letter-Warteschlange ein, der so formatiert ist:
PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME.Erstellen Sie diese Tabelle nicht im Voraus. Die Pipeline erstellt sie.
Klicken Sie auf Job ausführen.
gcloud
Führen Sie den dataflow flex-template run-Befehl aus.
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Ersetzen Sie die folgenden Variablen:
LOCATION: Die Region, in der sich Ihr Managed Service for Apache Kafka befindet.PROJECT_ID: der Name Ihres Google Cloud-ProjektsCLUSTER_ID: der Name des ClustersTOPIC: Der Name des Kafka-ThemasDATASET_NAME: der Name des DatasetsTABLE_NAME: der Name der TabelleERROR_TABLE_NAME: Ein BigQuery-Tabellenname für die Dead-Letter-Warteschlange
Erstellen Sie die Tabelle für die Warteschlange für unzustellbare Nachrichten nicht im Voraus. Die Pipeline erstellt sie.
Nachrichten an Kafka senden
Nachdem der Dataflow-Job gestartet wurde, können Sie Nachrichten an Kafka senden. Die Pipeline schreibt sie dann in BigQuery.
Erstellen Sie eine VM im selben Subnetz wie der Kafka-Cluster und installieren Sie die Kafka-Befehlszeilentools. Eine ausführliche Anleitung finden Sie unter Clientcomputer einrichten im Hilfeartikel Nachrichten mit der CLI veröffentlichen und nutzen.
Führen Sie den folgenden Befehl aus, um Nachrichten in das Kafka-Thema zu schreiben:
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
Ersetzen Sie die folgenden Variablen:
TOPIC: Der Name des Kafka-ThemasCLUSTER_IDist der Name des Clusters.LOCATION: die Region, in der sich Ihr Cluster befindetPROJECT_ID: der Name Ihres Google Cloud-Projekts
Geben Sie an der Eingabeaufforderung die folgenden Textzeilen ein, um Nachrichten an Kafka zu senden:
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
Dead-Letter-Warteschlange verwenden
Während der Job ausgeführt wird, kann es vorkommen, dass die Pipeline einzelne Nachrichten nicht in BigQuery schreiben kann. Im Folgenden finden Sie mögliche Fehler:
- Serialisierungsfehler, einschließlich falsch formatierter JSON-Daten.
- Typkonvertierungsfehler, die durch eine nicht übereinstimmende Tabelle und die JSON-Daten verursacht wurden.
- Zusätzliche Felder in den JSON-Daten, die im Tabellenschema nicht vorhanden sind.
Diese Fehler führen nicht dazu, dass der Job fehlschlägt, und sie werden im Dataflow-Joblog nicht als Fehler angezeigt. Stattdessen verwendet die Pipeline eine Dead-Letter-Warteschlange, um diese Arten von Fehlern zu verarbeiten.
Wenn Sie die Dead-Letter-Warteschlange beim Ausführen der Vorlage aktivieren möchten, legen Sie die folgenden Vorlagenparameter fest:
useBigQueryDLQ:trueoutputDeadletterTable: ein vollständig qualifizierter BigQuery-Tabellenname, z. B.my-project:dataset1.errors
Die Tabelle wird automatisch von der Pipeline erstellt. Wenn beim Verarbeiten einer Kafka-Nachricht ein Fehler auftritt, schreibt die Pipeline einen Fehlereintrag in die Tabelle.
Beispiele für Fehlermeldungen:
| Art des Fehlers | Ereignisdaten | errorMessage |
|---|---|---|
| Serialisierungfehler | "Hello World" | JSON konnte nicht in Tabellenzeile serialisiert werden: "Hello world" |
| Fehler bei Typkonvertierung | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
| Unbekanntes Feld | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
Mit BigQuery-Datentypen arbeiten
Intern konvertiert der Kafka-I/O-Connector JSON-Nachrichtennutzlasten in Apache Beam-TableRow-Objekte und übersetzt die TableRow-Feldwerte in BigQuery-Typen.
In der folgenden Tabelle sind JSON-Darstellungen von BigQuery-Datentypen aufgeführt.
| BigQuery-Typ | JSON-Darstellung |
|---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)"Geben Sie die Geografie entweder mit dem bekannten Text (WKT) oder GeoJSON als String an. Weitere Informationen finden Sie unter Raumbezogene Daten laden. |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z"Verwenden Sie die JavaScript-Methode |
Strukturierte Daten
Wenn Ihre JSON-Nachrichten einem konsistenten Schema folgen, können Sie JSON-Objekte mit dem Datentyp STRUCT in BigQuery darstellen.
Im folgenden Beispiel ist das Feld answers ein JSON-Objekt mit zwei Unterfeldern, a und b:
{"name":"Emily","answers":{"a":"yes","b":"no"}}
Mit der folgenden SQL-Anweisung wird eine BigQuery-Tabelle mit einem kompatiblen Schema erstellt:
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
Die Tabelle sieht so aus:
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
semistrukturierte Daten
Wenn Ihre JSON-Nachrichten keinem strengen Schema folgen, sollten Sie sie in BigQuery als JSON-Datentyp speichern.
Wenn Sie JSON-Daten als Typ JSON speichern, müssen Sie das Schema nicht im Voraus definieren. Nach der Datenaufnahme können Sie die Daten mit den Feldzugriffsoperatoren (Punktnotation) und Arrayzugriffsoperatoren in GoogleSQL abfragen. Weitere Informationen finden Sie unter Mit JSON-Daten in GoogleSQL arbeiten.
UDF zum Transformieren der Daten verwenden
In diesem Tutorial wird davon ausgegangen, dass die Kafka-Nachrichten als JSON formatiert sind und das BigQuery-Tabellenschema mit den JSON-Daten übereinstimmt, ohne dass Transformationen auf die Daten angewendet werden.
Optional können Sie eine benutzerdefinierte JavaScript-Funktion (User-Defined Function, UDF) bereitstellen, die die Daten transformiert, bevor sie in BigQuery geschrieben werden. Die UDF kann auch eine zusätzliche Verarbeitung ausführen, z. B. filtern, personenbezogene Daten entfernen oder die Daten mit zusätzlichen Feldern anreichern.
Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.