使用結構定義登錄產生 Avro 訊息

瞭解如何開發 Java 生產者應用程式,使用結構定義登錄 (預先發布版) 產生 Apache Avro 訊息。應用程式會將訊息寫入 Managed Service for Apache Kafka 叢集。

事前準備

開始本教學課程前,請先建立新的 Managed Service for Apache Kafka 叢集。如果已有叢集,可以略過這個步驟。

如何建立叢集

控制台

  1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 點選 「Create」(建立)
  3. 在「Cluster name」(叢集名稱) 方塊中輸入叢集的名稱。
  4. 在「Region」(區域) 清單中,選取叢集的位置。
  5. 在「網路設定」中,設定可存取叢集的子網路:
    1. 在「Project」(專案) 部分,選取專案。
    2. 在「Network」(網路) 中選取虛擬私有雲網路。
    3. 在「Subnet」(子網路) 中,選取子網路。
    4. 按一下 [完成]
  6. 點選「建立」

按一下「建立」後,叢集狀態會顯示為 Creating。叢集準備就緒時,狀態會顯示 Active

gcloud

如要建立 Kafka 叢集,請執行 managed-kafka clusters create 指令。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

更改下列內容:

  • KAFKA_CLUSTER:Kafka 叢集名稱
  • REGION:叢集位置
  • PROJECT_ID:專案 ID
  • SUBNET_NAME:要建立叢集的子網路,例如 default

如要瞭解支援的位置,請參閱「 Managed Service for Apache Kafka 位置」。

這項指令會非同步執行,並傳回作業 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如要追蹤建立作業的進度,請使用 gcloud managed-kafka operations describe 指令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

叢集準備就緒後,這項指令的輸出內容會包含 state: ACTIVE 項目。詳情請參閱「 監控叢集建立作業」。

必要的角色

如要取得建立及設定用戶端 VM 所需的權限,請要求管理員在專案中授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

設定用戶端 VM

在 Compute Engine 中建立可存取 Kafka 叢集的 Linux 虛擬機器 (VM) 執行個體。設定 VM 時,請設定下列選項:

  • 區域。在與 Kafka 叢集相同的區域中建立 VM。

  • 子網路。在與 Kafka 叢集設定中使用的子網路相同的虛擬私有雲網路中建立 VM。詳情請參閱「查看叢集的子網路」。

  • 存取權範圍。將 https://www.googleapis.com/auth/cloud-platform 存取範圍指派給 VM。這個範圍會授權 VM 將要求傳送至 Managed Kafka API。

下列步驟說明如何設定這些選項。

控制台

  1. 前往 Google Cloud 控制台的「Create an instance」(建立執行個體) 頁面。

    建立執行個體

  2. 在「機器設定」窗格中,執行下列操作:

    1. 在「Name」(名稱) 欄位中,指定執行個體的名稱。詳情請參閱資源命名慣例

    2. 在「Region」(區域) 清單中,選取與 Kafka 叢集相同的區域。

    3. 在「Zone」(可用區) 清單中選取可用區。

  3. 在導覽選單中,按一下「Networking」(網路)。在隨即顯示的「Networking」(網路) 窗格中,執行下列操作:

    1. 前往「網路介面」部分。

    2. 如要展開預設網路介面,請點選 箭頭。

    3. 在「Network」(網路) 欄位中,選擇虛擬私有雲網路。

    4. 在「Subnetwork」(子網路) 清單中,選取子網路。

    5. 按一下 [完成]

  4. 按一下導覽選單中的「Security」(安全性)。在隨即顯示的「安全性」窗格中,執行下列操作:

    1. 為「Access scopes」(存取權範圍) 選取 [Set access for each API] (針對各個 API 設定存取權)

    2. 在存取範圍清單中,找到「Cloud Platform」下拉式清單,然後選取「Enabled」

  5. 按一下「建立」即可建立 VM。

gcloud

如要建立 VM 執行個體,請使用 gcloud compute instances create 指令。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

更改下列內容:

  • VM_NAME:VM 的名稱
  • PROJECT_ID:專案 ID
  • REGION:您建立 Kafka 叢集的區域,例如 us-central1
  • SUBNET:與叢集設定中使用的子網路位於相同虛擬私有雲網路的子網路
  • ZONE:您建立叢集的區域中的可用區,例如 us-central1-c

