E/S gérées Dataflow

Les E/S gérées permettent à Dataflow de gérer des connecteurs d'E/S spécifiques utilisés dans les pipelines Apache Beam. Elles simplifient la gestion des pipelines qui s'intègrent aux sources et aux récepteurs compatibles.

Les E/S gérées sont constituées de deux composants qui fonctionnent ensemble :

  • Une transformation Apache Beam qui fournit une API commune pour créer des connecteurs d'E/S (sources et récepteurs).

  • Un service Dataflow qui gère ces connecteurs d'E/S en votre nom, y compris la possibilité de les mettre à niveau indépendamment de la version Apache Beam.

Les avantages des E/S gérées sont les suivants :

  • Mises à niveau automatiques. Dataflow met automatiquement à niveau les connecteurs d'E/S gérés dans votre pipeline. Cela signifie que votre pipeline reçoit des correctifs de sécurité, des améliorations des performances et des corrections de bugs pour ces connecteurs, sans nécessiter de modifications de code. Pour en savoir plus, consultez la section Mises à niveau automatiques.

  • API cohérente. Traditionnellement, les connecteurs d'E/S dans Apache Beam ont des API distinctes, et chaque connecteur est configuré de manière différente. Les E/S gérées fournissent une API de configuration unique qui utilise des propriétés clé-valeur, ce qui permet d'obtenir un code de pipeline plus simple et plus cohérent. Pour en savoir plus, consultez la section API de configuration.

Conditions requises

  • Les SDK suivants sont compatibles avec les E/S gérées :

    • SDK Apache Beam pour Java version 2.58.0 ou ultérieure.
    • SDK Apache Beam pour Python version 2.61.0 ou ultérieure.
  • Le service de backend nécessite Dataflow Runner v2. Si Runner v2 n'est pas activé, votre pipeline s'exécute toujours, mais il ne bénéficie pas des avantages du service d'E/S géré.

Mises à niveau automatiques

Les pipelines Dataflow avec des connecteurs d'E/S gérés utilisent automatiquement la dernière version fiable du connecteur. Les mises à niveau automatiques se produisent aux points suivants du cycle de vie du job :

  • Soumission de job. Lorsque vous envoyez un job par lot ou de streaming, Dataflow utilise la dernière version du connecteur d'E/S géré qui a été testée et fonctionne correctement.

  • Mises à niveau progressives. Pour les jobs de streaming, Dataflow met à niveau vos connecteurs d'E/S gérés dans les pipelines en cours d'exécution lorsque de nouvelles versions sont disponibles. Vous n'avez pas à vous soucier de la mise à jour manuelle du connecteur ni de la version Apache Beam de votre pipeline.

    Par défaut, les mises à niveau progressives ont lieu dans une fenêtre de 30 jours, c'est-à-dire qu'elles sont effectuées environ tous les 30 jours. Vous pouvez ajuster la fenêtre ou désactiver les mises à niveau progressives pour chaque job. Pour en savoir plus, consultez la section Définir la fenêtre de mise à niveau progressive.

    Une semaine avant la mise à niveau, Dataflow écrit un message de notification dans les journaux de messages du job.

  • Jobs de remplacement. Pour les jobs de streaming, Dataflow recherche les mises à jour chaque fois que vous lancez un job de remplacement, et utilise automatiquement la dernière version connue. Dataflow effectue cette vérification même si vous ne modifiez aucun code dans le job de remplacement.

Le schéma suivant illustre le processus de mise à niveau. L'utilisateur crée un pipeline Apache Beam à l'aide de la version X du SDK. Dataflow met à niveau la version d'E/S gérée vers la dernière version compatible. La mise à niveau a lieu lorsque l'utilisateur envoie le job, après la fenêtre de mise à niveau progressive ou lorsque l'utilisateur envoie un job de remplacement.

Schéma illustrant le processus de mise à niveau des E/S gérées.

Le processus de mise à niveau ajoute environ deux minutes au temps de démarrage du premier job (par projet) qui utilise des E/S gérées, et peut durer environ une demi-minute pour les jobs suivants. Pour les mises à niveau progressives, le service Dataflow lance un job de remplacement. Cela peut entraîner un temps d'arrêt temporaire pour votre pipeline, car le pool de nœuds de calcul existant est arrêté et un nouveau pool de nœuds de calcul démarre. Pour vérifier l'état des opérations d'E/S gérées, recherchez les entrées de journal qui incluent la chaîne "Managed Transform(s)".

Définir la fenêtre de mise à niveau progressive

