עדכון של אשכול בשירות מנוהל של Google Cloud ל-Apache Kafka

אפשר לערוך אשכול של שירות מנוהל ל-Apache Kafka כדי לעדכן מאפיינים כמו גודל האשכול (מספר ליבות ה-vCPU והזיכרון), רשימת רשתות המשנה המחוברות, הגדרת האיזון מחדש האוטומטי והגדרת ה-mTLS.

כדי לערוך אשכול, אפשר להשתמש במסוף Google Cloud , ב-Google Cloud CLI, בספריית הלקוח או ב-Managed Kafka API. אי אפשר להשתמש ב-API של Apache Kafka בקוד פתוח כדי לעדכן אשכול.

יכול להיות שעדכון של מאפיינים מסוימים, כמו מספר ליבות ה-vCPU והזיכרון, ידרוש הפעלה מחדש של האשכול. האשכול מופעל מחדש, ברוקר אחד בכל פעם. במהלך התהליך הזה, בקשות לברוקרים ספציפיים עשויות להיכשל, אבל הכשלים האלה הם זמניים. ספריות לקוח בשימוש נפוץ מטפלות בשגיאות האלה באופן אוטומטי.

תפקידים והרשאות נדרשים

כדי לקבל את ההרשאות שנדרשות לעדכון אשכול, צריך לבקש מהאדמין להקצות לכם את תפקיד ה-IAM ‏Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד שמוגדר מראש וכולל את ההרשאות שנדרשות לעדכון אשכול. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי לעדכן אשכול, צריך את ההרשאות הבאות:

  • עריכת אשכול: managedkafka.clusters.update

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

שינוי הגודל של אשכול

אם מעדכנים את מספר ליבות ה-vCPU או את הזיכרון של אשכול, הכללים הבאים חלים:

  • יחס ה-vCPU לזיכרון הכולל של האשכול חייב תמיד להיות בין 1:1 ל-1:8.

  • אם מצמצמים את הקיבולת, צריך להיות לפחות vCPU אחד ו-1 GiB של זיכרון לכל ברוקר קיים. מספר הברוקרים אף פעם לא יורד.

  • אם מגדילים את הקיבולת, והשינוי גורם להוספה של ברוקרים חדשים, הממוצע של vCPU וזיכרון לכל ברוקר לא יכול לרדת ביותר מ-10% בהשוואה לממוצעים לפני העדכון.

    לדוגמה, אם מנסים להגדיל את הקיבולת של אשכול מ-45 ליבות וירטואליות (3 ברוקרים) ל-48 ליבות וירטואליות (4 ברוקרים), הפעולה תיכשל. הסיבה לכך היא שהממוצע של vCPU לכל ברוקר יורד מ-15 ל-12, כלומר ירידה של 20%, שחורגת מהמגבלה של 10%.

מידע נוסף זמין במאמר בנושא עדכון גודל האשכול.

עריכת אשכול

כדי לערוך אשכול, מבצעים את השלבים הבאים:

המסוף

  1. נכנסים לדף Clusters במסוף Google Cloud .

    מעבר אל Clusters

  2. ברשימת האשכולות, לוחצים על האשכול שרוצים לערוך את המאפיינים שלו.

    יוצג דף הפרטים של האשכול.

  3. בדף הפרטים של האשכול, לוחצים על עריכה.

  4. עורכים את המאפיינים לפי הצורך. אפשר לערוך את המאפיינים הבאים של אשכול במסוף:

    • זיכרון
    • מעבדים וירטואלים
    • תת-רשת
    • הגדרות איזון מחדש
    • הגדרת mTLS
    • תוויות
  5. לוחצים על Save.

gcloud

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud managed-kafka clusters update:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    מחליפים את מה שכתוב בשדות הבאים:

    • CLUSTER_ID: המזהה או השם של האשכול. אי אפשר לעדכן את הערך הזה.
    • LOCATION: המיקום של האשכול. אי אפשר לעדכן את הערך הזה.
    • CPU: מספר המעבדים הווירטואליים באשכול.
    • MEMORY: כמות הזיכרון של האשכול. צריך להשתמש ביחידות 'MB',‏ 'MiB',‏ 'GB',‏ 'GiB',‏ 'TB' או 'TiB'. לדוגמה, ‎"10GiB".
    • SUBNETS: רשימת תת-הרשתות להתחברות. כדי להפריד בין כמה ערכים של רשתות משנה, צריך להשתמש בפסיקים.
    • auto-rebalance: הפעלה של איזון מחדש אוטומטי של מחיצות בנושא בין ברוקרים, כשמספר המעבדים באשכול משתנה. ההגדרה הזו מופעלת כברירת מחדל.
    • LABELS: תוויות לשיוך לאשכול.

אם משתמשים בדגל --async עם הפקודה, המערכת שולחת את בקשת העדכון ומחזירה תשובה באופן מיידי, בלי לחכות לסיום הפעולה. בעזרת הדגל --async, אפשר להמשיך במשימות אחרות בזמן שעדכון האשכול מתבצע ברקע. אם לא משתמשים בדגל --async, המערכת מחכה שהפעולה תסתיים לפני שהיא מחזירה תגובה. צריך לחכות עד שהעדכון של האשכול יסתיים לפני שממשיכים למשימות אחרות.

REST

לפני שמשתמשים בנתוני הבקשה, צריך להחליף את הנתונים הבאים:

  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud
  • LOCATION: המיקום של האשכול
  • CLUSTER_ID: מזהה האשכול
  • UPDATE_MASK: השדות לעדכון, כרשימה מופרדת בפסיקים של שמות מלאים. דוגמה: capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT: מספר המעבדים הווירטואליים באשכול
  • MEMORY: כמות הזיכרון באשכול, בבייטים
  • SUBNET_ID: מזהה תת-הרשת שאליה רוצים להתחבר

ה-method של ה-HTTP וכתובת ה-URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

תוכן בקשת JSON:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

כדי לשלוח את הבקשה צריך להרחיב אחת מהאפשרויות הבאות:

אתם אמורים לקבל תגובת JSON שדומה לזו:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

בגוף הבקשה, כוללים רק את השדות שרוצים לעדכן, כפי שמצוין בפרמטר השאילתה UPDATE_MASK. כדי להוסיף רשת משנה, מוסיפים רשומה חדשה ל-networkConfigs.

המשך

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API של שירות מנוהל ל-Apache Kafka Go.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, צריך להגדיר את Application Default Credentials‏(ADC). מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של Managed Service for Apache Kafka Java API.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

לפני שמנסים להריץ את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר התקנת ספריות הלקוח. מידע נוסף מופיע ב מאמרי העזרה של ה-API בשפת Python של שירות מנוהל ל-Apache Kafka.

כדי לבצע אימות לשירות המנוהל ל-Apache Kafka, מגדירים את ה-Application Default Credentials. מידע נוסף זמין במאמר הגדרת ADC לסביבת פיתוח מקומית.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

מגבלות

אחרי שיוצרים אשכול של שירות מנוהל ל-Apache Kafka, אי אפשר לעדכן את המאפיינים הבאים:

  • שם האשכול
  • המיקום של האשכול
  • סוג ההצפנה

אי אפשר לשנות את סוג ההצפנה, אבל אפשר לבצע רוטציה של מפתחות ההצפנה.

מה השלב הבא?

Apache Kafka®‎ הוא סימן מסחרי רשום של The Apache Software Foundation או של השותפים העצמאיים שלה בארצות הברית או במדינות אחרות.