Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)
בדף הזה מוסבר איך להשתמש בפונקציות של Cloud Run כדי להפעיל DAG של Managed Service for Apache Airflow בתגובה לאירועים.
Apache Airflow מיועד להרצת DAGs בלוח זמנים קבוע, אבל אפשר גם להפעיל DAGs בתגובה לאירועים. אחת הדרכים לעשות את זה היא להשתמש בפונקציות Cloud Run כדי להפעיל DAGs מנוהלים של Airflow כשמתרחש אירוע ספציפי.
בדוגמה שבמדריך הזה, DAG מופעל בכל פעם שמתרחש שינוי בקטגוריה של Cloud Storage. שינויים באובייקט כלשהו בקטגוריה מפעילים פונקציה. הפונקציה הזו שולחת בקשה ל-Airflow API בארכיטקטורת REST של סביבת Managed Airflow. Airflow מעבד את הבקשה הזו ומריץ DAG. ה-DAG מוציא מידע על השינוי.
לפני שמתחילים
בדיקת הגדרות הרשת בסביבה
הפתרון הזה לא פועל בהגדרות של כתובות IP פרטיות ו-VPC Service Controls, כי אי אפשר להגדיר קישוריות מפונקציות Cloud Run לשרת האינטרנט של Airflow בהגדרות האלה.
ב-Managed Airflow (דור 2), אפשר להשתמש בגישה אחרת: הפעלת DAG באמצעות פונקציות Cloud Run והודעות Pub/Sub
הפעלת ממשקי API בפרויקט
המסוף
מפעילים את ממשקי ה-API של Managed Airflow ושל פונקציות Cloud Run.
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים
gcloud
מפעילים את ממשקי ה-API של פונקציות Managed Airflow ו-Cloud Run:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
הפעלת Airflow API בארכיטקטורת REST
בהתאם לגרסת Airflow:
- ב-Airflow 2, ממשק ה-API היציב ל-REST כבר מופעל כברירת מחדל. אם ה-API היציב מושבת בסביבה שלכם, צריך להפעיל את ה-API היציב ל-REST.
- ב-Airflow 1, מפעילים את ה-API הניסיוני ל-REST.
התרת קריאות API ל-Airflow API בארכיטקטורת REST באמצעות בקרת גישה של שרת האינטרנט
פונקציות Cloud Run יכולות לגשת ל-Airflow API בארכיטקטורת REST באמצעות כתובת IPv4 או IPv6.
אם אתם לא בטוחים מה יהיה טווח כתובות ה-IP של הקריאה, כדאי להשתמש באפשרות ברירת המחדל להגדרת בקרת הגישה לשרת האינטרנט, שהיא All IP addresses have access (default), כדי שלא תחסמו בטעות את הפונקציות של Cloud Run.
יצירת קטגוריה של Cloud Storage
בדוגמה הזו מופעל DAG בתגובה לשינויים בקטגוריה של Cloud Storage. צריך ליצור קטגוריה חדשה כדי להשתמש בה בדוגמה הזו.
קבלת כתובת ה-URL של שרת האינטרנט של Airflow
בדוגמה הזו מתבצעות בקשות ל-API בארכיטקטורת REST לנקודת הקצה של שרת האינטרנט של Airflow.
משתמשים בחלק של כתובת ה-URL של ממשק האינטרנט של Airflow לפני .appspot.com בקוד של Cloud Function.
המסוף
נכנסים לדף Environments במסוף Google Cloud .
לוחצים על שם הסביבה.
בדף Environment details, עוברים לכרטיסייה Environment configuration.
כתובת ה-URL של שרת האינטרנט של Airflow מופיעה בפריט Airflow web UI.
gcloud
מריצים את הפקודה הבאה:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
מחליפים את:
-
ENVIRONMENT_NAMEבשם הסביבה. -
LOCATIONעם האזור שבו הסביבה ממוקמת.
קבלת מזהה הלקוח של שרת ה-IAM proxy
כדי לשלוח בקשה לנקודת קצה ל-API בארכיטקטורת REST של Airflow, הפונקציה צריכה את מזהה הלקוח של שרת proxy לניהול זהויות והרשאות גישה שמגן על שרת האינטרנט של Airflow.
ב-Managed Airflow, המידע הזה לא זמין באופן ישיר. במקום זאת, שולחים בקשה לא מאומתת לשרת האינטרנט של Airflow ומתעדים את מזהה הלקוח מכתובת ה-URL להפניה אוטומטית:
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
מחליפים את AIRFLOW_URL בכתובת ה-URL של ממשק האינטרנט של Airflow.
בפלט, מחפשים את המחרוזת שאחרי client_id. לדוגמה:
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
שומרים את הקוד הבא בקובץ בשם get_client_id.py. ממלאים את הערכים של project_id, location ו-composer_environment, ואז מריצים את הקוד ב-Cloud Shell או בסביבה המקומית.
העלאת DAG לסביבה
העלאת DAG לסביבה הפלט של ה-DAG הבא הוא הגדרת ההרצה של ה-DAG שהתקבלה. מפעילים את ה-DAG הזה מפונקציה שיוצרים בהמשך המדריך.
פריסת פונקציה של Cloud Functions שמפעילה את ה-DAG
אפשר לפרוס פונקציית Cloud באמצעות השפה המועדפת שנתמכת על ידי פונקציות Cloud Run או Cloud Run. במדריך הזה מוצגת פונקציית Cloud Functions שהוטמעה ב-Python וב-Java.
ציון פרמטרים להגדרת Cloud Functions
Trigger. בדוגמה הזו, בוחרים טריגר שפועל כשנוצר אובייקט חדש בקטגוריה, או כשמתבצע שכתוב של אובייקט קיים.
סוג הטריגר. Cloud Storage.
סוג האירוע. סיום / יצירה.
Bucket. בוחרים קטגוריה שתפעיל את הפונקציה הזו.
ניסיון חוזר במקרה של כשל. לצורך הדוגמה הזו, מומלץ להשבית את האפשרות הזו. אם אתם משתמשים בפונקציה משלכם בסביבת ייצור, כדאי להפעיל את האפשרות הזו כדי לטפל בשגיאות זמניות.
חשבון שירות של זמן ריצה, בקטע הגדרות של זמן ריצה, build, חיבורים ואבטחה. אפשר לבחור באחת מהאפשרויות הבאות, בהתאם להעדפות שלכם:
בוחרים באפשרות חשבון השירות של Compute Engine שמוגדר כברירת מחדל. עם הרשאות IAM שמוגדרות כברירת מחדל, החשבון הזה יכול להריץ פונקציות שיש להן גישה לסביבות Managed Airflow.
יוצרים חשבון שירות בהתאמה אישית עם התפקיד Composer User ומציינים אותו כחשבון שירות של זמן הריצה של הפונקציה. האפשרות הזו מבוססת על העיקרון של הרשאות מינימליות.
Runtime and entry point (זמן ריצה ונקודת כניסה), בשלב Code (קוד). כשמוסיפים קוד לדוגמה הזו, בוחרים את זמן הריצה Python 3.7 או גרסה חדשה יותר ומציינים את
trigger_dagכנקודת הכניסה.
הוספת דרישות
מציינים את יחסי התלות בקובץ requirements.txt:
מוסיפים את הקוד הבא לקובץ main.py ומבצעים את ההחלפות הבאות:
מחליפים את הערך של המשתנה
client_idבערךclient_idשקיבלתם קודם.מחליפים את הערך של המשתנה
webserver_idבמזהה פרויקט הדייר, שהוא חלק מכתובת ה-URL של ממשק האינטרנט של Airflow לפני.appspot.com. כתובת ה-URL של ממשק האינטרנט של Airflow הופיעה בשלב מוקדם יותר.מציינים את הגרסה של Airflow API בארכיטקטורת REST שבה משתמשים:
- אם אתם משתמשים ב-Airflow API בארכיטקטורת REST היציב, צריך להגדיר את המשתנה
USE_EXPERIMENTAL_APIלערךFalse. - אם אתם משתמשים ב-Airflow API בארכיטקטורת REST הניסיוני, לא צריך לבצע שינויים. המשתנה
USE_EXPERIMENTAL_APIכבר מוגדר לערךTrue.
- אם אתם משתמשים ב-Airflow API בארכיטקטורת REST היציב, צריך להגדיר את המשתנה
בדיקת הפונקציה
כדי לוודא שהפונקציה ו-DAG פועלים כמצופה:
- מחכים עד שהפונקציה תופעל.
- מעלים קובץ לקטגוריה של Cloud Storage. לחלופין, אפשר להפעיל את הפונקציה באופן ידני על ידי בחירת הפעולה Test the function (בדיקת הפונקציה) במסוף Google Cloud .
- בודקים את דף ה-DAG בממשק האינטרנט של Airflow. ל-DAG צריך להיות הפעלה אחת פעילה או שכבר הסתיימה.
- בממשק המשתמש של Airflow, בודקים את יומני המשימות של ההרצה הזו. אפשר לראות שהמשימה
print_gcs_infoמוציאה את הנתונים שהתקבלו מהפונקציה ליומנים:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
המאמרים הבאים
- גישה לממשק המשתמש של Airflow
- גישה ל-Airflow API בארכיטקטורת REST
- כתיבת תרשימי DAG
- כתיבה של פונקציות Cloud Run
- טריגרים של Cloud Storage