Google Cloud Managed Service for Apache Kafka クラスタを作成する

Managed Service for Apache Kafka クラスタは、トピックに整理されたメッセージのストリームを保存して処理する環境を提供します。

クラスタを作成するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Managed Kafka API を使用します。オープンソースの Apache Kafka API を使用してクラスタを作成することはできません。

始める前に

次の内容を理解していることを確認します。

クラスタの作成に必要なロールと権限

クラスタの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka クラスタ編集者 roles/managedkafka.clusterEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、クラスタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

クラスタを作成するには、次の権限が必要です。

  • クラスタを作成します。 managedkafka.clusters.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Managed Kafka クラスタ編集者のロールでは、Managed Service for Apache Kafka クラスタのトピックとコンシューマー グループの作成、削除、変更はできません。また、クラスタ内でメッセージをパブリッシュまたは使用するためのデータプレーン アクセスも許可しません。このロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。

Managed Service for Apache Kafka クラスタのプロパティ

Managed Service for Apache Kafka クラスタを作成または更新するときは、次のプロパティを指定する必要があります。

クラスタ名

作成する Managed Service for Apache Kafka クラスタの名前または ID。クラスタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。クラスタの名前は不変です。

場所

クラスタを作成するロケーション。ロケーションには、サポートされている Google Cloud リージョンのいずれかを指定します。クラスタのロケーションは後から変更できません。使用可能なロケーションのリストについては、Managed Service for Apache Kafka のロケーションをご覧ください。

容量構成

容量構成では、Kafka セットアップの vCPU 数とメモリ量を構成する必要があります。クラスタの容量を構成する方法については、Kafka クラスタのサイズを計画するをご覧ください。

容量構成のプロパティは次のとおりです。

  • vCPUs: クラスタ内の vCPU の数。クラスタごとに少なくとも 3 つの vCPU が必要です。

  • メモリ: クラスタに割り当てられているメモリの量。vCPU あたり 1 GiB ~ 8 GiB の範囲でプロビジョニングする必要があります。

    たとえば、6 個の vCPU を使用してクラスタを作成する場合、クラスタに割り当てることができる最小メモリは 6 GiB(vCPU あたり 1 GiB)で、最大メモリは 48 GiB(vCPU あたり 8 GiB)です。

クラスタの作成後にメモリと vCPU の数を変更する方法については、クラスタ サイズを更新するをご覧ください。

ネットワークの構成

ネットワーク構成は、クラスタがアクセスできる VPC サブネットのリストです。メッセージを生成または使用するには、クライアントがこれらのサブネットのいずれかにアクセスできる必要があります。

ネットワーク構成に関するガイドラインは次のとおりです。

  • クラスタには少なくとも 1 つのサブネットが必要です。最大値は 10 です。

  • 任意のクラスタに対して、ネットワークごとに 1 つのサブネットのみが許可されます。

  • 各サブネットはクラスタと同じリージョンに存在する必要があります。プロジェクトとネットワークは異なる場合があります。

  • ブローカーとブートストラップ サーバーの IP アドレスは、各サブネットに自動的に割り当てられます。また、これらの IP アドレスの DNS エントリが、対応する VPC ネットワーク内に作成されます。

  • 別のプロジェクトのサブネットを追加する場合は、クラスタに関連付けられている Google 管理のサービス アカウントに権限を付与する必要があります。詳細については、プロジェクト間でクラスタを接続するをご覧ください。

クラスタを作成した後、サブネットのリストを更新できます。ネットワーキングの詳細については、Managed Service for Apache Kafka のネットワーキングを構成するをご覧ください。

ラベル

ラベルは、整理と識別に役立つ Key-Value ペアです。ラベルを使用すると、環境に基づいてリソースを分類できます。例: "env:production""owner:data-engineering"

ラベルに基づいてリソースをフィルタして検索できます。たとえば、部門ごとに複数の Managed Service for Apache Kafka クラスタがあるとします。ラベル "department:marketing" を使用してクラスタを構成し、検索することで、関連するクラスタをすばやく見つけることができます。

再分散の構成

この設定は、サービスがブローカー間でパーティション レプリカを自動的に再調整するかどうかを決定します。

使用可能なモードは次のとおりです。

  • スケールアップ時の自動再調整: このオプションを有効にすると、クラスタをスケールアップしたときに、サービスがレプリカの再調整を自動的にトリガーします。このモードは、負荷の均等な分散を維持するのに役立ちますが、再調整オペレーション中にパフォーマンスが一時的に低下する可能性があります。

  • 再調整なし: このオプションを有効にすると、サービスはレプリカを自動的に再調整しません。

暗号化

Managed Service for Apache Kafka は、Google-owned and Google-managed encryption keys (デフォルト)または顧客管理の暗号鍵(CMEK)を使用してメッセージを暗号化できます。すべてのメッセージは、保存時も転送時も暗号化されます。クラスタの暗号化タイプは変更できません。

デフォルトでは Google-owned and Google-managed encryption keys が使用されます。これらの鍵は、 Google Cloud のインフラストラクチャ内で作成、管理、保存されます。