Pour spécifier la fenêtre de mise à niveau d'un job Dataflow de streaming, définissez l'managed_transforms_rolling_upgrade_window option de service sur le nombre de jours. La valeur doit être comprise entre 10 et 90 jours inclus.

Java

--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=DAYS

Python

--dataflow_service_options=managed_transforms_rolling_upgrade_window=DAYS

gcloud

Utilisez la gcloud dataflow jobs run commande avec l'option additional-experiments. Si vous utilisez un modèle Flex qui utilise des E/S gérées, utilisez la gcloud dataflow flex-template run commande.

--additional-experiments=managed_transforms_rolling_upgrade_window=DAYS

Pour désactiver les mises à niveau progressives, définissez l'option de service managed_transforms_rolling_upgrade_window sur never. Vous pouvez toujours déclencher une mise à jour en lançant un job de remplacement.

Java

--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=never

Python

--dataflow_service_options=managed_transforms_rolling_upgrade_window=never

Go

--dataflow_service_options=managed_transforms_rolling_upgrade_window=never

gcloud

Utilisez la gcloud dataflow jobs run commande avec l'option additional-experiments. Si vous utilisez des modèles Flex, utilisez la gcloud dataflow flex-template run commande.

--additional-experiments=managed_transforms_rolling_upgrade_window=never

API de configuration

Les E/S gérées sont une transformation Apache Beam clé en main qui fournit une API cohérente pour configurer les sources et les récepteurs.

Java

Pour créer une source ou un récepteur compatible avec les E/S gérées, utilisez la Managed classe. Spécifiez la source ou le récepteur à instancier, et transmettez un ensemble de paramètres de configuration, comme suit :

Map config = ImmutableMap.<String, Object>builder()
    .put("config1", "abc")
    .put("config2", 1);

pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
    .getSinglePCollection();

Vous pouvez également transmettre des paramètres de configuration sous forme de fichier YAML. Pour obtenir un exemple de code complet, consultez la section Lire à partir d'Apache Iceberg.

Python

Importez le apache_beam.transforms.managed module et appelez la méthode managed.Read ou managed.Write. Spécifiez la source ou le récepteur à instancier, et transmettez un ensemble de paramètres de configuration, comme suit :

pipeline
| beam.managed.Read(
    beam.managed.SOURCE, # Example: beam.managed.KAFKA
    config={
      "config1": "abc",
      "config2": 1
    }
)

Vous pouvez également transmettre des paramètres de configuration sous forme de fichier YAML. Pour obtenir un exemple de code complet, consultez la section Lire à partir d'Apache Kafka.

Destinations dynamiques

Pour certains récepteurs, le connecteur d'E/S géré peut sélectionner dynamiquement une destination en fonction des valeurs de champ dans les enregistrements entrants.

Pour utiliser des destinations dynamiques, fournissez une chaîne de modèle pour la destination. La chaîne de modèle peut inclure des noms de champs entre accolades, tels que "tables.{field1}". Au moment de l'exécution, le connecteur remplace la valeur du champ pour chaque enregistrement entrant, afin de déterminer la destination de cet enregistrement.

Supposons, par exemple, que vos données comportent un champ nommé airport. Vous pouvez définir la destination sur "flights.{airport}". Si airport=SFO, l'enregistrement est écrit dans flights.SFO. Pour les champs imbriqués, utilisez la notation par points. Par exemple : {top.middle.nested}.

Pour obtenir un exemple de code montrant comment utiliser des destinations dynamiques, consultez la section Écrire avec des destinations dynamiques.

Filtrage

Vous pouvez filtrer certains champs avant qu'ils ne soient écrits dans la table de destination. Pour les récepteurs compatibles avec les destinations dynamiques, vous pouvez utiliser le paramètre drop, keep ou only à cet effet. Ces paramètres vous permettent d'inclure des métadonnées de destination dans les enregistrements d'entrée, sans écrire les métadonnées dans la destination.

Vous ne pouvez définir qu'un seul de ces paramètres pour un récepteur donné.

Paramètre de configuration Type de données Description
drop liste de chaînes Liste des noms de champs à supprimer avant l'écriture dans la destination.
keep liste de chaînes Liste des noms de champs à conserver lors de l'écriture dans la destination. Les autres champs sont supprimés.
only chaîne Nom d'un seul champ à utiliser comme enregistrement de premier niveau à écrire lors de l'écriture dans la destination. Tous les autres champs sont supprimés. Ce champ doit être de type ligne.

Sources et récepteurs compatibles

Les E/S gérées sont compatibles avec les sources et les récepteurs suivants.

Pour en savoir plus, consultez la section Connecteurs d'E/S gérés dans la documentation Apache Beam.