Los conectores receptores de Pub/Sub transmiten mensajes de temas de Kafka a temas de Pub/Sub. Esto te permite integrar tus aplicaciones basadas en Kafka con Pub/Sub, lo que facilita las arquitecturas basadas en eventos y el procesamiento de datos en tiempo real.
Antes de comenzar
Antes de crear un conector de receptor de Pub/Sub, asegúrate de tener lo siguiente:
Crea un clúster de Managed Service para Apache Kafka para tu clúster de Connect. Es el clúster principal de Kafka asociado con el clúster de Connect. Este es también el clúster de origen que forma un extremo de la canalización del conector.
Crea un clúster de Connect para alojar tu conector de receptor de Pub/Sub.
Crea y configura un tema de Kafka en el clúster de origen. Los datos se transfieren de este tema de Kafka al tema de Pub/Sub de destino.
Roles y permisos requeridos
Para obtener los permisos que necesitas para crear un conector de Pub/Sub Sink, pídele a tu administrador que te otorgue los siguientes roles de IAM en el proyecto que contiene el clúster de Connect:
-
Editor de conectores de Kafka administrado (
roles/managedkafka.connectorEditor) -
Pub/Sub:
Publicador de Pub/Sub (
roles/pubsub.publisher)
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
Estos roles predefinidos contienen los permisos necesarios para crear un conector de Pub/Sub Sink. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:
Permisos necesarios
Se requieren los siguientes permisos para crear un conector de receptor de Pub/Sub:
-
Otorga el permiso para crear un conector en el clúster de Connect principal:
managedkafka.connectors.create
También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.
Para obtener más información sobre el rol de Editor de Kafka Connector administrado, consulta Roles predefinidos de Managed Service for Apache Kafka.
Si tu clúster de Servicio administrado para Apache Kafka se encuentra en el mismo proyecto que el clúster de Connect, no se requieren más permisos. Si el clúster de Connect se encuentra en un proyecto diferente, consulta Crea un clúster de Connect en un proyecto diferente.
Otorga permisos para publicar en el tema de Pub/Sub
La cuenta de servicio del clúster de Connect, que sigue el formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requiere permiso para publicar mensajes en el tema de Pub/Sub. Para ello, otorga el rol de publicador de Pub/Sub (roles/pubsub.publisher) a la cuenta de servicio del clúster de Connect en el proyecto que contiene el tema de Pub/Sub.
Cómo funciona un conector de receptor de Pub/Sub
Un conector receptor de Pub/Sub extrae mensajes de uno o más temas de Kafka y los publica en un tema de Pub/Sub.
A continuación, se muestra un desglose detallado de cómo el conector de receptor de Pub/Sub copia los datos:
El conector consume mensajes de uno o más temas de Kafka dentro del clúster de origen.
El conector escribe mensajes en el ID del tema de Pub/Sub de destino que se especifica con la propiedad de configuración
cps.topic. Esta es una propiedad obligatoria.El conector también requiere que se especifique el proyecto Google Cloud que contiene el tema de Pub/Sub con la propiedad de configuración
cps.project. Esta es una propiedad obligatoria.De manera opcional, el conector también puede usar un extremo de Pub/Sub personalizado que se especifica con la propiedad
cps.endpoint. El extremo predeterminado es"pubsub.googleapis.com:443".Para optimizar el rendimiento, el conector almacena en búfer los mensajes antes de publicarlos en Pub/Sub. Puedes configurar
maxBufferSize,maxBufferBytes,maxDelayThresholdMs,maxOutstandingRequestBytesymaxOutstandingMessagespara controlar el almacenamiento en búfer.Un registro de Kafka tiene tres componentes: encabezados, claves y valores. El conector usa convertidores de claves y valores para transformar los datos de los mensajes de Kafka al formato que espera Pub/Sub. Cuando se usan esquemas de valores de mapa o struct, la propiedad
messageBodyNameespecifica el campo o la clave que se usará como cuerpo del mensaje de Pub/Sub.El conector puede incluir el tema, la partición, el desplazamiento y la marca de tiempo de Kafka como atributos del mensaje si se usa la propiedad
metadata.publishconfigurada comotrue.El conector puede incluir encabezados de mensajes de Kafka como atributos de mensajes de Pub/Sub con la propiedad
headers.publishestablecida entrue.El conector puede incluir una clave de ordenamiento para los mensajes de Pub/Sub con la propiedad
orderingKeySource. Las opciones para su valor incluyen"none"(predeterminado),"key"y"partition".La propiedad
tasks.maxcontrola el nivel de paralelismo del conector. Aumentartasks.maxpuede mejorar el rendimiento, pero el paralelismo real está limitado por la cantidad de particiones en los temas de Kafka.
Propiedades de un conector de receptor de Pub/Sub
Cuando creas un conector de receptor de Pub/Sub, debes especificar las siguientes propiedades.
Nombre del conector
Es un nombre único para el conector dentro del clúster de Connect. Si necesitas ayuda para asignarles nombres a los recursos, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.
Tipo de complemento del conector
Selecciona Pub/Sub Sink como el tipo de complemento del conector. Esto determina la dirección del flujo de datos, que va de Kafka a Pub/Sub, y la implementación específica del conector que se usa. Si no usas la interfaz de usuario para configurar el conector, también debes especificar la clase del conector.
Temas de Kafka
Son los temas de Kafka desde los que el conector consume mensajes.
Puedes especificar uno o más temas, o bien usar una expresión regular para que coincida con varios temas. Por ejemplo, topic.* para hacer coincidir todos los temas que comienzan con "tema". Estos temas deben existir en el clúster de Managed Service para Apache Kafka asociado a tu clúster de Connect.
Tema de Pub/Sub
Es el tema de Pub/Sub existente en el que el conector publica mensajes. Asegúrate de que la cuenta de servicio del clúster de Connect tenga el rol roles/pubsub.publisher en el proyecto del tema, como se describe en Antes de comenzar.
Configuración
En esta sección, puedes especificar propiedades de configuración adicionales y específicas del conector.
Dado que los datos de los temas de Kafka pueden estar en varios formatos, como Avro, JSON o bytes sin procesar, una parte clave de la configuración implica especificar convertidores. Los convertidores traducen los datos del formato que se usa en tus temas de Kafka al formato interno estandarizado de Kafka Connect. Luego, el conector de receptor de Pub/Sub toma estos datos internos y los transforma en el formato que requiere Pub/Sub antes de escribirlos.
Para obtener información más general sobre el rol de los convertidores en Kafka Connect, los tipos de convertidores admitidos y las opciones de configuración comunes, consulta convertidores.
Estas son algunas configuraciones específicas del conector de receptor de Pub/Sub:
cps.project: Especifica el Google Cloud ID del proyecto que contiene el tema de Pub/Sub.cps.topic: Especifica el tema de Pub/Sub en el que se publican los datos.cps.endpoint: Especifica el extremo de Pub/Sub que se usará.
Para obtener una lista de las propiedades de configuración disponibles específicas de este conector, consulta Configuraciones del conector Pub/Sub Sink.
Crea un conector de receptor de Pub/Sub
Antes de crear un conector, revisa la documentación sobre las propiedades de un conector de receptor de Pub/Sub.
Console
En la consola de Google Cloud , ve a la página Connect Clusters.
Haz clic en el clúster de Connect para el que deseas crear el conector.
Se muestra la página Detalles de conexión del clúster.
Haz clic en Crear conector.
Aparecerá la página Crea un conector de Kafka.
Para el nombre del conector, ingresa una cadena.
Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.
En Complemento del conector, selecciona Receptor de Pub/Sub.
En Temas, elige Seleccionar una lista de temas de Kafka o Usar una expresión regular de tema. Luego, selecciona o ingresa los temas de Kafka desde los que este conector consume mensajes. Estos temas se encuentran en tu clúster de Kafka asociado.
En Selecciona un tema de Cloud Pub/Sub, elige el tema de Pub/Sub en el que este conector publica mensajes. El tema se muestra en el formato de nombre de recurso completo:
projects/{project}/topics/{topic}.(Opcional) Configura parámetros adicionales en la sección Configurations. Aquí es donde especificarías propiedades como
tasks.max,key.converteryvalue.converter, como se explicó en la sección anterior.Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.
Haz clic en Crear.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Ejecuta el comando
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEReemplaza lo siguiente:
CONNECTOR_ID: Es el ID o el nombre del conector. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.
LOCATION: Es la ubicación en la que creas el conector. Debe ser la misma ubicación en la que creaste el clúster de Connect.
CONNECT_CLUSTER_ID: Es el ID del clúster de Connect en el que se crea el conector.
CONFIG_FILE: Es la ruta de acceso al archivo de configuración YAML del conector de BigQuery Sink.
A continuación, se muestra un ejemplo de un archivo de configuración para el conector Pub/Sub Sink:
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector" name: "CPS_SINK_CONNECTOR_ID" tasks.max: "1" topics: "GMK_TOPIC_ID" value.converter: "org.apache.kafka.connect.storage.StringConverter" key.converter: "org.apache.kafka.connect.storage.StringConverter" cps.topic: "CPS_TOPIC_ID" cps.project: "GCP_PROJECT_ID"Reemplaza lo siguiente:
CPS_SINK_CONNECTOR_ID: Es el ID o el nombre del conector de Pub/Sub Sink. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.
GMK_TOPIC_ID: Es el ID del tema de Managed Service for Apache Kafka desde el que el conector receptor de Pub/Sub lee los datos.
CPS_TOPIC_ID: Es el ID del tema de Pub/Sub en el que se publican los datos.
GCP_PROJECT_ID: Es el ID del proyecto Google Clouden el que reside tu tema de Pub/Sub.
Terraform
Puedes usar un recurso de Terraform para crear un conector.
Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.
Go
Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.
Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Java
Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.
Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Python
Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.
Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.
Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.