Crea un connettore Cloud Storage Sink

I connettori di sink Cloud Storage ti consentono di trasmettere i dati in streaming dagli argomenti Kafka ai bucket Cloud Storage. Ciò è utile per archiviare ed elaborare grandi volumi di dati in modo conveniente e scalabile.

Prima di iniziare

Prima di creare un connettore di sink Cloud Storage, assicurati di disporre di quanto segue:

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per creare un connettore sink Cloud Storage, chiedi all'amministratore di concederti il ruolo IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) nel tuo progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un connettore sink Cloud Storage. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un connettore di sink Cloud Storage sono necessarie le seguenti autorizzazioni:

  • Concedi l'autorizzazione per creare un connettore nel cluster di connessione principale: managedkafka.connectors.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per saperne di più sul ruolo Editor connettore Kafka gestito, vedi Ruoli predefiniti di Managed Service per Apache Kafka.

Se il cluster Managed Service per Apache Kafka si trova nello stesso progetto del cluster di connessione, non sono necessarie ulteriori autorizzazioni. Se il cluster Connect si trova in un progetto diverso, consulta Crea un cluster Connect in un progetto diverso.

Concedere le autorizzazioni per scrivere nel bucket Cloud Storage

Il account di servizio del cluster Connect, che segue il formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, richiede le seguenti autorizzazioni Cloud Storage:

  • storage.objects.create
  • storage.objects.delete

A questo scopo, concedi il ruolo Storage Object User (roles/storage.objectUser) al account di servizio del cluster Connect nel progetto contenente il bucket Cloud Storage.

Come funziona un connettore di sink Cloud Storage

Un connettore di sink Cloud Storage estrae i dati da uno o più argomenti Kafka e li scrive in oggetti all'interno di un singolo bucket Cloud Storage.

Ecco una suddivisione dettagliata di come il connettore sink Cloud Storage copia i dati:

  • Il connettore utilizza i messaggi di uno o più argomenti Kafka all'interno del cluster di origine.

  • Il connettore scrive i dati nel bucket Cloud Storage di destinazione specificato nella configurazione del connettore.

  • Il connettore formatta i dati mentre li scrive nel bucket Cloud Storage facendo riferimento a proprietà specifiche nella configurazione del connettore. Per impostazione predefinita, i file di output sono in formato CSV. Puoi configurare la proprietà format.output.type per specificare formati di output diversi, ad esempio JSON.

  • Il connettore assegna anche un nome ai file scritti nel bucket Cloud Storage. Puoi personalizzare i nomi dei file utilizzando le proprietà file.name.prefix e file.name.template. Ad esempio, puoi includere il nome dell'argomento Kafka o le chiavi dei messaggi nel nome file.

  • Un record Kafka è composto da tre componenti: intestazioni, chiavi e valori.

    • Puoi includere le intestazioni nel file di output impostando format.output.fields in modo che le includa. Ad esempio, format.output.fields=value,headers.

    • Puoi includere le chiavi nel file di output impostando format.output.fields in modo da includere key. Ad esempio format.output.fields=key,value,headers.

      Le chiavi possono essere utilizzate anche per raggruppare i record includendo key nella proprietà file.name.template.

  • Per impostazione predefinita, puoi includere i valori nel file di output, in quanto format.output.fields è impostato su value.

  • Il connettore scrive i dati convertiti e formattati nel bucket Cloud Storage specificato.

  • Il connettore comprime i file archiviati nel bucket Cloud Storage se configuri la compressione dei file utilizzando la proprietà file.compression.type.

  • Le configurazioni del convertitore sono limitate dalla proprietà format.output.type.

    • Ad esempio, quando format.output.type è impostato su csv, il convertitore di chiavi deve essere org.apache.kafka.connect.converters.ByteArrayConverter o org.apache.kafka.connect.storage.StringConverter e il convertitore di valori deve essere org.apache.kafka.connect.converters.ByteArrayConverter.

    • Quando format.output.type è impostato su json, lo schema di valori e chiavi non viene scritto insieme ai dati nel file di output, anche se la proprietà value.converter.schemas.enable è true.

  • La proprietà tasks.max controlla il livello di parallelismo per il connettore. L'aumento di tasks.max può migliorare la velocità effettiva, ma il parallelismo effettivo è limitato dal numero di partizioni negli argomenti Kafka.

