Pub/Sub メッセージを Kafka に複製する

このチュートリアルでは、Kafka Connect を使用して Pub/Sub から Managed Service for Apache Kafka クラスタにメッセージを取り込む方法について説明します。

Kafka Connect は、Kafka クラスタと他のシステム間のデータ移動を管理します。このチュートリアルでは、Connect クラスタPub/Sub ソースコネクタを作成します。Pub/Sub ソースコネクタは、Pub/Sub トピックからメッセージを読み取り、Kafka トピックに書き込みます。

始める前に

コンソール

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Managed Kafka API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Managed Kafka API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. プロジェクトに次のロールがあることを確認します。 マネージド Kafka クラスタ編集者マネージド Kafka Connect クラスタ編集者マネージド Kafka コネクタ編集者マネージド Kafka トピック編集者Pub/Sub 編集者

    ロールを確認する

    1. Google Cloud コンソールで、[IAM] ページに移動します。

      IAM に移動
    2. プロジェクトを選択します。
    3. [プリンシパル] 列で、自分または自分が所属するグループの行をすべて確認します。所属するグループについては、管理者にお問い合わせください。

    4. 自分のメールアドレスを含む行の [ロール] 列で、ロールのリストに必要なロールが含まれているかどうか確認します。

    ロールを付与する

    1. Google Cloud コンソールで、[IAM] ページに移動します。

      IAM に移動
    2. プロジェクトを選択します。
    3. [ アクセスを許可] をクリックします。
    4. [新しいプリンシパル] フィールドに、ユーザー ID を入力します。 これは通常、Google アカウントのメールアドレスです。

    5. [ロールを選択] をクリックし、ロールを検索します。
    6. 追加のロールを付与するには、 [別のロールを追加] をクリックして各ロールを追加します。
    7. [保存] をクリックします。

gcloud

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud CLI をインストールします。

  3. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  4. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  5. Google Cloud プロジェクトを作成または選択します

    プロジェクトの選択または作成に必要なロール

    • プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
    • プロジェクトを作成する: プロジェクトを作成するには、resourcemanager.projects.create 権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する
    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、 Google Cloud プロジェクトの名前に置き換えます。

  6. Google Cloud プロジェクトに対して課金が有効になっていることを確認します

  7. Managed Kafka API を有効にします。

    API を有効にするために必要なロール

    API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する

    gcloud services enable managedkafka.googleapis.com
  8. Google Cloud CLI をインストールします。

  9. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  10. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  11. Google Cloud プロジェクトを作成または選択します

    プロジェクトの選択または作成に必要なロール

    • プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
    • プロジェクトを作成する: プロジェクトを作成するには、resourcemanager.projects.create 権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する
    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、 Google Cloud プロジェクトの名前に置き換えます。

  12. Google Cloud プロジェクトに対して課金が有効になっていることを確認します

  13. Managed Kafka API を有効にします。

    API を有効にするために必要なロール

    API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する

    gcloud services enable managedkafka.googleapis.com
  14. ユーザー アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/managedkafka.clusterEditor, roles/managedkafka.connectClusterEditor, roles/managedkafka.connectorEditor, roles/managedkafka.topicEditor, roles/pubsub.editor

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • USER_IDENTIFIER: ユーザー アカウントの識別子。例: myemail@example.com
    • ROLE: ユーザー アカウントに付与する IAM ロール。

Pub/Sub トピックとサブスクリプションを作成する

このステップでは、サブスクリプションを使用して Pub/Sub トピックを作成します。

コンソール

  1. [Pub/Sub] > [トピック] ページに移動します。

    [トピック] に移動

  2. [ トピックを作成] をクリックします。

  3. [トピック ID] ボックスに、トピックの名前を入力します。

  4. [デフォルトのサブスクリプションを追加する] チェックボックスがオンになっていることを確認します。

  5. [作成] をクリックします。

gcloud

  1. Pub/Sub トピックを作成するには、gcloud pubsub topics create コマンドを実行します。

    gcloud pubsub topics create TOPIC_ID
    

    TOPIC_ID は、Pub/Sub トピックの名前に置き換えます。

  2. トピックのサブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドを実行します。

    gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
    

    SUBSCRIPTION_ID は、Pub/Sub サブスクリプションの名前に置き換えます。

Pub/Sub トピックとサブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

Managed Service for Apache Kafka リソースを作成する

このセクションでは、次の Managed Service for Apache Kafka リソースを作成します。

  • トピックを含む Kafka クラスタ。
  • Pub/Sub コネクタを含む Connect クラスタ。

Kafka クラスタを作成する

