קישור של Pub/Sub ל-Apache Kafka

במאמר הזה מוסבר איך לשלב בין Apache Kafka ל-Pub/Sub באמצעות Pub/Sub Group Kafka Connector.

מידע על Pub/Sub Group Kafka Connector

‫Apache Kafka היא פלטפורמת קוד פתוח לסטרימינג של אירועים. הוא משמש בדרך כלל בארכיטקטורות מבוזרות כדי לאפשר תקשורת בין רכיבים עם צימוד רופף. ‫Pub/Sub הוא שירות מנוהל לשליחה ולקבלה של הודעות באופן אסינכרוני. בדומה ל-Kafka, אפשר להשתמש ב-Pub/Sub כדי לתקשר בין רכיבים בארכיטקטורת הענן.

המחבר Pub/Sub Group Kafka מאפשר לשלב בין שתי המערכות האלה. המחברים הבאים כלולים בחבילת ה-JAR של המחבר:

  • מחבר היעד קורא רשומות מנושא אחד או יותר ב-Kafka ומפרסם אותן ב-Pub/Sub.
  • מחבר המקור קורא הודעות מנושא Pub/Sub ומפרסם אותן ב-Kafka.

הנה כמה תרחישים שבהם כדאי להשתמש ב-Pub/Sub Group Kafka Connector:

  • אתם מעבירים ארכיטקטורה מבוססת-Kafka אל Google Cloud.
  • יש לכם מערכת קצה קדמי שמאחסנת אירועים ב-Kafka מחוץ ל-Google Cloud, אבל אתם גם משתמשים ב-Google Cloudכדי להפעיל חלק משירותי הקצה העורפי שלכם, שצריכים לקבל את האירועים ב-Kafka. Google Cloud
  • אתם אוספים יומנים מפתרון Kafka מקומי ושולחים אותם אל Google Cloud לניתוח נתונים.
  • יש לכם מערכת frontend שמשתמשת ב- Google Cloud, אבל אתם גם מאחסנים נתונים באופן מקומי באמצעות Kafka.

המחבר דורש Kafka Connect, שהיא מסגרת להזרמת נתונים בין Kafka למערכות אחרות. כדי להשתמש במחבר, צריך להריץ את Kafka Connect לצד אשכול Kafka.

ההנחה במאמר הזה היא שאתם מכירים את Kafka ואת Pub/Sub. לפני שקוראים את המאמר הזה, מומלץ לעיין באחד ממדריכי ההתחלה המהירה של Pub/Sub.

המחבר של Pub/Sub לא תומך בשילוב בין Google Cloud IAM לבין רשימות ACL של Kafka Connect.

איך מתחילים להשתמש בכלי לחיבור

בקטע הזה מוסבר איך לבצע את המשימות הבאות:

  1. מגדירים את Pub/Sub Group Kafka Connector.
  2. שליחת אירועים מ-Kafka אל Pub/Sub.
  3. שליחת הודעות מ-Pub/Sub ל-Kafka.

דרישות מוקדמות

התקנת Kafka

פועלים לפי המדריך לתחילת העבודה עם Apache Kafka כדי להתקין Kafka עם צומת יחיד במחשב המקומי. מבצעים את השלבים הבאים במדריך למתחילים:

  1. מורידים את הגרסה האחרונה של Kafka ומחלצים אותה.
  2. מפעילים את סביבת Kafka.
  3. יוצרים נושא Kafka.

אמת

מחבר Pub/Sub Group Kafka צריך לעבור אימות ב-Pub/Sub כדי לשלוח ולקבל הודעות Pub/Sub. כדי להגדיר אימות, מבצעים את השלבים הבאים:

  1. נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
  2. התקינו את ה-CLI של Google Cloud.

  3. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  4. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  5. יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

  6. יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:

    gcloud auth application-default login

    אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  7. מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה, myemail@example.com.
    • ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.
  8. התקינו את ה-CLI של Google Cloud.

  9. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  10. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  11. יוצרים או בוחרים Google Cloud פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Google Cloud פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Google Cloud שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Google Cloud

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Google Cloud .

  12. יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:

    gcloud auth application-default login

    אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  13. מעניקים תפקידים לחשבון המשתמש. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • USER_IDENTIFIER: המזהה של חשבון המשתמש . לדוגמה, myemail@example.com.
    • ROLE: תפקיד ה-IAM שאתם מקצים לחשבון המשתמש.

הורדת קובץ ה-JAR של המחבר

מורידים את קובץ ה-JAR של המחבר למחשב המקומי. מידע נוסף זמין במאמר Acquire the connector בקובץ ה-README ב-GitHub.

