Produci messaggi Avro con il registro di schema

Scopri come sviluppare un'applicazione di produzione Java che utilizza il registro degli schemi (anteprima) per produrre messaggi Apache Avro. L'applicazione scrive i messaggi in un cluster Managed Service per Apache Kafka.

Prima di iniziare

Prima di iniziare questo tutorial, crea un nuovo cluster Managed Service per Apache Kafka. Se hai già un cluster, puoi saltare questo passaggio.

Come creare un cluster

Console

  1. Vai alla pagina Managed Service per Apache Kafka > Cluster.

    Vai a Cluster

  2. Fai clic su Crea.
  3. Nella casella Nome del cluster, inserisci un nome per il cluster.
  4. Nell'elenco Regione, seleziona una località per il cluster.
  5. Per Configurazione di rete, configura la subnet in cui è accessibile il cluster:
    1. Per Progetto, seleziona il tuo progetto.
    2. In Rete, seleziona la rete VPC.
    3. In Subnet, seleziona la subnet.
    4. Fai clic su Fine.
  6. Fai clic su Crea.

Dopo aver fatto clic su Crea, lo stato del cluster è Creating. Quando il cluster è pronto, lo stato è Active.

gcloud

Per creare un cluster Kafka, esegui il comando managed-kafka clusters create.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

Sostituisci quanto segue:

  • KAFKA_CLUSTER: un nome per il cluster Kafka
  • REGION: la posizione del cluster
  • PROJECT_ID: il tuo ID progetto
  • SUBNET_NAME: la subnet in cui vuoi creare il cluster, ad esempio default

Per informazioni sulle località supportate, consulta Località di Managed Service per Apache Kafka.

Il comando viene eseguito in modo asincrono e restituisce un ID operazione:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

Per monitorare l'avanzamento dell'operazione di creazione, utilizza il comando gcloud managed-kafka operations describe:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

Quando il cluster è pronto, l'output di questo comando include la voce state: ACTIVE. Per saperne di più, consulta Monitorare l'operazione di creazione del cluster.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per creare e configurare una VM client, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Configura una VM client

Crea un'istanza di macchina virtuale (VM) Linux in Compute Engine che possa accedere al cluster Kafka. Quando configuri la VM, imposta le seguenti opzioni:

  • Regione. Crea la VM nella stessa regione del cluster Kafka.

  • Subnet. Crea la VM nella stessa rete VPC della subnet che hai utilizzato nella configurazione del cluster Kafka. Per saperne di più, consulta Visualizzare le subnet di un cluster.

  • Ambiti di accesso. Assegna l'https://www.googleapis.com/auth/cloud-platform ambito di accesso alla VM. Questo ambito autorizza la VM a inviare richieste all'API Managed Kafka.

I seguenti passaggi mostrano come impostare queste opzioni.

Console

  1. Nella console Google Cloud , vai alla pagina Crea un'istanza.

    Crea un'istanza

  2. Nel riquadro Configurazione macchina, segui questi passaggi:

    1. Nel campo Nome, specifica un nome per l'istanza. Per ulteriori informazioni, consulta le convenzioni per la denominazione delle risorse.

    2. Nell'elenco Regione, seleziona la stessa regione del cluster Kafka.

    3. Nell'elenco Zona, seleziona una zona.

  3. Nel menu di navigazione, fai clic su Networking. Nel riquadro Networking visualizzato, segui questi passaggi:

    1. Vai alla sezione Interfacce di rete.

    2. Per espandere l'interfaccia di rete predefinita, fai clic sulla freccia .

    3. Nel campo Rete, scegli la rete VPC.

    4. Nell'elenco Subnet, seleziona la subnet.

    5. Fai clic su Fine.

  4. Nel menu di navigazione, fai clic su Sicurezza. Nel riquadro Sicurezza visualizzato, segui questi passaggi:

    1. Per Ambiti di accesso, seleziona Imposta l'accesso per ogni API.

    2. Nell'elenco degli ambiti di accesso, trova l'elenco a discesa Cloud Platform e seleziona Attivato.

  5. Fai clic su Crea per creare la VM.

gcloud

Per creare l'istanza VM, utilizza il comando gcloud compute instances create.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

