Diffuser des modifications en continu avec Dataflow

Le connecteur Bigtable Beam vous permet d'utiliser Dataflow pour lire les enregistrements de modifications des données Bigtable sans avoir à suivre ni à traiter les modifications de partition dans votre code, car le connecteur gère cette logique pour vous.

Ce document explique comment configurer et utiliser le connecteur Bigtable Beam pour lire un flux de modifications à l'aide d'un pipeline Dataflow. Avant de lire ce document, vous devez consulter la présentation des flux de modifications et connaître Dataflow.

Alternatives à la création de votre propre pipeline

Si vous ne souhaitez pas créer votre propre pipeline Dataflow, vous pouvez utiliser l'une des options suivantes.

Vous pouvez utiliser un modèle Dataflow fourni par Google.

Vous pouvez également utiliser les exemples de code du tutoriel ou du guide de démarrage rapide Bigtable comme point de départ pour votre code.

Assurez-vous que le code que vous générez utilise la version 26.14.0 ou ultérieure de google cloud libraries-bom.

Informations sur le connecteur

La méthode du connecteur Bigtable Beam, BigtableIO.readChangeStream, vous permet de lire un flux d'enregistrements de modifications des données (ChangeStreamMutation) que vous pouvez traiter. Le connecteur Bigtable Beam est un composant du dépôt GitHub Apache Beam repository. Pour obtenir une description du code du connecteur, consultez les commentaires dans BigtableIO.java.

Vous devez utiliser le connecteur avec la version 2.48.0 ou ultérieure de Beam. Vérifiez la compatibilité de l'environnement d'exécution Apache Beam runtime support pour vous assurer que vous utilisez une version de Java compatible. Vous pouvez ensuite déployer un pipeline qui utilise le connecteur vers Dataflow, qui gère le provisionnement et la gestion des ressources, et contribue à la scalabilité et à la fiabilité du traitement des données en flux.

Pour en savoir plus sur le modèle de programmation Apache Beam, consultez la documentation Beam.

Regrouper des données sans heure d'événement

Les enregistrements de modifications des données diffusés en continu à l'aide du connecteur Bigtable Beam ne sont pas compatibles avec les fonctions Dataflow qui dépendent des heures d'événement.

Comme expliqué dans Réplication et filigranes, un filigrane bas peut ne pas progresser si la réplication de la partition n'a pas rattrapé le reste de l'instance. Lorsqu'un filigrane bas cesse de progresser, le flux de modifications peut se bloquer.

Pour éviter que le flux ne se bloque, le connecteur Bigtable Beam génère toutes les données avec un code temporel de sortie égal à zéro. Le code temporel zéro permet à Dataflow de considérer tous les enregistrements de modifications des données comme des données tardives. Par conséquent, les fonctionnalités Dataflow qui dépendent des heures d'événement ne sont pas compatibles avec les flux de modifications Bigtable. Plus précisément, vous ne pouvez pas utiliser de fonctions de fenêtrage, de déclencheurs d'heure d'événement, ni de minuteurs d'heure d'événement.

Vous pouvez utiliser GlobalWindows avec des déclencheurs d'heure non événementielle pour regrouper ces données tardives dans des volets, comme illustré dans l' exemple du tutoriel. Pour en savoir plus sur les déclencheurs et les volets, consultez la section Déclencheurs du guide de programmation Beam.

Autoscaling

Le connecteur est compatible avec l'autoscaling Dataflow, qui est activé par défaut lorsque vous utilisez Runner v2 (obligatoire). L'algorithme d'autoscaling Dataflow prend en compte le backlog estimé du flux de modifications, qui peut être surveillé sur la page de surveillance Dataflow dans la section Backlog. Utilisez l'option --maxNumWorkers lorsque vous déployez une tâche pour limiter le nombre de nœuds de calcul.

Pour effectuer le scaling manuel de votre pipeline au lieu d'utiliser l'autoscaling, consultez la page Effectuer le scaling manuel d'un pipeline de traitement en flux continu.

