Produire des messages Avro avec le registre de schémas

Découvrez comment développer une application de production Java qui utilise le registre de schémas (aperçu) pour produire des messages Apache Avro. L'application écrit les messages dans un cluster Managed Service pour Apache Kafka.

Avant de commencer

Avant de commencer ce tutoriel, créez un cluster Managed Service pour Apache Kafka. Si vous disposez déjà d'un cluster, vous pouvez ignorer cette étape.

Créer un cluster

Console

  1. Accédez à la page Managed Service pour Apache Kafka > Clusters.

    accéder aux clusters

  2. Cliquez sur Créer.
  3. Dans le champ Nom du cluster, saisissez un nom pour le cluster.
  4. Dans la liste Région, sélectionnez un emplacement pour le cluster.
  5. Pour Configuration réseau, configurez le sous-réseau où le cluster est accessible :
    1. Pour Project (Projet), sélectionnez votre projet.
    2. Pour Réseau, sélectionnez le réseau VPC.
    3. Pour Sous-réseau, sélectionnez le sous-réseau.
    4. Cliquez sur OK.
  6. Cliquez sur Créer.

Une fois que vous avez cliqué sur Créer, l'état du cluster est Creating. Lorsque le cluster est prêt, l'état est Active.

gcloud

Pour créer un cluster Kafka, exécutez la commande managed-kafka clusters create.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

Remplacez les éléments suivants :

  • KAFKA_CLUSTER : nom du cluster Kafka
  • REGION : emplacement du cluster
  • PROJECT_ID : ID de votre projet
  • SUBNET_NAME : sous-réseau dans lequel vous souhaitez créer le cluster, par exemple default

Pour en savoir plus sur les emplacements compatibles, consultez Emplacements Managed Service pour Apache Kafka.

La commande s'exécute de manière asynchrone et renvoie un ID d'opération :

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

Pour suivre la progression de l'opération de création, utilisez la commande gcloud managed-kafka operations describe :

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

Lorsque le cluster est prêt, le résultat de cette commande inclut l'entrée state: ACTIVE. Pour en savoir plus, consultez Surveiller l'opération de création de cluster.

Rôles requis

Pour obtenir les autorisations nécessaires pour créer et configurer une VM cliente, demandez à votre administrateur de vous accorder les rôles IAM suivants sur le projet :

Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Configurer une VM cliente

Créez une instance de machine virtuelle (VM) Linux dans Compute Engine qui peut accéder au cluster Kafka. Lorsque vous configurez la VM, définissez les options suivantes :

  • Région : Créez la VM dans la même région que votre cluster Kafka.

  • Sous-réseau Créez la VM dans le même réseau VPC que le sous-réseau que vous avez utilisé dans la configuration de votre cluster Kafka. Pour en savoir plus, consultez Afficher les sous-réseaux d'un cluster.

  • Niveaux d'accès Attribuez le niveau d'accès https://www.googleapis.com/auth/cloud-platform à la VM. Ce champ d'application autorise la VM à envoyer des requêtes à l'API Managed Kafka.

Les étapes suivantes montrent comment définir ces options.

Console

  1. Accédez à la page Créer une instance dans la console Google Cloud .

    Créer une instance

  2. Dans le volet Configuration de la machine, procédez comme suit :

    1. Dans le champ Nom, spécifiez un nom pour votre instance. Pour en savoir plus, consultez Convention d'attribution de noms aux ressources.

    2. Dans la liste Région, sélectionnez la même région que votre cluster Kafka.

    3. Dans la liste Zone, sélectionnez une zone.

  3. Dans le menu de navigation, cliquez sur Mise en réseau. Dans le volet Mise en réseau qui s'affiche, procédez comme suit :

    1. Accédez à la section Interfaces réseau.

    2. Pour développer l'interface réseau par défaut, cliquez sur la flèche .

    3. Dans le champ Réseau, sélectionnez le réseau VPC.

    4. Dans la liste Sous-réseau, sélectionnez le sous-réseau.

    5. Cliquez sur OK.

  4. Dans le menu de navigation, cliquez sur Sécurité. Dans le volet Sécurité qui s'affiche, procédez comme suit :

    1. Pour Niveaux d'accès, sélectionnez Définir l'accès pour chaque API.

    2. Dans la liste des niveaux d'accès, recherchez la liste déroulante Cloud Platform et sélectionnez Activé.

  5. Cliquez sur Créer pour créer la VM.

gcloud

Pour créer l'instance de VM, utilisez la commande gcloud compute instances create.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

