Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)
בדף הזה מוסבר איך להשתמש ב-Managed Airflow (דור 2) כדי להריץ עומסי עבודה של Managed Service for Apache Spark ב-Google Cloud.
בדוגמאות שבקטעים הבאים מוסבר איך להשתמש באופרטורים לניהול עומסי עבודה של עיבוד ברצף (batch) ב-Managed Service for Apache Spark. משתמשים באופרטורים האלה ב-DAG שיוצרים, מוחקים, מפרטים ומקבלים עומס עבודה של עיבוד ברצף (batch processing) ב-Managed Service for Apache Spark:
יצירת DAGs עבור אופרטורים שפועלים עם עומסי עבודה של עיבוד ברצף (batch processing) ב-Managed Service for Apache Spark:
ליצור DAGs שמשתמשים בקונטיינרים בהתאמה אישית וב-Dataproc Metastore.
מגדירים שרת היסטוריה מתמשך עבור קבוצות ה-DAG האלה.
לפני שמתחילים
מפעילים את Dataproc API:
המסוף
מפעילים את Managed Service for Apache Spark API.
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (
roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאהserviceusage.services.enable. איך מקצים תפקידיםgcloud
מפעילים את Managed Service for Apache Spark API:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (
roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאהserviceusage.services.enable. איך מקצים תפקידיםgcloud services enable dataproc.googleapis.com
בוחרים את המיקום של קובץ עומס העבודה של האצווה. אפשר להשתמש באחת מהאפשרויות הבאות:
- יוצרים קטגוריה של Cloud Storage שבה הקובץ יישמר.
- שימוש בקטגוריה של הסביבה. מכיוון שלא צריך לסנכרן את הקובץ הזה עם Airflow, אפשר ליצור תיקיית משנה נפרדת מחוץ לתיקיות
/dagsאו/data. לדוגמה,/batches. - שימוש בקטגוריה קיימת.
הגדרת קבצים ומשתני Airflow
בקטע הזה נדגים איך להגדיר קבצים ומשתני Airflow לצורך המדריך הזה.
העלאת קובץ של עומס עבודה של ML ב-Managed Service for Apache Spark לקטגוריה
עומס העבודה במדריך הזה מפעיל סקריפט pyspark:
שומרים סקריפט pyspark בקובץ מקומי בשם
spark-job.py. לדוגמה, אפשר להשתמש בסקריפט לדוגמה של pyspark.מעלים את הקובץ למיקום שבחרתם בקטע לפני שמתחילים.
הגדרת משתני Airflow
בדוגמאות שבקטעים הבאים נעשה שימוש במשתני Airflow. אתם מגדירים ערכים למשתנים האלה ב-Airflow, ואז קוד ה-DAG יכול לגשת לערכים האלה.
בדוגמאות במדריך הזה נעשה שימוש במשתני Airflow הבאים. אפשר להגדיר אותם לפי הצורך, בהתאם לדוגמה שבה משתמשים.
מגדירים את משתני Airflow הבאים לשימוש בקוד ה-DAG:
-
project_id: מזהה הפרויקט. -
bucket_name: ה-URI של מאגר שבו נמצא קובץ ה-Python הראשי של עומס העבודה (spark-job.py). בחרתם את המיקום הזה בקטע לפני שמתחילים. -
phs_cluster: שם אשכול השרתים של Persistent History Server. מגדירים את המשתנה הזה כשיוצרים שרת היסטוריה קבוע. -
image_name: השם והתג של קובץ האימג' של הקונטיינר המותאם אישית (image:tag). מגדירים את המשתנה הזה כשמשתמשים בקובץ אימג' של קונטיינר מותאם אישית עם DataprocCreateBatchOperator. -
metastore_cluster: שם השירות של Dataproc Metastore. מגדירים את המשתנה הזה כשמשתמשים בשירות Dataproc Metastore עם DataprocCreateBatchOperator. -
region_name: האזור שבו נמצא שירות Dataproc Metastore. מגדירים את המשתנה הזה כשמשתמשים בשירות Dataproc Metastore עם DataprocCreateBatchOperator.
שימוש במסוף ובממשק המשתמש של Airflow כדי להגדיר כל משתנה של Airflow Google Cloud
במסוף Google Cloud , עוברים לדף Environments.
ברשימת הסביבות, לוחצים על הקישור Airflow של הסביבה. ממשק המשתמש של Airflow נפתח.
בממשק המשתמש של Airflow, בוחרים באפשרות Admin (ניהול) > Variables (משתנים).
לוחצים על הוספת רשומה חדשה.
מציינים את שם המשתנה בשדה Key ומגדירים את הערך שלו בשדה Val.
לוחצים על Save.
יצירת שרת היסטוריה מתמשך
כדי לראות את קובצי ההיסטוריה של Spark של עומסי העבודה של אצווה, אפשר להשתמש בשרת היסטוריה מתמשך (PHS):
- יצירת שרת היסטוריה מתמשך.
- מוודאים שציינתם את השם של אשכול PHS ב
phs_clusterמשתנה Airflow.
DataprocCreateBatchOperator
ה-DAG הבא מתחיל עומס עבודה של עיבוד ברצף (batch) ב-Managed Service for Apache Spark.
למידע נוסף על ארגומנטים של DataprocCreateBatchOperator, אפשר לעיין בקוד המקור של האופרטור.
מידע נוסף על מאפיינים שאפשר להעביר בפרמטר batch
של DataprocCreateBatchOperator זמין בתיאור של המחלקה Batch.
שימוש בקובץ אימג' מותאם אישית של קונטיינר עם DataprocCreateBatchOperator
בדוגמה הבאה אפשר לראות איך משתמשים בקובץ אימג' של קונטיינר מותאם אישית כדי להריץ את עומסי העבודה. אפשר להשתמש בקונטיינר בהתאמה אישית, למשל, כדי להוסיף תלות ב-Python שלא מסופקת על ידי קובץ האימג' של הקונטיינר שמוגדר כברירת מחדל.
כדי להשתמש בקובץ אימג' של קונטיינר מותאם אישית:
יוצרים קובץ אימג' של קונטיינר בהתאמה אישית ומעלים אותו ל-Container Registry.
מציינים את התמונה ב
image_nameמשתנה Airflow.שימוש ב-DataprocCreateBatchOperator עם קובץ אימג' בהתאמה אישית:
שימוש בשירות Dataproc Metastore עם DataprocCreateBatchOperator
כדי להשתמש בשירות Dataproc Metastore מ-DAG:
בודקים ששירות המאגר של המטא-נתונים כבר הופעל.
במאמר הפעלה והשבתה של Dataproc Metastore מוסבר איך להפעיל שירות של metastore.
מידע מפורט על אופרטור Batch ליצירת ההגדרה זמין במאמר PeripheralsConfig.
אחרי ששירות המטא-חנות מופעל ופועל, מציינים את השם שלו במשתנה
metastore_clusterואת האזור שלו בregion_nameמשתנה Airflow.שימוש בשירות של מאגר המטא-נתונים ב-DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
אפשר להשתמש ב-DataprocDeleteBatchOperator כדי למחוק אצווה על סמך מזהה האצווה של עומס העבודה.
DataprocListBatchesOperator
DataprocDeleteBatchOperator מציג רשימה של חבילות שקיימות בתוך project_id ואזור נתונים נתונים.
DataprocGetBatchOperator
DataprocGetBatchOperator מאחזר עומס עבודה ספציפי באצווה.