Limites

Tenez compte des limites suivantes avant d'utiliser le connecteur Bigtable Beam avec Dataflow.

Exécuteur Dataflow v2

Le connecteur ne peut être exécuté qu'à l'aide de l'exécuteur Dataflow v2. Pour l'activer, spécifiez --experiments=use_runner_v2 dans les arguments de ligne de commande. L'exécution avec l'exécuteur v1 entraîne l'échec de votre pipeline avec l'exception suivante :

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Instantanés

Le connecteur n'est pas compatible avec les instantanés Dataflow.

Doublons

Le connecteur Bigtable Beam diffuse en continu les modifications pour chaque clé de ligne et chaque cluster dans l'ordre des codes temporels de commit, mais comme il redémarre parfois à des moments antérieurs du flux, il peut produire des doublons.

Redémarrages de pipeline

Si un pipeline Dataflow est arrêté pendant une longue période, les enregistrements de modifications des données peuvent être antérieurs à la limite de conservation. Lorsque le pipeline est repris, Bigtable le fait échouer afin que vous puissiez démarrer un nouveau pipeline avec une nouvelle heure de début de requête qui se situe dans la période de conservation. Bigtable effectue cette opération au lieu de faire progresser silencieusement l'heure de requête du pipeline d'origine, afin d'éviter la suppression involontaire d'enregistrements de modifications des données dont les codes temporels ne sont pas compris dans la période de conservation spécifiée.

Avant de commencer

Avant d'utiliser le connecteur, remplissez les conditions préalables suivantes.

Configurer l'authentification

Pour utiliser les exemples Java de cette page dans un environnement de développement local, installez et initialisez la gcloud CLI, puis configurez le service Identifiants par défaut de l'application avec vos identifiants utilisateur.

  1. Installez la Google Cloud CLI.

  2. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  3. Si vous utilisez un shell local, créez des identifiants d'authentification locaux pour votre compte utilisateur : Créer des identifiants d'authentification locaux pour votre compte utilisateur :

    gcloud auth application-default login

    Vous n'avez pas besoin d'effectuer cette opération si vous utilisez Cloud Shell.

    Si une erreur d'authentification est renvoyée et que vous utilisez un fournisseur d'identité (IdP) externe, vérifiez que vous vous êtes connecté à la gcloud CLI avec votre identité fédérée.

Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

Pour en savoir plus sur la configuration de l'authentification pour un environnement de production, consultez Configurer les identifiants par défaut de l'application pour le code exécuté sur Google Cloud .

Activer un flux de modifications

Vous devez activer un flux de modifications sur une table avant de pouvoir le lire. Vous pouvez également créer une table avec les flux de modifications activés.

Table de métadonnées du flux de modifications

Lorsque vous diffusez des modifications en continu avec Dataflow, le connecteur Bigtable Beam crée une table de métadonnées nommée __change_stream_md_table par défaut. La table de métadonnées du flux de modifications gère l'état opérationnel du connecteur et stocke les métadonnées sur les enregistrements de modifications des données.

Par défaut, le connecteur crée la table dans la même instance que celle qui est diffusée en continu. Pour vous assurer que la table fonctionne correctement, le profil d'application de la table de métadonnées doit utiliser un routage à cluster unique et avoir activé les transactions à ligne unique.

Pour en savoir plus sur la diffusion en continu des modifications de Bigtable avec le connecteur Bigtable Beam, consultez la documentation BigtableIO.

Rôles requis

Pour obtenir les autorisations nécessaires pour lire un flux de modifications Bigtable à l'aide de Dataflow, demandez à votre administrateur de vous accorder les rôles IAM suivants.

Pour lire les modifications de Bigtable, vous avez besoin de ce rôle :

  • Administrateur Bigtable (roles/bigtable.admin) sur l'instance Bigtable contenant la table à partir de laquelle vous prévoyez de diffuser les modifications en continu