Remplacez les éléments suivants :

  • VM_NAME : nom de la VM
  • PROJECT_ID : ID de votre projet
  • REGION : région dans laquelle vous avez créé le cluster Kafka, par exemple us-central1
  • SUBNET : sous-réseau du même réseau VPC que celui utilisé dans la configuration du cluster
  • ZONE : zone de la région dans laquelle vous avez créé le cluster, par exemple us-central1-c

Pour en savoir plus sur la création d'une VM, consultez Créer une instance de VM dans un sous-réseau spécifique.

Attribuer des rôles IAM

Attribuez les rôles IAM (Identity and Access Management) suivants au compte de service Compute Engine par défaut :

  • Client Managed Kafka (roles/managedkafka.client)
  • Administrateur Schema Registry (roles/managedkafka.schemaRegistryAdmin)
  • Créateur de jetons du compte de service (roles/iam.serviceAccountTokenCreator)
  • Créateur de jetons OpenID du compte de service (roles/iam.serviceAccountOpenIdTokenCreator)

Console

  1. Dans la console Google Cloud , accédez à la page IAM.

    Accéder à IAM

  2. Recherchez la ligne Compte de service Compute Engine par défaut, puis cliquez sur Modifier le compte principal.

  3. Cliquez sur Ajouter un autre rôle, puis sélectionnez le rôle Client Kafka géré. Répétez cette étape pour les rôles Administrateur du registre de schémas, Créateur de jetons du compte de service et Créateur de jetons OpenID du compte de service.

  4. Cliquez sur Enregistrer.

gcloud

Pour attribuer des rôles IAM, utilisez la commande gcloud projects add-iam-policy-binding.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

Remplacez les éléments suivants :

  • PROJECT_ID : ID de votre projet

  • PROJECT_NUMBER : votre numéro de projet

Pour obtenir le numéro du projet, exécutez la commande gcloud projects describe :

gcloud projects describe PROJECT_ID

Pour en savoir plus, consultez Trouver le nom, le numéro et l'ID du projet.

Se connecter à la VM

Utilisez SSH pour vous connecter à l'instance de VM.

Console

  1. Accédez à la page Instances de VM.

    Accéder à la page Instances de VM

  2. Dans la liste des instances de VM, recherchez le nom de la VM, puis cliquez sur SSH.

gcloud

Pour vous connecter à la VM, utilisez la commande gcloud compute ssh.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

Remplacez les éléments suivants :

  • VM_NAME : nom de la VM
  • PROJECT_ID : ID de votre projet
  • ZONE : zone dans laquelle vous avez créé la VM

Une configuration supplémentaire peut être requise pour la première utilisation de SSH. Pour en savoir plus, consultez À propos des connexions SSH.

Configurer un projet Apache Maven

Dans votre session SSH, exécutez les commandes suivantes pour configurer un projet Maven.

  1. Installez Java et Maven avec la commande :

    sudo apt-get install maven openjdk-17-jdk
    
  2. Configurez un projet Apache Maven.

    Exécutez la commande suivante pour créer un package com.google.example dans un répertoire appelé demo.

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

Définir le schéma et son implémentation Java

Dans cet exemple, un message représente un "utilisateur" qui possède un nom et un ID facultatif. Cela correspond à un schéma Avro avec deux champs : un champ obligatoire name de type string et un champ facultatif id. de type entier. Pour utiliser ce schéma dans un programme Java, vous devrez également générer une implémentation Java d'un objet correspondant à ce schéma.

  1. Accédez au répertoire du projet avec cd demo.

  2. Créez les dossiers pour stocker les fichiers de schéma dans votre code :

    mkdir -p src/main/avro
    
  3. Créez la définition du schéma Avro en collant le code suivant dans un fichier nommé src/main/avro/User.avsc :

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. Configurez votre projet Maven pour utiliser un plug-in de génération de code Avro Java en ajoutant les éléments suivants au nœud build de votre pom.xml.. Notez que le pom.xml peut contenir d'autres nœuds plugins à l'intérieur du nœud pluginManagement. Ne modifiez pas le nœud pluginManagement lors de cette étape. Le nœud plugins doit se trouver au même niveau que pluginManagement.

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. Ajoutez Avro en tant que dépendance en ajoutant ce qui suit à la fin du nœud project/dependencies de pom.xml. Notez que pom.xml comporte déjà un nœud dependencies à l'intérieur de la balise dependencyManagement. Ne modifiez pas le nœud dependencyManagement lors de cette étape.

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. Générer des sources Java

      mvn generate-sources
    
  7. Exécutez la commande suivante pour vérifier que le fichier source d'implémentation a été créé. La source est un fichier de classe Java qui implémente des constructeurs, des accesseurs, des sérialiseurs et des désérialiseurs pour les objets User. Vous utiliserez cette classe dans le code du producteur.

    cat src/main/java/com/google/example/User.java
    

