הצגת נושא בשירות מנוהל של Google Cloud ל-Apache Kafka

כדי לראות את המידע המפורט על נושא יחיד, אפשר להשתמש בGoogle Cloud מסוף, ב-Google Cloud CLI, בספריית הלקוח, ב-Managed Kafka API או בממשקי ה-API של Apache Kafka בקוד פתוח.

התפקידים וההרשאות שנדרשים כדי לצפות בנושא

כדי לקבל את ההרשאות שנדרשות לצפייה בנושא, צריך לבקש מהאדמין להקצות לכם את תפקיד ה-IAM ‏Managed Kafka Viewer (roles/managedkafka.viewer) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות לצפייה בנושא. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי להציג נושא, צריך את ההרשאות הבאות:

  • רשימת הנושאים: managedkafka.topics.list
  • קבלת נושא: managedkafka.topics.get

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

מאפייני הנושא במסוף

במסוף אפשר לראות את מאפייני הנושא הבאים:

  • הגדרות: בכרטיסייה הזו מופיעים פרטי הגדרה כלליים לגבי הנושא, כולל:

    • Name: המזהה הייחודי של הנושא בתוך האשכול.

    • מחיצות: מספר המחיצות בנושא. מחיצות מחלקות את הנתונים של הנושא לפלחים כדי לאפשר הרחבה ועיבוד מקבילי.

    • עותקים: מספר העותקים (רפליקות) שמתחזקים לכל מחיצה כדי להבטיח יתירות וזמינות של הנתונים.

    • Cluster: השם של האשכול בשירות המנוהל ל-Apache Kafka שאליו הנושא משויך.

    • אזור: Google Cloud האזור שבו נמצאים האשכול והנושא.

    • פרמטרים של נושא שלא מוגדרים כברירת מחדל: כל שינוי בהגדרות ברמת הנושא שהוגדר עבור הנושא, ששונה מהגדרות ברירת המחדל ברמת האשכול.

  • מעקב: בכרטיסייה הזו מוצגים תרשימים ויזואליים עם מדדים מרכזיים שקשורים לפעילות ולביצועים של הנושא. התרשימים האלה כוללים את המידע הבא:

    • מספר הבייטים: תרשים של סדרת זמן שבו מוצג קצב הייצור או השליחה של בייטים לנושא. המאפיין הזה מציין את נפח הנתונים שפורסמו בנושא לאורך זמן. המדד המתאים הוא managedkafka.googleapis.com/byte_in_count.

    • מספר הבקשות: תרשים של סדרת זמנים שמייצג את שיעור הבקשות שנשלחו לנושא. הוא משקף את הפעילות והשימוש הכוללים בנושא. המדד הקשור הוא managedkafka.googleapis.com/topic_request_count.

    • Log segments by partition: בתרשים הזה מוצג מספר פלחי היומן הפעילים לכל מחיצה בנושא. פלחי יומן הם הקבצים הפיזיים בדיסק שבהם Kafka מאחסן את נתוני הנושא. המדד הרלוונטי הוא managedkafka.googleapis.com/log_segments.

  • קבוצות של צרכנים פרטיים: בקטע הזה מפורטות קבוצות של צרכנים פרטיים שמנויות לנושא. קבוצת צרכנים היא קבוצה של צרכנים שעובדים יחד כדי לקרוא הודעות מהנושא.

הצגת נושא

המסוף

  1. נכנסים לדף Clusters במסוף Google Cloud .

    מעבר אל Clusters

    מוצגת רשימת האשכולות שיצרתם בפרויקט.

  2. לוחצים על האשכול שרוצים לראות את הנושאים שלו.

    יוצג דף הפרטים של האשכול. בכרטיסייה Resources בדף פרטי האשכול, מופיעים הנושאים.

  3. כדי לראות נושא ספציפי, לוחצים על שם הנושא.

    יוצג דף הפרטים של הנושא.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud managed-kafka topics describe:

    gcloud managed-kafka topics describe TOPIC_ID \
      --cluster=CLUSTER_ID --location=LOCATION_ID
    

    הפקודה הזו מאחזרת ומציגה פרטים מקיפים על הנושא שצוין. המידע הזה כולל את הגדרות התצורה שלו, כמו מספר המחיצות, גורם השכפול וכל שינוי בהגדרות ברמת הנושא.

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: המזהה של הנושא.
    • CLUSTER_ID: המזהה של האשכול שמכיל את הנושא.
    • LOCATION_ID: המיקום של האשכול.

