הפעלת עומסי עבודה של Managed Service for Apache Spark באמצעות Managed Airflow

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

בדף הזה מוסבר איך להשתמש ב-Managed Airflow (דור 2) כדי להריץ עומסי עבודה של Managed Service for Apache Spark ב-Google Cloud.

בדוגמאות שבקטעים הבאים מוסבר איך להשתמש באופרטורים לניהול עומסי עבודה של עיבוד ברצף (batch) ב-Managed Service for Apache Spark. משתמשים באופרטורים האלה ב-DAG שיוצרים, מוחקים, מפרטים ומקבלים עומס עבודה של עיבוד ברצף (batch processing) ב-Managed Service for Apache Spark:

לפני שמתחילים

  1. מפעילים את Dataproc API:

    המסוף

    מפעילים את Managed Service for Apache Spark API.

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    להפעלת ה-API

    gcloud

    מפעילים את Managed Service for Apache Spark API:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable dataproc.googleapis.com

  2. בוחרים את המיקום של קובץ עומס העבודה של האצווה. אפשר להשתמש באחת מהאפשרויות הבאות:

    • יוצרים קטגוריה של Cloud Storage שבה הקובץ יישמר.
    • שימוש בקטגוריה של הסביבה. מכיוון שלא צריך לסנכרן את הקובץ הזה עם Airflow, אפשר ליצור תיקיית משנה נפרדת מחוץ לתיקיות /dags או /data. לדוגמה, /batches.
    • שימוש בקטגוריה קיימת.

הגדרת קבצים ומשתני Airflow

בקטע הזה נדגים איך להגדיר קבצים ומשתני Airflow לצורך המדריך הזה.

העלאת קובץ של עומס עבודה של ML ב-Managed Service for Apache Spark לקטגוריה

עומס העבודה במדריך הזה מפעיל סקריפט pyspark:

  1. שומרים סקריפט pyspark בקובץ מקומי בשם spark-job.py. לדוגמה, אפשר להשתמש בסקריפט לדוגמה של pyspark.

  2. מעלים את הקובץ למיקום שבחרתם בקטע לפני שמתחילים.

הגדרת משתני Airflow

בדוגמאות שבקטעים הבאים נעשה שימוש במשתני Airflow. אתם מגדירים ערכים למשתנים האלה ב-Airflow, ואז קוד ה-DAG יכול לגשת לערכים האלה.

בדוגמאות במדריך הזה נעשה שימוש במשתני Airflow הבאים. אפשר להגדיר אותם לפי הצורך, בהתאם לדוגמה שבה משתמשים.

מגדירים את משתני Airflow הבאים לשימוש בקוד ה-DAG:

שימוש במסוף ובממשק המשתמש של Airflow כדי להגדיר כל משתנה של Airflow Google Cloud

  1. במסוף Google Cloud , עוברים לדף Environments.

    מעבר אל Environments

  2. ברשימת הסביבות, לוחצים על הקישור Airflow של הסביבה. ממשק המשתמש של Airflow נפתח.

  3. בממשק המשתמש של Airflow, בוחרים באפשרות Admin (ניהול) > Variables (משתנים).

  4. לוחצים על הוספת רשומה חדשה.

  5. מציינים את שם המשתנה בשדה Key ומגדירים את הערך שלו בשדה Val.

  6. לוחצים על Save.

יצירת שרת היסטוריה מתמשך

כדי לראות את קובצי ההיסטוריה של Spark של עומסי העבודה של אצווה, אפשר להשתמש בשרת היסטוריה מתמשך (PHS):

  1. יצירת שרת היסטוריה מתמשך.
  2. מוודאים שציינתם את השם של אשכול PHS בphs_cluster משתנה Airflow.

DataprocCreateBatchOperator

ה-DAG הבא מתחיל עומס עבודה של עיבוד ברצף (batch) ב-Managed Service for Apache Spark.

למידע נוסף על ארגומנטים של DataprocCreateBatchOperator, אפשר לעיין בקוד המקור של האופרטור.

מידע נוסף על מאפיינים שאפשר להעביר בפרמטר batch של DataprocCreateBatchOperator זמין בתיאור של המחלקה Batch.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

שימוש בקובץ אימג' מותאם אישית של קונטיינר עם DataprocCreateBatchOperator

בדוגמה הבאה אפשר לראות איך משתמשים בקובץ אימג' של קונטיינר מותאם אישית כדי להריץ את עומסי העבודה. אפשר להשתמש בקונטיינר בהתאמה אישית, למשל, כדי להוסיף תלות ב-Python שלא מסופקת על ידי קובץ האימג' של הקונטיינר שמוגדר כברירת מחדל.

כדי להשתמש בקובץ אימג' של קונטיינר מותאם אישית:

  1. יוצרים קובץ אימג' של קונטיינר בהתאמה אישית ומעלים אותו ל-Container Registry.

  2. מציינים את התמונה בimage_name משתנה Airflow.

  3. שימוש ב-DataprocCreateBatchOperator עם קובץ אימג' בהתאמה אישית:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

שימוש בשירות Dataproc Metastore עם DataprocCreateBatchOperator

כדי להשתמש בשירות Dataproc Metastore מ-DAG:

  1. בודקים ששירות המאגר של המטא-נתונים כבר הופעל.

    במאמר הפעלה והשבתה של Dataproc Metastore מוסבר איך להפעיל שירות של metastore.

    מידע מפורט על אופרטור Batch ליצירת ההגדרה זמין במאמר PeripheralsConfig.

  2. אחרי ששירות המטא-חנות מופעל ופועל, מציינים את השם שלו במשתנה metastore_cluster ואת האזור שלו בregion_name משתנה Airflow.

  3. שימוש בשירות של מאגר המטא-נתונים ב-DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

אפשר להשתמש ב-DataprocDeleteBatchOperator כדי למחוק אצווה על סמך מזהה האצווה של עומס העבודה.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

‫DataprocDeleteBatchOperator מציג רשימה של חבילות שקיימות בתוך project_id ואזור נתונים נתונים.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

‫DataprocGetBatchOperator מאחזר עומס עבודה ספציפי באצווה.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

המאמרים הבאים