Proprietà di un connettore di sink Cloud Storage

Quando crei un connettore di sink Cloud Storage, specifica le seguenti proprietà.

Nome connettore

Il nome o l'ID del connettore. Per le linee guida su come assegnare un nome alla risorsa, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome è immutabile.

Tipo di plug-in del connettore

Seleziona Sink Cloud Storage come tipo di plug-in del connettore nella consoleGoogle Cloud . Se non utilizzi l'interfaccia utente per configurare il connettore, devi specificare anche la classe del connettore.

Argomenti

Gli argomenti Kafka da cui il connettore utilizza i messaggi. Puoi specificare uno o più argomenti oppure utilizzare un'espressione regolare per corrispondere a più argomenti. Ad esempio, topic.* per trovare tutti gli argomenti che iniziano con "topic". Questi argomenti devono esistere all'interno del cluster Managed Service per Apache Kafka associato al tuo cluster di connessione.

Bucket Cloud Storage

Scegli o crea il bucket Cloud Storage in cui sono archiviati i dati.

Configurazione

Questa sezione consente di specificare proprietà di configurazione aggiuntive e specifiche del connettore per il connettore sink Cloud Storage.

Poiché i dati negli argomenti Kafka possono essere in vari formati come Avro, JSON o byte non elaborati, una parte fondamentale della configurazione consiste nello specificare i convertitori. I convertitori traducono i dati dal formato utilizzato negli argomenti Kafka nel formato interno standardizzato di Kafka Connect. Il connettore di sink Cloud Storage prende questi dati interni e li trasforma nel formato richiesto dal bucket Cloud Storage prima di scriverli.

Per informazioni più generali sul ruolo dei convertitori in Kafka Connect, sui tipi di convertitori supportati e sulle opzioni di configurazione comuni, consulta la sezione Convertitori.

Di seguito sono riportate alcune configurazioni specifiche per il connettore di sink Cloud Storage:

  • gcs.credentials.default: indica se rilevare o meno automaticamente le credenziali Google Cloud dall'ambiente di esecuzione. Deve essere impostato su true.

  • gcs.bucket.name: specifica il nome del bucket Cloud Storage in cui vengono scritti i dati. Deve essere impostato.

  • file.compression.type: imposta il tipo di compressione per i file archiviati nel bucket Cloud Storage. Esempi sono gzip, snappy, zstd e none. Il valore predefinito è none.

  • file.name.prefix: il prefisso da aggiungere al nome di ogni file memorizzato nel bucket Cloud Storage. Il valore predefinito è vuoto.

  • format.output.type: il tipo di formato dei dati utilizzato per scrivere i dati nei file di output di Cloud Storage. I valori supportati sono: csv, json, jsonl e parquet. Il valore predefinito è csv.

Per un elenco delle proprietà di configurazione disponibili specifiche per questo connettore, consulta la sezione Configurazioni del connettore sink Cloud Storage.

Crea un connettore di sink Cloud Storage

Prima di creare un connettore, consulta la documentazione relativa alle proprietà di un connettore di sink Cloud Storage.