Sostituisci quanto segue:

  • VM_NAME: il nome della VM
  • PROJECT_ID: il tuo ID progetto
  • REGION: la regione in cui hai creato il cluster Kafka, ad esempio us-central1
  • SUBNET: una subnet nella stessa rete VPC della subnet che hai utilizzato nella configurazione del cluster
  • ZONE: una zona nella regione in cui hai creato il cluster, ad esempio us-central1-c

Per saperne di più sulla creazione di una VM, consulta Crea un'istanza VM in una subnet specifica.

Concedi ruoli IAM

Concedi i seguenti ruoli Identity and Access Management (IAM) all'account di servizio predefinito di Compute Engine:

  • Managed Kafka Client (roles/managedkafka.client)
  • Schema Registry Admin (roles/managedkafka.schemaRegistryAdmin)
  • Creatore token service account (roles/iam.serviceAccountTokenCreator)
  • Service Account OpenID Token Creator (roles/iam.serviceAccountOpenIdTokenCreator)

Console

  1. Nella console Google Cloud , vai alla pagina IAM.

    Vai a IAM

  2. Trova la riga relativa all'account di servizio predefinito di Compute Engine e fai clic su Modifica entità.

  3. Fai clic su Aggiungi un altro ruolo e seleziona il ruolo Client Kafka gestito. Ripeti questo passaggio per i ruoli Amministratore del registro degli schemi, Creatore token service account e Creatore token OpenID service account.

  4. Fai clic su Salva.

gcloud

Per concedere ruoli IAM, utilizza il comando gcloud projects add-iam-policy-binding.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto

  • PROJECT_NUMBER: il numero di progetto

Per ottenere il numero di progetto, esegui il comando gcloud projects describe:

gcloud projects describe PROJECT_ID

Per saperne di più, consulta Trovare il nome, il numero e l'ID del progetto.

Connettiti alla VM

Utilizza SSH per connetterti all'istanza VM.

Console

  1. Vai alla pagina Istanze VM.

    Vai a Istanze VM

  2. Nell'elenco delle istanze VM, individua il nome della VM e fai clic su SSH.

gcloud

Per connetterti alla VM, utilizza il comando gcloud compute ssh.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

Sostituisci quanto segue:

  • VM_NAME: il nome della VM
  • PROJECT_ID: il tuo ID progetto
  • ZONE: la zona in cui hai creato la VM

Potrebbe essere necessaria una configurazione aggiuntiva per il primo utilizzo di SSH. Per saperne di più, consulta Informazioni sulle connessioni SSH.

Configura un progetto Apache Maven

Dalla sessione SSH, esegui questi comandi per configurare un progetto Maven.

  1. Installa Java e Maven con il comando:

    sudo apt-get install maven openjdk-17-jdk
    
  2. Configura un progetto Apache Maven.

    Utilizza il seguente comando per creare un pacchetto com.google.example in una directory denominata demo.

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

Definisci lo schema e la relativa implementazione Java

In questo esempio, un messaggio rappresenta un "utente" con un nome e un ID facoltativo. Corrisponde a uno schema Avro con due campi: un campo obbligatorio name di tipo string e un numero intero facoltativo id.. Per utilizzare questo schema in un programma Java, devi anche generare un'implementazione Java di un oggetto corrispondente a questo schema.

  1. Passa alla directory del progetto con cd demo.

  2. Crea le cartelle per archiviare i file di schema nel codice:

    mkdir -p src/main/avro
    
  3. Crea la definizione dello schema Avro incollando il seguente codice in un file denominato src/main/avro/User.avsc:

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. Configura il progetto Maven per utilizzare un plug-in di generazione del codice Java Avro aggiungendo quanto segue al nodo build di pom.xml.. Tieni presente che pom.xml potrebbe contenere altri nodi plugins all'interno del nodo pluginManagement. Non modificare il nodo pluginManagement in questo passaggio. Il nodo plugins deve trovarsi allo stesso livello di pluginManagement.

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. Aggiungi Avro come dipendenza aggiungendo quanto segue alla fine del nodo project/dependencies di pom.xml. Tieni presente che pom.xml ha già un nodo dependencies all'interno del tag dependencyManagement. Non modificare il nodo dependencyManagement in questo passaggio.

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. Genera origini Java

      mvn generate-sources
    
  7. Esegui questo comando per verificare che il file di origine dell'implementazione sia stato creato. L'origine è un file di classe Java che implementa costruttori, funzioni di accesso, serializzatori e deserializzatori per oggetti User. Utilizzerai questa classe nel codice del produttore.

    cat src/main/java/com/google/example/User.java
    

