Crea un conector de receptor de Cloud Storage

Los conectores receptores de Cloud Storage te permiten transmitir datos de tus temas de Kafka a buckets de Cloud Storage. Esto es útil para almacenar y procesar grandes volúmenes de datos de manera rentable y escalable.

Antes de comenzar

Antes de crear un conector de receptor de Cloud Storage, asegúrate de tener lo siguiente:

Roles y permisos requeridos

Para obtener los permisos que necesitas para crear un conector de Cloud Storage Sink, pídele a tu administrador que te otorgue el rol de IAM de Editor de conectores de Kafka administrados (roles/managedkafka.connectorEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear un conector de Cloud Storage Sink. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear un conector de Cloud Storage Sink:

  • Otorga el permiso para crear un conector en el clúster de Connect principal: managedkafka.connectors.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre el rol de Editor de Kafka Connector administrado, consulta Roles predefinidos de Managed Service for Apache Kafka.

Si tu clúster de Servicio administrado para Apache Kafka se encuentra en el mismo proyecto que el clúster de Connect, no se requieren más permisos. Si el clúster de Connect se encuentra en un proyecto diferente, consulta Crea un clúster de Connect en un proyecto diferente.

Otorga permisos para escribir en el bucket de Cloud Storage

La cuenta de servicio del clúster de Connect, que sigue el formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requiere los siguientes permisos de Cloud Storage:

  • storage.objects.create
  • storage.objects.delete

Para ello, otorga el rol de Usuario de objetos de almacenamiento (roles/storage.objectUser) a la cuenta de servicio del clúster de Connect en el proyecto que contiene el bucket de Cloud Storage.

Cómo funciona un conector de receptor de Cloud Storage

Un conector receptor de Cloud Storage extrae datos de uno o más temas de Kafka y escribe esos datos en objetos dentro de un solo bucket de Cloud Storage.

A continuación, se incluye un desglose detallado de cómo el conector Cloud Storage Sink copia los datos:

  • El conector consume mensajes de uno o más temas de Kafka dentro del clúster de origen.

  • El conector escribe los datos en el bucket de Cloud Storage de destino que especificaste en la configuración del conector.

  • El conector formatea los datos a medida que los escribe en el bucket de Cloud Storage haciendo referencia a propiedades específicas en la configuración del conector. De forma predeterminada, los archivos de salida están en formato CSV. Puedes configurar la propiedad format.output.type para especificar diferentes formatos de salida, como JSON.

  • El conector también nombra los archivos que se escriben en el bucket de Cloud Storage. Puedes personalizar los nombres de los archivos con las propiedades file.name.prefix y file.name.template. Por ejemplo, puedes incluir el nombre del tema de Kafka o las claves de los mensajes en el nombre del archivo.

  • Un registro de Kafka tiene tres componentes: encabezados, claves y valores.

    • Puedes incluir encabezados en el archivo de salida configurando format.output.fields para que los incluya. Por ejemplo, format.output.fields=value,headers

    • Puedes incluir claves en el archivo de salida configurando format.output.fields para que incluya key. Por ejemplo, format.output.fields=key,value,headers.

      Las claves también se pueden usar para agrupar registros si se incluye key en la propiedad file.name.template.

  • Puedes incluir valores en el archivo de salida de forma predeterminada, ya que format.output.fields se establece en value de forma predeterminada.

  • El conector escribe los datos convertidos y formateados en el bucket de Cloud Storage especificado.

  • El conector comprime los archivos almacenados en el bucket de Cloud Storage si configuras la compresión de archivos con la propiedad file.compression.type.

  • Las configuraciones del convertidor están restringidas por la propiedad format.output.type.

    • Por ejemplo, cuando format.output.type se establece en csv, el convertidor de claves debe ser org.apache.kafka.connect.converters.ByteArrayConverter o org.apache.kafka.connect.storage.StringConverter, y el convertidor de valores debe ser org.apache.kafka.connect.converters.ByteArrayConverter.

    • Cuando format.output.type se establece en json, el esquema de clave y valor no se escribe junto con los datos en el archivo de salida, incluso si la propiedad value.converter.schemas.enable es verdadera.

  • La propiedad tasks.max controla el nivel de paralelismo del conector. Aumentar tasks.max puede mejorar la capacidad de procesamiento, pero el paralelismo real está limitado por la cantidad de particiones en los temas de Kafka.

Propiedades de un conector de receptor de Cloud Storage

Cuando crees un conector de receptor de Cloud Storage, especifica las siguientes propiedades.

Nombre del conector

Es el nombre o ID del conector. Si necesitas ayuda para asignarle un nombre al recurso, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre es inmutable.

Tipo de complemento del conector

Selecciona Cloud Storage Sink como el tipo de complemento del conector en la consola deGoogle Cloud . Si no usas la interfaz de usuario para configurar el conector, también debes especificar la clase del conector.

Temas

Son los temas de Kafka desde los que el conector consume mensajes. Puedes especificar uno o más temas, o bien usar una expresión regular para que coincida con varios temas. Por ejemplo, topic.* para hacer coincidir todos los temas que comienzan con "tema". Estos temas deben existir en el clúster de Managed Service para Apache Kafka asociado a tu clúster de Connect.

Bucket de Cloud Storage

Elige o crea el bucket de Cloud Storage en el que se almacenan los datos.

Configuración

En esta sección, puedes especificar propiedades de configuración adicionales y específicas del conector para el conector de Cloud Storage Sink.

Dado que los datos de los temas de Kafka pueden estar en varios formatos, como Avro, JSON o bytes sin procesar, una parte clave de la configuración implica especificar convertidores. Los convertidores traducen los datos del formato que se usa en tus temas de Kafka al formato interno estandarizado de Kafka Connect. Luego, el conector de Cloud Storage Sink toma estos datos internos y los transforma al formato que requiere tu bucket de Cloud Storage antes de escribirlos.

Para obtener información más general sobre el rol de los convertidores en Kafka Connect, los tipos de convertidores admitidos y las opciones de configuración comunes, consulta convertidores.

Estas son algunas configuraciones específicas del conector Cloud Storage Sink:

  • gcs.credentials.default: Indica si se deben descubrir automáticamente las credenciales de Google Cloud desde el entorno de ejecución. Debe establecerse en true.

  • gcs.bucket.name: Especifica el nombre del bucket de Cloud Storage en el que se escriben los datos. Se debe establecer.

  • file.compression.type: Establece el tipo de compresión para los archivos almacenados en el bucket de Cloud Storage. Algunos ejemplos son gzip, snappy, zstd y none. El valor predeterminado es none.

  • file.name.prefix: Es el prefijo que se agregará al nombre de cada archivo almacenado en el bucket de Cloud Storage. El valor predeterminado es vacío.

  • format.output.type: Es el tipo de formato de datos que se usa para escribir datos en los archivos de salida de Cloud Storage. Los valores admitidos son csv, json, jsonl y parquet. El valor predeterminado es csv.

Para obtener una lista de las propiedades de configuración disponibles específicas de este conector, consulta Configuraciones del conector de Cloud Storage Sink.

Crea un conector de receptor de Cloud Storage

Antes de crear un conector, revisa la documentación sobre las propiedades de un conector de receptor de Cloud Storage.

Console

  1. En la consola de Google Cloud , ve a la página Connect Clusters.

    Ir a Connect Clusters

  2. Haz clic en el clúster de Connect para el que deseas crear el conector.

    Se muestra la página Detalles de conexión del clúster.

  3. Haz clic en Crear conector.

    Se muestra la página Crea un conector de Kafka.

  4. Para el nombre del conector, ingresa una cadena.

    Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.

  5. En Connector plugin, selecciona Cloud Storage Sink.

  6. Especifica los temas desde los que puedes transmitir datos.

  7. Elige el bucket de almacenamiento para almacenar los datos.

  8. (Opcional) Configura parámetros adicionales en la sección Configuración.

  9. Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.

  10. Haz clic en Crear.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Reemplaza lo siguiente:

    A continuación, se muestra un ejemplo de un archivo de configuración para el conector de Cloud Storage Sink:

    connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    gcs.bucket.name: "GCS_BUCKET_NAME"
    gcs.credentials.default: "true"
    format.output.type: "json"
    name: "GCS_SINK_CONNECTOR_ID"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    

    Reemplaza lo siguiente:

    • GMK_TOPIC_ID: Es el ID del tema de Managed Service para Apache Kafka desde el que fluyen los datos hacia el conector receptor de Cloud Storage.

    • GCS_BUCKET_NAME: Es el nombre del bucket de Cloud Storage que actúa como receptor de la canalización.

    • GCS_SINK_CONNECTOR_ID: Es el ID o el nombre del conector de Cloud Storage Sink. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.

  3. Terraform

    Puedes usar un recurso de Terraform para crear un conector.

    resource "google_managed_kafka_connector" "example-cloud-storage-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-gcs-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"                = "io.aiven.kafka.connect.gcs.GcsSinkConnector"
        "tasks.max"                      = "3"
        "topics"                         = "GMK_TOPIC_ID"
        "gcs.bucket.name"                = "GCS_BUCKET_NAME"
        "gcs.credentials.default"        = "true"
        "format.output.type"             = "json"
        "name"                           = "my-gcs-sink-connector"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
      }
      provider = google-beta
    }

    Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.

    Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createCloudStorageSinkConnector creates a Cloud Storage Sink connector.
    func createCloudStorageSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, gcsBucketName, tasksMax, formatOutputType, valueConverter, valueConverterSchemasEnable, keyConverter, gcsCredentialsDefault string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "GCS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// gcsBucketName := "GCS_BUCKET_NAME"
    	// tasksMax := "3"
    	// formatOutputType := "json"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// gcsCredentialsDefault := "true"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":                "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    		"tasks.max":                      tasksMax,
    		"topics":                         topics,
    		"gcs.bucket.name":                gcsBucketName,
    		"gcs.credentials.default":        gcsCredentialsDefault,
    		"format.output.type":             formatOutputType,
    		"name":                           connectorID,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"key.converter":                  keyConverter,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Cloud Storage sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateCloudStorageSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-gcs-sink-connector";
        String bucketName = "my-gcs-bucket";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
        String maxTasks = "3";
        String gcsCredentialsDefault = "true";
        String formatOutputType = "json";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createCloudStorageSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bucketName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            gcsCredentialsDefault,
            formatOutputType,
            valueConverter,
            valueSchemasEnable,
            keyConverter);
      }
    
      public static void createCloudStorageSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bucketName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String gcsCredentialsDefault,
          String formatOutputType,
          String valueConverter,
          String valueSchemasEnable,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("gcs.bucket.name", bucketName);
        configMap.put("gcs.credentials.default", gcsCredentialsDefault);
        configMap.put("format.output.type", formatOutputType);
        configMap.put("name", connectorId);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("key.converter", keyConverter);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": tasks_max,
        "topics": topics,
        "gcs.bucket.name": gcs_bucket_name,
        "gcs.credentials.default": "true",
        "format.output.type": format_output_type,
        "name": connector_id,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "key.converter": key_converter,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.