בדף הזה מתוארות מאפייני הביצועים של משימות סטרימינג של Dataflow שקוראות מ-Pub/Sub וכותבות ל-BigQuery. הוא מספק תוצאות של בדיקות השוואה לשני סוגים של צינורות עיבוד נתונים של סטרימינג:
מיפוי בלבד (טרנספורמציה לכל הודעה): צינורות עיבוד נתונים שמבצעים טרנספורמציות לכל הודעה, בלי לעקוב אחרי מצב או לקבץ רכיבים בזרם. לדוגמה, ETL, אימות שדות ומיפוי סכימות.
צבירה בחלון זמן (
GroupByKey): צינורות עיבוד נתונים שמבצעים פעולות עם שמירת מצב ומקבצים נתונים על סמך מפתח וחלון זמן. דוגמאות: ספירת אירועים, חישוב סכומים ואיסוף רשומות לסשן של משתמש.
רוב עומסי העבודה לשילוב נתונים בזמן אמת נכללים בשתי הקטגוריות האלה. אם צינור עיבוד הנתונים שלכם פועל לפי דפוס דומה, תוכלו להשתמש במדדי הביצועים האלה כדי להעריך את משימת Dataflow בהשוואה להגדרת ייחוס עם ביצועים טובים.
מתודולוגיית הבדיקה
ההשוואות בוצעו באמצעות המשאבים הבאים:
נושא Pub/Sub שהוקצה מראש עם עומס קלט קבוע. ההודעות נוצרו באמצעות התבנית Streaming Data Generator.
- קצב העברת הודעות: בערך מיליון הודעות בשנייה
- עומס קלט: 1 GiB/s
- פורמט ההודעה: טקסט JSON שנוצר באופן אקראי עם סכימה קבועה
- גודל ההודעה: בערך 1 KiB לכל הודעה
טבלה ב-BigQuery רגילה.
צינורות עיבוד נתונים בסטרימינג של Dataflow שמבוססים על התבנית Pub/Sub to BigQuery. הצינורות האלה מבצעים את הניתוח הנדרש המינימלי ומיפוי הסכימה. לא נעשה שימוש בפונקציה מותאמת אישית בהגדרת המשתמש (UDF).
אחרי שהתייצב הגידול האופקי והצינור הגיע למצב יציב, הצינורות הורצו למשך יום בערך, ואז התוצאות נאספו ונותחו.
צינורות עיבוד נתונים של Dataflow
נבדקו שתי גרסאות של צינור עיבוד הנתונים:
צינור עיבוד נתונים עם מיפוי בלבד. בצינור הזה מתבצע מיפוי והמרה פשוטים של הודעות JSON. בבדיקה הזו נעשה שימוש בתבנית Pub/Sub ל-BigQuery ללא שינוי.
- סמנטיקה: הצינור נבדק באמצעות מצב בדיוק פעם אחת וגם באמצעות מצב לפחות פעם אחת. עיבוד של לפחות פעם אחת מספק תפוקה טובה יותר. עם זאת, צריך להשתמש בו רק אם אפשר לקבל רשומות כפולות או אם יעד הנתונים מטפל בביטול כפילויות.
צינור לעיבוד נתונים עם צבירה לפי חלונות זמן. בצינור הזה, ההודעות מקובצות לפי מפתח ספציפי בחלונות בגודל קבוע, והרשומות המצטברות נכתבות ל-BigQuery. לצורך הבדיקה הזו, נעשה שימוש בצינור (pipeline) מותאם אישית של Apache Beam שמבוסס על התבנית Pub/Sub to BigQuery.
לוגיקת צבירה: לכל חלון קבוע של דקה אחת, ללא חפיפה, נאספו הודעות עם אותו מפתח ונכתבו כרשומה מצטברת אחת ב-BigQuery. בדרך כלל משתמשים בסוג הזה של צבירת נתונים בעיבוד רישום ביומן כדי לשלב אירועים קשורים, כמו פעילות של משתמש, ברשומה אחת לצורך Analysis בהמשך.
מקביליות של מפתחות: במבחן הביצועים נעשה שימוש ב-1,000,000 מפתחות עם התפלגות אחידה.
סמנטיקה: הצינור נבדק באמצעות מצב בדיוק פעם אחת. צבירות דורשות סמנטיקה של בדיוק פעם אחת כדי להבטיח את הנכונות, וכדי למנוע ספירה כפולה בתוך קבוצה וחלון.
הגדרת משרה
בטבלה הבאה מוצגת ההגדרה של משימות Dataflow.
| הגדרה | מפה בלבד, בדיוק פעם אחת | מפה בלבד, לפחות פעם אחת | צבירת נתונים בחלון, בדיוק פעם אחת |
|---|---|---|---|
| סוג המכונה של העובד | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| מעבדי vCPU במכונת העובד | 2 | 2 | 2 |
| זיכרון RAM של מכונת Worker | 7.5GiB | 7.5GiB | 7.5GiB |
| דיסק אחסון מתמיד (persistent disk) של מכונת Worker | Standard Persistent Disk (HDD), 30 GB | Standard Persistent Disk (HDD), 30 GB | Standard Persistent Disk (HDD), 30 GB |
| עובדים ראשוניים | 70 | 30 | 180 |
| מספר העובדים המקסימלי | 100 | 100 | 250 |
| מנוע סטרימינג | כן | כן | כן |
| התאמה אופקית לעומס (horizontal autoscaling) | כן | כן | כן |
| מודל חיוב | חיוב לפי משאבים | חיוב לפי משאבים | חיוב לפי משאבים |
| האם Storage Write API מופעל? | כן | כן | כן |
| זרמי Storage Write API | 200 | לא רלוונטי | 500 |
| תדירות ההפעלה של Storage Write API | 5 שניות | לא רלוונטי | 5 שניות |
מומלץ להשתמש ב-BigQuery Storage Write API בצינורות להעברת נתונים בזמן אמת. כשמשתמשים במצב 'פעם אחת בדיוק' עם Storage Write API, אפשר לשנות את ההגדרות הבאות:
מספר זרמי הכתיבה. כדי להבטיח מקביליות מספקת של מפתחות בשלב הכתיבה, צריך להגדיר את מספר הזרמים של Storage Write API לערך שגדול ממספר המעבדים של העובדים, תוך שמירה על רמה סבירה של תפוקת זרם הכתיבה ב-BigQuery.
תדירות ההפעלה. ערך של שנייה אחת עם ספרה אחת מתאים לצינורות עם תפוקה גבוהה.
מידע נוסף זמין במאמר בנושא כתיבה מ-Dataflow ל-BigQuery.
תוצאות ההשוואה לשוק
בקטע הזה מתוארות התוצאות של בדיקות ההשוואה.
תפוקה ושימוש במשאבים
בטבלה הבאה מוצגות תוצאות הבדיקה של קצב העברת הנתונים בצינור ושל השימוש במשאבים.
| תוצאה | מפה בלבד, בדיוק פעם אחת | מפה בלבד, לפחות פעם אחת | צבירה בחלון, בדיוק פעם אחת |
|---|---|---|---|
| קצב העברת נתונים של קלט לכל עובד | ממוצע: 17 MBps, n=3 | ממוצע: 21 MBps, n=3 | ממוצע: 6 MBps, n=3 |
| ממוצע השימוש במעבד בכל העובדים | ממוצע: 65%, n=3 | ממוצע: 69%, n=3 | ממוצע: 80%, n=3 |
| מספר צמתי העובדים | ממוצע: 57, n=3 | ממוצע: 48, n=3 | ממוצע: 169, n=3 |
| יחידות מחשוב של Streaming Engine לשעה | ממוצע: 125, n=3 | ממוצע: 46, n=3 | ממוצע: 354, n=3 |
אלגוריתם ההתאמה האוטומטית לעומס יכול להשפיע על רמת השימוש הממוצעת במעבד (CPU). כדי להשיג יעד גבוה או נמוך יותר של ניצול מעבד (CPU), אפשר להגדיר את טווח ההתאמה האוטומטית לעומס או את ההנחיה לניצול העובדים. יעדי ניצול גבוהים יותר יכולים להוביל לעלויות נמוכות יותר, אבל גם לזמן אחזור גרוע יותר של הזנב, במיוחד בעומסים משתנים.
בצינור צבירה של חלון, לסוג הצבירה, לגודל החלון ולמקביליות של המפתח יכולה להיות השפעה גדולה על השימוש במשאבים.
זמן אחזור
בטבלה הבאה מוצגות תוצאות ההשוואה לזמן האחזור של צינור העיבוד.
| השהיה הכוללת מקצה לקצה בשלב | מפה בלבד, בדיוק פעם אחת | מפה בלבד, לפחות פעם אחת | צבירה בחלון, בדיוק פעם אחת |
|---|---|---|---|
| P50 | ממוצע: 800 אלפיות השנייה, n=3 | ממוצע: 160 אלפיות השנייה, n=3 | ממוצע: 3,400 אלפיות השנייה, n=3 |
| P95 | ממוצע: 2,000 אלפיות השנייה, n=3 | ממוצע: 250 אלפיות השנייה, n=3 | ממוצע: 13,000 אלפיות השנייה, n=3 |
| P99 | ממוצע: 2,800 אלפיות השנייה, n=3 | ממוצע: 410 אלפיות השנייה, n=3 | ממוצע: 25,000 אלפיות השנייה, n=3 |
בבדיקות נמדד זמן האחזור מקצה לקצה בכל שלב (מדד job/streaming_engine/stage_end_to_end_latencies) בשלושה מחזורי בדיקה ארוכים. המדד הזה מודד את משך הזמן שבו Streaming Engine מבלה בכל שלב בצינור. הוא כולל את כל השלבים הפנימיים של הצינור, כמו:
- ערבוב והוספה לתור של הודעות לעיבוד
- זמן העיבוד בפועל, לדוגמה, המרת הודעות לאובייקטים של שורות
- כתיבת מצב מתמשך, וגם הזמן שחלף בתור לכתיבת מצב מתמשך
מדד נוסף של זמן האחזור הוא עדכניות הנתונים. עם זאת, עדכניות הנתונים מושפעת מגורמים כמו חלונות זמן שמוגדרים על ידי המשתמש ועיכובים במעלה הזרם במקור. זמן האחזור של המערכת מספק בסיס אובייקטיבי יותר ליעילות ולתקינות של העיבוד הפנימי של צינור הנתונים בעומס.
הנתונים נמדדו במשך יום אחד בערך לכל הרצה, והתקופות הראשוניות של ההפעלה נמחקו כדי לשקף ביצועים יציבים במצב יציב. התוצאות מראות שני גורמים שמוסיפים זמן אחזור:
מצב 'פעם אחת בדיוק'. כדי להשיג סמנטיקה של בדיוק פעם אחת, צריך לבצע ערבוב דטרמיניסטי ולחפש מצב מתמשך לצורך ביטול כפילויות. מצב 'לפחות פעם אחת' פועל הרבה יותר מהר, כי הוא מדלג על השלבים האלה.
צבירת נתונים בחלון זמן. ההודעות צריכות להיות מעורבבות, מאוחסנות בזיכרון המטמון ונכתבות במצב קבוע לפני סגירת החלון, מה שמוסיף לזמן האחזור מקצה לקצה.
המדדים להשוואה שמוצגים כאן מייצגים נקודת התחלה. ההשהיה רגישה מאוד למורכבות של צינור הנתונים. UDF בהתאמה אישית, טרנספורמציות נוספות ולוגיקה מורכבת של עיבוד החלק הנצפה בלבד יכולות להגדיל את זמן האחזור. בדרך כלל, צבירות פשוטות שמצמצמות מאוד את הנתונים, כמו sum ו-count, מובילות לזמן אחזור קצר יותר מאשר פעולות שדורשות הרבה משאבים, כמו איסוף רכיבים לרשימה.
הערכת עלויות
כדי להעריך את עלות הבסיס של צינור דומה משלכם באמצעות חיוב לפי משאבים, אתם יכולים להשתמש במחשבון התמחור של Google Cloud Platform באופן הבא:
- פותחים את מחשבון העלויות.
- לוחצים על הוספה לאומדן.
- בוחרים באפשרות Dataflow.
- בקטע סוג השירות, בוחרים באפשרות Dataflow Classic.
- בוחרים באפשרות הגדרות מתקדמות כדי לראות את כל האפשרויות.
- בוחרים את המיקום שבו העבודה תפעל.
- בקטע סוג העבודה, בוחרים באפשרות 'סטרימינג'.
- בוחרים באפשרות הפעלת מנוע הסטרימינג.
- מזינים את המידע על שעות הפעלת העבודה, צמתי העובדים, מכונות העובדים ואחסון Persistent Disk.
- מזינים את המספר המשוער של יחידות החישוב של Streaming Engine.
השימוש במשאבים והעלות גדלים בערך באופן לינארי עם קצב העברת הנתונים של הקלט, אבל בעבודות קטנות עם מספר קטן של עובדים, העלות הכוללת מושפעת בעיקר מהעלויות הקבועות. כנקודת התחלה, אפשר להסיק את מספר צמתי העובדים ואת צריכת המשאבים מתוצאות ההשוואה.
לדוגמה, נניח שאתם מפעילים צינור (pipeline) של מיפוי בלבד במצב 'פעם אחת בדיוק' (exactly-once), עם קצב נתונים של 100MiB/s. על סמך תוצאות ההשוואה לצינור עיבוד נתונים של 1 GiB/s, אפשר לאמוד את דרישות המשאבים באופן הבא:
- גורם לקביעת קנה מידה: (100 MiB/s) / (1 GiB/s) = 0.1
- צמתים משוערים של עובדים: 57 עובדים × 0.1 = 5.7 עובדים
- מספר יחידות החישוב הצפוי של Streaming Engine לשעה: 125 × 0.1 = 12.5 יחידות לשעה
הערך הזה צריך לשמש רק כאומדן ראשוני. התפוקה והעלות בפועל עשויות להשתנות באופן משמעותי, בהתאם לגורמים כמו סוג המכונה, חלוקת גודל ההודעה, קוד המשתמש, סוג הצבירה, מקביליות המפתח וגודל החלון. מידע נוסף זמין במאמר שיטות מומלצות לאופטימיזציה של עלויות ב-Dataflow.
הפעלת צינור עיבוד נתונים לבדיקה
בקטע הזה מוצגות הפקודות gcloud dataflow flex-template run ששימשו להפעלת צינור הנתונים של המפה בלבד.
מצב 'פעם אחת בדיוק'
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
מצב 'לפחות פעם אחת'
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
מחליפים את מה שכתוב בשדות הבאים:
-
JOB_ID: מזהה המשימה ב-Dataflow -
PROJECT_ID: מזהה הפרויקט -
SUBSCRIPTION_NAME: השם של המינוי ל-Pub/Sub -
DATASET: השם של מערך הנתונים ב-BigQuery -
TABLE_NAME: השם של הטבלה ב-BigQuery
יצירת נתונים לבדיקה
כדי ליצור נתוני בדיקה, מריצים את תבנית הגנרטור של נתונים בזרימה באמצעות הפקודה הבאה:
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
מחליפים את מה שכתוב בשדות הבאים:
-
JOB_ID: מזהה המשימה ב-Dataflow -
PROJECT_ID: מזהה הפרויקט -
TOPIC_NAME: השם של נושא ה-Pub/Sub -
SCHEMA_LOCATION: הנתיב לקובץ סכימה ב-Cloud Storage
תבנית מחולל נתוני הסטרימינג משתמשת בקובץ מחולל נתוני JSON כדי להגדיר את סכימת ההודעות. במבחני ההשוואה נעשה שימוש בסכימת הודעות שדומה לזו שמוצגת בהמשך:
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
השלבים הבאים
- שימוש בממשק למעקב אחרי משימות ב-Dataflow
- שיטות מומלצות לאופטימיזציה של העלויות ב-Dataflow
- פתרון בעיות של עבודות סטרימינג איטיות או תקועות
- קריאה מ-Pub/Sub אל Dataflow
- כתיבה מ-Dataflow ל-BigQuery