גישה ל-Airflow REST API

Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)

ל-Apache Airflow יש ממשק API בארכיטקטורת REST שאפשר להשתמש בו כדי לבצע משימות כמו קבלת מידע על הפעלות ומשימות של DAG, עדכון DAG, קבלת הגדרות של Airflow, הוספה ומחיקה של חיבורים ורישום משתמשים.

דוגמה לשימוש ב-Airflow API בארכיטקטורת REST עם פונקציות Cloud Run מופיעה במאמר בנושא הפעלת DAG באמצעות פונקציות Cloud Run.

גרסאות של Airflow REST API

הגדרת Airflow API בארכיטקטורת REST

ה-API בארכיטקטורת REST של Airflow v1 מופעל כברירת מחדל ב-Airflow 2.

ב-Managed Airflow נעשה שימוש בקצה עורפי משלו לאימות API.

ההרשאה פועלת בדרך הרגילה ש-Airflow מספק. כשמשתמש חדש מאשר גישה דרך ה-API, החשבון של המשתמש מקבל את התפקיד Op כברירת מחדל.

אפשר להפעיל או להשבית את API בארכיטקטורת REST של Airflow, או לשנות את תפקיד המשתמש שמוגדר כברירת מחדל על ידי שינוי מברירת המחדל אפשרויות ההגדרה הבאות של Airflow:

קטע מפתח ערך הערות
api ‫(Airflow 2.2.5 ומגרסאות קודמות) auth_backend
‫(Airflow 2.3.0 ומגרסאות מאוחרות יותר) auth_backends
airflow.composer.api.backend.composer_auth כדי להשבית את ה-API היציב ל-REST, משנים את הערך ל- airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op אפשר לציין כל תפקיד אחר.

התרת קריאות ל-API בארכיטקטורת REST של Airflow באמצעות בקרת גישה לשרת האינטרנט

בהתאם לשיטה שבה משתמשים כדי לקרוא ל-Airflow API בארכיטקטורת REST, שיטת הקריאה יכולה להשתמש בכתובת IPv4 או IPv6. חשוב לזכור לבטל את החסימה של תנועת ה-IP אל Airflow REST API באמצעות בקרת גישה לשרת האינטרנט.

אם אתם לא בטוחים מאיזה כתובות IP יישלחו הקריאות שלכם ל-Airflow API בארכיטקטורת REST, אתם יכולים להשתמש באפשרות ברירת המחדל להגדרה, שהיא All IP addresses have access (default).

ביצוע קריאות ל-Airflow API בארכיטקטורת REST

בקטע הזה מופיעה דוגמה לסקריפט Python שאפשר להשתמש בו כדי להפעיל DAG באמצעות Airflow API בארכיטקטורת REST.

בסקריפט, מגדירים את המשתנים הבאים:

  • dag_id: שם של DAG, כפי שמוגדר בקובץ המקור של ה-DAG.
  • dag_config: הגדרות להרצת DAG.
  • web_server_url: כתובת ה-URL של שרת האינטרנט של Airflow. הפורמט הוא https://<web-server-id>.composer.googleusercontent.com.

  • (Airflow 3) logical_date: התאריך הלוגי של הפעלת ה-DAG.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

גישה ל-Airflow API בארכיטקטורת REST באמצעות חשבון שירות

במסד הנתונים של Airflow בגרסאות Airflow שקודמות ל-2.3.0, האורך של שדה האימייל מוגבל ל-64 תווים. לפעמים לחשבונות שירות יש כתובות אימייל שארוכות מ-64 תווים. אי אפשר ליצור משתמשי Airflow לחשבונות שירות כאלה בדרך הרגילה. אם אין משתמש Airflow לחשבון שירות כזה, גישה ל-Airflow API בארכיטקטורת REST תגרום לשגיאות HTTP 401 ו-403.

כפתרון עקיף, אפשר לרשום מראש משתמש Airflow לחשבון שירות. כדי לעשות זאת, משתמשים ב-accounts.google.com:NUMERIC_USER_ID כשם המשתמש ובמחרוזת ייחודית כלשהי ככתובת האימייל.

  1. כדי לקבל את NUMERIC_USER_ID לחשבון שירות, מריצים את הפקודה:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    מחליפים את:

    • SA_NAME בשם של חשבון השירות.
    • PROJECT_ID במזהה הפרויקט (Project ID).
  2. יוצרים משתמש Airflow עם התפקיד Op לחשבון השירות:

    ממשק המשתמש של Airflow

    1. עוברים לממשק המשתמש של Airflow.

    2. נכנסים אל אבטחה > רשימת משתמשים ולוחצים על הוספת רשומה חדשה. למשתמש שלכם ב-Airflow צריך להיות תפקיד Admin כדי לפתוח את הדף הזה.

    3. מציינים את accounts.google.com:NUMERIC_USER_ID כשם המשתמש. מחליפים את NUMERIC_USER_ID במזהה המשתמש שהתקבל בשלב הקודם.

    4. מציינים מזהה ייחודי בתור האימייל. אפשר להשתמש בכל מחרוזת ייחודית.

    5. מציינים את התפקיד של המשתמש. לדוגמה, Op.

    6. מוודאים שהתיבה פעיל? מסומנת.

    7. מציינים את השם הפרטי ואת שם המשפחה של המשתמש. אפשר להשתמש בכל מחרוזת.

    8. לוחצים על Save.

    gcloud

    מריצים את הפקודה הבאה ב-CLI של Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    מחליפים את:

    • ENVIRONMENT_NAME בשם הסביבה.
    • LOCATION עם האזור שבו הסביבה ממוקמת.
    • NUMERIC_USER_ID עם מזהה המשתמש שהתקבל בשלב הקודם.
    • UNIQUE_ID מחליפים במזהה של משתמש Airflow. אפשר להשתמש בכל מחרוזת ייחודית.
  3. אחרי שיוצרים משתמש Airflow לחשבון שירות, מתבצעת כניסה ל-Airflow של מי שקורא ל-API ומאומת כחשבון השירות, והוא מזוהה כמשתמש רשום מראש.

התאמה לעומס (scaling) של רכיב Airflow API בארכיטקטורת REST

נקודות הקצה של Airflow API בארכיטקטורת REST ושל Airflow UI מופעלות בתוך שרת האינטרנט של Airflow. אם אתם משתמשים ב-API בארכיטקטורת REST באופן אינטנסיבי, כדאי להגדיל את כמות המעבד (CPU) והזיכרון שזמינים לשרת האינטרנט של Airflow, בהתאם לעומס הצפוי.

המאמרים הבאים