Membuat pesan Avro dengan registry skema

Pelajari cara mengembangkan aplikasi produser Java yang menggunakan schema registry (Pratinjau) untuk membuat pesan Apache Avro. Aplikasi menulis pesan ke cluster Managed Service for Apache Kafka.

Sebelum memulai

Sebelum memulai tutorial ini, buat cluster Managed Service for Apache Kafka yang baru. Jika sudah memiliki cluster, Anda dapat melewati langkah ini.

Cara membuat cluster

Konsol

  1. Buka halaman Managed Service for Apache Kafka > Clusters.

    Buka Cluster

  2. Klik Create.
  3. Di kotak Cluster name, masukkan nama untuk cluster.
  4. Dalam daftar Region, pilih lokasi untuk cluster.
  5. Untuk Network configuration, konfigurasikan subnet tempat cluster dapat diakses:
    1. Untuk Project, pilih project Anda.
    2. Untuk Network, pilih jaringan VPC.
    3. Untuk Subnet, pilih subnet.
    4. Klik Done.
  6. Klik Create.

Setelah Anda mengklik Create, status cluster adalah Creating. Saat cluster siap, statusnya adalah Active.

gcloud

Untuk membuat cluster Kafka, jalankan perintah managed-kafka clusters create.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

Ganti kode berikut:

  • KAFKA_CLUSTER: nama untuk cluster Kafka
  • REGION: lokasi cluster
  • PROJECT_ID: project ID Anda
  • SUBNET_NAME: subnet tempat Anda ingin membuat cluster, misalnya default

Untuk mengetahui informasi tentang lokasi yang didukung, lihat Lokasi Managed Service for Apache Kafka.

Perintah berjalan secara asinkron dan menampilkan ID operasi:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

Untuk melacak progres operasi pembuatan, gunakan perintah gcloud managed-kafka operations describe:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

Setelah cluster siap, output dari perintah ini akan menyertakan entri state: ACTIVE. Untuk mengetahui informasi selengkapnya, lihat Memantau operasi pembuatan cluster.

Peran yang diperlukan

Agar mendapatkan izin yang diperlukan untuk membuat dan mengonfigurasi VM klien, minta administrator untuk memberi Anda peran IAM berikut di project:

Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Menyiapkan VM klien

Buat instance virtual machine (VM) Linux di Compute Engine yang dapat mengakses cluster Kafka. Saat Anda mengonfigurasi VM, tetapkan opsi berikut:

  • Region. Buat VM di region yang sama dengan cluster Kafka Anda.

  • Subnet. Buat VM di jaringan VPC yang sama dengan subnet yang Anda gunakan dalam konfigurasi cluster Kafka. Untuk mengetahui informasi selengkapnya, lihat Melihat subnet cluster.

  • Cakupan akses. Tetapkan cakupan akses https://www.googleapis.com/auth/cloud-platform ke VM. Cakupan ini mengizinkan VM untuk mengirim permintaan ke Managed Kafka API.

Langkah-langkah berikut menunjukkan cara menyetel opsi ini.

Konsol

  1. Di konsol Google Cloud , buka halaman Create an instance.

    Membuat instance

  2. Di panel Machine configuration, lakukan hal berikut:

    1. Di kolom Name, tentukan nama untuk instance Anda. Untuk mengetahui informasi selengkapnya, lihat Konvensi penamaan resource.

    2. Di daftar Region, pilih region yang sama dengan cluster Kafka Anda.

    3. Di daftar Zone, pilih zona.

  3. Di menu navigasi, klik Networking. Di panel Networking yang muncul, lakukan hal berikut:

    1. Buka bagian Network interfaces.

    2. Untuk meluaskan antarmuka jaringan default, klik panah .

    3. Di kolom Network, pilih jaringan VPC.

    4. Dalam daftar Subnetwork, pilih subnet.

    5. Klik Done.

  4. Di menu navigasi, klik Security. Di panel Security yang muncul, lakukan hal berikut:

    1. Di bagian Access scopes, pilih Set access for each API.

    2. Dalam daftar cakupan akses, temukan menu drop-down Cloud Platform dan pilih Aktifkan.

  5. Klik Create untuk membuat VM.

gcloud

Untuk membuat instance VM, gunakan perintah gcloud compute instances create.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

Ganti kode berikut:

  • VM_NAME: nama VM
  • PROJECT_ID: project ID Anda
  • REGION: region tempat Anda membuat cluster Kafka, misalnya us-central1
  • SUBNET: subnet di jaringan VPC yang sama dengan subnet yang Anda gunakan dalam konfigurasi cluster
  • ZONE: zona di region tempat Anda membuat cluster, misalnya us-central1-c