Pour exécuter la tâche Dataflow, vous avez besoin de ces rôles :

Pour en savoir plus sur l'attribution de rôles, consultez la section Gérer les accès.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Ajouter le connecteur Bigtable Beam en tant que dépendance

Ajoutez un code semblable à la dépendance suivante à votre fichier Maven pom.xml. La version doit être 2.48.0 ou ultérieure.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Lire le flux de modifications

Pour créer un pipeline Dataflow afin de lire vos enregistrements de modifications des données, configurez le connecteur, puis ajoutez des transformations et des récepteurs. Vous utilisez ensuite le connecteur pour lire les objets ChangeStreamMutation dans un pipeline Beam.

Les exemples de code de cette section, écrits en Java, montrent comment créer un pipeline et l'utiliser pour convertir des paires clé-valeur en chaîne. Chaque paire se compose d'une clé de ligne et d'un objet ChangeStreamMutation. Le pipeline convertit les entrées de chaque objet en une chaîne séparée par des virgules.

Créer le pipeline

Cet exemple de code Java montre comment créer le pipeline :

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Traiter les enregistrements de modifications des données

Cet exemple montre comment parcourir toutes les entrées d'un enregistrement de modifications des données pour une ligne et appeler une méthode de conversion en chaîne en fonction du type d'entrée.

Pour obtenir la liste des types d'entrées qu'un enregistrement de modifications des données peut contenir, consultez la section Contenu d'un enregistrement de modifications des données.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

Dans cet exemple, une entrée write est convertie :

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

Dans cet exemple, une entrée de suppression de cellules est convertie :

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

Dans cet exemple, une entrée de suppression d'une famille de colonnes est convertie :


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Surveiller

Les ressources suivantes de la Google Cloud console vous permettent de surveiller vos Google Cloud ressources lorsque vous exécutez un pipeline Dataflow pour lire un flux de modifications Bigtable :

Vérifiez notamment les métriques suivantes :

  • Sur la page "Insights sur le système Bigtable", vérifiez les métriques suivantes :
    • Données Utilisation du processeur par flux de modifications dans la métrique cpu_load_by_app_profile_by_method_by_table. Affiche l'impact du flux de modifications sur l'utilisation du processeur de votre cluster.
    • Flux de modifications de l'utilisation du stockage (octets) (change_stream_log_used_bytes).
  • Sur la page de surveillance Dataflow, vérifiez la fraîcheur des données. Cette métrique indique la différence entre l'heure actuelle et le filigrane, qui est d'environ deux minutes, avec des pics occasionnels d'une ou deux minutes de plus. La fraîcheur des données n'indique pas si les enregistrements de modifications des données sont traités lentement. Pour garantir l'intégrité et les performances continues de vos applications critiques, surveillez la métrique de fraîcheur des données Dataflow et effectuez les actions suivantes :

    • Si la métrique de fraîcheur des données est systématiquement supérieure au seuil, votre pipeline peut être sous-provisionné. Nous vous recommandons d'ajouter des nœuds de calcul Dataflow.
    • Si les nœuds de calcul Dataflow sont correctement provisionnés, mais que la fraîcheur des données a augmenté ou est systématiquement élevée, contactez Google Cloud l'assistance.
  • La métrique Dataflow processing_delay_from_commit_timestamp_MEAN peut vous indiquer le temps de traitement moyen des enregistrements de modifications des données pendant la durée de vie de la tâche.

La métrique Bigtable server/latencies n'est pas utile lorsque vous surveillez un pipeline Dataflow qui lit un flux de modifications Bigtable, car elle reflète la durée de la requête de diffusion en continu, et non la latence de traitement de l'enregistrement de modifications des données. Une latence élevée dans un flux de modifications ne signifie pas que les requêtes sont traitées lentement, mais que la connexion est restée ouverte pendant cette durée.

Étape suivante