Google Cloud Managed Service for Apache Kafka クラスタを更新する

Google Cloud Managed Service for Apache Kafka クラスタを編集して、vCPU 数、メモリ、サブネット、暗号化タイプ、ラベルなどのプロパティを更新できます。ブローカーがクラスタに追加されたときに、サービスがブローカー間でパーティションの再調整を行うかどうかを構成することもできます。このサービスは、クラスタのメモリと vCPU の構成に基づいて新しいブローカーを自動的に作成します。

クラスタを編集するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Managed Kafka API を使用します。オープンソースの Apache Kafka API を使用してクラスタを更新することはできません。

始める前に

vCPU 数またはメモリを更新する場合は、次のルールが適用されます。

  • クラスタの vCPU とメモリの全体的な比率は、常に 1:1 ~ 1:8 の範囲内である必要があります。

  • ダウンスケールする場合は、既存のブローカーごとに 1 つ以上の vCPU と 1 GiB 以上のメモリが必要です。ブローカーの数が減ることはありません。

  • アップスケールし、変更によって新しいブローカーが追加される場合、ブローカーあたりの平均 vCPU とメモリは、更新前の平均値と比較して 10% 以上減少することはできません。

    たとえば、クラスタを 45 個の vCPU(3 つのブローカー)から 48 個の vCPU(4 つのブローカー)にスケールアップしようとすると、オペレーションは失敗します。これは、ブローカーあたりの平均 vCPU が 15 から 12 に減少し、20% の削減となり、10% の上限を超えているためです。

詳細については、クラスタサイズを更新するをご覧ください。

vCPU 数やメモリなどの特定のプロパティを更新すると、サービスがクラスタを再起動する必要が生じる場合があります。クラスタは、一度に 1 つのブローカーずつ再起動されます。これにより、個々のブローカーへのリクエストが一時的に失敗しますが、これらの障害は一時的なものです。一般的に使用されるクライアント ライブラリは、このようなエラーを自動的に処理します。

クラスタ名、クラスタのロケーション、暗号化タイプは編集できません。

クラスタの編集に必要なロールと権限

クラスタの更新に必要な権限を取得するには、プロジェクトに対する Managed Kafka クラスタ編集者 roles/managedkafka.clusterEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、クラスタの更新に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

クラスタを更新するには、次の権限が必要です。

  • クラスタを編集する: managedkafka.clusters.update

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka クラスタ編集者のロールでは、Managed Service for Apache Kafka クラスタのトピックとコンシューマー グループの作成、削除、変更はできません。また、クラスタ内でメッセージをパブリッシュまたは使用するためのデータプレーン アクセスも許可しません。このロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

クラスタの編集

クラスタを編集する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

  2. クラスタのリストで、プロパティを編集するクラスタをクリックします。

    クラスタの詳細ページが表示されます。

  3. クラスタの詳細ページで、[編集] をクリックします。

  4. 必要に応じてプロパティを編集します。クラスタの次のプロパティは、コンソールから編集できます。

    • メモリ
    • vCPU
    • サブネット
    • 再分散の構成
    • mTLS 構成
    • ラベル
  5. [保存] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka clusters update コマンドを実行します。

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    次のように置き換えます。

    • CLUSTER_ID: クラスタの ID または名前。この値は更新できません。
    • LOCATION: クラスタのロケーション。この値は更新できません。
    • CPU: クラスタの仮想 CPU の数。
    • MEMORY: クラスタのメモリ容量。「MB」、「MiB」、「GB」、「GiB」、「TB」、「TiB」の単位を使用します。例: 「10GiB」。
    • SUBNETS: 接続するサブネットのリスト。複数のサブネット値を指定する場合は、カンマで区切ります。
    • auto-rebalance: クラスタ内の CPU 数が変化したときに、ブローカー間でトピック パーティションの自動再分散を有効にします。これはデフォルトで有効になっています。
    • LABELS: クラスタに関連付けるラベル。
  3. コマンドで --async フラグを使用すると、システムは更新リクエストを送信し、オペレーションの完了を待たずにすぐにレスポンスを返します。--async フラグを使用すると、クラスタの更新がバックグラウンドで行われている間も、他のタスクを続行できます。--async フラグを使用しない場合、システムはオペレーションが完了するまで待機してからレスポンスを返します。他のタスクを続行するには、クラスタが完全に更新されるまで待つ必要があります。

    REST

    リクエストのデータを使用する前に、次のように置き換えます。

    • PROJECT_ID: 実際の Google Cloud プロジェクト ID
    • LOCATION: クラスタのロケーション
    • CLUSTER_ID: クラスタの ID
    • UPDATE_MASK: 更新するフィールド(完全修飾名のカンマ区切りリスト)。例: capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT: クラスタの vCPU 数
    • MEMORY: クラスタのメモリ量(バイト単位)
    • SUBNET_ID: 接続するサブネットのサブネット ID

    HTTP メソッドと URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    リクエストの本文(JSON):

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    リクエストを送信するには、次のいずれかのオプションを展開します。

    次のような JSON レスポンスが返されます。

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    リクエストの本文には、UPDATE_MASK クエリ パラメータで指定した、更新するフィールドのみを含めます。サブネットを追加するには、networkConfigs に新しいエントリを追加します。

    Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。