Cette page fournit des conseils et des recommandations pour mettre à niveau vos pipelines de traitement en flux continu. Par exemple, vous devrez peut-être passer à une version plus récente du SDK Apache Beam ou mettre à jour le code de votre pipeline. Différentes options sont proposées pour répondre à différents scénarios.
Contrairement aux pipelines de traitement par lot qui s'arrêtent une fois le job terminé, les pipelines de traitement en flux continu doivent souvent s'exécuter en continu pour assurer un traitement sans interruption. Par conséquent, lorsque vous mettez à niveau des pipelines de traitement en flux continu, vous devez prendre en compte les éléments suivants :
- Il peut s'avérer nécessaire de minimiser ou d'éviter les interruptions du pipeline. Dans certains cas, il est possible de tolérer une interruption temporaire du traitement pendant le déploiement d'une nouvelle version du pipeline. Dans d'autres cas, votre application ne pourra tolérer aucune interruption.
- Les processus de mise à jour de pipeline doivent gérer les modifications de schéma de manière à minimiser les interruptions du traitement des messages et des autres systèmes associés. Par exemple, si le schéma des messages dans un pipeline de traitement des événements change, il peut être nécessaire de reporter également ces modifications de schéma dans les récepteurs de données en aval.
Vous pouvez utiliser l'une des méthodes suivantes pour mettre à jour les pipelines de traitement en flux continu, en fonction de vos exigences de pipeline et de mise à jour :
- Effectuer des mises à jour en cours
- Lancer un job de remplacement
- Exécuter des pipelines en parallèle
Pour plus d'informations sur les problèmes que vous pouvez rencontrer lors d'une mise à jour et sur la façon de les éviter, consultez les sections Valider un job de remplacement et Vérifier la compatibilité des jobs.
Bonnes pratiques
- Mettez à niveau la version du SDK Apache Beam séparément de toute modification du code du pipeline.
- Testez votre pipeline après chaque modification avant d'effectuer des mises à jour supplémentaires.
- Mettez régulièrement à niveau la version du SDK Apache Beam utilisée par votre pipeline.
- Utilisez des méthodes automatisées dans la mesure du possible, comme les mises à jour en cours ou les mises à jour automatisées de pipelines parallèles.
Effectuer des mises à jour en cours
Vous pouvez mettre à jour certains pipelines de traitement en flux continu en cours sans arrêter le job. Ce scénario est appelé une mise à jour de job en cours. Les mises à jour de job en cours ne sont disponibles que dans des circonstances limitées :
- Le job doit utiliser Streaming Engine.
- Le job doit être à l'état "en cours d'exécution".
- Vous modifiez uniquement le nombre de nœuds de calcul utilisés par le job.
Pour plus d'informations, consultez la section Définir la plage d'autoscaling de la page "Autoscaling horizontal".
Pour obtenir des instructions sur la façon d'effectuer une mise à jour de job en cours, consultez la page Mettre à jour un pipeline existant.
Lancer un job de remplacement
Si le job mis à jour est compatible avec le job existant, vous pouvez mettre à jour votre pipeline à l'aide de l'option update. Lorsque vous remplacez un job existant, un nouveau job exécute le code de votre pipeline mis à jour.
Le service Dataflow conserve le nom du job, mais exécute le job de remplacement avec un ID de job mis à jour. Ce processus peut entraîner des temps d'arrêt pendant l'arrêt de la tâche existante, l'exécution de la vérification de compatibilité et le démarrage de la nouvelle tâche. Pour en savoir plus, consultez la section Effets du remplacement d'un job.
Dataflow effectue une vérification de compatibilité pour s'assurer que le code du pipeline mis à jour peut être déployé en toute sécurité sur le pipeline en cours d'exécution. Certaines modifications de code entraînent l'échec de la vérification de compatibilité, par exemple lorsque des entrées secondaires sont ajoutées ou supprimées d'une étape existante. Lorsque la vérification de compatibilité échoue, vous ne pouvez pas effectuer de mise à jour de job sur place.
Pour obtenir des instructions expliquant comment lancer un job de remplacement, consultez la section Lancer un job de remplacement.
Si la mise à jour du pipeline n'est pas compatible avec le job actuel, vous devez arrêter et remplacer le pipeline. Si votre pipeline ne peut pas tolérer de temps d'arrêt, exécutez des pipelines en parallèle.
Arrêter et remplacer des pipelines
Si vous pouvez interrompre temporairement le traitement, vous pouvez annuler ou drainer le pipeline, puis le remplacer par le pipeline mis à jour. L'annulation d'un pipeline oblige Dataflow à interrompre immédiatement le traitement et à arrêter les ressources le plus rapidement possible, ce qui peut entraîner une perte des données en cours de traitement, appelées données en cours de transfert. Dans la plupart des cas, le drainage est l'action recommandée pour éviter ce problème. Vous pouvez également utiliser des instantanés Dataflow pour enregistrer l'état d'un pipeline de traitement en flux continu, ce qui vous permet de démarrer une nouvelle version de votre tâche Dataflow sans perdre l'état. Pour en savoir plus, consultez la page Utiliser des instantanés Dataflow.
Le drainage d'un pipeline ferme immédiatement toutes les fenêtres en cours et actionne tous les déclencheurs. Bien que les données en cours de transfert ne soient pas perdues, le drainage peut entraîner des données incomplètes pour les fenêtres. Dans ce cas, les fenêtres en cours de traitement émettent des résultats partiels ou incomplets. Pour plus d'informations, consultez la section Effets liés au drainage d'un job. Une fois le job existant terminé, vous pouvez lancer un nouveau job de traitement en flux continu contenant le code de votre pipeline mis à jour, ce qui permet de reprendre le traitement.
Avec cette méthode, vous subissez un temps d'arrêt entre le moment où le job de traitement en flux continu existant s'arrête et le moment où le pipeline de remplacement est prêt à prendre le relais. Cependant, le fait d'annuler ou de drainer un pipeline existant puis de lancer un nouveau job avec le pipeline mis à jour est moins compliqué que d'exécuter des pipelines en parallèle.
Pour obtenir des instructions plus détaillées, consultez la section Drainer un job Dataflow. Après avoir drainé le job actuel, démarrez un nouveau job portant le même nom.
Relancer le traitement des messages avec des fonctionnalités d'instantanés et de recherche Pub/Sub
Dans certains cas, après avoir remplacé ou annulé un pipeline drainé, vous devrez peut-être relancer le traitement des messages Pub/Sub déjà distribués. Par exemple, vous devrez peut-être utiliser une logique métier mise à jour pour relancer le traitement des données. La recherche Pub/Sub est une fonctionnalité qui vous permet de relire les messages d'un instantané Pub/Sub. Vous pouvez utiliser la recherche Pub/Sub avec Dataflow pour relancer le traitement des messages à partir du moment où l'instantané d'abonnement est créé.
Pendant le développement et les tests, vous pouvez également utiliser la fonctionnalité de recherche Pub/Sub pour relire plusieurs fois les messages connus afin de vérifier le résultat de votre pipeline. Lorsque vous utilisez la recherche Pub/Sub, ne recherchez pas d'instantané d'abonnement lorsque l'abonnement est utilisé par un pipeline. Si vous le faites, la recherche pourrait invalider la logique du filigrane de Dataflow et affecter le traitement "exactement une fois" des messages Pub/Sub.
Voici un workflow recommandé de gcloud CLI pour utiliser les fonctionnalités d'instantané et de recherche Pub/Sub avec des pipelines Dataflow dans une fenêtre de terminal :
Pour créer un instantané de l'abonnement, exécutez la commande
gcloud pubsub snapshots create:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Pour drainer ou annuler le pipeline, utilisez la commande
gcloud dataflow jobs drainou la commandegcloud dataflow jobs cancel:gcloud dataflow jobs drain JOB_ID
ou
gcloud dataflow jobs cancel JOB_ID
Pour rechercher l'instantané, exécutez la commande
gcloud pubsub subscriptions seek:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Déployez un nouveau pipeline qui utilise l'abonnement.
Exécuter des pipelines en parallèle
Pour éviter toute interruption de votre pipeline de traitement en flux continu lors d'une mise à jour, vous pouvez exécuter des pipelines en parallèle. Cette approche vous permet de lancer un nouveau job de traitement en flux continu avec le code de votre pipeline mis à jour et de l'exécuter en parallèle avec le job existant. Vous pouvez utiliser le workflow de déploiement de mise à jour de pipeline parallèle automatisé de Dataflow ou effectuer les étapes manuellement.
Présentation des pipelines parallèles
Lorsque vous créez le pipeline, utilisez la même stratégie de fenêtrage que celle utilisée pour le pipeline existant. Pour le workflow manuel, laissez le pipeline existant s'exécuter jusqu'à ce que son filigrane dépasse l'horodatage de la première fenêtre complète traitée par le pipeline mis à jour. Ensuite, drainez ou annulez le pipeline existant. Si vous utilisez le workflow automatisé, cette tâche est effectuée pour vous. Le pipeline mis à jour continue de s'exécuter pour prendre le relais et s'occuper du traitement de manière autonome.
Le diagramme suivant illustre ce processus.
Dans le diagramme, le Pipeline B correspond à la tâche mise à jour qui relaye le Pipeline A. La valeur t correspond à l'horodatage de la fenêtre complète la plus ancienne traitée par le pipeline B. La valeur w correspond au filigrane du Pipeline A. Pour des raisons de simplicité, un filigrane est considéré comme étant parfait sans données tardives. Le traitement et la durée d'exécution sont représentés sur l'axe horizontal. Les deux pipelines utilisent des fenêtres à durée fixe (fenêtres bascules) de cinq minutes. Les résultats sont déclenchés une fois que le filigrane dépasse la fin de chaque fenêtre.
Comme des résultats simultanés se produisent pendant la période où les deux pipelines se chevauchent, configurez les deux pipelines pour écrire les résultats dans différentes destinations. Les systèmes en aval peuvent ainsi utiliser un extrait des deux récepteurs de destination, par exemple une vue de base de données, pour interroger les résultats combinés. Ces systèmes peuvent également utiliser l'extrait pour dédupliquer les résultats de la période de chevauchement. Pour en savoir plus, consultez Gérer les sorties en double.
Limites
L'utilisation de mises à jour parallèles automatiques ou manuelles des pipelines présente les limites suivantes :
- Mises à jour automatiques uniquement : le nouveau job parallèle doit être un job Streaming Engine.
- Les anciens et nouveaux noms de tâches doivent être différents, car les tâches simultanées portant le même nom ne sont pas autorisées.
- Exécuter deux pipelines en parallèle sur la même entrée peut entraîner des données en double, des agrégations partielles et des problèmes d'ordre potentiels lorsque les données sont insérées dans le récepteur. Le système en aval doit être conçu pour anticiper et gérer ces résultats.
- Lors de la lecture à partir d'une source Pub/Sub, il n'est pas recommandé d'utiliser le même abonnement pour plusieurs pipelines, car cela peut entraîner des problèmes d'exactitude. Toutefois, dans certains cas d'utilisation, comme les pipelines d'extraction, de transformation et de chargement (ETL), utiliser le même abonnement dans deux pipelines peut réduire la duplication. Des problèmes d'autoscaling sont susceptibles de se produire chaque fois que vous fournissez une valeur non nulle pour la durée de chevauchement. Cela peut être atténué en utilisant la fonctionnalité de mise à jour des jobs en cours. Pour en savoir plus, consultez Affiner l'autoscaling pour vos pipelines de traitement en flux continu Pub/Sub.
- Pour Apache Kafka, vous pouvez réduire le nombre de doublons en activant l'enregistrement des décalages dans Kafka. Pour activer l'enregistrement des décalages dans Kafka, consultez Enregistrer les décalages dans Kafka.
Mises à jour parallèles automatisées des pipelines
Dataflow fournit une compatibilité avec les API pour lancer un job de remplacement parallèle. Cette API de style déclaratif abstrait le travail manuel d'exécution des étapes procédurales. Vous déclarez le job que vous souhaitez mettre à jour, puis un nouveau job s'exécute en parallèle avec l'ancien. Une fois le nouveau job exécuté pendant la durée que vous avez spécifiée, l'ancien job est vidé. Cette fonctionnalité élimine les pauses de traitement lors des mises à jour et réduit l'effort opérationnel nécessaire pour mettre à jour les pipelines incompatibles.
Cette méthode de mise à jour est idéale pour les pipelines qui peuvent tolérer certains doublons ou agrégations partielles, et qui ne nécessitent pas d'ordre strict lors de l'insertion de données. Il est bien adapté aux pipelines ETL, ainsi qu'aux pipelines utilisant le mode de traitement en flux continu de type "au moins une fois" et la transformation Redistribute avec l'option "Autoriser les doublons" définie sur true.
Envoyer une demande de mise à jour de pipeline parallèle automatisée
Pour utiliser le workflow automatisé, lancez un nouveau job de streaming avec les options de service suivantes. Vous devez lancer le nouveau job avec un nom différent de celui de l'ancien job.
Java
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Vous pouvez également spécifier l'ID de l'ancienne tâche :
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Python
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Vous pouvez également spécifier l'ID de l'ancienne tâche :
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Go
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Vous pouvez également spécifier l'ID de l'ancienne tâche :
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
gcloud
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Vous pouvez également spécifier l'ID de l'ancienne tâche :
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Remplacez les variables suivantes :
- Vous devez fournir
parallel_replace_job_nameouparallel_replace_job_idpour identifier le job à remplacer.OLD_JOB_NAME: si vous utilisezparallel_replace_job_name, nom du job à remplacer.OLD_JOB_ID: si vous utilisezparallel_replace_job_id, ID du job à remplacer.
Vous devez fournir une valeur
parallel_replace_job_min_parallel_pipelines_duration.DURATION: durée minimale pendant laquelle les deux pipelines s'exécutent en parallèle sous forme de nombre entier ou à virgule flottante. Une fois cette durée écoulée, un signal de vidange est envoyé à l'ancien job.La durée doit être comprise entre 0 seconde (
0s) et 31 jours (744h). Utilisezs,methpour spécifier les secondes, les minutes et les heures. Par exemple,10mcorrespond à 10 minutes.
Lorsque vous lancez le nouveau job, Dataflow attend que tous les nœuds de calcul soient provisionnés avant de commencer à traiter les données. Pour surveiller l'état du déploiement, consultez les journaux des jobs Dataflow.
Exécuter manuellement des pipelines en parallèle
Pour les scénarios plus complexes ou lorsque vous avez besoin de mieux contrôler le processus de mise à jour, vous pouvez exécuter manuellement des pipelines en parallèle. Laissez le pipeline existant s'exécuter jusqu'à ce que son filigrane dépasse l'horodatage de la première fenêtre complète traitée par le pipeline mis à jour. Ensuite, drainez ou annulez le pipeline existant.
Gérer les résultats en double
L'exemple suivant décrit une approche pour gérer les résultats en double. Les deux pipelines écrivent les résultats dans des destinations différentes, utilisent des systèmes en aval pour interroger les résultats et dédupliquent les résultats de la période de chevauchement. Cet exemple utilise un pipeline qui lit les données d'entrée à partir de Pub/Sub, effectue un traitement et écrit les résultats dans BigQuery.
Dans l'état initial, le pipeline de streaming existant (Pipeline A) s'exécute et lit des messages à partir d'un sujet Pub/Sub (Topic) à l'aide d'un abonnement (Subscription A). Les résultats sont écrits dans une table BigQuery (Table A). Ils sont utilisés via une vue BigQuery, qui sert de façade pour masquer les modifications de table sous-jacentes. Il s'agit d'une méthode de conception appelée modèle de façade. Le diagramme suivant illustre l'état initial.
Créez un nouvel abonnement (Subscription B) pour le pipeline mis à jour. Déployez le pipeline mis à jour (Pipeline B), qui lit le sujet Pub/Sub (Topic) à l'aide de l'abonnement Subscription B et écrit dans une table BigQuery distincte (Table B). Le diagramme suivant illustre ce flux.
À ce stade, les pipelines Pipeline A et Pipeline B s'exécutent en parallèle et écrivent les résultats dans des tables distinctes. Vous enregistrez le temps t en tant qu'horodatage de la fenêtre complète la plus ancienne traitée par le pipeline B.
Lorsque le filigrane du Pipeline A dépasse le temps t, drainez le Pipeline A. Lorsque vous drainez le pipeline, toutes les fenêtres ouvertes se ferment et le traitement des données en cours se termine. Si le pipeline contient des fenêtres et que des fenêtres complètes sont importantes (en supposant qu'elles ne contiennent aucune donnée tardive), laissez les deux pipelines s'exécuter jusqu'au chevauchement des fenêtres complètes avant de drainer le Pipeline A. Arrêtez le job de traitement en flux continu du Pipeline A une fois que toutes les données en cours ont été traitées et écrites dans la Table A. Le diagramme suivant illustre ce stade.
À ce stade, seul le pipeline B est en cours d'exécution. Vous pouvez effectuer une requête à partir d'une vue BigQuery (vue de façade), qui sert de façade pour masquer la table A et la table B. Pour les lignes ayant le même horodatage dans les deux tables, configurez la vue de façon à renvoyer les lignes de la Table B ou revenir à la Table A si les lignes n'existent pas dans la Table B. Le diagramme suivant montre la vue (vue de façade) lue à la fois dans la table A et la table B.
À ce stade, vous pouvez supprimer l'abonnement Subscription A.
Lorsque des problèmes sont détectés avec un nouveau déploiement de pipeline, l'utilisation de pipelines en parallèle peut simplifier le rollback. Dans cet exemple, vous souhaiterez peut-être maintenir l'exécution du Pipeline A pendant que vous surveillez le bon fonctionnement du Pipeline B. En cas de problème avec le Pipeline B, vous pouvez effectuer un rollback et restaurer le Pipeline A.
Gérer les mutations de schéma
Les systèmes de gestion de données doivent souvent s'adapter aux mutations de schéma au fil du temps, parfois en raison de l'évolution des besoins métier ou pour des raisons techniques. L'application de mises à jour de schéma nécessite généralement une planification et une exécution minutieuses pour éviter de perturber les systèmes d'information de l'entreprise.
Prenons l'exemple d'un pipeline qui lit les messages contenant des charges utiles JSON à partir d'un sujet Pub/Sub. Le pipeline convertit chaque message en instance TableRow, puis écrit les lignes dans une table BigQuery. Le schéma de la table de sortie est semblable aux messages traités par le pipeline.
Dans le diagramme suivant, le schéma est nommé Schéma A.
Au fil du temps, le schéma du message peut subir des mutations complexes. Par exemple, des champs sont ajoutés, supprimés ou remplacés. Le Schéma A évolue en un nouveau schéma. Dans la discussion qui suit, le nouveau schéma est nommé Schéma B. Dans ce cas, le pipeline A doit être mis à jour et le schéma de la table de sortie doit être compatible avec le schéma B.
Pour la table de sortie, vous pouvez effectuer certaines mutations de schéma sans temps d'arrêt.
Pour donner un exemple, vous pouvez ajouter de nouveaux champs ou assouplir les modes de colonne, par exemple en remplaçant REQUIRED par NULLABLE, sans temps d'arrêt.
Ces mutations n'ont généralement pas d'incidence sur les requêtes existantes. Cependant, les mutations de schéma qui modifient ou suppriment des champs de schéma existants interrompent les requêtes ou entraînent d'autres interruptions. L'approche suivante permet d'adapter les modifications sans nécessiter de temps d'arrêt.
Séparez les données écrites par le pipeline dans une table principale et dans une ou plusieurs tables de préproduction. La table principale stocke les données historiques écrites par le pipeline. Les tables de préproduction stockent les derniers résultats du pipeline. Vous pouvez définir une vue de façade BigQuery pour les tables principales et les tables de préproduction. Cela permet aux utilisateurs d'interroger aussi bien des données historiques que des données mises à jour.
Le diagramme suivant montre comment modifier le flux de pipeline précédent pour inclure une table de préproduction (Staging Table A), une table principale et une vue de façade.
Dans le flux révisé, le pipeline A traite les messages qui utilisent le schéma A et écrit le résultat dans la table de préproduction A, qui dispose d'un schéma compatible. La table principale contient des données historiques écrites par des versions précédentes du pipeline, ainsi que des résultats fusionnés périodiquement à partir de la table de préproduction. Les utilisateurs peuvent interroger des données à jour et des données historiques en temps réel, à l'aide de la vue de façade.
Lorsque le schéma du message évolue du schéma A au schéma B, vous pouvez mettre à jour le code du pipeline pour qu'il soit compatible avec les messages qui utilisent le schéma B. Le pipeline existant doit être mis à jour avec la nouvelle mise en œuvre. En exécutant des pipelines en parallèle, vous avez l'assurance que le traitement des flux de données se poursuit sans interruption. L'arrêt et le remplacement de pipelines entraînent une interruption du traitement, car aucun pipeline ne s'exécute pendant un certain temps.
Le pipeline mis à jour écrit dans une table de préproduction supplémentaire (table de préproduction B) qui utilise le schéma B. Vous pouvez utiliser un workflow orchestré pour créer la nouvelle table de préproduction avant de mettre à jour le pipeline. Mettez à jour la vue de façade de façon à inclure les résultats de la nouvelle table de préproduction, en utilisant éventuellement une étape de workflow associée.
Le diagramme suivant montre le flux mis à jour, la table de préproduction B avec le schéma B et la façon dont la vue de façade est mise à jour pour inclure le contenu de la table principale et des deux tables de préproduction.
Dans le cadre d'un processus distinct de la mise à jour du pipeline, vous pouvez fusionner les tables de préproduction dans la table principale, périodiquement ou selon les besoins. Le diagramme suivant montre la fusion de la table de préproduction A dans la table principale.
Étapes suivantes
- Découvrez les étapes détaillées permettant de mettre à jour un pipeline existant.