Console

  1. Nella console Google Cloud , vai alla pagina Connetti cluster.

    Vai a Connetti cluster

  2. Fai clic sul cluster di connessione per il quale vuoi creare il connettore.

    Viene visualizzata la pagina Dettagli cluster di connessione.

  3. Fai clic su Crea connettore.

    Viene visualizzata la pagina Crea connettore Kafka.

  4. Per il nome del connettore, inserisci una stringa.

    Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.

  5. Per Plug-in connettore, seleziona Sink Cloud Storage.

  6. Specifica gli argomenti da cui puoi trasmettere in streaming i dati.

  7. Scegli il bucket di archiviazione in cui archiviare i dati.

  8. (Facoltativo) Configura impostazioni aggiuntive nella sezione Configurazione.

  9. Seleziona la policy di riavvio attività. Per saperne di più, consulta le norme sul riavvio delle attività.

  10. Fai clic su Crea.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Sostituisci quanto segue:

    • CONNECTOR_ID: L'ID o il nome del connettore. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.

    • LOCATION: la località in cui crei il connettore. Deve essere la stessa località in cui hai creato il cluster di connessione.

    • CONNECT_CLUSTER_ID: l'ID del cluster Connect in cui viene creato il connettore.

    • CONFIG_FILE: il percorso del file di configurazione YAML per il connettore BigQuery Sink.

    Ecco un esempio di file di configurazione per il connettore di sink Cloud Storage:

    connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    gcs.bucket.name: "GCS_BUCKET_NAME"
    gcs.credentials.default: "true"
    format.output.type: "json"
    name: "GCS_SINK_CONNECTOR_ID"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    

    Sostituisci quanto segue:

    • GMK_TOPIC_ID: l'ID dell'argomento Managed Service per Apache Kafka da cui i dati vengono trasmessi al connettore di sink Cloud Storage.

    • GCS_BUCKET_NAME: il nome del bucket Cloud Storage che funge da sink per la pipeline.

    • GCS_SINK_CONNECTOR_ID: l'ID o il nome del connettore sink Cloud Storage. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.

  3. Terraform

    Puoi utilizzare una risorsa Terraform per creare un connettore.

    resource "google_managed_kafka_connector" "example-cloud-storage-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-gcs-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"                = "io.aiven.kafka.connect.gcs.GcsSinkConnector"
        "tasks.max"                      = "3"
        "topics"                         = "GMK_TOPIC_ID"
        "gcs.bucket.name"                = "GCS_BUCKET_NAME"
        "gcs.credentials.default"        = "true"
        "format.output.type"             = "json"
        "name"                           = "my-gcs-sink-connector"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
      }
      provider = google-beta
    }

    Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.

    Go

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createCloudStorageSinkConnector creates a Cloud Storage Sink connector.
    func createCloudStorageSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, gcsBucketName, tasksMax, formatOutputType, valueConverter, valueConverterSchemasEnable, keyConverter, gcsCredentialsDefault string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "GCS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// gcsBucketName := "GCS_BUCKET_NAME"
    	// tasksMax := "3"
    	// formatOutputType := "json"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// gcsCredentialsDefault := "true"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":                "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    		"tasks.max":                      tasksMax,
    		"topics":                         topics,
    		"gcs.bucket.name":                gcsBucketName,
    		"gcs.credentials.default":        gcsCredentialsDefault,
    		"format.output.type":             formatOutputType,
    		"name":                           connectorID,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"key.converter":                  keyConverter,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Cloud Storage sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateCloudStorageSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-gcs-sink-connector";
        String bucketName = "my-gcs-bucket";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
        String maxTasks = "3";
        String gcsCredentialsDefault = "true";
        String formatOutputType = "json";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createCloudStorageSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bucketName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            gcsCredentialsDefault,
            formatOutputType,
            valueConverter,
            valueSchemasEnable,
            keyConverter);
      }
    
      public static void createCloudStorageSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bucketName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String gcsCredentialsDefault,
          String formatOutputType,
          String valueConverter,
          String valueSchemasEnable,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("gcs.bucket.name", bucketName);
        configMap.put("gcs.credentials.default", gcsCredentialsDefault);
        configMap.put("format.output.type", formatOutputType);
        configMap.put("name", connectorId);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("key.converter", keyConverter);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": tasks_max,
        "topics": topics,
        "gcs.bucket.name": gcs_bucket_name,
        "gcs.credentials.default": "true",
        "format.output.type": format_output_type,
        "name": connector_id,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "key.converter": key_converter,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Dopo aver creato un connettore, puoi modificarlo, eliminarlo, metterlo in pausa, arrestarlo o riavviarlo.

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.