Änderungen mit Dataflow streamen

Mit dem Bigtable Beam-Connector können Sie Dataflow verwenden, um Bigtable-Datenänderungsdatensätze zu lesen, ohne Partitionsänderungen in Ihrem Code verfolgen oder verarbeiten zu müssen. Diese Logik wird vom Connector übernommen.

In diesem Dokument wird beschrieben, wie Sie den Bigtable Beam-Connector konfigurieren und verwenden, um einen Änderungsstream mit einer Dataflow-Pipeline zu lesen. Bevor Sie dieses Dokument lesen, sollten Sie sich mit der Übersicht über Änderungs streams und mit Dataflow vertraut machen.

Alternativen zum Erstellen einer eigenen Pipeline

Wenn Sie keine eigene Dataflow-Pipeline erstellen möchten, können Sie eine der folgenden Optionen verwenden.

Sie können eine von Google bereitgestellte Dataflow-Vorlage verwenden.

Sie können auch die Codebeispiele aus dem Bigtable-Tutorial oder der Bigtable-Kurzanleitung als Ausgangspunkt für Ihren Code verwenden.

Achten Sie darauf, dass der von Ihnen generierte Code die Version 26.14.0 oder höher von google cloud libraries-bom verwendet.

Connector-Details

Mit der Bigtable Beam-Connector-Methode BigtableIO.readChangeStream können Sie einen Stream von Datenänderungsdatensätzen (ChangeStreamMutation) lesen, die Sie verarbeiten können. Der Bigtable Beam-Connector ist eine Komponente des Apache Beam GitHub Repositorys. Eine Beschreibung des Connector-Codes finden Sie in den Kommentaren unter BigtableIO.java.

Sie müssen den Connector mit Beam Version 2.48.0 oder höher verwenden. Prüfen Sie die Apache Beam Laufzeitunterstützung, um sicherzustellen, dass Sie eine unterstützte Java-Version verwenden. Anschließend können Sie eine Pipeline, die den Connector verwendet, in Dataflow bereitstellen. Dataflow übernimmt die Bereitstellung und Verwaltung von Ressourcen und unterstützt die Skalierbarkeit und Zuverlässigkeit der Stream-Daten verarbeitung.

Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Beam-Dokumentation.

Daten ohne Ereigniszeiten gruppieren

Datenänderungsdatensätze, die mit dem Bigtable Beam-Connector gestreamt werden, sind nicht mit Dataflow-Funktionen kompatibel, die von Ereigniszeiten abhängen.

Wie unter Replikation und Wasserzeichen erläutert, wird ein niedriges Wasserzeichen möglicherweise nicht aktualisiert, wenn die Replikation für die Partition nicht mit dem Rest der Instanz Schritt gehalten hat. Wenn ein niedriges Wasserzeichen nicht mehr aktualisiert wird, kann dies dazu führen, dass der Änderungsstream angehalten wird.

Um zu verhindern, dass der Stream angehalten wird, gibt der Bigtable Beam-Connector alle Daten mit einem Ausgabezeitstempel von null aus. Durch den Zeitstempel null werden alle Daten änderungsdatensätze von Dataflow als verspätete Daten betrachtet. Daher sind Dataflow-Funktionen, die von Ereigniszeiten abhängen, nicht mit Bigtable-Änderungsstreams kompatibel. Insbesondere können Sie keine Windowing-Funktionen, Ereigniszeit-Trigger, oder Ereigniszeit-Timerverwenden.

Stattdessen können Sie GlobalWindows mit Triggern ohne Ereigniszeit verwenden, um diese verspäteten Daten in Bereiche zu gruppieren, wie im Beispiel aus dem Tutorial gezeigt. Weitere Informationen zu Triggern und Bereichen finden Sie unter Triggers im Beam-Programmierhandbuch.

Autoscaling

Der Connector unterstützt Dataflow-Autoscaling, das standardmäßig aktiviert ist, wenn Runner v2 (erforderlich) verwendet wird. Der Dataflow-Autoscaling-Algorithmus berücksichtigt den geschätzten Rückstand des Änderungsstreams, der auf der Dataflow-Monitoring Seite im Backlog Abschnitt beobachtet werden kann. Verwenden Sie das Flag --maxNumWorkers beim Bereitstellen eines Jobs, um die Anzahl der Worker zu begrenzen.

