מחבר יעד Kafka Connect Bigtable


מחברים של יעד הם תוספים למסגרת Kafka Connect שאפשר להשתמש בהם כדי להזרים נתונים מ-Kafka ישירות למערכות אחרות לצורך אחסון ועיבוד. ‫Kafka Connect Bigtable sink הוא מחבר ייעודי שנועד להזרים נתונים ל-Bigtable בזמן אמת עם השהיה מינימלית.

בדף הזה מוסבר על התכונות והמגבלות של המחבר. הוא גם מספק דוגמאות לשימוש בתרחישים מתקדמים עם Single Message Transforms (SMTs) ויצירה אוטומטית של טבלאות. הוראות להתקנה ומסמכי עזר מלאים זמינים במאגר של Kafka Connect Bigtable Sink Connector.

תכונות

מחבר היעד של Bigtable נרשם לנושאים של Kafka, קורא את ההודעות שמתקבלות בנושאים האלה ואז כותב את הנתונים לטבלאות של Bigtable. בקטעים הבאים מופיעה סקירה כללית של כל תכונה. פרטים על השימוש מופיעים בקטע הגדרות במסמך הזה.

מיפוי מקשים, כלי SMT וממירים

כדי לכתוב נתונים לטבלת Bigtable, צריך לספק מפתח שורה ייחודי, קבוצת עמודות ושם עמודה לכל פעולה. המידע הזה מוסק מהשדות בהודעות Kafka. אפשר ליצור את כל המזהים הנדרשים באמצעות הגדרות כמו row.key.definition,‏ row.key.delimiter או default.column.family.

יצירת טבלה אוטומטית

אתם יכולים להשתמש בהגדרות auto.create.tables ו-auto.create.column.families כדי ליצור באופן אוטומטי טבלאות יעד ומשפחות עמודות אם הן לא קיימות ביעד Bigtable. הגמישות הזו מגיעה עם עלות מסוימת בביצועים, ולכן בדרך כלל מומלץ ליצור קודם את הטבלאות שבהן רוצים להזרים נתונים.

מצבי כתיבה ומחיקת שורות

כשכותבים לטבלה, אפשר להחליף את הנתונים לגמרי אם שורה כבר קיימת, או לבחור לבטל את הפעולה באמצעות ההגדרה insert.mode. אפשר להשתמש בהגדרה הזו בשילוב עם טיפול בשגיאות של תור הודעות להמתנה (DLQ) כדי להשיג את ההבטחה של לפחות מסירה אחת.

כדי להנפיק פקודות DELETE, צריך להגדיר את המאפיין value.null.mode. אפשר להשתמש בו כדי למחוק שורות שלמות, משפחות של עמודות או עמודות ספציפיות.

תור להודעות ללא מוצא

מגדירים את הנכס errors.deadletterqueue.topic.name ומגדירים את errors.tolerance=all כך שיפרסם הודעות שלא ניתן לעבד בנושא של תור ההודעות המתות.

תאימות למחבר Confluent Platform Bigtable Sink

מחבר היעד Bigtable Kafka Connect של Google Cloud מציע שוויון מלא עם מחבר היעד Bigtable של Confluent Platform בניהול עצמי. אתם יכולים להשתמש בקובץ ההגדרות הקיים של מחבר Confluent Platform, על ידי שינוי ההגדרה connector.class ל-connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.

מגבלות

ההגבלות הבאות חלות:

  • בשלב הזה, מחבר היעד של Kafka Connect Bigtable נתמך רק באשכולות Kafka שבהם אפשר להתקין מחברים באופן עצמאי (אשכולות Kafka בניהול עצמי או מקומי). המחבר הזה לא נתמך כרגע בשירות המנוהל של Google Cloud ל-Apache Kafka.

  • מחבר זה יכול ליצור משפחות של עמודות ועמודות משמות של שדות עד לשני רמות של קינון:

    • מבנים מוטבעים ברמה עמוקה יותר משתי רמות מומרים ל-JSON ונשמרים בעמודת ההורה שלהם.
    • מבני נתונים ברמת הבסיס הופכים לקבוצות עמודות. השדות במבני הנתונים האלה הופכים לשמות של עמודות.
    • ערכים פרימיטיביים ברמת הבסיס נשמרים כברירת מחדל במשפחת עמודות שמשתמשת בנושא Kafka כשם שלה. השמות של העמודות במשפחה הזו זהים לשמות השדות. אפשר לשנות את ההתנהגות הזו באמצעות ההגדרות default.column.family ו-default.column.qualifier.

התקנה

כדי להתקין את המחבר הזה, פועלים לפי שלבי ההתקנה הרגילים: מבצעים build של הפרויקט באמצעות Maven, מעתיקים את הקבצים .jar לספריית הפלאגינים של Kafka Connect ויוצרים את קובץ ההגדרות. הוראות מפורטות מופיעות בקטע הפעלת המחבר במאגר.

הגדרות אישיות

כדי להגדיר מחברים של Kafka Connect, צריך לכתוב קובצי הגדרה. מחבר היעד (sink) של Bigtable Kafka Connect מבית Google Cloud תומך בכל מאפייני המחבר הבסיסיים של Kafka, וגם בכמה שדות נוספים שמותאמים לעבודה עם טבלאות Bigtable.

