Bigtable-Änderungsstream verarbeiten

In dieser Anleitung wird gezeigt, wie Sie eine Datenpipeline in Dataflow bereitstellen, um einen Echtzeitstream von Datenbankänderungen zu erhalten, die aus dem Änderungsstream einer Bigtable-Tabelle stammen. Die Ausgabe der Pipeline wird in eine Reihe von Dateien in Cloud Storage geschrieben.

Es wird ein Beispiel-Dataset für eine Musik-Streaming-App bereitgestellt. In dieser Anleitung erfassen Sie, welche Songs gehört werden, und erstellen dann eine Rangliste der fünf beliebtesten Songs für einen bestimmten Zeitraum.

Diese Anleitung richtet sich an technische Nutzer, die mit dem Schreiben von Code und dem Bereitstellen von Datenpipelines in Google Cloudvertraut sind.

Ziele

In dieser Anleitung wird Folgendes beschrieben:

  • Bigtable-Tabelle mit aktiviertem Änderungsstream erstellen
  • Stellen Sie eine Pipeline in Dataflow bereit, die den Änderungsstream transformiert und ausgibt.
  • Sehen Sie sich die Ergebnisse Ihrer Datenpipeline an.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.

Neuen Nutzern von Google Cloud steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweis

    Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.

    Installieren Sie die Google Cloud CLI. Initialisieren Sie die Google Cloud CLI nach der Installation mit dem folgenden Befehl:

    gcloud init

    Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

    Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.

    Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind

    • Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
    • Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (roles/resourcemanager.projectCreator), die die Berechtigung resourcemanager.projects.create enthält. Weitere Informationen zum Zuweisen von Rollen
    • So erstellen Sie ein Google Cloud -Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud -Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Projekts in Google Cloud .

    Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.

    Aktivieren Sie die Dataflow API, die Cloud Bigtable API, die Cloud Bigtable Admin API und die Cloud Storage API:

    Rollen, die zum Aktivieren von APIs erforderlich sind

    Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    Installieren Sie die Google Cloud CLI. Initialisieren Sie die Google Cloud CLI nach der Installation mit dem folgenden Befehl:

    gcloud init

    Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

    Erstellen Sie ein Google Cloud Projekt oder wählen Sie eines aus.

    Rollen, die zum Auswählen oder Erstellen eines Projekts erforderlich sind

    • Projekt auswählen: Für die Auswahl eines Projekts ist keine bestimmte IAM-Rolle erforderlich. Sie können jedes Projekt auswählen, für das Ihnen eine Rolle zugewiesen wurde.
    • Projekt erstellen: Zum Erstellen eines Projekts benötigen Sie die Rolle „Projektersteller“ (roles/resourcemanager.projectCreator), die die Berechtigung resourcemanager.projects.create enthält. Weitere Informationen zum Zuweisen von Rollen
    • So erstellen Sie ein Google Cloud -Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud -Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Projekts in Google Cloud .

    Prüfen Sie, ob für Ihr Google Cloud Projekt die Abrechnung aktiviert ist.

    Aktivieren Sie die Dataflow API, die Cloud Bigtable API, die Cloud Bigtable Admin API und die Cloud Storage API:

    Rollen, die zum Aktivieren von APIs erforderlich sind

    Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  1. Aktualisieren und installieren Sie die cbt-CLI.
    gcloud components update
    gcloud components install cbt

Umgebung vorbereiten

Code abrufen