Informationen zum manuellen Skalieren Ihrer Pipeline anstelle von Autoscaling finden Sie unter Streamingpipeline manuell skalieren.

Beschränkungen

Beachten Sie die folgenden Einschränkungen, bevor Sie den Bigtable Beam-Connector mit Dataflow verwenden.

Dataflow Runner V2

Der Connector kann nur mit Dataflow Runner v2 ausgeführt werden. Geben Sie dazu --experiments=use_runner_v2 in den Befehlszeilenargumenten an. Wenn Sie Runner v1 verwenden, schlägt die Pipeline mit der folgenden Ausnahme fehl:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshots

Der Connector unterstützt keine Dataflow-Snapshots.

Duplikate

Der Bigtable Beam-Connector streamt Änderungen für jeden Zeilenschlüssel und jeden Cluster in der Reihenfolge der Commit-Zeitstempel. Da er jedoch manchmal von früheren Zeitpunkten im Stream neu gestartet wird, können Duplikate entstehen.

Pipeline-Neustarts

Wenn eine Dataflow-Pipeline längere Zeit angehalten wurde, können Datenänderungsdatensätze hinter die Aufbewahrungsgrenze fallen. Wenn die Pipeline fortgesetzt wird, schlägt Bigtable die Pipeline fehl, damit Sie eine neue Pipeline mit einer neuen Startzeit für die Anfrage starten können, die innerhalb des Aufbewahrungszeitraums liegt. Bigtable tut dies, anstatt die Anfragezeit der ursprünglichen Pipeline im Hintergrund zu erhöhen, um zu verhindern, dass Datenänderungsdatensätze mit Zeitstempeln, die außerhalb des angegebenen Aufbewahrungszeitraums liegen, versehentlich gelöscht werden.

Hinweis

Bevor Sie den Connector verwenden, müssen Sie die folgenden Voraussetzungen erfüllen.

Authentifizierung einrichten

Wenn Sie die Java-Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten Sie dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Installieren Sie die Google Cloud CLI.

  2. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  3. Wenn Sie eine lokale Shell verwenden, erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Nutzerkonto:

    gcloud auth application-default login

    Wenn Sie Cloud Shell verwenden, ist dies nicht erforderlich.

    Wenn ein Authentifizierungsfehler zurückgegeben wird und Sie einen externen Identitätsanbieter (IdP) verwenden, prüfen Sie, ob Sie sich mit Ihrer föderierten Identität in der gcloud CLI angemeldet haben.

Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Standardanmeldedaten für Anwendungen für Code einrichten, der unter ausgeführt wird Google Cloud .

Änderungsstream aktivieren

Sie müssen einen Änderungsstream in einer Tabelle aktivieren, bevor Sie ihn lesen können. Sie können auch eine neue Tabelle mit aktivierten Änderungsstreams erstellen.

Metadatentabelle für Änderungsstreams

Wenn Sie Änderungen mit Dataflow streamen, erstellt der Bigtable Beam-Connector standardmäßig eine Metadatentabelle mit dem Namen __change_stream_md_table. In der Metadatentabelle für Änderungsstreams wird der Betriebsstatus des Connectors verwaltet und Metadaten zu Datenänderungsdatensätzen gespeichert.

Standardmäßig erstellt der Connector die Tabelle in derselben Instanz wie die Tabelle, die gestreamt wird. Damit die Tabelle ordnungsgemäß funktioniert, muss das Anwendungsprofil für die Metadatentabelle das Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen müssen aktiviert sein.

Weitere Informationen zum Streamen von Änderungen aus Bigtable mit dem Bigtable Beam-Connector finden Sie in der BigtableIO Dokumentation.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams mit Dataflow benötigen.

Zum Lesen der Änderungen aus Bigtable benötigen Sie diese Rolle:

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Zum Ausführen des Dataflow-Jobs benötigen Sie diese Rollen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Bigtable Beam-Connector als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-Datei pom.xml Code ähnlich der folgenden Abhängigkeit hinzu. Die Version muss 2.48.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Änderungsstream lesen

