עדכון של קבוצת צרכנים בשירות מנוהל של Google Cloud ל-Apache Kafka

אפשר לעדכן קבוצת צרכנים בשירות מנוהל של Google Cloud ל-Apache Kafka כדי לשנות את ההיסטים של רשימת מחיצות נושא. כך תוכלו לקבוע אילו הודעות יקבלו הצרכנים בקבוצה.

כדי לעדכן קבוצת צרכנים, אפשר להשתמש ב-Google Cloud CLI, בספריית הלקוח, ב-Managed Kafka API או ב-APIs של Apache Kafka בקוד פתוח. אי אפשר לערוך קבוצה של משתמשים פרטיים דרך Google Cloud המסוף.

לפני שמתחילים

כדי לעדכן קבוצת צרכנים, קודם צריך לוודא שהיא לא צורכת הודעות באופן פעיל. קבוצת צרכנים נמחקת אוטומטית על ידי Kafka אם היא אף פעם לא צרכה הודעות, או אם תוקף האופסט האחרון שבוצע לו קומיט פג אחרי offsets.retention.minutes.

לפני שמעדכנים קבוצת צרכנים, פועלים לפי השלבים הבאים:

  1. שולחים כמה הודעות לנושא שממנו קבוצת הצרכנים קוראת הודעות.

  2. כדי לעבד כמה הודעות, צריך ליצור קבוצת צרכנים.

  3. להפסיק את הצריכה של הודעות על ידי כל הצרכנים. כדי לעצור צרכן, מקישים על ‏Control+C.

מידע נוסף על שליחה וצריכה של הודעות זמין במאמר יצירה וצריכה של הודעות באמצעות כלי שורת הפקודה של Kafka.

תפקידים והרשאות שנדרשים לעדכון קבוצת צרכנים

כדי לקבל את ההרשאות שנדרשות לעריכת קבוצות צרכנים, צריך לבקש מהאדמין להקצות לכם ב-IAM את התפקיד עורך קבוצות צרכנים מנוהלות של Kafka (roles/managedkafka.consumerGroupEditor) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות לעריכת קבוצות הצרכנים. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי לערוך את קבוצות הצרכנים, נדרשות ההרשאות הבאות:

  • עדכון קבוצות צרכנים: managedkafka.consumerGroups.update

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

איך נותנים לסוכן השירות גישת קריאה

כדי לעדכן את ההיסטים של קבוצת הצרכנים, לסוכן השירות נדרשת גישה לפעולת הקריאה במשאבי הנושא וקבוצת הצרכנים. הגישה הזו מוגדרת באמצעות קובצי ACL של Apache Kafka.

אם לא הגדרתם אף ACL של Apache Kafka לקבוצת הצרכנים ולנושא שלה באשכול, לסוכן השירות יש גישה סביבתית למשאבים האלה. אפשר לדלג על הקטע הזה.

אם רשימות ה-ACL של Apache Kafka מוגדרות עבור קבוצת הצרכנים והנושא שלה באשכול, לסוכן השירות נדרשת גישה מפורשת לרשימת ה-ACL לפעולת הקריאה עבור שני המשאבים. כדי לעשות זאת, מוסיפים רשומות ACL שמעניקות לסוכן השירות גישה לפעולת הקריאה בקבוצת הצרכנים ובנושא הרלוונטיים. איך לעשות את זה?

  1. התקינו את ה-CLI של Google Cloud.

  2. אם אתם משתמשים בספק זהויות חיצוני (IdP), קודם אתם צריכים להיכנס ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

  3. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  4. מריצים את הפקודה gcloud managed-kafka acls add-acl-entry:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    מחליפים את מה שכתוב בשדות הבאים:

    • CONSUMER_GROUP_ACL_ID (חובה): המזהה הייחודי של משאב ה-ACL של השירות המנוהל ל-Apache Kafka, שאליו רוצים להוסיף את רשומת ה-ACL לקבוצת הצרכנים. כדי להחיל את הגישה על כל קבוצות הצרכנים, משתמשים ב-`allConsumerGroups`. לחלופין, כדי להחיל את הגישה על קבוצת צרכנים ספציפית, משתמשים ב-`consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (חובה): המזהה הייחודי של משאב ה-ACL בשירות המנוהל ל-Apache Kafka, שאליו רוצים להוסיף את רשומת ה-ACL לנושא. כדי להחיל את הגישה על כל הנושאים, משתמשים ב-`allTopics`. כדי להחיל את הגישה על נושא ספציפי, משתמשים ב-`topic/TOPIC_NAME`.
    • CLUSTER_ID (חובה): המזהה של האשכול שמכיל את משאב ה-ACL.
    • LOCATION (חובה): האזור שבו נמצא האשכול. כאן מפורטים המיקומים הנתמכים.
    • PROJECT_NUMBER (חובה): מספר הפרויקט שבו נמצא האשכול. השם הזה משמש ליצירת שם המשתמש בשירות של סוכן השירות עבור רשומת ה-ACL.

למידע נוסף על הוספת רשומת ACL, ראו הוספת רשומת ACL.

עדכון של קבוצת צרכנים

חשוב לוודא שביצעתם את השלבים בקטע לפני שמתחילים.

כדי לעדכן קבוצת צרכנים:

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud managed-kafka consumer-groups update:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    מחליפים את מה שכתוב בשדות הבאים:

    • CLUSTER_ID: המזהה או השם של האשכול.

    • LOCATION: המיקום של האשכול.

    • CONSUMER_GROUP_ID: המזהה או השם של קבוצת הצרכנים.

    • TOPICS_FILE: ההגדרה הזו מציינת את המיקום של הקובץ שמכיל את התצורה של הנושאים שצריך לעדכן עבור קבוצת הצרכנים. הקובץ יכול להיות בפורמט JSON או YAML. אפשר להזין נתיב לקובץ או לכלול ישירות את התוכן בפורמט JSON או YAML.

      קובץ הנושאים משתמש במבנה JSON כדי לייצג ConsumerGroup מפת נושאים, בצורה { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. לכל נושא, ConsumerPartitionMetadata מספק את ההיסט ומטא-נתונים לכל מחיצה.

      כדי להגדיר את ההיסט עבור מחיצה אחת (מחיצה 0) בנושא בשם topic1 ל-10, הגדרת ה-JSON תיראה כך:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      דוגמה לתוכן של קובץ topics.json:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: כשמציינים נושאים בקובץ JSON או YAML, צריך לכלול את הנתיב המלא של הנושא. אפשר לקבל את הנתיב הזה על ידי הרצת הפקודה gcloud managed-kafak topics describe, והוא בפורמט projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

consumer_group = managedkafka_v1.ConsumerGroup()
consumer_group.name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
for partition, offset in partition_offsets.items():
    partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
    topic_metadata.partitions[partition] = partition_metadata
consumer_group.topics = {
    topic_path: topic_metadata,
}

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("topics")

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
    print("Updated consumer group:", response)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")

מה השלב הבא?

Apache Kafka®‎ הוא סימן מסחרי רשום של The Apache Software Foundation או של השותפים העצמאיים שלה בארצות הברית או במדינות אחרות.