Mit Dataflow Daten von Kafka in BigQuery schreiben

Auf dieser Seite wird beschrieben, wie Sie mit Dataflow Daten aus Google Cloud Managed Service for Apache Kafka lesen und die Datensätze in eine BigQuery-Tabelle schreiben. In dieser Anleitung wird die Vorlage „Apache Kafka für BigQuery“ verwendet, um den Dataflow-Job zu erstellen.

Übersicht

Apache Kafka ist eine Open-Source-Plattform für Streamingereignisse. Kafka wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Sie können mit Dataflow Ereignisse aus Kafka lesen, verarbeiten und die Ergebnisse zur weiteren Analyse in eine BigQuery-Tabelle schreiben.

Managed Service for Apache Kafka ist ein Google Cloud-Dienst, mit dem Sie sichere und skalierbare Kafka-Cluster ausführen können.

Kafka-Ereignisse in BigQuery lesen
Ereignisgesteuerte Architektur mit Apache Kafka

Erforderliche Berechtigungen

Das Dataflow-Worker-Dienstkonto muss die folgenden IAM-Rollen (Identity and Access Management) haben:

  • Managed Kafka Client (roles/managedkafka.client)
  • BigQuery Datenmitbearbeiter (roles/bigquery.dataEditor)

Weitere Informationen finden Sie unter Sicherheit und Berechtigungen in Dataflow.

Kafka-Cluster erstellen

In diesem Schritt erstellen Sie einen Managed Service for Apache Kafka-Cluster. Weitere Informationen finden Sie unter Managed Service for Apache Kafka-Cluster erstellen.

Konsole

  1. Rufen Sie die Seite Managed Service for Apache Kafka > Cluster auf.

    Zu den Clustern

  2. Klicken Sie auf Erstellen.

  3. Geben Sie im Feld Clustername einen Namen für den Cluster ein.

  4. Wählen Sie in der Liste Region einen Standort für den Cluster aus.

  5. Klicken Sie auf Erstellen.

gcloud

Führen Sie den managed-kafka clusters create-Befehl aus.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

Ersetzen Sie Folgendes:

  • CLUSTER: ein Name für den Cluster
  • REGION: Die Region, in der Sie das Subnetz erstellt haben
  • PROJECT_ID: Ihre Projekt-ID.
  • SUBNET_NAME: Das Subnetz, in dem Sie den Cluster bereitstellen möchten.

Das Erstellen eines Clusters dauert in der Regel 20 bis 30 Minuten.

Kafka-Thema erstellen

Nachdem der Managed Service for Apache Kafka-Cluster erstellt wurde, erstellen Sie ein Thema.

Konsole

  1. Rufen Sie die Seite Managed Service for Apache Kafka > Cluster auf.

    Zu den Clustern

  2. Klicken Sie auf den Namen des Clusters.

  3. Klicken Sie auf der Seite mit den Clusterdetails auf Thema erstellen.

  4. Geben Sie im Feld Themenname einen Namen für das Thema ein.

  5. Klicken Sie auf Erstellen.

gcloud

Führen Sie den managed-kafka topics create-Befehl aus.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Ersetzen Sie Folgendes:

  • TOPIC_NAME: Der Name des zu erstellenden Themas.

BigQuery-Tabelle erstellen

In diesem Schritt erstellen Sie eine BigQuery-Tabelle mit dem folgenden Schema:

Spaltenname Datentyp
name STRING
customer_id INTEGER

Wenn Sie noch kein BigQuery-Dataset haben, erstellen Sie zuerst eines. Weitere Informationen finden Sie unter Datasets erstellen. Erstellen Sie dann eine neue leere Tabelle:

Console

  1. Rufen Sie die Seite BigQuery auf.

    BigQuery aufrufen

  2. Maximieren Sie im Bereich Explorer Ihr Projekt und wählen Sie dann ein Dataset aus.

  3. Klicken Sie im Abschnitt Dataset-Informationen auf Tabelle erstellen.

  4. Wählen Sie in der Liste Tabelle erstellen aus die Option Leere Tabelle aus.

  5. Geben Sie im Feld Tabelle den Namen der Tabelle ein.

  6. Klicken Sie im Abschnitt Schema auf Als Text bearbeiten.

  7. Fügen Sie die folgende Schemadefinition ein:

    name:STRING,
    customer_id:INTEGER
    
  8. Klicken Sie auf Tabelle erstellen.

