מאפייני הביצועים של צינורות עיבוד נתונים מ-Kafka ל-BigQuery

בדף הזה מתוארים מאפייני הביצועים של משימות Dataflow streaming שקוראות מ-Apache Kafka וכותבות ל-BigQuery. היא מספקת תוצאות של בדיקות השוואה לצינורות רק של מיפוי, שמבצעים טרנספורמציות לכל הודעה בלי לעקוב אחרי מצב או לקבץ רכיבים בזרם.

עומסי עבודה רבים של שילוב נתונים, כולל ETL, אימות שדות ומיפוי סכימות, נכללים בקטגוריה map-only. אם צינור עיבוד הנתונים שלכם פועל לפי הדפוס הזה, תוכלו להשתמש בנקודות ההשוואה האלה כדי להעריך את משימת ה-Dataflow שלכם ביחס להגדרת הפניה עם ביצועים טובים.

מתודולוגיית הבדיקה

ההשוואות בוצעו באמצעות המשאבים הבאים:

  • קלאסטר של שירות מנוהל ל-Apache Kafka. ההודעות נוצרו באמצעות התבנית Streaming Data Generator.

    • קצב העברת הודעות: בערך מיליון הודעות בשנייה
    • עומס קלט: 1 GiB/s
    • פורמט ההודעה: טקסט JSON שנוצר באופן אקראי עם סכימה קבועה
    • גודל ההודעה: בערך 1 KiB לכל הודעה
    • מחיצות Kafka: 1,000
  • טבלה ב-BigQuery רגילה.

  • צינור עיבוד נתונים בסטרימינג של Dataflow שהשתמש בתבנית Apache Kafka ל-BigQuery. בצינור הזה מתבצעים הניתוח והמיפוי של הסכימה שנדרשים לפחות. לא נעשה שימוש בפונקציה מותאמת אישית בהגדרת המשתמש (UDF).

אחרי שהתייצב הגידול האופקי והצינור הגיע למצב יציב, הצינורות הורצו למשך יום בערך, ואז התוצאות נאספו ונותחו.

צינור עיבוד נתונים של Dataflow

ההשוואה הזו מתבססת על צינור (pipeline) למיפוי בלבד, שמבצע מיפוי פשוט והמרה של הודעות JSON. הצינור נבדק באמצעות מצב בדיוק פעם אחת ומצב של פעם אחת לפחות. עיבוד של לפחות פעם אחת מספק תפוקה טובה יותר. עם זאת, צריך להשתמש בו רק אם אפשר לקבל רשומות כפולות או אם יעד הנתונים מטפל בביטול כפילויות.

הגדרת משרה

בטבלה הבאה מוצגת ההגדרה של משימות Dataflow.

הגדרה ערך
סוג המכונה של העובד e2-standard-2
מעבדי vCPU במכונת העובד 2
זיכרון RAM של מכונת Worker 8 GB
דיסק אחסון מתמיד (persistent disk) של מכונת Worker Standard Persistent Disk (HDD), 30 GB
מספר העובדים המקסימלי 120
מנוע סטרימינג כן
התאמה אופקית לעומס (horizontal autoscaling) כן
מודל חיוב חיוב לפי משאבים
האם Storage Write API מופעל? כן
זרמי Storage Write API 400
תדירות ההפעלה של Storage Write API ‫5 שניות
פורמט ההודעה JSON
מצב אימות של Kafka

Application Default Credentials ‏ (ADC).

מידע נוסף זמין במאמר סוגי אימות לשרתי Kafka.

מומלץ להשתמש ב-BigQuery Storage Write API בצינורות להעברת נתונים בזמן אמת. כשמשתמשים במצב 'פעם אחת בדיוק' עם Storage Write API, אפשר לשנות את ההגדרות הבאות:

מידע נוסף זמין במאמר בנושא כתיבה מ-Dataflow ל-BigQuery.

