Créer des connexions de flux de modifications à l'aide de Dataflow

Cette page explique comment créer des pipelines Dataflow qui consomment et transfèrent les données de modification Spanner à l'aide des flux de modifications. Vous pouvez utiliser l'exemple de code sur cette page pour créer des pipelines personnalisés.

Concepts fondamentaux

Voici quelques concepts de base pour les pipelines Dataflow pour les flux de modifications.

Dataflow

Dataflow est un service sans serveur, rapide et économique qui prend en charge le traitement par flux et par lot. Il offre la portabilité avec les jobs de traitement écrits à l'aide des bibliothèques Open Source Apache Beam, et automatise le provisionnement de l'infrastructure et la gestion des clusters. Dataflow fournit un flux en quasi-temps réel lors de la lecture à partir de flux de modifications.

Vous pouvez utiliser Dataflow pour consommer des flux de modifications Spanner avec le connecteur SpannerIO, qui offre une abstraction sur l'API Spanner pour interroger les flux de modifications. Avec ce connecteur, vous n'avez pas à gérer le cycle de vie des partitions de flux de modifications, ce qui est nécessaire lorsque vous utilisez directement l'API Spanner. Le connecteur vous fournit un flux d'enregistrements de modifications de données. Vous pouvez ainsi vous concentrer davantage sur la logique de l'application et moins sur les détails spécifiques de l'API et le partitionnement dynamique des flux de modifications. Dans la plupart des cas où vous devez lire des données de flux de modifications, nous vous recommandons d'utiliser le connecteur SpannerIO plutôt que l'API Spanner.

Les modèles Dataflow sont des pipelines Dataflow prédéfinis qui implémentent des cas d'utilisation courants. Pour obtenir une présentation, consultez Modèles Dataflow.

Pipeline Dataflow

Un pipeline Dataflow de flux de modifications Spanner se compose de quatre parties principales :

  1. Une base de données Spanner avec un flux de modifications
  2. Connecteur SpannerIO
  3. Transformations et récepteurs définis par l'utilisateur
  4. Un outil d'écriture d'E/S de récepteur Apache Beam

image

Flux de modifications Spanner

Pour savoir comment créer un flux de modifications, consultez Créer un flux de modifications.

Connecteur Apache Beam SpannerIO

Il s'agit du connecteur SpannerIO décrit dans la section Dataflow précédente. Il s'agit d'un connecteur d'E/S source qui émet un PCollection d'enregistrements de modification des données vers les étapes ultérieures du pipeline. L'heure de l'événement de chaque enregistrement de modification de données émis correspond à l'horodatage de validation. Notez que les enregistrements émis sont non ordonnés et que le connecteur SpannerIO garantit qu'il n'y aura pas d'enregistrements tardifs.

Lorsque vous travaillez avec des flux de modifications, Dataflow utilise la création de points de contrôle. Par conséquent, chaque nœud de calcul peut attendre jusqu'à l'intervalle de point de contrôle configuré pour mettre en mémoire tampon les modifications avant de les envoyer pour un traitement ultérieur.

Transformations définies par l'utilisateur

Une transformation définie par l'utilisateur lui permet d'agréger, de transformer ou de modifier les données de traitement dans un pipeline Dataflow. Les cas d'utilisation courants incluent la suppression des informations permettant d'identifier personnellement l'utilisateur, la satisfaction des exigences de format de données en aval et le tri. Consultez la documentation officielle d'Apache Beam pour obtenir le guide de programmation sur les transformations.

Écrivain d'E/S de récepteur Apache Beam

Apache Beam contient des connecteurs d'E/S intégrés qui peuvent être utilisés pour écrire à partir d'un pipeline Dataflow dans un récepteur de données tel que BigQuery. Les récepteurs de données les plus courants sont compatibles de manière native.

Modèles Dataflow

Les modèles Dataflow permettent de créer des tâches Dataflow basées sur des images Docker prédéfinies pour des cas d'utilisation courants à l'aide de la console Google Cloud , de la CLI Google Cloud ou des appels d'API REST.

