Com os conectores de coletor do Cloud Storage, é possível transmitir dados dos tópicos do Kafka para buckets do Cloud Storage. Isso é útil para armazenar e processar grandes volumes de dados de maneira econômica e escalonável.
Antes de começar
Antes de criar um conector de coletor do Cloud Storage, verifique se você tem o seguinte:
Crie um cluster do Serviço gerenciado para Apache Kafka para seu cluster do Connect. Esse é o cluster principal do Kafka associado ao cluster do Connect. Esse também é o cluster de origem que forma uma extremidade do pipeline do conector.
Crie um cluster do Connect para hospedar o conector de coletor do Cloud Storage.
Crie um bucket do Cloud Storage para armazenar os dados transmitidos do Kafka.
Crie e configure um tópico do Kafka no cluster de origem. Os dados são movidos desse tópico do Kafka para o bucket de destino do Cloud Storage.
Papéis e permissões necessárias
Para receber as permissões necessárias para criar um conector de gravador do Cloud Storage,
peça ao administrador para conceder a você o papel do IAM
Editor de conector gerenciado do Kafka (roles/managedkafka.connectorEditor)
no seu projeto.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Esse papel predefinido contém as permissões necessárias para criar um conector de gravador do Cloud Storage. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:
Permissões necessárias
As seguintes permissões são necessárias para criar um conector de gravador do Cloud Storage:
-
Conceda a permissão para criar um conector no cluster pai do Connect:
managedkafka.connectors.create
Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.
Para mais informações sobre o papel Editor do conector gerenciado do Kafka, consulte Papéis predefinidos do Serviço Gerenciado para Apache Kafka.
Se o cluster do Serviço gerenciado para Apache Kafka estiver no mesmo projeto que o cluster do Connect, não serão necessárias outras permissões. Se o cluster do Connect estiver em um projeto diferente, consulte Criar um cluster do Connect em um projeto diferente.
Conceder permissões para gravar no bucket do Cloud Storage
A conta de serviço do cluster do Connect, que segue o formato
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com,
requer as seguintes permissões do Cloud Storage:
storage.objects.createstorage.objects.delete
Para isso, conceda o papel Usuário de objetos do Storage (roles/storage.objectUser) à conta de serviço do cluster do Connect no projeto que contém o bucket do Cloud Storage.
Como funciona um conector de coletor do Cloud Storage
Um conector de coletor do Cloud Storage extrai dados de um ou mais tópicos do Kafka e grava esses dados em objetos em um único bucket do Cloud Storage.
Confira um detalhamento de como o conector de gravador do Cloud Storage copia dados:
O conector consome mensagens de um ou mais tópicos do Kafka no cluster de origem.
O conector grava os dados no bucket de destino do Cloud Storage especificado na configuração do conector.
O conector formata os dados à medida que os grava no bucket do Cloud Storage referenciando propriedades específicas na configuração do conector. Por padrão, os arquivos de saída estão no formato CSV. É possível configurar a propriedade
format.output.typepara especificar diferentes formatos de saída, como JSON.O conector também nomeia os arquivos gravados no bucket do Cloud Storage. É possível personalizar os nomes dos arquivos usando as propriedades
file.name.prefixefile.name.template. Por exemplo, é possível incluir o nome do tópico do Kafka ou as chaves de mensagem no nome do arquivo.Um registro do Kafka tem três componentes: cabeçalhos, chaves e valores.
Para incluir cabeçalhos no arquivo de saída, defina
format.output.fields. Por exemplo,format.output.fields=value,headers.Para incluir chaves no arquivo de saída, defina
format.output.fieldspara incluirkey. Por exemplo,format.output.fields=key,value,headers.As chaves também podem ser usadas para agrupar registros incluindo
keyna propriedadefile.name.template.
É possível incluir valores no arquivo de saída por padrão, já que
format.output.fieldsé definido comovalue.O conector grava os dados convertidos e formatados no bucket do Cloud Storage especificado.
O conector compacta os arquivos armazenados no bucket do Cloud Storage se você configurar a compactação de arquivos usando a propriedade
file.compression.type.As configurações do conversor são restritas pela propriedade
format.output.type.Por exemplo, quando
format.output.typeé definido comocsv, o conversor de chave precisa serorg.apache.kafka.connect.converters.ByteArrayConverterouorg.apache.kafka.connect.storage.StringConverter, e o conversor de valor precisa serorg.apache.kafka.connect.converters.ByteArrayConverter.Quando
format.output.typeé definido comojson, o esquema de valor e chave não é gravado com os dados no arquivo de saída, mesmo que a propriedadevalue.converter.schemas.enableseja verdadeira.
A propriedade
tasks.maxcontrola o nível de paralelismo do conector. Aumentartasks.maxpode melhorar a capacidade, mas o paralelismo real é limitado pelo número de partições nos tópicos do Kafka.
Propriedades de um conector de coletor do Cloud Storage
Ao criar um conector de coletor do Cloud Storage, especifique as seguintes propriedades.
Nome do conector
O nome ou ID do conector. Para conferir as diretrizes sobre como nomear o recurso, consulte Diretrizes para nomear um recurso do Serviço gerenciado para Apache Kafka. O nome é imutável.
Tipo de plug-in do conector
Selecione Gravador do Cloud Storage como o tipo de plug-in do conector no consoleGoogle Cloud . Se você não usar a interface do usuário para configurar o conector, também precisará especificar a classe dele.
Tópicos
Os tópicos do Kafka de que o conector consome mensagens.
É possível especificar um ou mais temas ou usar uma expressão regular para corresponder a vários temas. Por exemplo, topic.* para corresponder a todos os tópicos que começam
com "tópico". Esses tópicos precisam existir no cluster do Serviço gerenciado para Apache Kafka associado ao seu cluster do Connect.
Bucket do Cloud Storage
Escolha ou crie o bucket do Cloud Storage onde os dados serão armazenados.
Configuração
Nesta seção, é possível especificar outras propriedades de configuração específicas do conector para o conector de gravador do Cloud Storage.
Como os dados nos tópicos do Kafka podem estar em vários formatos, como Avro, JSON ou bytes brutos, uma parte fundamental da configuração envolve especificar conversores. Os conversores traduzem dados do formato usado nos tópicos do Kafka para o formato interno padronizado do Kafka Connect. Em seguida, o conector de gravador do Cloud Storage usa esses dados internos e os transforma no formato exigido pelo bucket do Cloud Storage antes de gravar.
Para mais informações gerais sobre a função dos conversores no Kafka Connect, tipos de conversores compatíveis e opções de configuração comuns, consulte conversores.
Confira algumas configurações específicas do conector de gravador do Cloud Storage:
gcs.credentials.default: se as credenciais Google Cloud serão descobertas automaticamente no ambiente de execução. Precisa ser definido comotrue.gcs.bucket.name: especifica o nome do bucket do Cloud Storage em que os dados são gravados. Obrigatório.file.compression.type: define o tipo de compactação para arquivos armazenados no bucket do Cloud Storage. Exemplos:gzip,snappy,zstdenone. O valor padrão énone.file.name.prefix: o prefixo a ser adicionado ao nome de cada arquivo armazenado no bucket do Cloud Storage. O valor padrão é vazio.format.output.type: o tipo de formato de dados usado para gravar dados nos arquivos de saída do Cloud Storage. Os valores aceitos são:csv,json,jsonleparquet. O valor padrão écsv.
Para uma lista das propriedades de configuração disponíveis específicas para esse conector, consulte as configurações do conector de gravador do Cloud Storage.
Criar um conector de coletor do Cloud Storage
Antes de criar um conector, consulte a documentação sobre as Propriedades de um conector de coletor do Cloud Storage.
Console
No console do Google Cloud , acesse a página Conectar clusters.
Clique no cluster do Connect para o qual você quer criar o conector.
A página Detalhes do cluster de conexão é exibida.
Clique em Criar conector.
A página Criar conector do Kafka é exibida.
Para o nome do conector, insira uma string.
Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka.
Em Plug-in do conector, selecione Gravador do Cloud Storage.
Especifique os tópicos de onde você pode transmitir dados.
Escolha o bucket de armazenamento para armazenar os dados.
(Opcional) Configure outras opções na seção Configuração.
Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização de tarefas.
Clique em Criar.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Execute o comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILESubstitua:
CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.
LOCATION: o local em que você cria o conector. Precisa ser o mesmo local em que você criou o cluster do Connect.
CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector é criado.
CONFIG_FILE: o caminho para o arquivo de configuração YAML do conector de coletor do BigQuery.
Confira um exemplo de arquivo de configuração para o conector de gravador do Cloud Storage:
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"Substitua:
GMK_TOPIC_ID: o ID do tópico do Serviço gerenciado para Apache Kafka de onde os dados fluem para o conector de coletor do Cloud Storage.
GCS_BUCKET_NAME: o nome do bucket do Cloud Storage que atua como um coletor para o pipeline.
GCS_SINK_CONNECTOR_ID: o ID ou nome do conector de gravador do Cloud Storage. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.
Terraform
É possível usar um recurso do Terraform para criar um conector.
Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.
Go
Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.
Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Java
Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Python
Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Depois de criar um conector, é possível editar, excluir, pausar, interromper ou reiniciar.