I connettori di sink BigQuery ti consentono di trasmettere i dati in streaming da Kafka a BigQuery, consentendo l'importazione e l'analisi dei dati in tempo reale all'interno di BigQuery. Un connettore di sink BigQuery utilizza i record di uno o più argomenti Kafka e scrive i dati in una o più tabelle all'interno di un singolo set di dati BigQuery.
Prima di iniziare
Prima di creare un connettore di sink BigQuery, assicurati di avere quanto segue:
Crea un cluster Managed Service per Apache Kafka per il tuo cluster di connessione. Questo cluster è il cluster Kafka principale associato al cluster di connessione. Questo cluster è anche il cluster di origine che forma un'estremità della pipeline del connettore di sink BigQuery.
Crea un cluster Connect per ospitare il connettore di sink BigQuery.
Crea un set di dati BigQuery per archiviare i dati trasmessi in streaming da Kafka.
Crea e configura un argomento Kafka all'interno del cluster di origine. I dati vengono spostati da questo argomento Kafka al set di dati BigQuery di destinazione.
Ruoli e autorizzazioni richiesti
Per ottenere le autorizzazioni
necessarie per creare un connettore BigQuery Sink,
chiedi all'amministratore di concederti il
ruolo IAM Editor connettore Kafka gestito (roles/managedkafka.connectorEditor)
sul tuo progetto.
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un connettore BigQuery Sink. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:
Autorizzazioni obbligatorie
Per creare un connettore di sink BigQuery sono necessarie le seguenti autorizzazioni:
-
Concedi l'autorizzazione per creare un connettore nel cluster di connessione principale:
managedkafka.connectors.create
Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.
Per saperne di più sul ruolo Editor connettore Kafka gestito, vedi Ruoli predefiniti di Managed Service per Apache Kafka.
Se il cluster Managed Service per Apache Kafka si trova nello stesso progetto del cluster di connessione, non sono necessarie ulteriori autorizzazioni. Se il cluster si trova in un progetto diverso, consulta Crea un cluster Connect in un altro progetto.
Concedi le autorizzazioni per scrivere nella tabella BigQuery
Il account di servizio del cluster Connect, che segue il formato
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com,
richiede l'autorizzazione per scrivere nella tabella BigQuery. A questo scopo,
concedi il ruolo Editor dati BigQuery (roles/bigquery.dataEditor)
all'account di servizio del cluster Connect nel progetto contenente la
tabella BigQuery.
Schemi per un connettore di sink BigQuery
Il connettore di sink BigQuery utilizza il convertitore di valori configurato
(value.converter) per analizzare i valori dei record Kafka in campi. Quindi scrive i campi nelle colonne con lo stesso nome nella tabella BigQuery.
Per funzionare, il connettore richiede uno schema. Lo schema può essere fornito nei seguenti modi:
- Schema basato sui messaggi: lo schema è incluso in ogni messaggio.
- Schema basato su tabella: il connettore deduce lo schema del messaggio dallo schema della tabella BigQuery.
- Registro di schema: il connettore legge lo schema da un registro di schema, ad esempio il registro di schema Managed Service per Apache Kafka (anteprima).
Le sezioni successive descrivono queste opzioni.
Schema basato sui messaggi
In questa modalità, ogni record Kafka include uno schema JSON. Il connettore utilizza lo schema per scrivere i dati del record come riga di una tabella BigQuery.
Per utilizzare gli schemi basati sui messaggi, imposta le seguenti proprietà sul connettore:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true
Valore di esempio del record Kafka:
{
"schema": {
"type": "struct",
"fields": [
{
"field": "user",
"type": "string",
"optional": false
},
{
"field": "age",
"type": "int64",
"optional": false
}
]
},
"payload": {
"user": "userId",
"age": 30
}
}
Se la tabella di destinazione esiste già, lo schema della tabella BigQuery deve essere compatibile con lo schema del messaggio incorporato. Se
autoCreateTables=true, il connettore crea automaticamente la tabella di destinazione
se necessario. Per saperne di più, vedi Creazione di tabelle.
Se vuoi che il connettore aggiorni lo schema della tabella BigQuery man mano che cambiano gli schemi dei messaggi, imposta allowNewBigQueryFields, allowSchemaUnionization o allowBigQueryRequiredFieldRelaxation su true.
Schema basato su tabella
In questa modalità, i record Kafka contengono dati JSON semplici senza uno schema esplicito. Il connettore deduce lo schema dalla tabella di destinazione.
Requisiti:
- La tabella BigQuery deve già esistere.
- I dati dei record Kafka devono essere compatibili con lo schema della tabella.
- Questa modalità non supporta gli aggiornamenti dinamici dello schema in base ai messaggi in arrivo.
Per utilizzare gli schemi basati su tabelle, imposta le seguenti proprietà sul connettore:
value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsebigQueryPartitionDecorator=false
Se la tabella BigQuery utilizza
il partizionamento basato sul tempo con
il partizionamento giornaliero, bigQueryPartitionDecorator può essere true. In caso contrario, imposta
questa proprietà su false.
Valore di esempio del record Kafka:
{
"user": "userId",
"age": 30
}
Registro di schema
In questa modalità, ogni record Kafka contiene dati Apache Avro e lo schema del messaggio è archiviato in un registro degli schemi.
Per utilizzare il connettore di sink BigQuery con un registro di schema, imposta le seguenti proprietà sul connettore:
value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url=SCHEMA_REGISTRY_URL
Sostituisci SCHEMA_REGISTRY_URL con l'URL del registro
dello schema.
Per utilizzare il connettore con il registro di schema Managed Service per Apache Kafka, imposta la seguente proprietà:
value.converter.bearer.auth.credentials.source=GCP
Per saperne di più, consulta Utilizzare Kafka Connect con il registro degli schemi.
Tabelle BigLake per Apache Iceberg in BigQuery
Il connettore BigQuery Sink supporta le tabelle BigLake per Apache Iceberg in BigQuery (di seguito, tabelle BigLake Iceberg in BigQuery) come destinazione sink.
Le tabelle BigLake Iceberg in BigQuery forniscono le basi per la creazione di lakehouse in formato aperto su Google Cloud. Le tabelle BigLake Iceberg in BigQuery offrono la stessa esperienza completamente gestita delle tabelle BigQuery, ma archiviano i dati in bucket di archiviazione di proprietà del cliente utilizzando Parquet per essere interoperabili con i formati di tabella aperti Apache Iceberg.
Per informazioni su come creare una tabella Apache Iceberg, vedi Crea una tabella Apache Iceberg.
Crea un connettore di sink BigQuery
Console
Nella console Google Cloud , vai alla pagina Connetti cluster.
Fai clic sul cluster di connessione in cui vuoi creare il connettore.
Fai clic su Crea connettore.
Per il nome del connettore, inserisci una stringa.
Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.
Per Plug-in connettore, seleziona Sink BigQuery.
Nella sezione Argomenti, specifica gli argomenti Kafka da cui leggere. Puoi specificare un elenco di argomenti o un'espressione regolare da confrontare con i nomi degli argomenti.
Opzione 1: scegli Seleziona un elenco di argomenti Kafka. Nell'elenco Argomenti Kafka, seleziona uno o più argomenti. Fai clic su OK.
Opzione 2: scegli Utilizza un'espressione regolare per gli argomenti. Nel campo Espressione regolare argomento, inserisci un'espressione regolare.
Fai clic su Set di dati e specifica un set di dati BigQuery. Puoi scegliere un set di dati esistente o crearne uno nuovo.
(Facoltativo) Nella casella Configurazioni, aggiungi le proprietà di configurazione o modifica le proprietà predefinite. Per saperne di più, consulta Configura il connettore.
Seleziona la policy di riavvio attività. Per saperne di più, consulta le norme sul riavvio delle attività.
Fai clic su Crea.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Esegui il comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILESostituisci quanto segue:
CONNECTOR_ID: L'ID o il nome del connettore. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.
LOCATION: la località in cui crei il connettore. Deve essere la stessa località in cui hai creato il cluster di connessione.
CONNECT_CLUSTER_ID: l'ID del cluster Connect in cui viene creato il connettore.
CONFIG_FILE: il percorso del file di configurazione YAML per il connettore BigQuery Sink.
Ecco un esempio di file di configurazione per il connettore BigQuery Sink:
name: "BQ_SINK_CONNECTOR_ID" project: "GCP_PROJECT_ID" topics: "GMK_TOPIC_ID" tasks.max: 3 connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" key.converter: "org.apache.kafka.connect.storage.StringConverter" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" defaultDataset: "BQ_DATASET_ID"Sostituisci quanto segue:
BQ_SINK_CONNECTOR_ID: l'ID o il nome del connettore BigQuery Sink. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.
GCP_PROJECT_ID: l'ID del progetto Google Cloud in cui si trova il set di dati BigQuery.
GMK_TOPIC_ID: l'ID dell'argomento del servizio gestito per Apache Kafka da cui i dati vengono trasmessi al connettore di sink BigQuery.
BQ_DATASET_ID: l'ID del set di dati BigQuery che funge da sink per la pipeline.
Terraform
Puoi utilizzare una risorsa Terraform per creare un connettore.
Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.
Go
Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Java
Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Python
Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Dopo aver creato un connettore, puoi modificarlo, eliminarlo, metterlo in pausa, arrestarlo o riavviarlo.
Configura il connettore
Questa sezione descrive alcune proprietà di configurazione che puoi impostare sul connettore. Per un elenco completo delle proprietà specifiche di questo connettore, consulta Configurazioni del connettore di sink BigQuery.
Nome tabella
Per impostazione predefinita, il connettore utilizza il nome dell'argomento come nome della tabella BigQuery. Per utilizzare un nome di tabella diverso, imposta la proprietà topic2TableMap
con il seguente formato:
topic2TableMap=TOPIC_1:TABLE_1,TOPIC_2:TABLE_2,...
Creazione della tabella
Il connettore di sink BigQuery può creare le tabelle di destinazione se non esistono.
Se
autoCreateTables=true, il connettore tenta di creare le tabelle BigQuery che non esistono. Questa è l'impostazione predefinita.Se
autoCreateTables=false, il connettore non crea tabelle. Se una tabella di destinazione non esiste, si verifica un errore.
Quando autoCreateTables è true, puoi utilizzare le seguenti proprietà di configurazione
per un controllo più granulare su come il connettore crea e
configura le nuove tabelle:
allBQFieldsNullableclusteringPartitionFieldNamesconvertDoubleSpecialValuespartitionExpirationMssanitizeFieldNamessanitizeTopicstimestampPartitionFieldName
Per informazioni su queste proprietà, vedi Configurazioni del connettore BigQuery Sink.
Metadati Kafka
Puoi mappare ulteriori dati da Kafka, come informazioni sui metadati e
informazioni sulle chiavi, nella tabella BigQuery configurando i campi
kafkaDataFieldName e kafkaKeyFieldName rispettivamente. Esempi di
informazioni sui metadati includono l'argomento Kafka, la partizione, l'offset e l'ora
di inserimento.