このチュートリアルでは、Kafka Connect を使用して Pub/Sub から Managed Service for Apache Kafka クラスタにメッセージを取り込む方法について説明します。
Kafka Connect は、Kafka クラスタと他のシステム間のデータ移動を管理します。このチュートリアルでは、Connect クラスタと Pub/Sub ソースコネクタを作成します。Pub/Sub ソースコネクタは、Pub/Sub トピックからメッセージを読み取り、Kafka トピックに書き込みます。
始める前に
コンソール
- Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Kafka API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Managed Kafka API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
プロジェクトに次のロールがあることを確認します。 マネージド Kafka クラスタ編集者、 マネージド Kafka Connect クラスタ編集者、 マネージド Kafka コネクタ編集者、 マネージド Kafka トピック編集者、 Pub/Sub 編集者
ロールを確認する
-
Google Cloud コンソールで、[IAM] ページに移動します。
IAM に移動 - プロジェクトを選択します。
-
[プリンシパル] 列で、自分または自分が所属するグループの行をすべて確認します。所属するグループについては、管理者にお問い合わせください。
- 自分のメールアドレスを含む行の [ロール] 列で、ロールのリストに必要なロールが含まれているかどうか確認します。
ロールを付与する
-
Google Cloud コンソールで、[IAM] ページに移動します。
IAM に移動 - プロジェクトを選択します。
- [ アクセスを許可] をクリックします。
-
[新しいプリンシパル] フィールドに、ユーザー ID を入力します。 これは通常、Google アカウントのメールアドレスです。
- [ロールを選択] をクリックし、ロールを検索します。
- 追加のロールを付与するには、 [別のロールを追加] をクリックして各ロールを追加します。
- [保存] をクリックします。
-
gcloud
- Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します。
gcloud init -
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
Managed Kafka API を有効にします。
API を有効にするために必要なロール
API を有効にするには、
serviceusage.services.enable権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する。gcloud services enable managedkafka.googleapis.com
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します。
gcloud init -
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
Managed Kafka API を有効にします。
API を有効にするために必要なロール
API を有効にするには、
serviceusage.services.enable権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する。gcloud services enable managedkafka.googleapis.com
-
ユーザー アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/managedkafka.clusterEditor, roles/managedkafka.connectClusterEditor, roles/managedkafka.connectorEditor, roles/managedkafka.topicEditor, roles/pubsub.editorgcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
次のように置き換えます。
PROJECT_ID: プロジェクト ID。USER_IDENTIFIER: ユーザー アカウントの識別子。例:myemail@example.comROLE: ユーザー アカウントに付与する IAM ロール。
Pub/Sub トピックとサブスクリプションを作成する
このステップでは、サブスクリプションを使用して Pub/Sub トピックを作成します。
コンソール
[Pub/Sub] > [トピック] ページに移動します。
[ トピックを作成] をクリックします。
[トピック ID] ボックスに、トピックの名前を入力します。
[デフォルトのサブスクリプションを追加する] チェックボックスがオンになっていることを確認します。
[作成] をクリックします。
gcloud
Pub/Sub トピックを作成するには、
gcloud pubsub topics createコマンドを実行します。gcloud pubsub topics create TOPIC_IDTOPIC_IDは、Pub/Sub トピックの名前に置き換えます。トピックのサブスクリプションを作成するには、
gcloud pubsub subscriptions createコマンドを実行します。gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_IDSUBSCRIPTION_IDは、Pub/Sub サブスクリプションの名前に置き換えます。
Pub/Sub トピックとサブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。
Managed Service for Apache Kafka リソースを作成する
このセクションでは、次の Managed Service for Apache Kafka リソースを作成します。
- トピックを含む Kafka クラスタ。
- Pub/Sub コネクタを含む Connect クラスタ。
Kafka クラスタを作成する
このステップでは、Managed Service for Apache Kafka クラスタを作成します。クラスタの作成には最大 30 分かかることがあります。
コンソール
- [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
- [ 作成] をクリックします。
- [クラスタ名] フィールドに、クラスタの名前を入力します。
- [リージョン] リストで、クラスタのロケーションを選択します。
-
[ネットワーク構成] で、クラスタにアクセス可能なサブネットを構成します。
- [プロジェクト] で、該当するプロジェクトを選択します。
- [ネットワーク] で、VPC ネットワークを選択します。
- [サブネット] でサブネットを選択します。
- [完了] をクリックします。
- [作成] をクリックします。
[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。
gcloud
Kafka クラスタを作成するには、managed-kafka clusters
create コマンドを実行します。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
次のように置き換えます。
KAFKA_CLUSTER: Kafka クラスタの名前REGION: クラスタのロケーションPROJECT_ID: プロジェクト IDSUBNET_NAME: クラスタを作成するサブネット(例:default)
サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。
このコマンドは非同期で実行され、オペレーション ID を返します。
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
作成オペレーションの進行状況を追跡するには、gcloud managed-kafka
operations describe コマンドを使用します。
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
クラスタの準備ができると、このコマンドの出力にエントリ state:
ACTIVE が含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。
Kafka トピックを作成する
Managed Service for Apache Kafka クラスタを作成したら、Kafka トピックを作成します。
コンソール
[Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
クラスタの名前をクリックします。
クラスタの詳細ページで、 [トピックを作成] をクリックします。
[トピック名] ボックスに、トピックの名前を入力します。
[作成] をクリックします。
gcloud
Kafka トピックを作成するには、managed-kafka topics create コマンドを実行します。
gcloud managed-kafka topics create KAFKA_TOPIC_NAME \
--cluster=KAFKA_CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
次のように置き換えます。
KAFKA_TOPIC_NAME: 作成する Kafka トピックの名前KAFKA_CLUSTER: Kafka クラスタの名前REGION: Kafka クラスタを作成したリージョン
Connect クラスタを作成する
この手順では、Connect クラスタを作成します。Connect クラスタの作成には最大 30 分かかることがあります。
この手順を開始する前に、Managed Service for Apache Kafka クラスタが完全に作成されていることを確認してください。
コンソール
[Managed Service for Apache Kafka] > [クラスタを接続] ページに移動します。
[ 作成] をクリックします。
[Connect クラスタ名] に文字列を入力します。
my-connect-cluster[プライマリ Kafka クラスタ] で、先ほど作成した Kafka を選択します。
[作成] をクリックします。
クラスタの作成中は、クラスタの状態は Creating です。クラスタの作成が完了すると、状態は Active になります。
gcloud
Connect クラスタを作成するには、gcloud managed-kafka connect-clusters create コマンドを実行します。
gcloud managed-kafka connect-clusters create CONNECT_CLUSTER \
--location=REGION \
--cpu=12 \
--memory=12GiB \
--primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--kafka-cluster=KAFKA_CLUSTER \
--async
次のように置き換えます。
CONNECT_CLUSTER: Connect クラスタの名前REGION: Kafka クラスタを作成したリージョンPROJECT_ID: プロジェクト IDSUBNET_NAME: Kafka クラスタを作成したサブネットKAFKA_CLUSTER: Kafka クラスタの名前
このコマンドは非同期で実行され、オペレーション ID を返します。
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
作成オペレーションの進行状況を追跡するには、gcloud managed-kafka operations describe コマンドを使用します。
gcloud managed-kafka operations describe OPERATION_ID \
--location=REGION
詳細については、クラスタ作成オペレーションをモニタリングするをご覧ください。
IAM ロールを付与する
Managed Kafka サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。
- Pub/Sub サブスクライバー
- Pub/Sub 閲覧者
これらのロールにより、コネクタは Pub/Sub からメッセージを読み取ることができます。
コンソール
Google Cloud コンソールで、[IAM] ページに移動します。
[Google 提供のロール付与を含む] を選択します。
[Managed Kafka Service Account] の行を見つけて、 [プリンシパルを編集] をクリックします。
[別のロールを追加] をクリックし、ロール [Pub/Sub サブスクライバー] を選択します。Pub/Sub 閲覧者ロールについてもこの手順を繰り返します。
[保存] をクリックします。
ロール付与の詳細については、コンソールを使用して IAM ロールを付与するをご覧ください。
gcloud
サービス アカウントに IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを実行します。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
--role=roles/pubsub.subscriber
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
--role=roles/pubsub.viewer
次のように置き換えます。
PROJECT_ID: プロジェクト IDPROJECT_NUMBER: プロジェクトの番号
プロジェクト番号を確認するには、gcloud projects describe コマンドを使用します。
Pub/Sub ソースコネクタを作成する
このステップでは、Pub/Sub ソースコネクタを作成します。このコネクタは、Pub/Sub からメッセージを読み取り、Kafka トピックに書き込みます。
コンソール
[Managed Service for Apache Kafka] > [クラスタを接続] ページに移動します。
Connect クラスタの名前をクリックします。
[コネクタを作成] をクリックします。
[コネクタ名] に文字列を入力します。例:
pubsub-source[コネクタ プラグイン] リストで、[
Pub/Sub Source] を選択します。[Cloud Pub/Sub サブスクリプション] で、Pub/Sub トピックの作成時に作成されたデフォルトの Pub/Sub を選択します。
[Kafka トピック] で、以前に作成した Kafka トピックを選択します。
[作成] をクリックします。
gcloud
Pub/Sub ソースコネクタを作成するには、gcloud managed-kafka connectors create コマンドを実行します。
gcloud managed-kafka connectors create PUBSUB_CONNECTOR_NAME \
--connect-cluster=CONNECT_CLUSTER \
--location=REGION \
--configs=connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector,\
cps.project=PROJECT_ID,\
cps.streamingPull.enabled=true,\
cps.subscription=SUBSCRIPTION_ID,\
kafka.topic=KAFKA_TOPIC_NAME,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
tasks.max=3,\
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
次のように置き換えます。
PUBSUB_CONNECTOR_NAME: コネクタの名前(pubsub-source-connectorなど)CONNECT_CLUSTER: Connect クラスタの名前REGION: Connect クラスタを作成したリージョンPROJECT_ID: プロジェクト IDKAFKA_TOPIC_NAME: Kafka トピックの名前SUBSCRIPTION_ID: Pub/Sub サブスクリプションの名前
結果を表示
結果を表示するには、Pub/Sub にメッセージをパブリッシュします。
コンソール
Google Cloud コンソールで、[Pub/Sub] > [トピック] ページに移動します。
トピックのリストで、Pub/Sub トピックの名前をクリックします。
[メッセージ] をクリックします。
[メッセージをパブリッシュ] をクリックします。
[メッセージの数] に「
10」と入力します。[メッセージ本文] に「
{"name": "Alice", "customer_id": 1}」と入力します。[公開] をクリックします。
gcloud
メッセージを Pub/Sub トピックにパブリッシュするには、gcloud pubsub topics publish コマンドを使用します。
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
TOPIC_ID は、Pub/Sub トピックの名前に置き換えます。
これで、Kafka トピックからメッセージを消費できるようになりました。詳細については、CLI を使用してメッセージを生成して使用するをご覧ください。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
コンソール
Pub/Sub トピックを削除します。
[Pub/Sub] > [トピック] ページに移動します。
トピックを選択して [削除] をクリックします。
Pub/Sub サブスクリプションを削除します。
[Pub/Sub] > [サブスクリプション] ページに移動します。
トピックで作成したサブスクリプションを選択し、[削除] をクリックします。
Connect クラスタを削除します。
[Managed Service for Apache Kafka] > [クラスタを接続] ページに移動します。
Connect クラスタを選択し、[削除] をクリックします。
Kafka クラスタを削除します。
[Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
Kafka クラスタを選択し、[削除] をクリックします。
gcloud
Pub/Sub サブスクリプションとトピックを削除するには、
gcloud pubsub subscriptions deleteコマンドとgcloud pubsub topics deleteコマンドを使用します。gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_IDConnect クラスタを削除するには、
gcloud managed-kafka connect-clusters deleteコマンドを使用します。gcloud managed-kafka connect-clusters delete CONNECT_CLUSTER \ --location=REGION --asyncKafka クラスタを削除するには、
gcloud managed-kafka clusters deleteコマンドを使用します。gcloud managed-kafka clusters delete KAFKA_CLUSTER \ --location=REGION --async
次のステップ
- Pub/Sub コネクタのトラブルシューティングを行う。
- Pub/Sub ソースコネクタの詳細を確認する。
- Kafka Connect の詳細を確認する。