Connect クラスタを作成する

Connect クラスタは、既存の Kafka デプロイから Google Cloud Managed Service for Apache Kafka クラスタにデータを移動したり、Managed Service for Apache Kafka クラスタから別のサービスや別の Kafka クラスタにデータを移動したりするコネクタの環境を提供します。 Google Cloud セカンダリ Kafka クラスタは、別の Google Cloud Managed Service for Apache Kafka クラスタ、セルフマネージド クラスタ、オンプレミス クラスタにできます。

始める前に

Managed Service for Apache Kafka クラスタがすでに作成されていることを確認します。Connect クラスタを接続する Managed Service for Apache Kafka クラスタの名前が必要です。

各 Connect クラスタは、Managed Service for Apache Kafka クラスタに関連付けられています。このクラスタには、Connect クラスタで実行されているコネクタの状態が保存されます。

Connect クラスタの作成に必要なロールと権限

Connect クラスタの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka Connect クラスタ編集者 roles/managedkafka.connectClusterEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、Connect クラスタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Connect クラスタを作成するには、次の権限が必要です。

  • 指定されたロケーションで Connect クラスタを作成する権限を付与します。 managedkafka.connectClusters.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

必要な ACL プリンシパル

デフォルトでは、ACL が構成されていない場合、Managed Service for Apache Kafka クラスタは Connect クラスタにリソースへのアクセスを許可します。これを行うには、デフォルト設定である allow.everyone.if.no.acl.foundtrue に設定します。

ただし、Managed Service for Apache Kafka クラスタに ACL が構成されている場合、Connect クラスタにはリソースに対する読み取り権限と書き込み権限が自動的に付与されません。これらは手動で付与する必要があります。

ACL でプリンシパルとして使用される Connect クラスタ サービス アカウントは、User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com の形式です。

Kafka クラスタで ACL を構成している場合は、次のコマンドを使用して、Connect クラスタにトピックの読み取りと書き込み権限、コンシューマー グループの読み取り権限を付与します。

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

これらのコマンドの詳細については、きめ細かいアクセス制御用に Apache Kafka ACL を構成するをご覧ください。

別のプロジェクトに Connect クラスタを作成する

Managed Service for Apache Kafka は、サービス エージェントを使用してGoogle Cloud リソースにアクセスします。サービス エージェントは、クラスタを作成するプロジェクトに関連付けられます。

Managed Service for Apache Kafka クラスタとは異なるプロジェクトに Connect クラスタを作成すると、Connect クラスタと Kafka クラスタは、それぞれのプロジェクトに関連付けられたサービス エージェントを使用します。この場合、Connect クラスタのサービス エージェントには、Kafka クラスタのプロジェクト内のリソースにアクセスする権限が必要です。 Google Cloud

必要な権限を付与するには、Kafka クラスタのプロジェクトで、Connect クラスタのサービス エージェントに Managed Kafka サービス エージェント ロールを付与します。たとえば、プロジェクト kafka-project に Kafka クラスタを作成し、プロジェクト connect-project に Connect クラスタを作成する場合は、kafka-project の Managed Kafka サービス エージェント ロールを connect-project に関連付けられたサービス エージェントに付与します。

サービス エージェントのメールアドレスの形式は service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com です。ここで、PROJECT_NUMBER はプロジェクト番号です。ロールを付与する方法の詳細については、ロールを作成してサービス エージェントに付与するをご覧ください。

Connect クラスタのプロパティ

このセクションでは、Connect クラスタのプロパティについて説明します。

Connect クラスタ名

作成する Connect クラスタの名前。Connect クラスタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。クラスタの名前は不変です。

プライマリ Kafka クラスタ

Connect クラスタに関連付けられている Managed Service for Apache Kafka クラスタ。この関連付けられたクラスタ(プライマリ クラスタ)には、Connect クラスタで実行されているコネクタの状態が保存されます。通常、プライマリ Managed Service for Apache Kafka クラスタは、Connect クラスタで実行されているすべてのソースコネクタの宛先であり、すべてのシンクコネクタの入力としても機能します。

1 つの Managed Service for Apache Kafka クラスタに複数の Connect クラスタを設定できます。別のプロジェクトで Managed Service for Apache Kafka クラスタを選択する場合は、適切な権限が構成されていることを確認してください。

