In dieser Anleitung wird gezeigt, wie Sie eine Datenpipeline in Dataflow bereitstellen, um einen Echtzeitstream von Datenbankänderungen zu erhalten, die aus dem Änderungsstream einer Bigtable-Tabelle stammen. Die Ausgabe der Pipeline wird in eine Reihe von Dateien in Cloud Storage geschrieben.
Es wird ein Beispiel-Dataset für eine Musik-Streaming-App bereitgestellt. In dieser Anleitung erfassen Sie, welche Songs gehört werden, und erstellen dann eine Rangliste der fünf beliebtesten Songs für einen bestimmten Zeitraum.
Diese Anleitung richtet sich an technische Nutzer, die mit dem Schreiben von Code und dem Bereitstellen von Datenpipelines in Google Cloudvertraut sind.
Umgebung vorbereiten
Code abrufen
Klonen Sie das Repository, das den Beispielcode enthält. Wenn Sie dieses Repository bereits heruntergeladen haben, führen Sie einen Pull aus, um die neueste Version zu erhalten.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Bucket erstellen
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
with a bucket name
that meets the bucket naming requirements.
Bigtable-Instanz erstellen
Sie können für diese Anleitung eine vorhandene Instanz verwenden oder eine Instanz mit den Standardkonfigurationen in einer Region in Ihrer Nähe erstellen.
Tabelle erstellen
Die Beispielanwendung erfasst die Songs, die Nutzer hören, und speichert die Hörereignisse in Bigtable. Erstellen Sie eine Tabelle mit aktiviertem Änderungsstream, die eine Spaltenfamilie (cf) und eine Spalte (song) enthält und Nutzer-IDs für Zeilenschlüssel verwendet.
Erstellen Sie die Tabelle.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Ersetzen Sie Folgendes:
- PROJECT_ID: die ID des Projekts, das Sie verwenden
- BIGTABLE_INSTANCE_ID: Die ID der Instanz, die die neue Tabelle enthalten soll.
Pipeline starten
In dieser Pipeline wird der Änderungsstream so transformiert:
- Liest den Änderungsstream
- Ruft den Songnamen ab.
- Gruppiert die Ereignisse für das Anhören von Songs in N-Sekunden-Zeiträume.
- Zählt die fünf meistgespielten Titel
- Ausgabe der Ergebnisse
Pipeline ausführen.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Ersetzen Sie BIGTABLE_REGION durch die ID der Region, in der sich Ihre Bigtable-Instanz befindet, z. B. us-east5
.
Pipeline verstehen
Die folgenden Code-Snippets aus der Pipeline können Ihnen helfen, den Code zu verstehen, den Sie ausführen.
Änderungsstream lesen
Der Code in diesem Beispiel konfiguriert den Quellstream mit den Parametern für die spezifische Bigtable-Instanz und -Tabelle.
Songname abrufen
Wenn ein Song angehört wird, wird der Songname in die Spaltenfamilie cf
und den Spaltenqualifizierer song
geschrieben. Der Code extrahiert den Wert also aus der Änderung des Streams und gibt ihn an den nächsten Schritt der Pipeline aus.
Die fünf meistgespielten Songs zählen
Mit den integrierten Beam-Funktionen Count
und Top.of
können Sie die fünf beliebtesten Songs im aktuellen Fenster abrufen.
Ergebnisse ausgeben
Die Ergebnisse dieser Pipeline werden sowohl in Standard-Out als auch in Dateien geschrieben. Bei den Dateien werden die Schreibvorgänge in Gruppen von 10 Elementen oder in Ein-Minuten-Segmente unterteilt.
Pipeline ansehen
Rufen Sie in der Google Cloud Console die Seite Dataflow auf.
Klicken Sie auf den Job, dessen Name mit song-rank beginnt.
Klicken Sie unten auf dem Bildschirm auf Anzeigen, um das Logfeld zu öffnen.
Klicken Sie auf Worker-Logs, um die Ausgabelogs des Änderungsstreams zu überwachen.
Stream-Schreibvorgänge
Verwenden Sie die cbt
-Befehlszeile, um eine Anzahl von Songwiedergaben für verschiedene Nutzer in die Tabelle song-rank
zu schreiben. So soll simuliert werden, dass Songs über einen längeren Zeitraum gestreamt werden.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Ausgabe ansehen
Lesen Sie die Ausgabe in Cloud Storage, um die beliebtesten Songs zu sehen.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Beispielausgabe:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]