חשוב גם להקדיש תשומת לב מיוחדת למספר המחיצות של Apache Kafka. כדי להבטיח מספיק מקביליות של מפתחות בשלב הקריאה, מספר המחיצות צריך להיות לפחות שווה למספר הכולל של מעבדי ה-vCPU של העובדים. מידע נוסף זמין במאמר קריאה מ-Apache Kafka אל Dataflow.

תוצאות ההשוואה לשוק

בקטע הזה מתוארות התוצאות של בדיקות ההשוואה.

תפוקה ושימוש במשאבים

בטבלה הבאה מוצגות תוצאות הבדיקה של קצב העברת הנתונים בצינור ושל השימוש במשאבים.

תוצאה בדיוק פעם אחת לפחות פעם אחת
קצב העברת נתונים של קלט לכל עובד ממוצע: ‎15 MBps, n=3 ממוצע: ‎18 MBps, n=3
ממוצע השימוש במעבד בכל העובדים ממוצע: 70%, n=3 ממוצע: 75%, n=3
מספר צמתי העובדים ממוצע: 63, n=3 ממוצע: 53, n=3
יחידות מחשוב של Streaming Engine לשעה ממוצע: 58, n=3 ממוצע: 0, n=3

אלגוריתם ההתאמה האוטומטית לעומס יכול להשפיע על רמת השימוש הממוצעת במעבד (CPU). כדי להשיג יעד גבוה או נמוך יותר של ניצול מעבד (CPU), אפשר להגדיר את טווח ההתאמה האוטומטית לעומס או את ההנחיה לניצול העובדים. יעדי ניצול גבוהים יותר יכולים להוביל לעלויות נמוכות יותר, אבל גם לזמן אחזור גרוע יותר של הזנב, במיוחד בעומסים משתנים.

זמן אחזור

בטבלה הבאה מוצגות תוצאות ההשוואה של זמן האחזור של צינור עיבוד הנתונים במצב 'פעם אחת בדיוק', לא כולל שלב הקלט.

זמן האחזור הכולל מקצה לקצה של השלב, לא כולל שלב הקלט בדיוק פעם אחת
P50 ממוצע: 1,200 אלפיות השנייה, n=3
P95 ממוצע: 3,000 אלפיות השנייה, n=3
P99 ממוצע: 5,400 אלפיות השנייה, n=3

הבדיקות מדדו את זמן האחזור מקצה לקצה בכל שלב (המדד job/streaming_engine/stage_end_to_end_latencies) בשלוש הרצות של בדיקות ארוכות. המדד הזה מודד את משך הזמן שבו Streaming Engine מבלה בכל שלב בצינור. הוא כולל את כל השלבים הפנימיים של הצינור, כמו:

  • ערבוב והוספה לתור של הודעות לעיבוד
  • זמן העיבוד בפועל, לדוגמה, המרת הודעות לאובייקטים של שורות
  • כתיבת מצב מתמשך, וגם הזמן שחלף בתור לכתיבת מצב מתמשך

בגלל מגבלה של המדד, זמן האחזור של שלב הקלט לא מדווח. לכן הוא לא נכלל בסכום הכולל.

המדדים להשוואה שמוצגים כאן מייצגים נקודת התחלה. ההשהיה רגישה מאוד למורכבות של צינור הנתונים. UDF בהתאמה אישית, טרנספורמציות נוספות ולוגיקה מורכבת של עיבוד החלק הנצפה בלבד יכולות להגדיל את זמן האחזור.

הערכת עלויות

