预配资源

本页面介绍了如何为流水线配置资源。

关于 Orchestration Pipelines 中的资源配置

编排流水线使用基础架构即代码 (IaC) 方法来管理 Google Cloud 数据流水线使用的资源,这带来了 以下优势:

  • 版本控制。基础架构更改会在 Git 中进行跟踪。
  • 可重复性。可以可靠地重新创建环境。
  • 协作。团队成员可以查看基础架构定义并做出贡献。
  • Automation。集成到 CI/CD 流水线中。
  • 可选性和共存性。资源配置框架是可选的。 如果您已使用 Terraform 或其他成熟的 IaC 实践进行资源配置,则可以继续这样做。您可以管理与特定流水线或应用相关的部分资源,这些资源可能会与其他工具管理的资源共存。

在 Orchestration Pipelines 中,您的项目可以有一个或多个部署环境。 每个部署环境的配置定义了如何部署属于此环境的流水线和资源。例如,您可以有一个用于开发的部署环境,另一个用于生产的部署环境。

部署环境配置示例。配置的资源在 resources 映射中定义。

environments:
  dev:
    project: example-dev-project
    region: us-central1
    variables:
      dataset_name: marketing_analytics_dev
    secrets:
      dts_api_key: "projects/example-dev-project/secrets/dev-dts-key/versions/latest"
    resources:
      - type: dataproc.cluster
        name: example-static-cluster-resource
        definition:
          clusterName: example-static-cluster

部署流水线时,Orchestration Pipelines 使用无状态“创建或更新”模型来配置您定义的资源:

  • 如果资源已定义但不存在,Orchestration Pipelines 会创建该资源。

  • 如果资源存在:

    • (默认)更新资源的配置以与定义匹配。

    • 如果您在资源的配置中定义此行为,Orchestration Pipelines 可以忽略更改或重新创建资源。

  • 如果您从配置中删除资源的定义, 这不会导致删除资源。此方法优先考虑安全性,并防止意外数据丢失。

  • 如果您重命名现有资源,Orchestration Pipelines 会使用新名称创建新资源,并保留原始资源。

配置的资源与资源配置文件对比

资源配置文件是包含一个或多个 Google Cloud 资源定义的模板文件。它们与配置的资源不同,可以与配置的资源一起使用:

  • 使用配置的资源:您无需在 deployment.yaml 中每个开发环境的内联中定义相同的资源配置,只需在配置文件中定义一次,然后引用该配置即可。配置的资源支持可在资源 配置文件中定义的各种 资源类型

  • 使用流水线操作:您可以在操作中使用资源配置文件,在操作期间配置资源。通过使用资源配置文件而不是内联指定资源的配置,您可以将资源配置与流水线操作分开,并为多个流水线操作重复使用一个配置。 流水线操作仅支持 Managed Service for Apache Spark 资源的资源配置文件,例如在临时集群中执行流水线操作时。

查看可用的资源类型

请参阅资源类型

您还可以使用以下 gcloud CLI 命令查看所有可用资源:

gcloud beta orchestration-pipelines resource-types list

添加新资源

如需向部署环境的配置添加新的配置资源,请按如下方式添加其定义:

  1. 在部署环境的配置中,向 environments.DEVELOPMENT_ENVIRONMENT.resources 列表添加新项。

  2. 指定以下键:

    • type:要配置的资源类型。 Google Cloud 示例:bigquery.datasetdataform.repository。您可以使用 gcloud CLI 命令查看可用的资源类型。

    • namedeployment.yaml 文件中资源的逻辑名称。

    • parent:为需要父级资源的资源类型指定父级资源。 将父级资源的 name 作为值。

    • updateAction:指定在资源的配置中检测到 更改时必须执行的操作

      • (默认)patch:更新已更改的资源的属性。

        更新操作只会修改资源配置 (YAML) 中指定的属性,而不会更改其他现有属性。例如,您可以使用此行为仅管理对流水线执行重要的属性,并手动配置其他属性。

        如果更改影响不可变字段或资源是不可变的,则部署会失败。在这种情况下,我们建议调整定义以仅修改可变字段。如果无法做到这一点,您可以将更新操作更改为 recreate

      • skip:忽略更改,并且不更新资源的配置。

        如果您想管理资源的存在,但使用其他方式(例如手动)执行配置更改和更新,我们建议使用此选项。

      • recreate:如果检测到任何更改,请删除现有资源,然后根据当前资源的定义创建新资源。

        对于完全不可变的资源或对无法就地更新的字段进行更改时,我们建议使用此选项。

    • definition:资源的规范,以映射的形式反映资源 API 中的资源配置结构。

    • (可选)metadata:编排流水线专用元数据。某些 资源类型使用元数据字段在 Google Cloud 中配置资源。例如,metadata.location字段可用于 创建可用区级资源。

  3. 验证并部署流水线。部署流水线时,Orchestration Pipelines 将配置新资源。

示例:长时间运行的资源

