Gerar mensagens Avro com o registro de esquema

Aprenda a desenvolver um aplicativo produtor Java que usa o registro de esquema (prévia) para produzir mensagens do Apache Avro. O aplicativo grava as mensagens em um cluster do Serviço gerenciado para Apache Kafka.

Antes de começar

Antes de iniciar este tutorial, crie um cluster do serviço gerenciado para Apache Kafka. Se você já tiver um cluster, pule esta etapa.

Como criar um cluster

Console

  1. Acesse a página Serviço Gerenciado para Apache Kafka > Clusters.

    Acessar Clusters

  2. Clique em Criar.
  3. Na caixa Nome do cluster, insira um nome para o cluster.
  4. Na lista Região, selecione um local para o cluster.
  5. Em Configuração de rede, configure a sub-rede em que o cluster está acessível:
    1. Em Projeto, selecione o projeto.
    2. Em Rede, selecione a rede VPC.
    3. Em Sub-rede, selecione a sub-rede.
    4. Clique em Concluído.
  6. Clique em Criar.

Depois de clicar em Criar, o estado do cluster será Creating. Quando o cluster estiver pronto, o estado será Active.

gcloud

Para criar um cluster do Kafka, execute o comando managed-kafka clusters create.

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

Substitua:

  • KAFKA_CLUSTER: um nome para o cluster do Kafka
  • REGION: o local do cluster
  • PROJECT_ID: ID do projeto;
  • SUBNET_NAME: a sub-rede em que você quer criar o cluster, por exemplo, default

Para informações sobre os locais aceitos, consulte Locais do serviço gerenciado para Apache Kafka.

O comando é executado de forma assíncrona e retorna um ID de operação:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

Para acompanhar o progresso da operação de criação, use o comando gcloud managed-kafka operations describe:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

Quando o cluster estiver pronto, a saída desse comando vai incluir a entrada state: ACTIVE. Para mais informações, consulte Monitorar a operação de criação de cluster.

Funções exigidas

Para ter as permissões necessárias para criar e configurar uma VM cliente, peça ao administrador para conceder a você os seguintes papéis do IAM no projeto:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.

Configurar uma VM cliente

Crie uma instância de máquina virtual (VM) do Linux no Compute Engine que possa acessar o cluster do Kafka. Ao configurar a VM, defina as seguintes opções:

  • Region. Crie a VM na mesma região do cluster do Kafka.

  • Sub-rede. Crie a VM na mesma rede VPC da sub-rede usada na configuração do cluster do Kafka. Para mais informações, consulte Ver as sub-redes de um cluster.

  • Escopos de acesso. Atribua o escopo de acesso https://www.googleapis.com/auth/cloud-platform à VM. Esse escopo autoriza a VM a enviar solicitações para a API Managed Kafka.

As etapas a seguir mostram como definir essas opções.

Console

  1. No console do Google Cloud , acesse a página Criar uma instância.

    Criar uma instância

  2. No painel Configuração da máquina, faça o seguinte:

    1. No campo Nome, especifique um nome para a instância. Para mais informações, consulte Convenção de nomenclatura de recursos.

    2. Na lista Região, selecione a mesma região do cluster do Kafka.

    3. Na lista Zona, selecione uma zona.

  3. No menu de navegação, clique em Rede. No painel Rede que aparece, faça o seguinte:

    1. Acesse a seção Interfaces de rede.

    2. Para expandir a interface de rede padrão, clique na seta .

    3. No campo Rede, escolha a rede VPC.

    4. Na lista Sub-rede, selecione a sub-rede.

    5. Clique em Concluído.

  4. No menu de navegação, clique em Segurança. No painel Segurança que aparece, faça o seguinte:

    1. Para Escopos de acesso, selecione Definir acesso para cada API.

    2. Na lista de escopos de acesso, encontre a lista suspensa Cloud Platform e selecione Ativado.

  5. Clique em Criar para criar a VM.

