הצגת קבוצת צרכנים בשירות מנוהל של Google Cloud ל-Apache Kafka

כדי לראות את המידע המפורט על קבוצת צרכנים אחת, אפשר להשתמש במסוףGoogle Cloud , ב-Google Cloud CLI, בספריית הלקוח, ב-Managed Kafka API או בממשקי ה-API של Apache Kafka בקוד פתוח.

תפקידים והרשאות שנדרשים כדי לראות קבוצת צרכנים

כדי לקבל את ההרשאות שנדרשות להצגת קבוצות הצרכנים, צריך לבקש מהאדמין להקצות לכם את תפקיד ה-IAM‏ Managed Kafka Viewer (roles/managedkafka.viewer) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות לצפייה בקבוצות הצרכנים. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי לראות את קבוצות הצרכנים, צריך את ההרשאות הבאות:

  • רשימת קבוצות צרכנים: managedkafka.consumerGroups.list
  • קבלת פרטים של קבוצת צרכנים: managedkafka.consumerGroups.get

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

הצגת קבוצת צרכנים

כדי לראות את המידע המפורט על קבוצת צרכנים באשכול ספציפי:

המסוף

  1. נכנסים לדף Cluster במסוף Google Cloud .

    מעבר אל Clusters

  2. לוחצים על האשכול שרוצים לראות את קבוצות הצרכנים שלו.

    יוצג הדף Cluster details. בדף Cluster details, מתחת לכרטיסייה Resources, מופיעות קבוצות הצרכנים.

  3. לוחצים על קבוצת הצרכנים.

    ייפתח הדף פרטי קבוצת הצרכנים.

  4. הדף מכיל את הפרטים הבאים:

    • הגדרה: בכרטיסייה הזו מוצג השם של קבוצת הצרכנים. מוצג גם הנושא שאליו קבוצת הצרכנים משויכת.
    • מעקב: בכרטיסייה הזו מוצג תרשים מעקב של השהיית ההיסט לפי מחיצה.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud managed-kafka consumer-groups describe:

    gcloud managed-kafka consumer-groups describe CONSUMER_GROUP_ID \
        --cluster=CLUSTER \
        --location=LOCATION

    הפקודה הזו מספקת מידע מפורט על קבוצת צרכנים ספציפית של שירות מנוהל ל-Apache Kafka. הוא כולל מידע על קבוצת הצרכנים, כולל השם, מזהה הקבוצה, זמן היצירה וזמן העדכון.

    מחליפים את מה שכתוב בשדות הבאים:

    • CONSUMER_GROUP_ID: המזהה או השם של קבוצת הצרכנים.

    • CLUSTER: המזהה או השם של האשכול.

    • LOCATION: המיקום של האשכול.

Go

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API של שירות מנוהל ל-Apache Kafka Go.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, צריך להגדיר את Application Default Credentials‏(ADC). מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func getConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
	req := &managedkafkapb.GetConsumerGroupRequest{
		Name: consumerGroupPath,
	}
	consumerGroup, err := client.GetConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.GetConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Got consumer group: %#v\n", consumerGroup)
	return nil
}

Java

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של Managed Service for Apache Kafka Java API.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import java.io.IOException;

public class GetConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String consumerGroupId = "my-consumer-group";
    getConsumerGroup(projectId, region, clusterId, consumerGroupId);
  }

  public static void getConsumerGroup(
      String projectId, String region, String clusterId, String consumerGroupId) throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      // This operation is being handled synchronously.
      ConsumerGroup consumerGroup =
          managedKafkaClient.getConsumerGroup(
              ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId));
      System.out.println(consumerGroup.getAllFields());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.getConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API בשפת Python של שירות מנוהל ל-Apache Kafka.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"

client = managedkafka_v1.ManagedKafkaClient()

consumer_group_path = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)
request = managedkafka_v1.GetConsumerGroupRequest(
    name=consumer_group_path,
)

try:
    consumer_group = client.get_consumer_group(request=request)
    print("Got consumer group:", consumer_group)
except NotFound as e:
    print(f"Failed to get consumer group {consumer_group_id} with error: {e.message}")

מה השלב הבא?

Apache Kafka®‎ הוא סימן מסחרי רשום של The Apache Software Foundation או של השותפים העצמאיים שלה בארצות הברית או במדינות אחרות.