スキーマ レジストリを使用して Avro メッセージを生成する
スキーマ レジストリ(プレビュー)を使用して Apache Avro メッセージを生成する Java プロデューサー アプリケーションを開発する方法について説明します。アプリケーションは、Managed Service for Apache Kafka クラスタにメッセージを書き込みます。
始める前に
このチュートリアルを開始する前に、新しい Managed Service for Apache Kafka クラスタを作成します。すでにクラスタがある場合は、この手順をスキップできます。
クラスタを作成する方法
コンソール
- [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
- [ 作成] をクリックします。
- [クラスタ名] フィールドに、クラスタの名前を入力します。
- [リージョン] リストで、クラスタのロケーションを選択します。
-
[ネットワーク構成] で、クラスタがアクセスできるサブネットを構成します。
- [プロジェクト] で、該当するプロジェクトを選択します。
- [ネットワーク] で、VPC ネットワークを選択します。
- [サブネット] でサブネットを選択します。
- [完了] をクリックします。
- [作成] をクリックします。
[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。
gcloud
Kafka クラスタを作成するには、managed-kafka clusters
create コマンドを実行します。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
次のように置き換えます。
KAFKA_CLUSTER: Kafka クラスタの名前REGION: クラスタのロケーションPROJECT_ID: プロジェクト IDSUBNET_NAME: クラスタを作成するサブネット(例:default)
サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。
このコマンドは非同期で実行され、オペレーション ID を返します。
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
作成オペレーションの進行状況を追跡するには、gcloud managed-kafka
operations describe コマンドを使用します。
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
クラスタの準備ができると、このコマンドの出力に state:
ACTIVE エントリが含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。
必要なロール
クライアント VM の作成と構成に必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。
-
Compute インスタンス管理者(v1)(
roles/compute.instanceAdmin.v1) -
プロジェクト IAM 管理者(
roles/resourcemanager.projectIamAdmin) -
ロールの閲覧者 (
roles/iam.roleViewer) -
サービス アカウント ユーザー(
roles/iam.serviceAccountUser)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
クライアント VM を設定する
Kafka クラスタにアクセスできる Linux 仮想マシン(VM)インスタンスを Compute Engine に作成します。VM を構成するときに、次のオプションを設定します。
リージョン。Kafka クラスタと同じリージョンに VM を作成します。
サブネット。Kafka クラスタ構成で使用したサブネットと同じ VPC ネットワークに VM を作成します。詳細については、クラスタのサブネットを表示するをご覧ください。
アクセス スコープ。
https://www.googleapis.com/auth/cloud-platformアクセス スコープを VM に割り当てます。このスコープにより、VM は Managed Kafka API にリクエストを送信できます。
これらのオプションを設定する手順は次のとおりです。
コンソール
Google Cloud コンソールで [インスタンスの作成] ページに移動します。
[マシンの構成] ペインで、次の操作を行います。
[名前] フィールドに、インスタンスの名前を指定します。詳細については、リソースの命名規則をご覧ください。
[リージョン] リストで、Kafka クラスタと同じリージョンを選択します。
[ゾーン] リストでゾーンを選択します。
ナビゲーション メニューで、[ネットワーキング] をクリックします。表示された [ネットワーキング] ペインで、次の操作を行います。
[ネットワーク インターフェース] セクションに移動します。
デフォルトのネットワーク インターフェースを開くには、 矢印をクリックします。
[ネットワーク] フィールドで、VPC ネットワークを選択します。
[サブネットワーク] リストで、サブネットを選択します。
[完了] をクリックします。
ナビゲーション メニューで [セキュリティ] をクリックします。表示された [セキュリティ] ペインで、次の操作を行います。
[アクセス スコープ] で、[各 API にアクセス権を設定] を選択します。
アクセス スコープのリストで、[Cloud Platform] プルダウン リストを見つけて、[有効] を選択します。
[作成] をクリックして VM を作成します。
gcloud
VM インスタンスを作成するには、gcloud compute instances create コマンドを使用します。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
次のように置き換えます。
- VM_NAME: VM の名前
- PROJECT_ID: プロジェクト ID
- REGION: Kafka クラスタを作成したリージョン(例:
us-central1) - SUBNET: クラスタ構成で使用したサブネットと同じ VPC ネットワーク内のサブネット
- ZONE: クラスタを作成したリージョンのゾーン(
us-central1-cなど)
VM の作成の詳細については、特定のサブネットに VM インスタンスを作成するをご覧ください。
IAM ロールを付与する
Compute Engine のデフォルト サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。
- マネージド Kafka クライアント(
roles/managedkafka.client) - Schema Registry 管理者(
roles/managedkafka.schemaRegistryAdmin) - サービス アカウント トークン作成者(
roles/iam.serviceAccountTokenCreator) サービス アカウントの OpenID トークン作成者(
roles/iam.serviceAccountOpenIdTokenCreator)
コンソール
Google Cloud コンソールで、[IAM] ページに移動します。
[Compute Engine のデフォルトのサービス アカウント] の行を見つけて、 [プリンシパルを編集します] をクリックします。
[別のロールを追加] をクリックし、ロール [Managed Kafka クライアント] を選択します。スキーマ レジストリ管理者、サービス アカウント トークン作成者、サービス アカウント OpenID トークン作成者のロールについても、この手順を繰り返します。
[保存] をクリックします。
gcloud
IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを使用します。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.schemaRegistryAdmin
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
次のように置き換えます。
PROJECT_ID: プロジェクト ID
PROJECT_NUMBER: プロジェクトの番号
プロジェクト番号を取得するには、gcloud projects describe コマンドを実行します。
gcloud projects describe PROJECT_ID
詳細については、プロジェクト名、番号、ID を確認するをご覧ください。
VM に接続する
SSH を使用して VM インスタンスに接続します。
コンソール
[VM インスタンス] ページに移動します。
VM インスタンスのリストで、VM 名を見つけて [SSH] をクリックします。
gcloud
VM に接続するには、gcloud compute ssh コマンドを使用します。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
次のように置き換えます。
- VM_NAME: VM の名前
- PROJECT_ID: プロジェクト ID
- ZONE: VM を作成したゾーン
SSH を初めて使用する場合は、追加の構成が必要になることがあります。詳細については、SSH 接続についてをご覧ください。
Apache Maven プロジェクトを設定する
SSH セッションから、次のコマンドを実行して Maven プロジェクトを設定します。
次のコマンドを使用して Java と Maven をインストールします。
sudo apt-get install maven openjdk-17-jdkApache Maven プロジェクトを設定します。
次のコマンドを使用して、
demoというディレクトリにパッケージcom.google.exampleを作成します。mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\ -DarchetypeArtifactId=maven-archetype-quickstart\ -DarchetypeVersion=1.5 -DinteractiveMode=false
スキーマとその Java 実装を定義する
この例では、メッセージは名前とオプションの ID を持つ「ユーザー」を表しています。これは、2 つのフィールド(必須フィールド string 型の name と、オプションの整数 id.)を含む Avro スキーマに対応します。このスキーマを Java プログラムで使用するには、このスキーマに対応するオブジェクトの Java 実装も生成する必要があります。
cd demoを使用してプロジェクト ディレクトリに移動します。コードでスキーマ ファイルを保存するフォルダを作成します。
mkdir -p src/main/avro次のコードを
src/main/avro/User.avscという名前のファイルに貼り付けて、Avro スキーマ定義を作成します。{ "namespace": "com.google.example", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "id", "type": ["int", "null"]} ] }次のコードを
pom.xml.のbuildノードに追加して、Avro Java コード生成プラグインを使用するように Maven プロジェクトを構成します。pom.xmlには、pluginManagementノード内に他のpluginsノードが含まれている場合があります。このステップでは、pluginManagementノードを変更しないでください。pluginsノードはpluginManagementと同じレベルにする必要があります。<plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins>pom.xmlのproject/dependenciesノードの末尾に次のコードを追加して、Avro を依存関係として追加します。pom.xmlには、dependencyManagementタグ内にdependenciesノードがすでに存在します。このステップでは、dependencyManagementノードを変更しないでください。<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.1</version> </dependency>Java ソースを生成する
mvn generate-sources次のコマンドを実行して、実装ソース ファイルが作成されたことを確認します。ソースは、
Userオブジェクトのコンストラクタ、アクセサ、シリアライザー、デシリアライザーを実装する Java クラスファイルです。このクラスはプロデューサー コードで使用します。cat src/main/java/com/google/example/User.java
Apache Avro の詳細については、Apache Avro スタートガイドをご覧ください。
プロデューサー クライアントを作成する
このセクションでは、プロデューサー クライアントの作成、ビルド、実行の手順について説明します。
プロデューサーを実装する
プロデューサーは KafkaAvroSerializer.java を使用してメッセージをエンコードし、スキーマを管理します。シリアライザーは、スキーマ レジストリに自動的に接続し、サブジェクトにスキーマを登録して、その ID を取得し、Avro を使用してメッセージをシリアル化します。プロデューサーとシリアライザーを構成する必要があります。
次のコードを
src/main/java/com/google/example/UserProducer.javaという名前の新しいファイルに貼り付けて、プロデューサー クライアント クラスを作成します。package com.google.example; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; public class UserProducer { private static Properties configure() throws Exception { Properties p = new Properties(); p.load(new java.io.FileReader("client.properties")); p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); return p; } public static void main(String[] args) throws Exception { Properties p = configure(); KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p); final User u = new User("SchemaEnthusiast", 42); final String topicName = "newUsers"; ProducerRecord<String, User> message = new ProducerRecord<String, User>(topicName, "", u); producer.send(message, new SendCallback()); producer.close(); } }src/main/java/com/google/example/SendCallback.javaでコールバック クラスを定義します。package com.google.example; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; class SendCallback implements Callback { public void onCompletion(RecordMetadata m, Exception e){ if (e == null){ System.out.println("Produced a message successfully."); } else { System.out.println(e.getMessage()); } } }このコードをコンパイルするには、
org.apache.kafka.clientsパッケージとシリアライザー コードが必要です。シリアライザー Maven アーティファクトは、カスタム リポジトリを介して配布されます。このリポジトリを構成するには、次のノードをpom.xmlのprojectノードに追加します。<repositories> <repository> <id>confluent</id> <name>Confluent</name> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>pom.xmlファイルのdependenciesノードに以下を追加します。<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>7.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.2</version> </dependency>すべての依存関係が正しく解決されるように、クライアントをコンパイルします。
mvn compile
スキーマ レジストリを作成する
スキーマ レジストリを作成するには、次のコマンドを実行します。
gcloud beta managed-kafka schema-registries create REGISTRY_ID \
--location=REGION
次のように置き換えます。
REGISTRY_ID: 新しいスキーマ レジストリの一意の識別子。これは、レジストリのリソース名の一部になります。名前は、先頭を英字にして、英字
(a-z, A-Z)、数字(0-9)、アンダースコア(_)のみで構成される必要があります。最大文字数は 63 文字です。REGION:スキーマ レジストリが作成される Google Cloud リージョン。このロケーションは、このレジストリを使用する Kafka クラスタのリージョンと一致する必要があります。
作成したスキーマ定義は、まだレジストリにアップロードされていません。プロデューサー クライアントは、次の手順で初めて実行するときにこれを行います。
プロデューサーを構成して実行する
この時点では、プロデューサーは完全に構成されていないため実行されません。プロデューサーを構成するには、Kafka とスキーマ レジストリの両方の構成を指定します。
pom.xmlと同じディレクトリにclient.propertiesというファイルを作成し、次の内容を追加します。bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID bearer.auth.credentials.source=CUSTOM bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProviderpom.xmlのdependenciesノードに次の行を挿入して、Kafka とスキーマ レジストリの認証ハンドラの依存関係を Maven プロジェクトに追加します。kafka-avro-serializer依存関係の上に挿入します。<dependency> <groupId>com.google.cloud.hosted.kafka</groupId> <artifactId>managed-kafka-auth-login-handler</artifactId> <version>1.0.6</version> <exclusions> <exclusion> <groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-client</artifactId> </exclusion> </exclusions> </dependency>カスタム スキーマ レジストリ認証ハンドラの認証ハンドラの実装を確認する場合は、GcpBearerAuthCredentialProvider クラスをご覧ください。
プロデューサー クライアントをコンパイルして実行します。
mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducerすべて正常に行われると、
SendCallbackクラスによって生成された出力Produced a message successfullyが表示されます。
出力を確認する
Userスキーマが、トピック名とスキーマ名から派生したサブジェクト名で登録されていることを確認します。SR_DOMAIN=https://managedkafka.googleapis.com SR_PATH=/v1/projects/PROJECT_ID/locations/REGION SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)"\ $SR_HOSTこのコマンドの出力は次のようになります。
["newUsers-value"]リポジトリに登録されているスキーマが
Userと同じであることを確認します。curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)" \ $SR_HOST/newUsers-value/versions/1コマンドの出力は次のようになります。
{ "subject": "newUsers-value", "version": 1, "id": 2, "schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}", "references": [] }
クリーンアップ
このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、次の手順を実施します。
コンソール
VM インスタンスを削除します。
[VM インスタンス] ページに移動します。
VM を選択し、[削除] をクリックします。
スキーマ レジストリを削除します。
[スキーマ レジストリ] ページに移動します。
スキーマ レジストリの名前をクリックします。
[削除] をクリックします。
Kafka クラスタを削除します。
[Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
Kafka クラスタを選択し、[削除] をクリックします。
gcloud
VM を削除するには、
gcloud compute instances deleteコマンドを使用します。gcloud compute instances delete VM_NAME --zone=ZONEスキーマ レジストリを削除するには、
/sdk/gcloud/reference/managed-kafka/schema-registries/deleteコマンドを使用します。gcloud beta managed-kafka schema-registries delete REGISTRY_ID \ --location=REGIONKafka クラスタを削除するには、
gcloud managed-kafka clusters deleteコマンドを使用します。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
次のステップ
- スキーマ レジストリの概要。
- Managed Service for Apache Kafka の概要。
- Managed Kafka API で認証します。
- Kafka Connect を使用して Avro メッセージを BigQuery に書き込む