כתיבה מ-Dataflow אל Apache Kafka

במאמר הזה מוסבר איך לכתוב נתונים מ-Dataflow אל Apache Kafka.

ברוב תרחישי השימוש, מומלץ להשתמש במחבר מנוהל של קלט/פלט כדי לכתוב ל-Kafka.

אם אתם צריכים כוונון ביצועים מתקדם יותר, כדאי להשתמש במחבר KafkaIO. מחבר KafkaIO זמין ל-Java או באמצעות מסגרת צינורות רב-לשוניים ל-Python ול-Go.

עיבוד בדיוק פעם אחת

כברירת מחדל, המחבר KafkaIO לא מספק סמנטיקה של כתיבה בדיוק פעם אחת. המשמעות היא שייתכן שהנתונים ייכתבו לנושא Kafka כמה פעמים. כדי להפעיל כתיבה בדיוק פעם אחת, צריך להריץ את שיטת withEOS. הכתיבה בדיוק פעם אחת מבטיחה שהנתונים ייכתבו לנושא היעד ב-Kafka בדיוק פעם אחת. עם זאת, היא גם מגדילה את עלות הצינור ומקטינה את התפוקה.

אם אין לכם דרישות מחמירות לגבי סמנטיקה של 'פעם אחת בדיוק', והלוגיקה בצינור יכולה לטפל ברשומות כפולות, כדאי להפעיל את מצב 'לפחות פעם אחת' לכל הצינור כדי להפחית את העלויות. מידע נוסף זמין במאמר בנושא הגדרת מצב הסטרימינג של צינור הנתונים.

הפייפליין מתרוקן

אם מרוקנים את צינור הנתונים, לא מובטח שהסמנטיקה תהיה בדיוק פעם אחת. הדבר היחיד שמובטח הוא שנתונים שאושרו לא יאבדו. כתוצאה מכך, יכול להיות שחלק מהנתונים יעברו עיבוד בזמן שהצינור מתרוקן, בלי שההיסטים של הקריאה יועברו בחזרה ל-Kafka. כדי להשיג סמנטיקה של 'לפחות פעם אחת' ב-Kafka כשמשנים צינור, מעדכנים את הצינור במקום לבטל את העבודה ולהתחיל עבודה חדשה.

התאמה של Kafka לסמנטיקה של 'פעם אחת בדיוק'

התאמה של transaction.max.timeout.ms ושל transactional.id.expiration.ms יכולה להשלים את האסטרטגיה הכוללת שלכם לסבילות לתקלות ולמסירה של כל הודעה בדיוק פעם אחת. עם זאת, ההשפעה שלהם תלויה באופי של ההפסקה הזמנית בשירות ובהגדרה הספציפית שלכם. כדאי להגדיר את הערך של transaction.max.timeout.ms קרוב לזמן השמירה של נושאי Kafka, כדי למנוע כפילויות בנתונים שנגרמות בגלל הפסקות בשירות של ברוקר Kafka.

אם ברוקר של Kafka לא זמין באופן זמני (לדוגמה, בגלל חלוקה של הרשת או כשל בצומת), ולמפיק יש טרנזקציות שמתבצעות, יכול להיות שחלון הזמן של הטרנזקציות האלה יפוג. הגדלת הערך של transaction.max.timeout.ms מאפשרת לעסקאות להסתיים אחרי שסוכן מתאושש, ויכולה למנוע את הצורך להפעיל מחדש עסקאות ולשלוח מחדש הודעות. הפתרון הזה עוזר לשמור על סמנטיקה של 'פעם אחת בדיוק' באופן עקיף, כי הוא מקטין את הסיכוי להודעות כפולות שנגרמות בגלל הפעלה מחדש של טרנזקציות. מצד שני, זמן תפוגה קצר יותר יכול לעזור לנקות מהר יותר מזהי עסקאות לא פעילים, וכך לצמצם את השימוש הפוטנציאלי במשאבים.

הגדרה של רשתות

כברירת מחדל, Dataflow מפעיל מופעים ברשת ברירת המחדל של הענן הווירטואלי הפרטי (VPC). בהתאם להגדרות של Kafka, יכול להיות שתצטרכו להגדיר רשת ורשת משנה שונות עבור Dataflow. מידע נוסף זמין במאמר ציון רשת ורשת משנה. כשמגדירים את הרשת, צריך ליצור כללי חומת אש שמאפשרים למכונות העובד של Dataflow להגיע לשרתי Kafka.

