Produci messaggi Avro con il registro di schema
Scopri come sviluppare un'applicazione di produzione Java che utilizza il registro degli schemi (anteprima) per produrre messaggi Apache Avro. L'applicazione scrive i messaggi in un cluster Managed Service per Apache Kafka.
Prima di iniziare
Prima di iniziare questo tutorial, crea un nuovo cluster Managed Service per Apache Kafka. Se hai già un cluster, puoi saltare questo passaggio.
Come creare un cluster
Console
- Vai alla pagina Managed Service per Apache Kafka > Cluster.
- Fai clic su Crea.
- Nella casella Nome del cluster, inserisci un nome per il cluster.
- Nell'elenco Regione, seleziona una località per il cluster.
-
Per Configurazione di rete, configura la subnet in cui è accessibile il cluster:
- Per Progetto, seleziona il tuo progetto.
- In Rete, seleziona la rete VPC.
- In Subnet, seleziona la subnet.
- Fai clic su Fine.
- Fai clic su Crea.
Dopo aver fatto clic su Crea, lo stato del cluster è Creating. Quando il cluster
è pronto, lo stato è Active.
gcloud
Per creare un cluster Kafka, esegui il comando
managed-kafka clusters
create.
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
Sostituisci quanto segue:
KAFKA_CLUSTER: un nome per il cluster KafkaREGION: la posizione del clusterPROJECT_ID: il tuo ID progettoSUBNET_NAME: la subnet in cui vuoi creare il cluster, ad esempiodefault
Per informazioni sulle località supportate, consulta Località di Managed Service per Apache Kafka.
Il comando viene eseguito in modo asincrono e restituisce un ID operazione:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
Per monitorare l'avanzamento dell'operazione di creazione, utilizza il comando
gcloud managed-kafka
operations describe:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
Quando il cluster è pronto, l'output di questo comando include la voce state:
ACTIVE. Per saperne di più, consulta
Monitorare l'operazione di creazione del cluster.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per creare e configurare una VM client, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:
-
Compute Instance Admin (v1) (
roles/compute.instanceAdmin.v1) -
Project IAM Admin (
roles/resourcemanager.projectIamAdmin) -
Role Viewer (
roles/iam.roleViewer) -
Utente Service Account (
roles/iam.serviceAccountUser)
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Configura una VM client
Crea un'istanza di macchina virtuale (VM) Linux in Compute Engine che possa accedere al cluster Kafka. Quando configuri la VM, imposta le seguenti opzioni:
Regione. Crea la VM nella stessa regione del cluster Kafka.
Subnet. Crea la VM nella stessa rete VPC della subnet che hai utilizzato nella configurazione del cluster Kafka. Per saperne di più, consulta Visualizzare le subnet di un cluster.
Ambiti di accesso. Assegna l'
https://www.googleapis.com/auth/cloud-platformambito di accesso alla VM. Questo ambito autorizza la VM a inviare richieste all'API Managed Kafka.
I seguenti passaggi mostrano come impostare queste opzioni.
Console
Nella console Google Cloud , vai alla pagina Crea un'istanza.
Nel riquadro Configurazione macchina, segui questi passaggi:
Nel campo Nome, specifica un nome per l'istanza. Per ulteriori informazioni, consulta le convenzioni per la denominazione delle risorse.
Nell'elenco Regione, seleziona la stessa regione del cluster Kafka.
Nell'elenco Zona, seleziona una zona.
Nel menu di navigazione, fai clic su Networking. Nel riquadro Networking visualizzato, segui questi passaggi:
Vai alla sezione Interfacce di rete.
Per espandere l'interfaccia di rete predefinita, fai clic sulla freccia .
Nel campo Rete, scegli la rete VPC.
Nell'elenco Subnet, seleziona la subnet.
Fai clic su Fine.
Nel menu di navigazione, fai clic su Sicurezza. Nel riquadro Sicurezza visualizzato, segui questi passaggi:
Per Ambiti di accesso, seleziona Imposta l'accesso per ogni API.
Nell'elenco degli ambiti di accesso, trova l'elenco a discesa Cloud Platform e seleziona Attivato.
Fai clic su Crea per creare la VM.
gcloud
Per creare l'istanza VM, utilizza il
comando gcloud compute instances create.
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
Sostituisci quanto segue:
- VM_NAME: il nome della VM
- PROJECT_ID: il tuo ID progetto
- REGION: la regione in cui hai creato il cluster Kafka, ad esempio
us-central1 - SUBNET: una subnet nella stessa rete VPC della subnet che hai utilizzato nella configurazione del cluster
- ZONE: una zona nella regione in cui hai creato il cluster, ad esempio
us-central1-c
Per saperne di più sulla creazione di una VM, consulta Crea un'istanza VM in una subnet specifica.
Concedi ruoli IAM
Concedi i seguenti ruoli Identity and Access Management (IAM) all'account di servizio predefinito di Compute Engine:
- Managed Kafka Client (
roles/managedkafka.client) - Schema Registry Admin (
roles/managedkafka.schemaRegistryAdmin) - Creatore token service account (
roles/iam.serviceAccountTokenCreator) Service Account OpenID Token Creator (
roles/iam.serviceAccountOpenIdTokenCreator)
Console
Nella console Google Cloud , vai alla pagina IAM.
Trova la riga relativa all'account di servizio predefinito di Compute Engine e fai clic su Modifica entità.
Fai clic su Aggiungi un altro ruolo e seleziona il ruolo Client Kafka gestito. Ripeti questo passaggio per i ruoli Amministratore del registro degli schemi, Creatore token service account e Creatore token OpenID service account.
Fai clic su Salva.
gcloud
Per concedere ruoli IAM, utilizza il
comando gcloud projects add-iam-policy-binding.
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.schemaRegistryAdmin
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
Sostituisci quanto segue:
PROJECT_ID: il tuo ID progetto
PROJECT_NUMBER: il numero di progetto
Per ottenere il numero di progetto, esegui il comando
gcloud projects describe:
gcloud projects describe PROJECT_ID
Per saperne di più, consulta Trovare il nome, il numero e l'ID del progetto.
Connettiti alla VM
Utilizza SSH per connetterti all'istanza VM.
Console
Vai alla pagina Istanze VM.
Nell'elenco delle istanze VM, individua il nome della VM e fai clic su SSH.
gcloud
Per connetterti alla VM, utilizza il
comando gcloud compute ssh.
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
Sostituisci quanto segue:
- VM_NAME: il nome della VM
- PROJECT_ID: il tuo ID progetto
- ZONE: la zona in cui hai creato la VM
Potrebbe essere necessaria una configurazione aggiuntiva per il primo utilizzo di SSH. Per saperne di più, consulta Informazioni sulle connessioni SSH.
Configura un progetto Apache Maven
Dalla sessione SSH, esegui questi comandi per configurare un progetto Maven.
Installa Java e Maven con il comando:
sudo apt-get install maven openjdk-17-jdkConfigura un progetto Apache Maven.
Utilizza il seguente comando per creare un pacchetto
com.google.examplein una directory denominatademo.mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\ -DarchetypeArtifactId=maven-archetype-quickstart\ -DarchetypeVersion=1.5 -DinteractiveMode=false
Definisci lo schema e la relativa implementazione Java
In questo esempio, un messaggio rappresenta un "utente" con un nome e
un ID facoltativo. Corrisponde a uno schema Avro con due campi:
un campo obbligatorio name di tipo string e un numero intero facoltativo id..
Per utilizzare questo schema in un programma Java, devi anche generare un'implementazione Java di un oggetto corrispondente a questo schema.
Passa alla directory del progetto con
cd demo.Crea le cartelle per archiviare i file di schema nel codice:
mkdir -p src/main/avroCrea la definizione dello schema Avro incollando il seguente codice in un file denominato
src/main/avro/User.avsc:{ "namespace": "com.google.example", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "id", "type": ["int", "null"]} ] }Configura il progetto Maven per utilizzare un plug-in di generazione del codice Java Avro aggiungendo quanto segue al nodo
builddipom.xml.. Tieni presente chepom.xmlpotrebbe contenere altri nodipluginsall'interno del nodopluginManagement. Non modificare il nodopluginManagementin questo passaggio. Il nodopluginsdeve trovarsi allo stesso livello dipluginManagement.<plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins>Aggiungi Avro come dipendenza aggiungendo quanto segue alla fine del nodo
project/dependenciesdipom.xml. Tieni presente chepom.xmlha già un nododependenciesall'interno del tagdependencyManagement. Non modificare il nododependencyManagementin questo passaggio.<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.1</version> </dependency>Genera origini Java
mvn generate-sourcesEsegui questo comando per verificare che il file di origine dell'implementazione sia stato creato. L'origine è un file di classe Java che implementa costruttori, funzioni di accesso, serializzatori e deserializzatori per oggetti
User. Utilizzerai questa classe nel codice del produttore.cat src/main/java/com/google/example/User.java
Per maggiori informazioni su Apache Avro, consulta la Guida introduttiva ad Apache Avro.
Crea un client produttore
Questa sezione illustra i passaggi per scrivere, creare ed eseguire un client producer.
Implementare il producer
Il produttore utilizza KafkaAvroSerializer.java per codificare i messaggi e gestire i relativi schemi. Il serializzatore si connette automaticamente al registro degli schemi, registra lo schema in un soggetto, recupera il relativo ID e poi serializza il messaggio utilizzando Avro. Devi ancora configurare il producer e il serializzatore.
Crea la classe client del produttore incollando il seguente codice in un nuovo file denominato
src/main/java/com/google/example/UserProducer.javapackage com.google.example; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; public class UserProducer { private static Properties configure() throws Exception { Properties p = new Properties(); p.load(new java.io.FileReader("client.properties")); p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); return p; } public static void main(String[] args) throws Exception { Properties p = configure(); KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p); final User u = new User("SchemaEnthusiast", 42); final String topicName = "newUsers"; ProducerRecord<String, User> message = new ProducerRecord<String, User>(topicName, "", u); producer.send(message, new SendCallback()); producer.close(); } }Definisci la classe di callback in
src/main/java/com/google/example/SendCallback.java:package com.google.example; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; class SendCallback implements Callback { public void onCompletion(RecordMetadata m, Exception e){ if (e == null){ System.out.println("Produced a message successfully."); } else { System.out.println(e.getMessage()); } } }Per compilare questo codice, devi disporre del pacchetto
org.apache.kafka.clientse del codice del serializzatore. L'artefatto Maven del serializzatore viene distribuito tramite un repository personalizzato. Aggiungi il seguente nodo al nodoprojectdel tuopom.xmlper configurare questo repository:<repositories> <repository> <id>confluent</id> <name>Confluent</name> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>Aggiungi quanto segue al nodo
dependenciesnel filepom.xml:<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>7.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.2</version> </dependency>Per assicurarti che tutte le dipendenze siano risolte correttamente, compila il client:
mvn compile
Crea un registro di schema
Per creare un registro di schema, esegui questo comando:
gcloud beta managed-kafka schema-registries create REGISTRY_ID \
--location=REGION
Sostituisci quanto segue:
REGISTRY_ID: un identificatore univoco per il nuovo registro degli schemi. Questo fa parte del nome della risorsa del registro. Il nome deve iniziare con una lettera, contenere solo lettere
(a-z, A-Z), numeri(0-9)e trattini bassi(_)e avere una lunghezza massima di 63 caratteri.REGION: Google Cloud regione in cui verrà creato il registro degli schemi. Questa località deve corrispondere alla regione del cluster o dei cluster Kafka che utilizzano questo registro.
La definizione dello schema che hai creato non è ancora stata caricata nel registro. Il client producer lo fa la prima volta che viene eseguito nei passaggi successivi.
Configura ed esegui il produttore
A questo punto, il produttore non verrà eseguito perché non è completamente configurato. Per configurare il produttore, fornisci sia la configurazione di Kafka sia quella del registro di schema.
Crea un file denominato
client.propertiesnella stessa directory dipom.xmle aggiungi il seguente contenuto:bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID bearer.auth.credentials.source=CUSTOM bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProviderAggiungi le dipendenze del gestore di autenticazione di Kafka e del registro di schema al tuo progetto Maven inserendo quanto segue nel nodo
dependenciesdipom.xmlsopra la dipendenzakafka-avro-serializer:<dependency> <groupId>com.google.cloud.hosted.kafka</groupId> <artifactId>managed-kafka-auth-login-handler</artifactId> <version>1.0.6</version> <exclusions> <exclusion> <groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-client</artifactId> </exclusion> </exclusions> </dependency>Se vuoi vedere l'implementazione del gestore di autenticazione del registro dello schema personalizzato, consulta la classe GcpBearerAuthCredentialProvider.
Compila ed esegui il client producer:
mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducerSe tutto va bene, vedrai l'output
Produced a message successfullygenerato dalla classeSendCallback.
Esamina l'output
Verifica che lo schema
Usersia stato registrato con un nome soggetto derivato dai nomi dell'argomento e dello schema:SR_DOMAIN=https://managedkafka.googleapis.com SR_PATH=/v1/projects/PROJECT_ID/locations/REGION SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)"\ $SR_HOSTL'output di questo comando dovrebbe essere simile al seguente:
["newUsers-value"]Verifica che lo schema registrato nel repository sia lo stesso di
User:curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)" \ $SR_HOST/newUsers-value/versions/1L'output del comando dovrebbe essere simile al seguente:
{ "subject": "newUsers-value", "version": 1, "id": 2, "schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}", "references": [] }
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.
Console
Elimina l'istanza VM.
Vai alla pagina Istanze VM.
Seleziona la VM e fai clic su Elimina.
Elimina il registro di schemi.
Vai alla pagina Registri degli schemi.
Fai clic sul nome del registro degli schemi.
Fai clic su Elimina.
Elimina il cluster Kafka.
Vai alla pagina Managed Service per Apache Kafka > Cluster.
Seleziona il cluster Kafka e fai clic su Elimina.
gcloud
Per eliminare la VM, utilizza il comando
gcloud compute instances delete.gcloud compute instances delete VM_NAME --zone=ZONEPer eliminare il registro di schemi, utilizza il comando
/sdk/gcloud/reference/managed-kafka/schema-registries/delete.gcloud beta managed-kafka schema-registries delete REGISTRY_ID \ --location=REGIONPer eliminare il cluster Kafka, utilizza il comando
gcloud managed-kafka clusters delete.gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
Passaggi successivi
- Panoramica del registro di schema.
- Panoramica di Managed Service per Apache Kafka.
- Autenticati nell'API Managed Kafka.
- Scrivere messaggi Avro in BigQuery utilizzando Kafka Connect