gcloud

Para criar a instância de VM, use o comando gcloud compute instances create.

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

Substitua:

  • VM_NAME: o nome da VM
  • PROJECT_ID: ID do projeto;
  • REGION: a região em que você criou o cluster do Kafka, por exemplo, us-central1.
  • SUBNET: uma sub-rede na mesma rede VPC que a usada na configuração do cluster
  • ZONE: uma zona na região em que você criou o cluster, por exemplo, us-central1-c

Para mais informações sobre como criar uma VM, consulte Criar uma instância de VM em uma sub-rede específica.

Conceder papéis do IAM

Conceda os seguintes papéis do Identity and Access Management (IAM) à conta de serviço padrão do Compute Engine:

  • Cliente Kafka gerenciado (roles/managedkafka.client)
  • Administrador do Schema Registry (roles/managedkafka.schemaRegistryAdmin)
  • Criador do token da conta de serviço (roles/iam.serviceAccountTokenCreator)
  • Criador do token do OpenID da conta de serviço (roles/iam.serviceAccountOpenIdTokenCreator)

Console

  1. No console do Google Cloud , acesse a página IAM.

    Acessar IAM

  2. Encontre a linha da conta de serviço padrão do Compute Engine e clique em Editar principal.

  3. Clique em Adicionar outro papel e selecione Cliente gerenciado do Kafka. Repita essa etapa para os papéis Administrador do Schema Registry, Criador de token da conta de serviço e Criador de token do OpenID da conta de serviço.

  4. Clique em Salvar.

gcloud

Para conceder papéis do IAM, use o comando gcloud projects add-iam-policy-binding.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.schemaRegistryAdmin

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

Substitua:

  • PROJECT_ID: ID do projeto;

  • PROJECT_NUMBER: o ID do seu projeto

Para saber o número do projeto, execute o comando gcloud projects describe:

gcloud projects describe PROJECT_ID

Para mais informações, consulte Encontrar o nome, o número e o ID do projeto.

Conectar-se à VM

Use SSH para se conectar à instância de VM.

Console

  1. Acesse a página Instâncias de VM.

    Acessar instâncias de VM

  2. Na lista de instâncias de VM, encontre o nome da VM e clique em SSH.

gcloud

Para se conectar à VM, use o comando gcloud compute ssh.

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

Substitua:

  • VM_NAME: o nome da VM
  • PROJECT_ID: ID do projeto;
  • ZONE: a zona em que você criou a VM

Talvez seja necessário fazer mais configurações para usar o SSH pela primeira vez. Para mais informações, consulte Sobre conexões SSH.

Configurar um projeto do Apache Maven

Na sessão SSH, execute os comandos a seguir para configurar um projeto Maven.

  1. Instale o Java e o Maven com o comando:

    sudo apt-get install maven openjdk-17-jdk
    
  2. Configure um projeto do Apache Maven.

    Use o comando a seguir para criar um pacote com.google.example em um diretório chamado demo.

    mvn archetype:generate -DartifactId=demo -DgroupId=com.google.example\
       -DarchetypeArtifactId=maven-archetype-quickstart\
       -DarchetypeVersion=1.5 -DinteractiveMode=false
    

Definir o esquema e a implementação em Java

