כתיבת נתונים מ-Kafka ל-BigQuery באמצעות Dataflow

בדף הזה מוסבר איך להשתמש ב-Dataflow כדי לקרוא נתונים מ-שירות מנוהל של Google Cloud ל-Apache Kafka ולכתוב את הרשומות בטבלה ב-BigQuery. במדריך הזה משתמשים בתבנית Apache Kafka ל-BigQuery כדי ליצור את משימת Dataflow.

סקירה כללית

Apache Kafka היא פלטפורמה בקוד פתוח להזרמת אירועים. בדרך כלל משתמשים ב-Kafka בארכיטקטורות מבוזרות כדי לאפשר תקשורת בין רכיבים בצימוד חלש. אפשר להשתמש ב-Dataflow כדי לקרוא אירועים מ-Kafka, לעבד אותם ולכתוב את התוצאות בטבלה ב-BigQuery לצורך ניתוח נוסף.

השירות המנוהל ל-Apache Kafka הוא שירות של Google Cloud Platform שעוזר לכם להפעיל אשכולות מאובטחים וניתנים להרחבה של Kafka.

קריאת אירועים מ-Kafka ל-BigQuery
ארכיטקטורה מבוססת-אירועים באמצעות Apache Kafka

ההרשאות הנדרשות

לחשבון השירות של העובד (worker) ב-Dataflow צריכים להיות התפקידים הבאים בניהול הזהויות והרשאות הגישה (IAM):

  • לקוח Managed Kafka‏ (roles/managedkafka.client)
  • עריכה של נתוני BigQuery‏ (roles/bigquery.dataEditor)

מידע נוסף זמין במאמר בנושא אבטחה והרשאות ב-Dataflow.

יצירת אשכול Kafka

בשלב הזה יוצרים אשכול של שירות מנוהל ל-Apache Kafka. למידע נוסף, ראו יצירת אשכול של שירות מנוהל ל-Apache Kafka.

המסוף

  1. עוברים אל הדף שירות מנוהל ל-Apache Kafka > Clusters.

    כניסה לדף Clusters

  2. לוחצים על יצירה.

  3. בתיבה שם האשכול, מזינים שם לאשכול.

  4. ברשימה Region, בוחרים מיקום לאשכול.

  5. לוחצים על יצירה.

gcloud

משתמשים בפקודה managed-kafka clusters create.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER: שם לאשכול
  • REGION: האזור שבו יצרתם את רשת המשנה
  • PROJECT_ID: מזהה הפרויקט
  • SUBNET_NAME: תת-הרשת שבה רוצים לפרוס את האשכול

יצירת אשכול נמשכת בדרך כלל 20-30 דקות.

יצירת נושא Kafka

אחרי שיוצרים את האשכול של השירות המנוהל ל-Apache Kafka, יוצרים נושא.

המסוף

  1. עוברים אל הדף שירות מנוהל ל-Apache Kafka > Clusters.

    כניסה לדף Clusters

  2. לוחצים על שם האשכול.

  3. בדף הפרטים של האשכול, לוחצים על Create Topic.

  4. בתיבה שם הנושא, מזינים שם לנושא.

  5. לוחצים על יצירה.

gcloud

משתמשים בפקודה managed-kafka topics create.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

מחליפים את מה שכתוב בשדות הבאים:

  • TOPIC_NAME: השם של הנושא שרוצים ליצור

יצירת טבלה ב-BigQuery

בשלב הזה יוצרים טבלה ב-BigQuery עם הסכימה הבאה:

שם העמודה סוג נתונים
name STRING
customer_id INTEGER

אם עדיין אין לכם מערך נתונים ב-BigQuery, תצטרכו ליצור אותו קודם. מידע נוסף זמין במאמר יצירת מערכי נתונים. לאחר מכן יוצרים טבלה ריקה חדשה:

המסוף

  1. עוברים לדף BigQuery.

    כניסה ל-BigQuery

  2. בחלונית Explorer מרחיבים את הפרויקט ואז בוחרים מערך נתונים.

  3. בקטע Dataset info, לוחצים על Create table.

  4. ברשימה יצירת טבלה מתוך, בוחרים באפשרות טבלה ריקה.

  5. בתיבה Table (טבלה), מזינים את שם הטבלה.

  6. בקטע Schema, לוחצים על Edit as text (עריכה כטקסט).

  7. מדביקים את הגדרת הסכימה הבאה:

    name:STRING,
    customer_id:INTEGER
    
  8. לוחצים על יצירת טבלה.

