Produce mensajes de Avro con el registro de esquemas

Aprende a desarrollar una aplicación de productor en Java que use el registro de esquemas (versión preliminar) para producir mensajes de Apache Avro. La aplicación escribe los mensajes en un clúster de Managed Service para Apache Kafka.

Antes de comenzar

Antes de comenzar este instructivo, crea un clúster nuevo de Managed Service para Apache Kafka. Si ya tienes un clúster, puedes omitir este paso.

Cómo crear un clúster

Console

  1. Ve a la página Managed Service for Apache Kafka > Clusters.

    Ir a los clústeres

  2. Haz clic en Crear.
  3. En el cuadro Nombre del clúster, ingresa un nombre para el clúster.
  4. En la lista Región, selecciona una ubicación para el clúster.
  5. En Configuración de red, configura la subred en la que se puede acceder al clúster:
    1. En Proyecto, seleccione su proyecto.
    2. En Red, selecciona la red de VPC.
    3. En Subred, selecciona la subred.
    4. Haz clic en Listo.
  6. Haz clic en Crear.

Después de hacer clic en Crear, el estado del clúster es Creating. Cuando el clúster está listo, el estado es Active.

gcloud

Para crear un clúster de Kafka, ejecuta el comando managed-kafka clusters create.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

Reemplaza lo siguiente:

  • KAFKA_CLUSTER: Un nombre para el clúster de Kafka
  • REGION: Es la ubicación del clúster.
  • PROJECT_ID: Es el ID del proyecto.
  • SUBNET_NAME: Es la subred en la que deseas crear el clúster, por ejemplo, default.

Para obtener información sobre las ubicaciones admitidas, consulta las ubicaciones de Managed Service para Apache Kafka.

El comando se ejecuta de forma asíncrona y muestra un ID de operación:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

Para hacer un seguimiento del progreso de la operación de creación, usa el comando gcloud managed-kafka operations describe:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

Cuando el clúster esté listo, el resultado de este comando incluirá la entrada state: ACTIVE. Para obtener más información, consulta Cómo supervisar la operación de creación del clúster.

Roles obligatorios

Para obtener los permisos que necesitas para crear y configurar una VM cliente, pídele a tu administrador que te otorgue los siguientes roles de IAM en el proyecto:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.

Configura una VM de cliente

Crea una instancia de máquina virtual (VM) de Linux en Compute Engine que pueda acceder al clúster de Kafka. Cuando configures la VM, establece las siguientes opciones:

  • Region. Crea la VM en la misma región que tu clúster de Kafka.

  • Subred. Crea la VM en la misma red de VPC que la subred que usaste en la configuración del clúster de Kafka. Para obtener más información, consulta Cómo ver las subredes de un clúster.

  • Permisos de acceso Asigna el permiso de acceso https://www.googleapis.com/auth/cloud-platform a la VM. Este alcance autoriza a la VM a enviar solicitudes a la API de Managed Kafka.

En los siguientes pasos, se muestra cómo configurar estas opciones.

Console

  1. En la consola de Google Cloud , ve a la página Crear una instancia.

    Crea una instancia

  2. En el panel Configuración de la máquina, haz lo siguiente:

    1. En el campo Nombre, especifica un nombre para tu instancia. Para obtener más información, consulta Convención de asignación de nombres de recursos.

    2. En la lista Región, selecciona la misma región que tu clúster de Kafka.

    3. En la lista Zona, selecciona una zona.

  3. En el menú de navegación, haz clic en Herramientas de redes. En el panel Networking que aparece, haz lo siguiente:

    1. Ve a la sección Interfaces de red.

    2. Para expandir la interfaz de red predeterminada, haz clic en la flecha .

    3. En el campo Red, elige la red de VPC.

    4. En la lista Subred, selecciona la subred.

    5. Haz clic en Listo.

  4. En el menú de navegación, haz clic en Seguridad. En el panel Seguridad que aparece, haz lo siguiente:

    1. En Permisos de acceso, selecciona Configurar acceso para cada API.

    2. En la lista de permisos de acceso, busca la lista desplegable Cloud Platform y selecciona Habilitado.

  5. Haz clic en Crear para crear la VM.

gcloud

Para crear la instancia de VM, usa el comando gcloud compute instances create.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

