Créer un connecteur Cloud Storage Sink

Les connecteurs de récepteur Cloud Storage vous permettent de diffuser des données de vos sujets Kafka vers des buckets Cloud Storage. Cela est utile pour stocker et traiter de grands volumes de données de manière économique et évolutive.

Avant de commencer

Avant de créer un connecteur de récepteur Cloud Storage, assurez-vous de disposer des éléments suivants :

Rôles et autorisations nécessaires

Pour obtenir les autorisations nécessaires pour créer un connecteur Cloud Storage Sink, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de connecteur Kafka géré (roles/managedkafka.connectorEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer un connecteur Cloud Storage Sink. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un connecteur Cloud Storage Sink :

  • Accordez l'autorisation de créer un connecteur sur le cluster Connect parent : managedkafka.connectors.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur le rôle Éditeur de connecteurs Kafka gérés, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Si votre cluster Managed Service pour Apache Kafka se trouve dans le même projet que le cluster Connect, aucune autre autorisation n'est requise. Si le cluster Connect se trouve dans un autre projet, consultez Créer un cluster Connect dans un autre projet.

Accorder des autorisations d'écriture dans le bucket Cloud Storage

Le compte de service Connect Cluster, qui suit le format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, nécessite les autorisations Cloud Storage suivantes :

  • storage.objects.create
  • storage.objects.delete

Pour ce faire, attribuez le rôle Utilisateur d'objets Storage (roles/storage.objectUser) au compte de service du cluster Connect dans le projet contenant le bucket Cloud Storage.

Fonctionnement d'un connecteur de récepteur Cloud Storage

Un connecteur de récepteur Cloud Storage extrait les données d'un ou de plusieurs sujets Kafka et les écrit dans des objets d'un même bucket Cloud Storage.

Voici une explication détaillée de la façon dont le connecteur Cloud Storage Sink copie les données :

  • Le connecteur consomme les messages d'un ou de plusieurs sujets Kafka dans le cluster source.

  • Le connecteur écrit les données dans le bucket Cloud Storage cible que vous avez spécifié dans la configuration du connecteur.

  • Le connecteur met en forme les données lorsqu'il les écrit dans le bucket Cloud Storage en se référant à des propriétés spécifiques de la configuration du connecteur. Par défaut, les fichiers de sortie sont au format CSV. Vous pouvez configurer la propriété format.output.type pour spécifier différents formats de sortie, tels que JSON.

  • Le connecteur nomme également les fichiers écrits dans le bucket Cloud Storage. Vous pouvez personnaliser les noms de fichiers à l'aide des propriétés file.name.prefix et file.name.template. Par exemple, vous pouvez inclure le nom du sujet Kafka ou les clés de message dans le nom de fichier.

  • Un enregistrement Kafka comporte trois composants : des en-têtes, des clés et des valeurs.

    • Vous pouvez inclure des en-têtes dans le fichier de sortie en définissant format.output.fields pour inclure les en-têtes. Exemple :format.output.fields=value,headers

    • Vous pouvez inclure des clés dans le fichier de sortie en définissant format.output.fields sur key. Exemple : format.output.fields=key,value,headers.

      Les clés peuvent également être utilisées pour regrouper les enregistrements en incluant key dans la propriété file.name.template.

  • Vous pouvez inclure des valeurs dans le fichier de sortie par défaut, car format.output.fields est défini sur value par défaut.

  • Le connecteur écrit les données converties et mises en forme dans le bucket Cloud Storage spécifié.

  • Le connecteur compresse les fichiers stockés dans le bucket Cloud Storage si vous configurez la compression des fichiers à l'aide de la propriété file.compression.type.

  • Les configurations du convertisseur sont limitées par la propriété format.output.type.

    • Par exemple, lorsque format.output.type est défini sur csv, le convertisseur de clé doit être org.apache.kafka.connect.converters.ByteArrayConverter ou org.apache.kafka.connect.storage.StringConverter, et le convertisseur de valeur doit être org.apache.kafka.connect.converters.ByteArrayConverter.

    • Lorsque format.output.type est défini sur json, le schéma de valeur et de clé n'est pas écrit avec les données dans le fichier de sortie, même si la propriété value.converter.schemas.enable est définie sur "true".

  • La propriété tasks.max contrôle le niveau de parallélisme du connecteur. L'augmentation de tasks.max peut améliorer le débit, mais le parallélisme réel est limité par le nombre de partitions dans les sujets Kafka.

Propriétés d'un connecteur de récepteur Cloud Storage

Lorsque vous créez un connecteur de récepteur Cloud Storage, spécifiez les propriétés suivantes.

Nom du connecteur

Nom ou ID du connecteur. Pour obtenir des instructions sur la façon de nommer la ressource, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom est immuable.

Type de plug-in de connecteur

Sélectionnez Cloud Storage Sink comme type de plug-in de connecteur dans la consoleGoogle Cloud . Si vous n'utilisez pas l'interface utilisateur pour configurer le connecteur, vous devez également spécifier la classe de connecteur.

Thèmes

Sujets Kafka à partir desquels le connecteur consomme des messages. Vous pouvez spécifier un ou plusieurs thèmes, ou utiliser une expression régulière pour faire correspondre plusieurs thèmes. Par exemple, topic.* pour correspondre à tous les sujets commençant par "topic". Ces sujets doivent exister dans le cluster Managed Service pour Apache Kafka associé à votre cluster Connect.

Bucket Cloud Storage

Sélectionnez ou créez le bucket Cloud Storage dans lequel les données sont stockées.

Configuration

Cette section vous permet de spécifier des propriétés de configuration supplémentaires et spécifiques au connecteur pour le connecteur Cloud Storage Sink.

Étant donné que les données des thèmes Kafka peuvent être dans différents formats (Avro, JSON ou octets bruts, par exemple), une partie essentielle de la configuration consiste à spécifier des convertisseurs. Les convertisseurs traduisent les données du format utilisé dans vos sujets Kafka au format interne standardisé de Kafka Connect. Le connecteur Cloud Storage Sink prend ensuite ces données internes et les transforme au format requis par votre bucket Cloud Storage avant de les écrire.

Pour obtenir des informations plus générales sur le rôle des convertisseurs dans Kafka Connect, les types de convertisseurs compatibles et les options de configuration courantes, consultez Convertisseurs.

Voici quelques configurations spécifiques au connecteur Cloud Storage Sink :

  • gcs.credentials.default : indique si les identifiants Google Cloud doivent être découverts automatiquement à partir de l'environnement d'exécution. Doit être défini sur true.

  • gcs.bucket.name : spécifie le nom du bucket Cloud Storage dans lequel les données sont écrites. Doit être définie.

  • file.compression.type : définit le type de compression pour les fichiers stockés dans le bucket Cloud Storage. Exemples : gzip, snappy, zstd et none. La valeur par défaut est none.

  • file.name.prefix : préfixe à ajouter au nom de chaque fichier stocké dans le bucket Cloud Storage. La valeur par défaut est vide.

  • format.output.type : type de format de données utilisé pour écrire les données dans les fichiers de sortie Cloud Storage. Les valeurs acceptées sont csv, json, jsonl et parquet. La valeur par défaut est csv.

Pour obtenir la liste des propriétés de configuration disponibles spécifiques à ce connecteur, consultez Configurations du connecteur Cloud Storage Sink.

Créer un connecteur de récepteur Cloud Storage

Avant de créer un connecteur, consultez la documentation sur les propriétés d'un connecteur de récepteur Cloud Storage.

Console

  1. Dans la console Google Cloud , accédez à la page Connecter des clusters.

    Accéder à Connect Clusters

  2. Cliquez sur le cluster Connect pour lequel vous souhaitez créer le connecteur.

    La page Connecter les détails du cluster s'affiche.

  3. Cliquez sur Créer un connecteur.

    La page Créer un connecteur Kafka s'affiche.

  4. Saisissez une chaîne pour le nom du connecteur.

    Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.

  5. Pour Plug-in de connecteur, sélectionnez Récepteur Cloud Storage.

  6. Spécifiez les sujets à partir desquels vous pouvez diffuser des données.

  7. Choisissez le bucket Storage dans lequel stocker les données.

  8. (Facultatif) Configurez d'autres paramètres dans la section Configuration.

  9. Sélectionnez la règle de redémarrage des tâches. Pour en savoir plus, consultez la section Règles de redémarrage des tâches.

  10. Cliquez sur Créer.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka connectors create :

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Remplacez les éléments suivants :

    • CONNECTOR_ID : ID ou nom du connecteur. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

    • LOCATION : emplacement où vous créez le connecteur. Il doit s'agir du même emplacement que celui où vous avez créé le cluster Connect.

    • CONNECT_CLUSTER_ID : ID du cluster Connect où le connecteur est créé.

    • CONFIG_FILE : chemin d'accès au fichier de configuration YAML pour le connecteur BigQuery Sink.

    Voici un exemple de fichier de configuration pour le connecteur Cloud Storage Sink :

    connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    gcs.bucket.name: "GCS_BUCKET_NAME"
    gcs.credentials.default: "true"
    format.output.type: "json"
    name: "GCS_SINK_CONNECTOR_ID"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    

    Remplacez les éléments suivants :

    • GMK_TOPIC_ID : ID du sujet Managed Service pour Apache Kafka à partir duquel les données sont transférées vers le connecteur de récepteur Cloud Storage.

    • GCS_BUCKET_NAME : nom du bucket Cloud Storage qui sert de récepteur pour le pipeline.

    • GCS_SINK_CONNECTOR_ID : ID ou nom du connecteur Cloud Storage Sink. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

  3. Terraform

    Vous pouvez utiliser une ressource Terraform pour créer un connecteur.

    resource "google_managed_kafka_connector" "example-cloud-storage-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-gcs-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"                = "io.aiven.kafka.connect.gcs.GcsSinkConnector"
        "tasks.max"                      = "3"
        "topics"                         = "GMK_TOPIC_ID"
        "gcs.bucket.name"                = "GCS_BUCKET_NAME"
        "gcs.credentials.default"        = "true"
        "format.output.type"             = "json"
        "name"                           = "my-gcs-sink-connector"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
      }
      provider = google-beta
    }

    Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

    Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createCloudStorageSinkConnector creates a Cloud Storage Sink connector.
    func createCloudStorageSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, gcsBucketName, tasksMax, formatOutputType, valueConverter, valueConverterSchemasEnable, keyConverter, gcsCredentialsDefault string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "GCS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// gcsBucketName := "GCS_BUCKET_NAME"
    	// tasksMax := "3"
    	// formatOutputType := "json"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// gcsCredentialsDefault := "true"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":                "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    		"tasks.max":                      tasksMax,
    		"topics":                         topics,
    		"gcs.bucket.name":                gcsBucketName,
    		"gcs.credentials.default":        gcsCredentialsDefault,
    		"format.output.type":             formatOutputType,
    		"name":                           connectorID,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"key.converter":                  keyConverter,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Cloud Storage sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateCloudStorageSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-gcs-sink-connector";
        String bucketName = "my-gcs-bucket";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
        String maxTasks = "3";
        String gcsCredentialsDefault = "true";
        String formatOutputType = "json";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createCloudStorageSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bucketName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            gcsCredentialsDefault,
            formatOutputType,
            valueConverter,
            valueSchemasEnable,
            keyConverter);
      }
    
      public static void createCloudStorageSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bucketName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String gcsCredentialsDefault,
          String formatOutputType,
          String valueConverter,
          String valueSchemasEnable,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("gcs.bucket.name", bucketName);
        configMap.put("gcs.credentials.default", gcsCredentialsDefault);
        configMap.put("format.output.type", formatOutputType);
        configMap.put("name", connectorId);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("key.converter", keyConverter);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": tasks_max,
        "topics": topics,
        "gcs.bucket.name": gcs_bucket_name,
        "gcs.credentials.default": "true",
        "format.output.type": format_output_type,
        "name": connector_id,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "key.converter": key_converter,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Une fois que vous avez créé un connecteur, vous pouvez le modifier, le supprimer, le suspendre, l'arrêter ou le redémarrer.

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.