Connect クラスタを作成した後に、別の Kafka クラスタに更新することはできません。

レイテンシとネットワーク コストに関するリージョン コロケーションのメリット

Managed Service for Apache Kafka クラスタと Connect クラスタを同じリージョンに配置すると、レイテンシとネットワーク コストが削減されます。たとえば、Managed Service for Apache Kafka クラスタが region-a にあり、シンク コネクタを使用して、この Managed Service for Apache Kafka クラスタ(ソース)から region-a にもある BigQuery テーブル(シンク)にデータを書き込むとします。Connect クラスタを region-a にデプロイすると、このデプロイ オプションにより、BigQuery 書き込みオペレーションのレイテンシが最小限に抑えられ、Managed Service for Apache Kafka クラスタと Connect クラスタ間のリージョン間ネットワーク転送費用がなくなります。

マルチシステム レイテンシと費用の考慮事項

Kafka Connect はコネクタを使用してシステム間でデータを移動します。コネクタの一方の側は常に Managed Service for Apache Kafka クラスタとやり取りします。単一の Kafka Connect クラスタで複数のコネクタを実行できます。各コネクタは、ソース(システムからデータを取得する)またはシンク(システムにデータをプッシュする)として機能します。

Managed Service for Apache Kafka クラスタと同じリージョンにある Connect クラスタは、クラスタ間の通信レイテンシが低くなるというメリットがありますが、各コネクタは BigQuery テーブルや別の Kafka クラスタなどの別のシステムともやり取りします。Connect クラスタと Managed Service for Apache Kafka クラスタが同じ場所に配置されている場合でも、他のシステムは別のリージョンに存在する可能性があります。これにより、レイテンシと費用が増加します。パイプライン全体のレイテンシは、Managed Service for Apache Kafka クラスタ、Connect クラスタ、ソースまたはシンク システムの 3 つのシステムのロケーションによって異なります。

たとえば、Managed Service for Apache Kafka クラスタが region-a にあり、Connect クラスタが region-b にあり、region-c のバケットに Cloud Storage コネクタを使用している場合、2 つのネットワーク ホップ(region-a から region-bregion-b から region-c。コネクタの方向によっては逆方向)に対して課金されます。

レイテンシと費用の両方を最適化するために、Connect クラスタの配置を計画する際は、関連するすべてのリージョンを慎重に検討してください。

容量構成

容量構成では、Connect クラスタの各 vCPU の vCPU 数とメモリ量を構成する必要があります。Connect クラスタの容量は、作成後に更新できます。容量構成のプロパティは次のとおりです。

  • vCPU: Connect クラスタに割り当てられた vCPU の数。最小値は 3 vCPU です。

  • メモリ: 各 vCPU に割り当てられるメモリ容量。vCPU あたり 1 GiB ~ 8 GiB の範囲でプロビジョニングする必要があります。クラスタの作成後に、これらの上限内でメモリ量を増減できます。

    たとえば、6 個の vCPU を使用してクラスタを作成する場合、クラスタに割り当てることができる最小メモリは 6 GiB(vCPU あたり 1 GiB)で、最大メモリは 48 GiB(vCPU あたり 8 GiB)です。

Connect クラスタ内の各ワーカーに割り当てられた vCPU とメモリは、クラスタのパフォーマンス、容量、費用に大きな影響を与えます。vCPU とメモリが Connect クラスタに与える影響の内訳は次のとおりです。

vCPU 数

  • Kafka Connect は、コネクタの作業をタスクに分割します。各タスクはデータを並行して処理できます。vCPU が多いほど、同時に実行できるタスクが増え、スループットが向上します。

  • vCPU を増やすと、Connect クラスタの費用が増加します。

メモリ

  • Kafka Connect は、コネクタと Managed Service for Apache Kafka の間でデータが流れるときに、データをバッファリングするためにメモリを使用します。メモリが大きいほど、バッファを大きくできます。メモリを増やすと、特に大量のデータ ストリームの場合に、スループットを向上させることができます。非常に大きなメッセージやレコードを処理するコネクタには、OutOfMemoryError 例外が発生することなく処理できる十分なメモリが必要です。

  • メモリを増やすと、Connect クラスタの費用が増加します。

  • 複雑な変換ロジックを使用している場合は、より多くのメモリ割り当てが必要です。

