本頁面記錄各種 Dataflow Flex 範本設定選項,包括:
如要設定 Flex 範本範例,請參閱「Flex 範本教學課程」。
瞭解 Flex 範本權限
使用 Flex 範本時,您需要三組權限:
- 建立資源的權限
- 建構 Flex 範本的權限
- 執行 Flex 範本的權限
建立資源的權限
如要開發及執行 Flex 範本管道,您需要建立各種資源 (例如暫存 bucket)。如果是一次性資源建立工作,可以使用基本擁有者角色。
建構 Flex 範本的權限
身為 Flex 範本的開發人員,您必須建構範本,才能供使用者使用。建構作業包括將範本規格上傳至 Cloud Storage bucket,以及佈建 Docker 映像檔,其中包含執行管道所需的程式碼和依附元件。如要建構 Flex 範本,您需要 Cloud Storage 的讀取和寫入權限,以及 Artifact Registry 存放區的 Artifact Registry 寫入者存取權。如要授予這些權限,請指派下列角色:
- 儲存空間管理員 (
roles/storage.admin) - Cloud Build 編輯者 (
roles/cloudbuild.builds.editor) - Artifact Registry 寫入者 (
roles/artifactregistry.writer)
執行 Flex 範本的權限
執行 Flex 範本時,Dataflow 會為您建立工作。如要建立工作,Dataflow 服務帳戶必須具備下列權限:
dataflow.serviceAgent
首次使用 Dataflow 時,服務會為您指派這個角色,因此您不需要授予這項權限。
根據預設,啟動器 VM 和 worker VM 會使用 Compute Engine 服務帳戶。這個服務帳戶必須擁有下列角色和權限:
- Storage 物件管理員 (
roles/storage.objectAdmin) - 檢視者 (
roles/viewer) - Dataflow Worker (
roles/dataflow.worker) - 暫存 bucket 的讀寫權限
- 具備 Flex 範本映像檔的讀取權限
如要授予暫存 bucket 的讀寫權限,可以使用「Storage 物件管理員」(roles/storage.objectAdmin) 角色。詳情請參閱 Cloud Storage 的 IAM 角色。
如要授予 Flex 範本映像檔的讀取權限,可以使用「Storage 物件檢視者」(roles/storage.objectViewer) 角色。詳情請參閱「設定存取控管」。
設定必要的 Dockerfile 環境變數
如要為 Flex 範本工作建立自己的 Dockerfile,請指定下列環境變數:
Java
在 Dockerfile 中指定 FLEX_TEMPLATE_JAVA_MAIN_CLASS 和 FLEX_TEMPLATE_JAVA_CLASSPATH。
| ENV | 說明 | 必填 |
|---|---|---|
FLEX_TEMPLATE_JAVA_MAIN_CLASS |
指定要執行的 Java 類別,以便啟動 Flex 範本。 | 是 |
FLEX_TEMPLATE_JAVA_CLASSPATH |
指定類別檔案的位置。 | 是 |
FLEX_TEMPLATE_JAVA_OPTIONS |
指定啟動 Flex 範本時要傳遞的 Java 選項。 | 否 |
Python
在 Dockerfile 中指定 FLEX_TEMPLATE_PYTHON_PY_FILE。
如要管理管道依附元件,請在 Dockerfile 中設定變數,如以下這些項目:
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILEFLEX_TEMPLATE_PYTHON_PY_OPTIONSFLEX_TEMPLATE_PYTHON_SETUP_FILEFLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES
例如,GitHub 上的使用 Python Flex 範本串流教學課程中,設定下列環境變數:
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
| ENV | 說明 | 必填 |
|---|---|---|
FLEX_TEMPLATE_PYTHON_PY_FILE |
指定要執行的 Python 檔案,以啟動 Flex 範本。 | 是 |
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE |
指定包含管道依附元件的需求檔案。詳情請參閱 Apache Beam 說明文件中的「PyPI 依附元件」。 | 否 |
FLEX_TEMPLATE_PYTHON_SETUP_FILE |
指定管道套件「setup.py」檔案的路徑。詳情請參閱 Apache Beam 說明文件中的「多重檔案依附元件」。 | 否 |
FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES |
指定不公開的套件。如要瞭解如何使用額外套件,請參閱「本機或非 PyPI 依附元件」。 |
否 |
FLEX_TEMPLATE_PYTHON_PY_OPTIONS |
指定啟動 Flex 範本時要傳遞的 Python 選項。 | 否 |
Python 的套件依附元件
如果 Dataflow Python 管道使用其他依附元件,您可能需要設定 Flex 範本,在 Dataflow worker VM 上安裝其他依附元件。
在限制網際網路存取的環境中,執行使用 Flex 範本的 Python Dataflow 工作時,您必須在建立範本時預先封裝依附元件。
請使用下列其中一種選項預先封裝 Python 依附元件。
如需管理 Java 和 Go 管道中管道依附元件的操作說明,請參閱「在 Dataflow 中管理管道依附元件」。
使用需求檔案,並預先將依附元件與範本封裝在一起
如果您使用自己的 Dockerfile 定義 Flex 範本映像檔,請按照下列步驟操作:
建立
requirements.txt檔案,列出管道依附元件。COPY requirements.txt /template/ ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"在 Flex 範本映像檔中安裝依附元件。
RUN pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE將依附元件下載到本機需求快取,範本啟動時,這些依附元件會暫存到 Dataflow worker。
RUN pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
使用這種方法時,系統會在執行階段將 requirements.txt 檔案中的依附元件安裝到 Dataflow worker。您可能會在 Google Cloud 控制台的建議分頁中看到這類洞察資訊。如要避免在執行階段安裝依附元件,請使用自訂容器映像檔。
以下程式碼範例會在 Flex 範本中使用需求檔案。
將管道架構為套件,並使用本機套件
使用多個 Python 本機檔案或模組時,請將管道建構為套件。檔案結構可能如下列範例所示:
main.py
pyproject.toml
setup.py
src/
my_package/
__init__.py
my_custom_dofns_and_transforms.py
my_pipeline_launcher.py
other_utils_and_helpers.py
將頂層進入點 (例如
main.py檔案) 放在根目錄中。將其餘檔案放在src目錄的獨立資料夾中,例如my_package。在根目錄中新增套件設定檔,並提供套件詳細資料和需求條件。
pyproject.toml
[project] name = "my_package" version = "package_version" dependencies = [ # Add list of packages (and versions) that my_package depends on. # Example: "apache-beam[gcp]==2.54.0", ]setup.py
"""An optional setuptools configuration stub for the pipeline package. Use pyproject.toml to define the package. Add this file only if you must use the --setup_file pipeline option or the FLEX_TEMPLATE_PYTHON_SETUP_FILE configuration option. """ import setuptools setuptools.setup()如要進一步瞭解如何設定本機套件,請參閱「封裝 Python 專案」。
為管道匯入本機模組或檔案時,請使用
my_package套件名稱做為匯入路徑。from my_package import word_count_transform在 Flex 範本映像檔中安裝管道套件。您的 Flex 範本 Dockerfile 可能包含類似下列範例的內容:
Dockerfile
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py" ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py" # Copy pipeline, packages and requirements. WORKDIR ${WORKDIR} COPY main.py . COPY pyproject.toml . COPY setup.py . COPY src src # Install local package. RUN pip install -e .
使用這種方法時,系統會在執行階段將 requirements.txt 檔案中的依附元件安裝到 Dataflow worker。您可能會在 Google Cloud 控制台的建議分頁中看到這類洞察資訊。如要避免在執行階段安裝依附元件,請使用自訂容器映像檔。
如需採用建議做法的範例,請參閱 GitHub 中的具有依附元件和自訂容器映像檔的管道 Flex 範本教學課程。
使用預先安裝所有依附元件的自訂容器
如要避免在執行階段安裝依附元件,請使用自訂容器。如果管道在無法存取網際網路的環境中執行,建議使用這個選項。
如要使用自訂容器,請按照下列步驟操作:
建立自訂容器映像檔,預先安裝必要依附元件。
在 Flex 範本 Dockerfile 中預先安裝相同的依附元件。
如要避免在執行階段安裝依附元件,請勿在 Flex 範本設定中使用
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE或FLEX_TEMPLATE_PYTHON_SETUP_FILE選項。修改後的 Flex 範本
Dockerfile可能如下所示:FROM gcr.io/dataflow-templates-base/python3-template-launcher-base ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/main.py" COPY . /template # If you use a requirements file, pre-install the requirements.txt. RUN pip install --no-cache-dir -r /template/requirements.txt # If you supply the pipeline in a package, pre-install the local package and its dependencies. RUN pip install -e /template使用這個方法時,請執行下列操作:
- 建構 Flex 範本映像檔
- 建構自訂 SDK 容器映像檔
- 在兩個映像檔中安裝相同的依附元件
或者,如要減少維護的映像檔數量,請將自訂容器映像檔做為 Flex 範本的基本映像檔。
如果您使用 Apache Beam SDK 2.49.0 以下版本,請在管道啟動器中新增
--sdk_location=container管道選項。這個選項會指示管道使用自訂容器中的 SDK,而非下載 SDK。options = PipelineOptions(beam_args, save_main_session=True, streaming=True, sdk_location="container")在
flex-template run指令中設定sdk_container_image參數。例如:gcloud dataflow flex-template run $JOB_NAME \ --region=$REGION \ --template-file-gcs-location=$TEMPLATE_PATH \ --parameters=sdk_container_image=$CUSTOM_CONTAINER_IMAGE \ --additional-experiments=use_runner_v2詳情請參閱「在 Dataflow 中使用自訂容器」。
選擇基本映像檔
您可以使用 Google 提供的基本映像檔,透過 Docker 封裝範本容器映像檔。從「Flex 範本基本映像檔」中選擇最新標記。建議您使用特定圖片標記,而非 latest。基本映像檔託管於 gcr.io/dataflow-templates-base。
請以下列格式指定基本映像檔:
gcr.io/dataflow-templates-base/IMAGE_NAME:TAG
更改下列內容:
IMAGE_NAME:Google 提供的基本映像檔TAG:基本映像檔的版本名稱,請參閱「Flex 範本基本映像檔參考資料」
使用自訂容器映像檔
如果管道使用自訂容器映像檔,建議您將自訂映像檔做為 Flex 範本 Docker 映像檔的基本映像檔。如要這麼做,請將 Google 提供的範本基本映像檔中的 Flex 範本啟動器二進位檔,複製到自訂映像檔中。
以下是映像檔的 Dockerfile 範例,可用於自訂 SDK 容器映像檔和 Flex 範本可能如下所示:
FROM gcr.io/dataflow-templates-base/IMAGE_NAME:TAG as template_launcher
FROM apache/beam_python3.10_sdk:2.69.0
# RUN <...Make image customizations here...>
# See: https://cloud.google.com/dataflow/docs/guides/build-container-image
# Configure the Flex Template here.
COPY --from=template_launcher /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher
COPY my_pipeline.py /template/
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/my_pipeline.py"
更改下列內容:
IMAGE_NAME:Google 提供的基本映像檔。例如:python311-template-launcher-base。TAG:基本映像檔的版本標記,請參閱「Flex 範本基本映像檔參考資料」。為提升穩定性及方便進行疑難排解,請避免使用latest。請改為固定使用特定版本標記。
如需採用這種做法的範例,請參閱具有依附元件和自訂容器映像檔的管道 Flex 範本教學課程。
使用私人登錄檔中的映像檔
如果私人登錄檔使用 HTTPS 並具有有效憑證,您可以建構儲存在私人 Docker 登錄檔中的 Flex 範本映像檔。
如要使用私人登錄檔中的映像檔,請指定映像檔的路徑,以及登錄檔的使用者名稱和密碼。使用者名稱和密碼必須儲存在 Secret Manager。您可以透過下列其中一種格式提供密鑰:
projects/{project}/secrets/{secret}/versions/{secret_version}projects/{project}/secrets/{secret}
如果您使用第二種格式,由於未指定版本,Dataflow 會使用最新版本。
如果登錄檔使用自行簽署的憑證,您也需要在 Cloud Storage 中指定自行簽署憑證的路徑。
下表說明可用於設定私人登錄檔的 gcloud CLI 選項。
| 參數 | 說明 |
|---|---|
image
|
登錄檔位址。例如:gcp.repository.example.com:9082/registry/example/image:latest。 |
image-repository-username-secret-id
|
用於向私人登錄檔驗證的使用者名稱,其 Secret Manager 密鑰 ID。例如:projects/example-project/secrets/username-secret。 |
image-repository-password-secret-id
|
用於向私人登錄檔驗證的密碼,其 Secret Manager 密鑰 ID。例如:projects/example-project/secrets/password-secret/versions/latest。 |
image-repository-cert-path
|
私人登錄檔的自行簽署憑證的完整 Cloud Storage 網址。如果登錄檔使用自行簽署憑證,則必須提供這個值。例如:gs://example-bucket/self-signed.crt。 |
以下是 Google Cloud CLI 指令範例,可使用私人登錄檔中含有自行簽署憑證的映像檔,建構 Flex 範本。
gcloud dataflow flex-template build gs://example-bucket/custom-pipeline-private-repo.json --sdk-language=JAVA --image="gcp.repository.example.com:9082/registry/example/image:latest" --image-repository-username-secret-id="projects/example-project/secrets/username-secret" --image-repository-password-secret-id="projects/example-project/secrets/password-secret/versions/latest" --image-repository-cert-path="gs://example-bucket/self-signed.crt" --metadata-file=metadata.json
如要建構自己的 Flex 範本,必須替換範例值,並視需要指定不同或額外選項。如要進一步瞭解相關內容,請參閱下列資源:
指定管道選項
如要瞭解 Flex 範本直接支援的管道選項,請參閱「管道選項」。
您也可以間接使用任何 Apache Beam 管道選項。如果您為 Flex 範本工作使用 metadata.json 檔案,請在檔案中加入這些管道選項。這個中繼資料檔案必須符合 TemplateMetadata 中的格式。
否則,啟動 Flex 範本工作時,請使用參數欄位傳遞這些管道選項。
如為範本中未明確公開的執行階段實驗選項或常見執行階段管道選項,請使用 additional-experiments 或 additional-pipeline-options 欄位傳遞這些選項。
API
使用
parameters欄位加入管道選項。使用
additionalExperiments和additionalPipelineOptions欄位,加入執行階段實驗和管道選項。
以下範例說明如何在要求主體中加入管道選項、實驗和其他選項:
{
"jobName": "my-flex-template-job",
"parameters": {
"option_defined_in_metadata": "value"
},
"environment": {
"additionalExperiments": [
"use_runner_v2"
],
"additionalPipelineOptions": {
"common_pipeline_option": "value"
}
}
}
gcloud
使用
parameters旗標加入管道選項。使用
additional-experiments和additional-pipeline-options旗標,加入執行階段實驗和管道選項。
傳遞 List 或 Map 類型的參數時,您可能需要在 YAML 檔案中定義參數,並使用 flags-file 旗標。
使用 Flex 範本時,您可以在管道初始化期間設定部分管道選項,但其他管道選項無法變更。如果覆寫 Flex 範本所需的指令列引數,工作可能會忽略、覆寫或捨棄範本啟動器傳遞的管道選項。工作可能無法啟動,或啟動未使用 Flex 範本的工作。詳情請參閱「無法讀取工作檔案」。
在管道初始化期間,請勿變更下列管道選項:
Java
runnerprojectjobNametemplateLocationregion
Python
runnerprojectjob_nametemplate_locationregion
Go
runnerprojectjob_nametemplate_locationregion
封鎖使用中繼資料安全殼層金鑰的 VM 專案安全殼層金鑰
您可以封鎖 VM 的專案安全殼層金鑰,防止 VM 接受儲存在專案中繼資料中的安全殼層金鑰。搭配 block_project_ssh_keys 服務選項使用 additional-experiments 旗標:
--additional-experiments=block_project_ssh_keys
詳情請參閱 Dataflow 服務選項。
中繼資料
您可以用額外的中繼資料來擴充範本,如此一來,範本執行時,就能驗證自訂參數。如要為範本建立中繼資料,請按照下列步驟操作:
中繼資料參數
| 參數鍵 | 必填 | 值說明 | |
|---|---|---|---|
name |
是 | 範本的名稱。 | |
description |
否 | 說明參數的一小段文字。 | |
streaming |
否 | 如果 true,這個範本支援串流。預設值為 false。 |
|
supportsAtLeastOnce |
否 | 如果 true,這個範本支援至少一次處理。預設值為 false。如果範本設計為搭配至少一次串流模式使用,請將這個參數設為 true。 |
|
supportsExactlyOnce |
否 | 如果 true,這個範本支援僅須處理一次。預設值為 true。 |
|
defaultStreamingMode |
否 | 如果範本同時支援至少一次模式和僅一次模式,預設串流模式為至少一次模式。請使用下列其中一個值:"AT_LEAST_ONCE"、"EXACTLY_ONCE"。如未指定,預設串流模式為恰好一次。 |
|
parameters |
否 | 範本使用的額外參數陣列。根據預設會使用空陣列。 | |
name |
是 | 範本中使用的參數名稱。 | |
label |
是 | 使用人類可讀的字串,用於在 Google Cloud 控制台中為參數加上標籤。 | |
helpText |
是 | 說明參數的一小段文字。 | |
isOptional |
否 | 如果參數為必要,則為 false;如果參數為選用,則為 true。除非設定值,否則 isOptional 預設為 false。如果中繼資料未包含這個參數鍵,中繼資料就會成為必要參數。 |
|
regexes |
否 | 字串形式的 POSIX-egrep 規則運算式陣列,用於驗證參數的值。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是單一規則運算式,可驗證開頭為一個字母、後續有一或多個字元的值。根據預設會使用空陣列。 |
中繼資料檔案範例
Java
{ "name": "Streaming Beam SQL", "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery", "parameters": [ { "name": "inputSubscription", "label": "Pub/Sub input subscription.", "helpText": "Pub/Sub subscription to read from.", "regexes": [ "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}" ] }, { "name": "outputTable", "label": "BigQuery output table", "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.", "isOptional": true, "regexes": [ "[^:]+:[^.]+[.].+" ] } ] }
Python
{ "name": "Streaming beam Python flex template", "description": "Streaming beam example for python flex template.", "parameters": [ { "name": "input_subscription", "label": "Input PubSub subscription.", "helpText": "Name of the input PubSub subscription to consume from.", "regexes": [ "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}" ] }, { "name": "output_table", "label": "BigQuery output table name.", "helpText": "Name of the BigQuery output table name.", "isOptional": true, "regexes": [ "([^:]+:)?[^.]+[.].+" ] } ] }
您可以從 Dataflow 範本目錄下載 Google 提供的範本中繼資料檔案。
瞭解暫存位置和臨時位置
執行 Flex 範本時,Google Cloud CLI 會提供 --staging-location 和 --temp-location 選項。同樣地,Dataflow REST API 也為 FlexTemplateRuntimeEnvironment 提供 stagingLocation 和 tempLocation 欄位。
如果是 Flex 範本,暫存位置是 Cloud Storage 網址,在啟動範本的暫存步驟中,檔案會寫入這個位置。Dataflow 會讀取這些暫存檔案,建立範本圖表。暫存位置是 Cloud Storage 網址,執行作業步驟期間會將暫存檔案寫入該網址。
更新 Flex 範本工作
以下範例要求說明如何使用 projects.locations.flexTemplates.launch 方法,更新範本串流工作。如要使用 gcloud CLI,請參閱「更新現有管道」。
如要更新傳統範本,請使用 projects.locations.templates.launch 。
按照步驟透過 Flex 範本建立串流工作。傳送下列 HTTP POST 要求,並附上修改的值:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch { "launchParameter": { "update": true "jobName": "JOB_NAME", "parameters": { "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "output_table": "PROJECT_ID:DATASET.TABLE_NAME" }, "containerSpecGcsPath": "STORAGE_PATH" }, }- 將
PROJECT_ID替換為專案 ID。 - 將
REGION替換為正在更新工作的 Dataflow區域。 - 將
JOB_NAME替換為要更新的工作確切名稱。 - 將
parameters設為您的鍵/值組合清單。列出的參數僅適用於這個範本範例。如果使用自訂範本,請視需要修改參數。如果您使用範本,請替換下列變數。- 將
SUBSCRIPTION_NAME替換為 Pub/Sub 訂閱名稱。 - 將
DATASET替換為 BigQuery 資料集名稱。 - 將
TABLE_NAME替換為 BigQuery 資料表名稱。
- 將
- 將
STORAGE_PATH替換為範本檔案的 Cloud Storage 位置。位置資訊開頭應為gs://。
- 將
使用
environment參數變更環境設定。如要瞭解詳情,請參閱FlexTemplateRuntimeEnvironment。選用:如要使用 curl (Linux、macOS 或 Cloud Shell) 傳送要求,請將要求儲存至 JSON 檔案,然後執行下列指令:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch將 FILE_PATH 替換為包含要求主體的 JSON 檔案路徑。
使用 Dataflow 監控介面,確認已建立相同名稱的新工作。這項工作的狀態為「已更新」。
限制
Flex 範本工作有以下限制:
- 您必須使用 Google 提供的基本映像檔,透過 Docker 封裝容器。如需適用映像檔的清單,請參閱「Flex 範本基本映像檔」。
- 建構管道的程式必須在呼叫
run後結束,管道才會啟動。 - 不支援
waitUntilFinish(Java) 和wait_until_finish(Python)。
後續步驟
- 如要進一步瞭解傳統範本、Flex 範本及其用途情境,請參閱「Dataflow 範本」。
- 如需 Flex 範本疑難排解資訊,請參閱「疑難排解 Flex 範本逾時」。
- 如要查看更多參考架構、圖表和最佳做法,請瀏覽 Cloud Architecture Center。