Leistungsmerkmale von Pub/Sub-zu-BigQuery-Pipelines

Auf dieser Seite werden die Leistungsmerkmale für Dataflow-Streamingjobs beschrieben, die Daten aus Pub/Sub lesen und in BigQuery schreiben. Er enthält Benchmark-Testergebnisse für zwei Arten von Streamingpipelines:

  • Nur Zuordnung (Transformation pro Nachricht): Pipelines, die Transformationen pro Nachricht ausführen, ohne den Status zu verfolgen oder Elemente im Stream zu gruppieren. Beispiele sind ETL, Feldvalidierung und Schemazuordnung.

  • Aggregation mit Zeitfenstern (GroupByKey): Pipelines, die zustandsbehaftete Vorgänge ausführen und Daten anhand eines Schlüssels und eines Zeitfensters gruppieren. Beispiele sind das Zählen von Ereignissen, das Berechnen von Summen und das Erfassen von Datensätzen für eine Nutzersitzung.

Die meisten Arbeitslasten für die Streaming-Datenintegration fallen in diese beiden Kategorien. Wenn Ihre Pipeline einem ähnlichen Muster folgt, können Sie diese Benchmarks verwenden, um Ihren Dataflow-Job mit einer leistungsstarken Referenzkonfiguration zu vergleichen.

Testmethodik

Die Benchmarks wurden mit den folgenden Ressourcen durchgeführt:

  • Ein vorab bereitgestelltes Pub/Sub-Thema mit einer gleichmäßigen Eingabelast. Die Nachrichten wurden mit der Vorlage Streaming Data Generator generiert.

    • Nachrichtenrate: etwa 1.000.000 Nachrichten pro Sekunde
    • Eingabelast: 1 GiB/s
    • Nachrichtenformat: Zufällig generierter JSON-Text mit einem festen Schema
    • Nachrichtengröße: ca. 1 KiB pro Nachricht
  • Eine Standard-BigQuery-Tabelle.

  • Dataflow-Streamingpipelines, die auf der Vorlage „Pub/Sub zu BigQuery“ basieren. In diesen Pipelines werden die erforderlichen Mindestvorgänge für das Parsen und die Schemazuordnung ausgeführt. Es wurde keine benutzerdefinierte Funktion (UDF) verwendet.

Nachdem die horizontale Skalierung stabilisiert und die Pipeline den stabilen Zustand erreicht hatte, durften die Pipelines etwa einen Tag lang ausgeführt werden. Danach wurden die Ergebnisse erfasst und analysiert.

Dataflow-Pipelines

Es wurden zwei Pipelinevarianten getestet:

Reine Zuordnungspipeline. Diese Pipeline führt eine einfache Zuordnung und Konvertierung von JSON-Nachrichten durch. Für diesen Test wurde die Vorlage „Pub/Sub für BigQuery“ unverändert verwendet.

  • Semantik: Die Pipeline wurde sowohl im „Genau einmal“-Modus als auch im „Mindestens einmal“-Modus getestet. Die Verarbeitung mindestens einmal bietet einen besseren Durchsatz. Sie sollte jedoch nur verwendet werden, wenn doppelte Datensätze akzeptabel sind oder die nachgelagerte Senke die Deduplizierung übernimmt.