目標は、Connect クラスタに適した容量構成を選択することです。これを行うには、Connect クラスタが処理できるスループットを把握する必要があります。

ワーカー(プライマリ)サブネット

ワーカー サブネットプライマリ サブネットとも呼ばれます)は、VPC ネットワークを Connect クラスタに接続します。このサブネットにより、クラスタ ワーカーは、コンシューマー ネットワーク内のソースとシンクのエンドポイント(Managed Service for Apache Kafka クラスタやセルフホスト Kafka クラスタなど)にアクセスできます。

ワーカー サブネットを構成するための要件は次のとおりです。

  • ワーカー サブネットは必須です。

  • サブネットは Connect クラスタと同じリージョンに配置する必要があります。

  • サブネットは、プライマリ Kafka クラスタ接続されたサブネットのリストのいずれかと同じ親 VPC に存在する必要があります。

  • サブネットの CIDR の範囲の最小サイズは /22(1,024 個のアドレス)にする必要があります。

クラスタ ワーカーには、Private Service Connect インターフェースを使用して、ワーカー サブネット内の IP アドレスが割り当てられます。ワーカーは、サブネットの VPC ネットワークからアクセス可能な任意のネットワーク宛先にアクセスできます。ただし、次の要件があります。

  • エンドポイントは 172.16.0.0/14 CIDR の範囲内に存在してはなりません。この範囲は、Managed Service for Apache Kafka Connect の内部使用のために予約されています。
  • ファイアウォール ルールでトラフィックを許可する必要があります。ネットワーク アタッチメントのセキュリティを構成するをご覧ください。
  • インターネット トラフィックの場合は、Cloud NAT を構成する必要があります。たとえば、インターネット経由でアクセス可能な Kafka クラスタからデータを複製するには、MirrorMaker コネクタに Cloud NAT が必要です。
  • ワーカー サブネット VPC とは異なる VPC にある Private Service Connect エンドポイントにアクセスするには、サポートされているコンシューマー構成(NCC など)を使用していることを確認する必要があります。詳細については、エンドポイントを介した公開サービスへのアクセスについてをご覧ください。

解決可能な DNS ドメイン

DNS ドメイン名とも呼ばれる解決可能な DNS ドメインを使用すると、コンシューマー VPC ネットワークの DNS アドレスをテナント VPC で使用できるようになります。これにより、Connect クラスタは DNS 名を IP アドレスに解決し、MirrorMaker コネクタの他の Kafka クラスタなど、他のサービスとの通信を容易にします。

解決可能な DNS ドメインの場合は、Managed Service for Apache Kafka クラスタを選択できます。プライマリ Managed Service for Apache Kafka クラスタのブートストラップ アドレスは、解決可能な DNS ドメインのリストに自動的に含まれるため、DNS ドメイン名を構成する必要はありません。

ただし、DNS ドメインを手動で指定することもできます。これは、外部 Kafka クラスタを選択する場合に必要です。プライマリ Managed Service for Apache Kafka クラスタの DNS ドメインは自動的に含まれます。他の Kafka クラスタでは、DNS ドメインの構成が引き続き必要です。

Secret Manager のリソース

一部のコネクタでは、構成の一部としてパスワードなどの機密データが必要です。このタイプのデータを安全に管理するには、データを Secret Manager に保存し、Connect クラスタにシークレットへのアクセス権を付与します。

Kafka Connect で Secret Manager のシークレットを使用するには、次の操作を行います。

  1. マネージド Kafka サービス アカウントに Secret Manager のシークレット アクセサーroles/secretmanager.secretAccessor)ロールを付与します。このロールにより、Connect クラスタはシークレットにアクセスできます。

  2. センシティブ データを含む Secret を作成します。詳細については、シークレットを作成するをご覧ください。

  3. Connect クラスタを作成または更新するときに、クラスタがアクセスできるシークレットを指定します。Connect クラスタごとに最大 32 個のシークレットを指定できます。