Per maggiori informazioni su Apache Avro, consulta la Guida introduttiva ad Apache Avro.

Crea un client produttore

Questa sezione illustra i passaggi per scrivere, creare ed eseguire un client producer.

Implementare il producer

Il produttore utilizza KafkaAvroSerializer.java per codificare i messaggi e gestire i relativi schemi. Il serializzatore si connette automaticamente al registro degli schemi, registra lo schema in un soggetto, recupera il relativo ID e poi serializza il messaggio utilizzando Avro. Devi ancora configurare il producer e il serializzatore.

  1. Crea la classe client del produttore incollando il seguente codice in un nuovo file denominato src/main/java/com/google/example/UserProducer.java

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. Definisci la classe di callback in src/main/java/com/google/example/SendCallback.java:

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. Per compilare questo codice, devi disporre del pacchetto org.apache.kafka.clients e del codice del serializzatore. L'artefatto Maven del serializzatore viene distribuito tramite un repository personalizzato. Aggiungi il seguente nodo al nodo project del tuo pom.xml per configurare questo repository:

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. Aggiungi quanto segue al nodo dependencies nel file pom.xml:

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. Per assicurarti che tutte le dipendenze siano risolte correttamente, compila il client:

    mvn compile
    

Crea un registro di schema

Per creare un registro di schema, esegui questo comando:

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

Sostituisci quanto segue:

  • REGISTRY_ID: un identificatore univoco per il nuovo registro degli schemi. Questo fa parte del nome della risorsa del registro. Il nome deve iniziare con una lettera, contenere solo lettere (a-z, A-Z), numeri (0-9) e trattini bassi (_) e avere una lunghezza massima di 63 caratteri.

  • REGION: Google Cloud regione in cui verrà creato il registro degli schemi. Questa località deve corrispondere alla regione del cluster o dei cluster Kafka che utilizzano questo registro.

La definizione dello schema che hai creato non è ancora stata caricata nel registro. Il client producer lo fa la prima volta che viene eseguito nei passaggi successivi.

Configura ed esegui il produttore

A questo punto, il produttore non verrà eseguito perché non è completamente configurato. Per configurare il produttore, fornisci sia la configurazione di Kafka sia quella del registro di schema.

  1. Crea un file denominato client.properties nella stessa directory di pom.xml e aggiungi il seguente contenuto:

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    Aggiungi le dipendenze del gestore di autenticazione di Kafka e del registro di schema al tuo progetto Maven inserendo quanto segue nel nodo dependencies di pom.xml sopra la dipendenza kafka-avro-serializer:

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    Se vuoi vedere l'implementazione del gestore di autenticazione del registro dello schema personalizzato, consulta la classe GcpBearerAuthCredentialProvider.

  2. Compila ed esegui il client producer:

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    Se tutto va bene, vedrai l'output Produced a message successfully generato dalla classe SendCallback.

Esamina l'output

  1. Verifica che lo schema User sia stato registrato con un nome soggetto derivato dai nomi dell'argomento e dello schema:

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    L'output di questo comando dovrebbe essere simile al seguente:

    ["newUsers-value"]
    
  2. Verifica che lo schema registrato nel repository sia lo stesso di User:

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    L'output del comando dovrebbe essere simile al seguente:

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.

Console

  1. Elimina l'istanza VM.

    1. Vai alla pagina Istanze VM.

      Vai a Istanze VM

    2. Seleziona la VM e fai clic su Elimina.

  2. Elimina il registro di schemi.

    1. Vai alla pagina Registri degli schemi.

      Vai a Registri di schema

    2. Fai clic sul nome del registro degli schemi.

    3. Fai clic su Elimina.

  3. Elimina il cluster Kafka.

    1. Vai alla pagina Managed Service per Apache Kafka > Cluster.

      Vai a Cluster

    2. Seleziona il cluster Kafka e fai clic su Elimina.

gcloud

  1. Per eliminare la VM, utilizza il comando gcloud compute instances delete.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Per eliminare il registro di schemi, utilizza il comando /sdk/gcloud/reference/managed-kafka/schema-registries/delete.

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. Per eliminare il cluster Kafka, utilizza il comando gcloud managed-kafka clusters delete.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

Passaggi successivi

Apache Kafka® e Apache Avro sono marchi registrati di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.
Confluent è un marchio registrato di Confluent, Inc.