Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)
בדף הזה מוסבר איך ליצור ארכיטקטורת דחיפה מבוססת-אירועים על ידי הפעלה של DAG של Managed Airflow בתגובה לשינויים בנושא Pub/Sub. הדוגמאות במדריך הזה מדגימות איך לטפל במחזור המלא של ניהול Pub/Sub, כולל ניהול מינויים, כחלק מתהליך DAG. האפשרות הזו מתאימה לחלק מתרחישי השימוש הנפוצים שבהם צריך להפעיל DAG, אבל לא רוצים להגדיר הרשאות גישה נוספות.
לדוגמה, אפשר להשתמש בהודעות שנשלחות דרך Pub/Sub כפתרון אם לא רוצים לספק גישה ישירה לסביבת Managed Airflow מטעמי אבטחה. אפשר להגדיר פונקציית Cloud Run שיוצרת הודעות Pub/Sub ומפרסמת אותן בנושא Pub/Sub. אחר כך תוכלו ליצור DAG ששולף הודעות Pub/Sub ומטפל בהן.
בדוגמה הספציפית הזו, יוצרים פונקציית Cloud Run ופורסים שני DAG. ה-DAG הראשון שולף הודעות Pub/Sub ומפעיל את ה-DAG השני בהתאם לתוכן ההודעה ב-Pub/Sub.
המדריך הזה מתבסס על ההנחה שאתם מכירים את Python ואת מסוף Google Cloud .
מטרות
עלויות
במדריך הזה השתמשנו ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:
- Managed Airflow (אפשר גם לעיין בעלויות נוספות)
- Pub/Sub
- פונקציות Cloud Run
כדי להימנע מחיובים נוספים אחרי שסיימתם את המדריך, תוכלו למחוק את המשאבים שיצרתם. פרטים נוספים מופיעים במאמר בנושא הסרת המשאבים.
לפני שמתחילים
במדריך הזה תצטרכו Google Cloud פרויקט. מגדירים את הפרויקט באופן הבא:
במסוף Google Cloud , בוחרים פרויקט או יוצרים פרויקט:
מוודאים שהחיוב מופעל בפרויקט. איך בודקים אם החיוב מופעל בפרויקט
כדי ליצור את המשאבים הנדרשים, צריך לוודא שלמשתמש בפרויקט יש את התפקידים הבאים: Google Cloud
- משתמש בחשבון שירות (
roles/iam.serviceAccountUser) - עורך Pub/Sub (
roles/pubsub.editor) - אדמין של סביבה ואובייקטים באחסון
(
roles/composer.environmentAndStorageObjectAdmin) - אדמין של פונקציות Cloud Run (
roles/cloudfunctions.admin) - כלי לצפייה ביומנים (
roles/logging.viewer)
- משתמש בחשבון שירות (
מוודאים שלחשבון השירות שמריץ את פונקציית Cloud Run יש הרשאות מספיקות בפרויקט כדי לגשת ל-Pub/Sub. כברירת מחדל, פונקציות Cloud Run משתמשות בחשבון השירות שמוגדר כברירת מחדל ב-App Engine. לחשבון השירות הזה יש תפקיד עריכה, עם הרשאות מספיקות לשימוש במדריך הזה.
הפעלת ממשקי API בפרויקט
המסוף
מפעילים את Managed Airflow API, את Cloud Run functions API ואת Pub/Sub API.
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים
gcloud
מפעילים את ממשקי ה-API של Managed Airflow, פונקציות Cloud Run ו-Pub/Sub:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
כדי להפעיל את Cloud Composer API בפרויקט, מוסיפים את הגדרות המשאבים הבאות לסקריפט Terraform:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
מחליפים את <PROJECT_ID> במזהה הפרויקט של הפרויקט. לדוגמה, example-project.
יצירת סביבת Managed Airflow
איך יוצרים סביבת Managed Airflow (דור 2)
במסגרת התהליך הזה, מקצים את התפקיד Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) לחשבון של סוכן השירות של Composer. Managed Airflow משתמש בחשבון הזה כדי לבצע פעולות בפרויקט Google Cloud שלכם.
יוצרים נושא Pub/Sub
בדוגמה הזו מופעל DAG בתגובה להודעה שנשלחת לנושא Pub/Sub. יוצרים נושא Pub/Sub לשימוש בדוגמה הזו:
המסוף
נכנסים לדף Pub/Sub Topics במסוף Google Cloud .
לוחצים על יצירת נושא.
בשדה Topic ID (מזהה הנושא), מזינים את הערך
dag-topic-triggerכמזהה של הנושא.משאירים את ערכי ברירת המחדל בשאר האפשרויות.
לוחצים על יצירת נושא.
gcloud
כדי ליצור נושא, מריצים את הפקודה gcloud pubsub topics create ב-Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
מוסיפים את הגדרות המשאבים הבאות לסקריפט Terraform:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
מחליפים את <PROJECT_ID> במזהה הפרויקט של הפרויקט. לדוגמה, example-project.
העלאת קובצי DAG
מעלים DAG לסביבה:
- שומרים את קובץ ה-DAG הבא במחשב המקומי.
- מחליפים את
<PROJECT_ID>במזהה הפרויקט של הפרויקט. לדוגמה,example-project. - מעלים את קובץ ה-DAG הערוך לסביבה שלכם.
קוד לדוגמה מכיל שני DAG: trigger_dag ו-target_dag.
DAG trigger_dag נרשם לנושא Pub/Sub, שולף הודעות Pub/Sub ומפעיל DAG אחר שצוין במזהה ה-DAG של נתוני ההודעה ב-Pub/Sub. בדוגמה הזו, trigger_dag מפעיל
את target_dag DAG, שמוציא הודעות ליומני המשימות.
גרף ה-DAG trigger_dag מכיל את המשימות הבאות:
-
subscribe_task: הרשמה לנושא Pub/Sub. -
pull_messages_operator: קריאת נתוני הודעה ב-Pub/Sub באמצעותPubSubPullOperator. -
trigger_target_dag: הפעלה של DAG אחר (בדוגמה הזו,target_dag) בהתאם לנתונים בהודעות שנשלפו מנושא Pub/Sub.
target_dag ה-DAG מכיל רק משימה אחת: output_to_logs. המשימה הזו מדפיסה הודעות ביומן המשימות עם השהיה של שנייה אחת.
פריסה של פונקציית Cloud Run שמפרסמת הודעות בנושא Pub/Sub
בקטע הזה פורסים פונקציית Cloud Run שמפרסמת הודעות בנושא Pub/Sub.
יצירה של פונקציית Cloud Run וציון ההגדרה שלה
המסוף
נכנסים לדף Cloud Run functions במסוף Google Cloud .
לוחצים על יצירת פונקציה.
בשדה Environment (סביבה), בוחרים באפשרות 1st gen (דור ראשון).
בשדה Function name, מזינים את שם הפונקציה:
pubsub-publisher.בשדה Trigger type (סוג הטריגר), בוחרים באפשרות HTTP.
בקטע אימות, בוחרים באפשרות הפעלת קריאות ללא אימות. האפשרות הזו מאפשרת למשתמשים לא מאומתים להפעיל פונקציית HTTP.
לוחצים על 'שמירה'.
לוחצים על הבא כדי לעבור לשלב קוד.
Terraform
מומלץ להשתמש במסוף בשלב הזה, כי אין דרך פשוטה לנהל את קוד המקור של הפונקציה מ-Terraform. Google Cloud
בדוגמה הזו מוסבר איך להעלות פונקציה של Cloud Run מקובץ ארכיון ZIP מקומי על ידי יצירת קטגוריה של Cloud Storage, אחסון הקובץ בקטגוריה הזו ואז שימוש בקובץ מהקטגוריה כמקור לפונקציה של Cloud Run. אם משתמשים בגישה הזו, Terraform לא מעדכן באופן אוטומטי את קוד המקור של הפונקציה, גם אם יוצרים קובץ ארכיון חדש. כדי להעלות מחדש את קוד הפונקציה, אפשר לשנות את שם הקובץ של הארכיון.
- מורידים את הקבצים
pubsub_publisher.pyו-requirements.txt. - בקובץ
pubsub_publisher.py, מחליפים את<PROJECT_ID>במזהה הפרויקט של הפרויקט. לדוגמה,example-project. - יוצרים ארכיון ZIP בשם
pubsub_function.zipעם הקובץpbusub_publisner.pyוהקובץrequirements.txt. - שומרים את קובץ ה-ZIP באחת מהספריות שבהן מאוחסן סקריפט Terraform.
- מוסיפים את הגדרות המשאבים הבאות לסקריפט Terraform ומחליפים את
<PROJECT_ID>במזהה הפרויקט של הפרויקט.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
ציון פרמטרים של קוד פונקציית Cloud Run
המסוף
בשלב Code (קוד), בשדה Runtime (סביבת זמן ריצה), בוחרים את סביבת זמן הריצה של השפה שבה הפונקציה משתמשת. בדוגמה הזו, בוחרים באפשרות Python 3.10.
בשדה Entry point מזינים את הערך
pubsub_publisher. זה הקוד שמופעל כשפונקציית Cloud Run פועלת. הערך של הדגל הזה צריך להיות שם של פונקציה או שם מלא של מחלקה שקיימים בקוד המקור.
Terraform
דלג על שלב זה. הפרמטרים של פונקציית Cloud Run כבר מוגדרים במשאב google_cloudfunctions_function.
העלאת הקוד של פונקציית Cloud Run
המסוף
בשדה Source code, בוחרים את האפשרות המתאימה לאספקת קוד המקור של הפונקציה. במדריך הזה, מוסיפים את קוד הפונקציה באמצעות העורך המוטבע של פונקציות Cloud Run. אפשרות נוספת היא להעלות קובץ ZIP או להשתמש ב-Cloud Source Repositories.
- מכניסים את הקוד לדוגמה הבא לקובץ main.py.
- מחליפים את
<PROJECT_ID>במזהה הפרויקט של הפרויקט. לדוגמה,example-project.
Terraform
דלג על שלב זה. הפרמטרים של פונקציית Cloud Run כבר מוגדרים במשאב google_cloudfunctions_function.
ציון התלות של פונקציית Cloud Run
המסוף
מציינים את יחסי התלות של הפונקציה בקובץ המטא-נתונים requirements.txt:
כשפורסים את הפונקציה, פונקציות Cloud Run מורידות ומתקינות את יחסי התלות שמוצהרים בקובץ requirements.txt, שורה אחת לכל חבילה.
הקובץ הזה צריך להיות באותה ספרייה שבה נמצא הקובץ main.py שמכיל את קוד הפונקציה. פרטים נוספים זמינים במאמר בנושא קובצי דרישות במסמכי התיעוד של pip.
Terraform
דלג על שלב זה. התלות של פונקציית Cloud Run מוגדרת בקובץ requirements.txt בארכיון pubsub_function.zip.
פריסת פונקציית Cloud Run
המסוף
לוחצים על פריסה. בסיום הפריסה, הפונקציה מופיעה עם סימן וי ירוק בדף Cloud Run functions במסוףGoogle Cloud .
מוודאים שלחשבון השירות שמריץ את הפונקציה של Cloud Run יש מספיק הרשאות בפרויקט כדי לגשת ל-Pub/Sub.
Terraform
מאתחלים את Terraform:
terraform initבודקים את ההגדרות ומוודאים שהמשאבים שמערכת Terraform תיצור או תעדכן תואמים לציפיות שלכם:
terraform planכדי לבדוק אם ההגדרות תקינות, מריצים את הפקודה הבאה:
terraform validateמריצים את הפקודה הבאה ומזינים yes בהודעה שמופיעה, כדי להחיל את הגדרות Terraform:
terraform apply
ממתינים עד שב-Terraform תוצג ההודעה "Apply complete!".
במסוף Google Cloud , נכנסים למשאבים בממשק המשתמש כדי לוודא שהם נוצרו או עודכנו ב-Terraform.
בדיקה של פונקציית Cloud Run
כדי לוודא שהפונקציה מפרסמת הודעה בנושא ב-Pub/Sub ושה-DAG לדוגמה פועל כמצופה:
בודקים שקבוצות ה-DAG פעילות:
נכנסים לדף Environments במסוף Google Cloud .
ברשימת הסביבות, לוחצים על שם הסביבה. הדף Environment details ייפתח.
עוברים לכרטיסייה DAGs.
בודקים את הערכים בעמודה State (מצב) עבור DAGs בשם
trigger_dagו-target_dag. שני ה-DAG צריכים להיות במצבActive.
שליחה של הודעת בדיקה ב-Pub/Sub. אפשר לעשות את זה ב-Cloud Shell:
נכנסים לדף Functions במסוף Google Cloud .
לוחצים על שם הפונקציה,
pubsub-publisher.עוברים לכרטיסייה בדיקה.
בקטע Configure triggering event (הגדרת אירוע הפעלה), מזינים את צמד המפתח-ערך הבא ב-JSON:
{"message": "target_dag"}. אל תשנו את זוג הערכים של המפתח, כי ההודעה הזו תפעיל את ה-DAG של הבדיקה בהמשך.בקטע Test Command (פקודת בדיקה), לוחצים על Test in Cloud Shell (בדיקה ב-Cloud Shell).
ב-Cloud Shell Terminal, מחכים עד שפקודה מופיעה באופן אוטומטי. מריצים את הפקודה על ידי לחיצה על
Enter.אם מופיעה ההודעה Authorize Cloud Shell, לוחצים על Authorize.
בודקים שהתוכן של ההודעה תואם להודעת Pub/Sub. בדוגמה הזו, הודעת הפלט חייבת להתחיל ב-
Message b'target_dag' with message_length 10 published toכתגובה מהפונקציה.
בודקים שהופעלה הפונקציה
target_dag:ממתינים לפחות דקה עד להשלמת הפעלת DAG חדשה של
trigger_dag.נכנסים לדף Environments במסוף Google Cloud .
ברשימת הסביבות, לוחצים על שם הסביבה. הדף Environment details ייפתח.
עוברים לכרטיסייה DAGs.
לוחצים על
trigger_dagכדי לעבור לדף פרטי DAG. בכרטיסייה Runs (הפעלות) מוצגת רשימה של הפעלות DAG עבור ה-DAGtrigger_dag.ה-DAG הזה פועל כל דקה ומעבד את כל ההודעות ב-Pub/Sub שנשלחות מהפונקציה. אם לא נשלחו הודעות, המשימה
trigger_targetמסומנת כ-Skippedביומני ההרצה של DAG. אם הופעלו DAG, המשימהtrigger_targetמסומנת כ-Success.מעיינים בכמה הרצות DAG אחרונות כדי לאתר הרצת DAG שבה כל שלוש המשימות (
subscribe_task,pull_messages_operatorו-trigger_target) נמצאות בסטטוסיםSuccess.חוזרים לכרטיסייה DAGs ובודקים שבעמודה Successful runs של ה-DAG
target_dagמופיע ריצה אחת מוצלחת.
סיכום
במדריך הזה למדתם איך להשתמש בפונקציות Cloud Run כדי לפרסם הודעות בנושא Pub/Sub, ואיך לפרוס DAG שנרשם לנושא Pub/Sub, שולף הודעות Pub/Sub ומפעיל DAG אחר שצוין במזהה ה-DAG של נתוני ההודעה.
יש גם דרכים חלופיות ליצירה ולניהול של מינויים ל-Pub/Sub ולהפעלת DAG שלא נכללות במדריך הזה. לדוגמה, אתם יכולים להשתמש בפונקציות Cloud Run כדי להפעיל DAG של Airflow כשמתרחש אירוע ספציפי. כדאי לעיין במדריכים שלנו כדי לנסות את התכונות האחרות שלGoogle Cloud .
הסרת המשאבים
כדי להימנע מחיובים בחשבון Google Cloud על המשאבים שבהם השתמשתם במדריך הזה, אתם יכולים למחוק את הפרויקט שמכיל את המשאבים או להשאיר את הפרויקט ולמחוק את המשאבים הספציפיים.
מחיקת הפרויקט
כדי למחוק Google Cloud פרויקט:
gcloud projects delete PROJECT_ID
מחיקת משאבים בודדים
אם אתם מתכננים להיעזר בכמה מדריכי לימוד או מדריכים למתחילים, מומלץ להשתמש שוב באותו פרויקט כדי לא לחרוג ממכסות הפרויקטים.
המסוף
- מחיקת סביבת Managed Airflow במהלך התהליך הזה, נמחק גם דלי האחסון של הסביבה.
- מחיקת נושא Pub/Sub,
dag-topic-trigger. מחיקת פונקציית Cloud Run.
במסוף Google Cloud , נכנסים לדף Cloud Run functions.
לוחצים על תיבת הסימון של הפונקציה שרוצים למחוק,
pubsub-publisher.לוחצים על מחיקה ופועלים לפי ההוראות.
Terraform
- מוודאים שסקריפט Terraform לא מכיל רשומות של משאבים שעדיין נדרשים לפרויקט. לדוגמה, יכול להיות שתרצו להשאיר חלק מממשקי ה-API מופעלים ואת הרשאות ה-IAM עדיין מוקצות (אם הוספתם הגדרות כאלה לסקריפט Terraform).
- מריצים את
terraform destroy. - מוחקים ידנית את הקטגוריה של הסביבה. Managed Airflow לא מוחק אותו באופן אוטומטי. אפשר לעשות את זה דרך מסוף Google Cloud או Google Cloud CLI.
המאמרים הבאים
- בדיקת DAG
- בדיקת פונקציות HTTP
- פריסת פונקציה ב-Cloud Run
- כדאי לנסות בעצמכם תכונות אחרות של Google Cloud . מומלץ לעיין במדריכים שלנו.