如要進一步瞭解如何建立 VM,請參閱「在特定子網路中建立 VM 執行個體」。

授予 IAM 角色

將下列 Identity and Access Management (IAM) 角色授予 Compute Engine 預設服務帳戶

  • 代管 Kafka 用戶端 (roles/managedkafka.client)
  • 結構定義儲存庫管理員 (roles/managedkafka.schemaRegistryAdmin)
  • 服務帳戶憑證建立者 (roles/iam.serviceAccountTokenCreator)
  • 服務帳戶 OpenID 權杖建立者 (roles/iam.serviceAccountOpenIdTokenCreator)

控制台

  1. 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。

    前往「身分與存取權管理」頁面

  2. 找出「Compute Engine default service account」(Compute Engine 預設服務帳戶) 的資料列,然後按一下 「Edit principal」(編輯主體)

  3. 按一下「新增其他角色」,然後選取「受管理 Kafka 用戶端」角色。 針對「Schema Registry Admin」(結構定義登錄管理員)、「Service Account Token Creator」(服務帳戶權杖建立者) 和「Service Account OpenID Token Creator」(服務帳戶 OpenID 權杖建立者) 角色,重複執行這個步驟。

  4. 按一下 [儲存]

gcloud

如要授予 IAM 角色,請使用 gcloud projects add-iam-policy-binding 指令。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

更改下列內容:

  • PROJECT_ID:專案 ID

  • PROJECT_NUMBER:您的專案編號

如要取得專案編號,請執行 gcloud projects describe 指令:

gcloud projects describe PROJECT_ID

詳情請參閱「尋找專案名稱、編號和 ID」。

連線至 VM

使用 SSH 連線至 VM 執行個體。

控制台

  1. 前往「VM instances」(VM 執行個體) 頁面。

    前往 VM 執行個體

  2. 在 VM 執行個體清單中找到 VM 名稱,然後按一下「SSH」SSH

gcloud

如要連線至 VM,請使用 gcloud compute ssh 指令。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

更改下列內容:

  • VM_NAME:VM 的名稱
  • PROJECT_ID:專案 ID
  • ZONE:您建立 VM 的可用區

首次使用 SSH 時,可能需要額外設定。詳情請參閱「關於 SSH 連線」一文。

設定 Apache Maven 專案

在 SSH 工作階段中,執行下列指令來設定 Maven 專案。

  1. 使用下列指令安裝 Java 和 Maven:

    sudo apt-get install maven openjdk-17-jdk
    
  2. 設定 Apache Maven 專案。

    使用下列指令在名為 demo 的目錄中建立套件 com.google.example

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

定義結構定義及其 Java 實作項目

在這個範例中,訊息代表「使用者」,該使用者有名稱和選用 ID。這對應於具有兩個欄位的 Avro 結構定義:必要欄位 name (類型為 string) 和選用的整數 id.。如要在 Java 程式中使用這個結構定義,您也需要產生對應於這個結構定義的物件 Java 實作項目。

  1. 使用 cd demo 切換至專案目錄。

  2. 在程式碼中建立資料夾,用於儲存結構定義檔案:

    mkdir -p src/main/avro
    
  3. 將下列程式碼貼入名為 src/main/avro/User.avsc 的檔案,建立 Avro 結構定義:

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. 將下列內容新增至 pom.xml.build 節點,設定 Maven 專案來使用 Avro Java 程式碼產生外掛程式。請注意,pom.xml 可能在 pluginManagement 節點內有其他 plugins 節點。請勿在此步驟中變更 pluginManagement 節點。plugins 節點必須與 pluginManagement 位於相同層級。

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. pom.xmlproject/dependencies 節點結尾處新增下列內容,將 Avro 新增為依附元件。請注意,pom.xml 已經在 dependencyManagement 標記內有 dependencies 節點。請勿在此步驟中變更 dependencyManagement 節點。

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. 產生 Java 來源

      mvn generate-sources
    
  7. 執行下列指令,確認實作來源檔案是否已建立。來源是 Java 類別檔案,可實作 User 物件的建構函式、存取子、序列化器和還原序列化器。您會在生產者程式碼中使用這個類別。

    cat src/main/java/com/google/example/User.java
    

