Flujo de trabajo con Managed Airflow

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Managed Service for Apache Spark
  • Compute Engine
  • Managed Airflow

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.

Es posible que los usuarios Google Cloud nuevos decumplan con los requisitos para acceder a una prueba gratuita.

Antes de comenzar

Configura tu proyecto

  1. Accede a tu Google Cloud cuenta de. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Managed Airflow APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Instala Google Cloud CLI.

  6. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

  7. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Managed Airflow APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. Instala Google Cloud CLI.

  12. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

  13. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init

Crea una plantilla de flujo de trabajo de Managed Service para Apache Spark

Copia y ejecuta los siguientes comandos en una ventana de la terminal local o en Cloud Shell para crear y definir una plantilla de flujo de trabajo.

  1. Crea la plantilla de flujo de trabajo sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Agrega el trabajo de Spark a la plantilla de flujo de trabajo.sparkpi La marca step-id de “compute” identifica el trabajo de SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Usa un clúster administrado, de un solo nodo para ejecutar el flujo de trabajo. Managed Service para Apache Spark creará el clúster, ejecutará el flujo de trabajo en él y, luego, borrará el clúster cuando este se complete.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirma la creación de la plantilla de flujo de trabajo.

    Console

    Haz clic en el nombre sparkpi en la página Flujos de trabajo de Managed Service para Apache Spark en la consola de para abrir la página Detalles de la plantilla de flujo de trabajo. Google Cloud Haz clic en el nombre de tu plantilla de flujo de trabajo para confirmar los atributos de la plantilla sparkpi.

    Comando de gcloud

    Ejecuta el siguiente comando:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Crea y sube un DAG a Cloud Storage

  1. Crea o usa un entorno existente de Managed Airflow.
  2. Configurar variables de entorno

    IU de Airflow

    1. En la barra de herramientas, haz clic en Administrador > Variables.
    2. Haz clic en Crear.
    3. Ingresa la siguiente información:
      • Key:project_id
      • Val: PROJECT_ID : Es el ID de tu Google Cloud proyecto de
    4. Haz clic en Guardar.

    Comando de gcloud

    Ingresa los siguientes comandos:

    • ENVIRONMENT es el nombre del entorno de Managed Airflow
    • LOCATION es la región en la que se encuentra el entorno de Managed Airflow .
    • PROJECT_ID es el ID del proyecto que contiene el entorno de Managed Airflow.
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copia el siguiente código de DAG de forma local en un archivo llamado "composer-dataproc-dag.py", que usa el DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Sube tu DAG a tu carpeta de entorno en Cloud Storage. Una vez que la carga se haya completado de forma correcta, haz clic en el vínculo Carpeta de DAG en la página del entorno de Managed Airflow.

Visualiza el estado de una tarea

IU de Airflow

  1. Abre la interfaz web de Airflow.
  2. En la página de los DAG, haz clic en el nombre del DAG (por ejemplo, dataproc_workflow_dag).
  3. En la página de detalles de los DAG, haz clic en Graph View.
  4. Verifica el estado:
    • Tarea con errores: la tarea estará encerrada en un cuadro rojo. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: con errores. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: con errores. La tarea tiene un cuadro rojo a su alrededor, lo que indica que falló
    • Éxito: La tarea tiene un cuadro verde a su alrededor. También puedes mantener el puntero sobre la tarea y ver si aparece el mensaje Estado: correcto. La tarea tiene un cuadro verde a su alrededor, lo que indica que se realizó correctamente.

Console

Haz clic en la pestaña Flujos de trabajo para ver el estado del flujo de trabajo.

Comando de gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Limpia

Para evitar que se apliquen cargos a tu Google Cloud cuenta de, puedes borrar los recursos que usaste en este instructivo:

  1. Borra el entorno de Managed Airflow.

  2. Borra la plantilla de flujo de trabajo.

¿Qué sigue?