Auf dieser Seite werden die Leistungsmerkmale für Dataflow-Streamingjobs beschrieben, die Daten aus Apache Kafka lesen und in BigQuery schreiben. Sie liefert Benchmark-Testergebnisse für reine Zuordnungspipelines, bei denen Transformationen pro Nachricht ohne Statusverfolgung oder Gruppierung von Elementen im Stream durchgeführt werden.
Viele Data Integration-Arbeitslasten, darunter ETL, Feldvalidierung und Schemazuordnung, fallen in die Kategorie „Nur Map“. Wenn Ihre Pipeline diesem Muster folgt, können Sie diese Benchmarks verwenden, um Ihren Dataflow-Job mit einer leistungsstarken Referenzkonfiguration zu vergleichen.
Testmethodik
Die Benchmarks wurden mit den folgenden Ressourcen durchgeführt:
Ein Managed Service for Apache Kafka-Cluster. Die Nachrichten wurden mit der Vorlage Streaming Data Generator generiert.
- Nachrichtenrate: etwa 1.000.000 Nachrichten pro Sekunde
- Eingabelast: 1 GiB/s
- Nachrichtenformat: Zufällig generierter JSON-Text mit einem festen Schema
- Nachrichtengröße: ca. 1 KiB pro Nachricht
- Kafka-Partitionen: 1000
Eine Dataflow-Streamingpipeline, die die Vorlage „Apache Kafka für BigQuery“ verwendet. In dieser Pipeline werden die minimal erforderlichen Parsing- und Schemazuordnungen ausgeführt. Es wurde keine benutzerdefinierte Funktion (UDF) verwendet.
Nachdem die horizontale Skalierung stabilisiert und die Pipeline den stabilen Zustand erreicht hatte, durften die Pipelines etwa einen Tag lang ausgeführt werden. Danach wurden die Ergebnisse erfasst und analysiert.
Dataflow-Pipeline
Bei diesem Benchmark wird eine reine Map-Pipeline verwendet, die eine einfache Zuordnung und Konvertierung von JSON-Nachrichten durchführt. Die Pipeline wurde sowohl im Genau einmal-Modus als auch im Mindestens einmal-Modus getestet. Die At-least-once-Verarbeitung bietet einen besseren Durchsatz. Sie sollte jedoch nur verwendet werden, wenn doppelte Datensätze akzeptabel sind oder die nachgelagerte Senke die Deduplizierung übernimmt.
Jobkonfiguration
In der folgenden Tabelle sehen Sie, wie die Dataflow-Jobs konfiguriert wurden.
| Einstellung | Wert |
|---|---|
| Maschinentyp des Workers | e2-standard-2 |
| Worker-Maschinen-vCPUs | 2 |
| RAM der Worker-Maschine | 8 GB |
| Persistent Disk für Worker-Maschine | Nichtflüchtiger Standardspeicher (HDD), 30 GB |
| Maximale Anzahl der Worker | 120 |
| Streaming Engine | Ja |
| Horizontales Autoscaling | Ja |
| Abrechnungsmodell | Ressourcenbasierte Abrechnung |
| Ist die Storage Write API aktiviert? | Ja |
| Storage Write API-Streams | 400 |
| Triggerhäufigkeit der Storage Write API | 5 Sekunden |
| Nachrichtenformat | JSON |
| Kafka-Authentifizierungsmodus |
Standardanmeldedaten für Anwendungen (Application Default Credentials, ADC). Weitere Informationen finden Sie unter Authentifizierungstypen für Kafka-Broker. |
Die BigQuery Storage Write API wird für Streamingpipelines empfohlen. Wenn Sie den genau einmaligen Modus mit der Storage Write API verwenden, können Sie die folgenden Einstellungen anpassen:
Anzahl der Schreibstreams Um ausreichend Schlüsselparallelität in der Schreibphase zu gewährleisten, legen Sie die Anzahl der Storage Write API-Streams auf einen Wert fest, der größer als die Anzahl der Worker-CPUs ist. Beachten Sie dabei die Empfehlungen zum Durchsatz pro Stream.
Auslösehäufigkeit: Ein einstelliger Sekundenwert ist für Pipelines mit hohem Durchsatz geeignet.
Weitere Informationen finden Sie unter Aus Dataflow in BigQuery schreiben.
Auch die Anzahl der Apache Kafka-Partitionen sollte berücksichtigt werden. Um eine ausreichende Schlüsselparallelität in der Lesestufe zu gewährleisten, sollte die Anzahl der Partitionen mindestens der Gesamtzahl der Worker-vCPUs entsprechen. Weitere Informationen finden Sie unter Daten aus Apache Kafka in Dataflow lesen.
Benchmark-Ergebnisse
In diesem Abschnitt werden die Ergebnisse der Benchmarktests beschrieben.
Durchsatz und Ressourcennutzung
In der folgenden Tabelle sind die Testergebnisse für den Pipeline-Durchsatz und die Ressourcennutzung aufgeführt.
| Ergebnis | Genau einmal | Mindestens einmal |
|---|---|---|
| Eingabedurchsatz pro Mitarbeiter | Durchschnitt: 15 MB/s, n=3 | Durchschnitt: 18 MB/s, n=3 |
| Durchschnittliche CPU-Auslastung aller Worker | Mittelwert: 70%, n=3 | Mittelwert: 75%, n=3 |
| Anzahl der Worker-Knoten | Mittelwert: 63, n=3 | Mittelwert: 53, n=3 |
| Streaming Engine-Recheneinheiten pro Stunde | Mittelwert: 58, n=3 | Mittelwert: 0, n=3 |
Der Autoscaling-Algorithmus kann sich auf den Grad der CPU-Zielauslastung auswirken. Um eine höhere oder niedrigere Ziel-CPU-Auslastung zu erreichen, können Sie den Autoscaling-Bereich oder den Hinweis zur Worker-Auslastung festlegen. Höhere Auslastungsziele können zu niedrigeren Kosten, aber auch zu einer schlechteren Tail-Latenz führen, insbesondere bei variablen Lasten.
Latenz
In der folgenden Tabelle sind die Benchmark-Ergebnisse für die Pipeline-Latenz im Exactly-Once-Modus ohne die Eingabephase aufgeführt.
| Gesamte End-to-End-Latenz der Phase, ohne Eingabephase | Genau einmal |
|---|---|
| P50 | Mittelwert: 1.200 ms, n=3 |
| P95 | Mittelwert: 3.000 ms, n=3 |
| P99 | Mittelwert: 5.400 ms, n=3 |
Bei den Tests wurde die End-to-End-Latenz pro Phase (die Messgröße job/streaming_engine/stage_end_to_end_latencies) über drei lange Testläufe hinweg gemessen. Dieser Messwert gibt an, wie lange die Streaming Engine in jeder Pipelinestufe benötigt. Sie umfasst alle internen Schritte der Pipeline, z. B.:
- Nachrichten für die Verarbeitung mischen und in die Warteschlange stellen
- Die tatsächliche Verarbeitungszeit, z. B. zum Konvertieren von Nachrichten in Zeilenobjekte
- Schreiben des nichtflüchtigen Zustands sowie die Zeit, die für das Schreiben des nichtflüchtigen Zustands in der Warteschlange verbracht wurde
Aufgrund einer Einschränkung des Messwerts wird die Latenz der Eingabephase nicht gemeldet. Daher ist er nicht in der Gesamtsumme enthalten.
Die hier gezeigten Benchmarks stellen eine Baseline dar. Die Latenz reagiert sehr empfindlich auf die Komplexität der Pipeline. Benutzerdefinierte UDFs, zusätzliche Transformationen und komplexe Windowing-Logik können die Latenz erhöhen.
Kosten schätzen
Sie können die Basiskosten Ihrer eigenen, vergleichbaren Pipeline mit ressourcenbasierter Abrechnung mit dem Preisrechner der Google Cloud Platform schätzen:
- Öffnen Sie den Preisrechner.
- Klicken Sie auf Der Schätzung hinzufügen.
- Wählen Sie „Dataflow“ aus.
- Wählen Sie als Diensttyp „Dataflow Classic“ aus.
- Wählen Sie Erweiterte Einstellungen aus, um alle Optionen zu sehen.
- Wählen Sie den Standort aus, an dem der Job ausgeführt wird.
- Wählen Sie als Jobtyp die Option „Streaming“ aus.
- Wählen Sie Streaming Engine aktivieren aus.
- Geben Sie Informationen zu den Joblaufstunden, Worker-Knoten, Worker-Computern und zum nichtflüchtigen Speicher ein.
- Geben Sie die geschätzte Anzahl der Streaming Engine-Recheneinheiten ein.
Die Ressourcennutzung und die Kosten skalieren in etwa linear mit dem Eingabedurchsatz. Bei kleinen Jobs mit nur wenigen Workern werden die Gesamtkosten jedoch von den Fixkosten dominiert. Als Ausgangspunkt können Sie die Anzahl der Worker-Knoten und den Ressourcenverbrauch aus den Benchmark-Ergebnissen ableiten.
Angenommen, Sie führen eine reine Map-Pipeline im „Genau einmal“-Modus mit einer Eingabedatenrate von 100 MiB/s aus. Anhand der Benchmark-Ergebnisse für eine Pipeline mit 1 GiB/s können Sie die Ressourcenanforderungen so schätzen:
- Skalierungsfaktor: (100 MiB/s) / (1 GiB/s) = 0,1
- Prognostizierte Worker-Knoten: 63 Worker × 0,1 = 6,3 Worker
- Geschätzte Anzahl von Streaming Engine-Recheneinheiten pro Stunde: 58 × 0,1 = 5,8 Einheiten pro Stunde
Dieser Wert sollte nur als erste Schätzung verwendet werden. Der tatsächliche Durchsatz und die Kosten können je nach Faktoren wie Maschinentyp, Verteilung der Nachrichtengröße, Nutzercode, Aggregationstyp, Schlüsselparallelität und Fenstergröße erheblich variieren. Weitere Informationen finden Sie unter Best Practices für die Dataflow-Kostenoptimierung.
Testpipeline ausführen
In diesem Abschnitt werden die gcloud dataflow flex-template run-Befehle gezeigt, die zum Ausführen der reinen Map-Pipeline verwendet wurden.
Modus „Genau einmal“
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400
Modus „Mindestens einmal“
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--additional-experiments=streaming_mode_at_least_once \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true
Ersetzen Sie Folgendes:
JOB_NAME: der Name des Dataflow-JobsPROJECT_ID: die Projekt-IDKAFKA_BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Apache Kafka-Clusters.KAFKA_TOPIC: der Name des Kafka-ThemasBQ_DATASET: der Name des BigQuery-DatasetsBQ_TABLE_NAME: der Name der BigQuery-Tabelle
Testdaten generieren
Verwenden Sie den folgenden Befehl, um Testdaten zu generieren und die Vorlage für den Streamingdatengenerator auszuführen:
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--max-workers=140 \
--parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON
Ersetzen Sie Folgendes:
JOB_NAME: der Name des Dataflow-JobsPROJECT_ID: die Projekt-IDSCHEMA_LOCATION: der Pfad zu einer Schemadatei in Cloud StorageKAFKA_BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Apache Kafka-Clusters.KAFKA_TOPIC: der Name des Kafka-Themas
In der Vorlage für Streaming Data Generator wird eine JSON Data Generator-Datei verwendet, um das Nachrichtenschema zu definieren. Für die Benchmarktests wurde ein Nachrichtenschema verwendet, das dem folgenden ähnelt:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
Nächste Schritte
- Dataflow-Job-Monitoring-Oberfläche verwenden
- Best Practices für die Kostenoptimierung von Dataflow
- Fehlerbehebung bei langsamen oder hängenden Streamingjobs
- Daten aus Apache Kafka in Dataflow lesen
- Aus Dataflow in BigQuery schreiben