בדף הזה מוסבר איך להשתמש ב-Dataflow כדי לקרוא נתונים מ-שירות מנוהל של Google Cloud ל-Apache Kafka ולכתוב את הרשומות בטבלה ב-BigQuery. במדריך הזה משתמשים בתבנית Apache Kafka ל-BigQuery כדי ליצור את משימת Dataflow.
סקירה כללית
Apache Kafka היא פלטפורמה בקוד פתוח להזרמת אירועים. בדרך כלל משתמשים ב-Kafka בארכיטקטורות מבוזרות כדי לאפשר תקשורת בין רכיבים בצימוד חלש. אפשר להשתמש ב-Dataflow כדי לקרוא אירועים מ-Kafka, לעבד אותם ולכתוב את התוצאות בטבלה ב-BigQuery לצורך ניתוח נוסף.
השירות המנוהל ל-Apache Kafka הוא שירות של Google Cloud Platform שעוזר לכם להפעיל אשכולות מאובטחים וניתנים להרחבה של Kafka.
ההרשאות הנדרשות
לחשבון השירות של העובד (worker) ב-Dataflow צריכים להיות התפקידים הבאים בניהול הזהויות והרשאות הגישה (IAM):
- לקוח Managed Kafka (
roles/managedkafka.client) - עריכה של נתוני BigQuery (
roles/bigquery.dataEditor)
מידע נוסף זמין במאמר בנושא אבטחה והרשאות ב-Dataflow.
יצירת אשכול Kafka
בשלב הזה יוצרים אשכול של שירות מנוהל ל-Apache Kafka. למידע נוסף, ראו יצירת אשכול של שירות מנוהל ל-Apache Kafka.
המסוף
עוברים אל הדף שירות מנוהל ל-Apache Kafka > Clusters.
לוחצים על יצירה.
בתיבה שם האשכול, מזינים שם לאשכול.
ברשימה Region, בוחרים מיקום לאשכול.
לוחצים על יצירה.
gcloud
משתמשים בפקודה managed-kafka clusters create.
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
מחליפים את מה שכתוב בשדות הבאים:
-
CLUSTER: שם לאשכול -
REGION: האזור שבו יצרתם את רשת המשנה -
PROJECT_ID: מזהה הפרויקט -
SUBNET_NAME: תת-הרשת שבה רוצים לפרוס את האשכול
יצירת אשכול נמשכת בדרך כלל 20-30 דקות.
יצירת נושא Kafka
אחרי שיוצרים את האשכול של השירות המנוהל ל-Apache Kafka, יוצרים נושא.
המסוף
עוברים אל הדף שירות מנוהל ל-Apache Kafka > Clusters.
לוחצים על שם האשכול.
בדף הפרטים של האשכול, לוחצים על Create Topic.
בתיבה שם הנושא, מזינים שם לנושא.
לוחצים על יצירה.
gcloud
משתמשים בפקודה managed-kafka topics create.
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
מחליפים את מה שכתוב בשדות הבאים:
-
TOPIC_NAME: השם של הנושא שרוצים ליצור
יצירת טבלה ב-BigQuery
בשלב הזה יוצרים טבלה ב-BigQuery עם הסכימה הבאה:
| שם העמודה | סוג נתונים |
|---|---|
name |
STRING |
customer_id |
INTEGER |
אם עדיין אין לכם מערך נתונים ב-BigQuery, תצטרכו ליצור אותו קודם. מידע נוסף זמין במאמר יצירת מערכי נתונים. לאחר מכן יוצרים טבלה ריקה חדשה:
המסוף
עוברים לדף BigQuery.
בחלונית Explorer מרחיבים את הפרויקט ואז בוחרים מערך נתונים.
בקטע Dataset info, לוחצים על Create table.
ברשימה יצירת טבלה מתוך, בוחרים באפשרות טבלה ריקה.
בתיבה Table (טבלה), מזינים את שם הטבלה.
בקטע Schema, לוחצים על Edit as text (עריכה כטקסט).
מדביקים את הגדרת הסכימה הבאה:
name:STRING, customer_id:INTEGERלוחצים על יצירת טבלה.
gcloud
משתמשים בפקודה bq mk.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט -
DATASET_NAME: השם של מערך הנתונים -
TABLE_NAME: השם של הטבלה שרוצים ליצור
הפעלת המשימה ב-Dataflow
אחרי שיוצרים את אשכול Kafka ואת הטבלה ב-BigQuery, מריצים את תבנית Dataflow.
המסוף
קודם צריך לקבל את כתובת שרת האתחול של האשכול:
נכנסים לדף Clusters במסוף Google Cloud .
לוחצים על שם האשכול.
לוחצים על הכרטיסייה Configurations.
מעתיקים את כתובת השרת של ה-bootstrap מהשדה Bootstrap URL.
בשלב הבא, מריצים את התבנית כדי ליצור את משימת Dataflow:
עוברים לדף Dataflow > משימות.
לוחצים על יצירת עבודה מתבנית.
בשדה Job Name, מזינים
kafka-to-bq.בקטע Regional endpoint (נקודת קצה אזורית), בוחרים את האזור שבו נמצא האשכול של השירות המנוהל ל-Apache Kafka.
בוחרים בתבנית Kafka to BigQuery.
מזינים את פרמטרים התבנית הבאים:
- שרת bootstrap של Kafka: כתובת שרת ה-bootstrap
- נושא Kafka של המקור: שם הנושא לקריאה
- מצב אימות של מקור Kafka:
APPLICATION_DEFAULT_CREDENTIALS - פורמט ההודעה ב-Kafka:
JSON - Table name strategy:
SINGLE_TABLE_NAME - טבלת הפלט של BigQuery: טבלה ב-BigQuery, בפורמט הבא:
PROJECT_ID:DATASET_NAME.TABLE_NAME
בקטע תור הודעות שלא ניתן להעביר, מסמנים את האפשרות כתיבת שגיאות ל-BigQuery.
מזינים שם של טבלה ב-BigQuery לתור של הודעות שלא ניתן להעביר, בפורמט הבא:
PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAMEאל תיצרו את הטבלה מראש. הפייפליין יוצר אותו.
לוחצים על הפעלת העבודה.
gcloud
משתמשים בפקודה dataflow flex-template run.
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
מחליפים את המשתנים הבאים:
-
LOCATION: האזור שבו נמצא השירות המנוהל ל-Apache Kafka -
PROJECT_ID: השם של הפרויקט שלכם ב-Google Cloud Platform -
CLUSTER_ID: השם של האשכול -
TOPIC: השם של נושא Kafka -
DATASET_NAME: השם של מערך הנתונים -
TABLE_NAME: שם הטבלה -
ERROR_TABLE_NAME: שם הטבלה ב-BigQuery של תור ההודעות המתות
לא כדאי ליצור את הטבלה של תור ההודעות המתות מראש. הפייפליין יוצר אותו.
שליחת הודעות ל-Kafka
אחרי שהמשימה של Dataflow מתחילה, אפשר לשלוח הודעות ל-Kafka, והצינור כותב אותן ל-BigQuery.
יוצרים מכונה וירטואלית באותה רשת משנה שבה נמצא אשכול Kafka ומתקינים את כלי שורת הפקודה של Kafka. הוראות מפורטות מופיעות במאמר פרסום וצריכה של הודעות באמצעות ה-CLI בקטע הגדרת מכונת לקוח.
מריצים את הפקודה הבאה כדי לכתוב הודעות לנושא Kafka:
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
מחליפים את המשתנים הבאים:
-
TOPIC: השם של נושא Kafka -
CLUSTER_ID: שם האשכול -
LOCATION: האזור שבו נמצא האשכול -
PROJECT_ID: השם של הפרויקט שלכם ב-Google Cloud Platform
-
בהנחיה, מזינים את שורות הטקסט הבאות כדי לשלוח הודעות ל-Kafka:
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
שימוש בתור להודעות שלא ניתן להעביר
בזמן שהמשימה פועלת, יכול להיות שהצינור לא יצליח לכתוב הודעות פרטיות ב-BigQuery. שגיאות אפשריות:
- שגיאות בסריאליזציה, כולל JSON בפורמט שגוי.
- שגיאות בהמרת סוגים, שנובעות מחוסר התאמה בין סכימת הטבלה לבין נתוני ה-JSON.
- שדות נוספים בנתוני ה-JSON שלא מופיעים בסכימת הטבלה.
השגיאות האלה לא גורמות לכשל בעבודה, והן לא מופיעות כשגיאות ביומן העבודה של Dataflow. במקום זאת, צינור הנתונים משתמש בתור של הודעות שלא ניתן להעביר כדי לטפל בסוגי השגיאות האלה.
כדי להפעיל את תור ההודעות המתות כשמריצים את התבנית, מגדירים את פרמטרים התבנית הבאים:
useBigQueryDLQ:true-
outputDeadletterTable: שם טבלה מלא ב-BigQuery, למשלmy-project:dataset1.errors
הצינור יוצר את הטבלה באופן אוטומטי. אם מתרחשת שגיאה במהלך עיבוד הודעת Kafka, צינור הנתונים כותב רשומה של שגיאה לטבלה.
דוגמאות להודעות שגיאה:
| סוג השגיאה | נתוני האירוע | errorMessage |
|---|---|---|
| שגיאת סריאליזציה | "Hello world" | הסריאליזציה של ה-JSON לשורת טבלה נכשלה: Hello world |
| שגיאה בהמרת סוג | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
| שדה לא ידוע | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
עבודה עם סוגי נתונים ב-BigQuery
באופן פנימי, מחבר הקלט/פלט של Kafka ממיר מטען ייעודי (payload) של הודעות JSON לאובייקטים של Apache Beam TableRow, ומתרגם את ערכי השדות TableRow לסוגים של BigQuery.
בטבלה הבאה מוצגות ייצוגי JSON של סוגי נתונים ב-BigQuery.
| סוג BigQuery | ייצוג ב-JSON |
|---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)"מציינים את המיקום הגיאוגרפי באמצעות טקסט מוכר (WKT) או GeoJSON, בפורמט של מחרוזת. מידע נוסף זמין במאמר בנושא טעינת נתונים גיאו-מרחביים. |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z"משתמשים בשיטת JavaScript |
נתונים מובְנים
אם הודעות ה-JSON שלכם פועלות לפי סכימה עקבית, אתם יכולים לייצג אובייקטים של JSON באמצעות סוג הנתונים STRUCT ב-BigQuery.
בדוגמה הבאה, השדה answers הוא אובייקט JSON עם שני שדות משנה, a ו-b:
{"name":"Emily","answers":{"a":"yes","b":"no"}}
הצהרת ה-SQL הבאה יוצרת טבלה ב-BigQuery עם סכימה תואמת:
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
הטבלה שמתקבלת נראית כך:
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
נתונים חצי מובנים
אם הודעות ה-JSON שלכם לא עומדות בסכימה מחמירה, כדאי לשמור אותן ב-BigQuery כסוג נתונים JSON.
כשמאחסנים נתוני JSON כסוג JSON, לא צריך להגדיר את הסכימה מראש. אחרי הטמעת הנתונים, אפשר להריץ שאילתות על הנתונים באמצעות אופרטורים של גישה לשדה (סימון נקודה) וגישה למערך ב-GoogleSQL. מידע נוסף מופיע במאמר עבודה עם נתוני JSON ב-GoogleSQL.
שימוש בפונקציה מוגדרת על ידי המשתמש כדי לבצע טרנספורמציה של הנתונים
במדריך הזה אנחנו מניחים שההודעות ב-Kafka מעוצבות כ-JSON, ושהסכימה של טבלה ב-BigQuery תואמת לנתוני ה-JSON, בלי שבוצעו טרנספורמציות בנתונים.
אפשר גם לספק פונקציה בהגדרת המשתמש (UDF) ב-JavaScript שמבצעת טרנספורמציה של הנתונים לפני שהם נכתבים ב-BigQuery. הפונקציה UDF יכולה גם לבצע עיבוד נוסף, כמו סינון, הסרת פרטים אישיים מזהים (PII) או הוספת שדות לנתונים.
מידע נוסף זמין במאמר בנושא יצירת פונקציות מוגדרות על ידי המשתמש לתבניות Dataflow.