Neste exemplo, uma mensagem representa um "usuário" que tem um nome e um ID opcional. Ele corresponde a um esquema Avro com dois campos: um campo obrigatório name do tipo string e um inteiro opcional id.. Para usar esse esquema em um programa Java, também é necessário gerar uma implementação Java de um objeto correspondente a ele.

  1. Mude para o diretório do projeto com cd demo.

  2. Crie as pastas para armazenar arquivos de esquema no seu código:

    mkdir -p src/main/avro
    
  3. Crie a definição do esquema Avro colando o seguinte código em um arquivo chamado src/main/avro/User.avsc:

    {
      "namespace": "com.google.example",
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "id",  "type": ["int", "null"]}
      ]
    }
    
  4. Configure seu projeto Maven para usar um plug-in de geração de código Java do Avro adicionando o seguinte ao nó build do pom.xml.. Observe que o pom.xml pode ter outros nós plugins dentro do nó pluginManagement . Não mude o nó pluginManagement nesta etapa. O nó plugins precisa estar no mesmo nível de pluginManagement.

    <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
    </plugins>
    
  5. Adicione o Avro como uma dependência adicionando o seguinte ao final do nó project/dependencies de pom.xml. O pom.xml já tem um nó dependencies dentro da tag dependencyManagement. Não mude o nó dependencyManagement nesta etapa.

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.11.1</version>
    </dependency>
    
  6. Gerar fontes Java

      mvn generate-sources
    
  7. Execute o comando a seguir para verificar se o arquivo de origem da implementação foi criado. A origem é um arquivo de classe Java que implementa construtores, acessadores, serializadores e desserializadores para objetos User. Você vai usar essa classe no código do produtor.

    cat src/main/java/com/google/example/User.java
    

Para mais informações sobre o Apache Avro, consulte o guia de introdução do Apache Avro.

Criar um cliente produtor

Esta seção mostra as etapas para escrever, criar e executar um cliente produtor.

Implementar o produtor

O produtor usa KafkaAvroSerializer.java para codificar mensagens e gerenciar os esquemas delas. O serializador se conecta automaticamente ao registro de esquema, registra o esquema em um assunto, recupera o ID e serializa a mensagem usando Avro. Você ainda precisa configurar o produtor e o serializador.

  1. Crie a classe do cliente produtor colando o código a seguir em um novo arquivo chamado src/main/java/com/google/example/UserProducer.java

    
    package com.google.example;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    
      public class UserProducer {
    
        private static Properties configure() throws Exception {
            Properties p = new Properties();
            p.load(new java.io.FileReader("client.properties"));
            p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            return p;
        }
    
        public static void main(String[] args) throws Exception {
            Properties p = configure();
    
            KafkaProducer<String, User> producer = new KafkaProducer<String, User>(p);
            final User u = new User("SchemaEnthusiast", 42);
            final String topicName = "newUsers";
            ProducerRecord<String, User>  message =
              new ProducerRecord<String, User>(topicName, "", u);
            producer.send(message, new SendCallback());
            producer.close();
        }
      }
    
  2. Defina a classe de callback em src/main/java/com/google/example/SendCallback.java:

    package com.google.example;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    class SendCallback implements Callback {
          public void onCompletion(RecordMetadata m, Exception e){
              if (e == null){
                System.out.println("Produced a message successfully.");
              } else {
                System.out.println(e.getMessage());
              }
          }
    }
    
  3. Para compilar esse código, você precisa do pacote org.apache.kafka.clients e do código do serializador. O artefato Maven do serializador é distribuído por um repositório personalizado. Adicione o seguinte nó ao nó project do seu pom.xml para configurar este repositório:

      <repositories>
        <repository>
          <id>confluent</id>
          <name>Confluent</name>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
      </repositories>
    
  4. Adicione o seguinte ao nó dependencies no arquivo pom.xml:

       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.8.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
  5. Para garantir que todas as dependências sejam resolvidas corretamente, compile o cliente:

    mvn compile
    

Criar um registro de esquema

Para criar um registro de esquema, execute o seguinte comando:

gcloud beta managed-kafka schema-registries create REGISTRY_ID \
    --location=REGION

Substitua:

  • REGISTRY_ID: um identificador exclusivo para o novo registro de esquema. Isso faz parte do nome do recurso do registro. O nome precisa começar com uma letra, conter apenas letras (a-z, A-Z), números (0-9) e sublinhados (_) e ter 63 caracteres ou menos.

  • REGION: Google Cloud região em que o registro de esquema será criado. Ela precisa corresponder à região do cluster ou dos clusters do Kafka que usam esse registro.

