Memperbarui cluster Google Cloud Managed Service for Apache Kafka

Anda dapat mengedit cluster Managed Service untuk Apache Kafka guna memperbarui properti seperti ukuran cluster (jumlah vCPU dan memori), daftar subnet yang terhubung, konfigurasi penyeimbangan ulang otomatis, dan konfigurasi mTLS.

Untuk mengedit cluster, Anda dapat menggunakan Google Cloud konsol, Google Cloud CLI, library klien, atau Managed Kafka API. Anda tidak dapat menggunakan Apache Kafka API open source untuk memperbarui cluster.

Memperbarui properti tertentu, seperti jumlah vCPU dan memori, mungkin mengharuskan layanan memulai ulang cluster. Cluster dimulai ulang satu broker dalam satu waktu. Selama proses ini, permintaan ke masing-masing broker mungkin gagal, tetapi kegagalan ini bersifat sementara. Library klien yang umum digunakan akan otomatis menangani error ini.

Peran dan izin yang diperlukan

Untuk mendapatkan izin yang diperlukan guna memperbarui cluster, minta administrator untuk memberi Anda peran IAM Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk memperbarui cluster. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk memperbarui cluster:

  • Mengedit cluster: managedkafka.clusters.update

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Peran Managed Kafka Cluster Editor tidak memungkinkan Anda membuat, menghapus, atau mengubah topik dan grup konsumen di cluster Managed Service untuk Apache Kafka. Peran ini juga tidak mengizinkan akses data plane untuk memublikasikan atau menggunakan pesan dalam cluster. Untuk mengetahui informasi selengkapnya tentang peran ini, lihat Peran bawaan Managed Service untuk Apache Kafka.

Mengubah ukuran cluster

Jika Anda memperbarui jumlah vCPU atau memori cluster, aturan berikut akan berlaku:

  • Rasio vCPU-ke-memori keseluruhan cluster harus selalu berada di antara 1:1 dan 1:8.

  • Jika Anda melakukan penskalaan ke bawah, harus ada minimal 1 vCPU dan 1 GiB memori untuk setiap broker yang ada. Jumlah broker tidak pernah berkurang.

  • Jika Anda melakukan penskalaan ke atas, dan perubahan tersebut mengakibatkan penambahan broker baru, vCPU dan memori rata-rata per broker tidak dapat berkurang lebih dari 10% dibandingkan dengan rata-rata sebelum pembaruan.

    Misalnya, jika Anda mencoba melakukan penskalaan ke atas cluster dari 45 vCPU (3 broker) menjadi 48 vCPU (4 broker), operasi akan gagal. Hal ini karena vCPU rata-rata per broker berkurang dari 15 menjadi 12, yang merupakan pengurangan sebesar 20%, sehingga melebihi batas 10%.

Untuk mengetahui informasi selengkapnya, lihat Memperbarui ukuran cluster.

Mengedit cluster

Untuk mengedit cluster, ikuti langkah-langkah berikut:

Konsol

  1. Di Google Cloud konsol, buka halaman Clusters.

    Buka Cluster

  2. Dari daftar cluster, klik cluster yang propertinya ingin Anda edit.

    Halaman detail cluster akan ditampilkan.

  3. Di halaman detail cluster, klik Edit.

  4. Edit properti sesuai kebutuhan. Properti cluster berikut dapat diedit dari konsol:

    • Memori
    • vCPUs
    • Subnet
    • Konfigurasi penyeimbangan ulang
    • Konfigurasi mTLS
    • Label
  5. Klik Save.

gcloud

  1. Di konsol, aktifkan Cloud Shell. Google Cloud

    Aktifkan Cloud Shell

    Di bagian bawah konsol Google Cloud , sesi Cloud Shell akan dimulai dan menampilkan prompt command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi pada sesi.

  2. Jalankan perintah gcloud managed-kafka clusters update:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Ganti kode berikut:

    • CLUSTER_ID: ID atau nama cluster. Anda tidak dapat memperbarui nilai ini.
    • LOCATION: Lokasi cluster. Anda tidak dapat memperbarui nilai ini.
    • CPU: Jumlah CPU virtual untuk cluster.
    • MEMORY: Jumlah memori untuk cluster. Gunakan unit "MB", "MiB", "GB", "GiB", "TB", atau "TiB". Misalnya, "10GiB".
    • SUBNETS: Daftar subnet yang akan terhubung. Gunakan koma untuk memisahkan beberapa nilai subnet.
    • auto-rebalance: Mengaktifkan penyeimbangan ulang otomatis partisi topik di antara broker saat jumlah CPU dalam cluster berubah. Fitur ini diaktifkan secara default.
    • LABELS: Label yang akan dikaitkan dengan cluster.

Jika Anda menggunakan flag --async dengan perintah, sistem akan mengirim permintaan pembaruan dan segera menampilkan respons, tanpa menunggu operasi selesai. Dengan flag --async, Anda dapat melanjutkan tugas lain saat pembaruan cluster terjadi di latar belakang. Jika Anda tidak menggunakan flag --async, sistem akan menunggu operasi selesai sebelum menampilkan respons. Anda harus menunggu hingga cluster sepenuhnya diperbarui sebelum dapat melanjutkan tugas lain.

REST

Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

  • PROJECT_ID: ID proyek Google Cloud Anda
  • LOCATION: lokasi cluster
  • CLUSTER_ID: ID cluster
  • UPDATE_MASK: kolom yang akan diperbarui, sebagai daftar yang dipisahkan koma dari nama yang sepenuhnya memenuhi syarat. Contoh: capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT: jumlah vCPU untuk cluster
  • MEMORY: jumlah memori untuk cluster, dalam byte
  • SUBNET_ID: ID subnet dari subnet yang akan terhubung ke

Metode HTTP dan URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

Meminta isi JSON:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

Anda akan melihat respons JSON seperti berikut:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Di isi permintaan, sertakan hanya kolom yang Anda perbarui, seperti yang ditentukan dalam parameter kueri UPDATE_MASK. Untuk menambahkan subnet, tambahkan entri baru ke networkConfigs.

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Go API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Java API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Python API.

Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

Batasan

Setelah membuat cluster Managed Service untuk Apache Kafka, Anda tidak dapat memperbarui properti berikut:

  • Nama cluster
  • Lokasi cluster
  • Jenis enkripsi

Meskipun Anda tidak dapat mengubah jenis enkripsi, Anda dapat merotasi kunci enkripsi.

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar dari The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.