Pour les flux de modifications Spanner, nous proposons trois modèles Flex Dataflow :

Les restrictions suivantes s'appliquent lorsque vous utilisez le modèle Flux de modification Spanner vers Pub/Sub :

Définir les autorisations IAM pour les modèles Dataflow

Avant de créer une tâche Dataflow avec les trois modèles flexibles listés, assurez-vous de disposer des autorisations IAM requises pour les comptes de service suivants :

Si vous ne disposez pas des autorisations IAM requises, vous devez spécifier un compte de service de nœud de calcul géré par l'utilisateur pour créer le job Dataflow. Pour en savoir plus, consultez la section Sécurité et autorisations pour Dataflow.

Lorsque vous essayez d'exécuter un job à partir d'un modèle Flex Dataflow sans disposer de toutes les autorisations requises, il peut échouer et afficher l'erreur Échec de la lecture du fichier de résultats ou Autorisation refusée pour la ressource. Pour en savoir plus, consultez Résoudre les problèmes liés aux modèles Flex.

Créer un pipeline Dataflow

Cette section aborde la configuration initiale du connecteur et fournit des exemples d'intégrations courantes avec la fonctionnalité de flux de modifications Spanner.

Pour suivre ces étapes, vous avez besoin d'un environnement de développement Java pour Dataflow. Pour en savoir plus, consultez Créer un pipeline Dataflow à l'aide de Java.

Créer un flux de modifications

Pour savoir comment créer un flux de modifications, consultez Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une base de données Spanner avec un flux de modifications configuré.

Accorder des droits de contrôle des accès précis

Si vous prévoyez que des utilisateurs du contrôle des accès ultraprécis exécuteront le job Dataflow, assurez-vous qu'ils ont accès à un rôle de base de données disposant du droit SELECT sur le flux de modifications et du droit EXECUTE sur la fonction de valeur de table du flux de modifications. Assurez-vous également que le compte principal spécifie le rôle de base de données dans la configuration SpannerIO ou dans le modèle Flex Dataflow.

Pour en savoir plus, consultez À propos du contrôle des accès ultraprécis.

Ajouter le connecteur SpannerIO en tant que dépendance

Le connecteur Apache Beam SpannerIO encapsule la complexité de la consommation des flux de modification directement à l'aide de l'API Cloud Spanner, en émettant une PCollection d'enregistrements de données de flux de modification vers les étapes ultérieures du pipeline.

Ces objets peuvent être utilisés dans d'autres étapes du pipeline Dataflow de l'utilisateur. L'intégration du flux de modifications fait partie du connecteur SpannerIO. Pour pouvoir utiliser le connecteur SpannerIO, la dépendance doit être ajoutée à votre fichier pom.xml :

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Créer une base de données de métadonnées

Le connecteur doit suivre chaque partition lors de l'exécution du pipeline Apache Beam. Il conserve ces métadonnées dans une table Spanner créée par le connecteur lors de l'initialisation. Vous spécifiez la base de données dans laquelle cette table sera créée lorsque vous configurez le connecteur.

Comme décrit dans Bonnes pratiques concernant les flux de modifications, nous vous recommandons de créer une base de données à cet effet plutôt que d'autoriser le connecteur à utiliser la base de données de votre application pour stocker sa table de métadonnées.

Le propriétaire d'un job Dataflow qui utilise le connecteur SpannerIO doit disposer des autorisations IAM suivantes définies avec cette base de données de métadonnées :

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configurer le connecteur

Le connecteur de flux de modifications Spanner peut être configuré comme suit :

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Vous trouverez ci-dessous la description des options readChangeStream() :

Configuration Spanner (obligatoire)

Permet de configurer le projet, l'instance et la base de données dans lesquels le flux de modifications a été créé et à partir desquels il doit être interrogé. Spécifie également, de manière facultative, le rôle de base de données à utiliser lorsque le compte principal IAM qui exécute le job Dataflow est un utilisateur de contrôle d'accès précis. Le job assume ce rôle de base de données pour accéder au flux de modifications. Pour en savoir plus, consultez À propos du contrôle des accès ultraprécis.