gcloud

Führen Sie den Befehl bq mk aus.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Projekt-ID
  • DATASET_NAME: der Name des Datasets
  • TABLE_NAME: Der Name der Tabelle, die erstellt werden soll

Führen Sie den Dataflow-Job aus:

Führen Sie nach dem Erstellen des Kafka-Clusters und der BigQuery-Tabelle die Dataflow-Vorlage aus.

Console

Rufen Sie zuerst die Bootstrap-Serveradresse des Clusters ab:

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie auf den Clusternamen.

  3. Klicken Sie auf den Tab Konfigurationen.

  4. Kopieren Sie die Bootstrap-Serveradresse aus Bootstrap-URL.

Führen Sie als Nächstes die Vorlage aus, um den Dataflow-Job zu erstellen:

  1. Rufen Sie die Seite Dataflow > Jobs auf.

    ZU JOBS

  2. Klicken Sie auf Job aus Vorlage erstellen.

  3. Geben Sie im Feld Jobname kafka-to-bq ein.

  4. Wählen Sie für Regionaler Endpunkt die Region aus, in der sich Ihr Managed Service for Apache Kafka-Cluster befindet.

  5. Wählen Sie die Vorlage "Kafka zu BigQuery" aus.

  6. Geben Sie die folgenden Vorlagenparameter ein:

    • Kafka-Bootstrap-Server: die Bootstrap-Serveradresse
    • Quell-Kafka-Thema: Der Name des Themas, das gelesen werden soll.
    • Modus der Kafka-Quellauthentifizierung: APPLICATION_DEFAULT_CREDENTIALS
    • Kafka-Nachrichtenformat: JSON
    • Strategie für Tabellennamen: SINGLE_TABLE_NAME
    • BigQuery-Ausgabetabelle: Die BigQuery-Tabelle im folgenden Format: PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. Klicken Sie unter Warteschlange für unzustellbare Nachrichten das Kästchen Schreibfehler in BigQuery an.

  8. Geben Sie einen BigQuery-Tabellennamen für die Dead-Letter-Warteschlange ein, der so formatiert ist: PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME.

    Erstellen Sie diese Tabelle nicht im Voraus. Die Pipeline erstellt sie.

  9. Klicken Sie auf Job ausführen.

gcloud

Führen Sie den dataflow flex-template run-Befehl aus.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

Ersetzen Sie die folgenden Variablen:

  • LOCATION: Die Region, in der sich Ihr Managed Service for Apache Kafka befindet.
  • PROJECT_ID: der Name Ihres Google Cloud-Projekts
  • CLUSTER_ID: der Name des Clusters
  • TOPIC: Der Name des Kafka-Themas
  • DATASET_NAME: der Name des Datasets
  • TABLE_NAME: der Name der Tabelle
  • ERROR_TABLE_NAME: Ein BigQuery-Tabellenname für die Dead-Letter-Warteschlange

Erstellen Sie die Tabelle für die Warteschlange für unzustellbare Nachrichten nicht im Voraus. Die Pipeline erstellt sie.

Nachrichten an Kafka senden

Nachdem der Dataflow-Job gestartet wurde, können Sie Nachrichten an Kafka senden. Die Pipeline schreibt sie dann in BigQuery.

  1. Erstellen Sie eine VM im selben Subnetz wie der Kafka-Cluster und installieren Sie die Kafka-Befehlszeilentools. Eine ausführliche Anleitung finden Sie unter Clientcomputer einrichten im Hilfeartikel Nachrichten mit der CLI veröffentlichen und nutzen.

  2. Führen Sie den folgenden Befehl aus, um Nachrichten in das Kafka-Thema zu schreiben:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    Ersetzen Sie die folgenden Variablen:

    • TOPIC: Der Name des Kafka-Themas
    • CLUSTER_ID ist der Name des Clusters.
    • LOCATION: die Region, in der sich Ihr Cluster befindet
    • PROJECT_ID: der Name Ihres Google Cloud-Projekts
  3. Geben Sie an der Eingabeaufforderung die folgenden Textzeilen ein, um Nachrichten an Kafka zu senden:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

