Using workflows with Managed Airflow

This document uses the following billable components of Google Cloud:

  • Managed Service for Apache Spark
  • Compute Engine
  • Managed Airflow

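To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

Before you begin

Set up your project

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.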
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Managed Airflow APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

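  5. Install the Google Cloud CLI.

  6. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  7. To initialize the gcloud CLI, run the following command: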

    gcloud init
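You can also script the API-enablement step instead of using the console, with `gcloud services enable`. The Python sketch below only builds and runs that command. It assumes gcloud is installed and initialized as described above. It also assumes the service names `dataproc.googleapis.com`, `compute.googleapis.com`, and `composer.googleapis.com`. The last name rests on the assumption that Managed Airflow is served by the Cloud Composer API, which matches the `gcloud composer` commands used later. The helpers `enable_apis_command` and `enable_apis` are hypothetical names.

```python
from __future__ import annotations

import subprocess

# Assumed service names for the three APIs listed in step 4.
REQUIRED_SERVICES = (
    "dataproc.googleapis.com",  # Managed Service for Apache Spark (Dataproc)
    "compute.googleapis.com",   # Compute Engine
    "composer.googleapis.com",  # Managed Airflow (assumed: Cloud Composer API)
)


def enable_apis_command(project_id: str) -> list[str]:
    """Build the gcloud argv that enables all required APIs in one call."""
    return ["gcloud", "services", "enable", *REQUIRED_SERVICES, f"--project={project_id}"]


def enable_apis(project_id: str) -> None:
    """Run the command; raises CalledProcessError if gcloud exits non-zero."""
    subprocess.run(enable_apis_command(project_id), check=True)


if __name__ == "__main__":
    # Print the command instead of running it, so it can be reviewed first.
    print(" ".join(enable_apis_command("PROJECT_ID")))
```

A single `services enable` call with several service names avoids running one command per API.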

Create a Managed Service for Apache Spark workflow template

Copy and run the following commands in a local terminal window or in Cloud Shell to create and define a workflow template.

  1. Create the sparkpi workflow template.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
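  2. Add the Spark job to the sparkpi workflow template. The "compute" step ID (--step-id) identifies the SparkPi job. The 1000 after the bare -- is passed to SparkPi as its number of partitions.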
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
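  3. Configure the workflow to run on a managed single-node cluster. Managed Service for Apache Spark creates the cluster, runs the workflow on it, and deletes the cluster when the workflow completes.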
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirm that the workflow template was created.

    Console

    In the Google Cloud console, go to the Managed Service for Apache Spark Workflows page. Click sparkpi to open the Workflow template details page, where you can confirm the sparkpi template's attributes.

    gcloud command

    Run the following command:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
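You can also check the template from a script rather than by eye. Request the same `describe` output as JSON with gcloud's global `--format=json` flag, then inspect it. The sketch below assumes the JSON uses the camelCase field names of the Dataproc WorkflowTemplate resource: `jobs[].stepId`, `jobs[].sparkJob.mainClass`, `jobs[].sparkJob.args`, and `placement.managedCluster.clusterName`. The helper names are hypothetical.

```python
from __future__ import annotations

import json
import subprocess

# Values that steps 1 to 3 are expected to have written into the template.
EXPECTED_STEP_ID = "compute"
EXPECTED_MAIN_CLASS = "org.apache.spark.examples.SparkPi"
EXPECTED_ARGS = ["1000"]
EXPECTED_CLUSTER_NAME = "sparkpi"


def describe_template(template_id: str = "sparkpi", region: str = "us-central1") -> dict:
    """Run the describe command from step 4 with --format=json and parse the result."""
    out = subprocess.run(
        ["gcloud", "dataproc", "workflow-templates", "describe", template_id,
         f"--region={region}", "--format=json"],
        check=True, capture_output=True, text=True,
    ).stdout
    return json.loads(out)


def check_sparkpi_template(template: dict) -> list[str]:
    """Return a list of problems found; an empty list means the template looks right."""
    problems = []
    jobs = {job.get("stepId"): job for job in template.get("jobs", [])}
    job = jobs.get(EXPECTED_STEP_ID)
    if job is None:
        problems.append(f"no job with stepId {EXPECTED_STEP_ID!r}")
    else:
        spark = job.get("sparkJob", {})
        if spark.get("mainClass") != EXPECTED_MAIN_CLASS:
            problems.append(f"unexpected mainClass {spark.get('mainClass')!r}")
        if spark.get("args") != EXPECTED_ARGS:
            problems.append(f"unexpected args {spark.get('args')!r}")
    cluster = template.get("placement", {}).get("managedCluster", {})
    if cluster.get("clusterName") != EXPECTED_CLUSTER_NAME:
        problems.append(f"managed cluster is not named {EXPECTED_CLUSTER_NAME!r}")
    return problems
```

After steps 1 to 3 succeed, `check_sparkpi_template(describe_template())` should return an empty list. Any entries it returns describe what differs.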
        

Create a DAG and upload it to Cloud Storage

  1. Create a Managed Airflow environment or use an existing one.
  2. Set an Airflow variable.

    Airflow UI

    1. In the toolbar, click Admin > Variables.
    2. Click Create.
    3. Enter the following information:
      • Key: project_id
      • Val: PROJECT_ID, your Google Cloud project ID
    4. Click Save.

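    gcloud command

    Run the following command:

        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID

    Replace the following:

    • ENVIRONMENT: the name of the Managed Airflow environment
    • LOCATION: the region where the Managed Airflow environment is located
    • PROJECT_ID: the ID of the project that contains the Managed Airflow environment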
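  3. Copy the following DAG code into a local file named "composer-dataproc-dag.py". The DAG instantiates the sparkpi workflow template with DataprocInstantiateWorkflowTemplateOperator in Airflow 2, or with DataprocWorkflowTemplateInstantiateOperator in Airflow 1.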

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region in which the sparkpi workflow template was created
            region="us-central1",
        )
    
  4. Upload the DAG to your environment's folder in Cloud Storage. After the upload completes successfully, click the DAGs folder link on the Managed Airflow environment page.
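The DAG's comment says that starting one day ago makes it run as soon as you upload it. This follows from how Airflow schedules runs: it triggers the run for a data interval only after that interval has ended. The stdlib-only sketch below mimics `days_ago(1)` (midnight UTC of the previous day) and computes the first one-day interval. It is a simplification that ignores catchup, non-UTC timezones, and custom timetables. The helper names are hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def days_ago_utc(n: int, now: datetime) -> datetime:
    """Mimic airflow.utils.dates.days_ago(n): today's midnight in UTC, minus n days.

    `now` must be timezone-aware.
    """
    midnight = now.astimezone(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=n)


def first_run(start_date: datetime, interval: timedelta,
              now: datetime) -> tuple[datetime, datetime, bool]:
    """Return (interval_start, interval_end, due_now) for the first scheduled run.

    A run for an interval is triggered only once the interval has ended, so the
    first run is due when start_date + interval is not in the future.
    """
    interval_end = start_date + interval
    return start_date, interval_end, interval_end <= now


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    start = days_ago_utc(1, now)
    print(first_run(start, timedelta(days=1), now))
```

With a start date of yesterday at 00:00 UTC, the first interval ends at 00:00 UTC today, which has already passed. The scheduler can therefore queue the run as soon as it parses the uploaded file. A start date of "now" would instead delay the first run by a full day.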

View a task's status

Airflow UI

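  1. Open the Airflow web interface.
  2. On the DAGs page, click the DAG name (for example, dataproc_workflow_dag).
  3. On the DAG details page, click Graph View.
  4. Check the status:
    • Failed: The task has a red box around it. You can also hold the pointer over the task and look for State: Failed.
    • Success: The task has a green box around it. You can also hold the pointer over the task and look for State: Success.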

Console

Click the Workflows tab to view the workflow status.

gcloud command

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
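For a compact pass/fail view of the same operations, you can add `--format=json` to the command above and post-process the result. The sketch below relies only on the generic long-running-operation fields `name`, `done`, and `error`. It does not interpret the workflow-specific metadata. The helper names are hypothetical.

```python
from __future__ import annotations

import json
import subprocess


def list_workflow_operations(template_id: str = "sparkpi",
                             region: str = "us-central1") -> list[dict]:
    """Run the operations list command above with --format=json and parse the result."""
    out = subprocess.run(
        ["gcloud", "dataproc", "operations", "list", f"--region={region}",
         f"--filter=labels.goog-dataproc-workflow-template-id={template_id}",
         "--format=json"],
        check=True, capture_output=True, text=True,
    ).stdout
    return json.loads(out)


def summarize(operations: list[dict]) -> dict[str, str]:
    """Map each operation name to RUNNING, FAILED, or SUCCEEDED.

    An operation that is not `done` is still running; a finished operation
    that carries an `error` field failed; any other finished operation succeeded.
    """
    summary = {}
    for op in operations:
        if not op.get("done"):
            state = "RUNNING"
        elif "error" in op:
            state = "FAILED"
        else:
            state = "SUCCEEDED"
        summary[op.get("name", "<unnamed>")] = state
    return summary
```

`summarize(list_workflow_operations())` returns one entry per workflow operation that matches the sparkpi template label.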
    

Clean up

To avoid incurring charges to your Google Cloud account, delete the resources used in this tutorial:

  1. Delete the Managed Airflow environment.

  2. Delete the workflow template.
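You can also run both cleanup steps from the command line. The sketch below assembles the `gcloud composer environments delete` and `gcloud dataproc workflow-templates delete` commands. ENVIRONMENT and LOCATION are the same placeholders used when setting the Airflow variable. The template and region default to the `sparkpi` and `us-central1` values from this tutorial. The sketch adds gcloud's global `--quiet` flag to skip the confirmation prompts, so check the names before running it. The helper names are hypothetical.

```python
from __future__ import annotations

import subprocess


def cleanup_commands(environment: str, location: str,
                     template_id: str = "sparkpi",
                     region: str = "us-central1") -> list[list[str]]:
    """Return the gcloud argv lists that delete this tutorial's resources.

    --quiet disables gcloud's interactive confirmation prompt.
    """
    return [
        ["gcloud", "composer", "environments", "delete", environment,
         f"--location={location}", "--quiet"],
        ["gcloud", "dataproc", "workflow-templates", "delete", template_id,
         f"--region={region}", "--quiet"],
    ]


def cleanup(environment: str, location: str) -> None:
    """Run each delete command in order; stop at the first failure."""
    for argv in cleanup_commands(environment, location):
        subprocess.run(argv, check=True)
```

Returning the commands as lists, rather than running them immediately, lets you print and review them before calling `cleanup`.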

What's next