此示例演示了如何添加长时间运行的资源,在本例中为静态 Managed Service for Apache Spark 集群。配置后,它可以在流水线操作中使用。如果流水线使用永久性资源,我们建议使用此通用方法。

以下示例向 dev 部署环境添加了一个名为 example-static-cluster 的静态 Managed Service for Apache Spark 集群。资源的 定义是根据 Dataproc API 提供的。

environments:
  dev:
    project: "example-project"
    region: "us-central1"
    # A runner environment for executing pipeline actions
    composer_environment: "example-runner-environment"

    resources:
      - type: dataproc.cluster
        name: example-static-cluster
        updateAction: patch
        definition:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: n1-standard-4
            workerConfig:
              numInstances: 3
              machineTypeUri: n1-standard-4

此集群可以像往常一样在流水线操作中使用。与手动创建的集群相比,使用方式没有区别。

modelVersion: "1.0"
pipelineId: "example-dataproc-pipeline"
...
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-static-cluster"
            location: {{ region }}
            projectId: {{ project }}
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "scripts/lib1.py"

示例:Dataform 的自动构建和发布流程

此示例演示了 Dataform 的自动构建和发布流程。在示例场景中:

  1. Dataform 代码库通过 Developer Connect 连接到 GitHub 代码库

  2. 您将更改推送到 GitHub 代码库。

  3. 推送更改后,您将部署流水线。

  4. Dataform 从 GitHub 代码库中拉取代码,创建编译结果,并使其可供执行。

  5. 然后,工作流配置用于运行具有此自动编译版本的发布的工作流。

以下代码示例在 dev 部署环境中定义了 Dataform 代码库、发布配置及其工作流配置:

  • gitCommitish: "{{ COMMIT_SHA }}" 行将发布 配置与要部署的特定 Git 提交相关联。COMMIT_SHA 是一个 变量,它解析为已部署 流水线软件包的提交 SHA。
  • codeCompilationConfig.pipelineConfig.path 键指向包含流水线资产的子文件夹。这样,您就可以将多个 Dataform 流水线保留在单个代码库中。
  • releaseConfig 的定义中将 releaseCompilationResult 设置为 auto,指示 Orchestration Pipelines 在创建或更新 releaseConfig 资源时使用新的 gitCommitish 后触发 Dataform 编译:

    1. 框架首先 upsert(更新或创建)releaseConfig 资源以指向指定的提交。
    2. 然后,由于 auto 设置,它会调用 Dataform API 以基于该提交中的代码创建新的编译结果。
    3. releaseConfig 会再次更新,以指向新创建的 编译结果 ID,从而使该版本成为“实时”版本。
environments:
  dev:
    project: example-project
    region: us-central1
    composer_environment: example-runner-environment
    artifact_storage:
      bucket: example-bucket
      path_prefix: initialized-artifact-bucket
    pipelines:
      - source: initialized-pipeline.yaml
      - source: dataform_local_pipeline.yaml
      - source: dataform_service_pipeline.yaml
    resources:
      - name: {{ repository_name }}
        type: dataform.repository
        definition:
          labels:
            bigquery-deployment: preview
      - type: dataform.repository.releaseConfig
        name: subfolder-release
        parent: {{ repository_name }}
        definition:
          gitCommitish: {{ COMMIT_SHA }}
          releaseCompilationResult: auto
          codeCompilationConfig:
            pipelineConfig:
              pipelineType: DATAFORM
              path: weather_dataform
      - type: dataform.repository.workflowConfig
        name: {{ workflow_config_name }}
        parent: {{ repository_name }}
        definition:
          releaseConfig: subfolder-release
          invocationConfig:
            serviceAccount: {{ service_account }}
    variables:
      service_account: example-account@example-project.iam.gserviceaccount.com
      network_uri: projects/example-project/global/networks/default
      subnetwork_uri: projects/example-project/regions/us-central1/subnetworks/default
      region: us-central1
      repository_name: weather-aggregation-repo
      workflow_config_name: updated-subfolder-workflow

使用创建的工作流配置运行工作流的流水线操作示例:

modelVersion: '1.0'
pipelineId: dataform_service_pipeline
description: Updated run Dataform pipeline via Dataform Service
runner: airflow
owner: data-eng-team

defaults:
  projectId: {{ project }}
  location: {{ region }}
  executionConfig:
    retries: 1

actions:
  - pipeline:
      name: run_dataform_service
      framework:
        dataform:
          dataformService:
            location: {{ region }}
            projectId: {{ project }}
            repositoryId: {{ repository_name }}
            workflowInvocation:
              workflowConfig: projects/{{ project }}/locations/{{ region }}/repositories/{{
                repository_name }}/workflowConfigs/{{ workflow_config_name }}
  - python:
      name: fibonacci_python
      mainFilePath: scripts/fibonacci.py
      pythonCallable: fibonacciTen
      engine:
        local: {}
  - sql:
      name: dummy_bq_query
      engine:
        bigquery:
          location: {{ region }}
      query:
        inline: 'SELECT COUNT(*) FROM `{{ project }}.weather_data.sensor_readings`'
tags:
  - job:datacloud:vscode