I connettori di sink Cloud Storage ti consentono di trasmettere i dati in streaming dagli argomenti Kafka ai bucket Cloud Storage. Ciò è utile per archiviare ed elaborare grandi volumi di dati in modo conveniente e scalabile.
Prima di iniziare
Prima di creare un connettore di sink Cloud Storage, assicurati di disporre di quanto segue:
Crea un cluster Managed Service per Apache Kafka per il tuo cluster di connessione. Questo è il cluster Kafka principale associato al cluster di connessione. Questo è anche il cluster di origine che forma un'estremità della pipeline del connettore.
Crea un cluster Connect per ospitare il connettore di sink Cloud Storage.
Crea un bucket Cloud Storage per archiviare i dati trasmessi in streaming da Kafka.
Crea e configura un argomento Kafka all'interno del cluster di origine. I dati vengono spostati da questo argomento Kafka al bucket Cloud Storage di destinazione.
Ruoli e autorizzazioni richiesti
Per ottenere le autorizzazioni
necessarie per creare un connettore sink Cloud Storage,
chiedi all'amministratore di concederti il
ruolo IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor)
nel tuo progetto.
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un connettore sink Cloud Storage. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:
Autorizzazioni obbligatorie
Per creare un connettore di sink Cloud Storage sono necessarie le seguenti autorizzazioni:
-
Concedi l'autorizzazione per creare un connettore nel cluster di connessione principale:
managedkafka.connectors.create
Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.
Per saperne di più sul ruolo Editor connettore Kafka gestito, vedi Ruoli predefiniti di Managed Service per Apache Kafka.
Se il cluster Managed Service per Apache Kafka si trova nello stesso progetto del cluster di connessione, non sono necessarie ulteriori autorizzazioni. Se il cluster Connect si trova in un progetto diverso, consulta Crea un cluster Connect in un progetto diverso.
Concedere le autorizzazioni per scrivere nel bucket Cloud Storage
Il account di servizio del cluster Connect, che segue il formato
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com,
richiede le seguenti autorizzazioni Cloud Storage:
storage.objects.createstorage.objects.delete
A questo scopo, concedi il ruolo Storage Object User (roles/storage.objectUser) al account di servizio del cluster Connect nel progetto contenente il bucket Cloud Storage.
Come funziona un connettore di sink Cloud Storage
Un connettore di sink Cloud Storage estrae i dati da uno o più argomenti Kafka e li scrive in oggetti all'interno di un singolo bucket Cloud Storage.
Ecco una suddivisione dettagliata di come il connettore sink Cloud Storage copia i dati:
Il connettore utilizza i messaggi di uno o più argomenti Kafka all'interno del cluster di origine.
Il connettore scrive i dati nel bucket Cloud Storage di destinazione specificato nella configurazione del connettore.
Il connettore formatta i dati mentre li scrive nel bucket Cloud Storage facendo riferimento a proprietà specifiche nella configurazione del connettore. Per impostazione predefinita, i file di output sono in formato CSV. Puoi configurare la proprietà
format.output.typeper specificare formati di output diversi, ad esempio JSON.Il connettore assegna anche un nome ai file scritti nel bucket Cloud Storage. Puoi personalizzare i nomi dei file utilizzando le proprietà
file.name.prefixefile.name.template. Ad esempio, puoi includere il nome dell'argomento Kafka o le chiavi dei messaggi nel nome file.Un record Kafka è composto da tre componenti: intestazioni, chiavi e valori.
Puoi includere le intestazioni nel file di output impostando
format.output.fieldsin modo che le includa. Ad esempio,format.output.fields=value,headers.Puoi includere le chiavi nel file di output impostando
format.output.fieldsin modo da includerekey. Ad esempioformat.output.fields=key,value,headers.Le chiavi possono essere utilizzate anche per raggruppare i record includendo
keynella proprietàfile.name.template.
Per impostazione predefinita, puoi includere i valori nel file di output, in quanto
format.output.fieldsè impostato suvalue.Il connettore scrive i dati convertiti e formattati nel bucket Cloud Storage specificato.
Il connettore comprime i file archiviati nel bucket Cloud Storage se configuri la compressione dei file utilizzando la proprietà
file.compression.type.Le configurazioni del convertitore sono limitate dalla proprietà
format.output.type.Ad esempio, quando
format.output.typeè impostato sucsv, il convertitore di chiavi deve essereorg.apache.kafka.connect.converters.ByteArrayConverteroorg.apache.kafka.connect.storage.StringConvertere il convertitore di valori deve essereorg.apache.kafka.connect.converters.ByteArrayConverter.Quando
format.output.typeè impostato sujson, lo schema di valori e chiavi non viene scritto insieme ai dati nel file di output, anche se la proprietàvalue.converter.schemas.enableè true.
La proprietà
tasks.maxcontrolla il livello di parallelismo per il connettore. L'aumento ditasks.maxpuò migliorare la velocità effettiva, ma il parallelismo effettivo è limitato dal numero di partizioni negli argomenti Kafka.
Proprietà di un connettore di sink Cloud Storage
Quando crei un connettore di sink Cloud Storage, specifica le seguenti proprietà.
Nome connettore
Il nome o l'ID del connettore. Per le linee guida su come assegnare un nome alla risorsa, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome è immutabile.
Tipo di plug-in del connettore
Seleziona Sink Cloud Storage come tipo di plug-in del connettore nella consoleGoogle Cloud . Se non utilizzi l'interfaccia utente per configurare il connettore, devi specificare anche la classe del connettore.
Argomenti
Gli argomenti Kafka da cui il connettore utilizza i messaggi.
Puoi specificare uno o più argomenti oppure utilizzare un'espressione regolare per
corrispondere a più argomenti. Ad esempio, topic.* per trovare tutti gli argomenti che iniziano
con "topic". Questi argomenti devono esistere all'interno del
cluster Managed Service per Apache Kafka associato al tuo cluster di connessione.
Bucket Cloud Storage
Scegli o crea il bucket Cloud Storage in cui sono archiviati i dati.
Configurazione
Questa sezione consente di specificare proprietà di configurazione aggiuntive e specifiche del connettore per il connettore sink Cloud Storage.
Poiché i dati negli argomenti Kafka possono essere in vari formati come Avro, JSON o byte non elaborati, una parte fondamentale della configurazione consiste nello specificare i convertitori. I convertitori traducono i dati dal formato utilizzato negli argomenti Kafka nel formato interno standardizzato di Kafka Connect. Il connettore di sink Cloud Storage prende questi dati interni e li trasforma nel formato richiesto dal bucket Cloud Storage prima di scriverli.
Per informazioni più generali sul ruolo dei convertitori in Kafka Connect, sui tipi di convertitori supportati e sulle opzioni di configurazione comuni, consulta la sezione Convertitori.
Di seguito sono riportate alcune configurazioni specifiche per il connettore di sink Cloud Storage:
gcs.credentials.default: indica se rilevare o meno automaticamente le credenziali Google Cloud dall'ambiente di esecuzione. Deve essere impostato sutrue.gcs.bucket.name: specifica il nome del bucket Cloud Storage in cui vengono scritti i dati. Deve essere impostato.file.compression.type: imposta il tipo di compressione per i file archiviati nel bucket Cloud Storage. Esempi sonogzip,snappy,zstdenone. Il valore predefinito ènone.file.name.prefix: il prefisso da aggiungere al nome di ogni file memorizzato nel bucket Cloud Storage. Il valore predefinito è vuoto.format.output.type: il tipo di formato dei dati utilizzato per scrivere i dati nei file di output di Cloud Storage. I valori supportati sono:csv,json,jsonleparquet. Il valore predefinito ècsv.
Per un elenco delle proprietà di configurazione disponibili specifiche per questo connettore, consulta la sezione Configurazioni del connettore sink Cloud Storage.
Crea un connettore di sink Cloud Storage
Prima di creare un connettore, consulta la documentazione relativa alle proprietà di un connettore di sink Cloud Storage.
Console
Nella console Google Cloud , vai alla pagina Connetti cluster.
Fai clic sul cluster di connessione per il quale vuoi creare il connettore.
Viene visualizzata la pagina Dettagli cluster di connessione.
Fai clic su Crea connettore.
Viene visualizzata la pagina Crea connettore Kafka.
Per il nome del connettore, inserisci una stringa.
Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.
Per Plug-in connettore, seleziona Sink Cloud Storage.
Specifica gli argomenti da cui puoi trasmettere in streaming i dati.
Scegli il bucket di archiviazione in cui archiviare i dati.
(Facoltativo) Configura impostazioni aggiuntive nella sezione Configurazione.
Seleziona la policy di riavvio attività. Per saperne di più, consulta le norme sul riavvio delle attività.
Fai clic su Crea.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Esegui il comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILESostituisci quanto segue:
CONNECTOR_ID: L'ID o il nome del connettore. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.
LOCATION: la località in cui crei il connettore. Deve essere la stessa località in cui hai creato il cluster di connessione.
CONNECT_CLUSTER_ID: l'ID del cluster Connect in cui viene creato il connettore.
CONFIG_FILE: il percorso del file di configurazione YAML per il connettore BigQuery Sink.
Ecco un esempio di file di configurazione per il connettore di sink Cloud Storage:
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"Sostituisci quanto segue:
GMK_TOPIC_ID: l'ID dell'argomento Managed Service per Apache Kafka da cui i dati vengono trasmessi al connettore di sink Cloud Storage.
GCS_BUCKET_NAME: il nome del bucket Cloud Storage che funge da sink per la pipeline.
GCS_SINK_CONNECTOR_ID: l'ID o il nome del connettore sink Cloud Storage. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.
Terraform
Puoi utilizzare una risorsa Terraform per creare un connettore.
Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.
Go
Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Java
Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Python
Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.
Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.
Dopo aver creato un connettore, puoi modificarlo, eliminarlo, metterlo in pausa, arrestarlo o riavviarlo.
Passaggi successivi
Risolvere i problemi relativi a un connettore di sink Cloud Storage
Mettere in pausa, riprendere, interrompere e riavviare un connettore