E/S gérées Dataflow

Les E/S gérées permettent à Dataflow de gérer des connecteurs d'E/S spécifiques utilisés dans les pipelines Apache Beam. Les E/S gérées simplifient la gestion des pipelines qui s'intègrent aux sources et récepteurs compatibles.

Les E/S gérées se composent de deux éléments qui fonctionnent ensemble :

  • Transformation Apache Beam qui fournit une API commune pour créer des connecteurs d'E/S (sources et récepteurs).

  • Un service Dataflow qui gère ces connecteurs d'E/S pour vous, y compris la possibilité de les mettre à niveau indépendamment de la version d'Apache Beam.

Voici quelques avantages de l'E/S gérée :

  • Mises à niveau automatiques. Dataflow met automatiquement à niveau les connecteurs d'E/S gérés dans votre pipeline. Cela signifie que votre pipeline reçoit des correctifs de sécurité, des améliorations de performances et des corrections de bugs pour ces connecteurs, sans nécessiter de modifications de code. Pour en savoir plus, consultez Mises à niveau automatiques.

  • API cohérente : Traditionnellement, les connecteurs d'E/S dans Apache Beam disposent d'API distinctes, et chaque connecteur est configuré différemment. Managed I/O fournit une API de configuration unique qui utilise des propriétés clé-valeur, ce qui permet d'obtenir un code de pipeline plus simple et plus cohérent. Pour en savoir plus, consultez la documentation de l'API Configuration.

Conditions requises

  • Les SDK suivants sont compatibles avec les E/S gérées :

    • SDK Apache Beam pour Java version 2.58.0 ou ultérieure.
    • Le SDK Apache Beam pour Python version 2.61.0 ou ultérieure.
  • Le service de backend nécessite Dataflow Runner v2. Si l'exécuteur V2 n'est pas activé, votre pipeline s'exécute quand même, mais il ne bénéficie pas des avantages du service d'E/S géré.

Mises à niveau automatiques

Les pipelines Dataflow avec des connecteurs d'E/S gérés utilisent automatiquement la dernière version fiable du connecteur. Les mises à niveau automatiques ont lieu aux points suivants du cycle de vie du job :

  • Envoi de job. Lorsque vous envoyez un job par lot ou de streaming, Dataflow utilise la dernière version du connecteur d'E/S géré qui a été testée et fonctionne correctement.

  • Mises à niveau progressives. Pour les jobs de traitement en flux continu, Dataflow met à niveau vos connecteurs d'E/S gérés dans les pipelines en cours d'exécution à mesure que de nouvelles versions sont disponibles. Vous n'avez pas à vous soucier de la mise à jour manuelle du connecteur ni de la version Apache Beam de votre pipeline.

    Par défaut, les mises à niveau continues sont effectuées dans un délai de 30 jours, c'est-à-dire environ tous les 30 jours. Vous pouvez ajuster la fenêtre ou désactiver les mises à niveau progressives pour chaque job. Pour en savoir plus, consultez Définir la fenêtre de mise à niveau progressive.

    Une semaine avant la mise à niveau, Dataflow écrit un message de notification dans les journaux de messages de job.

  • Tâches de remplacement : Pour les jobs de streaming, Dataflow recherche les mises à jour chaque fois que vous lancez un job de remplacement et utilise automatiquement la dernière version stable connue. Dataflow effectue cette vérification même si vous ne modifiez aucun code dans le job de remplacement.

Le schéma suivant illustre le processus de mise à niveau. L'utilisateur crée un pipeline Apache Beam à l'aide de la version X du SDK. Dataflow met à niveau la version des E/S gérées vers la dernière version compatible. La mise à niveau a lieu lorsque l'utilisateur envoie le job, après la période de mise à niveau progressive ou lorsqu'il envoie un job de remplacement.

Schéma illustrant le processus de mise à niveau des E/S gérées.

Le processus de mise à niveau ajoute environ deux minutes au temps de démarrage du premier job (par projet) qui utilise des E/S gérées, et environ 30 secondes pour les jobs suivants. Pour les mises à niveau progressives, le service Dataflow lance un job de remplacement. Cela peut entraîner un temps d'arrêt temporaire pour votre pipeline, car le pool de nœuds de calcul existant est arrêté et un nouveau pool de nœuds de calcul est démarré. Pour vérifier l'état des opérations d'E/S gérées, recherchez les entrées de journal qui incluent la chaîne "Managed Transform(s)".

Définir la période de mise à niveau progressive

Pour spécifier la période de mise à niveau d'un job Dataflow de streaming, définissez l'option de service managed_transforms_rolling_upgrade_window sur le nombre de jours. La valeur doit être comprise entre 10 et 90 jours, inclus.

