Avro-Nachrichten mit der Schema-Registry erstellen
Hier erfahren Sie, wie Sie eine Java-Producer-Anwendung entwickeln, die die Schemaregistrierung (Vorabversion) verwendet, um Apache Avro-Nachrichten zu erstellen. Die Anwendung schreibt die Nachrichten in einen Managed Service for Apache Kafka-Cluster.
Hinweise
Bevor Sie mit dieser Anleitung beginnen, erstellen Sie einen neuen Managed Service for Apache Kafka-Cluster. Wenn Sie bereits einen Cluster haben, können Sie diesen Schritt überspringen.
Cluster erstellen
Console
- Rufen Sie die Seite Managed Service for Apache Kafka > Cluster auf.
- Klicken Sie auf Erstellen.
- Geben Sie in das Feld Clustername einen Namen für den Cluster ein.
- Wählen Sie in der Liste Region einen Standort für den Cluster aus.
-
Konfigurieren Sie unter Netzwerkkonfiguration das Subnetz, in dem der Cluster zugänglich ist:
- Wählen Sie unter Projekt Ihr Projekt aus.
- Wählen Sie unter Netzwerk das VPC-Netzwerk aus.
- Wählen Sie das Subnetz für Subnetz aus.
- Klicken Sie auf Fertig.
- Klicken Sie auf Erstellen.
Nachdem Sie auf Erstellen geklickt haben, ist der Clusterstatus Creating. Wenn der Cluster bereit ist, lautet der Status Active.
gcloud
Führen Sie den Befehl managed-kafka clusters
create aus, um einen Kafka-Cluster zu erstellen.
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
Ersetzen Sie Folgendes:
KAFKA_CLUSTER: ein Name für den Kafka-ClusterREGION: Der Standort des ClustersPROJECT_ID: Ihre Projekt-ID.SUBNET_NAME: das Subnetz, in dem Sie den Cluster erstellen möchten, z. B.default
Informationen zu unterstützten Standorten finden Sie unter Managed Service for Apache Kafka-Standorte.
Der Befehl wird asynchron ausgeführt und gibt eine Vorgangs-ID zurück:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
Verwenden Sie den Befehl gcloud managed-kafka
operations describe, um den Fortschritt des Erstellungsvorgangs zu verfolgen:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
Wenn der Cluster bereit ist, enthält die Ausgabe dieses Befehls den Eintrag state:
ACTIVE. Weitere Informationen finden Sie unter Vorgang zum Erstellen von Clustern überwachen.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für das Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen und Konfigurieren einer Client-VM benötigen:
-
Compute-Instanzadministrator (Version 1) (
roles/compute.instanceAdmin.v1) -
Projekt-IAM-Administrator (
roles/resourcemanager.projectIamAdmin) -
Rollenbetrachter (
roles/iam.roleViewer) -
Service Account User (
roles/iam.serviceAccountUser)
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
Client-VM einrichten
Erstellen Sie eine Linux-VM-Instanz in Compute Engine, die auf den Kafka-Cluster zugreifen kann. Legen Sie beim Konfigurieren der VM die folgenden Optionen fest:
Region Erstellen Sie die VM in derselben Region wie Ihren Kafka-Cluster.
Subnetz Erstellen Sie die VM im selben VPC-Netzwerk wie das Subnetz, das Sie in der Konfiguration Ihres Kafka-Clusters verwendet haben. Weitere Informationen finden Sie unter Subnetze eines Clusters ansehen.
Zugriffsbereiche Weisen Sie der VM den
https://www.googleapis.com/auth/cloud-platform-Zugriffsbereich zu. Mit diesem Bereich wird die VM autorisiert, Anfragen an die Managed Kafka API zu senden.
In den folgenden Schritten wird beschrieben, wie Sie diese Optionen festlegen.
Console
Rufen Sie in der Google Cloud Console die Seite Instanz erstellen auf.
Führen Sie im Bereich Maschinenkonfiguration die folgenden Schritte aus:
Geben Sie im Feld Name einen Namen für die Instanz an. Weitere Informationen finden Sie unter Namenskonvention für Ressourcen.
Wählen Sie in der Liste Region dieselbe Region wie für Ihren Kafka-Cluster aus.
Wählen Sie in der Liste Zone eine Zone aus.
Klicken Sie im Navigationsmenü auf Netzwerk. Führen Sie im angezeigten Bereich Netzwerk die folgenden Schritte aus:
Gehen Sie zum Abschnitt Netzwerkschnittstellen.
Klicken Sie auf den Pfeil , um die Standardnetzwerkschnittstelle zu maximieren.
Wählen Sie im Feld Netzwerk das VPC-Netzwerk aus.
Wählen Sie in der Liste Subnetzwerk das Subnetzwerk aus.
Klicken Sie auf Fertig.
Klicken Sie im Navigationsmenü auf Sicherheit. Führen Sie im angezeigten Bereich Sicherheit die folgenden Schritte aus:
Wählen Sie für Zugriffsbereiche die Option Zugriff für jede API festlegen aus.
Suchen Sie in der Liste der Zugriffsbereiche nach der Drop-down-Liste Cloud Platform und wählen Sie Aktiviert aus.
Klicken Sie auf Erstellen, um die VM zu erstellen.
gcloud
Verwenden Sie den Befehl gcloud compute instances create, um die VM-Instanz zu erstellen.
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
Ersetzen Sie Folgendes:
- VM_NAME: der Name der VM
- PROJECT_ID: Ihre Projekt-ID.
- REGION: Die Region, in der Sie den Kafka-Cluster erstellt haben, z. B.
us-central1 - SUBNET: ein Subnetz im selben VPC-Netzwerk wie das Subnetz, das Sie in der Clusterkonfiguration verwendet haben
- ZONE: Eine Zone in der Region, in der Sie den Cluster erstellt haben, z. B.
us-central1-c
Weitere Informationen zum Erstellen einer VM finden Sie unter VM-Instanz in einem bestimmten Subnetz erstellen.
IAM-Rollen zuweisen
Weisen Sie dem standardmäßigen Compute Engine-Dienstkonto die folgenden IAM-Rollen (Identity and Access Management) zu:
- Managed Kafka Client (
roles/managedkafka.client) - Schema Registry-Administrator (
roles/managedkafka.schemaRegistryAdmin) - Ersteller von Dienstkonto-Token (
roles/iam.serviceAccountTokenCreator) Ersteller von OpenID-Tokens für Dienstkonten (
roles/iam.serviceAccountOpenIdTokenCreator)
Console
Rufen Sie in der Google Cloud Console die Seite IAM auf.
Suchen Sie die Zeile für das Compute Engine-Standarddienstkonto und klicken Sie auf Hauptkonto bearbeiten.
Klicken Sie auf Weitere Rolle hinzufügen und wählen Sie die Rolle Managed Kafka Client aus. Wiederholen Sie diesen Schritt für die Rollen Schema Registry Admin (Schema-Registry-Administrator), Service Account Token Creator (Ersteller von Dienstkonto-Tokens) und Service Account OpenID Token Creator (Ersteller von OpenID-Tokens für Dienstkonten).
Klicken Sie auf Speichern.
gcloud
Verwenden Sie den Befehl gcloud projects add-iam-policy-binding, um IAM-Rollen zuzuweisen.
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.schemaRegistryAdmin
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
Ersetzen Sie Folgendes:
PROJECT_ID: Ihre Projekt-ID.
PROJECT_NUMBER: Ihre Projektnummer
Führen Sie den Befehl gcloud projects describe aus, um die Projektnummer abzurufen:
gcloud projects describe PROJECT_ID
Weitere Informationen finden Sie unter Projektname, ‑nummer und ‑ID ermitteln.
Verbindung zur VM herstellen
Stellen Sie über SSH eine Verbindung zur VM-Instanz her.
Console
Rufen Sie die Seite VM-Instanzen auf.
Suchen Sie in der Liste der VM-Instanzen nach dem VM-Namen und klicken Sie auf SSH.
gcloud
Verwenden Sie den Befehl gcloud compute ssh, um eine Verbindung zur VM herzustellen.
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
Ersetzen Sie Folgendes:
- VM_NAME: der Name der VM
- PROJECT_ID: Ihre Projekt-ID.
- ZONE: die Zone, in der Sie die VM erstellt haben.
Bei der erstmaligen Verwendung von SSH ist möglicherweise eine zusätzliche Konfiguration erforderlich. Weitere Informationen finden Sie unter Informationen zu SSH-Verbindungen.
Apache Maven-Projekt einrichten
Führen Sie in Ihrer SSH-Sitzung die folgenden Befehle aus, um ein Maven-Projekt einzurichten.
Installieren Sie Java und Maven mit dem Befehl:
sudo apt-get install maven openjdk-17-jdkRichten Sie ein Apache Maven-Projekt ein.
Verwenden Sie den folgenden Befehl, um das Paket
com.google.examplein einem Verzeichnis namensdemozu erstellen.mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\ -DarchetypeArtifactId=maven-archetype-quickstart\ -DarchetypeVersion=1.5 -DinteractiveMode=false
Schema und zugehörige Java-Implementierung definieren
In diesem Beispiel stellt eine Nachricht einen „Nutzer“ mit einem Namen und einer optionalen ID dar. Dies entspricht einem Avro-Schema mit zwei Feldern: einem erforderlichen Feld name vom Typ string und einer optionalen Ganzzahl id.. Wenn Sie dieses Schema in einem Java-Programm verwenden möchten, müssen Sie auch eine Java-Implementierung eines Objekts generieren, das diesem Schema entspricht.
Wechseln Sie mit
cd demoin das Projektverzeichnis.Erstellen Sie die Ordner zum Speichern von Schemadateien in Ihrem Code:
mkdir -p src/main/avroErstellen Sie die Avro-Schemadefinition, indem Sie den folgenden Code in eine Datei mit dem Namen
src/main/avro/User.avsceinfügen:{ "namespace": "com.google.example", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "id", "type": ["int", "null"]} ] }Konfigurieren Sie Ihr Maven-Projekt für die Verwendung eines Avro-Java-Code-Generierungs-Plug-ins, indem Sie Folgendes zum Knoten
buildIhrerpom.xml.hinzufügen. Beachten Sie, dasspom.xmlmöglicherweise andereplugins-Knoten innerhalb despluginManagement-Knotens enthält. Ändern Sie den KnotenpluginManagementin diesem Schritt nicht. Der Knotenpluginsmuss sich auf derselben Ebene wiepluginManagementbefinden.<plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.11.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins>Fügen Sie Avro als Abhängigkeit hinzu, indem Sie Folgendes am Ende des Knotens
project/dependenciesvonpom.xmleinfügen. Beachten Sie, dass daspom.xmlbereits einendependencies-Knoten innerhalb desdependencyManagement-Tags hat. Ändern Sie den KnotendependencyManagementin diesem Schritt nicht.<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.1</version> </dependency>Java-Quellen generieren
mvn generate-sourcesFühren Sie den folgenden Befehl aus, um zu prüfen, ob die Implementierungsquelldatei erstellt wurde. Die Quelle ist eine Java-Klassendatei, die Konstruktoren, Accessoren, Serialisierer und Deserialisierer für
User-Objekte implementiert. Sie verwenden diese Klasse im Erstellercode.cat src/main/java/com/google/example/User.java
Weitere Informationen zu Apache Avro finden Sie im Apache Avro-Startleitfaden.
Ersteller-Client erstellen
In diesem Abschnitt werden die Schritte zum Schreiben, Erstellen und Ausführen eines Producer-Clients beschrieben.
Producer implementieren
Der Producer verwendet KafkaAvroSerializer.java, um Nachrichten zu codieren und ihre Schemas zu verwalten. Der Serializer stellt automatisch eine Verbindung zur Schemaregistrierung her, registriert das Schema unter einem Betreff, ruft seine ID ab und serialisiert dann die Nachricht mit Avro. Sie müssen den Producer und den Serializer weiterhin konfigurieren.
Erstellen Sie die Producer-Client-Klasse, indem Sie den folgenden Code in eine neue Datei mit dem Namen
src/main/java/com/google/example/UserProducer.javaeinfügen.package com.google.example; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; public class UserProducer { private static Properties configure() throws Exception { Properties p = new Properties(); p.load(new java.io.FileReader("client.properties")); p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); return p; } public static void main(String[] args) throws Exception { Properties p = configure(); KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p); final User u = new User("SchemaEnthusiast", 42); final String topicName = "newUsers"; ProducerRecord<String, User> message = new ProducerRecord<String, User>(topicName, "", u); producer.send(message, new SendCallback()); producer.close(); } }Definieren Sie die Callback-Klasse in
src/main/java/com/google/example/SendCallback.java:package com.google.example; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; class SendCallback implements Callback { public void onCompletion(RecordMetadata m, Exception e){ if (e == null){ System.out.println("Produced a message successfully."); } else { System.out.println(e.getMessage()); } } }Zum Kompilieren dieses Codes benötigen Sie das
org.apache.kafka.clients-Paket und den Serialisierungscode. Das Maven-Artefakt für die Serialisierung wird über ein benutzerdefiniertes Repository verteilt. Fügen Sie den folgenden Knoten dem KnotenprojectIhrespom.xmlhinzu, um dieses Repository zu konfigurieren:<repositories> <repository> <id>confluent</id> <name>Confluent</name> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>Fügen Sie dem
dependencies-Knoten in der Dateipom.xmlFolgendes hinzu:<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>7.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.2</version> </dependency>Kompilieren Sie den Client, um sicherzustellen, dass alle Abhängigkeiten richtig aufgelöst werden:
mvn compile
Schema-Registry erstellen
Führen Sie den folgenden Befehl aus, um eine Schemaregistry zu erstellen:
gcloud beta managed-kafka schema-registries create REGISTRY_ID \
--location=REGION
Ersetzen Sie Folgendes:
REGISTRY_ID: Eine eindeutige Kennung für Ihre neue Schemaregistry. Dieser ist Teil des Ressourcennamens der Registry. Der Name muss mit einem Buchstaben beginnen, darf nur Buchstaben
(a-z, A-Z), Zahlen(0-9)und Unterstriche(_)enthalten und darf höchstens 63 Zeichen lang sein.REGION: Google Cloud Region, in der die Schemaregistrierung erstellt wird. Dieser Standort muss mit der Region des Kafka-Clusters oder der Kafka-Cluster übereinstimmen, die diese Registry verwenden.
Die von Ihnen erstellte Schemadefinition wurde noch nicht in die Registry hochgeladen. Der Producer-Client führt dies beim ersten Ausführen in den folgenden Schritten aus.
Producer konfigurieren und ausführen
An diesem Punkt wird der Producer nicht ausgeführt, da er nicht vollständig konfiguriert ist. Um den Producer zu konfigurieren, geben Sie sowohl die Kafka- als auch die Schema-Registry-Konfiguration an.
Erstellen Sie im selben Verzeichnis wie Ihre
pom.xml-Datei eine Datei mit dem Namenclient.propertiesund fügen Sie ihr den folgenden Inhalt hinzu:bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID bearer.auth.credentials.source=CUSTOM bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProviderFügen Sie Ihrem Maven-Projekt die Abhängigkeiten für die Kafka- und Schema Registry-Authentifizierungshandler hinzu, indem Sie Folgendes in den
dependencies-Knoten vonpom.xmlüber derkafka-avro-serializer-Abhängigkeit einfügen:<dependency> <groupId>com.google.cloud.hosted.kafka</groupId> <artifactId>managed-kafka-auth-login-handler</artifactId> <version>1.0.6</version> <exclusions> <exclusion> <groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-client</artifactId> </exclusion> </exclusions> </dependency>Wenn Sie die Implementierung des Authentifizierungshandlers für die benutzerdefinierte Schemaregistrierung sehen möchten, sehen Sie sich die Klasse GcpBearerAuthCredentialProvider an.
Kompilieren Sie den Producer-Client und führen Sie ihn aus:
mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducerWenn alles wie geplant ausgeführt wird, sehen Sie die Ausgabe
Produced a message successfully, die von der KlasseSendCallbackgeneriert wird.
Ausgabe prüfen
Prüfen Sie, ob das
User-Schema unter einem Subjektnamen registriert wurde, der aus den Namen des Themas und des Schemas abgeleitet ist:SR_DOMAIN=https://managedkafka.googleapis.com SR_PATH=/v1/projects/PROJECT_ID/locations/REGION SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)"\ $SR_HOSTDie Ausgabe dieses Befehls sollte so aussehen:
["newUsers-value"]Prüfen Sie, ob das im Repository registrierte Schema mit
Userübereinstimmt:curl -X GET \ -H "Content-Type: application/vnd.schemaregistry.v1+json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)" \ $SR_HOST/newUsers-value/versions/1Die Ausgabe des Befehls sollte so aussehen:
{ "subject": "newUsers-value", "version": 1, "id": 2, "schemaType": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}", "references": [] }
Bereinigen
Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud -Konto die auf dieser Seite verwendeten Ressourcen in Rechnung gestellt werden:
Console
Löschen Sie die VM-Instanz.
Rufen Sie die Seite VM-Instanzen auf.
Wählen Sie die VM aus und klicken Sie auf Löschen.
Löschen Sie die Schema-Registry.
Rufen Sie die Seite Schema-Registries auf.
Klicken Sie auf den Namen der Schemaregistry.
Klicken Sie auf Löschen.
Löschen Sie den Kafka-Cluster.
Rufen Sie die Seite Managed Service for Apache Kafka > Cluster auf.
Wählen Sie den Kafka-Cluster aus und klicken Sie auf Löschen.
gcloud
Verwenden Sie zum Löschen der VM den Befehl
gcloud compute instances delete.gcloud compute instances delete VM_NAME --zone=ZONEVerwenden Sie den Befehl
/sdk/gcloud/reference/managed-kafka/schema-registries/delete, um die Schemaregistrierung zu löschen.gcloud beta managed-kafka schema-registries delete REGISTRY_ID \ --location=REGIONVerwenden Sie den Befehl
gcloud managed-kafka clusters delete, um den Kafka-Cluster zu löschen.gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async
Nächste Schritte
- Übersicht über die Schema-Registry
- Übersicht über Managed Service for Apache Kafka
- Authentifizierung für die Managed Kafka API
- Avro-Nachrichten mit Kafka Connect in BigQuery schreiben