gcloud

משתמשים בפקודה bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט
  • DATASET_NAME: השם של מערך הנתונים
  • TABLE_NAME: השם של הטבלה שרוצים ליצור

הפעלת המשימה ב-Dataflow

אחרי שיוצרים את אשכול Kafka ואת הטבלה ב-BigQuery, מריצים את תבנית Dataflow.

המסוף

קודם צריך לקבל את כתובת שרת האתחול של האשכול:

  1. נכנסים לדף Clusters במסוף Google Cloud .

    כניסה לדף Clusters

  2. לוחצים על שם האשכול.

  3. לוחצים על הכרטיסייה Configurations.

  4. מעתיקים את כתובת השרת של ה-bootstrap מהשדה Bootstrap URL.

בשלב הבא, מריצים את התבנית כדי ליצור את משימת Dataflow:

  1. עוברים לדף Dataflow > משימות.

    מעבר לדף Jobs

  2. לוחצים על יצירת עבודה מתבנית.

  3. בשדה Job Name, מזינים kafka-to-bq.

  4. בקטע Regional endpoint (נקודת קצה אזורית), בוחרים את האזור שבו נמצא האשכול של השירות המנוהל ל-Apache Kafka.

  5. בוחרים בתבנית Kafka to BigQuery.

  6. מזינים את פרמטרים התבנית הבאים:

    • שרת bootstrap של Kafka: כתובת שרת ה-bootstrap
    • נושא Kafka של המקור: שם הנושא לקריאה
    • מצב אימות של מקור Kafka: APPLICATION_DEFAULT_CREDENTIALS
    • פורמט ההודעה ב-Kafka: JSON
    • Table name strategy: SINGLE_TABLE_NAME
    • טבלת הפלט של BigQuery: טבלה ב-BigQuery, בפורמט הבא: PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. בקטע תור הודעות שלא ניתן להעביר, מסמנים את האפשרות כתיבת שגיאות ל-BigQuery.

  8. מזינים שם של טבלה ב-BigQuery לתור של הודעות שלא ניתן להעביר, בפורמט הבא: PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    אל תיצרו את הטבלה מראש. הפייפליין יוצר אותו.

  9. לוחצים על הפעלת העבודה.

gcloud

משתמשים בפקודה dataflow flex-template run.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

מחליפים את המשתנים הבאים:

  • LOCATION: האזור שבו נמצא השירות המנוהל ל-Apache Kafka
  • PROJECT_ID: השם של הפרויקט שלכם ב-Google Cloud Platform
  • CLUSTER_ID: השם של האשכול
  • TOPIC: השם של נושא Kafka
  • DATASET_NAME: השם של מערך הנתונים
  • TABLE_NAME: שם הטבלה
  • ERROR_TABLE_NAME: שם הטבלה ב-BigQuery של תור ההודעות המתות

לא כדאי ליצור את הטבלה של תור ההודעות המתות מראש. הפייפליין יוצר אותו.

שליחת הודעות ל-Kafka

אחרי שהמשימה של Dataflow מתחילה, אפשר לשלוח הודעות ל-Kafka, והצינור כותב אותן ל-BigQuery.

  1. יוצרים מכונה וירטואלית באותה רשת משנה שבה נמצא אשכול Kafka ומתקינים את כלי שורת הפקודה של Kafka. הוראות מפורטות מופיעות במאמר פרסום וצריכה של הודעות באמצעות ה-CLI בקטע הגדרת מכונת לקוח.

  2. מריצים את הפקודה הבאה כדי לכתוב הודעות לנושא Kafka:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    מחליפים את המשתנים הבאים:

    • TOPIC: השם של נושא Kafka
    • CLUSTER_ID: שם האשכול
    • LOCATION: האזור שבו נמצא האשכול
    • PROJECT_ID: השם של הפרויקט שלכם ב-Google Cloud Platform
  3. בהנחיה, מזינים את שורות הטקסט הבאות כדי לשלוח הודעות ל-Kafka:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

שימוש בתור להודעות שלא ניתן להעביר