CMEK は、Cloud Key Management Service を使用して管理する暗号鍵です。この機能を使用すると、サポートされている Google Cloud サービス内で保存データの暗号化に使用する鍵をより細かく制御できます。CMEK を使用すると、Cloud Key Management Service に関連する追加費用が発生します。CMEK を使用する場合、キーリングは、使用するリソースと同じロケーションに存在する必要があります。詳細については、メッセージ暗号化を構成するをご覧ください。

mTLS 構成

必要に応じて、クライアント証明書を使用する代替認証方法として mTLS を構成できます。構成の内容は次のとおりです。

  • CA プール: クラスタがクライアント認証を信頼する 1 ~ 10 個の Certificate Authority Service(CAS)プールのリスト。

  • SSL プリンシパル マッピング ルール: Kafka ACL で使用する長い証明書プリンシパル名を簡略化するための ssl.principal.mapping.rules ブローカー プロパティ(省略可ですが、推奨)。

mTLS の詳細については、mTLS 認証を構成するをご覧ください。

クラスタの作成

クラスタを作成する前に、クラスタ プロパティのドキュメントを確認してください。

通常、クラスタの作成には 20~30 分かかります。

クラスタを作成する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

  2. [作成] を選択します。

    [Kafka クラスタの作成] ページが開きます。

  3. [クラスタ名] に文字列を入力します。

    クラスタの命名方法の詳細については、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。

  4. [ロケーション] に、サポートされているロケーションを入力します。

    サポートされているロケーションの詳細については、サポートされている Managed Service for Apache Kafka のロケーションをご覧ください。

  5. [容量構成] で、[メモリ] と [vCPU] の値を入力します。

    Managed Service for Apache Kafka クラスタのサイズ設定方法については、Kafka クラスタのサイズを計画するをご覧ください。

  6. [ネットワーク構成] に、次の詳細を入力します。

    1. プロジェクト: サブネットワークが配置されているプロジェクト。サブネットはクラスタと同じリージョンに存在する必要がありますが、プロジェクトは異なっていてもかまいません。
    2. ネットワーク: サブネットが接続されているネットワーク。
    3. サブネットワーク: サブネットの名前。
    4. サブネットの URI パス: このフィールドは自動的に入力されます。または、ここにサブネット パスを入力することもできます。サブネットの名前は、projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID の形式にする必要があります。
    5. [完了] をクリックします。
  7. (省略可)[接続サブネットを追加] をクリックして、サブネットを追加します。

    サブネットは最大 10 個まで追加できます。

  8. 他のデフォルト値はそのままにします。

  9. [作成] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud managed-kafka clusters create コマンドを実行します。

    gcloud managed-kafka clusters create CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --encryption-key=ENCRYPTION_KEY \
        --async \
        --labels=LABELS
    

    次のように置き換えます。

    • CLUSTER_ID: クラスタの ID または名前。

      クラスタの命名方法の詳細については、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。

    • LOCATION: クラスタのロケーション。

      サポートされているロケーションの詳細については、Managed Service for Apache Kafka のロケーションをご覧ください。

    • CPU: クラスタの vCPU の数。

      Managed Service for Apache Kafka クラスタのサイズ設定方法については、Kafka クラスタのサイズを計画するをご覧ください。

    • MEMORY: クラスタのメモリ容量。「MB」、「MiB」、「GB」、「GiB」、「TB」、「TiB」の単位を使用します。例: 「10GiB」。

    • SUBNETS: 接続するサブネットのリスト。複数のサブネット値を指定する場合は、カンマで区切ります。

      サブネットの形式は projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID です。

    • auto-rebalance: クラスタ内の CPU 数が変化したときに、ブローカー間でトピック パーティションの自動再分散を有効にします。これはデフォルトで有効になっています。

    • ENCRYPTION_KEY: クラスタに使用する顧客管理の暗号鍵の ID。

      形式は projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY です。

    • --async: オペレーションの完了を待たずに、システムが作成リクエストを送信してすぐにレスポンスを返します。--async フラグを使用すると、クラスタの作成がバックグラウンドで行われている間に、他のタスクを続行できます。フラグを使用しない場合、システムはオペレーションが完了するまで待機してからレスポンスを返します。他のタスクを続行するには、クラスタが完全に更新されるまで待つ必要があります。

    • LABELS: クラスタに関連付けるラベル。

      ラベルの形式の詳細については、ラベルをご覧ください。

    次のようなレスポンスが返されます。

    Create request issued for: [CLUSTER_ID]
    Check operation [projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID] for status.
    

    progress をトラッキングするために OPERATION_ID を保存します。

  3. REST

    リクエストのデータを使用する前に、次のように置き換えます。

    • PROJECT_ID: 実際の Google Cloud プロジェクト ID
    • LOCATION: クラスタのロケーション
    • CLUSTER_ID: クラスタの ID
    • CPU_COUNT: クラスタの vCPU 数
    • MEMORY: クラスタのメモリ量(バイト単位)。例: 3221225472
    • SUBNET_ID: 接続先のサブネットのサブネット ID。例: default

    HTTP メソッドと URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters?clusterId=CLUSTER_ID

    リクエストの本文(JSON):

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    リクエストを送信するには、次のいずれかのオプションを展開します。

    次のような JSON レスポンスが返されます。

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Terraform

    Terraform リソースを使用してクラスタを作成できます。

    resource "google_managed_kafka_cluster" "default" {
      project    = data.google_project.default.project_id # Replace this with your project ID in quotes
      cluster_id = "my-cluster-id"
      location   = "us-central1"
      capacity_config {
        vcpu_count   = 3
        memory_bytes = 3221225472
      }
      gcp_config {
        access_config {
          network_configs {
            subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

    Go

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
    	// cpu := 3
    	// memoryBytes := 3221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)
    
    	// Memory must be between 1 GiB and 8 GiB per CPU.
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   cpu,
    		MemoryBytes: memoryBytes,
    	}
    	var networkConfig []*managedkafkapb.NetworkConfig
    	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
    		Subnet: subnet,
    	})
    	platformConfig := &managedkafkapb.Cluster_GcpConfig{
    		GcpConfig: &managedkafkapb.GcpConfig{
    			AccessConfig: &managedkafkapb.AccessConfig{
    				NetworkConfigs: networkConfig,
    			},
    		},
    	}
    	rebalanceConfig := &managedkafkapb.RebalanceConfig{
    		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:            clusterPath,
    		CapacityConfig:  capacityConfig,
    		PlatformConfig:  platformConfig,
    		RebalanceConfig: rebalanceConfig,
    	}
    
    	req := &managedkafkapb.CreateClusterRequest{
    		Parent:    locationPath,
    		ClusterId: clusterID,
    		Cluster:   cluster,
    	}
    	op, err := client.CreateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.AccessConfig;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.CreateClusterRequest;
    import com.google.cloud.managedkafka.v1.GcpConfig;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.NetworkConfig;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.RebalanceConfig;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        int cpu = 3;
        long memoryBytes = 3221225472L; // 3 GiB
        createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
      }
    
      public static void createCluster(
          String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig =
            CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
        NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
        GcpConfig gcpConfig =
            GcpConfig.newBuilder()
                .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
                .build();
        RebalanceConfig rebalanceConfig =
            RebalanceConfig.newBuilder()
                .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
                .build();
        Cluster cluster =
            Cluster.newBuilder()
                .setCapacityConfig(capacityConfig)
                .setGcpConfig(gcpConfig)
                .setRebalanceConfig(rebalanceConfig)
                .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
    
          CreateClusterRequest request =
              CreateClusterRequest.newBuilder()
                  .setParent(LocationName.of(projectId, region).toString())
                  .setClusterId(clusterId)
                  .setCluster(cluster)
                  .build();
    
          // The duration of this operation can vary considerably, typically taking between 10-40
          // minutes.
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.createClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone =  True)
          Cluster response = future.get();
          System.out.printf("Created cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

    Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 3
    # memory_bytes = 3221225472
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.vcpu_count = cpu
    cluster.capacity_config.memory_bytes = memory_bytes
    cluster.gcp_config.access_config.network_configs = [
        managedkafka_v1.NetworkConfig(subnet=subnet)
    ]
    cluster.rebalance_config.mode = (
        managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
    )
    
    request = managedkafka_v1.CreateClusterRequest(
        parent=client.common_location_path(project_id, region),
        cluster_id=cluster_id,
        cluster=cluster,
    )
    
    try:
        operation = client.create_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # The duration of this operation can vary considerably, typically taking 10-40 minutes.
        # We can set a timeout of 3000s (50 minutes).
        response = operation.result(timeout=3000)
        print("Created cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

クラスタ作成オペレーションをモニタリングする

次のコマンドは、クラスタの作成に gcloud CLI を実行した場合にのみ実行できます。

  • 通常、クラスタの作成には 20 ~ 30 分かかります。クラスタ作成の進行状況を追跡するために、gcloud managed-kafka clusters create コマンドは長時間実行オペレーション(LRO)を使用します。次のコマンドを使用して、LRO をモニタリングできます。

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    次のように置き換えます。

    • OPERATION_ID: 前のセクションのオペレーション ID の値。
    • LOCATION は、前のセクションのロケーションの値に置き換えます。

トラブルシューティング

クラスタの作成時に発生する可能性のあるエラーは次のとおりです。

Service agent service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com has not been granted the required role cloudkms.cryptoKeyEncrypterDecrypter to encrypt data using the KMS key.

Managed Service for Apache Kafka サービス エージェントに、Cloud KMS 鍵にアクセスするために必要な権限がありません。CMEK の構成に必要なロールに関するドキュメントをご覧ください。

Service does not have permission to retrieve subnet. Please grant service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com the managedkafka.serviceAgent role in the IAM policy of the project ${SUBNET_PROJECT} and ensure the Compute Engine API is enabled in project ${SUBNET_PROJECT}

Managed Service for Apache Kafka サービス エージェントに、Kafka クライアントが実行される VPC ネットワークでネットワーキングを構成するために必要なロールがありません。詳細については、プロジェクト間でクラスタを接続するをご覧ください。

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。