העתקת קובצי התצורה של המחבר

  1. משכפלים או מורידים את מאגר GitHub של המחבר.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. מעתיקים את התוכן של הספרייה config לספריית המשנה config בהתקנת Kafka.

    cp config/* [path to Kafka installation]/config/
    

הקבצים האלה מכילים הגדרות תצורה של המחבר.

עדכון ההגדרה של Kafka Connect

  1. עוברים אל הספרייה שמכילה את קובץ ה-binary של Kafka Connect שהורדתם.
  2. בספרייה הבינארית של Kafka Connect, פותחים את הקובץ בשם config/connect-standalone.properties בכלי לעריכת טקסט.
  3. אם השורה plugin.path property מופיעה כהערה, צריך לבטל את ההערה.
  4. מעדכנים את plugin.path property כך שיכלול את הנתיב לקובץ ה-JAR של המחבר.

    דוגמה:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. מגדירים את המאפיין offset.storage.file.filename לשם של קובץ מקומי. במצב עצמאי, Kafka משתמש בקובץ הזה כדי לאחסן נתוני היסט.

    דוגמה:

    offset.storage.file.filename=/tmp/connect.offsets
    

העברת אירועים מ-Kafka ל-Pub/Sub

בקטע הזה מוסבר איך להפעיל את מחבר היעד, לפרסם אירועים ב-Kafka ואז לקרוא את ההודעות שהועברו מ-Pub/Sub.

  1. משתמשים ב-Google Cloud CLI כדי ליצור נושא Pub/Sub עם מינוי.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    מחליפים את מה שכתוב בשדות הבאים:

    • PUBSUB_TOPIC: השם של נושא ב-Pub/Sub שאליו יתקבלו ההודעות מ-Kafka.
    • PUBSUB_SUBSCRIPTION: השם של מינוי Pub/Sub לנושא.
  2. פותחים את הקובץ /config/cps-sink-connector.properties בכלי לעריכת טקסט. מוסיפים ערכים למאפיינים הבאים, שמסומנים ב-"TODO" בהערות:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    מחליפים את מה שכתוב בשדות הבאים:

    • KAFKA_TOPICS: רשימה מופרדת בפסיקים של נושאי Kafka לקריאה.
    • PROJECT_ID: הפרויקט Google Cloud שכולל את נושא ה-Pub/Sub.
    • PUBSUB_TOPIC: נושא Pub/Sub לקבלת ההודעות מ-Kafka.
  3. מריצים את הפקודה הבאה מהספרייה של Kafka:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. פועלים לפי השלבים במאמר Apache Kafka quickstart כדי לכתוב כמה אירועים לנושא Kafka.

  5. משתמשים ב-CLI של gcloud כדי לקרוא את האירועים מ-Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

העברת הודעות מ-Pub/Sub ל-Kafka

בקטע הזה מוסבר איך להפעיל את מחבר המקור, לפרסם הודעות ב-Pub/Sub ולקרוא את ההודעות שהועברו מ-Kafka.

  1. משתמשים ב-CLI של gcloud כדי ליצור נושא Pub/Sub עם מינוי.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    מחליפים את מה שכתוב בשדות הבאים:

    • PUBSUB_TOPIC: השם של נושא Pub/Sub.
    • PUBSUB_SUBSCRIPTION: השם של מינוי Pub/Sub.
  2. פותחים את הקובץ שנקרא /config/cps-source-connector.properties בכלי לעריכת טקסט. מוסיפים ערכים למאפיינים הבאים, שמסומנים ב-"TODO" בתגובות:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    מחליפים את מה שכתוב בשדות הבאים:

    • KAFKA_TOPIC: נושאי Kafka לקבלת הודעות Pub/Sub.
    • PROJECT_ID: הפרויקט Google Cloud שכולל את נושא ה-Pub/Sub.
    • PUBSUB_TOPIC: נושא ה-Pub/Sub.
  3. מריצים את הפקודה הבאה מהספרייה של Kafka:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. משתמשים ב-CLI של gcloud כדי לפרסם הודעה ב-Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. קוראים את ההודעה מ-Kafka. כדי לקרוא את ההודעות מנושא Kafka, פועלים לפי השלבים שבמדריך למתחילים של Apache Kafka.

המרה מהודעת טקסט

רשומה ב-Kafka מכילה מפתח וערך, שהם מערכים של בייטים באורך משתנה. אופציונלית, רשומת Kafka יכולה לכלול גם כותרות, שהן צמדי מפתח-ערך. הודעת Pub/Sub מורכבת משני חלקים עיקריים: גוף ההודעה ואפס או יותר מאפיינים של צמדי מפתח/ערך.

‫Kafka Connect משתמש בממירים כדי לבצע סריאליזציה של מפתחות וערכים אל Kafka וממנו. כדי לשלוט בסריאליזציה, מגדירים את המאפיינים הבאים בקובצי התצורה של המחבר:

  • key.converter: הכלי להמרה שמשמש לסדר את מפתחות הרשומות.
  • value.converter: הממיר שמשמש לסריאליזציה של ערכי רשומות.

גוף ההודעה ב-Pub/Sub הוא אובייקט ByteString, ולכן ההמרה הכי יעילה היא העתקה ישירה של מטען הייעודי (payload). לכן, מומלץ להשתמש בממיר שמפיק טיפוסי נתונים פרימיטיביים (מספר שלם, מספר עשרוני, מחרוזת או סכימת בייטים) איפה שאפשר, כדי למנוע ביטול סריאליזציה וסריאליזציה מחדש של אותו גוף הודעה.

המרת נתונים מ-Kafka ל-Pub/Sub

מחבר היעד ממיר רשומות של Kafka להודעות Pub/Sub באופן הבא:

  • מפתח הרשומה של Kafka מאוחסן כמאפיין בשם "key" בהודעת Pub/Sub.
  • כברירת מחדל, המחבר משמיט את כל הכותרות ברשומה של Kafka. עם זאת, אם מגדירים את אפשרות ההגדרה headers.publish לערך true, המחבר כותב את הכותרות כמאפיינים של Pub/Sub. המחבר מדלג על כותרות שחורגות מהמגבלות על מאפייני הודעות ב-Pub/Sub.
  • בסכימות של מספרים שלמים, מספרים עשרוניים, מחרוזות ובייטים, המחבר מעביר את הבייטים של ערך הרשומה ב-Kafka ישירות לגוף ההודעה ב-Pub/Sub.
  • בסכימות של מבנים, המחבר כותב כל שדה כמאפיין של הודעת Pub/Sub. לדוגמה, אם השדה הוא { "id"=123 }, ההודעה שמתקבלת ב-Pub/Sub כוללת את המאפיין "id"="123". ערך השדה תמיד מומר למחרוזת. סוגי המפות והמבנים לא נתמכים כסוגי שדות בתוך מבנה.
  • בסכימות של מפות, המחבר כותב כל צמד מפתח/ערך כמאפיין של הודעת Pub/Sub. לדוגמה, אם המיפוי הוא {"alice"=1,"bob"=2}, להודעת Pub/Sub שמתקבלת יש שני מאפיינים, "alice"="1" ו-"bob"="2". המפתחות והערכים מומרים למחרוזות.

לסכימות של מבנים ומיפוי יש כמה התנהגויות נוספות:

  • אפשר גם לציין שדה מסוים במבנה או מפתח מיפוי שיהיה גוף ההודעה, על ידי הגדרת מאפיין ההגדרה messageBodyName. הערך של השדה או המפתח מאוחסן כ-ByteString בגוף ההודעה. אם לא מגדירים את messageBodyName, גוף ההודעה ריק בסכימות של struct ו-map.

  • עבור ערכי מערך, המחבר תומך רק בסוגי מערך פרימיטיביים. רצף הערכים במערך משורשר לאובייקט ByteString יחיד.

המרת נתונים מ-Pub/Sub ל-Kafka

מחבר המקור ממיר הודעות Pub/Sub לרשומות Kafka באופן הבא:

  • מפתח רשומה של Kafka: כברירת מחדל, המפתח מוגדר ל-null. לחלופין, אפשר לציין מאפיין של הודעת Pub/Sub לשימוש כמפתח, על ידי הגדרת אפשרות ההגדרה kafka.key.attribute. במקרה כזה, המחבר מחפש מאפיין עם השם הזה ומגדיר את מפתח הרשומה לערך המאפיין. אם המאפיין שצוין לא קיים, מפתח הרשומה מוגדר ל-null.

  • ערך הרשומה ב-Kafka. המחבר כותב את ערך הרשומה באופן הבא:

    • אם להודעת Pub/Sub אין מאפיינים מותאמים אישית, המחבר כותב את גוף ההודעה של Pub/Sub ישירות לערך הרשומה של Kafka כסוג byte[], באמצעות הממיר שצוין על ידי value.converter.

    • אם להודעת Pub/Sub יש מאפיינים מותאמים אישית והערך של kafka.record.headers הוא false, המחבר כותב מבנה לערך הרשומה. המבנה מכיל שדה אחד לכל מאפיין, ושדה בשם "message" שהערך שלו הוא גוף ההודעה ב-Pub/Sub (מאוחסן כבייטים):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      במקרה כזה, צריך להשתמש ב-value.converter שתואם לסכימות struct, כמו org.apache.kafka.connect.json.JsonConverter.

    • אם להודעת Pub/Sub יש מאפיינים מותאמים אישית והערך של kafka.record.headers הוא true, המחבר כותב את המאפיינים ככותרות של רשומת Kafka. הוא כותב את גוף ההודעה ב-Pub/Sub ישירות לערך הרשומה ב-Kafka כסוג byte[], באמצעות הממיר שצוין על ידי value.converter.

  • כותרות של רשומות Kafka. כברירת מחדל, הכותרות ריקות, אלא אם מגדירים את kafka.record.headers לערך true.

אפשרויות להגדרות אישיות

בנוסף להגדרות שסופקו על ידי Kafka Connect API, ‏ Pub/Sub Group Kafka Connector תומך בהגדרות של sink ו-source כמו שמתואר בהגדרות של Pub/Sub Connector.

קבלת תמיכה

אם אתם צריכים עזרה, אתם יכולים ליצור כרטיס תמיכה. לשאלות כלליות ולדיונים, אפשר ליצור issue במאגר GitHub.

המאמרים הבאים