Pipeline für die Aggregation mit Zeitfenstern. In dieser Pipeline werden Nachrichten nach einem bestimmten Schlüssel in Fenstern mit fester Größe gruppiert und die aggregierten Datensätze werden in BigQuery geschrieben. Für diesen Test wurde eine benutzerdefinierte Apache Beam-Pipeline auf Grundlage der Vorlage „Pub/Sub für BigQuery“ verwendet.

  • Aggregationslogik: Für jedes feste, nicht überlappende 1-Minuten-Zeitfenster wurden Nachrichten mit demselben Schlüssel erfasst und als einzelner aggregierter Datensatz in BigQuery geschrieben. Diese Art der Aggregation wird häufig bei der Log-Verarbeitung verwendet, um zusammengehörige Ereignisse wie die Aktivitäten eines Nutzers in einem einzelnen Datensatz für die nachgelagerte Analyse zu kombinieren.

  • Schlüsselparallelität: Für den Benchmark wurden 1.000.000 gleichmäßig verteilte Schlüssel verwendet.

  • Semantik: Die Pipeline wurde im „Genau einmal“-Modus getestet. Für Aggregationen ist eine „Genau einmal“-Semantik erforderlich, um die Richtigkeit zu gewährleisten und Doppelzählungen innerhalb einer Gruppe und eines Zeitraums zu verhindern.

Jobkonfiguration

In der folgenden Tabelle sehen Sie, wie die Dataflow-Jobs konfiguriert wurden.

Einstellung Nur Karte, genau einmal Nur Zuordnung, mindestens einmal Windowed Aggregation, genau einmal
Maschinentyp des Workers n1-standard-2 n1-standard-2 n1-standard-2
Worker-Maschinen-vCPUs 2 2 2
RAM der Worker-Maschine 7,5 GiB 7,5 GiB 7,5 GiB
Persistent Disk für Worker-Maschine Nichtflüchtiger Standardspeicher (HDD), 30 GB Nichtflüchtiger Standardspeicher (HDD), 30 GB Nichtflüchtiger Standardspeicher (HDD), 30 GB
Erste Worker 70 30 180
Maximale Anzahl der Worker 100 100 250
Streaming Engine Ja Ja Ja
Horizontales Autoscaling Ja Ja Ja
Abrechnungsmodell Ressourcenbasierte Abrechnung Ressourcenbasierte Abrechnung Ressourcenbasierte Abrechnung
Ist die Storage Write API aktiviert? Ja Ja Ja
Storage Write API-Streams 200 Nicht zutreffend 500
Triggerhäufigkeit der Storage Write API 5 Sekunden Nicht zutreffend 5 Sekunden

Die BigQuery Storage Write API wird für Streamingpipelines empfohlen. Wenn Sie den genau einmaligen Modus mit der Storage Write API verwenden, können Sie die folgenden Einstellungen anpassen:

  • Anzahl der Schreibstreams Um in der Schreibphase ausreichend Schlüsselparallelität zu gewährleisten, legen Sie die Anzahl der Storage Write API-Streams auf einen Wert fest, der größer als die Anzahl der Worker-CPUs ist. Achten Sie dabei auf einen angemessenen Durchsatz von BigQuery-Schreibstreams.

  • Auslösehäufigkeit: Ein einstelliger Sekundenwert eignet sich für Pipelines mit hohem Durchsatz.

Weitere Informationen finden Sie unter Aus Dataflow in BigQuery schreiben.

Benchmark-Ergebnisse

In diesem Abschnitt werden die Ergebnisse der Benchmark-Tests beschrieben.

Durchsatz und Ressourcennutzung

In der folgenden Tabelle sind die Testergebnisse für den Pipeline-Durchsatz und die Ressourcennutzung aufgeführt.

Ergebnis Nur Karte, genau einmal Nur Zuordnung, mindestens einmal Windowed Aggregation, genau einmal
Eingabedurchsatz pro Worker Durchschnitt: 17 MB/s, n=3 Durchschnitt: 21 MB/s, n=3 Mittelwert: 6 MB/s, n=3
Durchschnittliche CPU-Auslastung aller Worker Mittelwert: 65%, n=3 Mittelwert: 69%, n=3 Mittelwert: 80%, n=3
Anzahl der Worker-Knoten Mittelwert: 57, n=3 Mittelwert: 48, n=3 Mittelwert: 169, n=3
Streaming Engine-Recheneinheiten pro Stunde Mittelwert: 125, n=3 Mittelwert: 46, n=3 Mittelwert: 354, n=3