このステップでは、Managed Service for Apache Kafka クラスタを作成します。クラスタの作成には最大 30 分かかることがあります。

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

    [クラスタ] に移動

  2. [ 作成] をクリックします。
  3. [クラスタ名] フィールドに、クラスタの名前を入力します。
  4. [リージョン] リストで、クラスタのロケーションを選択します。
  5. [ネットワーク構成] で、クラスタにアクセス可能なサブネットを構成します。
    1. [プロジェクト] で、該当するプロジェクトを選択します。
    2. [ネットワーク] で、VPC ネットワークを選択します。
    3. [サブネット] でサブネットを選択します。
    4. [完了] をクリックします。
  6. [作成] をクリックします。

[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。

gcloud

Kafka クラスタを作成するには、managed-kafka clusters create コマンドを実行します。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

次のように置き換えます。

  • KAFKA_CLUSTER: Kafka クラスタの名前
  • REGION: クラスタのロケーション
  • PROJECT_ID: プロジェクト ID
  • SUBNET_NAME: クラスタを作成するサブネット(例: default

サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。

このコマンドは非同期で実行され、オペレーション ID を返します。

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

作成オペレーションの進行状況を追跡するには、gcloud managed-kafka operations describe コマンドを使用します。

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

クラスタの準備ができると、このコマンドの出力にエントリ state: ACTIVE が含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。

Kafka トピックを作成する

Managed Service for Apache Kafka クラスタを作成したら、Kafka トピックを作成します。

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

    [クラスタ] に移動

  2. クラスタの名前をクリックします。

  3. クラスタの詳細ページで、 [トピックを作成] をクリックします。

  4. [トピック名] ボックスに、トピックの名前を入力します。

  5. [作成] をクリックします。

gcloud

Kafka トピックを作成するには、managed-kafka topics create コマンドを実行します。

gcloud managed-kafka topics create KAFKA_TOPIC_NAME \
--cluster=KAFKA_CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

次のように置き換えます。

  • KAFKA_TOPIC_NAME: 作成する Kafka トピックの名前
  • KAFKA_CLUSTER: Kafka クラスタの名前
  • REGION: Kafka クラスタを作成したリージョン

Connect クラスタを作成する

この手順では、Connect クラスタを作成します。Connect クラスタの作成には最大 30 分かかることがあります。

この手順を開始する前に、Managed Service for Apache Kafka クラスタが完全に作成されていることを確認してください。

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. [ 作成] をクリックします。

  3. [Connect クラスタ名] に文字列を入力します。my-connect-cluster

  4. [プライマリ Kafka クラスタ] で、先ほど作成した Kafka を選択します。

  5. [作成] をクリックします。

クラスタの作成中は、クラスタの状態は Creating です。クラスタの作成が完了すると、状態は Active になります。

gcloud

Connect クラスタを作成するには、gcloud managed-kafka connect-clusters create コマンドを実行します。

gcloud managed-kafka connect-clusters create CONNECT_CLUSTER \
  --location=REGION \
  --cpu=12 \
  --memory=12GiB \
  --primary-subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
  --kafka-cluster=KAFKA_CLUSTER \
  --async

次のように置き換えます。

  • CONNECT_CLUSTER: Connect クラスタの名前
  • REGION: Kafka クラスタを作成したリージョン
  • PROJECT_ID: プロジェクト ID
  • SUBNET_NAME: Kafka クラスタを作成したサブネット
  • KAFKA_CLUSTER: Kafka クラスタの名前

このコマンドは非同期で実行され、オペレーション ID を返します。

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

作成オペレーションの進行状況を追跡するには、gcloud managed-kafka operations describe コマンドを使用します。

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

詳細については、クラスタ作成オペレーションをモニタリングするをご覧ください。

IAM ロールを付与する

Managed Kafka サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。

  • Pub/Sub サブスクライバー
  • Pub/Sub 閲覧者

これらのロールにより、コネクタは Pub/Sub からメッセージを読み取ることができます。

コンソール

  1. Google Cloud コンソールで、[IAM] ページに移動します。

    [IAM] に移動

  2. [Google 提供のロール付与を含む] を選択します。

  3. [Managed Kafka Service Account] の行を見つけて、 [プリンシパルを編集] をクリックします。

  4. [別のロールを追加] をクリックし、ロール [Pub/Sub サブスクライバー] を選択します。Pub/Sub 閲覧者ロールについてもこの手順を繰り返します。

  5. [保存] をクリックします。

ロール付与の詳細については、コンソールを使用して IAM ロールを付与するをご覧ください。

gcloud

サービス アカウントに IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを実行します。

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --role=roles/pubsub.subscriber

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --role=roles/pubsub.viewer

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • PROJECT_NUMBER: プロジェクトの番号

プロジェクト番号を確認するには、gcloud projects describe コマンドを使用します。

Pub/Sub ソースコネクタを作成する

このステップでは、Pub/Sub ソースコネクタを作成します。このコネクタは、Pub/Sub からメッセージを読み取り、Kafka トピックに書き込みます。

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタを接続] ページに移動します。

    [Connect クラスタ] に移動

  2. Connect クラスタの名前をクリックします。

  3. [コネクタを作成] をクリックします。

  4. [コネクタ名] に文字列を入力します。例: pubsub-source

  5. [コネクタ プラグイン] リストで、[Pub/Sub Source] を選択します。

  6. [Cloud Pub/Sub サブスクリプション] で、Pub/Sub トピックの作成時に作成されたデフォルトの Pub/Sub を選択します。

  7. [Kafka トピック] で、以前に作成した Kafka トピックを選択します。

  8. [作成] をクリックします。

gcloud

Pub/Sub ソースコネクタを作成するには、gcloud managed-kafka connectors create コマンドを実行します。

gcloud managed-kafka connectors create PUBSUB_CONNECTOR_NAME \
  --connect-cluster=CONNECT_CLUSTER \
  --location=REGION \
  --configs=connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector,\
cps.project=PROJECT_ID,\
cps.streamingPull.enabled=true,\
cps.subscription=SUBSCRIPTION_ID,\
kafka.topic=KAFKA_TOPIC_NAME,\
key.converter=org.apache.kafka.connect.storage.StringConverter,\
tasks.max=3,\
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

次のように置き換えます。

  • PUBSUB_CONNECTOR_NAME: コネクタの名前(pubsub-source-connector など)
  • CONNECT_CLUSTER: Connect クラスタの名前
  • REGION: Connect クラスタを作成したリージョン
  • PROJECT_ID: プロジェクト ID
  • KAFKA_TOPIC_NAME: Kafka トピックの名前
  • SUBSCRIPTION_ID: Pub/Sub サブスクリプションの名前

結果を表示

結果を表示するには、Pub/Sub にメッセージをパブリッシュします。

コンソール

  1. Google Cloud コンソールで、[Pub/Sub] > [トピック] ページに移動します。

    [トピック] に移動

  2. トピックのリストで、Pub/Sub トピックの名前をクリックします。

  3. [メッセージ] をクリックします。

  4. [メッセージをパブリッシュ] をクリックします。

  5. [メッセージの数] に「10」と入力します。

  6. [メッセージ本文] に「{"name": "Alice", "customer_id": 1}」と入力します。

  7. [公開] をクリックします。

gcloud

メッセージを Pub/Sub トピックにパブリッシュするには、gcloud pubsub topics publish コマンドを使用します。

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

TOPIC_ID は、Pub/Sub トピックの名前に置き換えます。

これで、Kafka トピックからメッセージを消費できるようになりました。詳細については、CLI を使用してメッセージを生成して使用するをご覧ください。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

コンソール

  1. Pub/Sub トピックを削除します。

    1. [Pub/Sub] > [トピック] ページに移動します。

      [トピック] に移動

    2. トピックを選択して [削除] をクリックします。

  2. Pub/Sub サブスクリプションを削除します。

    1. [Pub/Sub] > [サブスクリプション] ページに移動します。

      [サブスクリプション] に移動

    2. トピックで作成したサブスクリプションを選択し、[削除] をクリックします。

  3. Connect クラスタを削除します。

    1. [Managed Service for Apache Kafka] > [クラスタを接続] ページに移動します。

      [Connect クラスタ] に移動

    2. Connect クラスタを選択し、[削除] をクリックします。

  4. Kafka クラスタを削除します。

    1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

      [クラスタ] に移動

    2. Kafka クラスタを選択し、[削除] をクリックします。

gcloud

  1. Pub/Sub サブスクリプションとトピックを削除するには、gcloud pubsub subscriptions delete コマンドと gcloud pubsub topics delete コマンドを使用します。

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. Connect クラスタを削除するには、gcloud managed-kafka connect-clusters delete コマンドを使用します。

    gcloud managed-kafka connect-clusters delete CONNECT_CLUSTER \
      --location=REGION --async
    
  3. Kafka クラスタを削除するには、gcloud managed-kafka clusters delete コマンドを使用します。

    gcloud managed-kafka clusters delete KAFKA_CLUSTER \
      --location=REGION --async
    

次のステップ