סקירה כללית
אפשר להשתמש בצינורות עיבוד נתונים של Dataflow למשימות הבאות:
- יצירת לוחות זמנים חוזרים למשימות.
- הבנה של המקומות שבהם המשאבים מנוצלים במהלך ביצוע של כמה משימות.
- הגדרת יעדים לעדכניות הנתונים וניהול שלהם.
- התעמקות בנתונים בשלבים נפרדים של צינורות המכירה כדי לתקן ולבצע אופטימיזציה של צינורות המכירה.
למידע נוסף, אפשר לעיין במאמרי העזרה של ה-API בנושא Data Pipelines.
תכונות
- יוצרים צינור עיבוד נתונים חוזר של משימות אצווה כדי להריץ משימת אצווה לפי לוח זמנים.
- יוצרים צינור נתונים מצטבר חוזר כדי להריץ משימה באצווה על הגרסה העדכנית של נתוני הקלט.
- אפשר להשתמש בכרטיס המידע של סיכום צינור עיבוד הנתונים כדי לראות את השימוש המצטבר בקיבולת ואת צריכת המשאבים של צינור עיבוד הנתונים.
- צפייה בעדכניות הנתונים של צינור סטרימינג. המדד הזה משתנה לאורך זמן, ואפשר לקשר אותו להתראה שתשלח לכם אם רמת הרעננות תהיה נמוכה יותר מהיעד שהגדרתם.
- אפשר להשתמש בתרשימים של מדדי צינורות עיבוד נתונים כדי להשוות בין משימות של צינורות עיבוד נתונים באצווה ולמצוא אנומליות.
מגבלות
זמינות אזורית: אפשר ליצור צינורות להעברת נתונים באזורים הזמינים ב-Cloud Scheduler.
מכסה:
- מספר צינורות ברירת המחדל לכל פרויקט: 500
מספר ברירת המחדל של צינורות עיבוד נתונים לכל ארגון: 2,500
המכסה ברמת הארגון מושבתת כברירת מחדל. אתם יכולים להצטרף למכסות ברמת הארגון, ואם תעשו זאת, כל ארגון יוכל להשתמש לכל היותר ב-2,500 צינורות כברירת מחדל.
תוויות: אי אפשר להשתמש בתוויות שהוגדרו על ידי המשתמש כדי לתייג צינורות עיבוד נתונים ב-Dataflow. עם זאת, כשמשתמשים בשדה
additionalUserLabels, הערכים האלה מועברים למשימת Dataflow. מידע נוסף על האופן שבו התוויות חלות על משימות Dataflow ספציפיות זמין במאמר בנושא אפשרויות של צינורות.
סוגים של צינורות נתונים
ב-Dataflow יש שני סוגים של פייפליינים: סטרימינג ואצווה. שני הסוגים של צינורות עיבוד נתונים מריצים משימות שמוגדרות בתבניות של Dataflow.
- פייפליין נתונים בסטרימינג
- פייפליין נתונים בסטרימינג מפעיל משימת סטרימינג של Dataflow מיד אחרי שהוא נוצר.
- פייפליין נתונים באצווה
פייפליין נתונים באצווה מריץ עבודת אצווה של Dataflow לפי לוח זמנים שהמשתמש מגדיר. אפשר להגדיר פרמטרים לשם הקובץ של קלט צינור עיבוד הנתונים של קבוצת הקבצים כדי לאפשר עיבוד מצטבר של צינור עיבוד הנתונים של קבוצת הקבצים.
צינורות עיבוד נתונים מצטברים של קבוצות
אפשר להשתמש ב-placeholders של תאריך ושעה כדי לציין פורמט של קובץ קלט מצטבר לצינור להעברת נתונים באצווה.
- אפשר להשתמש במחזיקי מקום לשנה, לחודש, לתאריך, לשעה, לדקה ולשנייה, והם צריכים להיות בפורמט
strftime(). לפני ה-placeholders מופיע סמל האחוז (%). - הפורמט של הפרמטר לא מאומת במהלך יצירת צינור.
- דוגמה: אם מציינים את הנתיב המפורמט gs://bucket/Y כנתיב קלט עם פרמטרים, המערכת תעריך אותו כ-gs://bucket/Y, כי Y ללא סימן % לפניו לא ממופה לפורמט
strftime().
- דוגמה: אם מציינים את הנתיב המפורמט gs://bucket/Y כנתיב קלט עם פרמטרים, המערכת תעריך אותו כ-gs://bucket/Y, כי Y ללא סימן % לפניו לא ממופה לפורמט
בכל פעם שמפעילים את צינור הנתונים של קבוצת הפריטים המתוזמנת, המערכת מעריכה את החלק של placeholder בנתיב הקלט לפי התאריך והשעה הנוכחיים (או המוזזים בזמן). ערכי התאריכים מוערכים לפי התאריך הנוכחי באזור הזמן של המשימה המתוזמנת. אם הנתיב המוערך תואם לנתיב של קובץ קלט, הקובץ נבחר לעיבוד על ידי צינור העיבוד באצווה בזמן המתוזמן.
- דוגמה: צינור להעברת נתונים באצווה מתוזמן לחזור בתחילת כל שעה לפי שעון החוף הפסיפי (PST). אם מגדירים את נתיב הקלט כפרמטר
gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, ב-15 באפריל 2021 בשעה 18:00 לפי שעון החוף המערבי, נתיב הקלט יהיהgs://bucket-name/2021-04-15/prefix-18_00.csv.
שימוש בפרמטרים של הזזת זמן
אפשר להשתמש בפרמטרים של הזזת זמן של דקות או שעות עם סימן + או -.
כדי לתמוך בהתאמה של נתיב קלט עם תאריך ושעה מוערכים שעברו שינוי לפני או אחרי התאריך והשעה הנוכחיים של לוח הזמנים של צינור הנתונים, צריך להוסיף סוגריים מסולסלים לפרמטרים האלה.
בפורמט {[+|-][0-9]+[m|h]}. צינור האצווה ממשיך לחזור על עצמו בזמן המתוזמן שלו, אבל נתיב הקלט מוערך עם היסט הזמן שצוין.
- דוגמה: צינור להעברת נתונים באצווה מתוזמן לחזור בתחילת כל שעה לפי שעון החוף הפסיפי (PST). אם מגדירים את נתיב הקלט כפרמטר
gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, ב-15 באפריל 2021 בשעה 18:00 לפי שעון החוף המערבי, נתיב הקלט יהיהgs://bucket-name/2021-04-15/prefix-16_00.csv.
תפקידים בפייפליין נתונים
כדי שפעולות של פייפליין נתונים ב-Dataflow יתבצעו בהצלחה, אתם צריכים את תפקידי ה-IAM הנדרשים, כמו שמתואר בהמשך:
כדי לבצע פעולות, צריך את התפקיד המתאים:
-
Datapipelines.admin: יכול לבצע את כל הפעולות בפייפליין של הנתונים -
Datapipelines.viewer: יכולים לראות צינורות עיבוד נתונים ועבודות -
Datapipelines.invoker: יכול להפעיל ריצה של משימת פייפליין (אפשר להפעיל את התפקיד הזה באמצעות ה-API)
-
לחשבון השירות שבו משתמש Cloud Scheduler צריך להיות מוקצה התפקיד
roles/iam.serviceAccountUser, בין אם מדובר בחשבון שירות שהוגדר על ידי המשתמש או בחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine. מידע נוסף זמין במאמר בנושא תפקידים בפייפליין נתונים.כדי לפעול כחשבון השירות שבו משתמשים ב-Cloud Scheduler וב-Dataflow, צריך לקבל את התפקיד
roles/iam.serviceAccountUserבחשבון הזה. אם לא בוחרים חשבון שירות ל-Cloud Scheduler ול-Dataflow, נעשה שימוש בחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine.
יצירת פייפליין
אפשר ליצור פייפליין של Dataflow בשתי דרכים:
דף ההגדרה של צינורות הנתונים: כשנכנסים בפעם הראשונה לתכונה Dataflow pipelines במסוף Google Cloud , נפתח דף הגדרה. מפעילים את ממשקי ה-API שמופיעים ברשימה כדי ליצור צינורות עיבוד נתונים.
ייבוא משרה
אתם יכולים לייבא משימת אצווה או סטרימינג של Dataflow שמבוססת על תבנית קלאסית או תבנית גמישה ולהפוך אותה לפייפליין.
נכנסים לדף Jobs של Dataflow במסוף Google Cloud .
בוחרים משימה שהושלמה, ואז בדף פרטי המשימה לוחצים על +ייבוא כצינור.
בדף יצירת צינור מתוך תבנית, הפרמטרים מאוכלסים באפשרויות של המשימה שיובאה.
בקטע Schedule your pipeline (תזמון צינור עיבוד הנתונים) של משימת אצווה, מציינים לוח זמנים של חזרה. אפשר לספק כתובת אימייל לחשבון בשביל Cloud Scheduler, שמשמש לתזמון של הפעלות אצווה. אם לא מציינים חשבון שירות, המערכת משתמשת בחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine.
יצירת פייפליין
נכנסים לדף Data pipelines של Dataflow במסוף Google Cloud .
לוחצים על +יצירת פייפליין.
בדף Create pipeline from template (יצירת צינור עיבוד נתונים מתבנית), מציינים שם לצינור העיבוד וממלאים את שאר השדות של בחירת התבנית והפרמטרים.
בקטע Schedule your pipeline (תזמון צינור עיבוד הנתונים) של משימת אצווה, מציינים לוח זמנים של חזרה. אפשר לספק כתובת אימייל לחשבון בשביל Cloud Scheduler, שמשמש לתזמון של הפעלות אצווה. אם לא מציינים ערך, המערכת משתמשת בחשבון השירות המוגדר כברירת מחדל ב-Compute Engine.
יצירת פייפליין נתונים של עיבוד באצווה
כדי ליצור את פייפליין הנתונים לדוגמה הזה, צריך שתהיה לכם גישה למשאבים הבאים בפרויקט:
- קטגוריה של Cloud Storage לאחסון קובצי קלט ופלט
- מערך נתונים ב-BigQuery כדי ליצור טבלה.
בצינור הזה לדוגמה נעשה שימוש בתבנית של צינור אצווה Cloud Storage Text to BigQuery. התבנית הזו קוראת קבצים בפורמט CSV מ-Cloud Storage, מריצה טרנספורמציה ואז מוסיפה ערכים לטבלה ב-BigQuery עם שלוש עמודות.
יוצרים את הקבצים הבאים בכונן המקומי:
קובץ
bq_three_column_table.jsonשמכיל את הסכימה הבאה של טבלת היעד ב-BigQuery.{ "BigQuery Schema": [ { "name": "col1", "type": "STRING" }, { "name": "col2", "type": "STRING" }, { "name": "col3", "type": "INT64" } ] }קובץ JavaScript, שמבצע טרנספורמציה פשוטה בנתוני הקלט לפני ההוספה ל-BigQuery.
split_csv_3cols.jsfunction transform(line) { var values = line.split(','); var obj = new Object(); obj.col1 = values[0]; obj.col2 = values[1]; obj.col3 = values[2]; var jsonString = JSON.stringify(obj); return jsonString; }קובץ CSV עם כמה רשומות שמוכנסות לטבלה ב-BigQuery.
file01.csvb8e5087a,74,27531 7a52c051,4a,25846 672de80f,cd,76981 111b92bf,2e,104653 ff658424,f0,149364 e6c17c75,84,38840 833f5a69,8f,76892 d8c833ff,7d,201386 7d3da7fb,d5,81919 3836d29b,70,181524 ca66e6e5,d7,172076 c8475eb6,03,247282 558294df,f3,155392 737b82a8,c7,235523 82c8f5dc,35,468039 57ab17f9,5e,480350 cbcdaf84,bd,354127 52b55391,eb,423078 825b8863,62,88160 26f16d4f,fd,397783
כדי להעתיק את הקבצים לתיקיות בקטגוריה של Cloud Storage בפרויקט, משתמשים בפקודה
gcloud storage cpבאופן הבא:העתקת
bq_three_column_table.jsonו-split_csv_3cols.jsאלgs://BUCKET_ID/text_to_bigquery/gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/העתקה של
file01.csvאלgs://BUCKET_ID/inputs/gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
במסוף Google Cloud , נכנסים לדף Buckets של Cloud Storage.
כדי ליצור תיקייה
tmpבקטגוריה של Cloud Storage, בוחרים את שם התיקייה כדי לפתוח את דף פרטי הקטגוריה, ואז לוחצים על יצירת תיקייה.
נכנסים לדף Data pipelines של Dataflow במסוף Google Cloud .
בוחרים באפשרות יצירת פייפליין. בדף יצירת צינור מתוך תבנית, מזינים או בוחרים את הפריטים הבאים:
- בשדה Pipeline name (שם צינור עיבוד הנתונים), מזינים
text_to_bq_batch_data_pipeline. - בקטע Regional endpoint (נקודת קצה אזורית), בוחרים אזור ב-Compute Engine. אזורי המקור והיעד חייבים להיות זהים. לכן, הקטגוריה של Cloud Storage והטבלה ב-BigQuery צריכות להיות באותו אזור.
בקטע Dataflow template (תבנית Dataflow), בProcess Data in Bulk (batch) (עיבוד נתונים בכמות גדולה (אצווה)), בוחרים באפשרות Text Files on Cloud Storage to BigQuery (קבצי טקסט ב-Cloud Storage ל-BigQuery).
בקטע Schedule your pipeline (תזמון צינור הנתונים), בוחרים תזמון, למשל Hourly (שעתי) בדקה 25, באזור הזמן שלכם. אחרי ששולחים את צינור הנתונים, אפשר לערוך את לוח הזמנים. אפשר לספק כתובת אימייל לחשבון Cloud Scheduler, שמשמש לתזמון של הפעלות אצווה. אם לא מציינים חשבון שירות, המערכת משתמשת בחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine.
בקטע פרמטרים נדרשים, מזינים את הפרטים הבאים:
- בקטע JavaScript UDF path in Cloud Storage (נתיב של פונקציית UDF ב-JavaScript ב-Cloud Storage):
gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
- בנתיב JSON:
BUCKET_ID/text_to_bigquery/bq_three_column_table.json
- בשדה שם פונקציית UDF של JavaScript:
transform - עבור טבלת הפלט של BigQuery:
PROJECT_ID:DATASET_ID.three_column_table
- בקטע Cloud Storage input path (נתיב קלט של Cloud Storage):
BUCKET_ID/inputs/file01.csv
- בקטע ספריית BigQuery זמנית:
BUCKET_ID/tmp
- במיקום זמני:
BUCKET_ID/tmp
- בקטע JavaScript UDF path in Cloud Storage (נתיב של פונקציית UDF ב-JavaScript ב-Cloud Storage):
לוחצים על יצירת צינור.
- בשדה Pipeline name (שם צינור עיבוד הנתונים), מזינים
בדף פרטי צינור, מאשרים את המידע על צינור ההפקה והתבנית ורואים את ההיסטוריה הנוכחית והקודמת.
אפשר לערוך את התזמון של פייפליין הנתונים בחלונית פרטי פייפליין בדף פרטי פייפליין.
אפשר גם להפעיל צינור עיבוד נתונים באצווה לפי דרישה באמצעות הלחצן הפעלה במסוף Dataflow Pipelines.
יצירת פייפליין נתונים לדוגמה בסטרימינג
כדי ליצור פייפליין לדוגמה להעברת נתונים בסטרימינג, פועלים לפי ההוראות ליצירת פייפליין לדוגמה להעברת נתונים באצווה, עם ההבדלים הבאים:
- בקטע Pipeline schedule (תזמון פייפליין), לא מציינים תזמון לפייפליין נתונים בסטרימינג. משימת הסטרימינג של Dataflow מתחילה באופן מיידי.
- בקטע תבנית Dataflow, באפשרות עיבוד נתונים באופן רציף (סטרימינג), בוחרים באפשרות קבצי טקסט ב-Cloud Storage ל-BigQuery.
- בסוג מכונת העובד, צינור הנתונים מעבד את קבוצת הקבצים הראשונית שתואמת לדפוס
gs://BUCKET_ID/inputs/file01.csv, וגם את כל הקבצים הנוספים שתואמים לדפוס הזה שאתם מעלים לתיקייהinputs/. אם גודל קובצי ה-CSV חורג מכמה גיגה-בייט, כדי להימנע משגיאות אפשריות של חוסר זיכרון, צריך לבחור סוג מכונה עם זיכרון גבוה יותר מסוג המכונה שמוגדר כברירת מחדלn1-standard-4, כמוn1-highmem-8.
פתרון בעיות
בקטע הזה מוסבר איך לפתור בעיות בצינורות נתונים של Dataflow.
הפעלת משימה של פייפליין נתונים נכשלת
כשמשתמשים בצינורות עיבוד נתונים כדי ליצור תזמון של משימות חוזרות, יכול להיות שהמשימה ב-Dataflow לא תופעל, ושגיאת סטטוס 503 תופיע בקובצי היומן של Cloud Scheduler.
הבעיה הזו מתרחשת כשאין ל-Dataflow אפשרות להריץ את העבודה באופן זמני.
כדי לפתור את הבעיה, מגדירים את Cloud Scheduler כך שינסה שוב להריץ את העבודה. מכיוון שהבעיה זמנית, יכול להיות שהניסיון החוזר של העבודה יצליח. למידע נוסף על הגדרת ערכי ניסיון חוזר ב-Cloud Scheduler, אפשר לעיין במאמר בנושא יצירת משימה.
בדיקת הפרות של יעדים בצינור עיבוד נתונים
בקטעים הבאים מוסבר איך לבדוק צינורות שלא עומדים ביעדי הביצועים.
צינורות עיבוד נתונים חוזרים באצווה
כדי לבצע ניתוח ראשוני של תקינות צינור הנתונים, בדף Pipeline info במסוף Google Cloud , משתמשים בתרשימים Individual job status ו-Thread time per step. התרשימים האלה נמצאים בחלונית של סטטוס צינור המכירות.
חקירה לדוגמה:
יש לכם צינור להעברת נתונים של אצווה חוזרת שפועל כל שעה, 3 דקות אחרי תחילת השעה. כל משימה פועלת בדרך כלל במשך כ-9 דקות. יש לכם מטרה להשלים את כל העבודות תוך פחות מ-10 דקות.
בתרשים של סטטוס העבודה מוצג שעבודה מסוימת רצה במשך יותר מ-10 דקות.
בטבלת ההיסטוריה Update/Execution, מחפשים את העבודה שהופעלה במהלך השעה הרלוונטית. לוחצים כדי לעבור לדף הפרטים של המשימה ב-Dataflow. בדף הזה, מאתרים את השלב שפועל הכי הרבה זמן, ואז מחפשים ביומנים שגיאות אפשריות כדי להבין מה הסיבה לעיכוב.
צינורות עיבוד נתונים בסטרימינג
כדי לבצע ניתוח ראשוני של תקינות הצינור, משתמשים בתרשים של רעננות הנתונים בכרטיסייה Pipeline info בדף Pipeline Details. התרשים הזה נמצא בחלונית הסטטוס של צינור הנתונים.
חקירה לדוגמה:
יש לכם צינור סטרימינג שבדרך כלל יוצר פלט עם רעננות נתונים של 20 שניות.
הגדרתם יעד של עדכניות נתונים למשך 30 שניות. כשבודקים את התרשים של עדכניות הנתונים, אפשר לראות שבין 9:00 ל-10:00 בבוקר, עדכניות הנתונים עלתה כמעט ל-40 שניות.

עוברים לכרטיסייה Pipeline metrics וצופים בתרשימים CPU Utilization ו-Memory Utilization כדי לבצע ניתוח נוסף.
שגיאה: מזהה צינור העיבוד כבר קיים בפרויקט
אם מנסים ליצור צינור חדש עם שם שכבר קיים בפרויקט, מוצגת הודעת השגיאה Pipeline Id already exist within the
project. כדי להימנע מהבעיה הזו, חשוב לבחור תמיד שמות ייחודיים לצינורות.