הפעלת DAGs של Managed Service for Apache Airflow באמצעות פונקציות Cloud Run ו-API בארכיטקטורת REST של Airflow

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

בדף הזה מוסבר איך להשתמש בפונקציות של Cloud Run כדי להפעיל DAG של Managed Service for Apache Airflow בתגובה לאירועים.

‫Apache Airflow מיועד להרצת DAGs בלוח זמנים קבוע, אבל אפשר גם להפעיל DAGs בתגובה לאירועים. אחת הדרכים לעשות את זה היא להשתמש בפונקציות Cloud Run כדי להפעיל DAGs מנוהלים של Airflow כשמתרחש אירוע ספציפי.

בדוגמה שבמדריך הזה, DAG מופעל בכל פעם שמתרחש שינוי בקטגוריה של Cloud Storage. שינויים באובייקט כלשהו בקטגוריה מפעילים פונקציה. הפונקציה הזו שולחת בקשה ל-Airflow API בארכיטקטורת REST של סביבת Managed Airflow. ‫Airflow מעבד את הבקשה הזו ומריץ DAG. ה-DAG מוציא מידע על השינוי.

לפני שמתחילים

בדיקת הגדרות הרשת בסביבה

הפתרון הזה לא פועל בהגדרות של כתובות IP פרטיות ו-VPC Service Controls, כי אי אפשר להגדיר קישוריות מפונקציות Cloud Run לשרת האינטרנט של Airflow בהגדרות האלה.

ב-Managed Airflow (דור 2), אפשר להשתמש בגישה אחרת: הפעלת DAG באמצעות פונקציות Cloud Run והודעות Pub/Sub

הפעלת ממשקי API בפרויקט

המסוף

מפעילים את ממשקי ה-API של Managed Airflow ושל פונקציות Cloud Run.

תפקידים שנדרשים להפעלת ממשקי API

כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

הפעלת ממשקי ה-API

gcloud

מפעילים את ממשקי ה-API של פונקציות Managed Airflow ו-Cloud Run:

תפקידים שנדרשים להפעלת ממשקי API

כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

הפעלת Airflow API בארכיטקטורת REST

בהתאם לגרסת Airflow:

התרת קריאות API ל-Airflow API בארכיטקטורת REST באמצעות בקרת גישה של שרת האינטרנט

פונקציות Cloud Run יכולות לגשת ל-Airflow API בארכיטקטורת REST באמצעות כתובת IPv4 או IPv6.

אם אתם לא בטוחים מה יהיה טווח כתובות ה-IP של הקריאה, כדאי להשתמש באפשרות ברירת המחדל להגדרת בקרת הגישה לשרת האינטרנט, שהיא All IP addresses have access (default), כדי שלא תחסמו בטעות את הפונקציות של Cloud Run.

יצירת קטגוריה של Cloud Storage

בדוגמה הזו מופעל DAG בתגובה לשינויים בקטגוריה של Cloud Storage. צריך ליצור קטגוריה חדשה כדי להשתמש בה בדוגמה הזו.

קבלת כתובת ה-URL של שרת האינטרנט של Airflow

בדוגמה הזו מתבצעות בקשות ל-API בארכיטקטורת REST לנקודת הקצה של שרת האינטרנט של Airflow. משתמשים בחלק של כתובת ה-URL של ממשק האינטרנט של Airflow לפני .appspot.com בקוד של Cloud Function.

המסוף

  1. נכנסים לדף Environments במסוף Google Cloud .

    מעבר אל Environments

  2. לוחצים על שם הסביבה.

  3. בדף Environment details, עוברים לכרטיסייה Environment configuration.

  4. כתובת ה-URL של שרת האינטרנט של Airflow מופיעה בפריט Airflow web UI.

gcloud

מריצים את הפקודה הבאה:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

מחליפים את:

  • ENVIRONMENT_NAME בשם הסביבה.
  • LOCATION עם האזור שבו הסביבה ממוקמת.

קבלת מזהה הלקוח של שרת ה-IAM proxy

כדי לשלוח בקשה לנקודת קצה ל-API בארכיטקטורת REST של Airflow, הפונקציה צריכה את מזהה הלקוח של שרת proxy לניהול זהויות והרשאות גישה שמגן על שרת האינטרנט של Airflow.

ב-Managed Airflow, המידע הזה לא זמין באופן ישיר. במקום זאת, שולחים בקשה לא מאומתת לשרת האינטרנט של Airflow ומתעדים את מזהה הלקוח מכתובת ה-URL להפניה אוטומטית:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

מחליפים את AIRFLOW_URL בכתובת ה-URL של ממשק האינטרנט של Airflow.