כדי להעריך את עלות הבסיס של צינור דומה משלכם באמצעות חיוב לפי משאבים, אתם יכולים להשתמש במחשבון התמחור של Google Cloud Platform באופן הבא:

  1. פותחים את מחשבון העלויות.
  2. לוחצים על הוספה לאומדן.
  3. בוחרים באפשרות Dataflow.
  4. בקטע סוג השירות, בוחרים באפשרות Dataflow Classic.
  5. בוחרים באפשרות הגדרות מתקדמות כדי לראות את כל האפשרויות.
  6. בוחרים את המיקום שבו העבודה תפעל.
  7. בקטע סוג העבודה, בוחרים באפשרות 'סטרימינג'.
  8. בוחרים באפשרות הפעלת מנוע הסטרימינג.
  9. מזינים את המידע על שעות הפעלת העבודה, צמתי העובדים, מכונות העובדים ואחסון Persistent Disk.
  10. מזינים את המספר המשוער של יחידות החישוב של Streaming Engine.

השימוש במשאבים והעלות גדלים בערך באופן לינארי עם קצב העברת הנתונים של הקלט, אבל בעבודות קטנות עם מספר קטן של עובדים, העלות הכוללת מושפעת בעיקר מהעלויות הקבועות. כנקודת התחלה, אפשר להסיק את מספר צמתי העובדים ואת צריכת המשאבים מתוצאות ההשוואה.

לדוגמה, נניח שאתם מפעילים צינור (pipeline) של מיפוי בלבד במצב 'פעם אחת בדיוק', עם קצב נתוני קלט של 100 MiB/s. על סמך תוצאות ההשוואה לצינור עיבוד נתונים של ‎1 GiB/s, אפשר לאמוד את דרישות המשאבים באופן הבא:

  • גורם לקביעת קנה מידה: ‎(100 MiB/s) / (1 GiB/s) = 0.1
  • צמתים משוערים של עובדים: 63 עובדים × 0.1 = 6.3 עובדים
  • מספר יחידות החישוב הצפוי של Streaming Engine לשעה: 58 × 0.1 = 5.8 יחידות לשעה

הערך הזה צריך לשמש רק כאומדן ראשוני. התפוקה והעלות בפועל עשויות להשתנות באופן משמעותי, בהתאם לגורמים כמו סוג המכונה, חלוקת גודל ההודעה, קוד המשתמש, סוג הצבירה, מקביליות המפתח וגודל החלון. מידע נוסף זמין במאמר שיטות מומלצות לאופטימיזציה של עלויות ב-Dataflow.

הפעלת צינור עיבוד נתונים לבדיקה

בקטע הזה מוצגות הפקודות gcloud dataflow flex-template run ששימשו להפעלת צינור הנתונים של המפה בלבד.

מצב 'פעם אחת בדיוק'

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400

מצב 'לפחות פעם אחת'

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --additional-experiments=streaming_mode_at_least_once \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true

מחליפים את מה שכתוב בשדות הבאים:

  • JOB_NAME: שם המשימה ב-Dataflow
  • PROJECT_ID: מזהה הפרויקט
  • KAFKA_BOOTSTRAP_ADDRESS: כתובת ה-bootstrap של אשכול Apache Kafka
  • KAFKA_TOPIC: השם של נושא Kafka
  • BQ_DATASET: השם של מערך הנתונים ב-BigQuery
  • BQ_TABLE_NAME: השם של הטבלה ב-BigQuery

יצירת נתונים לבדיקה

כדי ליצור נתוני בדיקה, מריצים את תבנית הגנרטור של נתונים בזרימה באמצעות הפקודה הבאה:

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --max-workers=140 \
  --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON

מחליפים את מה שכתוב בשדות הבאים:

  • JOB_NAME: שם המשימה ב-Dataflow
  • PROJECT_ID: מזהה הפרויקט
  • SCHEMA_LOCATION: הנתיב לקובץ סכימה ב-Cloud Storage
  • KAFKA_BOOTSTRAP_ADDRESS: כתובת ה-bootstrap של אשכול Apache Kafka
  • KAFKA_TOPIC: השם של נושא Kafka

תבנית מחולל נתוני הסטרימינג משתמשת בקובץ מחולל נתוני JSON כדי להגדיר את סכימת ההודעות. במבחני ההשוואה נעשה שימוש בסכימת הודעות שדומה לזו שמוצגת בהמשך:

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

השלבים הבאים