יצירת נושא בשירות מנוהל של Google Cloud ל-Apache Kafka

בשירות המנוהל ל-Apache Kafka, ההודעות מאורגנות לפי נושאים. נושא מורכב ממחיצות. מחיצה היא רצף מסודר של רשומות שלא ניתן לשנות, שנמצא בבעלות של ברוקר יחיד באשכול Kafka. כדי לפרסם או לצרוך הודעות, צריך ליצור נושא.

כדי ליצור נושא, אפשר להשתמש במסוף, ב-Google Cloud CLI, בספריית הלקוח, ב-Managed Kafka API או בממשקי ה-API של Apache Kafka בקוד פתוח. Google Cloud

לפני שמתחילים

כדי ליצור נושא, צריך קודם ליצור אשכול. ודאו שהגדרתם את הפריטים הבאים:

תפקידים והרשאות שנדרשים כדי ליצור נושא

כדי לקבל את ההרשאות שנדרשות ליצירת נושא, צריך לבקש מהאדמין להקצות לכם ב-IAM את התפקיד עורך נושאים של Managed Kafka (roles/managedkafka.topicEditor) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות ליצירת נושא. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי ליצור נושא, צריך את ההרשאות הבאות:

  • יצירת נושא: managedkafka.topics.create

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

מאפיינים של נושא בשירות מנוהל ל-Apache Kafka

כשיוצרים או מעדכנים נושא בשירות מנוהל ל-Apache Kafka, צריך לציין את המאפיינים הבאים.

שם הנושא

השם של הנושא בשירות המנוהל ל-Apache Kafka שאתם יוצרים. הנחיות לשמות של נושאים מפורטות במאמר הנחיות לשמות של משאבים בשירות מנוהל ל-Apache Kafka. אי אפשר לשנות את השם של נושא.

מספר המחיצות

מספר המחיצות בנושא. אפשר לערוך נושא כדי להגדיל את מספר המחיצות שלו, אבל אי אפשר להקטין אותו. הגדלת מספר המחיצות בנושא שמשתמש במפתח עשויה לשנות את אופן חלוקת ההודעות.

גורם השכפול

מספר העותקים לכל מחיצה. אם לא מציינים את הערך, נעשה שימוש בגורם השכפול שמוגדר כברירת מחדל באשכול.

גורם שכפול גבוה יותר יכול לשפר את עקביות הנתונים במקרה של כשלים בשרתים, כי הנתונים משוכפלים לכמה שרתים. לסביבות ייצור מומלץ להגדיר גורם שכפול של 3 ומעלה. ככל שיש יותר עותקים משוכפלים, כך עולה יותר לאחסן את הנושא במאגר מקומי ולהעביר את הנתונים שלו. עם זאת, הם לא מגדילים את העלויות של האחסון המתמיד. גורם השכפול לא יכול להיות גדול ממספר הברוקרים הזמינים.

פרמטרים אחרים

אפשר גם להגדיר פרמטרים אחרים של הגדרות ברמת הנושא ב-Apache Kafka. הן מוגדרות כזוגות של key=value שמבטלים את ברירות המחדל של האשכול.

להגדרות שקשורות לנושאים יש הגדרת ברירת מחדל לשרת ואפשרות לבטל אותה לכל נושא. הפורמט הוא רשימה מופרדת בפסיקים של צמדי KEY=VALUE, כאשר KEY הוא שם מאפיין ההגדרה של נושא Kafka, ו-VALUE היא ההגדרה הנדרשת.צמדי המפתח/ערך האלה עוזרים לכם לבטל את ברירות המחדל של האשכול. לדוגמה, flush.ms=10 ו-compression.type=producer.

רשימה של כל ההגדרות הנתמכות ברמת הנושא מופיעה במאמר Topic-level configs במסמכי התיעוד של Apache Kafka.

יצירת נושא

לפני שיוצרים נושא, כדאי לעיין במאפייני הנושא.

המסוף

  1. נכנסים לדף Clusters במסוף Google Cloud .

    מעבר אל Clusters

  2. לוחצים על האשכול שרוצים ליצור לו נושא.

    הדף Cluster details נפתח.

  3. בדף הפרטים של האשכול, לוחצים על Create Topic.

    ייפתח הדף Create Kafka topic.

  4. בשדה שם הנושא, מזינים מחרוזת.

  5. בקטע מספר המחיצות, מזינים את מספר המחיצות הרצוי או משאירים את ערך ברירת המחדל.

  6. בשדה Replication factor (גורם השכפול), מזינים את גורם השכפול הרצוי או משאירים את ערך ברירת המחדל.

  7. (אופציונלי) כדי לשנות הגדרות של נושאים, מוסיפים אותן כשדות Configurations של זוגות מפתח-ערך שמופרדים בפסיקים.

  8. לוחצים על יצירה.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud managed-kafka topics create:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: שם הנושא.
    • CLUSTER: השם של האשכול שבו רוצים ליצור את הנושא.
    • LOCATION: האזור של האשכול.
    • PARTITIONS: מספר המחיצות בנושא.
    • REPLICATION_FACTOR: גורם השכפול של הנושא.
    • CONFIGS: פרמטרים אופציונליים ברמת הנושא. מציינים כצמדים של מפתח/ערך שמופרדים בפסיקים. לדוגמה, compression.type=producer.

Kafka CLI

לפני שמריצים את הפקודה הזו, צריך להתקין את כלי שורת הפקודה של Kafka במכונה וירטואלית של Compute Engine. המכונה הווירטואלית צריכה להיות מסוגלת להגיע לרשת משנה שמחוברת לאשכול של השירות המנוהל ל-Apache Kafka. פועלים לפי ההוראות במאמר יצירה ושימוש בהודעות באמצעות כלי שורת הפקודה של Kafka.

מריצים את הפקודה kafka-topics.sh באופן הבא:

kafka-topics.sh --create --if-not-exists \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID \
  --partitions PARTITIONS \
  --replication-factor REPLICATION_FACTOR

מחליפים את מה שכתוב בשדות הבאים:

  • BOOTSTRAP_ADDRESS: כתובת ה-bootstrap של אשכול השירות המנוהל ל-Apache Kafka.

  • TOPIC_ID: שם הנושא.

  • PARTITIONS: מספר המחיצות בנושא.

  • REPLICATION_FACTOR: פקטור השכפול של הנושא.

REST

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud
  • LOCATION: המיקום של האשכול
  • CLUSTER_ID: מזהה האשכול
  • TOPIC_ID: מזהה הנושא
  • PARTITION_COUNT: מספר המחיצות בנושא
  • REPLICATION_FACTOR: מספר העותקים של כל מחיצה

ה-method של ה-HTTP וכתובת ה-URL:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

תוכן בקשת JSON:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

כדי לשלוח את הבקשה צריך להרחיב אחת מהאפשרויות הבאות:

אתם אמורים לקבל תגובת JSON שדומה לזו:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Terraform

אפשר להשתמש במשאב של Terraform כדי ליצור נושא.

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

כדי ללמוד איך להחיל הגדרות ב-Terraform או להסיר אותן, ראו פקודות בסיסיות ב-Terraform.

המשך

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API של שירות מנוהל ל-Apache Kafka Go.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, צריך להגדיר את Application Default Credentials‏(ADC). מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של Managed Service for Apache Kafka Java API.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API בשפת Python של שירות מנוהל ל-Apache Kafka.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

מה השלב הבא?