Untuk mengetahui informasi selengkapnya tentang cara membuat VM, lihat Membuat instance VM di subnet tertentu.

Memberikan peran IAM

Berikan peran Identity and Access Management (IAM) berikut ke akun layanan default Compute Engine:

  • Managed Kafka Client (roles/managedkafka.client)
  • Schema Registry Admin (roles/managedkafka.schemaRegistryAdmin)
  • Service Account Token Creator (roles/iam.serviceAccountTokenCreator)
  • Service Account OpenID Token Creator (roles/iam.serviceAccountOpenIdTokenCreator)

Konsol

  1. Di konsol Google Cloud , buka halaman IAM.

    Buka IAM

  2. Temukan baris untuk akun layanan default Compute Engine, lalu klik Edit principal.

  3. Klik Add another role, lalu pilih peran Managed Kafka Client. Ulangi langkah ini untuk peran Schema Registry Admin, Service Account Token Creator, dan Service Account OpenID Token Creator.

  4. Klik Simpan.

gcloud

Untuk memberikan peran IAM, gunakan perintah gcloud projects add-iam-policy-binding.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

Ganti kode berikut:

  • PROJECT_ID: project ID Anda

  • PROJECT_NUMBER: nomor project Anda

Untuk mendapatkan nomor project, jalankan perintah gcloud projects describe:

gcloud projects describe PROJECT_ID

Untuk mengetahui informasi selengkapnya, lihat Menemukan nama, nomor, dan ID project.

Terhubung ke VM

Gunakan SSH untuk terhubung ke instance VM.

Konsol

  1. Buka halaman VM instances.

    Buka instance VM

  2. Dalam daftar instance VM, temukan nama VM, lalu klik SSH.

gcloud

Untuk terhubung ke VM, gunakan perintah gcloud compute ssh.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

Ganti kode berikut:

  • VM_NAME: nama VM
  • PROJECT_ID: project ID Anda
  • ZONE: zona tempat Anda membuat VM

Konfigurasi tambahan mungkin diperlukan untuk penggunaan SSH pertama kali. Untuk mengetahui informasi selengkapnya, lihat Tentang koneksi SSH.

Menyiapkan project Apache Maven

Dari sesi SSH Anda, jalankan perintah berikut untuk menyiapkan project Maven.

  1. Instal Java dan Maven dengan perintah:

    sudo apt-get install maven openjdk-17-jdk
    
  2. Siapkan project Apache Maven.

    Gunakan perintah berikut untuk membuat paket com.google.example dalam direktori bernama demo.

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

Menentukan skema dan implementasi Java-nya

Dalam contoh ini, pesan merepresentasikan "pengguna" yang memiliki nama dan ID opsional. Sesuai dengan skema Avro dengan dua kolom: kolom name wajib diisi dengan jenis string dan bilangan bulat id. opsional Untuk menggunakan skema ini dalam program Java, Anda juga perlu membuat implementasi Java dari objek yang sesuai dengan skema ini.

  1. Ubah ke direktori project dengan cd demo.

  2. Buat folder untuk menyimpan file skema dalam kode Anda:

    mkdir -p src/main/avro
    
  3. Buat definisi skema Avro dengan menempelkan kode berikut ke dalam file bernama src/main/avro/User.avsc:

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. Konfigurasi project Maven Anda untuk menggunakan plugin pembuatan kode Java Avro dengan menambahkan kode berikut ke node build dari pom.xml. Anda. Perhatikan bahwa pom.xml mungkin memiliki node plugins lain di dalam node pluginManagement. Jangan ubah node pluginManagement pada langkah ini. Node plugins harus berada di tingkat yang sama dengan pluginManagement.

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. Tambahkan Avro sebagai dependensi dengan menambahkan kode berikut di akhir node project/dependencies dari pom.xml. Perhatikan bahwa pom.xml sudah memiliki node dependencies di dalam tag dependencyManagement. Jangan ubah node dependencyManagement pada langkah ini.

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. Membuat sumber Java

      mvn generate-sources
    
  7. Jalankan perintah berikut untuk memeriksa apakah file sumber penerapan telah dibuat. Sumbernya adalah file class Java yang mengimplementasikan konstruktor, pengakses, serializer, dan de-serializer untuk objek User. Anda akan menggunakan class ini dalam kode produsen.

    cat src/main/java/com/google/example/User.java
    