Um eine Dataflow-Pipeline zum Lesen Ihrer Datenänderungsdatensätze zu erstellen, konfigurieren Sie den Connector und fügen dann Transformationen und Senken hinzu. Anschließend verwenden Sie den Connector, um ChangeStreamMutation-Objekte in einer Beam-Pipeline zu lesen.

Die Codebeispiele in diesem Abschnitt sind in Java geschrieben und zeigen, wie Sie eine Pipeline erstellen und damit Schlüssel-Wert-Paare in einen String umwandeln. Jedes Paar besteht aus einem Zeilenschlüssel und einem ChangeStreamMutation-Objekt. Die Pipeline wandelt die Einträge jedes Objekts in einen durch Kommas getrennten String um.

Pipeline erstellen

Dieses Java-Codebeispiel zeigt, wie Sie die Pipeline erstellen:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Datenänderungsdatensätze verarbeiten

In diesem Beispiel wird gezeigt, wie Sie alle Einträge in einem Datenänderungsdatensatz für eine Zeile durchlaufen und eine Methode zum Umwandeln in einen String basierend auf dem Eintragstyp aufrufen.

Eine Liste der Eintragstypen, die ein Datenänderungsdatensatz enthalten kann, finden Sie unter Inhalt eines Datenänderungsdatensatzes.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In diesem Beispiel wird ein Schreibvorgang umgewandelt:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Eintrag für das Löschen von Zellen umgewandelt:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Eintrag für das Löschen einer Spaltenfamilie umgewandelt:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Überwachen

Mit den folgenden Ressourcen in der Google Cloud Console können Sie Ihre Google Cloud Ressourcen überwachen, während Sie eine Dataflow-Pipeline ausführen, um einen Bigtable-Änderungsstream zu lesen:

Prüfen Sie insbesondere die folgenden Messwerte:

  • Prüfen Sie auf der Seite „Bigtable-Systemstatistiken“ die folgenden Messwerte:
    • CPU-Auslastung nach Änderungsstreams in der Messung cpu_load_by_app_profile_by_method_by_table. Zeigt die Auswirkungen des Änderungsstreams auf die CPU-Auslastung des Clusters.
    • Änderungsstream-Speicherauslastung (Byte) (change_stream_log_used_bytes).
  • Prüfen Sie auf der Dataflow-Monitoring-Seite die Datenaktualität. Dieser Messwert zeigt die Differenz zwischen der aktuellen Zeit und dem Wasserzeichen, die etwa zwei Minuten beträgt, mit gelegentlichen Spitzen, die ein oder zwei Minuten länger sind. Die Datenaktualität gibt nicht an, ob Datenänderungsdatensätze langsam verarbeitet werden. Um den kontinuierlichen Zustand und die Leistung Ihrer kritischen Anwendungen zu gewährleisten, beobachten Sie den Dataflow-Messwert für die Datenaktualität und führen Sie die folgenden Schritte aus:

    • Wenn der Messwert für die Datenaktualität konstant über dem Schwellenwert liegt, sind Ihre Ressourcen möglicherweise zu gering. Wir empfehlen, weitere Dataflow-Worker hinzuzufügen.
    • Wenn die Dataflow-Worker gut bereitgestellt sind, die Datenaktualität jedoch steigt oder konstant hoch ist, wenden Sie sich an den Google Cloud Support.
  • Mit dem Dataflow-Messwert processing_delay_from_commit_timestamp_MEAN können Sie die durchschnittliche Verarbeitungszeit von Datenänderungsdatensätzen über die Lebensdauer des Jobs hinweg ermitteln.

Die Bigtable-Messung server/latencies ist nicht hilfreich, wenn Sie eine Dataflow-Pipeline überwachen, die einen Bigtable-Änderungsstream liest, da sie die Dauer der Streaminganfrage und nicht die Latenz bei der Verarbeitung von Datenänderungsdatensätzen widerspiegelt. Eine hohe Latenz in einem Änderungsstream bedeutet nicht, dass die Anfragen langsam verarbeitet werden, sondern dass die Verbindung so lange offen war.

Nächste Schritte