Reemplaza lo siguiente:

  • VM_NAME: El nombre de la VM
  • PROJECT_ID: Es el ID del proyecto.
  • REGION: La región en la que creaste el clúster de Kafka, por ejemplo, us-central1
  • SUBNET: Una subred en la misma red de VPC que la subred que usaste en la configuración del clúster
  • ZONE: Es una zona en la región en la que creaste el clúster, por ejemplo, us-central1-c.

Para obtener más información sobre cómo crear una VM, consulta Crea una instancia de VM en una subred específica.

Asigna roles de IAM

Otorga los siguientes roles de Identity and Access Management (IAM) a la cuenta de servicio predeterminada de Compute Engine:

  • Cliente de Kafka administrado (roles/managedkafka.client)
  • Administrador de Schema Registry (roles/managedkafka.schemaRegistryAdmin)
  • Creador de tokens de cuenta de servicio (roles/iam.serviceAccountTokenCreator)
  • Creador de tokens de OpenID para cuentas de servicio (roles/iam.serviceAccountOpenIdTokenCreator)

Console

  1. En la consola de Google Cloud , dirígete a la página IAM.

    Ir a IAM

  2. Busca la fila de la cuenta de servicio predeterminada de Compute Engine y haz clic en Editar principal.

  3. Haz clic en Agregar otro rol y selecciona el rol Cliente de Kafka administrado. Repite este paso para los roles de Administrador del Registro de esquemas, Creador de tokens de cuenta de servicio y Creador de tokens de OpenID para cuentas de servicio.

  4. Haz clic en Guardar.

gcloud

Para otorgar roles de IAM, usa el comando gcloud projects add-iam-policy-binding.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto.

  • PROJECT_NUMBER: Es el número de tu proyecto.

Para obtener el número de proyecto, ejecuta el comando gcloud projects describe:

gcloud projects describe PROJECT_ID

Para obtener más información, consulta Cómo encontrar el nombre, el número y el ID del proyecto.

Conéctate a la VM

Usa SSH para conectarte a la instancia de VM.

Console

  1. Ve a la página Instancias de VM.

    Ir a Instancias de VM

  2. En la lista de instancias de VM, busca el nombre de la VM y haz clic en SSH.

gcloud

Para conectarte a la VM, usa el comando gcloud compute ssh.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

Reemplaza lo siguiente:

  • VM_NAME: El nombre de la VM
  • PROJECT_ID: Es el ID del proyecto.
  • ZONE: la zona en la que creaste la VM

Es posible que se requiera una configuración adicional para el primer uso de SSH. Para obtener más información, consulta Acerca de las conexiones SSH.

Configura un proyecto de Apache Maven

Desde tu sesión de SSH, ejecuta los siguientes comandos para configurar un proyecto de Maven.

  1. Instala Java y Maven con el siguiente comando:

    sudo apt-get install maven openjdk-17-jdk
    
  2. Configura un proyecto de Apache Maven.

    Usa el siguiente comando para crear un paquete com.google.example en un directorio llamado demo.

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

Define el esquema y su implementación en Java

En este ejemplo, un mensaje representa a un "usuario" que tiene un nombre y un ID opcional. El objeto corresponde a un esquema de Avro con dos campos: un campo obligatorio name de tipo string y un número entero opcional id.. Para usar este esquema en un programa de Java, también deberás generar una implementación en Java de un objeto que corresponda a este esquema.

  1. Cambia al directorio del proyecto con cd demo.

  2. Crea las carpetas para almacenar los archivos de esquema en tu código:

    mkdir -p src/main/avro
    
  3. Para crear la definición del esquema de Avro, pega el siguiente código en un archivo llamado src/main/avro/User.avsc:

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. Configura tu proyecto de Maven para usar un complemento de generación de código Java de Avro. Para ello, agrega lo siguiente al nodo build de tu pom.xml.. Ten en cuenta que el pom.xml puede tener otros nodos plugins dentro del nodo pluginManagement. No cambies el nodo pluginManagement en este paso. El nodo plugins debe estar en el mismo nivel que pluginManagement.

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. Agrega Avro como dependencia. Para ello, agrega lo siguiente al final del nodo project/dependencies de pom.xml. Ten en cuenta que pom.xml ya tiene un nodo dependencies dentro de la etiqueta dependencyManagement. No cambies el nodo dependencyManagement en este paso.

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. Genera fuentes de Java

      mvn generate-sources
    
  7. Ejecuta el siguiente comando para verificar que se haya creado el archivo fuente de implementación. La fuente es un archivo de clase Java que implementa constructores, métodos de acceso, serializadores y deserializadores para objetos User. Usarás esta clase en el código del productor.

    cat src/main/java/com/google/example/User.java
    

