Java プロデューサー アプリケーションを開発する

アプリケーションのデフォルト認証情報(ADC)を使用して Managed Service for Apache Kafka クラスタで認証を行う Java プロデューサー アプリケーションを開発する方法について説明します。ADC を使用すると、 Google Cloud で実行されているアプリケーションは、 Google Cloud サービスへの認証に使用する適切な認証情報を自動的に検出して使用できます。

始める前に

このチュートリアルを開始する前に、新しい Managed Service for Apache Kafka クラスタを作成します。すでにクラスタがある場合は、この手順をスキップできます。

クラスタを作成する方法

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

    [クラスタ] に移動

  2. [ 作成] をクリックします。
  3. [クラスタ名] フィールドに、クラスタの名前を入力します。
  4. [リージョン] リストで、クラスタのロケーションを選択します。
  5. [ネットワーク構成] で、クラスタがアクセスできるサブネットを構成します。
    1. [プロジェクト] で、該当するプロジェクトを選択します。
    2. [ネットワーク] で、VPC ネットワークを選択します。
    3. [サブネット] でサブネットを選択します。
    4. [完了] をクリックします。
  6. [作成] をクリックします。

[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。

gcloud

Kafka クラスタを作成するには、managed-kafka clusters create コマンドを実行します。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

次のように置き換えます。

  • KAFKA_CLUSTER: Kafka クラスタの名前
  • REGION: クラスタのロケーション
  • PROJECT_ID: プロジェクト ID
  • SUBNET_NAME: クラスタを作成するサブネット(例: default

サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。

このコマンドは非同期で実行され、オペレーション ID を返します。

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

作成オペレーションの進行状況を追跡するには、gcloud managed-kafka operations describe コマンドを使用します。

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

クラスタの準備ができると、このコマンドの出力に state: ACTIVE エントリが含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。

必要なロール

クライアント VM の作成と構成に必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

クライアント VM を作成する

Kafka クラスタにアクセスできる Linux 仮想マシン(VM)インスタンスを Compute Engine に作成します。VM を構成するときに、次のオプションを設定します。

  • リージョン。Kafka クラスタと同じリージョンに VM を作成します。

  • サブネット。Kafka クラスタ構成で使用したサブネットと同じ VPC ネットワークに VM を作成します。詳細については、クラスタのサブネットを表示するをご覧ください。

  • アクセス スコープhttps://www.googleapis.com/auth/cloud-platform アクセス スコープを VM に割り当てます。このスコープにより、VM は Managed Kafka API にリクエストを送信できます。

これらのオプションを設定する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで [インスタンスの作成] ページに移動します。

    インスタンスを作成する

  2. [マシンの構成] ペインで、次の操作を行います。

    1. [名前] フィールドに、インスタンスの名前を指定します。詳細については、リソースの命名規則をご覧ください。

    2. [リージョン] リストで、Kafka クラスタと同じリージョンを選択します。

    3. [ゾーン] リストでゾーンを選択します。

  3. ナビゲーション メニューで、[ネットワーキング] をクリックします。表示された [ネットワーキング] ペインで、次の操作を行います。

    1. [ネットワーク インターフェース] セクションに移動します。

    2. デフォルトのネットワーク インターフェースを開くには、 矢印をクリックします。

    3. [ネットワーク] フィールドで、VPC ネットワークを選択します。

    4. [サブネットワーク] リストで、サブネットを選択します。

    5. [完了] をクリックします。

  4. ナビゲーション メニューで [セキュリティ] をクリックします。表示された [セキュリティ] ペインで、次の操作を行います。

    1. [アクセス スコープ] で、[各 API にアクセス権を設定] を選択します。

    2. アクセス スコープのリストで、[Cloud Platform] プルダウン リストを見つけて、[有効] を選択します。

  5. [作成] をクリックして VM を作成します。

gcloud

VM インスタンスを作成するには、gcloud compute instances create コマンドを使用します。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

次のように置き換えます。

  • VM_NAME: VM の名前
  • PROJECT_ID: プロジェクト ID
  • REGION: Kafka クラスタを作成したリージョン(例: us-central1
  • SUBNET: クラスタ構成で使用したサブネットと同じ VPC ネットワーク内のサブネット
  • ZONE: クラスタを作成したリージョンのゾーン(us-central1-c など)

VM の作成の詳細については、特定のサブネットに VM インスタンスを作成するをご覧ください。

IAM ロールを付与する

Compute Engine のデフォルト サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。

  • マネージド Kafka クライアントroles/managedkafka.client
  • サービス アカウント トークン作成者roles/iam.serviceAccountTokenCreator
  • サービス アカウントの OpenID トークン作成者roles/iam.serviceAccountOpenIdTokenCreator

コンソール

  1. Google Cloud コンソールで、[IAM] ページに移動します。

    [IAM] に移動

  2. [Compute Engine のデフォルトのサービス アカウント] の行を見つけて、 [プリンシパルを編集します] をクリックします。

  3. [別のロールを追加] をクリックし、ロール [Managed Kafka クライアント] を選択します。サービス アカウント トークン作成者ロールとサービス アカウント OpenID トークン作成者ロールについても、この手順を繰り返します。

  4. [保存] をクリックします。

gcloud

IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを使用します。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID

  • PROJECT_NUMBER: プロジェクトの番号

プロジェクト番号を取得するには、gcloud projects describe コマンドを実行します。

gcloud projects describe PROJECT_ID

詳細については、プロジェクト名、番号、ID を確認するをご覧ください。

VM に接続する

SSH を使用して VM インスタンスに接続します。

コンソール

  1. [VM インスタンス] ページに移動します。

    [VM インスタンス] に移動

  2. VM インスタンスのリストで、VM 名を見つけて [SSH] をクリックします。

gcloud

VM に接続するには、gcloud compute ssh コマンドを使用します。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

次のように置き換えます。

  • VM_NAME: VM の名前
  • PROJECT_ID: プロジェクト ID
  • ZONE: VM を作成したゾーン

SSH を初めて使用する場合は、追加の構成が必要になることがあります。詳細については、SSH 接続についてをご覧ください。

Apache Maven プロジェクトを設定する

SSH セッションから、次のコマンドを実行して Maven プロジェクトを設定します。

  1. コマンド sudo apt-get install maven openjdk-17-jdk を使用して、Java と Maven をインストールします。

  2. Apache Maven プロジェクトを設定します。

    このコマンドを実行すると、demo というディレクトリに com.google.example というパッケージが作成されます。

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    
  3. cd demo を使用してプロジェクト ディレクトリに移動します。

Java プロデューサー アプリケーションを作成する

このセクションでは、Kafka トピックにメッセージを生成する Java アプリケーションの作成について説明します。Maven を使用して Java コードを記述してコンパイルし、kafka-client.properties ファイルに必要なパラメータを構成してから、アプリケーションを実行してメッセージを送信します。

プロデューサー コードを記述する

src/main/java/com/google/example/App.java 内のコードを次のコードで置き換えます。

  package com.google.example;

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;
  import org.apache.kafka.clients.producer.Callback;


  class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
  }

  public class App {
      public static void main(String[] args) throws Exception {
          Properties p = new Properties();
          p.load(new java.io.FileReader("kafka-client.properties"));

          KafkaProducer producer = new KafkaProducer(p);
          ProducerRecord message = new ProducerRecord("topicName", "key", "value");
          SendCallback callback = new SendCallback();
          producer.send(message,callback);
          producer.close();
      }
  }

アプリケーションをコンパイルする

このアプリケーションをコンパイルするには、Kafka クライアント全般に関連するパッケージと、 Google Cloud固有の認証ロジックが必要です。

  1. デモ プロジェクト ディレクトリには、このプロジェクトの Maven 構成を含む pom.xml があります。pom.xml.<dependencies> セクションに次の行を追加します。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.cloud.hosted.kafka</groupId>
        <artifactId>managed-kafka-auth-login-handler</artifactId>
        <version>1.0.5</version>
    </dependency>
    
  2. mvn compile を使用してアプリケーションをコンパイルします。

アプリケーションを構成して実行する

  1. プロデューサーは、kafka-client.properties というファイルでクライアント構成パラメータを想定しています。このファイルは、デモ プロジェクト ディレクトリ(pom.xml を含むディレクトリ)に次の内容で作成します。

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
  2. これで、アプリケーションを実行する準備が整いました。

    mvn exec:java -Dexec.mainClass="com.google.example.App" --quiet
    

クリーンアップ

このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、次の手順を実施します。

コンソール

  1. VM インスタンスを削除します。

    1. [VM インスタンス] ページに移動します。

      [VM インスタンス] に移動

    2. VM を選択し、[削除] をクリックします。

  2. Kafka クラスタを削除します。

    1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

      [クラスタ] に移動

    2. Kafka クラスタを選択し、[削除] をクリックします。

gcloud

  1. VM を削除するには、gcloud compute instances delete コマンドを使用します。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Kafka クラスタを削除するには、gcloud managed-kafka clusters delete コマンドを使用します。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。