במאמר הזה מוסבר איך לשלב בין Apache Kafka ל-Pub/Sub באמצעות Pub/Sub Group Kafka Connector.
מידע על Pub/Sub Group Kafka Connector
Apache Kafka היא פלטפורמת קוד פתוח לסטרימינג של אירועים. הוא משמש בדרך כלל בארכיטקטורות מבוזרות כדי לאפשר תקשורת בין רכיבים עם צימוד רופף. Pub/Sub הוא שירות מנוהל לשליחה ולקבלה של הודעות באופן אסינכרוני. בדומה ל-Kafka, אפשר להשתמש ב-Pub/Sub כדי לתקשר בין רכיבים בארכיטקטורת הענן.
המחבר Pub/Sub Group Kafka מאפשר לשלב בין שתי המערכות האלה. המחברים הבאים כלולים בחבילת ה-JAR של המחבר:
- מחבר היעד קורא רשומות מנושא אחד או יותר ב-Kafka ומפרסם אותן ב-Pub/Sub.
- מחבר המקור קורא הודעות מנושא Pub/Sub ומפרסם אותן ב-Kafka.
הנה כמה תרחישים שבהם כדאי להשתמש ב-Pub/Sub Group Kafka Connector:
- אתם מעבירים ארכיטקטורה מבוססת-Kafka אל Google Cloud.
- יש לכם מערכת קצה קדמי שמאחסנת אירועים ב-Kafka מחוץ ל-Google Cloud, אבל אתם גם משתמשים ב-Google Cloudכדי להפעיל חלק משירותי הקצה העורפי שלכם, שצריכים לקבל את האירועים ב-Kafka. Google Cloud
- אתם אוספים יומנים מפתרון Kafka מקומי ושולחים אותם אל Google Cloud לניתוח נתונים.
- יש לכם מערכת frontend שמשתמשת ב- Google Cloud, אבל אתם גם מאחסנים נתונים באופן מקומי באמצעות Kafka.
המחבר דורש Kafka Connect, שהיא מסגרת להזרמת נתונים בין Kafka למערכות אחרות. כדי להשתמש במחבר, צריך להריץ את Kafka Connect לצד אשכול Kafka.
ההנחה במאמר הזה היא שאתם מכירים את Kafka ואת Pub/Sub. לפני שקוראים את המאמר הזה, מומלץ לעיין באחד ממדריכי ההתחלה המהירה של Pub/Sub.
המחבר של Pub/Sub לא תומך בשילוב בין Google Cloud IAM לבין רשימות ACL של Kafka Connect.
איך מתחילים להשתמש בכלי לחיבור
בקטע הזה מוסבר איך לבצע את המשימות הבאות:- מגדירים את Pub/Sub Group Kafka Connector.
- שליחת אירועים מ-Kafka אל Pub/Sub.
- שליחת הודעות מ-Pub/Sub ל-Kafka.
דרישות מוקדמות
התקנת Kafka
פועלים לפי המדריך לתחילת העבודה עם Apache Kafka כדי להתקין Kafka עם צומת יחיד במחשב המקומי. מבצעים את השלבים הבאים במדריך למתחילים:
- מורידים את הגרסה האחרונה של Kafka ומחלצים אותה.
- מפעילים את סביבת Kafka.
- יוצרים נושא Kafka.
אמת
מחבר Pub/Sub Group Kafka צריך לעבור אימות ב-Pub/Sub כדי לשלוח ולקבל הודעות Pub/Sub. כדי להגדיר אימות, מבצעים את השלבים הבאים:
- נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
-
התקינו את ה-CLI של Google Cloud.
-
אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:
gcloud init -
יוצרים או בוחרים Google Cloud פרויקט.
תפקידים שנדרשים כדי לבחור או ליצור פרויקט
- Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
-
יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (
roles/resourcemanager.projectCreator), שכולל את ההרשאהresourcemanager.projects.create. איך מקצים תפקידים
-
יוצרים Google Cloud פרויקט:
gcloud projects create PROJECT_ID
מחליפים את
PROJECT_IDבשם של פרויקט Google Cloud שיוצרים. -
בוחרים את הפרויקט שיצרתם: Google Cloud
gcloud config set project PROJECT_ID
מחליפים את
PROJECT_IDבשם הפרויקט ב- Google Cloud .
-
יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:
gcloud auth application-default login
אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט. -
USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה,myemail@example.com. -
ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
-
-
התקינו את ה-CLI של Google Cloud.
-
אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:
gcloud init -
יוצרים או בוחרים Google Cloud פרויקט.
תפקידים שנדרשים כדי לבחור או ליצור פרויקט
- Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
-
יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (
roles/resourcemanager.projectCreator), שכולל את ההרשאהresourcemanager.projects.create. איך מקצים תפקידים
-
יוצרים Google Cloud פרויקט:
gcloud projects create PROJECT_ID
מחליפים את
PROJECT_IDבשם של פרויקט Google Cloud שיוצרים. -
בוחרים את הפרויקט שיצרתם: Google Cloud
gcloud config set project PROJECT_ID
מחליפים את
PROJECT_IDבשם הפרויקט ב- Google Cloud .
-
יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:
gcloud auth application-default login
אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
מחליפים את מה שכתוב בשדות הבאים:
-
PROJECT_ID: מזהה הפרויקט. -
USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה,myemail@example.com. -
ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
-
הורדת קובץ ה-JAR של המחבר
מורידים את קובץ ה-JAR של המחבר למחשב המקומי. מידע נוסף זמין במאמר Acquire the connector בקובץ ה-README ב-GitHub.
העתקת קובצי התצורה של המחבר
משכפלים או מורידים את מאגר GitHub של המחבר.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connectorמעתיקים את התוכן של הספרייה
configלספריית המשנהconfigבהתקנת Kafka.cp config/* [path to Kafka installation]/config/
הקבצים האלה מכילים הגדרות תצורה של המחבר.
עדכון ההגדרה של Kafka Connect
- עוברים אל הספרייה שמכילה את קובץ ה-binary של Kafka Connect שהורדתם.
- בספרייה הבינארית של Kafka Connect, פותחים את הקובץ בשם
config/connect-standalone.propertiesבכלי לעריכת טקסט. - אם השורה
plugin.path propertyמופיעה כהערה, צריך לבטל את ההערה. מעדכנים את
plugin.path propertyכך שיכלול את הנתיב לקובץ ה-JAR של המחבר.דוגמה:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jarמגדירים את המאפיין
offset.storage.file.filenameלשם של קובץ מקומי. במצב עצמאי, Kafka משתמש בקובץ הזה כדי לאחסן נתוני היסט.דוגמה:
offset.storage.file.filename=/tmp/connect.offsets
העברת אירועים מ-Kafka ל-Pub/Sub
בקטע הזה מוסבר איך להפעיל את מחבר היעד, לפרסם אירועים ב-Kafka ואז לקרוא את ההודעות שהועברו מ-Pub/Sub.
משתמשים ב-Google Cloud CLI כדי ליצור נושא Pub/Sub עם מינוי.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
מחליפים את מה שכתוב בשדות הבאים:
- PUBSUB_TOPIC: השם של נושא ב-Pub/Sub שאליו יתקבלו ההודעות מ-Kafka.
- PUBSUB_SUBSCRIPTION: השם של מינוי Pub/Sub לנושא.
פותחים את הקובץ
/config/cps-sink-connector.propertiesבכלי לעריכת טקסט. מוסיפים ערכים למאפיינים הבאים, שמסומנים ב-"TODO"בהערות:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
מחליפים את מה שכתוב בשדות הבאים:
- KAFKA_TOPICS: רשימה מופרדת בפסיקים של נושאי Kafka לקריאה.
- PROJECT_ID: הפרויקט Google Cloud שכולל את נושא ה-Pub/Sub.
- PUBSUB_TOPIC: נושא Pub/Sub לקבלת ההודעות מ-Kafka.
מריצים את הפקודה הבאה מהספרייה של Kafka:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.propertiesפועלים לפי השלבים במאמר Apache Kafka quickstart כדי לכתוב כמה אירועים לנושא Kafka.
משתמשים ב-CLI של gcloud כדי לקרוא את האירועים מ-Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
העברת הודעות מ-Pub/Sub ל-Kafka
בקטע הזה מוסבר איך להפעיל את מחבר המקור, לפרסם הודעות ב-Pub/Sub ולקרוא את ההודעות שהועברו מ-Kafka.
משתמשים ב-CLI של gcloud כדי ליצור נושא Pub/Sub עם מינוי.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
מחליפים את מה שכתוב בשדות הבאים:
- PUBSUB_TOPIC: השם של נושא Pub/Sub.
- PUBSUB_SUBSCRIPTION: השם של מינוי Pub/Sub.
פותחים את הקובץ שנקרא
/config/cps-source-connector.propertiesבכלי לעריכת טקסט. מוסיפים ערכים למאפיינים הבאים, שמסומנים ב-"TODO"בתגובות:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
מחליפים את מה שכתוב בשדות הבאים:
- KAFKA_TOPIC: נושאי Kafka לקבלת הודעות Pub/Sub.
- PROJECT_ID: הפרויקט Google Cloud שכולל את נושא ה-Pub/Sub.
- PUBSUB_TOPIC: נושא ה-Pub/Sub.
מריצים את הפקודה הבאה מהספרייה של Kafka:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.propertiesמשתמשים ב-CLI של gcloud כדי לפרסם הודעה ב-Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
קוראים את ההודעה מ-Kafka. כדי לקרוא את ההודעות מנושא Kafka, פועלים לפי השלבים שבמדריך למתחילים של Apache Kafka.
המרה מהודעת טקסט
רשומה ב-Kafka מכילה מפתח וערך, שהם מערכים של בייטים באורך משתנה. אופציונלית, רשומת Kafka יכולה לכלול גם כותרות, שהן צמדי מפתח-ערך. הודעת Pub/Sub מורכבת משני חלקים עיקריים: גוף ההודעה ואפס או יותר מאפיינים של צמדי מפתח/ערך.
Kafka Connect משתמש בממירים כדי לבצע סריאליזציה של מפתחות וערכים אל Kafka וממנו. כדי לשלוט בסריאליזציה, מגדירים את המאפיינים הבאים בקובצי התצורה של המחבר:
-
key.converter: הכלי להמרה שמשמש לסדר את מפתחות הרשומות. -
value.converter: הממיר שמשמש לסריאליזציה של ערכי רשומות.
גוף ההודעה ב-Pub/Sub הוא אובייקט ByteString, ולכן ההמרה הכי יעילה היא העתקה ישירה של מטען הייעודי (payload). לכן, מומלץ להשתמש בממיר שמפיק טיפוסי נתונים פרימיטיביים (מספר שלם, מספר עשרוני, מחרוזת או סכימת בייטים) איפה שאפשר, כדי למנוע ביטול סריאליזציה וסריאליזציה מחדש של אותו גוף הודעה.
המרת נתונים מ-Kafka ל-Pub/Sub
מחבר היעד ממיר רשומות של Kafka להודעות Pub/Sub באופן הבא:
- מפתח הרשומה של Kafka מאוחסן כמאפיין בשם
"key"בהודעת Pub/Sub. - כברירת מחדל, המחבר משמיט את כל הכותרות ברשומה של Kafka. עם זאת, אם מגדירים את אפשרות ההגדרה
headers.publishלערךtrue, המחבר כותב את הכותרות כמאפיינים של Pub/Sub. המחבר מדלג על כותרות שחורגות מהמגבלות על מאפייני הודעות ב-Pub/Sub. - בסכימות של מספרים שלמים, מספרים עשרוניים, מחרוזות ובייטים, המחבר מעביר את הבייטים של ערך הרשומה ב-Kafka ישירות לגוף ההודעה ב-Pub/Sub.
- בסכימות של מבנים, המחבר כותב כל שדה כמאפיין של הודעת Pub/Sub. לדוגמה, אם השדה הוא
{ "id"=123 }, ההודעה שמתקבלת ב-Pub/Sub כוללת את המאפיין"id"="123". ערך השדה תמיד מומר למחרוזת. סוגי המפות והמבנים לא נתמכים כסוגי שדות בתוך מבנה. - בסכימות של מפות, המחבר כותב כל צמד מפתח/ערך כמאפיין של הודעת Pub/Sub. לדוגמה, אם המיפוי הוא
{"alice"=1,"bob"=2}, להודעת Pub/Sub שמתקבלת יש שני מאפיינים,"alice"="1"ו-"bob"="2". המפתחות והערכים מומרים למחרוזות.
לסכימות של מבנים ומיפוי יש כמה התנהגויות נוספות:
אפשר גם לציין שדה מסוים במבנה או מפתח מיפוי שיהיה גוף ההודעה, על ידי הגדרת מאפיין ההגדרה
messageBodyName. הערך של השדה או המפתח מאוחסן כ-ByteStringבגוף ההודעה. אם לא מגדירים אתmessageBodyName, גוף ההודעה ריק בסכימות של struct ו-map.עבור ערכי מערך, המחבר תומך רק בסוגי מערך פרימיטיביים. רצף הערכים במערך משורשר לאובייקט
ByteStringיחיד.
המרת נתונים מ-Pub/Sub ל-Kafka
מחבר המקור ממיר הודעות Pub/Sub לרשומות Kafka באופן הבא:
מפתח רשומה של Kafka: כברירת מחדל, המפתח מוגדר ל-
null. לחלופין, אפשר לציין מאפיין של הודעת Pub/Sub לשימוש כמפתח, על ידי הגדרת אפשרות ההגדרהkafka.key.attribute. במקרה כזה, המחבר מחפש מאפיין עם השם הזה ומגדיר את מפתח הרשומה לערך המאפיין. אם המאפיין שצוין לא קיים, מפתח הרשומה מוגדר ל-null.ערך הרשומה ב-Kafka. המחבר כותב את ערך הרשומה באופן הבא:
אם להודעת Pub/Sub אין מאפיינים מותאמים אישית, המחבר כותב את גוף ההודעה של Pub/Sub ישירות לערך הרשומה של Kafka כסוג
byte[], באמצעות הממיר שצוין על ידיvalue.converter.אם להודעת Pub/Sub יש מאפיינים מותאמים אישית והערך של
kafka.record.headersהואfalse, המחבר כותב מבנה לערך הרשומה. המבנה מכיל שדה אחד לכל מאפיין, ושדה בשם"message"שהערך שלו הוא גוף ההודעה ב-Pub/Sub (מאוחסן כבייטים):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }במקרה כזה, צריך להשתמש ב-
value.converterשתואם לסכימותstruct, כמוorg.apache.kafka.connect.json.JsonConverter.אם להודעת Pub/Sub יש מאפיינים מותאמים אישית והערך של
kafka.record.headersהואtrue, המחבר כותב את המאפיינים ככותרות של רשומת Kafka. הוא כותב את גוף ההודעה ב-Pub/Sub ישירות לערך הרשומה ב-Kafka כסוגbyte[], באמצעות הממיר שצוין על ידיvalue.converter.
כותרות של רשומות Kafka. כברירת מחדל, הכותרות ריקות, אלא אם מגדירים את
kafka.record.headersלערךtrue.
אפשרויות להגדרות אישיות
בנוסף להגדרות שסופקו על ידי Kafka Connect API, Pub/Sub Group Kafka Connector תומך בהגדרות של sink ו-source כמו שמתואר בהגדרות של Pub/Sub Connector.
קבלת תמיכה
אם אתם צריכים עזרה, אתם יכולים ליצור כרטיס תמיכה. לשאלות כלליות ולדיונים, אפשר ליצור issue במאגר GitHub.
המאמרים הבאים
- הסבר על ההבדלים בין Kafka לבין Pub/Sub
- מידע נוסף על Pub/Sub Group Kafka Connector
- אפשר לעיין במאגר GitHub של Pub/Sub Group Kafka Connector.