Nom du flux de modifications (obligatoire)

Ce nom identifie de manière unique le flux de modifications. Le nom doit être identique à celui utilisé lors de la création.

ID d'instance de métadonnées (facultatif)

Il s'agit de l'instance permettant de stocker les métadonnées utilisées par le connecteur pour contrôler la consommation des données de l'API Change Streams.

ID de la base de données de métadonnées (obligatoire)

Il s'agit de la base de données permettant de stocker les métadonnées utilisées par le connecteur pour contrôler la consommation des données de l'API Change Streams.

Nom de la table de métadonnées (facultatif)

Ne doit être utilisé que lors de la mise à jour d'un pipeline existant.

Il s'agit du nom de la table de métadonnées préexistante à utiliser par le connecteur. Le connecteur l'utilise pour stocker les métadonnées permettant de contrôler la consommation des données de l'API Change Streams. Si cette option est omise, Spanner crée une table avec un nom généré lors de l'initialisation du connecteur.

Priorité RPC (facultatif)

La priorité de la requête à utiliser pour les requêtes de flux de modifications. Si ce paramètre est omis, high priority sera utilisé.

InclusiveStartAt (obligatoire)

Les modifications apportées depuis le code temporel indiqué sont renvoyées à l'appelant.

InclusiveEndAt (facultatif)

Les modifications apportées jusqu'au code temporel indiqué sont renvoyées à l'appelant. Si ce paramètre est omis, les modifications seront émises indéfiniment.

Ajouter des transformations et des récepteurs pour traiter les données de modification

Une fois les étapes précédentes effectuées, le connecteur SpannerIO configuré est prêt à émettre une PCollection d'objets DataChangeRecord. Consultez Exemples de transformations et de récepteurs pour obtenir plusieurs exemples de configurations de pipeline qui traitent ces données en flux de différentes manières.

Notez que les enregistrements de flux de modifications émis par le connecteur SpannerIO ne sont pas ordonnés. En effet, les PCollections n'offrent aucune garantie d'ordre. Si vous avez besoin d'un flux ordonné, vous devez regrouper et trier les enregistrements en tant que transformations dans vos pipelines. Consultez Exemple : Ordonner par clé. Vous pouvez étendre cet exemple pour trier les enregistrements en fonction de n'importe quel champ, comme les ID de transaction.

Exemples de transformations et de récepteurs

Vous pouvez définir vos propres transformations et spécifier des récepteurs dans lesquels écrire les données. La documentation Apache Beam fournit une myriade de transformations qui peuvent être appliquées, ainsi que des connecteurs d'E/S prêts à l'emploi pour écrire les données dans des systèmes externes.

Exemple : Trier par clé

Cet exemple de code émet des enregistrements de modification des données, triés par code temporel de commit et regroupés par clés primaires à l'aide du connecteur Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Cet exemple de code utilise des états et des minuteurs pour mettre en mémoire tampon les enregistrements de chaque clé, et définit le délai d'expiration du minuteur sur une durée T configurée par l'utilisateur dans le futur (défini dans la fonction BufferKeyUntilOutputTimestamp). Lorsque le filigrane Dataflow dépasse l'heure T, ce code vide tous les enregistrements du tampon dont l'horodatage est inférieur à T, les trie par horodatage de validation et génère une paire clé/valeur où :

  • La clé est la clé d'entrée, c'est-à-dire la clé primaire hachée dans un tableau de buckets de taille 1 000.
  • La valeur correspond aux enregistrements de modification des données ordonnés qui ont été mis en mémoire tampon pour la clé.

Pour chaque clé, nous vous garantissons les éléments suivants :

  • Les minuteurs sont garantis de se déclencher dans l'ordre de leur code temporel d'expiration.
  • Les étapes en aval sont garanties de recevoir les éléments dans l'ordre dans lequel ils ont été produits.

