Managed Airflow (דור 3) | Managed Airflow (דור 2) | Managed Airflow (דור 1 מדור קודם)
במדריך הזה נסביר איך להשתמש ב-Managed Airflow כדי ליצור Apache Airflow DAG. ה-DAG מצטרף לנתונים ממערך נתונים ציבורי של BigQuery וקובץ CSV שמאוחסן בקטגוריה של Cloud Storage, ואז מריץ משימת אצווה של Managed Service for Apache Spark כדי לעבד את הנתונים המצורפים.
מערך הנתונים הציבורי של BigQuery במדריך הזה הוא ghcn_d, מסד נתונים משולב של סיכומי אקלים ברחבי העולם. קובץ ה-CSV מכיל מידע על התאריכים והשמות של חגים בארה"ב משנת 1997 עד 2021.
השאלה שאנחנו רוצים לענות עליה באמצעות ה-DAG היא: "מה הייתה הטמפרטורה בשיקגו בחג ההודיה ב-25 השנים האחרונות?"
מטרות
- יצירת סביבת Managed Airflow בהגדרת ברירת המחדל
- יצירת מערך נתונים ריק ב-BigQuery
- יצירת קטגוריה של Cloud Storage
- יוצרים ומריצים DAG שכולל את המשימות הבאות:
- טעינת מערך נתונים חיצוני מ-Cloud Storage ל-BigQuery
- צירוף של שני מערכי נתונים ב-BigQuery
- הרצת משימת ניתוח נתונים ב-PySpark
לפני שמתחילים
הפעלת ממשקי ה-API
מפעילים את ממשקי ה-API הבאים:
המסוף
מפעילים את ממשקי ה-API של Managed Service for Apache Spark, Managed Airflow, BigQuery ו-Cloud Storage.
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים
gcloud
מפעילים את ממשקי ה-API של Managed Service for Apache Spark, Managed Airflow, BigQuery ו-Cloud Storage:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
מתן הרשאות
מקצים לחשבון המשתמש את התפקידים וההרשאות הבאים:
הקצאת תפקידים לניהול סביבות וקטגוריות של סביבות ב-Managed Airflow.
נותנים את התפקיד BigQuery Data Owner (
roles/bigquery.dataOwner) כדי ליצור מערך נתונים ב-BigQuery.מקצים את התפקיד Storage Admin (
roles/storage.admin) כדי ליצור קטגוריה של Cloud Storage.
יצירה והכנה של סביבת Managed Airflow
יצירת סביבת Managed Airflow עם פרמטרים שמוגדרים כברירת מחדל:
- בוחרים אזור בארה"ב.
- בוחרים בגרסה העדכנית ביותר של Managed Airflow.
כדי שעובדי Airflow יוכלו להריץ בהצלחה משימות DAG, צריך להקצות לחשבון השירות שבו משתמשים בסביבת Airflow המנוהלת את התפקידים הבאים:
- BigQuery User (
roles/bigquery.user) - בעלים של נתונים ב-BigQuery (
roles/bigquery.dataOwner) - משתמש בחשבון שירות (
roles/iam.serviceAccountUser) - Dataproc Editor (
roles/dataproc.editor) - Dataproc Worker (
roles/dataproc.worker)
- BigQuery User (
יצירת משאבים קשורים
יוצרים מערך נתונים ריק ב-BigQuery עם הפרמטרים הבאים:
- Name (שם):
holiday_weather - אזור:
US
- Name (שם):
יוצרים קטגוריה חדשה של Cloud Storage
USבמספר אזורים.מריצים את הפקודה הבאה כדי להפעיל גישה פרטית ל-Google ברשת המשנה שמוגדרת כברירת מחדל באזור שבו רוצים להריץ את Managed Service for Apache Spark כדי לעמוד בדרישות הרשת. מומלץ להשתמש באותו אזור שבו נמצא סביבת Managed Airflow.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
עיבוד נתונים באמצעות Managed Service for Apache Spark
דוגמה לעבודת PySpark
הקוד שמוצג בהמשך הוא דוגמה לעבודת PySpark שממירה טמפרטורה ממעלות צלזיוס חלקי 10 למעלות צלזיוס. העבודה הזו ממירה נתוני טמפרטורה ממערך הנתונים לפורמט אחר.
העלאת קבצים תומכים ל-Cloud Storage
כדי להעלות את קובץ PySpark ואת מערך הנתונים שמאוחסן ב-holidays.csv:
שומרים את הקובץ data_analytics_process.py במחשב המקומי.
שומרים את הקובץ holidays.csv במחשב המקומי.
במסוף Google Cloud , עוברים לדף Cloud Storage browser:
לוחצים על שם הקטגוריה שיצרתם קודם.
בכרטיסייה Objects של הדלי, לוחצים על הלחצן Upload files, בוחרים את הקבצים
data_analytics_process.pyו-holidays.csvבתיבת הדו-שיח שמופיעה ולוחצים על Open.
DAG של ניתוח נתונים
עיון בדוגמה ל-DAG
ה-DAG משתמש בכמה אופרטורים כדי לשנות את הנתונים ולאחד אותם:
GCSToBigQueryOperatorקורא את הקובץ holidays.csv מ-Cloud Storage ומעביר אותו לטבלה חדשה במערך הנתוניםholidays_weatherב-BigQuery שיצרתם קודם.הפקודה
DataprocCreateBatchOperatorיוצרת ומריצה משימת PySpark באצווה באמצעות Managed Service for Apache Spark.השאילתה
BigQueryInsertJobOperatorמצטרפת לנתונים מ-holidays.csv בעמודה Date עם נתוני מזג אוויר ממערך הנתונים הציבורי של BigQuery ghcn_d. המשימותBigQueryInsertJobOperatorנוצרות באופן דינמי באמצעות לולאת for, והמשימות האלה נמצאות ב-TaskGroupכדי לשפר את הקריאות בתצוגת הגרף של ממשק המשתמש של Airflow.
שימוש בממשק המשתמש של Airflow כדי להוסיף משתנים
ב-Airflow, variables הן דרך אוניברסלית לאחסון ולאחזור של הגדרות או קביעות שרירותיות בתור מאגר פשוט של זוגות של מפתח וערך. ה-DAG הזה משתמש במשתני Airflow כדי לאחסן ערכים נפוצים. כדי להוסיף אותם לסביבה שלכם:
עוברים אל ניהול > משתנים.
מוסיפים את המשתנים הבאים:
gcp_project: מזהה הפרויקט.
gcs_bucket: השם של הקטגוריה שיצרתם קודם (בלי הקידומתgs://).
gce_region: האזור שבו רוצים להריץ את העבודה ב-Managed Service for Apache Spark שעומדת בדרישות הרשת של Managed Service for Apache Spark. זהו האזור שבו הפעלתם גישה פרטית ל-Google קודם לכן.
dataproc_service_account: חשבון השירות של סביבת Managed Airflow. אפשר למצוא את חשבון השירות הזה בכרטיסייה Environment configuration בסביבת Managed Airflow.
העלאת ה-DAG לקטגוריה של הסביבה
Managed Airflow מתזמן DAG שנמצאים בתיקייה /dags בדלי של הסביבה. כדי להעלות את ה-DAG באמצעותGoogle Cloud המסוף:
במחשב המקומי, שומרים את הקובץ data_analytics_dag.py.
במסוף Google Cloud , עוברים לדף Environments.
ברשימת הסביבות, בעמודה DAG folder (תיקיית DAG), לוחצים על הקישור DAGs. תיפתח תיקיית ה-DAG של הסביבה.
לוחצים על Upload files.
בוחרים באפשרות
data_analytics_dag.pyבמחשב המקומי ולוחצים על פתיחה.
הפעלת ה-DAG
בסביבת Managed Airflow, לוחצים על הכרטיסייה DAGs.
לוחצים על מזהה ה-DAG
data_analytics_dag.לוחצים על Trigger DAG (הפעלת DAG).
מחכים כחמש עד עשר דקות עד שרואים סימן וי ירוק שמציין שהמשימות הושלמו בהצלחה.
אימות ההצלחה של ה-DAG
במסוף Google Cloud , עוברים לדף BigQuery.
בחלונית Explorer, לוחצים על שם הפרויקט.
לחץ על
holidays_weather_joined.לוחצים על 'תצוגה מקדימה' כדי לראות את הטבלה שנוצרה. שימו לב שהמספרים בעמודת הערך הם עשיריות של מעלות צלזיוס.
לחץ על
holidays_weather_normalized.לוחצים על 'תצוגה מקדימה' כדי לראות את הטבלה שנוצרה. שימו לב שהמספרים בעמודת הערך הם במעלות צלזיוס.
מידע מעמיק עם Managed Service for Apache Spark (אופציונלי)
אפשר לנסות גרסה מתקדמת של ה-DAG הזה עם תהליך מורכב יותר של עיבוד נתונים ב-PySpark. אפשר לעיין בתוסף Managed Service for Apache Spark לדוגמה של ניתוח נתונים ב-GitHub.
הסרת המשאבים
מוחקים משאבים ספציפיים שיצרתם במדריך הזה:
מוחקים את הקטגוריה של Cloud Storage שיצרתם לצורך המדריך הזה.
מחיקת סביבת Managed Airflow, כולל מחיקה ידנית של קטגוריית הסביבה.