בזמן שהמשימה פועלת, יכול להיות שהצינור לא יצליח לכתוב הודעות פרטיות ב-BigQuery. שגיאות אפשריות:

  • שגיאות בסריאליזציה, כולל JSON בפורמט שגוי.
  • שגיאות בהמרת סוגים, שנובעות מחוסר התאמה בין סכימת הטבלה לבין נתוני ה-JSON.
  • שדות נוספים בנתוני ה-JSON שלא מופיעים בסכימת הטבלה.

השגיאות האלה לא גורמות לכשל בעבודה, והן לא מופיעות כשגיאות ביומן העבודה של Dataflow. במקום זאת, צינור הנתונים משתמש בתור של הודעות שלא ניתן להעביר כדי לטפל בסוגי השגיאות האלה.

כדי להפעיל את תור ההודעות המתות כשמריצים את התבנית, מגדירים את פרמטרים התבנית הבאים:

  • useBigQueryDLQ: true
  • outputDeadletterTable: שם טבלה מלא ב-BigQuery, למשל my-project:dataset1.errors

הצינור יוצר את הטבלה באופן אוטומטי. אם מתרחשת שגיאה במהלך עיבוד הודעת Kafka, צינור הנתונים כותב רשומה של שגיאה לטבלה.

דוגמאות להודעות שגיאה:

סוג השגיאה נתוני האירוע errorMessage
שגיאת סריאליזציה ‫"Hello world" הסריאליזציה של ה-JSON לשורת טבלה נכשלה: Hello world
שגיאה בהמרת סוג {"name":"Emily","customer_id":"abc"} ‪{ "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
שדה לא ידוע ‪{"name":"Zoe","age":34} ‪{ "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", ‪"reason" : "invalid" } ], "index" : 0 }

עבודה עם סוגי נתונים ב-BigQuery

באופן פנימי, מחבר הקלט/פלט של Kafka ממיר מטען ייעודי (payload) של הודעות JSON לאובייקטים של Apache Beam‏ TableRow, ומתרגם את ערכי השדות TableRow לסוגים של BigQuery.

בטבלה הבאה מוצגות ייצוגי JSON של סוגי נתונים ב-BigQuery.

סוג BigQuery ייצוג ב-JSON
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

מציינים את המיקום הגיאוגרפי באמצעות טקסט מוכר (WKT) או GeoJSON, בפורמט של מחרוזת. מידע נוסף זמין במאמר בנושא טעינת נתונים גיאו-מרחביים.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

משתמשים בשיטת JavaScript‏ Date.toJSON כדי לעצב את הערך.

נתונים מובְנים

אם הודעות ה-JSON שלכם פועלות לפי סכימה עקבית, אתם יכולים לייצג אובייקטים של JSON באמצעות סוג הנתונים STRUCT ב-BigQuery.

בדוגמה הבאה, השדה answers הוא אובייקט JSON עם שני שדות משנה, a ו-b:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

הצהרת ה-SQL הבאה יוצרת טבלה ב-BigQuery עם סכימה תואמת:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

הטבלה שמתקבלת נראית כך:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

נתונים חצי מובנים

אם הודעות ה-JSON שלכם לא עומדות בסכימה מחמירה, כדאי לשמור אותן ב-BigQuery כסוג נתונים JSON. כשמאחסנים נתוני JSON כסוג JSON, לא צריך להגדיר את הסכימה מראש. אחרי הטמעת הנתונים, אפשר להריץ שאילתות על הנתונים באמצעות אופרטורים של גישה לשדה (סימון נקודה) וגישה למערך ב-GoogleSQL. מידע נוסף מופיע במאמר עבודה עם נתוני JSON ב-GoogleSQL.

שימוש בפונקציה מוגדרת על ידי המשתמש כדי לבצע טרנספורמציה של הנתונים

במדריך הזה אנחנו מניחים שההודעות ב-Kafka מעוצבות כ-JSON, ושהסכימה של טבלה ב-BigQuery תואמת לנתוני ה-JSON, בלי שבוצעו טרנספורמציות בנתונים.

אפשר גם לספק פונקציה בהגדרת המשתמש (UDF) ב-JavaScript שמבצעת טרנספורמציה של הנתונים לפני שהם נכתבים ב-BigQuery. הפונקציה UDF יכולה גם לבצע עיבוד נוסף, כמו סינון, הסרת פרטים אישיים מזהים (PII) או הוספת שדות לנתונים.

מידע נוסף זמין במאמר בנושא יצירת פונקציות מוגדרות על ידי המשתמש לתבניות Dataflow.

המאמרים הבאים