במדריך הזה מוסבר איך ליצור ולפרוס פייפליין לתזמור בתוסף Google Cloud Data Agent Kit ל-Visual Studio Code.
צינור הנתונים לדוגמה מריץ סקריפט PySpark ב-Managed Service for Apache Spark.
אפשר לפרוס צינורות של תזמור מ-VS Code כגרסאות מקומיות או באמצעות פעולת GitHub, למשל כשממזגים שינויים עם הענף main. במאמר הזה נדגים איך פורסים את הגרסה המקומית של צינור לניהול תהליכים.
לפני שמתחילים
לפני שמתחילים, צריך לבצע את הפעולות הבאות:
- התקנת התוסף Data Agent Kit ל-VS Code
- קובעים את ההגדרות.
- מוסיפים מאגר מ-GitHub לסביבת העבודה של VS Code כדי לאחסן צינורות (pipelines) של תזמור ונכסים כמו סקריפטים.
בדיקת התפקידים הנדרשים ב-IAM
כדי לקבל את ההרשאות ליצירת משאבים בפרויקט, לפריסה ולהרצה של צינורות להפעלת תהליכים, צריך לבקש מהאדמין להקצות לכם את התפקידים הנדרשים.
כדי ליצור ולנהל סביבות של Managed Service for Apache Airflow ולנהל אובייקטים בדליים המשויכים, צריך את התפקידים הבאים. מידע נוסף על תפקידי המשתמשים האלה זמין במאמר הענקת תפקידים למשתמשים במסמכי התיעוד של Managed Service for Apache Airflow.
- אדמין של סביבה ואובייקט אחסון (composer.environmentAndStorageObjectAdmin)
- משתמש בחשבון שירות
(
iam.serviceAccountUser)
כדי לעבוד עם משאבים של BigQuery ו-Cloud Storage, אתם צריכים את התפקידים הבאים.
- עריכה של נתוני BigQuery (
roles/bigquery.dataEditor) - אדמין של אובייקטים באחסון (
roles/storage.objectAdmin)
יכול להיות שתצטרכו תפקידים נוספים מעבר לתפקידים שמאפשרים לכם להשתמש בתוסף ולעבוד עם צינורות של תזמור, בהתאם למשאבים שאתם מתכננים לגשת אליהם.
יצירת חשבון שירות והקצאת תפקידי IAM לחשבון
משתמשים בחשבון שירות ייחודי לסביבת Managed Airflow Gen 3. חשבון השירות יוצר סביבת Managed Airflow מדור 3 ומריץ את כל צינורות עיבוד הנתונים לפריסה שאתם פורסים.
צריך לבקש מהאדמין לבצע את הפעולות הבאות:
- יוצרים חשבון שירות כמו שמתואר במסמכי ה-IAM.
- מקצים לחשבון השירות את התפקיד Composer Worker (
composer.worker). ברוב המקרים, התפקיד הזה מספק את ההרשאות הנדרשות.
מומלץ, אם אתם צריכים לגשת למשאבים אחרים בGoogle Cloud פרויקט, להעניק הרשאות נוספות לחשבון השירות הזה רק כשזה נחוץ לפעולה של צינור עיבוד הנתונים לניהול התהליך.
יצירת Google Cloud משאבים לצינור עיבוד הנתונים של התזמור
בשלב הזה יוצרים Google Cloud משאבים לצינור האורקסטרציה.
יצירת סביבת Managed Airflow מדור 3
יוצרים סביבת Managed Airflow מדור 3 עם ההגדרות הבאות:
- שם הסביבה: מזינים שם שישמש בהמשך להגדרת צינור הנתונים של תזמור. לדוגמה,
example-pipeline-scheduler. - מיקום: בוחרים מיקום. מומלץ ליצור את כל המשאבים במדריך הזה באותו מיקום. לדוגמה,
us-central1. - חשבון שירות: בוחרים את חשבון השירות שיצרתם עבור הסביבה הזו.
בדוגמה הבאה של פקודת Google Cloud CLI אפשר לראות את התחביר:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
הוספת פרמטרים של סביבה להגדרת מתזמן הפעולות
צריך לספק פרטי חיבור לסביבת Managed Airflow שבה יופעל צינור הנתונים של תזמור התהליכים.
מוסיפים את פרמטרי ההגדרה של הסביבה שיצרתם באמצעות הכלי לעריכת ההגדרות של Google Cloud Data Agent Kit:
- בסרגל הפעילות, לוחצים על הסמל Google Cloud Data Agent Kit.
- מרחיבים את הגדרות ולוחצים על הגדרות.
- לוחצים על כלי התזמון.
- מזינים את הפרמטרים של סביבת Managed Airflow Gen 3 שיצרתם קודם:
- מזהה הפרויקט: שם הפרויקט שבו נמצאת הסביבה.
דוגמה:
example-project. - אזור: האזור שבו נמצאת הסביבה. דוגמה:
us-central1. - סביבה: שם הסביבה. דוגמה:
example-pipeline-scheduler.
- מזהה הפרויקט: שם הפרויקט שבו נמצאת הסביבה.
דוגמה:
- לוחצים על Save.
יצירת קטגוריית אחסון לארטיפקטים של צינור עיבוד הנתונים
יוצרים קטגוריה של Cloud Storage באותו פרויקט שבו נמצא סביבת Managed Airflow, ונותנים לה שם דומה ל-example-pipelines-bucket. הבאקט הזה נדרש לאחסון המשימה שלכם ב-Managed Service for Apache Spark.
חלק מהפעולות בצינור עיבוד הנתונים, כמו פלט התוצאות לקטגוריה של Cloud Storage.
יצירת מערך נתונים וטבלה חדשים ב-BigQuery
במדריך הזה מוצג צינור להעברת נתונים שכותב נתונים לטבלה ב-BigQuery. יוצרים את המשאבים הבאים ב-BigQuery בפרויקט:
- יוצרים קבוצת נתונים חדשה בשם
wordcount_dataset. - יוצרים טבלה חדשה ב-BigQuery בשם
wordcount_output.
הוספת נכסי צינור
במדריך הזה מוצגת משימה נפוצה בהנדסת נתונים (ETL: חילוץ, טרנספורמציה, טעינה) באמצעות PySpark. המשימה כוללת קריאה מ-BigQuery, טרנספורמציה של הנתונים (ספירת מילים) וטעינה שלהם בחזרה ל-BigQuery.
לא אג'נטי
מוסיפים את הקובץ הבא לתיקייה /scripts במאגר. בהמשך מוסיפים פעולת צינור שמעבדת את הסקריפט הזה ב-Managed Service for Apache Spark.
קובץ wordcount.py לדוגמה:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
מחליפים את מה שכתוב בשדות הבאים:
- ARTIFACTS_BUCKET_NAME: השם של קטגוריית Cloud Storage שיצרתם קודם. דוגמה:
example-pipelines-bucket. - PROJECT_ID: שם הפרויקט שבו נמצאת הסביבה. דוגמה:
example-project.
פעולות סוכן
מנחים את הסוכן ליצור סקריפט PySpark לדוגמה בתיקייה /scripts במאגר. בהמשך תוסיפו פעולה של צינור נתונים שמריצה את הסקריפט הזה ב-Managed Service for Apache Spark.
מזינים הנחיה שדומה להנחיה הבאה:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
אתחול צינורות של תזמור במאגר
כשמפעילים צינורות לניהול תהליכים, התוסף Data Agent Kit ל-VS Code יוצר מסגרת שכוללת את הרכיבים הבאים:
- קובץ YAML של צינור לניהול תהליכים: דוגמה להגדרת צינור שמכילה לוח זמנים אבל לא פעולות מוגדרות.
-
deployment.yaml: הגדרת פריסה לדוגמה של צינור עיבוד נתונים שמגדירה איך צריך לפרוס את צינור עיבוד הנתונים. בקובץ הזה מפורטת התצורה הנדרשת לסביבת Managed Airflow, לדלי של הארטיפקטים ולכל משאב אחר שמשמש את פעולות צינור הנתונים. -
.github/workflows/deploy.yaml: מגדיר פעולה ב-GitHub שמבצעת פריסה של צינור הנתונים כשממזגים שינויים עם ההסתעפותmainבמאגר ב-GitHub. -
.github/workflows/validate.yaml: מגדיר פעולת GitHub שמאמתת את צינור הנתונים אחרי הפריסה שלו.
בשלבים הבאים של המסמך הזה, נרחיב את ההגדרות האלה באמצעות התוסף Data Agent Kit ל-VS Code כדי ליצור ולפרוס צינור לניהול תהליכים באופן מקומי.
לא אג'נטי
כדי לאתחל צינורות להעברת נתונים:
- בסרגל הפעילות, לוחצים על הסמל Google Cloud Data Agent Kit.
- מרחיבים את Data Engineering ולוחצים על Initialize orchestration pipeline.
- מזינים פרמטרים לצינור התזמור החדש:
- מזהה פייפליין: מזינים את מזהה הפייפליין. דוגמה:
example-pipeline. - מזהה פרויקט בענן של Google: שם הפרויקט שבו נמצאת הסביבה. דוגמה:
example-project. - אזור: האזור שבו נמצאת הסביבה שלכם. דוגמה:
us-central1. - מזהה הסביבה: השם של הסביבה שרוצים לפתח איתה.
דוגמה:
dev/staging. Scheduler Managed Service for Apache Airflow Environment: השם של הסביבה שבה רוצים לתזמר את צינורות העיבוד. בפרמטר הזה צריך לציין את אותה סביבה שצוינה במסמך.
Artifacts Bucket (קטגוריית Artifacts): שם הקטגוריה שמשמשת ל-Artifacts של צינור, ללא הקידומת
gs://. דוגמה:example-pipelines-bucket.לוחצים על הבא.
לוחצים על Initialize (הפעלה).
מציינים את סביבת העבודה שבה רוצים לאתחל את צינור הנתונים.
פעולות סוכן
מבקשים מהסוכן ליצור תשתית לצינורות של תזמור במאגר.
מזינים הנחיה שדומה להנחיה הבאה:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
אחרי שמפעילים צינורות במאגר, אי אפשר להפעיל אותם שוב כי הפיגומים החדשים יחליפו את שינויי ההגדרות שביצעתם. כדי להוסיף צינורות חדשים, יוצרים קובצי הגדרה חדשים של צינורות בפרויקט ומוסיפים אותם להגדרת הפריסה.
הוספת משימה חדשה לצינור
מכיוון שאין פעולות בהגדרת צינור הנתונים הראשונית, מוסיפים פעולה שמריצה את סקריפט PySpark.
לא אג'נטי
כדי לערוך צינור:
- בסרגל הפעילות, לוחצים על הסמל Google Cloud Data Agent Kit.
- מרחיבים את Data Engineering ואז את Orchestration Pipelines.
- בוחרים באפשרות
example-pipeline.yaml. ייפתח עורך צינורות הנתונים של צינור הנתונים שנבחר. - אופציונלי: בוחרים את הצומת Schedule trigger (הפעלת תזמון). אפשר לשנות את לוח הזמנים של הצינור על ידי ציון ביטוי בסגנון cron וזמני התחלה וסיום של לוח הזמנים. התזמון שמוגדר כברירת מחדל לצינור עיבוד הנתונים החדש הוא
0 2 * * *, והוא פועל כל יום בשעה 2:00.
מוסיפים משימה חדשה. במדריך הזה מוסיפים משימת PySpark שמריצה סקריפט PySpark שהוספתם קודם:
- לוחצים על הוספת המשימה הראשונה כדי להוסיף צומת משימה חדש.
- בוחרים באפשרות Execute PySpark script (הפעלת סקריפט PySpark) ואת קובץ
script/wordcount.py.
החלונית Execute PySpark script תיפתח.
- ב-Spark Cluster Mode, בוחרים באפשרות Serverless Spark.
- בקטע מיקום, מציינים את המיקום שבו נמצאת הסביבה.
דוגמה:
us-central1. - לוחצים על Save.
פעולות סוכן
מריצים את ההנחיה הבאה:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
פריסת הגרסה המקומית של צינור עיבוד הנתונים
מפעילים את הגרסה המקומית של צינור העברת הנתונים כדי לוודא שהיא מוגדרת בצורה נכונה.
כשפורסים גרסה מקומית של צינור הנתונים לניהול, התוסף Data Agent Kit ל-VS Code מעלה גרסה מקומית של חבילת צינור הנתונים לסביבת Managed Airflow ומריץ אותה. פריסה מקומית מיועדת לשימוש בסביבת פיתוח.
פקודת הפריסה פורסת לוח זמנים שלא מושהה. כדי למנוע את זה, אפשר להשהות את התזמון באופן ידני בחלונית Pipelines Management (ניהול צינורות). אפשר גם לערוך את קובץ ה-YAML של צינור העיבוד כדי להוסיף הערה לבלוק triggers: - schedule או להסיר אותו.
לא אג'נטי
כדי לפרוס גרסה מקומית של צינור הנתונים לדוגמה לניהול תזמור:
- בסרגל הפעילות, לוחצים על הסמל Google Cloud Data Agent Kit.
- מרחיבים את Data Engineering (הנדסת נתונים) ואז את Orchestration Pipelines (צינורות תזמור).
- בוחרים באפשרות
example-pipeline.yaml. ייפתח עורך צינורות הנתונים של צינור הנתונים שנבחר. - בוחרים באפשרות Run pipeline (הפעלת צינור) ואז בוחרים את סביבת הפיתוח או סביבת הביניים שיצרתם קודם.
פעולות סוכן
מריצים את ההנחיה הבאה:
Deploy my pipeline
מעקב אחר הפעלת צינור עיבוד הנתונים ובדיקת יומני ההפעלה
אחרי שפורסים את צינור עיבוד הנתונים, אפשר לראות את המידע המפורט, את ההיסטוריה של הרצות צינור עיבוד הנתונים ואת יומני ההפעלה של צינור עיבוד הנתונים:
- בסרגל הפעילות, לוחצים על הסמל Google Cloud Data Agent Kit.
- מרחיבים את Data Engineering ובוחרים באפשרות Pipelines management.
- לוחצים על השם של צינור העיבוד (
example-pipeline) כדי לראות את היסטור ההרצה שלו. ברשימת ההרצות של תאריך מסוים, אפשר לראות הרצות נפרדות של צינורות ופירוט של פעולות נפרדות בכל הרצה של צינור. - לוחצים על מזהה המשימה כדי לראות את יומני הביצוע של המשימה. מכיוון שתסריט PySpark לדוגמה הופעל ב-Managed Service for Apache Spark, ביומני המשימות יהיה קישור ליומני העיבוד ברצף.
פתרון בעיות בצינורות עיבוד נתונים
אם הערוץ נכשל, מופיע לחצן Diagnose בחלונית Pipelines management.
פעולות סוכן
כשלוחצים על הלחצן אבחון, הסוכן יוצר הנחיה לפתרון בעיות בצינור. ההנחיה מועתקת ללוח או נפתחת בשיחה חדשה.
הסוכן משתמש במיומנויות מיוחדות כדי לפתור בעיות בצינורות עיבוד נתונים. הוא מתמקד באיסוף יומנים, בבדיקה צולבת של הקוד שנפרס וסביבת העבודה, וביצירת ניתוח של הגורם הבסיסי (RCA).
אלה השלבים הבאים שאפשר לבצע אחרי קבלת ניתוח שורש הבעיה:
- החלת ניתוח שורש הבעיה בסביבת העבודה הנוכחית.
- מבקשים מהסוכן ליצור ענף חדש וליישם בו את השינויים.
- פותחים כרטיס תמיכה ב-Cloud Customer Care עם פרטי הניתוח של שורש הבעיה.
לקבלת עזרה בפתרון בעיות שקשורות לתוסף, אפשר לעיין במאמר בנושא פתרון בעיות.