スキーマ レジストリを使用して Avro メッセージを生成する

スキーマ レジストリプレビュー)を使用して Apache Avro メッセージを生成する Java プロデューサー アプリケーションを開発する方法について説明します。アプリケーションは、Managed Service for Apache Kafka クラスタにメッセージを書き込みます。

始める前に

このチュートリアルを開始する前に、新しい Managed Service for Apache Kafka クラスタを作成します。すでにクラスタがある場合は、この手順をスキップできます。

クラスタを作成する方法

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

    [クラスタ] に移動

  2. [ 作成] をクリックします。
  3. [クラスタ名] フィールドに、クラスタの名前を入力します。
  4. [リージョン] リストで、クラスタのロケーションを選択します。
  5. [ネットワーク構成] で、クラスタがアクセスできるサブネットを構成します。
    1. [プロジェクト] で、該当するプロジェクトを選択します。
    2. [ネットワーク] で、VPC ネットワークを選択します。
    3. [サブネット] でサブネットを選択します。
    4. [完了] をクリックします。
  6. [作成] をクリックします。

[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。

gcloud

Kafka クラスタを作成するには、managed-kafka clusters create コマンドを実行します。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

次のように置き換えます。

  • KAFKA_CLUSTER: Kafka クラスタの名前
  • REGION: クラスタのロケーション
  • PROJECT_ID: プロジェクト ID
  • SUBNET_NAME: クラスタを作成するサブネット(例: default

サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。

このコマンドは非同期で実行され、オペレーション ID を返します。

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

作成オペレーションの進行状況を追跡するには、gcloud managed-kafka operations describe コマンドを使用します。

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

クラスタの準備ができると、このコマンドの出力に state: ACTIVE エントリが含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。

必要なロール

クライアント VM の作成と構成に必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

クライアント VM を設定する

Kafka クラスタにアクセスできる Linux 仮想マシン(VM)インスタンスを Compute Engine に作成します。VM を構成するときに、次のオプションを設定します。

  • リージョン。Kafka クラスタと同じリージョンに VM を作成します。

  • サブネット。Kafka クラスタ構成で使用したサブネットと同じ VPC ネットワークに VM を作成します。詳細については、クラスタのサブネットを表示するをご覧ください。

  • アクセス スコープhttps://www.googleapis.com/auth/cloud-platform アクセス スコープを VM に割り当てます。このスコープにより、VM は Managed Kafka API にリクエストを送信できます。

これらのオプションを設定する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで [インスタンスの作成] ページに移動します。

    インスタンスを作成する

  2. [マシンの構成] ペインで、次の操作を行います。

    1. [名前] フィールドに、インスタンスの名前を指定します。詳細については、リソースの命名規則をご覧ください。

    2. [リージョン] リストで、Kafka クラスタと同じリージョンを選択します。

    3. [ゾーン] リストでゾーンを選択します。

  3. ナビゲーション メニューで、[ネットワーキング] をクリックします。表示された [ネットワーキング] ペインで、次の操作を行います。

    1. [ネットワーク インターフェース] セクションに移動します。

    2. デフォルトのネットワーク インターフェースを開くには、 矢印をクリックします。

    3. [ネットワーク] フィールドで、VPC ネットワークを選択します。

    4. [サブネットワーク] リストで、サブネットを選択します。

    5. [完了] をクリックします。

  4. ナビゲーション メニューで [セキュリティ] をクリックします。表示された [セキュリティ] ペインで、次の操作を行います。

    1. [アクセス スコープ] で、[各 API にアクセス権を設定] を選択します。

    2. アクセス スコープのリストで、[Cloud Platform] プルダウン リストを見つけて、[有効] を選択します。

  5. [作成] をクリックして VM を作成します。

gcloud

VM インスタンスを作成するには、gcloud compute instances create コマンドを使用します。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

次のように置き換えます。

  • VM_NAME: VM の名前
  • PROJECT_ID: プロジェクト ID
  • REGION: Kafka クラスタを作成したリージョン(例: us-central1
  • SUBNET: クラスタ構成で使用したサブネットと同じ VPC ネットワーク内のサブネット
  • ZONE: クラスタを作成したリージョンのゾーン(us-central1-c など)

VM の作成の詳細については、特定のサブネットに VM インスタンスを作成するをご覧ください。

IAM ロールを付与する

Compute Engine のデフォルト サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。

  • マネージド Kafka クライアントroles/managedkafka.client
  • Schema Registry 管理者roles/managedkafka.schemaRegistryAdmin
  • サービス アカウント トークン作成者roles/iam.serviceAccountTokenCreator
  • サービス アカウントの OpenID トークン作成者roles/iam.serviceAccountOpenIdTokenCreator

コンソール

  1. Google Cloud コンソールで、[IAM] ページに移動します。

    [IAM] に移動

  2. [Compute Engine のデフォルトのサービス アカウント] の行を見つけて、 [プリンシパルを編集します] をクリックします。

  3. [別のロールを追加] をクリックし、ロール [Managed Kafka クライアント] を選択します。スキーマ レジストリ管理者サービス アカウント トークン作成者サービス アカウント OpenID トークン作成者のロールについても、この手順を繰り返します。

  4. [保存] をクリックします。

gcloud

IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを使用します。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID

  • PROJECT_NUMBER: プロジェクトの番号

プロジェクト番号を取得するには、gcloud projects describe コマンドを実行します。

gcloud projects describe PROJECT_ID

詳細については、プロジェクト名、番号、ID を確認するをご覧ください。

VM に接続する

SSH を使用して VM インスタンスに接続します。

コンソール

  1. [VM インスタンス] ページに移動します。

    [VM インスタンス] に移動

  2. VM インスタンスのリストで、VM 名を見つけて [SSH] をクリックします。

gcloud

VM に接続するには、gcloud compute ssh コマンドを使用します。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

次のように置き換えます。

  • VM_NAME: VM の名前
  • PROJECT_ID: プロジェクト ID
  • ZONE: VM を作成したゾーン

SSH を初めて使用する場合は、追加の構成が必要になることがあります。詳細については、SSH 接続についてをご覧ください。

Apache Maven プロジェクトを設定する

SSH セッションから、次のコマンドを実行して Maven プロジェクトを設定します。

  1. 次のコマンドを使用して Java と Maven をインストールします。

    sudo apt-get install maven openjdk-17-jdk
    
  2. Apache Maven プロジェクトを設定します。

    次のコマンドを使用して、demo というディレクトリにパッケージ com.google.example を作成します。

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

スキーマとその Java 実装を定義する

この例では、メッセージは名前とオプションの ID を持つ「ユーザー」を表しています。これは、2 つのフィールド(必須フィールド string 型の name と、オプションの整数 id.)を含む Avro スキーマに対応します。このスキーマを Java プログラムで使用するには、このスキーマに対応するオブジェクトの Java 実装も生成する必要があります。

  1. cd demo を使用してプロジェクト ディレクトリに移動します。

  2. コードでスキーマ ファイルを保存するフォルダを作成します。

    mkdir -p src/main/avro
    
  3. 次のコードを src/main/avro/User.avsc という名前のファイルに貼り付けて、Avro スキーマ定義を作成します。

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. 次のコードを pom.xml.build ノードに追加して、Avro Java コード生成プラグインを使用するように Maven プロジェクトを構成します。pom.xml には、pluginManagement ノード内に他の plugins ノードが含まれている場合があります。このステップでは、pluginManagement ノードを変更しないでください。plugins ノードは pluginManagement と同じレベルにする必要があります。

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. pom.xmlproject/dependencies ノードの末尾に次のコードを追加して、Avro を依存関係として追加します。pom.xml には、dependencyManagement タグ内に dependencies ノードがすでに存在します。このステップでは、dependencyManagement ノードを変更しないでください。

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. Java ソースを生成する

      mvn generate-sources
    
  7. 次のコマンドを実行して、実装ソース ファイルが作成されたことを確認します。ソースは、User オブジェクトのコンストラクタ、アクセサ、シリアライザー、デシリアライザーを実装する Java クラスファイルです。このクラスはプロデューサー コードで使用します。

    cat src/main/java/com/google/example/User.java
    

Apache Avro の詳細については、Apache Avro スタートガイドをご覧ください。

プロデューサー クライアントを作成する

このセクションでは、プロデューサー クライアントの作成、ビルド、実行の手順について説明します。

プロデューサーを実装する

プロデューサーは KafkaAvroSerializer.java を使用してメッセージをエンコードし、スキーマを管理します。シリアライザーは、スキーマ レジストリに自動的に接続し、サブジェクトにスキーマを登録して、その ID を取得し、Avro を使用してメッセージをシリアル化します。プロデューサーとシリアライザーを構成する必要があります。

  1. 次のコードを src/main/java/com/google/example/UserProducer.java という名前の新しいファイルに貼り付けて、プロデューサー クライアント クラスを作成します。

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. src/main/java/com/google/example/SendCallback.java でコールバック クラスを定義します。

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. このコードをコンパイルするには、org.apache.kafka.clients パッケージとシリアライザー コードが必要です。シリアライザー Maven アーティファクトは、カスタム リポジトリを介して配布されます。このリポジトリを構成するには、次のノードを pom.xmlproject ノードに追加します。

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. pom.xml ファイルの dependencies ノードに以下を追加します。

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. すべての依存関係が正しく解決されるように、クライアントをコンパイルします。

    mvn compile
    

スキーマ レジストリを作成する

スキーマ レジストリを作成するには、次のコマンドを実行します。

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

次のように置き換えます。

  • REGISTRY_ID: 新しいスキーマ レジストリの一意の識別子。これは、レジストリのリソース名の一部になります。名前は、先頭を英字にして、英字 (a-z, A-Z)、数字 (0-9)、アンダースコア (_) のみで構成される必要があります。最大文字数は 63 文字です。

  • REGION:スキーマ レジストリが作成される Google Cloud リージョン。このロケーションは、このレジストリを使用する Kafka クラスタのリージョンと一致する必要があります。

作成したスキーマ定義は、まだレジストリにアップロードされていません。プロデューサー クライアントは、次の手順で初めて実行するときにこれを行います。

プロデューサーを構成して実行する

この時点では、プロデューサーは完全に構成されていないため実行されません。プロデューサーを構成するには、Kafka とスキーマ レジストリの両方の構成を指定します。

  1. pom.xml と同じディレクトリに client.properties というファイルを作成し、次の内容を追加します。

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    pom.xmldependencies ノードに次の行を挿入して、Kafka とスキーマ レジストリの認証ハンドラの依存関係を Maven プロジェクトに追加します。kafka-avro-serializer 依存関係のに挿入します。

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    カスタム スキーマ レジストリ認証ハンドラの認証ハンドラの実装を確認する場合は、GcpBearerAuthCredentialProvider クラスをご覧ください。

  2. プロデューサー クライアントをコンパイルして実行します。

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    すべて正常に行われると、SendCallback クラスによって生成された出力 Produced a message successfully が表示されます。

出力を確認する

  1. User スキーマが、トピック名とスキーマ名から派生したサブジェクト名で登録されていることを確認します。

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    このコマンドの出力は次のようになります。

    ["newUsers-value"]
    
  2. リポジトリに登録されているスキーマが User と同じであることを確認します。

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    コマンドの出力は次のようになります。

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

クリーンアップ

このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、次の手順を実施します。

コンソール

  1. VM インスタンスを削除します。

    1. [VM インスタンス] ページに移動します。

      [VM インスタンス] に移動

    2. VM を選択し、[削除] をクリックします。

  2. スキーマ レジストリを削除します。

    1. [スキーマ レジストリ] ページに移動します。

      スキーマ レジストリに移動

    2. スキーマ レジストリの名前をクリックします。

    3. [削除] をクリックします。

  3. Kafka クラスタを削除します。

    1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

      [クラスタ] に移動

    2. Kafka クラスタを選択し、[削除] をクリックします。

gcloud

  1. VM を削除するには、gcloud compute instances delete コマンドを使用します。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. スキーマ レジストリを削除するには、/sdk/gcloud/reference/managed-kafka/schema-registries/delete コマンドを使用します。

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. Kafka クラスタを削除するには、gcloud managed-kafka clusters delete コマンドを使用します。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

次のステップ

Apache Kafka® と Apache Avro は、米国その他の諸国における Apache Software Foundation またはその関連会社の登録商標です。
Confluent は Confluent, Inc. の登録商標です。