הפקודה gcloud managed-kafka topics describe מציגה מידע מינימלי על נושא, כמו מספר המחיצות וגורם השכפול. כדי לקבל מידע מפורט יותר, כולל הקצאות של מחיצות ואת כל הגדרות התצורה, משתמשים בכלי kafka-topics.shשל שורת הפקודה.

Kafka CLI

לפני שמריצים את הפקודה הזו, צריך להתקין את כלי שורת הפקודה של Kafka במכונה וירטואלית של Compute Engine. המכונה הווירטואלית צריכה להיות מסוגלת להגיע לרשת משנה שמחוברת לאשכול של השירות המנוהל ל-Apache Kafka. פועלים לפי ההוראות במאמר יצירה ושימוש בהודעות באמצעות כלי שורת הפקודה של Kafka.

כדי לראות פרטים על נושא, מריצים את הפקודה kafka-topics.sh --describe:

kafka-topics.sh --describe \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID

מחליפים את מה שכתוב בשדות הבאים:

  • BOOTSTRAP_ADDRESS: כתובת ה-bootstrap של אשכול השירות המנוהל ל-Apache Kafka.
  • TOPIC_ID: המזהה של הנושא.

הפקודה הזו מחזירה קבוצת משנה של מאפייני הנושא, כולל המאפיינים הבאים:

  • מספר המחיצות
  • גורם השכפול
  • הקצאת מחיצות
  • הגדרה דינמית (הגדרות שהגדרתם באופן מפורש)
  • הגדרות סטטיות (הגדרות שמוחלות כשהאשכול מופעל)

כדי להציג את כל הגדרות התצורה של נושא, כולל הגדרות עם ערכי ברירת מחדל, מריצים את הפקודה kafka-configs.sh --describe:

kafka-configs.sh --describe \
--bootstrap-server=BOOTSTRAP_ADDRESS \
--command-config client.properties \
--entity-type topics \
--entity-name TOPIC_ID \
--all

הפלט הוא רשימה של הגדרות כצמדי מפתח/ערך. הדגל --all מחזיר את כל הגדרות התצורה. כדי לקבל רשימה של הגדרות דינמיות בלבד, משמיטים את הדגל --all.

REST

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud
  • LOCATION: המיקום של האשכול
  • CLUSTER_ID: מזהה האשכול
  • TOPIC_ID: מזהה הנושא

ה-method של ה-HTTP וכתובת ה-URL:

GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID

כדי לשלוח את הבקשה צריך להרחיב אחת מהאפשרויות הבאות:

אתם אמורים לקבל תגובת JSON שדומה לזו:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

המשך

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API של שירות מנוהל ל-Apache Kafka Go.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, צריך להגדיר את Application Default Credentials‏(ADC). מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func getTopic(w io.Writer, projectID, region, clusterID, topicID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	req := &managedkafkapb.GetTopicRequest{
		Name: topicPath,
	}
	topic, err := client.GetTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.GetTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Got topic: %#v\n", topic)
	return nil
}

Java

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של Managed Service for Apache Kafka Java API.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;

public class GetTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    getTopic(projectId, region, clusterId, topicId);
  }

  public static void getTopic(String projectId, String region, String clusterId, String topicId)
      throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      // This operation is being handled synchronously.
      Topic topic =
          managedKafkaClient.getTopic(TopicName.of(projectId, region, clusterId, topicId));
      System.out.println(topic.getAllFields());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.getTopic got err: %s", e.getMessage());
    }
  }
}

Python

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API בשפת Python של שירות מנוהל ל-Apache Kafka.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"

client = managedkafka_v1.ManagedKafkaClient()

topic_path = client.topic_path(project_id, region, cluster_id, topic_id)
request = managedkafka_v1.GetTopicRequest(
    name=topic_path,
)

try:
    topic = client.get_topic(request=request)
    print("Got topic:", topic)
except NotFound as e:
    print(f"Failed to get topic {topic_id} with error: {e.message}")

מה השלב הבא?