הצגת רשימה של הנושאים בשירות המנוהל של Google Cloud ל-Apache Kafka

כדי להציג את רשימת הנושאים באשכול, אפשר להשתמש במסוף Google Cloud , ב-Google Cloud CLI, בספריית הלקוח, ב-Managed Kafka API או בממשקי ה-API של Apache Kafka בקוד פתוח.

התפקידים וההרשאות שנדרשים כדי לראות את רשימת הנושאים

כדי לקבל את ההרשאות שנדרשות להצגת רשימת הנושאים, צריך לבקש מהאדמין להקצות לכם את תפקיד ה-IAM‏ Managed Kafka Viewer (roles/managedkafka.viewer) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות להצגת רשימת הנושאים. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי להציג את הנושאים שלכם, צריך את ההרשאות הבאות:

  • רשימת הנושאים: managedkafka.topics.list
  • קבלת נושא: managedkafka.topics.get

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

הצגת הנושאים

המסוף

  1. נכנסים לדף Clusters במסוף Google Cloud .

    מעבר אל Clusters

    מוצגת רשימה של האשכולות שיצרתם בפרויקט.

  2. לוחצים על האשכול שרוצים לראות את הנושאים שלו.

    יוצג דף הפרטים של האשכול. בכרטיסייה Resources בדף פרטי האשכול, מופיעים הנושאים.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud managed-kafka topics list:

    gcloud managed-kafka topics list CLUSTER_ID \
        --location=LOCATION_ID \
        --limit=LIMIT
    

    הפקודה הזו מאחזרת רשימה של כל הנושאים שקיימים באשכול שצוין של שירות מנוהל ל-Apache Kafka. אפשר להשתמש בדגלים אופציונליים כדי לסנן, להגביל ולמיין את הפלט.

    מחליפים את מה שכתוב בשדות הבאים:

    • CLUSTER_ID: השם של האשכול שרוצים לראות את רשימת הנושאים שלו.
    • LOCATION_ID: המיקום של האשכול.
    • LIMIT (אופציונלי): המספר המקסימלי של הנושאים שיופיעו ברשימה.

Kafka CLI

לפני שמריצים את הפקודה הזו, צריך להתקין את כלי שורת הפקודה של Kafka במכונה וירטואלית של Compute Engine. המכונה הווירטואלית צריכה להיות מסוגלת להגיע לרשת משנה שמחוברת לאשכול של השירות המנוהל ל-Apache Kafka. פועלים לפי ההוראות במאמר יצירה ושימוש בהודעות באמצעות כלי שורת הפקודה של Kafka.

מריצים את הפקודה kafka-topics.sh באופן הבא:

kafka-topics.sh --list \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties

מחליפים את מה שכתוב בשדות הבאים:

REST

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud
  • LOCATION: המיקום של האשכול
  • CLUSTER_ID: מזהה האשכול

ה-method של ה-HTTP וכתובת ה-URL:

GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics

כדי לשלוח את הבקשה צריך להרחיב אחת מהאפשרויות הבאות:

אתם אמורים לקבל תגובת JSON שדומה לזו:

{
  "topics": [
    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/__remote_log_metadata",
      "partitionCount": 50,
      "replicationFactor": 3,
      "configs": {
        "remote.storage.enable": "false",
        "cleanup.policy": "delete",
        "retention.ms": "-1"
      }
    },
    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": 3,
      "replicationFactor": 3
    }
  ]
}

המשך

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API של שירות מנוהל ל-Apache Kafka Go.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, צריך להגדיר את Application Default Credentials‏(ADC). מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func listTopics(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	req := &managedkafkapb.ListTopicsRequest{
		Parent: clusterPath,
	}
	topicIter := client.ListTopics(ctx, req)
	for {
		res, err := topicIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("topicIter.Next() got err: %w", err)
		}
		fmt.Fprintf(w, "Got topic: %v", res)
	}
	return nil
}

Java

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של Managed Service for Apache Kafka Java API.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import java.io.IOException;

public class ListTopics {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    listTopics(projectId, region, clusterId);
  }

  public static void listTopics(String projectId, String region, String clusterId)
      throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      ClusterName clusterName = ClusterName.of(projectId, region, clusterId);
      // This operation is being handled synchronously.
      for (Topic topic : managedKafkaClient.listTopics(clusterName).iterateAll()) {
        System.out.println(topic.getAllFields());
      }
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.listTopics got err: %s", e.getMessage());
    }
  }
}

Python

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API בשפת Python של שירות מנוהל ל-Apache Kafka.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"

client = managedkafka_v1.ManagedKafkaClient()

request = managedkafka_v1.ListTopicsRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
)

response = client.list_topics(request=request)
for topic in response:
    print("Got topic:", topic)

מה השלב הבא?

Apache Kafka®‎ הוא סימן מסחרי רשום של The Apache Software Foundation או של השותפים העצמאיים שלה בארצות הברית או במדינות אחרות.