אם אתם משתמשים ב-VPC Service Controls, אתם צריכים למקם את אשכול Kafka בתוך גבולות הגזרה של VPC Service Controls, או להרחיב את גבולות הגזרה ל-VPN או ל-Cloud Interconnect המורשים.

אם אשכול Kafka נפרס מחוץ ל- Google Cloud, צריך ליצור חיבור רשת בין Dataflow לבין אשכול Kafka. יש כמה אפשרויות שונות לחיבור לרשת, שלכל אחת מהן יש יתרונות וחסרונות:

‫Dedicated Interconnect היא האפשרות הכי טובה לביצועים ולמהימנות צפויים, אבל יכול לקחת יותר זמן להגדיר אותה כי צדדים שלישיים צריכים להקצות את המעגלים החדשים. בטופולוגיה שמבוססת על כתובת IP ציבורית, אפשר להתחיל במהירות כי לא צריך לבצע הרבה עבודות ברשת.

בקטעים הבאים מפורטות האפשרויות האלה.

מרחב כתובות RFC 1918 משותף

גם Dedicated Interconnect וגם IPsec VPN מאפשרים לכם גישה ישירה לכתובות IP של RFC 1918 בענן הווירטואלי הפרטי (VPC), מה שיכול לפשט את ההגדרה של Kafka. אם אתם משתמשים בטופולוגיה שמבוססת על VPN, כדאי להגדיר VPN עם תפוקה גבוהה.

כברירת מחדל, Dataflow מפעיל מופעים ברשת ה-VPC שמוגדרת כברירת מחדל. בטופולוגיה של רשת פרטית עם מסלולים שמוגדרים באופן מפורש ב-Cloud Router שמקשרים בין רשתות משנה ב- Google Cloud לאותו אשכול Kafka, אתם צריכים יותר שליטה במיקום של מופעי Dataflow. אפשר להשתמש ב-Dataflow כדי להגדיר את network וsubnetwork פרמטרים של ביצוע.

חשוב לוודא שיש מספיק כתובות IP זמינות ברשת המשנה המתאימה, כדי שמערכת Dataflow תוכל להפעיל מופעים כשהיא מנסה להרחיב את קנה המידה. בנוסף, כשיוצרים רשת נפרדת להפעלת מכונות Dataflow, צריך לוודא שיש כלל חומת אש שמאפשר תעבורת TCP בין כל המכונות הווירטואליות בפרויקט. כלל חומת האש הזה כבר מוגדר ברשת שמוגדרת כברירת מחדל.

מרחב כתובות IP ציבוריות

הארכיטקטורה הזו משתמשת בפרוטוקול Transport Layer Security ‏(TLS) כדי לאבטח את התעבורה בין לקוחות חיצוניים לבין Kafka, ומשתמשת בתעבורה לא מוצפנת לתקשורת בין ברוקרים. אם מאזין Kafka נקשר לממשק רשת שמשמש לתקשורת פנימית וחיצונית, קל להגדיר את המאזין. עם זאת, בתרחישים רבים, הכתובות שמפורסמות חיצונית של ברוקרי Kafka באשכול שונות מממשקי הרשת הפנימיים שבהם Kafka משתמש. בתרחישים כאלה, אפשר להשתמש במאפיין advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

לקוחות חיצוניים מתחברים באמצעות יציאה 9093 דרך ערוץ SSL, ולקוחות פנימיים מתחברים באמצעות יציאה 9092 דרך ערוץ טקסט לא מוצפן. כשמציינים כתובת בקטע advertised.listeners, צריך להשתמש בשמות DNS (kafkabroker-n.mydomain.com בדוגמה הזו) שמפנים לאותו מופע גם לתנועה חיצונית וגם לתנועה פנימית. יכול להיות ששימוש בכתובות IP ציבוריות לא יפעל כי יכול להיות שהכתובות לא יצליחו לפתור בעיות בתנועה פנימית.

רישום ביומן

הרישום ביומן מ-KafkaIO יכול להיות מפורט מאוד. כדאי לשקול להפחית את רמת הרישום ביומן בייצור באופן הבא:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

מידע נוסף זמין במאמר בנושא הגדרת רמות יומן של עובדי צינורות.

המאמרים הבאים