Los conectores de origen de Pub/Sub transmiten mensajes de Pub/Sub a Kafka, lo que te permite integrar Pub/Sub con tus aplicaciones y canalizaciones de datos basadas en Kafka.
Los casos de uso de los conectores de fuente de Pub/Sub incluyen los siguientes:
Transferencia de datos en tiempo real Publica datos de servicios en la nube o de otras aplicaciones en Pub/Sub y, luego, replica los datos en Kafka para el procesamiento de transmisiones.
Arquitecturas basadas en eventos Activa el procesamiento basado en Kafka a partir de los mensajes publicados en Pub/Sub.
El conector lee mensajes de una suscripción a Pub/Sub, convierte cada mensaje en un registro de Kafka y escribe los registros en un tema de Kafka. De forma predeterminada, el conector crea registros de Kafka de la siguiente manera:
- La clave de registro de Kafka es
null. - El valor del registro de Kafka son los datos del mensaje de Pub/Sub como bytes.
- Los encabezados de registro de Kafka están vacíos.
Sin embargo, puedes configurar este comportamiento. Para obtener más información, consulta Configura el conector.
Antes de comenzar
Antes de crear un conector de fuente de Pub/Sub, asegúrate de tener lo siguiente:
Un tema de Pub/Sub con una suscripción
Un tema de Kafka dentro del clúster de Kafka.
Un clúster de Connect Cuando crees el clúster de Connect, configura el clúster de Kafka principal como el clúster de Managed Service para Apache Kafka.
Roles y permisos requeridos
Para obtener los permisos que necesitas
para crear un conector,
pídele a tu administrador que te otorgue el rol de IAM
Editor de conectores de Kafka administrados (roles/managedkafka.connectorEditor) en tu proyecto.
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
Este rol predefinido contiene los permisos necesarios para crear un conector. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:
Permisos necesarios
Se requieren los siguientes permisos para crear un conector:
-
Crea un conector:
managedkafka.connectors.create
También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.
Otorga permisos para leer desde Pub/Sub
La cuenta de servicio de Kafka administrado debe tener permiso para leer mensajes de la suscripción a Pub/Sub. Otorga los siguientes roles de IAM a la cuenta de servicio en el proyecto que contiene la suscripción a Pub/Sub:
- Suscriptor de Pub/Sub (
roles/pubsub.subscriber) - Visualizador de Pub/Sub (
roles/pubsub.viewer)
La cuenta de servicio de Kafka administrado tiene el siguiente formato: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com, en el que PROJECT_NUMBER es el número del proyecto del clúster de Connect.
Si tu clúster de Connect se encuentra en un proyecto diferente del clúster de Managed Service para Apache Kafka, consulta cómo crear un clúster de Connect en otro proyecto.
Crea un conector de fuente de Pub/Sub
Console
En la consola de Google Cloud , ve a la página Connect Clusters.
Haz clic en el clúster de Connect en el que deseas crear el conector.
Haz clic en Crear conector.
Para el nombre del conector, ingresa una cadena.
Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Managed Service para Apache Kafka.
En Complemento del conector, selecciona Fuente de Pub/Sub.
En la lista Suscripción a Cloud Pub/Sub, selecciona una suscripción a Pub/Sub. El conector extrae mensajes de esta suscripción. La suscripción se muestra como un nombre de recurso completo:
projects/{project}/subscriptions/{subscription}.En la lista Tema de Kafka, selecciona el tema de Kafka en el que se escriben los mensajes.
Opcional: En el cuadro Configurations, agrega propiedades de configuración o edita las propiedades predeterminadas. Para obtener más información, consulta Configura el conector.
Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.
Haz clic en Crear.
gcloud
Ejecuta el comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEReemplaza lo siguiente:
CONNECTOR_ID: Es el ID o el nombre del conector. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Managed Service para Apache Kafka. El nombre de un conector es inmutable.
LOCATION: Es la ubicación del clúster de Connect.
CONNECT_CLUSTER_ID: Es el ID del clúster de Connect en el que se crea el conector.
CONFIG_FILE: Es la ruta de acceso a un archivo de configuración YAML o JSON.
A continuación, se muestra un ejemplo de un archivo de configuración:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Reemplaza lo siguiente:
PROJECT_ID: Es el ID del proyecto Google Clouden el que reside la suscripción a Pub/Sub.
PUBSUB_SUBSCRIPTION_ID: Es el ID de la suscripción a Pub/Sub desde la que se extraerán los datos.
KAFKA_TOPIC_ID: Es el ID del tema de Kafka en el que se escriben los datos.
Se requieren las propiedades de configuración cps.project, cps.subscription y kafka.topic. Para obtener más opciones de configuración, consulta Cómo configurar el conector.
Terraform
Puedes usar un recurso de Terraform para crear un conector.
Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.
Go
Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka para Go.
Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Java
Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service para Apache Kafka.
Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Python
Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service para Apache Kafka.
Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.
Configura el conector
En esta sección, se describen algunas propiedades de configuración que puedes establecer en el conector.
Para obtener una lista completa de las propiedades específicas de este conector, consulta Configuraciones del conector de Pub/Sub Source.
Modo de extracción
El modo de extracción especifica cómo el conector recupera los mensajes de Pub/Sub. Se admiten los siguientes modos:
Modo de extracción (predeterminado) Los mensajes se recuperan en lotes. Para habilitar este modo, establece
cps.streamingPull.enabled=false.. Para configurar el tamaño del lote, establece la propiedadcps.maxBatchSize.Para obtener más información sobre el modo de extracción, consulta la API de extracción.
Modo de extracción de transmisión. Permite la capacidad de procesamiento máxima y la latencia más baja cuando se recuperan mensajes de Pub/Sub. Para habilitar este modo, establece
cps.streamingPull.enabled=true.Para obtener más información sobre el modo de extracción de transmisión, consulta la API de StreamingPull.
Si la extracción de transmisión está habilitada, puedes ajustar el rendimiento configurando las siguientes propiedades de configuración:
cps.streamingPull.flowControlBytes: Es la cantidad máxima de bytes de mensajes pendientes por tarea.cps.streamingPull.flowControlMessages: Es la cantidad máxima de mensajes pendientes por tarea.cps.streamingPull.maxAckExtensionMs: Es la cantidad máxima de tiempo que el conector extiende la fecha límite de suscripción, en milisegundos.cps.streamingPull.maxMsPerAckExtension: Es la cantidad máxima de tiempo que el conector extiende el plazo de suscripción por extensión, en milisegundos.cps.streamingPull.parallelStreams: Es la cantidad de transmisiones de las que se extraerán mensajes de la suscripción.
Endpoint de Pub/Sub
De forma predeterminada, el conector usa el extremo global de Pub/Sub. Para especificar un extremo, establece la propiedad cps.endpoint en la dirección del extremo.
Para obtener más información sobre los extremos, consulta Extremos de Pub/Sub.
Particiones de Kafka
De forma predeterminada, el conector escribe en una sola partición del tema. Para especificar la cantidad de particiones en las que escribe el conector, configura la propiedad kafka.partition.count. El valor no debe superar el recuento de particiones del tema.
Para especificar cómo el conector asigna mensajes a las particiones, configura la propiedad kafka.partition.scheme. Para obtener más información, consulta Configuraciones del conector de fuente de Pub/Sub.
Usuarios que generan conversiones
Establece el convertidor de claves en org.apache.kafka.connect.storage.StringConverter.
Según la configuración del conector, establece el convertidor de valores en uno de los siguientes:
org.apache.kafka.connect.converters.ByteArrayConverterorg.apache.kafka.connect.json.JsonConverter
Para obtener más información, consulta Valor del registro.
Conversión de mensajes
El conector de origen de Pub/Sub convierte los mensajes de Pub/Sub en registros de Kafka. En las siguientes secciones, se describe el proceso de conversión.
Clave de registro
El convertidor de claves debe ser org.apache.kafka.connect.storage.StringConverter.
De forma predeterminada, las claves de registro son
null.Para usar un atributo de mensaje de Pub/Sub como clave, establece
kafka.key.attributeen el nombre del atributo. Por ejemplo,kafka.key.attribute=username.Para usar la clave de ordenamiento de Pub/Sub como clave, establece
kafka.key.attribute=orderingKey.
Encabezados de registros
De forma predeterminada, los encabezados de registro están vacíos.
Si kafka.record.headers es true, los atributos del mensaje de Pub/Sub se escriben como encabezados de registro. Para incluir la clave de ordenamiento, configura cps.makeOrderingKeyAttribute=true.
Valor del registro
Los valores de registro se escriben como arrays de bytes o como tipos struct.
Valores de registros de matrices de bytes
Si kafka.record.headers es true o el mensaje de Pub/Sub no tiene atributos personalizados, el conector escribe los datos del mensaje como un array de bytes.
Establece el convertidor de valores en org.apache.kafka.connect.converters.ByteArrayConverter.
Valores de registros de Struct
Si kafka.record.headers es false y el mensaje tiene al menos un atributo personalizado, el conector escribe el valor del registro como struct. Establece el convertidor de valores en org.apache.kafka.connect.json.JsonConverter.
El objeto struct contiene los siguientes campos:
message: Son los datos del mensaje de Pub/Sub, en bytes.Un campo para cada atributo del mensaje de Pub/Sub. Para incluir la clave de ordenamiento, configura
cps.makeOrderingKeyAttribute=true.
Por ejemplo, si el mensaje tiene un atributo username, el valor del registro se verá de la siguiente manera:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Si value.converter.schemas.enable es true, struct incluye la carga útil y el esquema:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}