Kafka コマンドライン ツールを使用してメッセージを生成して使用する
Kafka コマンドライン ツールを使用して Managed Service for Apache Kafka クラスタに接続し、メッセージを生成して使用する方法について説明します。
始める前に
このチュートリアルを開始する前に、新しい Managed Service for Apache Kafka クラスタを作成します。すでにクラスタがある場合は、この手順をスキップできます。
クラスタを作成する方法
コンソール
- [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
- [ 作成] をクリックします。
- [クラスタ名] フィールドに、クラスタの名前を入力します。
- [リージョン] リストで、クラスタのロケーションを選択します。
-
[ネットワーク構成] で、クラスタがアクセスできるサブネットを構成します。
- [プロジェクト] で、該当するプロジェクトを選択します。
- [ネットワーク] で、VPC ネットワークを選択します。
- [サブネット] でサブネットを選択します。
- [完了] をクリックします。
- [作成] をクリックします。
[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。
gcloud
Kafka クラスタを作成するには、managed-kafka clusters
create コマンドを実行します。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
次のように置き換えます。
KAFKA_CLUSTER: Kafka クラスタの名前REGION: クラスタのロケーションPROJECT_ID: プロジェクト IDSUBNET_NAME: クラスタを作成するサブネット(例:default)
サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。
このコマンドは非同期で実行され、オペレーション ID を返します。
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
作成オペレーションの進行状況を追跡するには、gcloud managed-kafka
operations describe コマンドを使用します。
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
クラスタの準備ができると、このコマンドの出力に state:
ACTIVE エントリが含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。
必要なロール
クライアント VM の作成と構成に必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。
-
Compute インスタンス管理者(v1)(
roles/compute.instanceAdmin.v1) -
プロジェクト IAM 管理者(
roles/resourcemanager.projectIamAdmin) -
ロールの閲覧者 (
roles/iam.roleViewer) -
サービス アカウント ユーザー(
roles/iam.serviceAccountUser)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
クライアント VM を作成する
Kafka クラスタにアクセスできる Linux 仮想マシン(VM)インスタンスを Compute Engine に作成します。VM を構成するときに、次のオプションを設定します。
リージョン。Kafka クラスタと同じリージョンに VM を作成します。
サブネット。Kafka クラスタ構成で使用したサブネットと同じ VPC ネットワークに VM を作成します。詳細については、クラスタのサブネットを表示するをご覧ください。
アクセス スコープ。
https://www.googleapis.com/auth/cloud-platformアクセス スコープを VM に割り当てます。このスコープにより、VM は Managed Kafka API にリクエストを送信できます。
これらのオプションを設定する手順は次のとおりです。
コンソール
Google Cloud コンソールで [インスタンスの作成] ページに移動します。
[マシンの構成] ペインで、次の操作を行います。
[名前] フィールドに、インスタンスの名前を指定します。詳細については、リソースの命名規則をご覧ください。
[リージョン] リストで、Kafka クラスタと同じリージョンを選択します。
[ゾーン] リストでゾーンを選択します。
ナビゲーション メニューで、[ネットワーキング] をクリックします。表示された [ネットワーキング] ペインで、次の操作を行います。
[ネットワーク インターフェース] セクションに移動します。
デフォルトのネットワーク インターフェースを開くには、 矢印をクリックします。
[ネットワーク] フィールドで、VPC ネットワークを選択します。
[サブネットワーク] リストで、サブネットを選択します。
[完了] をクリックします。
ナビゲーション メニューで [セキュリティ] をクリックします。表示された [セキュリティ] ペインで、次の操作を行います。
[アクセス スコープ] で、[各 API にアクセス権を設定] を選択します。
アクセス スコープのリストで、[Cloud Platform] プルダウン リストを見つけて、[有効] を選択します。
[作成] をクリックして VM を作成します。
gcloud
VM インスタンスを作成するには、gcloud compute instances create コマンドを使用します。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
次のように置き換えます。
- VM_NAME: VM の名前
- PROJECT_ID: プロジェクト ID
- REGION: Kafka クラスタを作成したリージョン(例:
us-central1) - SUBNET: クラスタ構成で使用したサブネットと同じ VPC ネットワーク内のサブネット
- ZONE: クラスタを作成したリージョンのゾーン(
us-central1-cなど)
VM の作成の詳細については、特定のサブネットに VM インスタンスを作成するをご覧ください。
IAM ロールを付与する
Compute Engine のデフォルト サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。
- マネージド Kafka クライアント(
roles/managedkafka.client) - サービス アカウント トークン作成者(
roles/iam.serviceAccountTokenCreator) サービス アカウントの OpenID トークン作成者(
roles/iam.serviceAccountOpenIdTokenCreator)
コンソール
Google Cloud コンソールで、[IAM] ページに移動します。
[Compute Engine のデフォルトのサービス アカウント] の行を見つけて、 [プリンシパルを編集します] をクリックします。
[別のロールを追加] をクリックし、ロール [Managed Kafka クライアント] を選択します。サービス アカウント トークン作成者ロールとサービス アカウント OpenID トークン作成者ロールについても、この手順を繰り返します。
[保存] をクリックします。
gcloud
IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを使用します。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
次のように置き換えます。
PROJECT_ID: プロジェクト ID
PROJECT_NUMBER: プロジェクトの番号
プロジェクト番号を取得するには、gcloud projects describe コマンドを実行します。
gcloud projects describe PROJECT_ID
詳細については、プロジェクト名、番号、ID を確認するをご覧ください。
VM に接続する
SSH を使用して VM インスタンスに接続します。
コンソール
[VM インスタンス] ページに移動します。
VM インスタンスのリストで、VM 名を見つけて [SSH] をクリックします。
gcloud
VM に接続するには、gcloud compute ssh コマンドを使用します。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
次のように置き換えます。
- VM_NAME: VM の名前
- PROJECT_ID: プロジェクト ID
- ZONE: VM を作成したゾーン
SSH を初めて使用する場合は、追加の構成が必要になることがあります。詳細については、SSH 接続についてをご覧ください。
Kafka コマンドライン ツールをインストールする
SSH セッションで、次のコマンドを実行して Kafka コマンドライン ツールをインストールします。
Kafka コマンドライン ツールを実行するために必要な Java と、依存関係のダウンロードに役立つ
wgetをインストールします。次のコマンドは、Debian Linux 環境を使用していることを前提としています。sudo apt-get install default-jre wgetKafka コマンドライン ツールをインストールします。
wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz tar xfz kafka_2.13-3.7.2.tgz次の環境変数を設定します。
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2 export PATH=$PATH:$KAFKA_HOME/bin export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
認証を設定する
SSH セッションで、次の手順に沿って Managed Service for Apache Kafka 認証ライブラリを設定します。
ライブラリをダウンロードしてローカルにインストールします。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip sudo apt-get install unzip unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/このコマンドにより、ライブラリが Kafka インストール ディレクトリの
libディレクトリにインストールされます。Kafka コマンドライン ツールは、このディレクトリで Java の依存関係を探します。テキスト エディタを使用して、
client.propertiesという名前のファイルを作成し、次のコードを貼り付けます。security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;ファイルを保存します。このファイルは、次の設定で Kafka クライアントを構成します。
Kafka クラスタとの安全な通信には SASL_SSL を使用します。
認証に OAuth 2.0 ベアラー トークンを使用します。
ライブラリが提供する
GcpLoginCallbackHandlerクラスをログイン コールバック ハンドラとして使用して、OAuth 2.0 トークンを取得します。
メッセージの生成と使用
SSH セッションから次のコマンドを実行して、Kafka メッセージを生成して使用します。
ブートストラップ アドレスを環境変数として設定します。
export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092次のように置き換えます。
CLUSTER_ID: クラスタの名前REGION: クラスタを作成した場所PROJECT_ID: プロジェクト ID
詳細については、ブートストラップ アドレスを取得するをご覧ください。
クラスタ内のトピックを一覧表示します。
kafka-topics.sh --list \ --bootstrap-server $BOOTSTRAP \ --command-config client.propertiesトピックにメッセージを書き込みます。
echo "hello world" | kafka-console-producer.sh \ --topic KAFKA_TOPIC_NAME \ --bootstrap-server $BOOTSTRAP \ --producer.config client.propertiesKAFKA_TOPIC_NAME は、トピック名に置き換えます。
トピックからメッセージを使用する。
kafka-console-consumer.sh \ --topic KAFKA_TOPIC_NAME \ --from-beginning \ --bootstrap-server $BOOTSTRAP \ --consumer.config client.propertiesメッセージの消費を停止するには、Ctrl+C キーを押します。
プロデューサーのパフォーマンス テストを実行します。
kafka-producer-perf-test.sh \ --topic KAFKA_TOPIC_NAME \ --num-records 1000000 --throughput 1000 --print-metrics --record-size 1024 \ --producer-props bootstrap.servers=$BOOTSTRAP \ --producer.config client.properties
クリーンアップ
このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、次の手順を実施します。
コンソール
VM インスタンスを削除します。
[VM インスタンス] ページに移動します。
VM を選択し、[削除] をクリックします。
Kafka クラスタを削除します。
[Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
Kafka クラスタを選択し、[削除] をクリックします。
gcloud
VM を削除するには、
gcloud compute instances deleteコマンドを使用します。gcloud compute instances delete VM_NAME --zone=ZONEKafka クラスタを削除するには、
gcloud managed-kafka clusters deleteコマンドを使用します。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async