Java プロデューサー アプリケーションを開発する
アプリケーションのデフォルト認証情報(ADC)を使用して Managed Service for Apache Kafka クラスタで認証を行う Java プロデューサー アプリケーションを開発する方法について説明します。ADC を使用すると、 Google Cloud で実行されているアプリケーションは、 Google Cloud サービスへの認証に使用する適切な認証情報を自動的に検出して使用できます。
始める前に
このチュートリアルを開始する前に、新しい Managed Service for Apache Kafka クラスタを作成します。すでにクラスタがある場合は、この手順をスキップできます。
クラスタを作成する方法
コンソール
- [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
- [ 作成] をクリックします。
- [クラスタ名] フィールドに、クラスタの名前を入力します。
- [リージョン] リストで、クラスタのロケーションを選択します。
-
[ネットワーク構成] で、クラスタがアクセスできるサブネットを構成します。
- [プロジェクト] で、該当するプロジェクトを選択します。
- [ネットワーク] で、VPC ネットワークを選択します。
- [サブネット] でサブネットを選択します。
- [完了] をクリックします。
- [作成] をクリックします。
[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。
gcloud
Kafka クラスタを作成するには、managed-kafka clusters
create コマンドを実行します。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
次のように置き換えます。
KAFKA_CLUSTER: Kafka クラスタの名前REGION: クラスタのロケーションPROJECT_ID: プロジェクト IDSUBNET_NAME: クラスタを作成するサブネット(例:default)
サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。
このコマンドは非同期で実行され、オペレーション ID を返します。
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
作成オペレーションの進行状況を追跡するには、gcloud managed-kafka
operations describe コマンドを使用します。
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
クラスタの準備ができると、このコマンドの出力に state:
ACTIVE エントリが含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。
必要なロール
クライアント VM の作成と構成に必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。
-
Compute インスタンス管理者(v1)(
roles/compute.instanceAdmin.v1) -
プロジェクト IAM 管理者(
roles/resourcemanager.projectIamAdmin) -
ロールの閲覧者 (
roles/iam.roleViewer) -
サービス アカウント ユーザー(
roles/iam.serviceAccountUser)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
クライアント VM を作成する
Kafka クラスタにアクセスできる Linux 仮想マシン(VM)インスタンスを Compute Engine に作成します。VM を構成するときに、次のオプションを設定します。
リージョン。Kafka クラスタと同じリージョンに VM を作成します。
サブネット。Kafka クラスタ構成で使用したサブネットと同じ VPC ネットワークに VM を作成します。詳細については、クラスタのサブネットを表示するをご覧ください。
アクセス スコープ。
https://www.googleapis.com/auth/cloud-platformアクセス スコープを VM に割り当てます。このスコープにより、VM は Managed Kafka API にリクエストを送信できます。
これらのオプションを設定する手順は次のとおりです。
コンソール
Google Cloud コンソールで [インスタンスの作成] ページに移動します。
[マシンの構成] ペインで、次の操作を行います。
[名前] フィールドに、インスタンスの名前を指定します。詳細については、リソースの命名規則をご覧ください。
[リージョン] リストで、Kafka クラスタと同じリージョンを選択します。
[ゾーン] リストでゾーンを選択します。
ナビゲーション メニューで、[ネットワーキング] をクリックします。表示された [ネットワーキング] ペインで、次の操作を行います。
[ネットワーク インターフェース] セクションに移動します。
デフォルトのネットワーク インターフェースを開くには、 矢印をクリックします。
[ネットワーク] フィールドで、VPC ネットワークを選択します。
[サブネットワーク] リストで、サブネットを選択します。
[完了] をクリックします。
ナビゲーション メニューで [セキュリティ] をクリックします。表示された [セキュリティ] ペインで、次の操作を行います。
[アクセス スコープ] で、[各 API にアクセス権を設定] を選択します。
アクセス スコープのリストで、[Cloud Platform] プルダウン リストを見つけて、[有効] を選択します。
[作成] をクリックして VM を作成します。
gcloud
VM インスタンスを作成するには、gcloud compute instances create コマンドを使用します。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
次のように置き換えます。
- VM_NAME: VM の名前
- PROJECT_ID: プロジェクト ID
- REGION: Kafka クラスタを作成したリージョン(例:
us-central1) - SUBNET: クラスタ構成で使用したサブネットと同じ VPC ネットワーク内のサブネット
- ZONE: クラスタを作成したリージョンのゾーン(
us-central1-cなど)
VM の作成の詳細については、特定のサブネットに VM インスタンスを作成するをご覧ください。
IAM ロールを付与する
Compute Engine のデフォルト サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。
- マネージド Kafka クライアント(
roles/managedkafka.client) - サービス アカウント トークン作成者(
roles/iam.serviceAccountTokenCreator) サービス アカウントの OpenID トークン作成者(
roles/iam.serviceAccountOpenIdTokenCreator)
コンソール
Google Cloud コンソールで、[IAM] ページに移動します。
[Compute Engine のデフォルトのサービス アカウント] の行を見つけて、 [プリンシパルを編集します] をクリックします。
[別のロールを追加] をクリックし、ロール [Managed Kafka クライアント] を選択します。サービス アカウント トークン作成者ロールとサービス アカウント OpenID トークン作成者ロールについても、この手順を繰り返します。
[保存] をクリックします。
gcloud
IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを使用します。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
次のように置き換えます。
PROJECT_ID: プロジェクト ID
PROJECT_NUMBER: プロジェクトの番号
プロジェクト番号を取得するには、gcloud projects describe コマンドを実行します。
gcloud projects describe PROJECT_ID
詳細については、プロジェクト名、番号、ID を確認するをご覧ください。
VM に接続する
SSH を使用して VM インスタンスに接続します。
コンソール
[VM インスタンス] ページに移動します。
VM インスタンスのリストで、VM 名を見つけて [SSH] をクリックします。
gcloud
VM に接続するには、gcloud compute ssh コマンドを使用します。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
次のように置き換えます。
- VM_NAME: VM の名前
- PROJECT_ID: プロジェクト ID
- ZONE: VM を作成したゾーン
SSH を初めて使用する場合は、追加の構成が必要になることがあります。詳細については、SSH 接続についてをご覧ください。
Apache Maven プロジェクトを設定する
SSH セッションから、次のコマンドを実行して Maven プロジェクトを設定します。
コマンド
sudo apt-get install maven openjdk-17-jdkを使用して、Java と Maven をインストールします。Apache Maven プロジェクトを設定します。
このコマンドを実行すると、
demoというディレクトリにcom.google.exampleというパッケージが作成されます。mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\ -DarchetypeArtifactId=maven-archetype-quickstart\ -DarchetypeVersion=1.5 -DinteractiveMode=falsecd demoを使用してプロジェクト ディレクトリに移動します。
Java プロデューサー アプリケーションを作成する
このセクションでは、Kafka トピックにメッセージを生成する Java アプリケーションの作成について説明します。Maven を使用して Java コードを記述してコンパイルし、kafka-client.properties ファイルに必要なパラメータを構成してから、アプリケーションを実行してメッセージを送信します。
プロデューサー コードを記述する
src/main/java/com/google/example/App.java 内のコードを次のコードで置き換えます。
package com.google.example;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
class SendCallback implements Callback {
public void onCompletion(RecordMetadata m, Exception e){
if (e == null){
System.out.println("Produced a message successfully.");
} else {
System.out.println(e.getMessage());
}
}
}
public class App {
public static void main(String[] args) throws Exception {
Properties p = new Properties();
p.load(new java.io.FileReader("kafka-client.properties"));
KafkaProducer producer = new KafkaProducer(p);
ProducerRecord message = new ProducerRecord("topicName", "key", "value");
SendCallback callback = new SendCallback();
producer.send(message,callback);
producer.close();
}
}
アプリケーションをコンパイルする
このアプリケーションをコンパイルするには、Kafka クライアント全般に関連するパッケージと、 Google Cloud固有の認証ロジックが必要です。
デモ プロジェクト ディレクトリには、このプロジェクトの Maven 構成を含む
pom.xmlがあります。pom.xml.の<dependencies>セクションに次の行を追加します。<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.2</version> </dependency> <dependency> <groupId>com.google.cloud.hosted.kafka</groupId> <artifactId>managed-kafka-auth-login-handler</artifactId> <version>1.0.5</version> </dependency>mvn compileを使用してアプリケーションをコンパイルします。
アプリケーションを構成して実行する
プロデューサーは、
kafka-client.propertiesというファイルでクライアント構成パラメータを想定しています。このファイルは、デモ プロジェクト ディレクトリ(pom.xmlを含むディレクトリ)に次の内容で作成します。bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 value.serializer=org.apache.kafka.common.serialization.StringSerializer key.serializer=org.apache.kafka.common.serialization.StringSerializer security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;これで、アプリケーションを実行する準備が整いました。
mvn exec:java -Dexec.mainClass="com.google.example.App" --quiet
クリーンアップ
このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、次の手順を実施します。
コンソール
VM インスタンスを削除します。
[VM インスタンス] ページに移動します。
VM を選択し、[削除] をクリックします。
Kafka クラスタを削除します。
[Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
Kafka クラスタを選択し、[削除] をクリックします。
gcloud
VM を削除するには、
gcloud compute instances deleteコマンドを使用します。gcloud compute instances delete VM_NAME --zone=ZONEKafka クラスタを削除するには、
gcloud managed-kafka clusters deleteコマンドを使用します。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async