Los conectores receptores de Cloud Storage te permiten transmitir datos de tus temas de Kafka a buckets de Cloud Storage. Esto es útil para almacenar y procesar grandes volúmenes de datos de manera rentable y escalable.
Antes de comenzar
Antes de crear un conector de receptor de Cloud Storage, asegúrate de tener lo siguiente:
Crea un clúster de Managed Service para Apache Kafka para tu clúster de Connect. Es el clúster principal de Kafka asociado con el clúster de Connect. Este es también el clúster de origen que forma un extremo de la canalización del conector.
Crea un clúster de Connect para alojar tu conector de receptor de Cloud Storage.
Crea un bucket de Cloud Storage para almacenar los datos transmitidos desde Kafka.
Crea y configura un tema de Kafka en el clúster de origen. Los datos se transfieren de este tema de Kafka al bucket de destino de Cloud Storage.
Roles y permisos requeridos
Para obtener los permisos que necesitas para crear un conector de Cloud Storage Sink, pídele a tu administrador que te otorgue el rol de IAM de Editor de conectores de Kafka administrados (roles/managedkafka.connectorEditor) en tu proyecto.
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
Este rol predefinido contiene los permisos necesarios para crear un conector de Cloud Storage Sink. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:
Permisos necesarios
Se requieren los siguientes permisos para crear un conector de Cloud Storage Sink:
-
Otorga el permiso para crear un conector en el clúster de Connect principal:
managedkafka.connectors.create
También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.
Para obtener más información sobre el rol de Editor de Kafka Connector administrado, consulta Roles predefinidos de Managed Service for Apache Kafka.
Si tu clúster de Servicio administrado para Apache Kafka se encuentra en el mismo proyecto que el clúster de Connect, no se requieren más permisos. Si el clúster de Connect se encuentra en un proyecto diferente, consulta Crea un clúster de Connect en un proyecto diferente.
Otorga permisos para escribir en el bucket de Cloud Storage
La cuenta de servicio del clúster de Connect, que sigue el formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requiere los siguientes permisos de Cloud Storage:
storage.objects.createstorage.objects.delete
Para ello, otorga el rol de Usuario de objetos de almacenamiento (roles/storage.objectUser) a la cuenta de servicio del clúster de Connect en el proyecto que contiene el bucket de Cloud Storage.
Cómo funciona un conector de receptor de Cloud Storage
Un conector receptor de Cloud Storage extrae datos de uno o más temas de Kafka y escribe esos datos en objetos dentro de un solo bucket de Cloud Storage.
A continuación, se incluye un desglose detallado de cómo el conector Cloud Storage Sink copia los datos:
El conector consume mensajes de uno o más temas de Kafka dentro del clúster de origen.
El conector escribe los datos en el bucket de Cloud Storage de destino que especificaste en la configuración del conector.
El conector formatea los datos a medida que los escribe en el bucket de Cloud Storage haciendo referencia a propiedades específicas en la configuración del conector. De forma predeterminada, los archivos de salida están en formato CSV. Puedes configurar la propiedad
format.output.typepara especificar diferentes formatos de salida, como JSON.El conector también nombra los archivos que se escriben en el bucket de Cloud Storage. Puedes personalizar los nombres de los archivos con las propiedades
file.name.prefixyfile.name.template. Por ejemplo, puedes incluir el nombre del tema de Kafka o las claves de los mensajes en el nombre del archivo.Un registro de Kafka tiene tres componentes: encabezados, claves y valores.
Puedes incluir encabezados en el archivo de salida configurando
format.output.fieldspara que los incluya. Por ejemplo,format.output.fields=value,headersPuedes incluir claves en el archivo de salida configurando
format.output.fieldspara que incluyakey. Por ejemplo,format.output.fields=key,value,headers.Las claves también se pueden usar para agrupar registros si se incluye
keyen la propiedadfile.name.template.
Puedes incluir valores en el archivo de salida de forma predeterminada, ya que
format.output.fieldsse establece envaluede forma predeterminada.El conector escribe los datos convertidos y formateados en el bucket de Cloud Storage especificado.
El conector comprime los archivos almacenados en el bucket de Cloud Storage si configuras la compresión de archivos con la propiedad
file.compression.type.Las configuraciones del convertidor están restringidas por la propiedad
format.output.type.Por ejemplo, cuando
format.output.typese establece encsv, el convertidor de claves debe serorg.apache.kafka.connect.converters.ByteArrayConverteroorg.apache.kafka.connect.storage.StringConverter, y el convertidor de valores debe serorg.apache.kafka.connect.converters.ByteArrayConverter.Cuando
format.output.typese establece enjson, el esquema de clave y valor no se escribe junto con los datos en el archivo de salida, incluso si la propiedadvalue.converter.schemas.enablees verdadera.
La propiedad
tasks.maxcontrola el nivel de paralelismo del conector. Aumentartasks.maxpuede mejorar la capacidad de procesamiento, pero el paralelismo real está limitado por la cantidad de particiones en los temas de Kafka.
Propiedades de un conector de receptor de Cloud Storage
Cuando crees un conector de receptor de Cloud Storage, especifica las siguientes propiedades.
Nombre del conector
Es el nombre o ID del conector. Si necesitas ayuda para asignarle un nombre al recurso, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre es inmutable.
Tipo de complemento del conector
Selecciona Cloud Storage Sink como el tipo de complemento del conector en la consola deGoogle Cloud . Si no usas la interfaz de usuario para configurar el conector, también debes especificar la clase del conector.
Temas
Son los temas de Kafka desde los que el conector consume mensajes.
Puedes especificar uno o más temas, o bien usar una expresión regular para que coincida con varios temas. Por ejemplo, topic.* para hacer coincidir todos los temas que comienzan con "tema". Estos temas deben existir en el clúster de Managed Service para Apache Kafka asociado a tu clúster de Connect.
Bucket de Cloud Storage
Elige o crea el bucket de Cloud Storage en el que se almacenan los datos.
Configuración
En esta sección, puedes especificar propiedades de configuración adicionales y específicas del conector para el conector de Cloud Storage Sink.
Dado que los datos de los temas de Kafka pueden estar en varios formatos, como Avro, JSON o bytes sin procesar, una parte clave de la configuración implica especificar convertidores. Los convertidores traducen los datos del formato que se usa en tus temas de Kafka al formato interno estandarizado de Kafka Connect. Luego, el conector de Cloud Storage Sink toma estos datos internos y los transforma al formato que requiere tu bucket de Cloud Storage antes de escribirlos.
Para obtener información más general sobre el rol de los convertidores en Kafka Connect, los tipos de convertidores admitidos y las opciones de configuración comunes, consulta convertidores.
Estas son algunas configuraciones específicas del conector Cloud Storage Sink:
gcs.credentials.default: Indica si se deben descubrir automáticamente las credenciales de Google Cloud desde el entorno de ejecución. Debe establecerse entrue.gcs.bucket.name: Especifica el nombre del bucket de Cloud Storage en el que se escriben los datos. Se debe establecer.file.compression.type: Establece el tipo de compresión para los archivos almacenados en el bucket de Cloud Storage. Algunos ejemplos songzip,snappy,zstdynone. El valor predeterminado esnone.file.name.prefix: Es el prefijo que se agregará al nombre de cada archivo almacenado en el bucket de Cloud Storage. El valor predeterminado es vacío.format.output.type: Es el tipo de formato de datos que se usa para escribir datos en los archivos de salida de Cloud Storage. Los valores admitidos soncsv,json,jsonlyparquet. El valor predeterminado escsv.
Para obtener una lista de las propiedades de configuración disponibles específicas de este conector, consulta Configuraciones del conector de Cloud Storage Sink.
Crea un conector de receptor de Cloud Storage
Antes de crear un conector, revisa la documentación sobre las propiedades de un conector de receptor de Cloud Storage.
Console
En la consola de Google Cloud , ve a la página Connect Clusters.
Haz clic en el clúster de Connect para el que deseas crear el conector.
Se muestra la página Detalles de conexión del clúster.
Haz clic en Crear conector.
Se muestra la página Crea un conector de Kafka.
Para el nombre del conector, ingresa una cadena.
Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.
En Connector plugin, selecciona Cloud Storage Sink.
Especifica los temas desde los que puedes transmitir datos.
Elige el bucket de almacenamiento para almacenar los datos.
(Opcional) Configura parámetros adicionales en la sección Configuración.
Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.
Haz clic en Crear.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Ejecuta el comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEReemplaza lo siguiente:
CONNECTOR_ID: Es el ID o el nombre del conector. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.
LOCATION: Es la ubicación en la que creas el conector. Debe ser la misma ubicación en la que creaste el clúster de Connect.
CONNECT_CLUSTER_ID: Es el ID del clúster de Connect en el que se crea el conector.
CONFIG_FILE: Es la ruta de acceso al archivo de configuración YAML del conector de BigQuery Sink.
A continuación, se muestra un ejemplo de un archivo de configuración para el conector de Cloud Storage Sink:
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"Reemplaza lo siguiente:
GMK_TOPIC_ID: Es el ID del tema de Managed Service para Apache Kafka desde el que fluyen los datos hacia el conector receptor de Cloud Storage.
GCS_BUCKET_NAME: Es el nombre del bucket de Cloud Storage que actúa como receptor de la canalización.
GCS_SINK_CONNECTOR_ID: Es el ID o el nombre del conector de Cloud Storage Sink. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.
Terraform
Puedes usar un recurso de Terraform para crear un conector.
Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.
Go
Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.
Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Java
Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.
Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Python
Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.
Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.