Java

--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=DAYS

Python

--dataflow_service_options=managed_transforms_rolling_upgrade_window=DAYS

gcloud

Exécutez la commande gcloud dataflow jobs run avec l'option additional-experiments. Si vous utilisez un modèle Flex qui utilise Managed I/O, exécutez la commande gcloud dataflow flex-template run.

--additional-experiments=managed_transforms_rolling_upgrade_window=DAYS

Pour désactiver les mises à niveau progressives, définissez l'option de service managed_transforms_rolling_upgrade_window sur never. Vous pouvez toujours déclencher une mise à jour en lançant une tâche de remplacement.

Java

--dataflowServiceOptions=managed_transforms_rolling_upgrade_window=never

Python

--dataflow_service_options=managed_transforms_rolling_upgrade_window=never

Go

--dataflow_service_options=managed_transforms_rolling_upgrade_window=never

gcloud

Exécutez la commande gcloud dataflow jobs run avec l'option additional-experiments. Si vous utilisez des modèles Flex, exécutez la commande gcloud dataflow flex-template run.

--additional-experiments=managed_transforms_rolling_upgrade_window=never

API de configuration

Les E/S gérées sont une transformation Apache Beam clé en main qui fournit une API cohérente pour configurer les sources et les récepteurs.

Java

Pour créer une source ou un récepteur compatibles avec Managed I/O, vous utilisez la classe Managed. Spécifiez la source ou le récepteur à instancier, et transmettez un ensemble de paramètres de configuration, comme suit :

Map config = ImmutableMap.<String, Object>builder()
    .put("config1", "abc")
    .put("config2", 1);

pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
    .getSinglePCollection();

Vous pouvez également transmettre les paramètres de configuration sous la forme d'un fichier YAML. Pour obtenir un exemple de code complet, consultez Lire à partir d'Apache Iceberg.

Python

Importez le module apache_beam.transforms.managed et appelez la méthode managed.Read ou managed.Write. Spécifiez la source ou le récepteur à instancier, et transmettez un ensemble de paramètres de configuration, comme suit :

pipeline
| beam.managed.Read(
    beam.managed.SOURCE, # Example: beam.managed.KAFKA
    config={
      "config1": "abc",
      "config2": 1
    }
)

Vous pouvez également transmettre les paramètres de configuration sous la forme d'un fichier YAML. Pour obtenir un exemple de code complet, consultez Lire à partir d'Apache Kafka.

Destinations dynamiques

Pour certains récepteurs, le connecteur d'E/S géré peut sélectionner dynamiquement une destination en fonction des valeurs de champ dans les enregistrements entrants.

Pour utiliser des destinations dynamiques, fournissez une chaîne de modèle pour la destination. La chaîne de modèle peut inclure des noms de champs entre accolades, comme "tables.{field1}". Au moment de l'exécution, le connecteur substitue la valeur du champ pour chaque enregistrement entrant afin de déterminer la destination de cet enregistrement.

Par exemple, supposons que vos données comportent un champ nommé airport. Vous pouvez définir la destination sur "flights.{airport}". Si airport=SFO, l'enregistrement est écrit dans flights.SFO. Pour les champs imbriqués, utilisez la notation par points. Par exemple :{top.middle.nested}.

Pour obtenir un exemple de code montrant comment utiliser les destinations dynamiques, consultez Écrire avec des destinations dynamiques.

Filtrage

Vous pouvez filtrer certains champs avant qu'ils ne soient écrits dans la table de destination. Pour les récepteurs qui acceptent les destinations dynamiques, vous pouvez utiliser les paramètres drop, keep ou only à cette fin. Ces paramètres vous permettent d'inclure des métadonnées de destination dans les enregistrements d'entrée, sans les écrire dans la destination.

Vous ne pouvez définir qu'un seul de ces paramètres pour un récepteur donné.

Paramètre de configuration Type de données Description
drop liste de chaînes Liste des noms de champs à supprimer avant l'écriture dans la destination.
keep liste de chaînes Liste des noms de champs à conserver lors de l'écriture dans la destination. Les autres champs sont supprimés.
only string Nom d'un seul champ à utiliser comme enregistrement de premier niveau à écrire lors de l'écriture dans la destination. Tous les autres champs sont supprimés. Ce champ doit être de type "ligne".

Sources et récepteurs compatibles

Les E/S gérées sont compatibles avec les sources et les récepteurs suivants.

Pour en savoir plus, consultez Connecteurs d'E/S gérés dans la documentation Apache Beam.