本頁面說明如何使用Cloud Pub/Sub 觸發程序,透過事件驅動型 Cloud Function 撰寫、部署及觸發管道執行作業。步驟如下:
使用 Kubeflow Pipelines (KFP) SDK 定義機器學習管道,並編譯成 YAML 檔案。
將編譯的管道定義上傳至 Cloud Storage 值區。
使用 Cloud Run 函式建立、設定及部署函式,並由新的或現有的 Pub/Sub 主題觸發。
定義及編譯管道
使用 Kubeflow Pipelines SDK 建構排程管道,並編譯成 YAML 檔案。
範例 hello-world-scheduled-pipeline
:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
greeting_str = f'Hello, {message}'
print(greeting_str)
return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
hello_world_task = hello_world(greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
package_path='hello_world_scheduled_pipeline.yaml')
將已編譯的 pipeline YAML 上傳至 Cloud Storage 值區
在 Google Cloud 控制台中開啟 Cloud Storage 瀏覽器。
按一下您在設定專案時建立的 Cloud Storage bucket。
使用現有或新的資料夾,將編譯後的管道 YAML (在本範例中為
hello_world_scheduled_pipeline.yaml
) 上傳至所選資料夾。按一下上傳的 YAML 檔案,即可存取詳細資料。複製 gsutil URI,以供稍後使用。
建立具有 Pub/Sub 觸發條件的 Cloud Run 函式
前往控制台的「Cloud Run functions」頁面。
按一下 [Create function] (建立函式) 按鈕。
在「Basics」(基本) 專區中,為函式命名 (例如
my-scheduled-pipeline-function
)。在「Trigger」(觸發條件) 部分中,選取「Cloud Pub/Sub」做為觸發條件類型。
在「Select a Cloud Pub/Sub topic」(選取 Cloud Pub/Sub 主題) 清單中,按一下「Create a topic」(建立主題)。
在「建立主題」方塊中,為新主題命名 (例如
my-scheduled-pipeline-topic
),然後選取「建立主題」。其餘欄位保留預設值,然後點選「儲存」,儲存「觸發條件」部分的設定。
其他欄位則一概保留預設值,然後按一下「下一步」,前往「程式碼」部分。
選取「Runtime」(執行階段) 底下的 [Python 3.7]。
在「Entry point」(進入點) 中輸入「subscribe」(範例程式碼進入點函式名稱)。
在「Source code」(原始碼) 下,選取「Inline Editor」(內嵌編輯器) (如果尚未選取)。
在
main.py
檔案中,新增下列程式碼:import base64 import json from google.cloud import aiplatform PROJECT_ID = 'your-project-id' # <---CHANGE THIS REGION = 'your-region' # <---CHANGE THIS PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS def subscribe(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ # decode the event payload string payload_message = base64.b64decode(event['data']).decode('utf-8') # parse payload string into JSON object payload_json = json.loads(payload_message) # trigger pipeline run with payload trigger_pipeline_run(payload_json) def trigger_pipeline_run(payload_json): """Triggers a pipeline run Args: payload_json: expected in the following format: { "pipeline_spec_uri": "<path-to-your-compiled-pipeline>", "parameter_values": { "greet_name": "<any-greet-string>" } } """ pipeline_spec_uri = payload_json['pipeline_spec_uri'] parameter_values = payload_json['parameter_values'] # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri aiplatform.init( project=PROJECT_ID, location=REGION, ) job = aiplatform.PipelineJob( display_name='hello-world-pipeline-cloud-function-invocation', template_path=pipeline_spec_uri, pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values=parameter_values ) # Submit the PipelineJob job.submit()
更改下列內容:
- PROJECT_ID:這個管道執行的 Google Cloud 專案。
- REGION:這個管道執行的區域。
- PIPELINE_ROOT:指定管道服務帳戶可存取的 Cloud Storage URI。管道執行的構件會儲存在管道根目錄中。
在
requirements.txt
檔案中,將內容替換為下列套件 需求:google-api-python-client>=1.7.8,<2 google-cloud-aiplatform
按一下「deploy」(部署) 即可部署函式。
後續步驟
- 進一步瞭解 Google Cloud Pub/Sub。
- 分析管道結果並予以視覺化呈現。
- 瞭解如何在 Cloud Run 中從 Pub/Sub 事件建立觸發條件。
- 如要查看使用 Pub/Sub 的程式碼範例,請參閱Google Cloud 範例瀏覽器。