使用 Cloud Run functions 的工作流

准备工作

如果您尚未设置 Google Cloud 项目和两个 (2) Cloud Storage 存储桶,请执行此操作。

设置项目

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, Cloud Storage, and Cloud Run functions APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. 安装 Google Cloud CLI。

  6. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

  7. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, Cloud Storage, and Cloud Run functions APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. 安装 Google Cloud CLI。

  12. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

  13. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init

在项目中创建或使用两个 Cloud Storage 存储桶

您的项目中需要两个 Cloud Storage 存储桶:一个用于输入文件,另一个用于输出文件。

  1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。

    进入“存储桶”

  2. 点击 创建
  3. 创建存储桶页面上,输入您的存储桶信息。要转到下一步,请点击继续
    1. 开始使用部分中,执行以下操作:
      • 输入符合存储桶命名要求的全局唯一的名称。
      • 如需添加存储桶标签,请展开标签部分 (),点击 添加标签,并为标签指定 keyvalue
    2. 选择数据存储位置部分,执行以下操作:
      1. 选择位置类型
      2. 位置类型下拉菜单中选择一个位置,用于永久存储存储桶的数据。
      3. 如需设置跨存储桶复制,请选择通过 Storage Transfer Service 添加跨存储桶复制,然后按照以下步骤操作:

        设置跨存储桶复制

        1. 存储桶菜单中,选择一个存储桶。
        2. 复制设置部分中,点击配置以配置复制作业的设置。

          系统会显示配置跨存储桶复制窗格。

          • 如需按对象名称前缀过滤要复制的对象,请输入要用于包含或排除对象的前缀,然后点击 添加前缀
          • 如需为复制的对象设置存储类别,请从存储类别菜单中选择一个存储类别。 如果您跳过此步骤,则复制的对象会默认使用目标存储桶的存储类别。
          • 点击完成
    3. 选择数据存储方式部分中,执行以下操作:
      1. 为存储桶选择默认存储类别,或者选择 Autoclass 对存储桶数据进行自动存储类别管理。
      2. 如需启用分层命名空间,请在针对数据密集型工作负载优化存储部分中,选择在此存储桶上启用分层命名空间
    4. 选择如何控制对对象的访问权限部分中,选择存储桶是否强制执行禁止公开访问,然后为存储桶对象选择访问权限控制方法
    5. 选择如何保护对象数据部分中,执行以下操作:
      • 数据保护下,选择您要为存储桶设置的任何选项。
        • 如需启用软删除,请选中软删除政策(用于数据恢复)复选框,然后指定您希望在删除对象后保留对象的天数。
        • 如需设置对象版本控制,请选中对象版本控制(用于版本控制)复选框,然后指定每个对象的最大版本数以及非当前版本过期前的天数。
        • 如需为对象和存储分区启用保留政策,请点击保留(合规性)复选框,然后执行以下操作:
          • 如需启用对象保留锁定,请点击启用对象保留复选框。
          • 如需启用存储桶锁定,请点击设置存储桶保留政策复选框,然后为保留期限选择时间单位和时长。
      • 如需选择对象数据的加密方式,请展开数据加密部分 (),然后选择数据加密方法
  4. 点击创建

创建工作流模板

如需创建和定义工作流模板,请在本地终端窗口或 Cloud Shell 中复制并运行以下命令。

  1. 创建工作流模板。
      gcloud dataproc workflow-templates create wordcount-template \
          --region=us-central1
      
  2. 将 wordcount 作业添加到工作流模板。
    1. 在运行命令之前指定 output-bucket-name(您的函数将提供输入存储分区)。插入输出存储分区名称后,输出存储分区参数应如下所示:gs://your-output-bucket/wordcount-output"
    2. “count”步骤 ID 是必需的,用于标识已添加的作业。
                gcloud dataproc workflow-templates add-job hadoop \
                    --workflow-template=wordcount-template \
                    --step-id=count \
                    --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
                    --region=us-central1 \
                    -- wordcount gs://input-bucket gs://output-bucket-name/wordcount-output
              
  3. 使用托管单节点集群运行工作流。Managed Service for Apache Spark 将创建集群,对其运行工作流,然后在工作流完成时删除集群。
        gcloud dataproc workflow-templates set-managed-cluster wordcount-template \
            --cluster-name=wordcount \
            --single-node \
            --region=us-central1
        
  4. 点击 Google Cloud 控制台中的 Managed Service for Apache Spark 工作流页面上的 wordcount-template 名称,以打开工作流模板详细信息页面。确认 wordcount 模板属性。 工作流模板详细信息页面

参数化工作流模板

