Los conectores de origen de Pub/Sub transmiten mensajes de Pub/Sub a Kafka. Esto te permite integrar Pub/Sub con tus aplicaciones y canalizaciones de datos basadas en Kafka.
El conector lee mensajes de una suscripción a Pub/Sub, convierte cada mensaje en un registro de Kafka y escribe los registros en un tema de Kafka. De forma predeterminada, el conector crea registros de Kafka de la siguiente manera:
- La clave de registro de Kafka es
null. - El valor del registro de Kafka son los datos del mensaje de Pub/Sub como bytes.
- Los encabezados de registro de Kafka están vacíos.
Sin embargo, puedes configurar este comportamiento. Para obtener más información, consulta Configura el conector.
Antes de comenzar
Antes de crear un conector de fuente de Pub/Sub, asegúrate de tener lo siguiente:
Un tema de Pub/Sub con una suscripción
Un tema de Kafka dentro del clúster de Kafka.
Un clúster de Connect Cuando crees el clúster de Connect, configura el clúster de Managed Service para Apache Kafka como el clúster principal de Kafka.
Roles y permisos requeridos
Para obtener los permisos que
necesitas para crear un conector de Pub/Sub Source,
pídele a tu administrador que te otorgue el
rol de IAM de editor de Managed Kafka Connector (roles/managedkafka.connectorEditor)
en el proyecto que contiene el clúster de Connect.
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
Este rol predefinido contiene los permisos necesarios para crear un conector de Pub/Sub Source. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:
Permisos necesarios
Se requieren los siguientes permisos para crear un conector de fuente de Pub/Sub:
-
Otorga el permiso para crear un conector en el clúster de Connect principal:
managedkafka.connectors.create
También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.
Para obtener más información sobre el rol de editor de conectores de Kafka administrados, consulta Roles predefinidos de Managed Service for Apache Kafka.
Si tu clúster de Servicio administrado para Apache Kafka se encuentra en el mismo proyecto que el clúster de Connect, no se requieren más permisos. Si el clúster de Connect se encuentra en un proyecto diferente, consulta Crea un clúster de Connect en un proyecto diferente.
Otorga permisos para leer desde Pub/Sub
La cuenta de servicio de Kafka administrado debe tener permiso para leer mensajes de la suscripción a Pub/Sub. Otorga los siguientes roles de IAM a la cuenta de servicio en el proyecto que contiene la suscripción a Pub/Sub:
- Suscriptor de Pub/Sub (
roles/pubsub.subscriber) - Visualizador de Pub/Sub (
roles/pubsub.viewer)
La cuenta de servicio de Kafka administrado tiene el siguiente formato:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com.
Reemplaza PROJECT_NUMBER por el número del proyecto.
Crea un conector de fuente de Pub/Sub
Console
En la consola de Google Cloud , ve a la página Connect Clusters.
Haz clic en el clúster de Connect en el que deseas crear el conector.
Haz clic en Crear conector.
Para el nombre del conector, ingresa una cadena.
Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.
En Complemento del conector, selecciona Fuente de Pub/Sub.
En la lista Suscripción a Cloud Pub/Sub, selecciona una suscripción a Pub/Sub. El conector extrae mensajes de esta suscripción. La suscripción se muestra como un nombre de recurso completo:
projects/{project}/subscriptions/{subscription}.En la lista Tema de Kafka, selecciona el tema de Kafka en el que se escriben los mensajes.
Opcional: En el cuadro Configurations, agrega propiedades de configuración o edita las propiedades predeterminadas. Para obtener más información, consulta Configura el conector.
Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.
Haz clic en Crear.
gcloud
Ejecuta el comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEReemplaza lo siguiente:
CONNECTOR_ID: Es el ID o el nombre del conector. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.
LOCATION: Es la ubicación del clúster de Connect.
CONNECT_CLUSTER_ID: Es el ID del clúster de Connect en el que se crea el conector.
CONFIG_FILE: Es la ruta de acceso a un archivo de configuración YAML o JSON.
A continuación, se muestra un ejemplo de un archivo de configuración:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Reemplaza lo siguiente:
PROJECT_ID: Es el ID del proyecto Google Clouden el que reside la suscripción a Pub/Sub.
PUBSUB_SUBSCRIPTION_ID: Es el ID de la suscripción a Pub/Sub desde la que se extraerán los datos.
KAFKA_TOPIC_ID: Es el ID del tema de Kafka en el que se escriben los datos.
Se requieren las propiedades de configuración cps.project, cps.subscription y kafka.topic. Para obtener más opciones de configuración, consulta Cómo configurar el conector.
Terraform
Puedes usar un recurso de Terraform para crear un conector.
Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.
Go
Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.
Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Java
Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.
Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Python
Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.
Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.
Configura el conector
En esta sección, se describen algunas propiedades de configuración que puedes establecer en el conector.
Para obtener una lista completa de las propiedades específicas de este conector, consulta Configuraciones del conector de Pub/Sub Source.
Modo de extracción
El modo de extracción especifica cómo el conector recupera los mensajes de Pub/Sub. Se admiten los siguientes modos:
Modo de extracción (predeterminado) Los mensajes se recuperan en lotes. Para habilitar este modo, establece
cps.streamingPull.enabled=false.. Para configurar el tamaño del lote, establece la propiedadcps.maxBatchSize.Para obtener más información sobre el modo de extracción, consulta la API de extracción.
Modo de extracción de transmisión. Permite obtener el máximo rendimiento y la latencia más baja cuando se recuperan mensajes de Pub/Sub. Para habilitar este modo, establece
cps.streamingPull.enabled=true.Para obtener más información sobre el modo de extracción de transmisión, consulta la API de StreamingPull.
Si la extracción de transmisión está habilitada, puedes ajustar el rendimiento configurando las siguientes propiedades de configuración:
cps.streamingPull.flowControlBytes: Es la cantidad máxima de bytes de mensajes pendientes por tarea.cps.streamingPull.flowControlMessages: Es la cantidad máxima de mensajes pendientes por tarea.cps.streamingPull.maxAckExtensionMs: Es la cantidad máxima de tiempo que el conector extiende la fecha límite de suscripción, en milisegundos.cps.streamingPull.maxMsPerAckExtension: Es la cantidad máxima de tiempo que el conector extiende el plazo de suscripción por extensión, en milisegundos.cps.streamingPull.parallelStreams: Es la cantidad de transmisiones de las que se extraerán mensajes de la suscripción.
Endpoint de Pub/Sub
De forma predeterminada, el conector usa el extremo global de Pub/Sub. Para especificar un extremo, establece la propiedad cps.endpoint en la dirección del extremo.
Para obtener más información sobre los extremos, consulta Extremos de Pub/Sub.
Registros de Kafka
El conector de origen de Pub/Sub convierte los mensajes de Pub/Sub en registros de Kafka. En las siguientes secciones, se describe el proceso de conversión.
Clave de registro
El convertidor de claves debe ser org.apache.kafka.connect.storage.StringConverter.
De forma predeterminada, las claves de registro son
null.Para usar un atributo del mensaje de Pub/Sub como clave, establece
kafka.key.attributeen el nombre del atributo. Por ejemplo,kafka.key.attribute=username.Para usar la clave de ordenamiento de Pub/Sub como clave, establece
kafka.key.attribute=orderingKey.
Encabezados de registros
De forma predeterminada, los encabezados de registro están vacíos.
Si kafka.record.headers es true, los atributos del mensaje de Pub/Sub se escriben como encabezados de registro. Para incluir la clave de ordenamiento, configura cps.makeOrderingKeyAttribute=true.
Valor del registro
Si kafka.record.headers es true o el mensaje de Pub/Sub no tiene atributos personalizados, el valor del registro son los datos del mensaje, como un array de bytes.
Establece el convertidor de valores en org.apache.kafka.connect.converters.ByteArrayConverter.
De lo contrario, si kafka.record.headers es false y el mensaje tiene al menos un atributo personalizado, el conector escribe el valor del registro como struct. Establece el convertidor de valores en org.apache.kafka.connect.json.JsonConverter.
struct contiene los siguientes campos:
message: Son los datos del mensaje de Pub/Sub, en bytes.Un campo para cada atributo del mensaje de Pub/Sub. Para incluir la clave de ordenamiento, configura
cps.makeOrderingKeyAttribute=true.
Por ejemplo, supongamos que el mensaje tiene un atributo username:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Si value.converter.schemas.enable es true, struct incluye la carga útil y el esquema:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Particiones de Kafka
De forma predeterminada, el conector escribe en una sola partición del tema. Para especificar la cantidad de particiones en las que escribe el conector, configura la propiedad kafka.partition.count. El valor no debe superar el recuento de particiones del tema.
Para especificar cómo el conector asigna mensajes a las particiones, configura la propiedad kafka.partition.scheme. Para obtener más información, consulta Configuraciones del conector de fuente de Pub/Sub.