Kafka コマンドライン ツールを使用してメッセージを生成して使用する

Kafka コマンドライン ツールを使用して Managed Service for Apache Kafka クラスタに接続し、メッセージを生成して使用する方法について説明します。

始める前に

このチュートリアルを開始する前に、新しい Managed Service for Apache Kafka クラスタを作成します。すでにクラスタがある場合は、この手順をスキップできます。

クラスタを作成する方法

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

    [クラスタ] に移動

  2. [ 作成] をクリックします。
  3. [クラスタ名] フィールドに、クラスタの名前を入力します。
  4. [リージョン] リストで、クラスタのロケーションを選択します。
  5. [ネットワーク構成] で、クラスタがアクセスできるサブネットを構成します。
    1. [プロジェクト] で、該当するプロジェクトを選択します。
    2. [ネットワーク] で、VPC ネットワークを選択します。
    3. [サブネット] でサブネットを選択します。
    4. [完了] をクリックします。
  6. [作成] をクリックします。

[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。

gcloud

Kafka クラスタを作成するには、managed-kafka clusters create コマンドを実行します。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

次のように置き換えます。

  • KAFKA_CLUSTER: Kafka クラスタの名前
  • REGION: クラスタのロケーション
  • PROJECT_ID: プロジェクト ID
  • SUBNET_NAME: クラスタを作成するサブネット(例: default

サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。

このコマンドは非同期で実行され、オペレーション ID を返します。

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

作成オペレーションの進行状況を追跡するには、gcloud managed-kafka operations describe コマンドを使用します。

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

クラスタの準備ができると、このコマンドの出力に state: ACTIVE エントリが含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。

必要なロール

クライアント VM の作成と構成に必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

クライアント VM を作成する

Kafka クラスタにアクセスできる Linux 仮想マシン(VM)インスタンスを Compute Engine に作成します。VM を構成するときに、次のオプションを設定します。

  • リージョン。Kafka クラスタと同じリージョンに VM を作成します。

  • サブネット。Kafka クラスタ構成で使用したサブネットと同じ VPC ネットワークに VM を作成します。詳細については、クラスタのサブネットを表示するをご覧ください。

  • アクセス スコープhttps://www.googleapis.com/auth/cloud-platform アクセス スコープを VM に割り当てます。このスコープにより、VM は Managed Kafka API にリクエストを送信できます。

これらのオプションを設定する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで [インスタンスの作成] ページに移動します。

    インスタンスを作成する

  2. [マシンの構成] ペインで、次の操作を行います。

    1. [名前] フィールドに、インスタンスの名前を指定します。詳細については、リソースの命名規則をご覧ください。

    2. [リージョン] リストで、Kafka クラスタと同じリージョンを選択します。

    3. [ゾーン] リストでゾーンを選択します。

  3. ナビゲーション メニューで、[ネットワーキング] をクリックします。表示された [ネットワーキング] ペインで、次の操作を行います。

    1. [ネットワーク インターフェース] セクションに移動します。

    2. デフォルトのネットワーク インターフェースを開くには、 矢印をクリックします。

    3. [ネットワーク] フィールドで、VPC ネットワークを選択します。

    4. [サブネットワーク] リストで、サブネットを選択します。

    5. [完了] をクリックします。

  4. ナビゲーション メニューで [セキュリティ] をクリックします。表示された [セキュリティ] ペインで、次の操作を行います。

    1. [アクセス スコープ] で、[各 API にアクセス権を設定] を選択します。

    2. アクセス スコープのリストで、[Cloud Platform] プルダウン リストを見つけて、[有効] を選択します。

  5. [作成] をクリックして VM を作成します。

gcloud

VM インスタンスを作成するには、gcloud compute instances create コマンドを使用します。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

次のように置き換えます。

  • VM_NAME: VM の名前
  • PROJECT_ID: プロジェクト ID
  • REGION: Kafka クラスタを作成したリージョン(例: us-central1
  • SUBNET: クラスタ構成で使用したサブネットと同じ VPC ネットワーク内のサブネット
  • ZONE: クラスタを作成したリージョンのゾーン(us-central1-c など)

VM の作成の詳細については、特定のサブネットに VM インスタンスを作成するをご覧ください。

IAM ロールを付与する

Compute Engine のデフォルト サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。

  • マネージド Kafka クライアントroles/managedkafka.client
  • サービス アカウント トークン作成者roles/iam.serviceAccountTokenCreator
  • サービス アカウントの OpenID トークン作成者roles/iam.serviceAccountOpenIdTokenCreator

コンソール

  1. Google Cloud コンソールで、[IAM] ページに移動します。

    [IAM] に移動

  2. [Compute Engine のデフォルトのサービス アカウント] の行を見つけて、 [プリンシパルを編集します] をクリックします。

  3. [別のロールを追加] をクリックし、ロール [Managed Kafka クライアント] を選択します。サービス アカウント トークン作成者ロールとサービス アカウント OpenID トークン作成者ロールについても、この手順を繰り返します。

  4. [保存] をクリックします。

gcloud

IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを使用します。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID

  • PROJECT_NUMBER: プロジェクトの番号

プロジェクト番号を取得するには、gcloud projects describe コマンドを実行します。

gcloud projects describe PROJECT_ID

詳細については、プロジェクト名、番号、ID を確認するをご覧ください。

VM に接続する

SSH を使用して VM インスタンスに接続します。

コンソール

  1. [VM インスタンス] ページに移動します。

    [VM インスタンス] に移動

  2. VM インスタンスのリストで、VM 名を見つけて [SSH] をクリックします。

gcloud

VM に接続するには、gcloud compute ssh コマンドを使用します。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

次のように置き換えます。

  • VM_NAME: VM の名前
  • PROJECT_ID: プロジェクト ID
  • ZONE: VM を作成したゾーン

SSH を初めて使用する場合は、追加の構成が必要になることがあります。詳細については、SSH 接続についてをご覧ください。

Kafka コマンドライン ツールをインストールする

SSH セッションで、次のコマンドを実行して Kafka コマンドライン ツールをインストールします。

  1. Kafka コマンドライン ツールを実行するために必要な Java と、依存関係のダウンロードに役立つ wget をインストールします。次のコマンドは、Debian Linux 環境を使用していることを前提としています。

    sudo apt-get install default-jre wget
    
  2. Kafka コマンドライン ツールをインストールします。

    wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
    tar xfz kafka_2.13-3.7.2.tgz
    
  3. 次の環境変数を設定します。

    export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
    export PATH=$PATH:$KAFKA_HOME/bin
    export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
    

認証を設定する

SSH セッションで、次の手順に沿って Managed Service for Apache Kafka 認証ライブラリを設定します。

  1. ライブラリをダウンロードしてローカルにインストールします。

    wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
    sudo apt-get install unzip
    unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/
    

    このコマンドにより、ライブラリが Kafka インストール ディレクトリの lib ディレクトリにインストールされます。Kafka コマンドライン ツールは、このディレクトリで Java の依存関係を探します。

  2. テキスト エディタを使用して、client.properties という名前のファイルを作成し、次のコードを貼り付けます。

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    

    ファイルを保存します。このファイルは、次の設定で Kafka クライアントを構成します。

    • Kafka クラスタとの安全な通信には SASL_SSL を使用します。

    • 認証に OAuth 2.0 ベアラー トークンを使用します。

    • ライブラリが提供する GcpLoginCallbackHandler クラスをログイン コールバック ハンドラとして使用して、OAuth 2.0 トークンを取得します。

メッセージの生成と使用

SSH セッションから次のコマンドを実行して、Kafka メッセージを生成して使用します。

  1. ブートストラップ アドレスを環境変数として設定します。

    export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

    次のように置き換えます。

    • CLUSTER_ID: クラスタの名前
    • REGION: クラスタを作成した場所
    • PROJECT_ID: プロジェクト ID

    詳細については、ブートストラップ アドレスを取得するをご覧ください。

  2. クラスタ内のトピックを一覧表示します。

    kafka-topics.sh --list \
      --bootstrap-server $BOOTSTRAP \
      --command-config client.properties
    
  3. トピックにメッセージを書き込みます。

    echo "hello world" | kafka-console-producer.sh \
      --topic KAFKA_TOPIC_NAME \
      --bootstrap-server $BOOTSTRAP \
      --producer.config client.properties
    

    KAFKA_TOPIC_NAME は、トピック名に置き換えます。

  4. トピックからメッセージを使用する。

    kafka-console-consumer.sh \
      --topic KAFKA_TOPIC_NAME \
      --from-beginning \
      --bootstrap-server $BOOTSTRAP \
      --consumer.config client.properties
    

    メッセージの消費を停止するには、Ctrl+C キーを押します。

  5. プロデューサーのパフォーマンス テストを実行します。

    kafka-producer-perf-test.sh \
      --topic KAFKA_TOPIC_NAME \
      --num-records 1000000 --throughput 1000 --print-metrics --record-size 1024 \
      --producer-props bootstrap.servers=$BOOTSTRAP \
      --producer.config client.properties
    

クリーンアップ

このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、次の手順を実施します。

コンソール

  1. VM インスタンスを削除します。

    1. [VM インスタンス] ページに移動します。

      [VM インスタンス] に移動

    2. VM を選択し、[削除] をクリックします。

  2. Kafka クラスタを削除します。

    1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

      [クラスタ] に移動

    2. Kafka クラスタを選択し、[削除] をクリックします。

gcloud

  1. VM を削除するには、gcloud compute instances delete コマンドを使用します。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Kafka クラスタを削除するには、gcloud managed-kafka clusters delete コマンドを使用します。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。