הפעלת צינור עיבוד נתונים באמצעות Pub/Sub

בדף הזה מוסבר איך לכתוב, לפרוס ולהפעיל צינור עיבוד נתונים באמצעות פונקציית Cloud Functions מבוססת-אירועים עם טריגר Cloud Pub/Sub. איך לעשות את זה?

  1. מגדירים צינור ML באמצעות Kubeflow Pipelines (KFP) SDK ומקמפלים אותו לקובץ YAML.

  2. מעלים את הגדרת הצינור המהודרת לקטגוריה של Cloud Storage.

  3. אתם יכולים להשתמש בפונקציות Cloud Run כדי ליצור, להגדיר ולפרוס פונקציה שמופעלת על ידי נושא Pub/Sub חדש או קיים.

הגדרה וקומפילציה של צינור עיבוד נתונים

באמצעות Kubeflow Pipelines SDK, יוצרים צינור עיבוד נתונים מתוזמן ומקמפלים אותו לקובץ YAML.

דוגמה hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    hello_world_task = hello_world(greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')

העלאת קובץ YAML של צינור עיבוד מעובד לקטגוריה של Cloud Storage

  1. פותחים את הדף Cloud Storage browser במסוף Google Cloud .

    Cloud Storage Browser

  2. לוחצים על קטגוריית Cloud Storage שיצרתם כשהגדרתם את הפרויקט.

  3. מעלים את קובץ ה-YAML של צינור העיבוד (בדוגמה הזו hello_world_scheduled_pipeline.yaml) לתיקייה שנבחרה, בין אם היא קיימת או חדשה.

  4. כדי לגשת לפרטים, לוחצים על קובץ ה-YAML שהועלה. מעתיקים את gsutil URI לשימוש במועד מאוחר יותר.

יצירת פונקציות Cloud Run עם טריגר Pub/Sub

  1. נכנסים לדף פונקציות Cloud Run במסוף.

    כניסה לדף פונקציות Cloud Run

  2. לוחצים על הלחצן יצירת פונקציה.

  3. בקטע Basics (עקרונות בסיסיים), נותנים שם לפונקציה (לדוגמה, my-scheduled-pipeline-function).

  4. בקטע Trigger, בוחרים באפשרות Cloud Pub/Sub בתור סוג הטריגר.

    create function configuration choose pubsub as Trigger type image

  5. ברשימה Select a Cloud Pub/Sub topic, לוחצים על Create a topic.

  6. בתיבה יצירת נושא, נותנים שם לנושא החדש (למשל my-scheduled-pipeline-topic) ולוחצים על יצירת נושא.

  7. משאירים את כל שאר השדות כברירת מחדל ולוחצים על Save (שמירה) כדי לשמור את ההגדרות של הקטע Trigger (טריגר).

  8. משאירים את כל שאר השדות בערכי ברירת המחדל ולוחצים על הבא כדי להמשיך לקטע Code (קוד).

  9. בקטע Runtime, בוחרים באפשרות Python 3.7.

  10. בקטע נקודת כניסה, מזינים subscribe (שם הפונקציה של נקודת הכניסה של קוד הדוגמה).

  11. בקטע קוד מקור, בוחרים באפשרות עורך מוטבע אם היא לא מסומנת.

  12. מוסיפים את הקוד הבא לקובץ main.py:

      import base64
      import json
      from google.cloud import aiplatform
    
      PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
      REGION = 'your-region'                             # <---CHANGE THIS
      PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS
    
      def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
              event (dict): Event payload.
              context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)
    
      def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
              payload_json: expected in the following format:
                {
                  "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                  "parameter_values": {
                    "greet_name": "<any-greet-string>"
                  }
                }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']
    
        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )
    
        # Submit the PipelineJob
        job.submit()
    

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: Google Cloud הפרויקט שבו צינור הנתונים הזה פועל.
    • REGION: האזור שבו צינור העברת הנתונים הזה פועל.
    • PIPELINE_ROOT: מציינים URI של Cloud Storage שחשבון השירות של צינורות העיבוד יכול לגשת אליו. פריטי המידע של הפעלות צינור עיבוד הנתונים מאוחסנים בתיקיית הבסיס של צינור עיבוד הנתונים.
  13. בקובץ requirements.txt, מחליפים את התוכן בדרישות החבילה הבאות:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform
    
  14. לוחצים על deploy כדי לפרוס את הפונקציה.

המאמרים הבאים