如要進一步瞭解 Apache Avro,請參閱 Apache Avro 入門指南

建立生產者用戶端

本節逐步說明如何編寫、建構及執行 Producer 用戶端。

實作生產端

製作人會使用 KafkaAvroSerializer.java 編碼訊息並管理架構。序列化工具會自動連線至結構定義登錄,在主體下註冊結構定義、擷取其 ID,然後使用 Avro 序列化訊息。您仍需設定生產者和序列化程式。

  1. 將下列程式碼貼到名為 src/main/java/com/google/example/UserProducer.java 的新檔案中,建立生產者用戶端類別

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. src/main/java/com/google/example/SendCallback.java 中定義回呼類別:

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. 如要編譯這段程式碼,您需要 org.apache.kafka.clients 套件和序列化程式碼。序列化程式 Maven 構件是透過自訂存放區發布。在 pom.xmlproject 節點中新增下列節點,設定這個存放區:

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. pom.xml 檔案的 dependencies 節點中新增下列內容:

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. 為確保所有依附元件都已正確解析,請編譯用戶端:

    mvn compile
    

建立結構定義登錄

如要建立結構定義登錄,請執行下列指令:

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

更改下列內容:

  • REGISTRY_ID:新結構定義登錄檔的專屬 ID。這會構成登錄檔資源名稱的一部分。名稱必須以英文字母開頭,只能使用英文字母 (a-z, A-Z)、數字 (0-9) 和底線 (_),且不得超過 63 個字元。

  • REGION: Google Cloud 要建立結構定義登錄檔的區域。這個位置必須與使用此登錄檔的 Kafka 叢集或叢集區域相符。

您建立的結構定義尚未上傳至登錄檔。在下列步驟中,製作人用戶端會在首次執行時執行這項操作。

設定及執行 Producer

此時,由於尚未完成設定,因此不會執行製作人。如要設定生產者,請提供 Kafka 和結構定義儲存庫設定。

  1. 在與 pom.xml 相同的目錄中建立名為 client.properties 的檔案,並在當中加入下列內容:

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    將 Kafka 和結構定義登錄驗證處理常式依附元件新增至 Maven 專案,方法是在 pom.xml 節點的 dependencies 上方插入下列 kafka-avro-serializer 依附元件:

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    如要查看自訂結構定義登錄驗證處理常式驗證處理常式的實作方式,請參閱 GcpBearerAuthCredentialProvider 類別。

  2. 編譯並執行製作人用戶端:

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    如果一切順利,您會看到 SendCallback 類別產生的輸出內容 Produced a message successfully

檢查輸出內容

  1. 確認 User 結構定義已在衍生自主題和結構定義名稱的主體名稱下註冊:

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    這項指令的輸出內容應如下所示:

    ["newUsers-value"]
    
  2. 確認存放區中註冊的結構定義與 User 相同:

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    指令輸出內容應如下所示:

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

清除所用資源

為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請按照下列步驟操作。

控制台

  1. 刪除 VM 執行個體。

    1. 前往「VM instances」(VM 執行個體) 頁面。

      前往 VM 執行個體

    2. 選取 VM,然後按一下「Delete」(刪除)

  2. 刪除結構定義儲存庫。

    1. 前往「Schema registries」(結構定義登錄) 頁面。

      前往「結構定義登錄」頁面

    2. 按一下結構定義登錄的名稱。

    3. 點選「刪除」。

  3. 刪除 Kafka 叢集。

    1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

      前往「Clusters」(叢集)

    2. 選取 Kafka 叢集,然後按一下「Delete」(刪除)

gcloud

  1. 如要刪除 VM,請使用 gcloud compute instances delete 指令。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. 如要刪除結構定義登錄,請使用 /sdk/gcloud/reference/managed-kafka/schema-registries/delete 指令。

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. 如要刪除 Kafka 叢集,請使用 gcloud managed-kafka clusters delete 指令。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

後續步驟

Apache Kafka® 和 Apache Avro 是 The Apache Software Foundation 或其關聯企業在美國和/或其他國家/地區的註冊商標。
Confluent 是 Confluent, Inc. 的註冊商標。