Google Cloud Managed Service for Apache Kafka クラスタを更新する

Managed Service for Apache Kafka クラスタを編集して、クラスタサイズ(vCPU 数とメモリ)、接続されたサブネットのリスト、自動リバランシング構成、mTLS 構成などのプロパティを更新できます。

クラスタを編集するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Managed Kafka API を使用します。オープンソースの Apache Kafka API を使用してクラスタを更新することはできません。

vCPU 数やメモリなどの特定のプロパティを更新すると、サービスがクラスタを再起動する必要が生じる場合があります。クラスタは、ブローカーごとに再起動されます。このプロセスでは、個々のブローカーへのリクエストが失敗する可能性がありますが、これらの障害は一時的なものです。一般的に使用されるクライアント ライブラリは、これらのエラーを自動的に処理します。

必要なロールと権限

クラスタの更新に必要な権限を取得するには、プロジェクトに対する Managed Kafka クラスタ編集者 roles/managedkafka.clusterEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、クラスタの更新に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

クラスタを更新するには、次の権限が必要です。

  • クラスタを編集する: managedkafka.clusters.update

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka クラスタ編集者のロールでは、Managed Service for Apache Kafka クラスタのトピックとコンシューマー グループの作成、削除、変更はできません。また、クラスタ内でメッセージをパブリッシュまたは使用するためのデータプレーン アクセスも許可しません。このロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

クラスタのサイズ変更

クラスタの vCPU 数またはメモリを更新する場合は、次のルールが適用されます。

  • クラスタの vCPU とメモリの全体的な比率は、常に 1:1 ~ 1:8 の範囲内である必要があります。

  • ダウンスケールする場合は、既存のブローカーごとに 1 つ以上の vCPU と 1 GiB 以上のメモリが必要です。ブローカーの数が減ることはありません。

  • アップスケールし、変更によって新しいブローカーが追加される場合、ブローカーあたりの平均 vCPU とメモリは、更新前の平均値と比較して 10% 以上減少することはできません。

    たとえば、クラスタを 45 個の vCPU(3 つのブローカー)から 48 個の vCPU(4 つのブローカー)にアップスケールしようとすると、オペレーションは失敗します。これは、ブローカーあたりの平均 vCPU が 15 から 12 に減少し、20% の削減となり、10% の上限を超えているためです。

詳細については、クラスタサイズを更新するをご覧ください。

クラスタの編集

クラスタを編集する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

  2. クラスタのリストで、プロパティを編集するクラスタをクリックします。

    クラスタの詳細ページが表示されます。

  3. クラスタの詳細ページで、[編集] をクリックします。

  4. 必要に応じてプロパティを編集します。クラスタの次のプロパティは、コンソールから編集できます。

    • メモリ
    • vCPU
    • サブネット
    • 再分散の構成
    • mTLS 構成
    • ラベル
  5. [保存] をクリックします。

gcloud

  1. Google Cloud コンソールで Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud managed-kafka clusters update コマンドを実行します。

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    次のように置き換えます。

    • CLUSTER_ID: クラスタの ID または名前。この値は更新できません。
    • LOCATION: クラスタのロケーション。この値は更新できません。
    • CPU: クラスタの仮想 CPU の数。
    • MEMORY: クラスタのメモリ容量。「MB」、「MiB」、「GB」、「GiB」、「TB」、「TiB」の単位を使用します。例: 「10GiB」。
    • SUBNETS: 接続するサブネットのリスト。複数のサブネット値を指定する場合は、カンマで区切ります。
    • auto-rebalance: クラスタ内の CPU 数が変化したときに、ブローカー間でトピック パーティションの自動再分散を有効にします。これはデフォルトで有効になっています。
    • LABELS: クラスタに関連付けるラベル。

コマンドで --async フラグを使用すると、システムは更新リクエストを送信し、オペレーションの完了を待たずにすぐにレスポンスを返します。--async フラグを使用すると、クラスタの更新がバックグラウンドで行われている間も、他のタスクを続行できます。--async フラグを使用しない場合、システムはオペレーションが完了するまで待機してからレスポンスを返します。他のタスクを続行するには、クラスタが完全に更新されるまで待つ必要があります。

REST

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_ID: 実際の Google Cloud プロジェクト ID
  • LOCATION: クラスタのロケーション
  • CLUSTER_ID: クラスタの ID
  • UPDATE_MASK: 更新するフィールド(完全修飾名のカンマ区切りリスト)。例: capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT: クラスタの vCPU 数
  • MEMORY: クラスタのメモリ量(バイト単位)
  • SUBNET_ID: 接続するサブネットのサブネット ID

HTTP メソッドと URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

リクエストの本文(JSON):

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

リクエストの本文には、UPDATE_MASK クエリ パラメータで指定した、更新するフィールドのみを含めます。サブネットを追加するには、networkConfigs に新しいエントリを追加します。

Go

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

制限事項

Managed Service for Apache Kafka クラスタを作成した後は、次のプロパティを更新できません。

  • クラスタ名
  • クラスタのロケーション
  • 暗号化のタイプ

暗号化タイプは変更できませんが、暗号鍵をローテーションすることはできます。

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。