Untuk mengetahui informasi selengkapnya tentang Apache Avro, lihat Panduan memulai Apache Avro.

Buat klien produser

Bagian ini membahas langkah-langkah penulisan, pembuatan, dan menjalankan klien produsen.

Mengimplementasikan produsen

Produsen menggunakan KafkaAvroSerializer.java untuk mengenkode pesan dan mengelola skemanya. Serializer otomatis terhubung ke registry skema, mendaftarkan skema dalam subjek, mengambil ID-nya, lalu melakukan serialisasi pesan menggunakan Avro. Anda masih perlu mengonfigurasi produsen dan serializer.

  1. Buat class klien produser dengan menempelkan kode berikut ke dalam file baru bernama src/main/java/com/google/example/UserProducer.java

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. Tentukan class callback di src/main/java/com/google/example/SendCallback.java:

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. Untuk mengompilasi kode ini, Anda memerlukan paket org.apache.kafka.clients dan kode serializer. Artefak Maven serializer didistribusikan melalui repositori kustom. Tambahkan node berikut ke node project dari pom.xml Anda untuk mengonfigurasi repositori ini:

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. Tambahkan kode berikut ke node dependencies di file pom.xml Anda:

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. Untuk memastikan semua dependensi diselesaikan dengan benar, kompilasi klien:

    mvn compile
    

Membuat registry skema

Untuk membuat registry skema, jalankan perintah berikut:

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

Ganti kode berikut:

  • REGISTRY_ID: ID unik untuk registry skema baru Anda. Bagian ini membentuk bagian dari nama resource registri. Nama harus diawali dengan huruf, hanya berisi huruf (a-z, A-Z), angka (0-9), dan garis bawah (_), serta terdiri dari 63 karakter atau kurang.

  • REGION: Google Cloud region tempat schema registry akan dibuat. Lokasi ini harus cocok dengan region cluster Kafka yang menggunakan registry ini.

Definisi skema yang telah Anda buat belum diupload ke registry. Klien produser melakukan ini saat pertama kali berjalan dalam langkah-langkah berikut.

Mengonfigurasi dan menjalankan produsen

Pada tahap ini, produser tidak akan berjalan karena belum dikonfigurasi sepenuhnya. Untuk mengonfigurasi produsen, berikan konfigurasi Kafka dan schema registry.

  1. Buat file bernama client.properties di direktori yang sama dengan pom.xml Anda dan tambahkan konten berikut ke dalamnya:

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    Tambahkan dependensi handler autentikasi Kafka dan schema registry ke project Maven Anda dengan menyisipkan kode berikut ke node dependencies dari pom.xml di atas dependensi kafka-avro-serializer:

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    Jika Anda ingin melihat penerapan pengendali autentikasi pengendali autentikasi registry skema kustom, lihat class GcpBearerAuthCredentialProvider.

  2. Kompilasi dan jalankan klien produsen:

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    Jika semuanya berjalan lancar, Anda akan melihat output Produced a message successfully yang dihasilkan oleh class SendCallback.

Periksa outputnya

  1. Periksa apakah skema User telah didaftarkan dengan nama subjek yang berasal dari nama topik dan skema:

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    Output perintah ini akan terlihat seperti ini:

    ["newUsers-value"]
    
  2. Pastikan skema yang terdaftar di repositori sama dengan User:

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    Output perintah akan terlihat seperti ini:

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

Pembersihan

Agar akun Google Cloud Anda tidak dikenai biaya untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.

Konsol

  1. Hapus instance VM.

    1. Buka halaman VM instances.

      Buka instance VM

    2. Pilih VM, lalu klik Hapus.

  2. Hapus registry skema.

    1. Buka halaman Schema registries.

      Buka Schema registry

    2. Klik nama registry skema.

    3. Klik Hapus.

  3. Hapus cluster Kafka.

    1. Buka halaman Managed Service for Apache Kafka > Clusters.

      Buka Cluster

    2. Pilih cluster Kafka, lalu klik Hapus.

gcloud

  1. Untuk menghapus VM, gunakan perintah gcloud compute instances delete.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Untuk menghapus registry skema, gunakan perintah /sdk/gcloud/reference/managed-kafka/schema-registries/delete.

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. Untuk menghapus cluster Kafka, gunakan perintah gcloud managed-kafka clusters delete.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

Langkah berikutnya

Apache Kafka® dan Apache Avro adalah merek dagang terdaftar milik Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lainnya.
Confluent adalah merek dagang terdaftar dari Confluent, Inc.