Workflow utilisant Cloud Composer

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Managed Service for Apache Spark
  • Compute Engine
  • Cloud Composer

Pour obtenir une estimation des coûts en fonction de votre utilisation prévue, utilisez le simulateur de coût.

Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai sans frais.

Avant de commencer

Configurer votre projet

  1. Connectez-vous à votre compte Google Cloud . Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $de crédits sans frais pour exécuter, tester et déployer des charges de travail.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Installez la Google Cloud CLI.

  6. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  7. Pour initialiser la gcloud CLI, exécutez la commande suivante :

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. Installez la Google Cloud CLI.

  12. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  13. Pour initialiser la gcloud CLI, exécutez la commande suivante :

    gcloud init

Créer un modèle de workflow Managed Service pour Apache Spark

Copiez et exécutez les commandes suivantes dans une fenêtre de terminal local ou dans Cloud Shell pour créer et définir un modèle de workflow.

  1. Créez le modèle de workflow sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Ajoutez la tâche spark au modèle de workflow sparkpi. L'option "compute" step-id identifie la tâche SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Utilisez un cluster géré à nœud unique pour exécuter le workflow. Managed Service for Apache Spark crée le cluster, y exécute le workflow, puis le supprime une fois le workflow terminé.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirmez la création du modèle de workflow.

    Console

    Cliquez sur le nom sparkpi sur la page Workflows Managed Service for Apache Spark dans la console Google Cloud pour ouvrir la page Détails du modèle de workflow. Cliquez sur le nom de votre modèle de workflow pour confirmer les attributs du modèle sparkpi.

    Commande gcloud

    Exécutez la commande suivante :

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Créer et importer un DAG dans Cloud Storage

  1. Créez ou utilisez un environnement Cloud Composer existant.
  2. Définir des variables d'environnement.

    Interface utilisateur d'Airflow

    1. Dans la barre d'outils, cliquez sur Admin > Variables (Administration > Variables).
    2. Cliquez sur Créer.
    3. Saisissez les informations suivantes :
      • Key (Clé) : project_id
      • Valeur : PROJECT_ID, l'ID de votre Google Cloud projet
    4. Cliquez sur Enregistrer.

    Commande gcloud

    Saisissez les commandes suivantes :

    • ENVIRONMENT correspond au nom de l'environnement Cloud Composer.
    • LOCATION correspond à la région où se trouve l'environnement Cloud Composer.
    • PROJECT_ID est l'ID du projet contenant l'environnement Cloud Composer.
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copiez le code DAG suivant localement dans un fichier nommé "composer-dataproc-dag.py", qui utilise DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Importez votre DAG dans le dossier d'environnement de Cloud Storage. Une fois l'importation terminée, cliquez sur le lien Dossier DAGS sur la page de l'environnement Cloud Composer.

Afficher l'état d'une tâche

Interface utilisateur d'Airflow

  1. Ouvrez l'interface Web Airflow.
  2. Sur la page des DAG, cliquez sur le nom du DAG (par exemple, dataproc_workflow_dag).
  3. Sur la page "Détails des DAG", cliquez sur Graph View (Vue graphique).
  4. Vérifiez l'état :
    • Échec : la tâche apparaît dans un encadré rouge. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention (État : Échec). La tâche apparaît dans un encadré rouge, indiquant qu'elle a échoué
    • Réussite : la tâche apparaît dans un encadré vert. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention (État : Réussite). La tâche apparaît dans un encadré vert, indiquant qu'elle a réussi

Console

Cliquez sur l'onglet "Workflows" pour afficher l'état du workflow.

Commande gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Effectuer un nettoyage

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud , vous pouvez les supprimer :

  1. Supprimez l'environnement Cloud Composer.

  2. Supprimez le modèle de workflow.

Étapes suivantes