Leistungsmerkmale von Kafka-zu-BigQuery-Pipelines

Auf dieser Seite werden die Leistungsmerkmale für Dataflow-Streamingjobs beschrieben, die Daten aus Apache Kafka lesen und in BigQuery schreiben. Sie liefert Benchmark-Testergebnisse für reine Zuordnungspipelines, bei denen Transformationen pro Nachricht ohne Statusverfolgung oder Gruppierung von Elementen im Stream durchgeführt werden.

Viele Data Integration-Arbeitslasten, darunter ETL, Feldvalidierung und Schemazuordnung, fallen in die Kategorie „Nur Map“. Wenn Ihre Pipeline diesem Muster folgt, können Sie diese Benchmarks verwenden, um Ihren Dataflow-Job mit einer leistungsstarken Referenzkonfiguration zu vergleichen.

Testmethodik

Die Benchmarks wurden mit den folgenden Ressourcen durchgeführt:

  • Ein Managed Service for Apache Kafka-Cluster. Die Nachrichten wurden mit der Vorlage Streaming Data Generator generiert.

    • Nachrichtenrate: etwa 1.000.000 Nachrichten pro Sekunde
    • Eingabelast: 1 GiB/s
    • Nachrichtenformat: Zufällig generierter JSON-Text mit einem festen Schema
    • Nachrichtengröße: ca. 1 KiB pro Nachricht
    • Kafka-Partitionen: 1000
  • Eine Standard-BigQuery-Tabelle.

  • Eine Dataflow-Streamingpipeline, die die Vorlage „Apache Kafka für BigQuery“ verwendet. In dieser Pipeline werden die minimal erforderlichen Parsing- und Schemazuordnungen ausgeführt. Es wurde keine benutzerdefinierte Funktion (UDF) verwendet.

Nachdem die horizontale Skalierung stabilisiert und die Pipeline den stabilen Zustand erreicht hatte, durften die Pipelines etwa einen Tag lang ausgeführt werden. Danach wurden die Ergebnisse erfasst und analysiert.

Dataflow-Pipeline

Bei diesem Benchmark wird eine reine Map-Pipeline verwendet, die eine einfache Zuordnung und Konvertierung von JSON-Nachrichten durchführt. Die Pipeline wurde sowohl im Genau einmal-Modus als auch im Mindestens einmal-Modus getestet. Die At-least-once-Verarbeitung bietet einen besseren Durchsatz. Sie sollte jedoch nur verwendet werden, wenn doppelte Datensätze akzeptabel sind oder die nachgelagerte Senke die Deduplizierung übernimmt.

Jobkonfiguration

In der folgenden Tabelle sehen Sie, wie die Dataflow-Jobs konfiguriert wurden.

Einstellung Wert
Maschinentyp des Workers e2-standard-2
Worker-Maschinen-vCPUs 2
RAM der Worker-Maschine 8 GB
Persistent Disk für Worker-Maschine Nichtflüchtiger Standardspeicher (HDD), 30 GB
Maximale Anzahl der Worker 120
Streaming Engine Ja
Horizontales Autoscaling Ja
Abrechnungsmodell Ressourcenbasierte Abrechnung
Ist die Storage Write API aktiviert? Ja
Storage Write API-Streams 400
Triggerhäufigkeit der Storage Write API 5 Sekunden
Nachrichtenformat JSON
Kafka-Authentifizierungsmodus

Standardanmeldedaten für Anwendungen (Application Default Credentials, ADC).

Weitere Informationen finden Sie unter Authentifizierungstypen für Kafka-Broker.

Die BigQuery Storage Write API wird für Streamingpipelines empfohlen. Wenn Sie den genau einmaligen Modus mit der Storage Write API verwenden, können Sie die folgenden Einstellungen anpassen:

  • Anzahl der Schreibstreams Um ausreichend Schlüsselparallelität in der Schreibphase zu gewährleisten, legen Sie die Anzahl der Storage Write API-Streams auf einen Wert fest, der größer als die Anzahl der Worker-CPUs ist. Beachten Sie dabei die Empfehlungen zum Durchsatz pro Stream.

  • Auslösehäufigkeit: Ein einstelliger Sekundenwert ist für Pipelines mit hohem Durchsatz geeignet.