Par exemple, avec une clé de valeur 100, le minuteur se déclenche respectivement à T1 et T10, ce qui produit un ensemble d'enregistrements de modification des données à chaque code temporel. Étant donné que les enregistrements de modifications de données générés à T1 ont été produits avant ceux générés à T10, il est également garanti que les enregistrements de modifications de données générés à T1 seront reçus par l'étape suivante avant ceux générés à T10. Ce mécanisme nous permet de garantir un ordre strict des codes temporels de validation par clé primaire pour le traitement en aval.

Ce processus se répète jusqu'à la fin du pipeline et au traitement de tous les enregistrements de modification des données (ou indéfiniment si aucune heure de fin n'est spécifiée).

Notez que cet exemple de code utilise des états et des minuteurs, au lieu de fenêtres, pour effectuer le tri par clé. La raison est que le traitement des fenêtres n'est pas garanti dans l'ordre. Cela signifie que les fenêtres plus anciennes peuvent être traitées après les fenêtres plus récentes, ce qui peut entraîner un traitement dans le désordre.

BreakRecordByModFn

Chaque enregistrement de modification de données peut contenir plusieurs modifications. Chaque modification représente une insertion, une mise à jour ou une suppression pour une seule valeur de clé primaire. Cette fonction divise chaque enregistrement de modification des données en enregistrements distincts, un par modification.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Cette fonction accepte un DataChangeRecord et génère un DataChangeRecord dont la clé est la clé primaire Spanner hachée en valeur entière.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Les minuteurs et les tampons sont associés à chaque clé. Cette fonction met en mémoire tampon chaque enregistrement de modification des données jusqu'à ce que le filigrane dépasse le code temporel auquel nous souhaitons générer les enregistrements de modification des données mises en mémoire tampon.