Der Autoscaling-Algorithmus kann sich auf den Grad der CPU-Zielauslastung auswirken. Um eine höhere oder niedrigere Ziel-CPU-Auslastung zu erreichen, können Sie den Autoscaling-Bereich oder den Hinweis zur Worker-Auslastung festlegen. Höhere Auslastungsziele können zu niedrigeren Kosten, aber auch zu einer schlechteren Tail-Latenz führen, insbesondere bei schwankenden Lasten.

Bei einer Pipeline für die Fensteraggregation können sich der Aggregationstyp, die Fenstergröße und die Schlüsselparallelität stark auf die Ressourcennutzung auswirken.

Latenz

Die folgende Tabelle enthält die Benchmark-Ergebnisse für die Pipeline-Latenz.

Gesamt-End-to-End-Latenz der Phase Nur Karte, genau einmal Nur Zuordnung, mindestens einmal Windowed Aggregation, genau einmal
P50 Mittelwert: 800 ms, n=3 Mittelwert: 160 ms, n=3 Mittelwert: 3.400 ms, n=3
P95 Mittelwert: 2.000 ms, n=3 Mittelwert: 250 ms, n=3 Mittelwert: 13.000 ms, n=3
P99 Mittelwert: 2.800 ms, n=3 Mittelwert: 410 ms, n=3 Mittelwert: 25.000 ms, n=3

Die Tests haben die End-to-End-Latenz pro Phase (die Messung job/streaming_engine/stage_end_to_end_latencies) über drei lang andauernde Testläufe hinweg gemessen. Dieser Messwert gibt an, wie lange die Streaming Engine in jeder Pipelinestufe benötigt. Sie umfasst alle internen Schritte der Pipeline, z. B.:

  • Nachrichten für die Verarbeitung mischen und in die Warteschlange stellen
  • Die tatsächliche Verarbeitungszeit, z. B. das Konvertieren von Nachrichten in Zeilenobjekte
  • Schreiben des nichtflüchtigen Zustands sowie die Zeit, die für das Schreiben des nichtflüchtigen Zustands in der Warteschlange verbracht wurde

Ein weiterer Latenzmesswert ist die Datenaktualität. Die Datenaktualität wird jedoch durch Faktoren wie benutzerdefinierte Fenster und Upstream-Verzögerungen in der Quelle beeinflusst. Die Systemlatenz bietet eine objektivere Baseline für die interne Verarbeitungseffizienz und den Zustand einer Pipeline unter Last.

Die Daten wurden über etwa einen Tag pro Lauf gemessen. Die anfänglichen Startphasen wurden verworfen, um eine stabile Leistung im Steady State zu berücksichtigen. Die Ergebnisse zeigen zwei Faktoren, die zusätzliche Latenz verursachen:

  • „Genau einmal“-Modus. Um die „Genau einmal“-Semantik zu erreichen, sind deterministisches Shuffling und persistente Statusabfragen für die Deduplizierung erforderlich. Der „Mindestens einmal“-Modus ist deutlich schneller, da diese Schritte umgangen werden.

  • Aggregation mit Zeitfenstern. Nachrichten müssen vollständig gemischt, gepuffert und in den persistenten Status geschrieben werden, bevor das Fenster geschlossen wird. Dies erhöht die Ende-zu-Ende-Latenz.

Die hier gezeigten Benchmarks stellen eine Grundlage dar. Die Latenz reagiert sehr empfindlich auf die Komplexität der Pipeline. Benutzerdefinierte UDFs, zusätzliche Transformationen und komplexe Windowing-Logik können die Latenz erhöhen. Einfache, stark reduzierende Aggregationen wie „sum“ und „count“ führen in der Regel zu einer geringeren Latenz als zustandsintensive Vorgänge wie das Erfassen von Elementen in einer Liste.

Kosten schätzen

