עדכון נושא בשירות מנוהל של Google Cloud ל-Apache Kafka

אחרי שיוצרים נושא, אפשר לערוך את ההגדרה שלו כדי לעדכן את המאפיינים הבאים: מספר המחיצות והגדרות הנושא שלא מוגדרות כברירת מחדל למאפיינים שכבר הוגדרו ברמת האשכול. אפשר רק להגדיל את מספר המחיצות, ולא להקטין אותו.

כדי לעדכן נושא יחיד, אפשר להשתמש במסוף Google Cloud , ב-Google Cloud CLI, בספריית הלקוח, ב-Managed Kafka API או בממשקי ה-API של Apache Kafka בקוד פתוח.

התפקידים וההרשאות שנדרשים לעריכת נושא

כדי לקבל את ההרשאות שנדרשות לעריכת נושא, צריך לבקש מהאדמין להקצות לכם את התפקיד ב-IAM 'עורך נושאים מנוהלים של Kafka' (roles/managedkafka.topicEditor) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות לעריכת נושא. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי לערוך נושא, צריך את ההרשאות הבאות:

  • כדי לעדכן נושא: managedkafka.topics.update

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

עריכת נושא

כדי לערוך נושא, פועלים לפי השלבים הבאים:

המסוף

  1. נכנסים לדף Clusters במסוף Google Cloud .

    מעבר אל Clusters

    מוצגת רשימה של האשכולות שיצרתם בפרויקט.

  2. לוחצים על האשכול שאליו משתייך הנושא שרוצים לערוך.

    הדף Cluster details נפתח. בדף פרטי האשכול, הנושאים מפורטים בכרטיסייה Resources.

  3. לוחצים על הנושא שרוצים לערוך.

    ייפתח הדף פרטי הנושא.

  4. כדי לבצע עריכה, לוחצים על עריכה.

  5. אחרי שמבצעים את השינויים, לוחצים על שמירה.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud managed-kafka topics update:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    הפקודה הזו משנה את ההגדרה של נושא קיים באשכול שצוין של שירות מנוהל ל-Apache Kafka. אפשר להשתמש בפקודה הזו כדי להגדיל את מספר המחיצות ולעדכן את הגדרות ההגדרה ברמת הנושא.

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: המזהה של הנושא.
    • CLUSTER_ID: המזהה של האשכול שמכיל את הנושא.
    • LOCATION_ID: המיקום של האשכול.
    • PARTITIONS: אופציונלי: מספר המחיצות המעודכן של הנושא. אפשר רק להגדיל את מספר המחיצות, ולא להקטין אותו.
    • CONFIGS: אופציונלי: רשימה של הגדרות תצורה לעדכון. מציינים רשימה מופרדת בפסיקים של זוגות של מפתח וערך. לדוגמה, retention.ms=3600000,retention.bytes=10000000.

REST

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud
  • LOCATION: המיקום של האשכול
  • CLUSTER_ID: מזהה האשכול
  • TOPIC_ID: מזהה הנושא
  • UPDATE_MASK: השדות לעדכון, כרשימה מופרדת בפסיקים של שמות מלאים. לדוגמה: partitionCount
  • PARTITION_COUNT: המספר המעודכן של המחיצות בנושא

ה-method של ה-HTTP וכתובת ה-URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

תוכן בקשת JSON:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT
}

כדי לשלוח את הבקשה צריך להרחיב אחת מהאפשרויות הבאות:

אתם אמורים לקבל תגובת JSON שדומה לזו:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "update",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

המשך

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API של שירות מנוהל ל-Apache Kafka Go.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, צריך להגדיר את Application Default Credentials‏(ADC). מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 20
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	TopicConfig := managedkafkapb.Topic{
		Name:           topicPath,
		PartitionCount: partitionCount,
		Configs:        configs,
	}
	paths := []string{"partition_count", "configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateTopicRequest{
		UpdateMask: updateMask,
		Topic:      &TopicConfig,
	}
	topic, err := client.UpdateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
	return nil
}

Java

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של Managed Service for Apache Kafka Java API.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API בשפת Python של שירות מנוהל ל-Apache Kafka.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")

הגדרת שמירת הודעות

‫Kafka מאחסן הודעות בקבצים של פלחי יומן. כברירת מחדל, Kafka מוחק קובצי פלח אחרי תקופת שמירה או כשמחיצה חורגת מסף גודל הנתונים. אפשר לשנות את ההתנהגות הזו על ידי הפעלת דחיסת יומן. אם דחיסת היומן מופעלת, Kafka שומר רק את הערך האחרון של כל מפתח.