Secret はクラスタ ワーカーのファイルとしてマウントされます。コネクタはこれらのファイルに対する読み取り専用アクセス権を持っています。コネクタを作成するときに、コネクタの構成プロパティでシークレットを参照できます。

  • シークレット ファイルのパスを参照するには、次の形式を使用します。

    /var/secrets/PROJECT_NAME-SECRET_NAME-SECRET_VERSION
    

    例: ssl.truststore.location=/var/secrets/project1-truststore-1

  • シークレットのを構成値(パスワードなど)として使用するには、次の形式を使用します。

    ${directory:/var/secrets:PROJECT_NAME-SECRET_NAME-SECRET_VERSION}
    

    例: password=${directory:/var/secrets:project1-database_password-3}

次のように置き換えます。

  • PROJECT_NAME: Google Cloud プロジェクトの名前。
  • SECRET_NAME: Secret の名前。
  • SECRET_VERSION: シークレットのバージョン。

ラベル

ラベルは、整理と識別に役立つ Key-Value ペアです。これらは、Connect クラスタの整理に役立ちます。各 Connect クラスタにラベルをアタッチし、そのラベルに基づいてリソースをフィルタできます。ラベルの例: environment:prodapplication:web-app

Connect クラスタを作成する

クラスタを作成する前に、Connect クラスタのプロパティのドキュメントを確認してください。

Connect クラスタの作成には 20 ~ 30 分かかります。