בקטעים הבאים מופיעות דוגמאות מפורטות לתרחישי שימוש מתקדמים יותר, אבל לא מתוארות בהם כל ההגדרות הזמינות. דוגמאות לשימוש בסיסי ורשימה מלאה של המאפיינים זמינות במאגר של Kafka Connect Bigtable Sink Connector.

דוגמה: יצירה של מפתח שורה גמיש ומשפחת עמודות

תרחיש לדוגמה

הודעות Kafka הנכנסות מכילות פרטים על הזמנות בשופינג עם מזהי משתמשים. אתם רוצים לכתוב כל הזמנה לשורה עם שתי משפחות עמודות: אחת לפרטי המשתמש ואחת לפרטי ההזמנה.

פורמט הודעת Kafka במקור

כדי להשיג את המבנה הבא, צריך לעצב את ההודעות של Kafka שפורסמו בנושא באמצעות התג JsonConverter:

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
שורה צפויה ב-Bigtable

רוצים לכתוב כל הודעה כשורה ב-Bigtable עם המבנה הבא:

מפתח שורה contact_details order_details
name טלפון אימייל orderId פריטים הנחה
user123#order123 user123 800‑555‑0199 business@example.com order123 ‪["itemUUID1", "itemUUID2"] 0.2
הגדרת מחבר
כדי להשיג את התוצאה הרצויה, כותבים את קובץ התצורה הבא:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
התוצאות של השימוש בקובץ הזה הן:
  • row.key.definition=user,order.id היא רשימה מופרדת בפסיקים של השדות שרוצים להשתמש בהם כדי ליצור את מפתח השורה. כל רשומה מחוברת עם קבוצת התווים בהגדרה row.key.delimiter.

    כשמשתמשים ב-row.key.definition, כל ההודעות צריכות להשתמש באותה סכימה. אם אתם צריכים לעבד הודעות עם מבנים שונים בעמודות שונות או במשפחות עמודות שונות, מומלץ ליצור מופעים נפרדים של מחבר. מידע נוסף מופיע בקטע דוגמה: כתיבת הודעות למספר טבלאות במסמך הזה.

  • שמות של משפחות עמודות ב-Bigtable מבוססים על השמות של מבני נתונים ברמת הבסיס (root) שאינם null. לדוגמה:

    • הערכים של פרטי יצירת הקשר הם סוגי נתונים פרימיטיביים ברמת הבסיס, ולכן צריך לצבור אותם למשפחת עמודות שמוגדרת כברירת מחדל באמצעות ההגדרה default.column.family=contact_details.
    • פרטי ההזמנה כבר כלולים באובייקט order, אבל רוצים להשתמש ב-order_details כשם של קבוצת עמודות. כדי לעשות את זה, משתמשים ב-ReplaceFields SMT ומשנים את השם של השדה.

דוגמה: יצירה אוטומטית של טבלה וכתיבות אידמפוטנטיות

תרחיש לדוגמה

ההודעות הנכנסות שלכם ב-Kafka מכילות פרטים על הזמנות קניות. הלקוחות יכולים לערוך את סל הקניות שלהם לפני השלמת ההזמנה, ולכן אתם צפויים לקבל הודעות המשך עם הזמנות ששונו, שצריך לשמור כעדכונים באותה שורה. בנוסף, אי אפשר להבטיח שטבלת היעדים קיימת בזמן הכתיבה, ולכן כדאי שהמחבר ייצור את הטבלה באופן אוטומטי אם היא לא קיימת.

הגדרת מחבר
כדי להשיג את התוצאה הרצויה, כותבים את קובץ התצורה הבא:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

דוגמה: כתיבת הודעות לכמה טבלאות

תרחיש לדוגמה

ההודעות הנכנסות של Kafka מכילות פרטים על הזמנות קניות מערוצי הפצה שונים. ההודעות האלה מתפרסמות בנושאים שונים, ואתם רוצים להשתמש באותו קובץ הגדרות כדי לכתוב אותן בטבלאות נפרדות.

הגדרת מחבר

אפשר לכתוב את ההודעות לכמה טבלאות, אבל אם משתמשים בקובץ הגדרה אחד להגדרה, כל הודעה חייבת להשתמש באותה סכימה. אם אתם צריכים לעבד הודעות מנושאים שונים לעמודות או למשפחות נפרדות, מומלץ ליצור מופעים נפרדים של מחבר.

כדי להשיג את התוצאה הרצויה, כותבים את קובץ ההגדרה הבא:

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

בגישה הזו, משתמשים במאפיין table.name.format=orders_${topic} כדי להפנות באופן דינמי לכל שם של נושא Kafka. כשמגדירים כמה שמות של נושאים באמצעות ההגדרה topics=shopping_topic_store1,shopping_topic_store2, כל הודעה נכתבת בטבלה נפרדת:

  • ההודעות מהנושא shopping_topic_store1 נכתבות לטבלה orders_shopping_topic_store1.
  • ההודעות מהנושא shopping_topic_store2 נכתבות לטבלה orders_shopping_topic_store2.

המאמרים הבאים