Weitere Informationen finden Sie unter Aus Dataflow in BigQuery schreiben.

Auch die Anzahl der Apache Kafka-Partitionen sollte berücksichtigt werden. Um eine ausreichende Schlüsselparallelität in der Lesestufe zu gewährleisten, sollte die Anzahl der Partitionen mindestens der Gesamtzahl der Worker-vCPUs entsprechen. Weitere Informationen finden Sie unter Daten aus Apache Kafka in Dataflow lesen.

Benchmark-Ergebnisse

In diesem Abschnitt werden die Ergebnisse der Benchmarktests beschrieben.

Durchsatz und Ressourcennutzung

In der folgenden Tabelle sind die Testergebnisse für den Pipeline-Durchsatz und die Ressourcennutzung aufgeführt.

Ergebnis Genau einmal Mindestens einmal
Eingabedurchsatz pro Mitarbeiter Durchschnitt: 15 MB/s, n=3 Durchschnitt: 18 MB/s, n=3
Durchschnittliche CPU-Auslastung aller Worker Mittelwert: 70%, n=3 Mittelwert: 75%, n=3
Anzahl der Worker-Knoten Mittelwert: 63, n=3 Mittelwert: 53, n=3
Streaming Engine-Recheneinheiten pro Stunde Mittelwert: 58, n=3 Mittelwert: 0, n=3

Der Autoscaling-Algorithmus kann sich auf den Grad der CPU-Zielauslastung auswirken. Um eine höhere oder niedrigere Ziel-CPU-Auslastung zu erreichen, können Sie den Autoscaling-Bereich oder den Hinweis zur Worker-Auslastung festlegen. Höhere Auslastungsziele können zu niedrigeren Kosten, aber auch zu einer schlechteren Tail-Latenz führen, insbesondere bei variablen Lasten.

Latenz

In der folgenden Tabelle sind die Benchmark-Ergebnisse für die Pipeline-Latenz im Exactly-Once-Modus ohne die Eingabephase aufgeführt.

Gesamte End-to-End-Latenz der Phase, ohne Eingabephase Genau einmal
P50 Mittelwert: 1.200 ms, n=3
P95 Mittelwert: 3.000 ms, n=3
P99 Mittelwert: 5.400 ms, n=3

Bei den Tests wurde die End-to-End-Latenz pro Phase (die Messgröße job/streaming_engine/stage_end_to_end_latencies) über drei lange Testläufe hinweg gemessen. Dieser Messwert gibt an, wie lange die Streaming Engine in jeder Pipelinestufe benötigt. Sie umfasst alle internen Schritte der Pipeline, z. B.:

  • Nachrichten für die Verarbeitung mischen und in die Warteschlange stellen
  • Die tatsächliche Verarbeitungszeit, z. B. zum Konvertieren von Nachrichten in Zeilenobjekte
  • Schreiben des nichtflüchtigen Zustands sowie die Zeit, die für das Schreiben des nichtflüchtigen Zustands in der Warteschlange verbracht wurde

Aufgrund einer Einschränkung des Messwerts wird die Latenz der Eingabephase nicht gemeldet. Daher ist er nicht in der Gesamtsumme enthalten.

Die hier gezeigten Benchmarks stellen eine Baseline dar. Die Latenz reagiert sehr empfindlich auf die Komplexität der Pipeline. Benutzerdefinierte UDFs, zusätzliche Transformationen und komplexe Windowing-Logik können die Latenz erhöhen.

Kosten schätzen