Sie können die Basiskosten Ihrer eigenen, vergleichbaren Pipeline mit ressourcenbasierter Abrechnung mit dem Preisrechner der Google Cloud Platform schätzen:

  1. Öffnen Sie den Preisrechner.
  2. Klicken Sie auf Der Schätzung hinzufügen.
  3. Wählen Sie „Dataflow“ aus.
  4. Wählen Sie als Diensttyp „Dataflow Classic“ aus.
  5. Wählen Sie Erweiterte Einstellungen aus, um alle Optionen zu sehen.
  6. Wählen Sie den Standort aus, an dem der Job ausgeführt wird.
  7. Wählen Sie als Jobtyp die Option „Streaming“ aus.
  8. Wählen Sie Streaming Engine aktivieren aus.
  9. Geben Sie Informationen zu den Joblaufstunden, Worker-Knoten, Worker-Computern und zum nichtflüchtigen Speicher ein.
  10. Geben Sie die geschätzte Anzahl der Streaming Engine-Recheneinheiten ein.

Die Ressourcennutzung und die Kosten skalieren in etwa linear mit dem Eingabedurchsatz. Bei kleinen Jobs mit nur wenigen Workern werden die Gesamtkosten jedoch von den Fixkosten dominiert. Als Ausgangspunkt können Sie die Anzahl der Worker-Knoten und den Ressourcenverbrauch aus den Benchmark-Ergebnissen extrapolieren.

Angenommen, Sie führen eine reine Map-Pipeline im „Genau einmal“-Modus mit einer Eingabedatenrate von 100 MiB/s aus. Anhand der Benchmark-Ergebnisse für eine Pipeline mit 1 GiB/s können Sie die Ressourcenanforderungen so schätzen:

  • Skalierungsfaktor: (100 MiB/s) / (1 GiB/s) = 0,1
  • Prognostizierte Worker-Knoten: 57 Worker × 0,1 = 5,7 Worker
  • Prognostizierte Anzahl von Streaming Engine-Recheneinheiten pro Stunde: 125 × 0,1 = 12,5 Einheiten pro Stunde

Dieser Wert sollte nur als erste Schätzung verwendet werden. Der tatsächliche Durchsatz und die Kosten können je nach Faktoren wie Maschinentyp, Verteilung der Nachrichtengröße, Nutzercode, Aggregationstyp, Schlüsselparallelität und Fenstergröße erheblich variieren. Weitere Informationen finden Sie unter Best Practices für die Dataflow-Kostenoptimierung.

Testpipeline ausführen

In diesem Abschnitt werden die gcloud dataflow flex-template run-Befehle gezeigt, die zum Ausführen der reinen Map-Pipeline verwendet wurden.

Modus „Genau einmal“

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5

Modus „Mindestens einmal“

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
  --enable-streaming-engine \
  --num-workers 30 \
  --max-workers 100 \
  --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
  --additional-experiments streaming_mode_at_least_once

Ersetzen Sie Folgendes:

  • JOB_ID: die Dataflow-Job-ID
  • PROJECT_ID: die Projekt-ID
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • DATASET: der Name des BigQuery-Datasets
  • TABLE_NAME: der Name der BigQuery-Tabelle

Testdaten generieren

Verwenden Sie den folgenden Befehl, um Testdaten zu generieren und die Vorlage für den Streamingdatengenerator auszuführen:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --num-workers 70 \
  --max-workers 100 \
  --parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION

Ersetzen Sie Folgendes:

  • JOB_ID: die Dataflow-Job-ID
  • PROJECT_ID: die Projekt-ID
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • SCHEMA_LOCATION: der Pfad zu einer Schemadatei in Cloud Storage

In der Vorlage für Streaming Data Generator wird eine JSON Data Generator-Datei verwendet, um das Nachrichtenschema zu definieren. Für die Benchmarktests wurde ein Nachrichtenschema verwendet, das dem folgenden ähnelt:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

Nächste Schritte