参数化输入存储分区变量以传递给工作流模板。

  1. 将工作流模板导出到 wordcount.yaml 文本文件以进行参数化。
    gcloud dataproc workflow-templates export wordcount-template \
        --destination=wordcount.yaml \
        --region=us-central1
    
  2. 使用文本编辑器打开 wordcount.yaml,然后将 parameters 块添加到 YAML 文件的末尾,以便在工作流触发时,Cloud Storage INPUT_BUCKET_URI 可以作为 args[1] 传递到字数二进制文件。

    导出的 YAML 文件示例如下所示。您可以采用以下两种方法之一来更新模板:

    1. 在将 your-output_bucket 替换为您的输出存储分区名称后,复制然后粘贴整个文件以替换导出的 wordcount.yaml,或者
    2. 复制并粘贴 parameters 部分到导出的 wordcount.yaml 文件的末尾。
    .
    jobs:
    - hadoopJob:
        args:
        - wordcount
        - gs://input-bucket
        - gs://your-output-bucket/wordcount-output
        mainJarFileUri: file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
      stepId: count
    placement:
      managedCluster:
        clusterName: wordcount
        config:
          softwareConfig:
            properties:
              dataproc:dataproc.allow.zero.workers: 'true'
    parameters:
    - name: INPUT_BUCKET_URI
      description: wordcount input bucket URI
      fields:
      - jobs['count'].hadoopJob.args[1]
    
  3. 导入参数化的 wordcount.yaml 文本文件。当系统要求您覆盖模板时,请键入“Y”。
    gcloud dataproc workflow-templates import  wordcount-template \
        --source=wordcount.yaml \
        --region=us-central1
    

创建 Cloud Functions 函数

  1. 在Google Cloud 控制台中打开 Cloud Run functions 页面,然后点击“创建函数”。

  2. 创建函数页面上,输入或选择以下信息:

    1. 名称:wordcount
    2. 分配的内存:保留默认选项。
    3. 触发器:
      • Cloud Storage
      • 事件类型:完成/创建
      • 存储桶:选择您的输入存储桶(请参阅在项目中创建 Cloud Storage 存储桶)。 将文件添加到此存储分区时,该函数将触发工作流。该工作流将运行 Wordcount 应用,该应用将处理存储分区中的所有文本文件。
    4. 源代码:

      • 內嵌编辑器
      • 运行时:Node.js 8
      • INDEX.JS 标签:将默认代码段替换为以下代码,然后编辑 const projectId 行以提供 -your-project-id-(开头或结尾“-”)
      const dataproc = require('@google-cloud/dataproc').v1;
      
      exports.startWorkflow = (data) => {
       const projectId = '-your-project-id-'
       const region = 'us-central1'
       const workflowTemplate = 'wordcount-template'
      
      const client = new dataproc.WorkflowTemplateServiceClient({
         apiEndpoint: `${region}-dataproc.googleapis.com`,
      });
      
      const file = data;
      console.log("Event: ", file);
      
      const inputBucketUri = `gs://${file.bucket}/${file.name}`;
      
      const request = {
        name: client.projectRegionWorkflowTemplatePath(projectId, region, workflowTemplate),
        parameters: {"INPUT_BUCKET_URI": inputBucketUri}
      };
      
      client.instantiateWorkflowTemplate(request)
        .then(responses => {
          console.log("Launched Dataproc Workflow:", responses[1]);
        })
        .catch(err => {
          console.error(err);
        });
      };
      
      • PACKAGE.JSON 标签:将默认代码段替换为以下代码。
      {
        "name": "dataproc-workflow",
        "version": "1.0.0",
        "dependencies":{ "@google-cloud/dataproc": ">=1.0.0"}
      }
      
      • 要执行的函数:Insert:"startWorkflow"。
    5. 点击“创建”。

测试函数

  1. 将公开的文件 rose.txt 复制到您的存储分区以触发该函数。在命令中插入 your-input-bucket-name(用于触发函数的存储分区)。

    gcloud storage cp gs://pub/shakespeare/rose.txt gs://your-input-bucket-name
    

  2. 等待 30 秒,然后运行以下命令以验证函数是否已成功完成。

    gcloud functions logs read wordcount
    
    ...
    Function execution took 1348 ms, finished with status: 'ok'
    

  3. 如需从 Google Cloud 控制台中的函数列表页面查看函数日志,请点击 wordcount 函数名称,然后点击函数详细信息页面上的“查看日志”。

  4. 您可以在Google Cloud 控制台的 Storage 浏览器页面中查看输出存储桶中的 wordcount-output 文件夹。

  5. 工作流完成后,作业详细信息将保留在Google Cloud 控制台中。点击 Managed Service for Apache Spark 作业 页面上列出的 count... 作业以查看工作流作业详细信息。

清理

本教程中的工作流将在工作流完成时删除其托管集群。为避免重复收费,您可以删除与本教程关联的其他资源。

删除项目

  1. 在 Google Cloud 控制台中,前往管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除 Cloud Storage 存储分区

  1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。

    进入“存储桶”

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储分区,请点击删除,然后按照说明操作。

删除工作流模板

gcloud dataproc workflow-templates delete wordcount-template \
    --region=us-central1

删除 Cloud Functions 函数

在 Google Cloud 控制台中打开 Cloud Run functions 页面,选中 wordcount 函数左侧的框,然后点击删除

后续步骤