Ce code utilise un minuteur en boucle pour déterminer quand vider le tampon :

  1. Lorsqu'il détecte pour la première fois un enregistrement de modification de données pour une clé, il définit le déclenchement du minuteur sur le code temporel de commit de l'enregistrement de modification de données + incrementIntervalSeconds (une option configurable par l'utilisateur).
  2. Lorsque le minuteur se déclenche, il ajoute tous les enregistrements de modification de données dans le tampon dont l'horodatage est inférieur au délai d'expiration du minuteur à recordsToOutput. Si le tampon contient des enregistrements de modification de données dont le code temporel est supérieur ou égal à l'heure d'expiration du minuteur, il ajoute ces enregistrements de modification de données au tampon au lieu de les générer. Il définit ensuite le prochain minuteur sur le délai d'expiration du minuteur actuel plus incrementIntervalInSeconds.
  3. Si recordsToOutput n'est pas vide, la fonction trie les enregistrements de modification des données dans recordsToOutput par code temporel de commit et ID de transaction, puis les affiche.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Trier les transactions

Ce pipeline peut être modifié pour être trié par ID de transaction et code temporel de validation. Pour ce faire, mettez en mémoire tampon les enregistrements pour chaque paire ID de transaction / code temporel de commit, au lieu de le faire pour chaque clé Spanner. Pour cela, vous devez modifier le code dans KeyByIdFn.

Exemple : Assembler des transactions

Cet exemple de code lit les enregistrements de modification des données, assemble tous les enregistrements de modification des données appartenant à la même transaction en un seul élément et génère cet élément. Notez que les transactions générées par cet exemple de code ne sont pas classées par code temporel de commit.

Cet exemple de code utilise des tampons pour assembler les transactions à partir des enregistrements de modification des données. Lorsqu'il reçoit pour la première fois un enregistrement de modification de données appartenant à une transaction, il lit le champ numberOfRecordsInTransaction de l'enregistrement de modification de données, qui décrit le nombre attendu d'enregistrements de modification de données appartenant à cette transaction. Il met en mémoire tampon les enregistrements de modifications de données appartenant à cette transaction jusqu'à ce que le nombre d'enregistrements mis en mémoire tampon corresponde à numberOfRecordsInTransaction, après quoi il génère les enregistrements de modifications de données groupés.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Cette fonction accepte un DataChangeRecord et génère un DataChangeRecord indexé par l'ID de transaction.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn met en mémoire tampon les paires clé-valeur {TransactionId, DataChangeRecord} reçues de KeyByTransactionIdFn et les met en mémoire tampon par groupes en fonction de TransactionId. Lorsque le nombre d'enregistrements mis en mémoire tampon est égal au nombre d'enregistrements contenus dans l'ensemble de la transaction, cette fonction trie les objets DataChangeRecord du groupe par séquence d'enregistrement et génère une paire clé/valeur de {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Ici, nous supposons que SortKey est une classe définie par l'utilisateur qui représente une paire {CommitTimestamp, TransactionId}. Pour en savoir plus sur SortKey, consultez l'exemple d'implémentation.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Exemple : Filtrer par tag de transaction

Lorsqu'une transaction modifiant les données utilisateur est taguée, le tag correspondant et son type sont stockés dans DataChangeRecord. Ces exemples montrent comment filtrer les enregistrements du flux de modifications en fonction des tags de transaction définis par l'utilisateur et des tags système :

Filtrage des tags définis par l'utilisateur pour my-tx-tag :

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtrage des tags système/Audit de la valeur TTL :

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Exemple : Récupérer la ligne complète

Cet exemple fonctionne avec une table Spanner nommée Singer qui présente la définition suivante :

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Dans le mode de capture de valeur OLD_AND_NEW_VALUES par défaut des flux de modifications, lorsqu'une ligne Spanner est modifiée, l'enregistrement de modification de données reçu ne contient que les colonnes qui ont été modifiées. Les colonnes suivies, mais qui n'ont pas été modifiées, ne seront pas incluses dans l'enregistrement. La clé primaire de la modification peut être utilisée pour effectuer une lecture d'instantané Spanner à l'horodatage de commit de l'enregistrement de modification de données afin d'extraire les colonnes non modifiées ou même de récupérer la ligne complète.

Notez que la règle de conservation de la base de données devra peut-être être définie sur une valeur supérieure ou égale à celle de la règle de conservation du flux de modifications pour que la lecture de l'instantané réussisse.

Notez également que l'utilisation du type de capture de valeur NEW_ROW est la méthode recommandée et la plus efficace, car elle renvoie par défaut toutes les colonnes suivies de la ligne et ne nécessite pas de lecture instantanée supplémentaire dans Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Cette transformation effectue une lecture obsolète à l'horodatage de commit de chaque enregistrement reçu et mappe la ligne complète au format JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Ce code crée un client de base de données Spanner pour effectuer l'extraction complète des lignes et configure le pool de sessions pour n'avoir que quelques sessions, en effectuant des lectures dans une instance de ToFullReowJsonFn de manière séquentielle. Dataflow s'assure de générer de nombreuses instances de cette fonction, chacune avec son propre pool de clients.

Exemple : Spanner vers Pub/Sub

Dans ce scénario, l'appelant diffuse des enregistrements vers Pub/Sub aussi rapidement que possible, sans regroupement ni agrégation. Cela convient parfaitement au déclenchement du traitement en aval, comme la diffusion en flux continu de toutes les nouvelles lignes insérées dans une table Spanner vers Pub/Sub pour un traitement ultérieur.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Notez que le récepteur Pub/Sub peut être configuré pour assurer une sémantique de type exactement une fois.

Exemple : Spanner vers Cloud Storage

Dans ce scénario, l'appelant regroupe tous les enregistrements dans une fenêtre donnée et enregistre le groupe dans des fichiers Cloud Storage distincts. Cette solution est idéale pour l'analyse et l'archivage à un moment précis, qui sont indépendants de la période de conservation de Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Notez que le récepteur Cloud Storage fournit par défaut une sémantique de type "au moins une fois". Avec un traitement supplémentaire, il peut être modifié pour avoir une sémantique de type "exactement une fois".

Nous fournissons également un modèle Dataflow pour ce cas d'utilisation. Pour en savoir plus, consultez Connecter des flux de modifications à Cloud Storage.

Exemple : Spanner vers BigQuery (table de comptes)

Ici, l'appelant diffuse les enregistrements de modification dans BigQuery. Chaque enregistrement de modification de données est reflété sous la forme d'une ligne dans BigQuery. Cette option est idéale pour l'analyse. Ce code utilise les fonctions définies précédemment dans la section Récupérer la ligne complète pour récupérer la ligne complète de l'enregistrement et l'écrire dans BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Notez que le récepteur BigQuery fournit par défaut une sémantique de type "au moins une fois". Avec un traitement supplémentaire, il peut être modifié pour avoir une sémantique de type "exactement une fois".

Nous fournissons également un modèle Dataflow pour ce cas d'utilisation. Pour en savoir plus, consultez Connecter des flux de modifications à BigQuery.

Surveiller un pipeline

Deux classes de métriques sont disponibles pour surveiller un pipeline Dataflow de flux de modifications.

Métriques Dataflow standards

Dataflow fournit plusieurs métriques pour s'assurer que votre job est en bon état, comme la fraîcheur des données, le retard du système, le débit du job, l'utilisation du CPU par le nœud de calcul, etc. Pour en savoir plus, consultez Utiliser Monitoring pour les pipelines Dataflow.

Pour les pipelines de flux de modifications, deux métriques principales doivent être prises en compte : la latence du système et la fraîcheur des données.

La latence du système indique la durée maximale actuelle (en secondes) pendant laquelle un élément de données est en cours de traitement ou en attente de traitement.

La fraîcheur des données indique le temps écoulé entre le moment actuel (temps réel) et la marque de sortie. La marque de données de sortie T indique que les ressources de calcul ont traité tous les éléments avec une heure d'événement (strictement) antérieure à T. En d'autres termes, la fraîcheur des données mesure le degré de mise à jour du pipeline en ce qui concerne le traitement des événements qu'il a reçus.

Si le pipeline manque de ressources, vous pouvez constater cet effet dans ces deux métriques. La latence du système augmentera, car les éléments devront attendre plus longtemps avant d'être traités. La fraîcheur des données augmentera également, car le pipeline ne pourra pas suivre le volume de données reçues.

Métriques personnalisées du flux de modifications

Ces métriques sont exposées dans Cloud Monitoring et incluent les suivantes :

  • Latence (histogramme) entre l'enregistrement d'un enregistrement dans Spanner et son émission dans une PCollection par le connecteur. Cette métrique permet d'identifier les problèmes de performances (latence) du pipeline.
  • Nombre total d'enregistrements de données lus. Il s'agit d'une indication globale du nombre d'enregistrements émis par le connecteur. Ce nombre doit augmenter en permanence, reflétant la tendance des écritures dans la base de données Spanner sous-jacente.
  • Nombre de partitions en cours de lecture. Des partitions doivent toujours être lues. Si ce nombre est nul, cela signifie qu'une erreur s'est produite dans le pipeline.
  • Nombre total de requêtes émises lors de l'exécution du connecteur. Il s'agit d'une indication globale des requêtes de flux de modifications effectuées sur l'instance Spanner tout au long de l'exécution du pipeline. Vous pouvez l'utiliser pour obtenir une estimation de la charge du connecteur sur la base de données Spanner.

Mettre à jour un pipeline existant

Il est possible de mettre à jour un pipeline en cours d'exécution qui utilise le connecteur SpannerIO pour traiter les flux de modifications si les vérifications de compatibilité des jobs sont réussies. Pour ce faire, vous devez définir explicitement le paramètre de nom de table de métadonnées de la nouvelle tâche lorsque vous la mettez à jour. Utilisez la valeur de l'option de pipeline metadataTable du job que vous mettez à jour.

Si vous utilisez un modèle Dataflow fourni par Google, définissez le nom de la table à l'aide du paramètre spannerMetadataTableName. Vous pouvez également modifier votre job existant pour utiliser explicitement la table de métadonnées avec la méthode withMetadataTable(your-metadata-table-name) dans la configuration du connecteur. Une fois cette opération effectuée, vous pouvez suivre les instructions de la section Lancer votre job de remplacement de la documentation Dataflow pour mettre à jour un job en cours d'exécution.

Bonnes pratiques pour les flux de modification et Dataflow

Vous trouverez ci-dessous quelques bonnes pratiques pour créer des connexions de flux de modifications à l'aide de Dataflow.

Utiliser une base de données de métadonnées distincte

Nous vous recommandons de créer une base de données distincte pour que le connecteur SpannerIO l'utilise pour le stockage des métadonnées, plutôt que de le configurer pour qu'il utilise la base de données de votre application.

Pour en savoir plus, consultez Envisager une base de données de métadonnées distincte.

Dimensionner votre cluster

En règle générale,le nombre initial de nœuds de calcul dans un job de flux de modifications Spanner est d'un nœud de calcul pour 1 000 écritures par seconde. Notez que cette estimation peut varier en fonction de plusieurs facteurs, tels que la taille de chaque transaction, le nombre d'enregistrements de flux de modifications générés à partir d'une même transaction, ainsi que d'autres transformations, agrégations ou récepteurs utilisés dans le pipeline.

Après l'attribution initiale des ressources, il est important de suivre les métriques mentionnées dans Surveiller un pipeline pour s'assurer que le pipeline est en bon état. Nous vous recommandons de tester une taille initiale de pool de nœuds de calcul et de surveiller la façon dont votre pipeline gère la charge, en augmentant le nombre de nœuds si nécessaire. L'utilisation du processeur est une métrique clé pour vérifier si la charge est appropriée et si des nœuds supplémentaires sont nécessaires.

Limitations connues

L'utilisation des flux de modifications Spanner avec Dataflow présente quelques limites connues :

Autoscaling

L'autoscaling pour les pipelines incluant SpannerIO.readChangeStream nécessite Apache Beam 2.39.0 ou version ultérieure.

Si vous utilisez une version d'Apache Beam antérieure à 2.39.0, les pipelines qui incluent SpannerIO.readChangeStream doivent spécifier explicitement l'algorithme d'autoscaling comme NONE, comme décrit dans Autoscaling horizontal.

Pour effectuer manuellement le scaling d'un pipeline Dataflow au lieu d'utiliser l'autoscaling, consultez Effectuer le scaling manuel d'un pipeline de traitement en flux continu.

Exécuteur V2

Le connecteur de flux de modification Spanner nécessite Dataflow Runner V2. Cette valeur doit être spécifiée manuellement lors de l'exécution, sinon une erreur sera générée. Vous pouvez spécifier Runner V2 en configurant votre tâche avec --experiments=use_unified_worker,use_runner_v2.

Instantané

Le connecteur de flux de modifications Spanner n'est pas compatible avec les instantanés Dataflow.

En cours de drainage

Le connecteur de flux de modifications Spanner n'est pas compatible avec le drainage d'une tâche. Vous ne pouvez annuler qu'un job existant.

Vous pouvez également mettre à jour un pipeline existant sans avoir à l'arrêter.

OpenCensus

Pour utiliser OpenCensus afin de surveiller votre pipeline, spécifiez la version 0.28.3 ou ultérieure.

NullPointerException au démarrage du pipeline

Un bug dans la version 2.38.0 d'Apache Beam peut entraîner un NullPointerException lors du démarrage du pipeline dans certaines conditions. Cela empêcherait le démarrage de votre job et afficherait le message d'erreur suivant :

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Pour résoudre ce problème, utilisez Apache Beam version 2.39.0 ou ultérieure, ou spécifiez manuellement la version de beam-sdks-java-core comme 2.37.0 :

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

En savoir plus