Dead-Letter-Warteschlange verwenden

Während der Job ausgeführt wird, kann es vorkommen, dass die Pipeline einzelne Nachrichten nicht in BigQuery schreiben kann. Im Folgenden finden Sie mögliche Fehler:

  • Serialisierungsfehler, einschließlich falsch formatierter JSON-Daten.
  • Typkonvertierungsfehler, die durch eine nicht übereinstimmende Tabelle und die JSON-Daten verursacht wurden.
  • Zusätzliche Felder in den JSON-Daten, die im Tabellenschema nicht vorhanden sind.

Diese Fehler führen nicht dazu, dass der Job fehlschlägt, und sie werden im Dataflow-Joblog nicht als Fehler angezeigt. Stattdessen verwendet die Pipeline eine Dead-Letter-Warteschlange, um diese Arten von Fehlern zu verarbeiten.

Wenn Sie die Dead-Letter-Warteschlange beim Ausführen der Vorlage aktivieren möchten, legen Sie die folgenden Vorlagenparameter fest:

  • useBigQueryDLQ: true
  • outputDeadletterTable: ein vollständig qualifizierter BigQuery-Tabellenname, z. B. my-project:dataset1.errors

Die Tabelle wird automatisch von der Pipeline erstellt. Wenn beim Verarbeiten einer Kafka-Nachricht ein Fehler auftritt, schreibt die Pipeline einen Fehlereintrag in die Tabelle.

Beispiele für Fehlermeldungen:

Art des Fehlers Ereignisdaten errorMessage
Serialisierungfehler "Hello World" JSON konnte nicht in Tabellenzeile serialisiert werden: "Hello world"
Fehler bei Typkonvertierung {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Unbekanntes Feld {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Mit BigQuery-Datentypen arbeiten

Intern konvertiert der Kafka-I/O-Connector JSON-Nachrichtennutzlasten in Apache Beam-TableRow-Objekte und übersetzt die TableRow-Feldwerte in BigQuery-Typen.

In der folgenden Tabelle sind JSON-Darstellungen von BigQuery-Datentypen aufgeführt.

BigQuery-Typ JSON-Darstellung
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Geben Sie die Geografie entweder mit dem bekannten Text (WKT) oder GeoJSON als String an. Weitere Informationen finden Sie unter Raumbezogene Daten laden.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

Verwenden Sie die JavaScript-Methode Date.toJSON, um den Wert zu formatieren.

Strukturierte Daten

Wenn Ihre JSON-Nachrichten einem konsistenten Schema folgen, können Sie JSON-Objekte mit dem Datentyp STRUCT in BigQuery darstellen.

Im folgenden Beispiel ist das Feld answers ein JSON-Objekt mit zwei Unterfeldern, a und b:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

Mit der folgenden SQL-Anweisung wird eine BigQuery-Tabelle mit einem kompatiblen Schema erstellt:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

Die Tabelle sieht so aus:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

semistrukturierte Daten

Wenn Ihre JSON-Nachrichten keinem strengen Schema folgen, sollten Sie sie in BigQuery als JSON-Datentyp speichern. Wenn Sie JSON-Daten als Typ JSON speichern, müssen Sie das Schema nicht im Voraus definieren. Nach der Datenaufnahme können Sie die Daten mit den Feldzugriffsoperatoren (Punktnotation) und Arrayzugriffsoperatoren in GoogleSQL abfragen. Weitere Informationen finden Sie unter Mit JSON-Daten in GoogleSQL arbeiten.

UDF zum Transformieren der Daten verwenden

In diesem Tutorial wird davon ausgegangen, dass die Kafka-Nachrichten als JSON formatiert sind und das BigQuery-Tabellenschema mit den JSON-Daten übereinstimmt, ohne dass Transformationen auf die Daten angewendet werden.

Optional können Sie eine benutzerdefinierte JavaScript-Funktion (User-Defined Function, UDF) bereitstellen, die die Daten transformiert, bevor sie in BigQuery geschrieben werden. Die UDF kann auch eine zusätzliche Verarbeitung ausführen, z. B. filtern, personenbezogene Daten entfernen oder die Daten mit zusätzlichen Feldern anreichern.

Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Nächste Schritte