コンソール

  1. Google Cloud コンソールで、[クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. [作成] をクリックします。

  3. [Connect クラスタ名] フィールドに、Connect クラスタの名前を入力します。詳細については、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。

  4. [プライマリ Kafka クラスタ] リストで、Managed Service for Apache Kafka クラスタを選択します。詳細については、プライマリ Kafka クラスタをご覧ください。

  5. [リージョン] リストで、Connect クラスタのロケーションを選択します。ロケーションの選択方法については、プライマリ Kafka クラスタをご覧ください。

  6. [容量構成] セクションで、次のフィールドに値を入力するか、デフォルト値をそのまま使用します。

    • [vCPUs] フィールドに、クラスタの仮想 CPU の数を入力します。

    • [メモリ] フィールドに、CPU あたりのメモリ容量(GiB)を入力します。値は CPU あたり 8 GiB を超えることはできません。

    詳細については、容量構成をご覧ください。

  7. [ネットワーク構成] セクションで、[ネットワーク] リストから VPC ネットワークを選択するか、デフォルト値のままにします。このリストは、プライマリ Kafka クラスタを選択すると入力されます。

  8. [ワーカー サブネット] セクションで、[サブネット] リストからサブネットを選択するか、デフォルト値のままにします。詳細については、ワーカー サブネットをご覧ください。サブネットを選択すると、[サブネットの URI パス] フィールドに自動的に入力されます。

  9. 省略可: 解決可能な DNS ドメインを追加します。プライマリ Kafka クラスタの DNS ドメインは、解決可能な DNS ドメインとして自動的に追加されます。追加の DNS ドメインを指定する手順は次のとおりです。

    1. [解決可能な DNS ドメイン] セクションを開きます。

    2. [DNS ドメインを追加] をクリックします。

    3. 既存の Managed Service for Apache Kafka クラスタの DNS ドメインを追加するには、[Kafka クラスタ] リストからクラスタを選択します。それ以外の場合は、[DNS ドメイン] フィールドに DNS ドメインを入力します。

    4. [完了] をクリックします。

  10. 省略可: Secret Manager リソースを追加するには、次の操作を行います。

    1. [Secret Manager リソース] セクションを開きます。

    2. [シークレット リソースを追加] をクリックします。

    3. [シークレット] リストで、シークレットを選択します。

    4. [シークレット バージョン] リストで、シークレットのバージョンを選択します。

    5. [完了] をクリックします。

  11. 省略可: ラベルを追加して、プロジェクトを整理します。ラベルを追加するには、次の手順を行います。

    1. [ラベル] セクションを展開します。

    2. [ラベルを追加] をクリックします。

    3. [キー] フィールドにラベルのキーを入力します。

    4. [] フィールドにラベルの値を入力します。

  12. [作成] をクリックします。

gcloud

  1. Google Cloud コンソールで Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud managed-kafka connect-clusters create コマンドを実行します。

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    次のように置き換えます。

    • CONNECT_CLUSTER_ID: Connect クラスタの ID または名前。Connect クラスタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。Connect クラスタの名前は不変です。

    • LOCATION: Connect クラスタを作成するロケーション。これは、サポートされている Google Cloudリージョンである必要があります。Connect クラスタの作成後に、そのロケーションを変更することはできません。使用可能なロケーションのリストについては、Managed Service for Apache Kafka のロケーションをご覧ください。ロケーションの推奨事項の詳細については、プライマリ Kafka クラスタをご覧ください。

    • CPU: Connect クラスタの vCPU の数。最小値は 3 vCPU です。vCPU 数をご覧ください。

    • MEMORY: Connect クラスタのメモリ容量。「MB」、「MiB」、「GB」、「GiB」、「TB」、「TiB」の単位を使用します。例: 「3GiB」。vCPU あたり 1 GiB ~ 8 GiB の範囲でプロビジョニングする必要があります。メモリをご覧ください。

    • WORKER_SUBNET: Connect クラスタのワーカー サブネット。

      サブネットの形式は projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID です。

      ワーカー サブネットは Connect クラスタと同じリージョンに存在する必要があります。

    • PROJECT_ID:(省略可)Google Cloud プロジェクトの ID。指定しない場合は、現在のプロジェクトが使用されます。

    • KAFKA_CLUSTER: Connect クラスタに関連付けられているプライマリ Managed Service for Apache Kafka クラスタの ID または完全修飾名。Kafka クラスタをご覧ください。Kafka クラスタの形式は projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID です。

      Connect クラスタを作成した後に、別の Kafka クラスタに更新することはできません。

    • SECRET:(省略可)ワーカーに読み込む Secret。Secret Manager の正確な Secret バージョンを指定する必要があります。エイリアスはサポートされていません。1 つのクラスタに最大 32 個のシークレットを読み込むことができます。形式: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME:(省略可)Connect クラスタに公開するサブネットの DNS ドメイン名。Connect クラスタは、IP アドレスに依存するのではなく、ドメイン名を使用してリソースにアクセスできます。DNS ピアリングをご覧ください。

    • LABELS:(省略可)クラスタに関連付けるラベル。ラベルの形式について詳しくは、ラベルをご覧ください。追加するラベルの KEY=VALUE ペアのリスト。キーは小文字で始まり、使用できるのはハイフン(-)、アンダースコア(_)、小文字、数字のみです。値に使用できるのは、ハイフン(-)、アンダースコア(_)、小文字、数字のみです。

    • CONFIG_FILE:(省略可)クラスタまたはコネクタのデフォルトからオーバーライドされる構成を含む JSON ファイルまたは YAML ファイルのパス。このファイルは、インライン JSON または YAML もサポートしています。

    • --async:(省略可)処理中のオペレーションの完了を待たずに、直ちにシェルに戻ります。--async フラグを使用すると、クラスタの作成がバックグラウンドで行われている間に、他のタスクを続行できます。フラグを使用しない場合、システムはオペレーションが完了するまで待機してからレスポンスを返します。他のタスクを続行するには、クラスタが完全に更新されるまで待つ必要があります。

    次のようなレスポンスが返されます。

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    OPERATION_ID を保存して、進捗状況を追跡します。たとえば、この値は operation-1753590328249-63ae19098cc06-64300a0a-06512d02 です。

Terraform

Terraform リソースを使用して Connect クラスタを作成できます。

resource "google_managed_kafka_connect_cluster" "default" {
  provider           = google-beta
  project            = data.google_project.default.project_id
  connect_cluster_id = "my-connect-cluster-id"
  location           = "us-central1"
  kafka_cluster      = google_managed_kafka_cluster.default.id
  capacity_config {
    vcpu_count   = 12
    memory_bytes = 12884901888 # 12 GiB
  }
  gcp_config {
    access_config {
      network_configs {
        primary_subnet = google_compute_subnetwork.default.id
      }
    }
  }
}

Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

Go

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-connect-cluster"
	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)

	// Capacity configuration with 12 vCPU and 12 GiB RAM
	capacityConfig := &managedkafkapb.CapacityConfig{
		VcpuCount:   12,
		MemoryBytes: 12884901888, // 12 GiB in bytes
	}

	// Optionally, you can also specify accessible subnets and resolvable DNS
	// domains as part of your network configuration. For example:
	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
	// 	{
	// 		PrimarySubnet:      primarySubnet,
	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
	// 	},
	// }

	connectCluster := &managedkafkapb.ConnectCluster{
		Name:           clusterPath,
		KafkaCluster:   kafkaCluster,
		CapacityConfig: capacityConfig,
	}

	req := &managedkafkapb.CreateConnectClusterRequest{
		Parent:           locationPath,
		ConnectClusterId: clusterID,
		ConnectCluster:   connectCluster,
	}
	op, err := client.CreateConnectCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
	}
	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
	return nil
}