Klonen Sie das Repository, das den Beispielcode enthält. Wenn Sie dieses Repository bereits heruntergeladen haben, führen Sie einen Pull aus, um die neueste Version zu erhalten.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Bucket erstellen

  • Erstellen Sie einen Cloud Storage-Bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Ersetzen Sie BUCKET_NAME durch einen Bucket-Namen, der den Anforderungen für Bucket-Namen entspricht.
  • Bigtable-Instanz erstellen

    Sie können für diese Anleitung eine vorhandene Instanz verwenden oder eine Instanz mit den Standardkonfigurationen in einer Region in Ihrer Nähe erstellen.

    Tabelle erstellen

    Die Beispielanwendung erfasst die Songs, die Nutzer hören, und speichert die Hörereignisse in Bigtable. Erstellen Sie eine Tabelle mit aktiviertem Änderungsstream, die eine Spaltenfamilie (cf) und eine Spalte (song) enthält und Nutzer-IDs für Zeilenschlüssel verwendet.

    Erstellen Sie die Tabelle.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Projekts, das Sie verwenden
    • BIGTABLE_INSTANCE_ID: Die ID der Instanz, die die neue Tabelle enthalten soll.

    Pipeline starten

    In dieser Pipeline wird der Änderungsstream so transformiert:

    1. Liest den Änderungsstream
    2. Ruft den Songnamen ab.
    3. Gruppiert die Ereignisse für das Anhören von Songs in N-Sekunden-Zeiträume.
    4. Zählt die fünf meistgespielten Songs
    5. Gibt die Ergebnisse aus

    Die Pipline ausführen

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Ersetzen Sie BIGTABLE_REGION durch die ID der Region, in der sich Ihre Bigtable-Instanz befindet, z. B. us-east5.

    Pipeline verstehen

    Die folgenden Code-Snippets aus der Pipeline können Ihnen helfen, den Code zu verstehen, den Sie ausführen.

    Änderungsstream lesen

    Der Code in diesem Beispiel konfiguriert den Quellstream mit den Parametern für die spezifische Bigtable-Instanz und -Tabelle.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Songname abrufen

    Wenn ein Song angehört wird, wird der Songname in die Spaltenfamilie cf und den Spaltenqualifizierer song geschrieben. Der Code extrahiert den Wert also aus der Änderung des Streams und gibt ihn an den nächsten Schritt der Pipeline aus.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Die fünf meistgespielten Titel zählen

    Mit den integrierten Beam-Funktionen Count und Top.of können Sie die fünf beliebtesten Songs im aktuellen Fenster abrufen.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Ergebnisse ausgeben

    Die Ergebnisse dieser Pipeline werden sowohl in Standard-Out als auch in Dateien geschrieben. Bei den Dateien werden die Schreibvorgänge in Gruppen von 10 Elementen oder in einminütige Segmente unterteilt.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Pipeline ansehen

    1. Rufen Sie in der Google Cloud Console die Seite Dataflow auf.

      Zu Dataflow

    2. Klicken Sie auf den Job, dessen Name mit song-rank beginnt.

    3. Klicken Sie unten auf dem Bildschirm auf Anzeigen, um das Logfeld zu öffnen.

    4. Klicken Sie auf Worker-Logs, um die Ausgabelogs des Änderungsstreams zu überwachen.

    Stream-Schreibvorgänge

    Verwenden Sie die cbt-Befehlszeile, um eine Anzahl von Songwiedergaben für verschiedene Nutzer in die Tabelle song-rank zu schreiben. Die Funktion ist so konzipiert, dass sie über einige Minuten hinweg schreibt, um das Streaming von Song-Aufrufen im Laufe der Zeit zu simulieren.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Ausgabe ansehen

    Lesen Sie die Ausgabe in Cloud Storage, um die beliebtesten Songs zu sehen.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Beispielausgabe:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Bereinigen

    Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

    Projekt löschen

      Google Cloud -Projekt löschen:

      gcloud projects delete PROJECT_ID

    Einzelne Ressourcen löschen

    1. Löschen Sie den Bucket und die Dateien.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Deaktivieren Sie den Änderungsstream für die Tabelle.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Löschen Sie die Tabelle song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Beenden Sie die Änderungsstream-Pipeline.

      1. Listen Sie die Jobs auf, um die Job-ID zu erhalten.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Brechen Sie den Job ab.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Ersetzen Sie JOB_ID durch die Job-ID, die nach dem vorherigen Befehl angezeigt wird.

    Nächste Schritte