Para obtener más información sobre Apache Avro, consulta la guía de introducción a Apache Avro.

Crea un cliente productor

En esta sección, se explican los pasos para escribir, compilar y ejecutar un cliente productor.

Implementa el productor

El productor usa KafkaAvroSerializer.java para codificar mensajes y administrar sus esquemas. El serializador se conecta automáticamente al registro de esquemas, registra el esquema en un tema, recupera su ID y, luego, serializa el mensaje con Avro. Aún debes configurar el productor y el serializador.

  1. Para crear la clase del cliente productor, pega el siguiente código en un archivo nuevo llamado src/main/java/com/google/example/UserProducer.java.

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. Define la clase de devolución de llamada en src/main/java/com/google/example/SendCallback.java:

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. Para compilar este código, necesitas el paquete org.apache.kafka.clients y el código del serializador. El artefacto de Maven del serializador se distribuye a través de un repositorio personalizado. Agrega el siguiente nodo al nodo project de tu pom.xml para configurar este repositorio:

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. Agrega lo siguiente al nodo dependencies en tu archivo pom.xml:

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. Para asegurarte de que todas las dependencias se resuelvan correctamente, compila el cliente:

    mvn compile
    

Crea un registro de esquemas

Para crear un registro de esquemas, ejecuta el siguiente comando:

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

Reemplaza lo siguiente:

  • REGISTRY_ID: Es un identificador único para tu nuevo registro de esquemas. Esto forma parte del nombre del recurso del registro. El nombre debe comenzar con una letra, contener solo letras (a-z, A-Z), números (0-9) y guiones bajos (_), y tener 63 caracteres o menos.

  • REGION: Google Cloud región en la que se creará el registro de esquemas. Esta ubicación debe coincidir con la región del clúster o los clústeres de Kafka que usan este registro.

La definición de esquema que creaste aún no se subió al registro. El cliente productor hace esto la primera vez que se ejecuta en los siguientes pasos.

Configura y ejecuta el productor

En este punto, el productor no se ejecutará, ya que no está completamente configurado. Para configurar el productor, proporciona la configuración de Kafka y del registro de esquemas.

  1. Crea un archivo llamado client.properties en el mismo directorio que tu archivo pom.xml y agrégale el siguiente contenido:

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    Agrega las dependencias del controlador de autenticación de Kafka y del registro de esquemas a tu proyecto de Maven insertando lo siguiente en el nodo dependencies de pom.xml arriba de la dependencia kafka-avro-serializer:

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    Si deseas ver la implementación del controlador de autenticación del registro de esquema personalizado, consulta la clase GcpBearerAuthCredentialProvider.

  2. Compila y ejecuta el cliente productor:

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    Si todo sale bien, verás el resultado Produced a message successfully generado por la clase SendCallback.

Examina el resultado

  1. Comprueba que el esquema User se haya registrado con un nombre de asunto derivado de los nombres del tema y del esquema:

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    El resultado de este comando debería verse así:

    ["newUsers-value"]
    
  2. Verifica que el esquema registrado en el repositorio sea el mismo que User:

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    El resultado del comando debería verse así:

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

Realiza una limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página.

Console

  1. Borra la instancia de VM.

    1. Ve a la página Instancias de VM.

      Ir a Instancias de VM

    2. Selecciona la VM y haz clic en Borrar.

  2. Borra el registro de esquemas.

    1. Ve a la página Registros de esquemas.

      Ir a Registros de esquemas

    2. Haz clic en el nombre del registro de esquema.

    3. Haz clic en Borrar.

  3. Borra el clúster de Kafka.

    1. Ve a la página Managed Service para Apache Kafka > Clústeres.

      Ir a los clústeres

    2. Selecciona el clúster de Kafka y haz clic en Borrar.

gcloud

  1. Para borrar la VM, usa el comando gcloud compute instances delete.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Para borrar el registro de esquemas, usa el comando /sdk/gcloud/reference/managed-kafka/schema-registries/delete.

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. Para borrar el clúster de Kafka, usa el comando gcloud managed-kafka clusters delete.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

¿Qué sigue?

Apache Kafka® y Apache Avro son marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.
Confluent es una marca registrada de Confluent, Inc.