אפשר לעדכן קבוצת צרכנים בשירות מנוהל של Google Cloud ל-Apache Kafka כדי לשנות את ההיסטים של רשימת מחיצות נושא. כך תוכלו לקבוע אילו הודעות יקבלו הצרכנים בקבוצה.
כדי לעדכן קבוצת צרכנים, אפשר להשתמש ב-Google Cloud CLI, בספריית הלקוח, ב-Managed Kafka API או ב-APIs של Apache Kafka בקוד פתוח. אי אפשר לערוך קבוצה של משתמשים פרטיים דרך Google Cloud המסוף.
לפני שמתחילים
כדי לעדכן קבוצת צרכנים, קודם צריך לוודא שהיא לא צורכת הודעות באופן פעיל.
קבוצת צרכנים נמחקת אוטומטית על ידי Kafka אם היא אף פעם לא צרכה הודעות, או אם תוקף האופסט האחרון שבוצע לו קומיט פג אחרי offsets.retention.minutes.
לפני שמעדכנים קבוצת צרכנים, פועלים לפי השלבים הבאים:
שולחים כמה הודעות לנושא שממנו קבוצת הצרכנים קוראת הודעות.
כדי לעבד כמה הודעות, צריך ליצור קבוצת צרכנים.
להפסיק את הצריכה של הודעות על ידי כל הצרכנים. כדי לעצור צרכן, מקישים על Control+C.
מידע נוסף על שליחה וצריכה של הודעות זמין במאמר יצירה וצריכה של הודעות באמצעות כלי שורת הפקודה של Kafka.
תפקידים והרשאות שנדרשים לעדכון קבוצת צרכנים
כדי לקבל את ההרשאות שנדרשות לעריכת קבוצות צרכנים, צריך לבקש מהאדמין להקצות לכם ב-IAM את התפקיד עורך קבוצות צרכנים מנוהלות של Kafka (roles/managedkafka.consumerGroupEditor) בפרויקט.
כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.
זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות לעריכת קבוצות הצרכנים. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:
ההרשאות הנדרשות
כדי לערוך את קבוצות הצרכנים, נדרשות ההרשאות הבאות:
-
עדכון קבוצות צרכנים:
managedkafka.consumerGroups.update
יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.
איך נותנים לסוכן השירות גישת קריאה
כדי לעדכן את ההיסטים של קבוצת הצרכנים, לסוכן השירות נדרשת גישה לפעולת הקריאה במשאבי הנושא וקבוצת הצרכנים. הגישה הזו מוגדרת באמצעות קובצי ACL של Apache Kafka.
אם לא הגדרתם אף ACL של Apache Kafka לקבוצת הצרכנים ולנושא שלה באשכול, לסוכן השירות יש גישה סביבתית למשאבים האלה. אפשר לדלג על הקטע הזה.
אם רשימות ה-ACL של Apache Kafka מוגדרות עבור קבוצת הצרכנים והנושא שלה באשכול, לסוכן השירות נדרשת גישה מפורשת לרשימת ה-ACL לפעולת הקריאה עבור שני המשאבים. כדי לעשות זאת, מוסיפים רשומות ACL שמעניקות לסוכן השירות גישה לפעולת הקריאה בקבוצת הצרכנים ובנושא הרלוונטיים. איך לעשות את זה?
-
התקינו את ה-CLI של Google Cloud.
-
אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
-
כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:
gcloud init מריצים את הפקודה
gcloud managed-kafka acls add-acl-entry:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
מחליפים את מה שכתוב בשדות הבאים:
-
CONSUMER_GROUP_ACL_ID(חובה): המזהה הייחודי של משאב ה-ACL של השירות המנוהל ל-Apache Kafka, שאליו רוצים להוסיף את רשומת ה-ACL לקבוצת הצרכנים. כדי להחיל את הגישה על כל קבוצות הצרכנים, משתמשים ב-`allConsumerGroups`. לחלופין, כדי להחיל את הגישה על קבוצת צרכנים ספציפית, משתמשים ב-`consumerGroup/CONSUMER_GROUP_NAME`. -
TOPIC_ACL_ID(חובה): המזהה הייחודי של משאב ה-ACL בשירות המנוהל ל-Apache Kafka, שאליו רוצים להוסיף את רשומת ה-ACL לנושא. כדי להחיל את הגישה על כל הנושאים, משתמשים ב-`allTopics`. כדי להחיל את הגישה על נושא ספציפי, משתמשים ב-`topic/TOPIC_NAME`. -
CLUSTER_ID(חובה): המזהה של האשכול שמכיל את משאב ה-ACL. -
LOCATION(חובה): האזור שבו נמצא האשכול. כאן מפורטים המיקומים הנתמכים. -
PROJECT_NUMBER(חובה): מספר הפרויקט שבו נמצא האשכול. השם הזה משמש ליצירת שם המשתמש בשירות של סוכן השירות עבור רשומת ה-ACL.
-
למידע נוסף על הוספת רשומת ACL, ראו הוספת רשומת ACL.
עדכון של קבוצת צרכנים
חשוב לוודא שביצעתם את השלבים בקטע לפני שמתחילים.
כדי לעדכן קבוצת צרכנים:
gcloud
-
במסוף Google Cloud , מפעילים את Cloud Shell.
בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.
מריצים את הפקודה
gcloud managed-kafka consumer-groups update:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
מחליפים את מה שכתוב בשדות הבאים:
-
CLUSTER_ID: המזהה או השם של האשכול.
-
LOCATION: המיקום של האשכול.
-
CONSUMER_GROUP_ID: המזהה או השם של קבוצת הצרכנים.
-
TOPICS_FILE: ההגדרה הזו מציינת את המיקום של הקובץ שמכיל את התצורה של הנושאים שצריך לעדכן עבור קבוצת הצרכנים. הקובץ יכול להיות בפורמט JSON או YAML. אפשר להזין נתיב לקובץ או לכלול ישירות את התוכן בפורמט JSON או YAML.
קובץ הנושאים משתמש במבנה JSON כדי לייצג
ConsumerGroupמפת נושאים, בצורה{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. לכל נושא,ConsumerPartitionMetadataמספק את ההיסט ומטא-נתונים לכל מחיצה.כדי להגדיר את ההיסט עבור מחיצה אחת (מחיצה 0) בנושא בשם
topic1ל-10, הגדרת ה-JSON תיראה כך:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}דוגמה לתוכן של קובץ
topics.json:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: כשמציינים נושאים בקובץ JSON או YAML, צריך לכלול את הנתיב המלא של הנושא. אפשר לקבל את הנתיב הזה על ידי הרצת הפקודה
gcloud managed-kafak topics describe, והוא בפורמטprojects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .
-