Connect クラスタは、既存の Kafka デプロイから Google Cloud Managed Service for Apache Kafka クラスタにデータを移動したり、Managed Service for Apache Kafka クラスタから別の Google Cloud サービスまたは別の Kafka クラスタにデータを移動したりするコネクタの環境を提供します。セカンダリ Kafka クラスタは、別の Google Cloud Managed Service for Apache Kafka クラスタ、セルフマネージド クラスタ、オンプレミス クラスタにできます。
始める前に
Managed Service for Apache Kafka クラスタがすでに作成されていることを確認します。Connect クラスタを接続する Managed Service for Apache Kafka クラスタの名前が必要です。
各 Connect クラスタは、Managed Service for Apache Kafka クラスタに関連付けられています。このクラスタには、Connect クラスタで実行されているコネクタの状態が保存されます。
Connect クラスタの作成に必要なロールと権限
Connect クラスタの作成に必要な権限を取得するには、プロジェクトに対する Managed Kafka Connect クラスタ編集者 (roles/managedkafka.connectClusterEditor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、Connect クラスタの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
Connect クラスタを作成するには、次の権限が必要です。
-
指定されたロケーションで Connect クラスタを作成する権限を付与します。
managedkafka.connectClusters.create
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
このロールの詳細については、Managed Service for Apache Kafka の事前定義ロールをご覧ください。
必要な ACL プリンシパル
デフォルトでは、ACL が構成されていない場合、Managed Service for Apache Kafka クラスタは Connect クラスタにリソースへのアクセスを許可します。これを行うには、デフォルト設定である allow.everyone.if.no.acl.found を true に設定します。
ただし、Managed Service for Apache Kafka クラスタに ACL が構成されている場合、Connect クラスタにはリソースに対する読み取り権限と書き込み権限が自動的に付与されません。これらは手動で付与する必要があります。
ACL でプリンシパルとして使用される Connect クラスタ サービス アカウントは、User:service-{consumer project
number}@gcp-sa-managedkafka.iam.gserviceaccount.com の形式です。
Kafka クラスタで ACL を構成している場合は、次のコマンドを使用して、Connect クラスタにトピックの読み取りと書き込み権限、コンシューマー グループの読み取り権限を付与します。
/bin/kafka-acls.sh \
--bootstrap-server BOOTSTRAP_ADDR \
--command-config PATH_TO_CLIENT_PROPERTIES \
--add \
--allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
--operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
--bootstrap-server BOOTSTRAP_ADDR \
--command-config PATH_TO_CLIENT_PROPERTIES \
--add \
--allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
--operation READ --group *
これらのコマンドの詳細については、きめ細かいアクセス制御用に Apache Kafka ACL を構成するをご覧ください。
別のプロジェクトに Connect クラスタを作成する
Connect クラスタを作成すると、同じプロジェクト内の Managed Service for Apache Kafka クラスタと同じサービス エージェントを共有します。この Managed Service for Apache Kafka クラスタが Connect クラスタに接続されたプライマリ Kafka クラスタとして指定されている場合、追加の権限は必要ありません。
サービス エージェントの形式は service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com です。プロジェクト番号は、Connect クラスタと Managed Service for Apache Kafka クラスタを含むプロジェクトのものです。
Connect クラスタがプロジェクト A にあり、関連付けられている Managed Service for Apache Kafka クラスタがプロジェクト B にある場合は、次の操作を行います。
プロジェクト
AとプロジェクトBの両方で Managed Kafka API が有効になっていることを確認します。プロジェクト
Aの Connect クラスタのサービス エージェントを特定します。サービス エージェントの形式は
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.comです。プロジェクト
Bで、Connect クラスタのサービス アカウントに Managed Kafka クライアント ロール(roles/managedkafka.client)を付与します。このロールは、Managed Service for Apache Kafka クラスタに接続し、データの読み取りや書き込みなどのオペレーションを実行するために必要な権限を付与します。
ロールを付与する方法については、ロールを作成してサービス エージェントに付与するをご覧ください。
権限を付与する際は、常に最小権限の原則に従ってください。セキュリティを確保し、不正アクセスを防ぐために、必要な権限のみを付与します。
Connect クラスタのプロパティ
このセクションでは、Connect クラスタのプロパティについて説明します。
Connect クラスタ名
作成する Connect クラスタの名前。Connect クラスタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。クラスタの名前は不変です。
プライマリ Kafka クラスタ
Connect クラスタに関連付けられている Managed Service for Apache Kafka クラスタ。この関連付けられたクラスタ(プライマリ クラスタ)には、Connect クラスタで実行されているコネクタの状態が保存されます。通常、プライマリ Managed Service for Apache Kafka クラスタは、Connect クラスタで実行されているすべてのソースコネクタの宛先と、すべてのシンクコネクタの入力としても機能します。
1 つの Managed Service for Apache Kafka クラスタに複数の Connect クラスタを設定できます。別のプロジェクトで Managed Service for Apache Kafka クラスタを選択する場合は、適切な権限が構成されていることを確認してください。
Connect クラスタを作成した後に、別の Kafka クラスタに更新することはできません。
レイテンシとネットワーク コストに関するリージョン コロケーションのメリット
Managed Service for Apache Kafka クラスタと Connect クラスタを同じリージョンに配置すると、レイテンシとネットワーク コストを削減できます。たとえば、Managed Service for Apache Kafka クラスタが region-a にあり、シンク コネクタを使用して、この Managed Service for Apache Kafka クラスタ(ソース)から region-a にもある BigQuery テーブル(シンク)にデータを書き込むとします。Connect クラスタを region-a にデプロイする場合、このデプロイ オプションにより、BigQuery 書き込みオペレーションのレイテンシが最小限に抑えられ、Managed Service for Apache Kafka クラスタと Connect クラスタ間のリージョン間ネットワーク転送費用がなくなります。
マルチシステム レイテンシと費用の考慮事項
Kafka Connect はコネクタを使用してシステム間でデータを移動します。コネクタの一方の側は常に Managed Service for Apache Kafka クラスタとやり取りします。1 つの Kafka Connect クラスタで複数のコネクタを実行できます。各コネクタは、ソース(システムからデータを取得する)またはシンク(システムにデータをプッシュする)として機能します。
Managed Service for Apache Kafka クラスタと同じリージョンにある Connect クラスタは、クラスタ間の通信レイテンシが低くなるというメリットがありますが、各コネクタは BigQuery テーブルや別の Kafka クラスタなどの別のシステムともやり取りします。Connect クラスタと Managed Service for Apache Kafka クラスタが同じ場所に配置されている場合でも、その別のシステムは別のリージョンに存在する可能性があります。これにより、レイテンシと費用が増加します。パイプライン全体のレイテンシは、Managed Service for Apache Kafka クラスタ、Connect クラスタ、ソースまたはシンク システムの 3 つのシステムのロケーションによって異なります。
たとえば、Managed Service for Apache Kafka クラスタが region-a にあり、Connect クラスタが region-b にあり、region-c のバケットに Cloud Storage コネクタを使用している場合、2 つのネットワーク ホップ(region-a から region-b、region-b から region-c。コネクタの方向によっては逆方向)に対して課金されます。
レイテンシと費用の両方を最適化するために、Connect クラスタの配置を計画する際は、関連するすべてのリージョンを慎重に検討してください。
容量構成
容量構成では、Connect クラスタの各 vCPU の vCPU 数とメモリ量を構成する必要があります。Connect クラスタの容量は、作成後に更新できます。容量構成のプロパティは次のとおりです。
vCPU: Connect クラスタに割り当てられた vCPU の数。最小値は 3 vCPU です。
メモリ: 各 vCPU に割り当てられるメモリ容量。vCPU あたり 1 GiB ~ 8 GiB の範囲でプロビジョニングする必要があります。クラスタの作成後に、これらの上限内でメモリ量を増減できます。
たとえば、6 個の vCPU を使用してクラスタを作成する場合、クラスタに割り当てることができる最小メモリは 6 GiB(vCPU あたり 1 GiB)で、最大メモリは 48 GiB(vCPU あたり 8 GiB)です。
Connect クラスタ内の各ワーカーに割り当てられる vCPU とメモリは、クラスタのパフォーマンス、容量、費用に大きな影響を与えます。vCPU とメモリが Connect クラスタに与える影響の内訳は次のとおりです。
vCPU 数
Kafka Connect は、コネクタの作業をタスクに分割します。各タスクはデータを並行して処理できます。vCPU が多いほど、同時に実行できるタスクが増え、スループットが向上します。
vCPU を増やすと、Connect クラスタの費用が増加します。
メモリ
Kafka Connect は、コネクタと Managed Service for Apache Kafka の間でデータが流れるときに、データをバッファリングするためにメモリを使用します。メモリが大きいほど、バッファを大きくできます。大容量のメモリを使用すると、特に大量のデータ ストリームの場合に、スループットを向上させることができます。非常に大きなメッセージやレコードを処理するコネクタには、
OutOfMemoryError例外が発生することなく処理できる十分なメモリが必要です。メモリを増やすと、Connect クラスタの費用が増加します。
変換ロジックを多用している場合は、より多くのメモリ割り当てが必要です。
目標は、Connect クラスタに適した容量構成を選択することです。そのためには、Connect クラスタが処理できるスループットを把握する必要があります。
ワーカー(プライマリ)サブネット
ワーカー サブネット(プライマリ サブネットとも呼ばれます)は、VPC ネットワークを Connect クラスタに接続します。このサブネットにより、クラスタ ワーカーは、コンシューマー ネットワーク内のソースとシンクのエンドポイント(Managed Service for Apache Kafka クラスタやセルフホスト Kafka クラスタなど)にアクセスできます。
ワーカー サブネットを構成するための要件は次のとおりです。
ワーカー サブネットは必須です。
サブネットは Connect クラスタと同じリージョンに配置する必要があります。
サブネットは、プライマリ Kafka クラスタの接続されたサブネットのリストのいずれかと同じ親 VPC に存在する必要があります。
サブネットの CIDR 範囲の最小サイズは /22(1,024 個のアドレス)にする必要があります。
クラスタ ワーカーには、Private Service Connect インターフェースを使用して、ワーカー サブネット内の IP アドレスが割り当てられます。ワーカーは、サブネットの VPC ネットワークからアクセス可能な任意のネットワーク宛先にアクセスできます。ただし、次の要件があります。
- エンドポイントは
172.16.0.0/14CIDR 範囲内に存在してはなりません。この範囲は、Managed Service for Apache Kafka Connect の内部使用のために予約されています。 - ファイアウォール ルールでトラフィックを許可する必要があります。ネットワーク アタッチメントのセキュリティを構成するをご覧ください。
- インターネット トラフィックの場合は、Cloud NAT を構成する必要があります。たとえば、インターネット経由でアクセス可能な Kafka クラスタからデータを複製するには、MirrorMaker コネクタに Cloud NAT が必要です。
- ワーカー サブネット VPC とは異なる VPC にある Private Service Connect エンドポイントにアクセスするには、サポートされているコンシューマー構成(NCC など)を使用していることを確認する必要があります。詳細については、エンドポイントを介した公開サービスへのアクセスについてをご覧ください。
解決可能な DNS ドメイン
DNS ドメイン名とも呼ばれる解決可能な DNS ドメインを使用すると、コンシューマー VPC ネットワークの DNS アドレスをテナント VPC で使用できるようになります。これにより、Connect クラスタは DNS 名を IP アドレスに解決し、MirrorMaker コネクタの他の Kafka クラスタなど、他のサービスとの通信を容易にできます。
解決可能な DNS ドメインの場合は、Managed Service for Apache Kafka クラスタを選択できます。プライマリ Managed Service for Apache Kafka クラスタの DNS ドメイン名を構成する必要はありません。ブートストラップ アドレスは、解決可能な DNS ドメインのリストに自動的に含まれます。
ただし、DNS ドメインを手動で指定することもできます。これは、外部 Kafka クラスタを選択する場合に必要です。プライマリ Managed Service for Apache Kafka クラスタの DNS ドメインは自動的に含まれます。他の Kafka クラスタでは、DNS ドメインの構成が引き続き必要です。
Secret Manager のリソース
ワーカーに読み込む Secret Manager を指定します。これらのシークレットは Secret Manager に安全に保存され、Connect クラスタで使用できるようになります。
必要に応じて、コネクタ構成で Secret Manager を使用できます。たとえば、鍵ファイルを Connect クラスタに読み込み、コネクタにファイルを読み取らせることができます。Secret Manager はワーカーのファイルとしてマウントされます。
接続クラスタは Secret Manager と直接統合されます。Secret Manager を使用してシークレットを保存し、管理する必要があります。
シークレットを指定する形式は projects/{PROJECT_ID}/secrets/{SECRET_NAME}/versions/{VERSION_ID} です。
PROJECT_ID: Secret Manager シークレットが存在するプロジェクトの ID。SECRET_NAME: Secret Manager 内のシークレットの名前。VERSION_ID: シークレットの特定のバージョン番号。「1」、「2」、「3」などの数字。
1 つの Connect クラスタに最大 32 個のシークレットを読み込むことができます。
Connect ワーカーを実行するサービス エージェントに、使用するシークレットに対する secretmanager.secretAccessor ロール(Secret Manager シークレット アクセサー)があることを確認します。このロールにより、Connect クラスタは Secret Manager からシークレット値を取得できます。
ラベル
ラベルは、整理と識別に役立つ Key-Value ペアです。Connect クラスタの整理に役立ちます。各 Connect クラスタにラベルをアタッチし、そのラベルに基づいてリソースをフィルタできます。ラベルの例: environment:prod、application:web-app。
Connect クラスタを作成する
クラスタを作成する前に、Connect クラスタのプロパティのドキュメントを確認してください。
Connect クラスタの作成には 20 ~ 30 分かかります。
コンソール
Google Cloud コンソールで、[クラスタを接続] ページに移動します。
[作成] をクリックします。
[Connect クラスタの作成] ページが開きます。
[Connect クラスタ名] に文字列を入力します。
Connect クラスタの命名方法の詳細については、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。
[プライマリ Kafka クラスタ] で、メニューから Managed Service for Apache Kafka クラスタを選択します。
この Managed Service for Apache Kafka クラスタが実行する機能の詳細については、プライマリ Kafka クラスタをご覧ください。
[ロケーション] で、[リージョン] メニューからサポートされているロケーションを選択するか、デフォルト値を保持します。
適切なロケーションを選択する方法の詳細については、プライマリ Kafka クラスタをご覧ください。
[容量構成] で、[vCPU] と [メモリ] の値を入力するか、デフォルト値を保持します。
[vCPUs] に、クラスタの仮想 CPU の数を入力します。
[メモリ] に、CPU あたりのメモリ容量(GiB)を入力します。CPU あたりのメモリが 8 GiB を超えると、エラー メッセージが表示されます。
Managed Service for Apache Kafka クラスタのサイズ設定方法については、容量構成をご覧ください。
[ネットワーク構成] で、[ネットワーク] メニューから、プライマリ Managed Service for Apache Kafka クラスタのネットワークを選択するか、そのままにします。
[ワーカー サブネット] で、メニューからサブネットを選択するか、サブネットを保持します。
[サブネット URI パス] フィールドには自動的に値が入力されます。詳細については、ワーカー サブネットをご覧ください。
解決可能な DNS ドメインの場合、プライマリ Kafka クラスタの DNS ドメインは、解決可能な DNS ドメインとして自動的に追加されます。
DNS ドメインを追加するには、必要に応じてセクションを開きます。
[DNS ドメインを追加] をクリックします。
メニューから Kafka クラスタを選択します。
DNS ドメインは自動的に入力されます。外部 Kafka クラスタの DNS ドメイン名を入力することもできます。
[完了] をクリックします。
Secret Manager リソースの場合は、必要に応じてセクションを開きます。
[シークレット リソースを追加] をクリックします。
[Secret] メニューから Secret を選択し、[Secret のバージョン] メニューからバージョンを選択します。新しい Secret を作成することもできます。
Connect ワーカーを実行するサービス エージェントに、使用するシークレットに対する Secret Manager のシークレット アクセサーのロールがあることを確認します。Secret Manager の詳細については、Secret Manager リソースをご覧ください。
[完了] をクリックします。
シークレットをさらに追加する必要がある場合は、[シークレット リソースを追加] をクリックします。
[ラベル] で、必要に応じてセクションを開きます。
プロジェクトを整理するには、任意のラベルを Key-Value ペアとしてリソースに追加します。
[ラベルを追加] をクリックして、さまざまな環境、サービス、オーナー、チームなどを追加します。
[作成] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud managed-kafka connect-clusters createコマンドを実行します。gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \ --location=LOCATION \ --cpu=CPU \ --memory=MEMORY \ --primary-subnet=WORKER_SUBNET \ --kafka-cluster=KAFKA_CLUSTER \ [--project=PROJECT_ID] \ [--secret=SECRET] \ [--dns-name=DNS_DOMAIN_NAME] \ [--config-file=CONFIG_FILE] \ [--labels=LABELS] [--async]次のように置き換えます。
CONNECT_CLUSTER_ID: Connect クラスタの ID または名前。Connect クラスタの命名方法のガイドラインについては、Managed Service for Apache Kafka リソースの命名ガイドラインをご覧ください。Connect クラスタの名前は不変です。
LOCATION: Connect クラスタを作成するロケーション。これは、サポートされている Google Cloudリージョンである必要があります。Connect クラスタの作成後に、そのロケーションを変更することはできません。使用可能なロケーションのリストについては、Managed Service for Apache Kafka のロケーションをご覧ください。ロケーションの推奨事項の詳細については、プライマリ Kafka クラスタをご覧ください。
CPU: Connect クラスタの vCPU の数。最小値は 3 vCPU です。vCPU 数をご覧ください。
MEMORY: Connect クラスタのメモリ容量。「MB」、「MiB」、「GB」、「GiB」、「TB」、「TiB」の単位を使用します。例: 「3GiB」。vCPU あたり 1 GiB ~ 8 GiB の範囲でプロビジョニングする必要があります。メモリをご覧ください。
WORKER_SUBNET: Connect クラスタのワーカー サブネット。
サブネットの形式は
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_IDです。ワーカー サブネットは Connect クラスタと同じリージョンに存在する必要があります。
PROJECT_ID:(省略可)Google Cloud プロジェクトの ID。指定しない場合は、現在のプロジェクトが使用されます。
KAFKA_CLUSTER: Connect クラスタに関連付けられているプライマリ Managed Service for Apache Kafka クラスタの ID または完全修飾名。Kafka クラスタをご覧ください。Kafka クラスタの形式は
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_IDです。Connect クラスタを作成した後に、別の Kafka クラスタに更新することはできません。
SECRET:(省略可)ワーカーに読み込む Secret。Secret Manager の正確なシークレット バージョンを指定する必要があります。エイリアスはサポートされていません。1 つのクラスタに最大 32 個のシークレットを読み込むことができます。形式:
projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_IDDNS_DOMAIN_NAME:(省略可)Connect クラスタに公開するサブネットの DNS ドメイン名。Connect クラスタは、IP アドレスに依存するのではなく、ドメイン名を使用してリソースにアクセスできます。DNS ピアリングをご覧ください。
LABELS:(省略可)クラスタに関連付けるラベル。ラベルの形式について詳しくは、ラベルをご覧ください。追加するラベルの KEY=VALUE ペアのリスト。キーは小文字で始まり、使用できるのはハイフン(-)、アンダースコア(_)、小文字、数字のみです。値に使用できるのは、ハイフン(-)、アンダースコア(_)、小文字、数字のみです。
CONFIG_FILE: (省略可)クラスタまたはコネクタのデフォルトからオーバーライドされる構成を含む JSON ファイルまたは YAML ファイルのパス。このファイルは、インライン JSON または YAML もサポートしています。
--async:(省略可)処理中のオペレーションの完了を待たずに、直ちにシェルに戻ります。--asyncフラグを使用すると、クラスタの作成がバックグラウンドで行われている間に、他のタスクを続行できます。フラグを使用しない場合、システムはオペレーションが完了するまで待機してからレスポンスを返します。他のタスクを続行するには、クラスタが完全に更新されるまで待つ必要があります。
次のようなレスポンスが返されます。
Create request issued for: [sample-connectcluster] Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.OPERATION_IDを保存して進捗状況を追跡します。たとえば、この値はoperation-1753590328249-63ae19098cc06-64300a0a-06512d02です。
Terraform
Terraform リソースを使用して Connect クラスタを作成できます。
Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。
Go
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Go の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Go API のリファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報(ADC)を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
Java
このサンプルを試す前に、 クライアント ライブラリをインストールするにある Java の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Java API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、 ローカル開発環境の ADC の設定をご覧ください。
Python
このサンプルを試す前に、 クライアント ライブラリをインストールするの Python の設定手順を行ってください。詳細については、 Managed Service for Apache Kafka Python API リファレンス ドキュメントをご覧ください。
Managed Service for Apache Kafka に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の ADC の設定をご覧ください。
クラスタ作成オペレーションをモニタリングする
次のコマンドは、Connect クラスタの作成に gcloud CLI を実行した場合にのみ実行できます。
通常、Connect クラスタの作成には 20 ~ 30 分かかります。クラスタ作成の進行状況を追跡するために、
gcloud managed-kafka connect-clusters createコマンドは長時間実行オペレーション(LRO)を使用します。次のコマンドを使用して、LRO をモニタリングできます。gcloud managed-kafka operations describe OPERATION_ID \ --location=LOCATION次のように置き換えます。
OPERATION_ID: 前のセクションのオペレーション ID の値。LOCATIONは、前のセクションのロケーションの値に置き換えます。