Java

このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
import com.google.cloud.managedkafka.v1.ConnectCluster;
import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateConnectCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-connect-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
    int cpu = 12;
    long memoryBytes = 12884901888L; // 12 GiB
    createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
  }

  public static void createConnectCluster(
      String projectId,
      String region,
      String clusterId,
      String subnet,
      String kafkaCluster,
      int cpu,
      long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
        .setMemoryBytes(memoryBytes).build();
    ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
        .setPrimarySubnet(subnet)
        .build();
    // Optionally, you can also specify additional accessible subnets and resolvable
    // DNS domains as part of your network configuration. For example:
    // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
    // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
    ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
        .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
        .build();
    ConnectCluster connectCluster = ConnectCluster.newBuilder()
        .setCapacityConfig(capacityConfig)
        .setGcpConfig(gcpConfig)
        .setKafkaCluster(kafkaCluster)
        .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.createConnectClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
        .create(settingsBuilder.build())) {
      CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
          .setParent(LocationName.of(projectId, region).toString())
          .setConnectClusterId(clusterId)
          .setConnectCluster(connectCluster)
          .build();

      // The duration of this operation can vary considerably, typically taking
      // between 10-30 minutes.
      OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
          .createConnectClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
            currentOp.getName(),
            currentOp.isDone());
      }

      // NOTE: future.get() blocks completion until the operation is complete (isDone
      // = True)
      ConnectCluster response = future.get();
      System.out.printf("Created connect cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
          e.getMessage());
      throw e;
    }
  }
}

Python

このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。

Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig

# TODO(developer): Update with your values.
# project_id = "my-project-id"
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"
# kafka_cluster_id = "my-kafka-cluster"
# primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
# cpu = 12
# memory_bytes = 12884901888  # 12 GiB

connect_client = ManagedKafkaConnectClient()
kafka_client = managedkafka_v1.ManagedKafkaClient()

parent = connect_client.common_location_path(project_id, region)
kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)

connect_cluster = ConnectCluster()
connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
connect_cluster.kafka_cluster = kafka_cluster_path
connect_cluster.capacity_config.vcpu_count = cpu
connect_cluster.capacity_config.memory_bytes = memory_bytes
connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
# Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
# For example:
# connect_cluster.gcp_config.access_config.network_configs = [
#     ConnectNetworkConfig(
#         primary_subnet=primary_subnet,
#         additional_subnets=additional_subnets,
#         dns_domain_names=dns_domain_names,
#     )
# ]

request = CreateConnectClusterRequest(
    parent=parent,
    connect_cluster_id=connect_cluster_id,
    connect_cluster=connect_cluster,
)

try:
    operation = connect_client.create_connect_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    # Creating a Connect cluster can take 10-40 minutes.
    response = operation.result(timeout=3000)
    print("Created Connect cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

クラスタ作成オペレーションをモニタリングする

次のコマンドは、Connect クラスタの作成に gcloud CLI を実行した場合にのみ実行できます。

  • 通常、Connect クラスタの作成には 20 ~ 30 分かかります。クラスタ作成の進行状況を追跡するために、gcloud managed-kafka connect-clusters create コマンドは長時間実行オペレーション(LRO)を使用します。次のコマンドを使用して、LRO をモニタリングできます。

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    次のように置き換えます。

    • OPERATION_ID: 前のセクションのオペレーション ID の値。
    • LOCATION は、前のセクションのロケーションの値に置き換えます。

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。