בפלט, מחפשים את המחרוזת שאחרי client_id. לדוגמה:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

שומרים את הקוד הבא בקובץ בשם get_client_id.py. ממלאים את הערכים של project_id,‏ location ו-composer_environment, ואז מריצים את הקוד ב-Cloud Shell או בסביבה המקומית.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

העלאת DAG לסביבה

העלאת DAG לסביבה הפלט של ה-DAG הבא הוא הגדרת ההרצה של ה-DAG שהתקבלה. מפעילים את ה-DAG הזה מפונקציה שיוצרים בהמשך המדריך.

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

פריסת פונקציה של Cloud Functions שמפעילה את ה-DAG

אפשר לפרוס פונקציית Cloud באמצעות השפה המועדפת שנתמכת על ידי פונקציות Cloud Run או Cloud Run. במדריך הזה מוצגת פונקציית Cloud Functions שהוטמעה ב-Python וב-Java.

ציון פרמטרים להגדרת Cloud Functions

  • Trigger. בדוגמה הזו, בוחרים טריגר שפועל כשנוצר אובייקט חדש בקטגוריה, או כשמתבצע שכתוב של אובייקט קיים.

    • סוג הטריגר. ‫Cloud Storage.

    • סוג האירוע. סיום / יצירה.

    • Bucket. בוחרים קטגוריה שתפעיל את הפונקציה הזו.

    • ניסיון חוזר במקרה של כשל. לצורך הדוגמה הזו, מומלץ להשבית את האפשרות הזו. אם אתם משתמשים בפונקציה משלכם בסביבת ייצור, כדאי להפעיל את האפשרות הזו כדי לטפל בשגיאות זמניות.

  • חשבון שירות של זמן ריצה, בקטע הגדרות של זמן ריצה, build, חיבורים ואבטחה. אפשר לבחור באחת מהאפשרויות הבאות, בהתאם להעדפות שלכם:

    • בוחרים באפשרות חשבון השירות של Compute Engine שמוגדר כברירת מחדל. עם הרשאות IAM שמוגדרות כברירת מחדל, החשבון הזה יכול להריץ פונקציות שיש להן גישה לסביבות Managed Airflow.

    • יוצרים חשבון שירות בהתאמה אישית עם התפקיד Composer User ומציינים אותו כחשבון שירות של זמן הריצה של הפונקציה. האפשרות הזו מבוססת על העיקרון של הרשאות מינימליות.

  • Runtime and entry point (זמן ריצה ונקודת כניסה), בשלב Code (קוד). כשמוסיפים קוד לדוגמה הזו, בוחרים את זמן הריצה Python 3.7 או גרסה חדשה יותר ומציינים את trigger_dag כנקודת הכניסה.

הוספת דרישות

מציינים את יחסי התלות בקובץ requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.38.0
google-cloud-pubsub==2.28.0

מוסיפים את הקוד הבא לקובץ main.py ומבצעים את ההחלפות הבאות:

  • מחליפים את הערך של המשתנה client_id בערך client_id שקיבלתם קודם.

  • מחליפים את הערך של המשתנה webserver_id במזהה פרויקט הדייר, שהוא חלק מכתובת ה-URL של ממשק האינטרנט של Airflow לפני .appspot.com. כתובת ה-URL של ממשק האינטרנט של Airflow הופיעה בשלב מוקדם יותר.

  • מציינים את הגרסה של Airflow API בארכיטקטורת REST שבה משתמשים:

    • אם אתם משתמשים ב-Airflow API בארכיטקטורת REST היציב, צריך להגדיר את המשתנה USE_EXPERIMENTAL_API לערך False.
    • אם אתם משתמשים ב-Airflow API בארכיטקטורת REST הניסיוני, לא צריך לבצע שינויים. המשתנה USE_EXPERIMENTAL_API כבר מוגדר לערך True.


from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

בדיקת הפונקציה

כדי לוודא שהפונקציה ו-DAG פועלים כמצופה:

  1. מחכים עד שהפונקציה תופעל.
  2. מעלים קובץ לקטגוריה של Cloud Storage. לחלופין, אפשר להפעיל את הפונקציה באופן ידני על ידי בחירת הפעולה Test the function (בדיקת הפונקציה) במסוף Google Cloud .
  3. בודקים את דף ה-DAG בממשק האינטרנט של Airflow. ל-DAG צריך להיות הפעלה אחת פעילה או שכבר הסתיימה.
  4. בממשק המשתמש של Airflow, בודקים את יומני המשימות של ההרצה הזו. אפשר לראות שהמשימה print_gcs_info מוציאה את הנתונים שהתקבלו מהפונקציה ליומנים:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

המאמרים הבאים