Com os conectores de coletor do BigQuery, é possível transmitir dados do Kafka para o BigQuery, permitindo a ingestão e análise de dados em tempo real no BigQuery. Um conector de coletor do BigQuery consome registros de um ou mais tópicos do Kafka e grava os dados em uma ou mais tabelas em um único conjunto de dados do BigQuery.
Antes de começar
Antes de criar um conector de coletor do BigQuery, verifique se você tem o seguinte:
Crie um cluster do Serviço gerenciado para Apache Kafka para seu cluster do Connect. Esse cluster é o cluster principal do Kafka associado ao cluster do Connect. Esse cluster também é a origem que forma uma extremidade do pipeline do conector de coletor do BigQuery.
Crie um cluster do Connect para hospedar seu conector de coletor do BigQuery.
Crie um conjunto de dados do BigQuery para armazenar os dados transmitidos do Kafka.
Crie e configure um tópico do Kafka no cluster de origem. Os dados são movidos desse tópico do Kafka para o conjunto de dados de destino do BigQuery.
Papéis e permissões necessárias
Para receber as permissões necessárias para criar um conector de gravador do BigQuery,
peça ao administrador para conceder a você o
papel do IAM Editor de conector gerenciado do Kafka (roles/managedkafka.connectorEditor)
no seu projeto.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Esse papel predefinido contém as permissões necessárias para criar um conector de gravador do BigQuery. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:
Permissões necessárias
As seguintes permissões são necessárias para criar um conector de gravador do BigQuery:
-
Conceda a permissão para criar um conector no cluster pai do Connect:
managedkafka.connectors.create
Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.
Para mais informações sobre o papel Editor do conector gerenciado do Kafka, consulte Papéis predefinidos do Serviço Gerenciado para Apache Kafka.
Se o cluster do Serviço gerenciado para Apache Kafka estiver no mesmo projeto que o cluster do Connect, não serão necessárias outras permissões. Se o cluster estiver em um projeto diferente, consulte Criar um cluster do Connect em um projeto diferente.
Conceder permissões para gravar na tabela do BigQuery
A conta de serviço do cluster do Connect, que segue o formato
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com,
precisa de permissão para gravar na tabela do BigQuery. Para isso, conceda o papel Editor de dados do BigQuery (roles/bigquery.dataEditor) à conta de serviço do cluster do Connect no projeto que contém a tabela do BigQuery.
Esquemas para um conector de coletor do BigQuery
O conector do coletor do BigQuery usa o conversor de valores configurado (value.converter) para analisar os valores de registro do Kafka em campos. Em seguida, ele grava os campos em colunas com o mesmo nome na tabela do BigQuery.
O conector precisa de um esquema para funcionar. O esquema pode ser fornecido das seguintes maneiras:
- Esquema baseado em mensagens: o esquema é incluído como parte de cada mensagem.
- Esquema baseado em tabela: o conector infere o esquema da mensagem do esquema da tabela do BigQuery.
- Registro de esquema: o conector lê o esquema de um registro de esquema, como o registro de esquema do Serviço gerenciado para Apache Kafka (prévia).
As próximas seções descrevem essas opções.
Esquema baseado em mensagens
Nesse modo, cada registro do Kafka inclui um esquema JSON. O conector usa o esquema para gravar os dados do registro como uma linha de tabela do BigQuery.
Para usar esquemas baseados em mensagens, defina as seguintes propriedades no conector:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true
Exemplo de valor de registro do Kafka:
{
"schema": {
"type": "struct",
"fields": [
{
"field": "user",
"type": "string",
"optional": false
},
{
"field": "age",
"type": "int64",
"optional": false
}
]
},
"payload": {
"user": "userId",
"age": 30
}
}
Se a tabela de destino já existir, o esquema da tabela do BigQuery precisará ser compatível com o esquema da mensagem incorporada. Se
autoCreateTables=true, o conector vai criar automaticamente a tabela de destino, se necessário. Para mais informações, consulte Criação de tabelas.
Se você quiser que o conector atualize o esquema da tabela do BigQuery à medida que os esquemas de mensagens mudam, defina allowNewBigQueryFields, allowSchemaUnionization ou allowBigQueryRequiredFieldRelaxation como true.
Esquema baseado em tabela
Nesse modo, os registros do Kafka contêm dados JSON simples sem um esquema explícito. O conector infere o esquema da tabela de destino.
Requisitos:
- A tabela do BigQuery precisa existir.
- Os dados de registro do Kafka precisam ser compatíveis com o esquema da tabela.
- Esse modo não é compatível com atualizações dinâmicas de esquema com base em mensagens recebidas.
Para usar esquemas baseados em tabelas, defina as seguintes propriedades no conector:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsebigQueryPartitionDecorator=false
Se a tabela do BigQuery usar particionamento com base em tempo com particionamento diário, bigQueryPartitionDecorator poderá ser true. Caso contrário, defina
essa propriedade como false.
Exemplo de valor de registro do Kafka:
{
"user": "userId",
"age": 30
}
Registro de esquema
Nesse modo, cada registro do Kafka contém dados do Apache Avro, e o esquema da mensagem é armazenado em um registro de esquema.
Para usar o conector de coletor do BigQuery com um registro de esquema, defina as seguintes propriedades no conector:
value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url=SCHEMA_REGISTRY_URL
Substitua SCHEMA_REGISTRY_URL pelo URL do registro de
esquema.
Para usar o conector com o registro de esquema do Serviço gerenciado para Apache Kafka, defina a seguinte propriedade:
value.converter.bearer.auth.credentials.source=GCP
Para mais informações, consulte Usar o Kafka Connect com o registro de esquema.
Tabelas do BigLake para Apache Iceberg no BigQuery
O conector de gravador do BigQuery é compatível com tabelas do BigLake para Apache Iceberg no BigQuery (doravante, tabelas do BigLake Iceberg no BigQuery) como um destino de gravador.
As tabelas do BigLake Iceberg no BigQuery oferecem a base para criar lakehouses de formato aberto no Google Cloud. As tabelas do BigLake Iceberg no BigQuery oferecem a mesma experiência totalmente gerenciada das tabelas do BigQuery, mas armazenam dados em buckets de armazenamento de propriedade do cliente usando o Parquet para serem interoperáveis com formatos de tabela aberta do Apache Iceberg.
Para informações sobre como criar uma tabela do Apache Iceberg, consulte Criar uma tabela do Apache Iceberg.
Criar um conector de coletor do BigQuery
Console
No console do Google Cloud , acesse a página Conectar clusters.
Clique no cluster do Connect em que você quer criar o conector.
Clique em Criar conector.
Para o nome do conector, insira uma string.
Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka.
Em Plug-in do conector, selecione Coletor do BigQuery.
Na seção Tópicos, especifique os tópicos do Kafka que serão lidos. É possível especificar uma lista de tópicos ou uma expressão regular para corresponder aos nomes dos tópicos.
Opção 1: escolha Selecionar uma lista de tópicos do Kafka. Na lista Tópicos do Kafka, selecione um ou mais tópicos. Clique em OK.
Opção 2: escolha Usar uma regex do tópico. No campo Expressão regular do tópico, insira uma expressão regular.
Clique em Conjunto de dados e especifique um conjunto de dados do BigQuery. Você pode escolher um conjunto de dados existente ou criar um novo.
Opcional: na caixa Configurações, adicione propriedades de configuração ou edite as propriedades padrão. Para mais informações, consulte Configurar o conector.
Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização de tarefas.
Clique em Criar.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Execute o comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILESubstitua:
CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.
LOCATION: o local em que você cria o conector. Precisa ser o mesmo local em que você criou o cluster do Connect.
CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector foi criado.
CONFIG_FILE: o caminho para o arquivo de configuração YAML do conector de coletor do BigQuery.
Confira um exemplo de arquivo de configuração para o conector BigQuery Sink:
name: "BQ_SINK_CONNECTOR_ID" project: "GCP_PROJECT_ID" topics: "GMK_TOPIC_ID" tasks.max: 3 connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" key.converter: "org.apache.kafka.connect.storage.StringConverter" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" defaultDataset: "BQ_DATASET_ID"Substitua:
BQ_SINK_CONNECTOR_ID: o ID ou nome do conector de coletor do BigQuery. Para conferir as diretrizes de nomeação de um conector, acesse Diretrizes de nomeação de recursos do Serviço gerenciado para Apache Kafka. O nome de um conector é imutável.
GCP_PROJECT_ID: o ID do projeto do Google Cloud em que o conjunto de dados do BigQuery está localizado.
GMK_TOPIC_ID: o ID do tópico do Serviço gerenciado para Apache Kafka de onde os dados fluem para o conector de coletor do BigQuery.
BQ_DATASET_ID: o ID do conjunto de dados do BigQuery que atua como coletor do pipeline.
Terraform
É possível usar um recurso do Terraform para criar um conector.
Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.
Go
Antes de testar esta amostra, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do serviço gerenciado para Apache Kafka.
Para autenticar o Managed Service para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Java
Antes de testar esta amostra, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Python
Antes de testar esta amostra, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do serviço gerenciado para Apache Kafka.
Para autenticar o serviço gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.
Depois de criar um conector, é possível editar, excluir, pausar, interromper ou reiniciar.
Configurar o conector
Nesta seção, descrevemos algumas propriedades de configuração que podem ser definidas no conector. Para uma lista completa das propriedades específicas desse conector, consulte Configurações do conector de gravador do BigQuery.
Nome da tabela
Por padrão, o conector usa o nome do tópico como o nome da tabela do BigQuery. Para usar um nome de tabela diferente, defina a propriedade topic2TableMap
com o seguinte formato:
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
Criação de tabelas
O conector de coletor do BigQuery pode criar as tabelas de destino se elas não existirem.
Se
autoCreateTables=true, o conector tentará criar tabelas do BigQuery que não existam. Essa é a configuração padrão.Se
autoCreateTables=false, o conector não vai criar nenhuma tabela. Se uma tabela de destino não existir, ocorrerá um erro.
Quando autoCreateTables é true, é possível usar as seguintes propriedades de configuração para ter um controle mais refinado sobre como o conector cria e configura novas tabelas:
allBQFieldsNullableclusteringPartitionFieldNamesconvertDoubleSpecialValuespartitionExpirationMssanitizeFieldNamessanitizeTopicstimestampPartitionFieldName
Para informações sobre essas propriedades, consulte Configurações do conector de gravador do BigQuery.
Metadados do Kafka
É possível mapear outros dados do Kafka, como informações de metadados e
chaves, na tabela do BigQuery configurando os campos
kafkaDataFieldName e kafkaKeyFieldName, respectivamente. Exemplos de informações de metadados incluem o tópico, a partição, o deslocamento e o tempo de inserção do Kafka.