Pour en savoir plus sur Apache Avro, consultez le guide de démarrage d'Apache Avro.

Créer un client producteur

Cette section explique comment écrire, compiler et exécuter un client producteur.

Implémenter le producteur

Le producteur utilise KafkaAvroSerializer.java pour encoder les messages et gérer leurs schémas. Le sérialiseur se connecte automatiquement au registre de schémas, enregistre le schéma sous un sujet, récupère son ID, puis sérialise le message à l'aide d'Avro. Vous devez toujours configurer le producteur et le sérialiseur.

  1. Créez la classe de client producteur en collant le code suivant dans un nouveau fichier appelé src/main/java/com/google/example/UserProducer.java.

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. Définissez la classe de rappel dans src/main/java/com/google/example/SendCallback.java :

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. Pour compiler ce code, vous avez besoin du package org.apache.kafka.clients et du code du sérialiseur. L'artefact Maven du sérialiseur est distribué via un dépôt personnalisé. Ajoutez le nœud suivant au nœud project de votre pom.xml pour configurer ce dépôt :

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. Ajoutez ce qui suit au nœud dependencies de votre fichier pom.xml :

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. Pour vous assurer que toutes les dépendances sont correctement résolues, compilez le client :

    mvn compile
    

Créer un registre de schémas

Pour créer un registre de schémas, exécutez la commande suivante :

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

Remplacez les éléments suivants :

  • REGISTRY_ID : identifiant unique de votre nouveau registre de schémas. Il fait partie du nom de ressource du registre. Le nom doit commencer par une lettre, ne contenir que des lettres (a-z, A-Z), des chiffres (0-9) et des traits de soulignement (_), et comporter 63 caractères au maximum.

  • REGION : Google Cloud région dans laquelle le registre de schémas sera créé. Cet emplacement doit correspondre à la région du ou des clusters Kafka utilisant ce registre.

La définition de schéma que vous avez créée n'a pas encore été importée dans le registre. Le client producteur effectue cette opération la première fois qu'il s'exécute en suivant les étapes ci-dessous.

Configurer et exécuter le producteur

À ce stade, le producteur ne s'exécutera pas, car il n'est pas entièrement configuré. Pour configurer le producteur, fournissez la configuration Kafka et celle du registre de schémas.

  1. Créez un fichier nommé client.properties dans le même répertoire que votre fichier pom.xml et ajoutez-y le contenu suivant :

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    Ajoutez les dépendances du gestionnaire d'authentification Kafka et du registre de schémas à votre projet Maven en insérant les lignes suivantes dans le nœud dependencies de pom.xml au-dessus de la dépendance kafka-avro-serializer :

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    Si vous souhaitez voir l'implémentation du gestionnaire d'authentification du registre de schéma personnalisé, consultez la classe GcpBearerAuthCredentialProvider.

  2. Compilez et exécutez le client producteur :

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    Si tout se déroule comme prévu, la sortie Produced a message successfully générée par la classe SendCallback s'affiche.

Examiner le résultat

  1. Vérifiez que le schéma User a été enregistré sous un nom de sujet dérivé des noms du sujet et du schéma :

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    Le résultat de cette commande devrait ressembler à ceci :

    ["newUsers-value"]
    
  2. Vérifiez que le schéma enregistré dans le dépôt est identique à User :

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    Le résultat de la commande devrait ressembler à ceci :

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

Effectuer un nettoyage

Pour éviter que les ressources utilisées dans cette démonstration soient facturées sur votre compte Google Cloud , procédez comme suit :

Console

  1. Supprimez l'instance de VM.

    1. Accédez à la page Instances de VM.

      Accéder à la page Instances de VM

    2. Sélectionnez la VM, puis cliquez sur Supprimer.

  2. Supprimez le registre de schémas.

    1. Accédez à la page Registres de schémas.

      Accéder aux registres de schémas

    2. Cliquez sur le nom du registre de schémas.

    3. Cliquez sur Supprimer.

  3. Supprimez le cluster Kafka.

    1. Accédez à la page Managed Service pour Apache Kafka > Clusters.

      accéder aux clusters

    2. Sélectionnez le cluster Kafka, puis cliquez sur Supprimer.

gcloud

  1. Pour supprimer la VM, utilisez la commande gcloud compute instances delete.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Pour supprimer le registre de schémas, utilisez la commande /sdk/gcloud/reference/managed-kafka/schema-registries/delete.

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. Pour supprimer le cluster Kafka, utilisez la commande gcloud managed-kafka clusters delete.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

Étapes suivantes

Apache Kafka® et Apache Avro sont des marques déposées d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.
Confluent est une marque déposée de Confluent, Inc.