A definição de esquema que você criou ainda não foi enviada para o registro. O cliente produtor faz isso na primeira vez que é executado nas etapas a seguir.

Configurar e executar o produtor

Neste ponto, o produtor não será executado porque não está totalmente configurado. Para configurar o produtor, forneça a configuração do Kafka e do registro de esquema.

  1. Crie um arquivo chamado client.properties no mesmo diretório do seu pom.xml e adicione o seguinte conteúdo:

    bootstrap.servers=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    
    schema.registry.url=https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/schemaRegistries/REGISTRY_ID
    bearer.auth.credentials.source=CUSTOM
    bearer.auth.custom.provider.class=com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
    

    Adicione as dependências do Kafka e do manipulador de autenticação do registro de esquema ao seu projeto Maven inserindo o seguinte no nó dependencies de pom.xml acima da dependência kafka-avro-serializer:

      <dependency>
          <groupId>com.google.cloud.hosted.kafka</groupId>
          <artifactId>managed-kafka-auth-login-handler</artifactId>
          <version>1.0.6</version>
          <exclusions>
            <exclusion>
              <groupId>io.confluent</groupId>
              <artifactId>kafka-schema-registry-client</artifactId>
            </exclusion>
        </exclusions>
      </dependency>
    

    Se quiser ver a implementação do manipulador de autenticação do registro de esquema personalizado, consulte a classe GcpBearerAuthCredentialProvider.

  2. Compile e execute o cliente produtor:

    mvn compile -q exec:java -Dexec.mainClass=com.google.example.UserProducer
    

    Se tudo correr bem, você verá a saída Produced a message successfully gerada pela classe SendCallback.

Analisar a saída

  1. Verifique se o esquema User foi registrado com um nome de assunto derivado dos nomes do tópico e do esquema:

    SR_DOMAIN=https://managedkafka.googleapis.com
    SR_PATH=/v1/projects/PROJECT_ID/locations/REGION
    SR_HOST=$SR_DOMAIN/$SR_PATH/schemaRegistries/REGISTRY_ID/subjects
    
    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)"\
      $SR_HOST
    

    A saída desse comando vai ser parecida com esta:

    ["newUsers-value"]
    
  2. Verifique se o esquema registrado no repositório é o mesmo que User:

    curl -X GET \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      $SR_HOST/newUsers-value/versions/1
    

    A saída do comando terá esta aparência:

    {
      "subject": "newUsers-value",
      "version": 1,
      "id": 2,
      "schemaType": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.google.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":[\"int\",\"null\"]}]}",
      "references": []
    }
    

Limpar

Para evitar cobranças na conta do Google Cloud pelos recursos usados nesta página, siga as etapas abaixo.

Console

  1. Exclua a instância de VM.

    1. Acesse a página Instâncias de VM.

      Acessar instâncias de VM

    2. Selecione a VM e clique em Excluir.

  2. Exclua o registro de esquema.

    1. Acesse a página Registros de esquema.

      Acessar "Registros de esquema"

    2. Clique no nome do registro de esquema.

    3. Clique em Excluir.

  3. Exclua o cluster do Kafka.

    1. Acesse a página Serviço gerenciado para Apache Kafka > Clusters.

      Acessar Clusters

    2. Selecione o cluster do Kafka e clique em Excluir.

gcloud

  1. Para excluir a VM, use o comando gcloud compute instances delete.

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Para excluir o registro de esquema, use o comando /sdk/gcloud/reference/managed-kafka/schema-registries/delete.

    gcloud beta managed-kafka schema-registries delete REGISTRY_ID \
      --location=REGION
    
  3. Para excluir o cluster do Kafka, use o comando gcloud managed-kafka clusters delete.

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

A seguir

Apache Kafka® e Apache Avro são marcas registradas da The Apache Software Foundation ou de suas afiliadas nos Estados Unidos e/ou em outros países.
Confluent é uma marca registrada da Confluent, Inc.