Exécuter des modèles Flex dans Dataflow

Cette page explique comment exécuter un job Dataflow à l'aide d'un modèle Flex. Les modèles Flex vous permettent d'empaqueter un pipeline Dataflow afin de pouvoir l'exécuter sans environnement de développement Apache Beam.

Autorisations requises

Lorsque vous exécutez un modèle Flex, Dataflow crée une tâche pour vous. Pour créer la tâche, le compte de service Dataflow doit disposer des autorisations suivantes :

  • dataflow.serviceAgent

Lorsque vous utilisez Dataflow pour la première fois, le service attribue ce rôle à votre place. Vous n'avez donc pas besoin d'accorder cette autorisation.

Par défaut, le compte de service Compute Engine est utilisé pour les VM de lanceur d'applications et les VM de nœud de calcul. Le compte de service requiert les rôles et fonctionnalités suivants :

  • Administrateur des objets de l'espace de stockage (roles/storage.objectAdmin)
  • Lecteur (roles/viewer)
  • Nœud de calcul Dataflow (roles/dataflow.worker)
  • Accès en lecture et en écriture au bucket de préproduction
  • Accès en lecture à l'image du modèle Flex

Pour accorder un accès en lecture et en écriture au bucket de préproduction, vous pouvez utiliser le rôle d'Administrateur des objets de l'espace de stockage (roles/storage.objectAdmin). Pour en savoir plus, consultez la page Rôles IAM pour Cloud Storage.

Pour accorder un accès en lecture à l'image du modèle Flex, vous pouvez utiliser le rôle Lecteur des objets de l'espace de stockage (roles/storage.objectViewer). Pour en savoir plus, consultez la section Configurer le contrôle des accès.

Exécuter un modèle Flex

Pour exécuter un modèle Flex, utilisez la commande gcloud dataflow flex-template run :

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters  PARAMETERS \
  --additional-user-labels LABELS \

Remplacez les éléments suivants :

  • JOB_ID : ID de votre tâche

  • TEMPLATE_FILE_LOCATION : emplacement Cloud Storage du fichier de modèle

  • REGION : région dans laquelle exécuter le job Dataflow

  • STAGING_LOCATION : emplacement Cloud Storage pour préparer les fichiers locaux

  • TEMP_LOCATION : emplacement Cloud Storage où écrire les fichiers temporaires. Si ce champ n'est pas défini, la valeur par défaut est l'emplacement de préproduction.

  • PARAMETERS : paramètres de pipeline pour le job

  • LABELS : facultatif. Libellés associés à votre job, au format KEY_1=VALUE_1,KEY_2=VALUE_2,....

Lors de l'étape de préproduction du lancement d'un modèle, Dataflow écrit les fichiers dans l'emplacement de préproduction. Dataflow lit ces fichiers intermédiaires pour créer le graphique de job. Lors de l'étape d'exécution, Dataflow écrit les fichiers à l'emplacement temporaire.

Définir les options de pipeline

Pour définir les options de pipeline lorsque vous exécutez un modèle Flex, utilisez les indicateurs suivants dans la commande gcloud dataflow flex-template run :

gcloud

Lorsque vous transmettez des paramètres de type List ou Map, vous devrez peut-être définir les paramètres dans un fichier YAML et utiliser l'indicateur flags-file.

API

L'exemple suivant montre comment inclure des options de pipeline, des tests et des options supplémentaires dans un corps de requête :

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}

Lorsque vous utilisez des modèles Flex, vous pouvez configurer certaines options de pipeline lors de l'initialisation du pipeline, mais les autres options de pipeline ne peuvent pas être modifiées. Si les arguments de ligne de commande requis par le modèle Flex sont écrasés, la tâche peut ignorer, remplacer ou supprimer les options de pipeline transmises par le lanceur de modèles. La tâche peut échouer au lancement ou une tâche qui n'utilise pas le modèle Flex peut être lancée. Pour en savoir plus, consultez la section Failed to read the job file (Échec de la lecture du fichier de tâche).

Lors de l'initialisation du pipeline, ne modifiez pas les options de pipeline suivantes :

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region

Bloquer les clés SSH de projet à partir de VM qui utilisent des clés SSH basées sur les métadonnées

Vous pouvez empêcher les VM d'accepter les clés SSH stockées dans les métadonnées du projet en bloquant les clés SSH du projet provenant des VM. Utilisez l'option additional-experiments avec l'option de service block_project_ssh_keys :

--additional-experiments=block_project_ssh_keys

Pour en savoir plus, consultez la page Options du service Dataflow.

Mettre à jour un job de modèle Flex

L'exemple de requête suivant montre comment mettre à jour un job par flux à partir d'un modèle à l'aide de la méthode projects.locations.flexTemplates.launch. Si vous souhaitez utiliser gcloud CLI, consultez la page Mettre à jour un pipeline existant.

Si vous souhaitez mettre à jour un modèle classique, utilisez plutôt projects.locations.templates.launch.

  1. Suivez les étapes pour créer un job par flux à partir d'un modèle Flex. Envoyez la requête HTTP POST suivante avec les valeurs modifiées :

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • Remplacez PROJECT_ID par l'ID du projet.
    • Remplacez REGION par la région Dataflow du job que vous mettez à jour.
    • Remplacez JOB_NAME par le nom exact du job que vous souhaitez mettre à jour.
    • Définissez parameters sur votre liste de paires clé/valeur. Les paramètres répertoriés sont spécifiques à cet exemple de modèle. Si vous utilisez un modèle personnalisé, modifiez les paramètres selon vos besoins. Si vous utilisez le modèle d'exemple, remplacez les variables suivantes.
      • Remplacez SUBSCRIPTION_NAME par le nom de votre abonnement Pub/Sub.
      • Remplacez DATASET par le nom de votre ensemble de données BigQuery.
      • Remplacez TABLE_NAME par le nom de votre table BigQuery.
    • Remplacez STORAGE_PATH par l'emplacement Cloud Storage du fichier de modèle. L'emplacement doit commencer par gs://.
  2. Utilisez le paramètre environment pour modifier les paramètres de l'environnement. Pour en savoir plus, consultez la section FlexTemplateRuntimeEnvironment.

  3. Facultatif : Pour envoyer votre requête à l'aide de curl (Linux, macOS ou Cloud Shell), enregistrez la requête dans un fichier JSON, puis exécutez la commande suivante :

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    Remplacez FILE_PATH par le chemin d'accès au fichier JSON contenant le corps de la requête.

  4. Utilisez l'interface de surveillance de Dataflow pour vérifier qu'un nouveau job portant le même nom a été créé. Ce job est associé à l'état Updated (Mis à jour).

Étapes suivantes