שירות מנוהל של Google Cloud ל-Apache Kafka משתמש באחסון מדורג, כלומר פלחים של יומנים שהושלמו מאוחסנים מרחוק ולא באחסון מקומי. מידע נוסף על אחסון לפי רמות זמין במאמר Tiered Storage במסמכים בנושא Apache Kafka.

הגדרת ערכי השמירה

אם דחיסת היומן לא מופעלת, ההגדרות הבאות קובעות איך Kafka מאחסן קבצים של פלחי יומן:

  • retention.ms: משך הזמן המקסימלי לשמירת קובצי פלחים, באלפיות שנייה.
  • retention.bytes: המספר המקסימלי של בייטים לאחסון בכל מחיצה. אם הנתונים במחיצה חורגים מהערך הזה, קפקא משליך קבצים ישנים יותר של פלחים.

כדי לעדכן את ההגדרות האלה, משתמשים ב-CLI של gcloud או ב-CLI של Kafka:

gcloud

כדי להגדיר את משך השמירה של ההודעות, מריצים את הפקודה gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

מחליפים את מה שכתוב בשדות הבאים:

  • TOPIC_ID: המזהה של הנושא.
  • CLUSTER_ID: המזהה של האשכול שמכיל את הנושא.
  • LOCATION_ID: המיקום של האשכול.
  • RETENTION_PERIOD: משך הזמן המקסימלי לאחסון קובצי פלחים, באלפיות השנייה.
  • MAX_BYTES: המספר המקסימלי של בייטים לאחסון בכל מחיצה.

Kafka CLI

לפני שמריצים את הפקודה הזו, צריך להתקין את כלי שורת הפקודה של Kafka במכונה וירטואלית של Compute Engine. המכונה הווירטואלית צריכה להיות מסוגלת להגיע לרשת משנה שמחוברת לאשכול של השירות המנוהל ל-Apache Kafka. פועלים לפי ההוראות במאמר יצירה ושימוש בהודעות באמצעות כלי שורת הפקודה של Kafka.

מריצים את הפקודה kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

מחליפים את מה שכתוב בשדות הבאים:

  • BOOTSTRAP_ADDRESS: כתובת ה-bootstrap של אשכול השירות המנוהל ל-Apache Kafka.
  • TOPIC_ID: המזהה של הנושא.
  • RETENTION_PERIOD: משך הזמן המקסימלי לאחסון קובצי פלחים, באלפיות השנייה.
  • MAX_BYTES: המספר המקסימלי של בייטים לאחסון בכל מחיצה.

הפעלת דחיסה של יומנים

אם דחיסת היומן מופעלת, Kafka מאחסן רק את ההודעה האחרונה לכל מפתח. דחיסת יומנים מושבתת כברירת מחדל. כדי להפעיל דחיסה של יומן בנושא מסוים, מגדירים את התצורה cleanup.policy לערך "compact", באופן הבא:

gcloud

מריצים את הפקודה gcloud managed-kafka topics update.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

מחליפים את מה שכתוב בשדות הבאים:

  • TOPIC_ID: המזהה של הנושא.
  • CLUSTER_ID: המזהה של האשכול שמכיל את הנושא.
  • LOCATION_ID: המיקום של האשכול.

Kafka CLI

לפני שמריצים את הפקודה הזו, צריך להתקין את כלי שורת הפקודה של Kafka במכונה וירטואלית של Compute Engine. המכונה הווירטואלית צריכה להיות מסוגלת להגיע לרשת משנה שמחוברת לאשכול של השירות המנוהל ל-Apache Kafka. פועלים לפי ההוראות במאמר יצירה ושימוש בהודעות באמצעות כלי שורת הפקודה של Kafka.

מריצים את הפקודה kafka-configs.sh:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

מחליפים את מה שכתוב בשדות הבאים:

  • BOOTSTRAP_ADDRESS: כתובת ה-bootstrap של אשכול השירות המנוהל ל-Apache Kafka.
  • TOPIC_ID: המזהה של הנושא.

מגבלות

  • אי אפשר לבטל את ההגדרות של נושאים לאחסון מרוחק, כמו remote.storage.enable.

  • אי אפשר לבטל את ההגדרות של נושאים בקובצי פלחים של יומנים, כמו segment.bytes.

  • הפעלת דחיסת יומנים בנושא מסוים משביתה באופן אוטומטי את האחסון בשכבות בנושא הזה. כל קובצי היומן של הנושא מאוחסנים באופן מקומי.

מה השלב הבא?