Sie können die Basiskosten Ihrer eigenen, vergleichbaren Pipeline mit ressourcenbasierter Abrechnung mit dem Preisrechner der Google Cloud Platform schätzen:

  1. Öffnen Sie den Preisrechner.
  2. Klicken Sie auf Der Schätzung hinzufügen.
  3. Wählen Sie „Dataflow“ aus.
  4. Wählen Sie als Diensttyp „Dataflow Classic“ aus.
  5. Wählen Sie Erweiterte Einstellungen aus, um alle Optionen zu sehen.
  6. Wählen Sie den Standort aus, an dem der Job ausgeführt wird.
  7. Wählen Sie als Jobtyp die Option „Streaming“ aus.
  8. Wählen Sie Streaming Engine aktivieren aus.
  9. Geben Sie Informationen zu den Joblaufstunden, Worker-Knoten, Worker-Computern und zum nichtflüchtigen Speicher ein.
  10. Geben Sie die geschätzte Anzahl der Streaming Engine-Recheneinheiten ein.

Die Ressourcennutzung und die Kosten skalieren in etwa linear mit dem Eingabedurchsatz. Bei kleinen Jobs mit nur wenigen Workern werden die Gesamtkosten jedoch von den Fixkosten dominiert. Als Ausgangspunkt können Sie die Anzahl der Worker-Knoten und den Ressourcenverbrauch aus den Benchmark-Ergebnissen ableiten.

Angenommen, Sie führen eine reine Map-Pipeline im „Genau einmal“-Modus mit einer Eingabedatenrate von 100 MiB/s aus. Anhand der Benchmark-Ergebnisse für eine Pipeline mit 1 GiB/s können Sie die Ressourcenanforderungen so schätzen:

  • Skalierungsfaktor: (100 MiB/s) / (1 GiB/s) = 0,1
  • Prognostizierte Worker-Knoten: 63 Worker × 0,1 = 6,3 Worker
  • Geschätzte Anzahl von Streaming Engine-Recheneinheiten pro Stunde: 58 × 0,1 = 5,8 Einheiten pro Stunde

Dieser Wert sollte nur als erste Schätzung verwendet werden. Der tatsächliche Durchsatz und die Kosten können je nach Faktoren wie Maschinentyp, Verteilung der Nachrichtengröße, Nutzercode, Aggregationstyp, Schlüsselparallelität und Fenstergröße erheblich variieren. Weitere Informationen finden Sie unter Best Practices für die Dataflow-Kostenoptimierung.

Testpipeline ausführen

In diesem Abschnitt werden die gcloud dataflow flex-template run-Befehle gezeigt, die zum Ausführen der reinen Map-Pipeline verwendet wurden.

Modus „Genau einmal“

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400

Modus „Mindestens einmal“

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --additional-experiments=streaming_mode_at_least_once \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true

Ersetzen Sie Folgendes:

  • JOB_NAME: der Name des Dataflow-Jobs
  • PROJECT_ID: die Projekt-ID
  • KAFKA_BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Apache Kafka-Clusters.
  • KAFKA_TOPIC: der Name des Kafka-Themas
  • BQ_DATASET: der Name des BigQuery-Datasets
  • BQ_TABLE_NAME: der Name der BigQuery-Tabelle

Testdaten generieren

Verwenden Sie den folgenden Befehl, um Testdaten zu generieren und die Vorlage für den Streamingdatengenerator auszuführen:

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --max-workers=140 \
  --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON

Ersetzen Sie Folgendes:

  • JOB_NAME: der Name des Dataflow-Jobs
  • PROJECT_ID: die Projekt-ID
  • SCHEMA_LOCATION: der Pfad zu einer Schemadatei in Cloud Storage
  • KAFKA_BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Apache Kafka-Clusters.
  • KAFKA_TOPIC: der Name des Kafka-Themas

In der Vorlage für Streaming Data Generator wird eine JSON Data Generator-Datei verwendet, um das Nachrichtenschema zu definieren. Für die Benchmarktests wurde ein Nachrichtenschema verwendet, das dem folgenden ähnelt:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

Nächste Schritte