Crea un conector de Pub/Sub Sink

Los conectores receptores de Pub/Sub transmiten mensajes de temas de Kafka a temas de Pub/Sub. Esto te permite integrar tus aplicaciones basadas en Kafka con Pub/Sub, lo que facilita las arquitecturas basadas en eventos y el procesamiento de datos en tiempo real.

Antes de comenzar

Antes de crear un conector de receptor de Pub/Sub, asegúrate de tener lo siguiente:

Roles y permisos requeridos

Para obtener los permisos que necesitas para crear un conector de Pub/Sub Sink, pídele a tu administrador que te otorgue los siguientes roles de IAM en el proyecto que contiene el clúster de Connect:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Estos roles predefinidos contienen los permisos necesarios para crear un conector de Pub/Sub Sink. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear un conector de receptor de Pub/Sub:

  • Otorga el permiso para crear un conector en el clúster de Connect principal: managedkafka.connectors.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre el rol de Editor de Kafka Connector administrado, consulta Roles predefinidos de Managed Service for Apache Kafka.

Si tu clúster de Servicio administrado para Apache Kafka se encuentra en el mismo proyecto que el clúster de Connect, no se requieren más permisos. Si el clúster de Connect se encuentra en un proyecto diferente, consulta Crea un clúster de Connect en un proyecto diferente.

Otorga permisos para publicar en el tema de Pub/Sub

La cuenta de servicio del clúster de Connect, que sigue el formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requiere permiso para publicar mensajes en el tema de Pub/Sub. Para ello, otorga el rol de publicador de Pub/Sub (roles/pubsub.publisher) a la cuenta de servicio del clúster de Connect en el proyecto que contiene el tema de Pub/Sub.

Cómo funciona un conector de receptor de Pub/Sub

Un conector receptor de Pub/Sub extrae mensajes de uno o más temas de Kafka y los publica en un tema de Pub/Sub.

A continuación, se muestra un desglose detallado de cómo el conector de receptor de Pub/Sub copia los datos:

  • El conector consume mensajes de uno o más temas de Kafka dentro del clúster de origen.

  • El conector escribe mensajes en el ID del tema de Pub/Sub de destino que se especifica con la propiedad de configuración cps.topic. Esta es una propiedad obligatoria.

  • El conector también requiere que se especifique el proyecto Google Cloud que contiene el tema de Pub/Sub con la propiedad de configuración cps.project. Esta es una propiedad obligatoria.

  • De manera opcional, el conector también puede usar un extremo de Pub/Sub personalizado que se especifica con la propiedad cps.endpoint. El extremo predeterminado es "pubsub.googleapis.com:443".

  • Para optimizar el rendimiento, el conector almacena en búfer los mensajes antes de publicarlos en Pub/Sub. Puedes configurar maxBufferSize, maxBufferBytes, maxDelayThresholdMs, maxOutstandingRequestBytes y maxOutstandingMessages para controlar el almacenamiento en búfer.

  • Un registro de Kafka tiene tres componentes: encabezados, claves y valores. El conector usa convertidores de claves y valores para transformar los datos de los mensajes de Kafka al formato que espera Pub/Sub. Cuando se usan esquemas de valores de mapa o struct, la propiedad messageBodyName especifica el campo o la clave que se usará como cuerpo del mensaje de Pub/Sub.

  • El conector puede incluir el tema, la partición, el desplazamiento y la marca de tiempo de Kafka como atributos del mensaje si se usa la propiedad metadata.publish configurada como true.

  • El conector puede incluir encabezados de mensajes de Kafka como atributos de mensajes de Pub/Sub con la propiedad headers.publish establecida en true.

  • El conector puede incluir una clave de ordenamiento para los mensajes de Pub/Sub con la propiedad orderingKeySource. Las opciones para su valor incluyen "none" (predeterminado), "key" y "partition".

  • La propiedad tasks.max controla el nivel de paralelismo del conector. Aumentar tasks.max puede mejorar el rendimiento, pero el paralelismo real está limitado por la cantidad de particiones en los temas de Kafka.

Propiedades de un conector de receptor de Pub/Sub

Cuando creas un conector de receptor de Pub/Sub, debes especificar las siguientes propiedades.

Nombre del conector

Es un nombre único para el conector dentro del clúster de Connect. Si necesitas ayuda para asignarles nombres a los recursos, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.

Tipo de complemento del conector

Selecciona Pub/Sub Sink como el tipo de complemento del conector. Esto determina la dirección del flujo de datos, que va de Kafka a Pub/Sub, y la implementación específica del conector que se usa. Si no usas la interfaz de usuario para configurar el conector, también debes especificar la clase del conector.

Temas de Kafka

Son los temas de Kafka desde los que el conector consume mensajes. Puedes especificar uno o más temas, o bien usar una expresión regular para que coincida con varios temas. Por ejemplo, topic.* para hacer coincidir todos los temas que comienzan con "tema". Estos temas deben existir en el clúster de Managed Service para Apache Kafka asociado a tu clúster de Connect.

Tema de Pub/Sub

Es el tema de Pub/Sub existente en el que el conector publica mensajes. Asegúrate de que la cuenta de servicio del clúster de Connect tenga el rol roles/pubsub.publisher en el proyecto del tema, como se describe en Antes de comenzar.

Configuración

En esta sección, puedes especificar propiedades de configuración adicionales y específicas del conector.

Dado que los datos de los temas de Kafka pueden estar en varios formatos, como Avro, JSON o bytes sin procesar, una parte clave de la configuración implica especificar convertidores. Los convertidores traducen los datos del formato que se usa en tus temas de Kafka al formato interno estandarizado de Kafka Connect. Luego, el conector de receptor de Pub/Sub toma estos datos internos y los transforma en el formato que requiere Pub/Sub antes de escribirlos.

Para obtener información más general sobre el rol de los convertidores en Kafka Connect, los tipos de convertidores admitidos y las opciones de configuración comunes, consulta convertidores.

Estas son algunas configuraciones específicas del conector de receptor de Pub/Sub:

  • cps.project: Especifica el Google Cloud ID del proyecto que contiene el tema de Pub/Sub.

  • cps.topic: Especifica el tema de Pub/Sub en el que se publican los datos.

  • cps.endpoint: Especifica el extremo de Pub/Sub que se usará.

Para obtener una lista de las propiedades de configuración disponibles específicas de este conector, consulta Configuraciones del conector Pub/Sub Sink.

Crea un conector de receptor de Pub/Sub

Antes de crear un conector, revisa la documentación sobre las propiedades de un conector de receptor de Pub/Sub.

Console

  1. En la consola de Google Cloud , ve a la página Connect Clusters.

    Ir a Connect Clusters

  2. Haz clic en el clúster de Connect para el que deseas crear el conector.

    Se muestra la página Detalles de conexión del clúster.

  3. Haz clic en Crear conector.

    Aparecerá la página Crea un conector de Kafka.

  4. Para el nombre del conector, ingresa una cadena.

    Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.

  5. En Complemento del conector, selecciona Receptor de Pub/Sub.

  6. En Temas, elige Seleccionar una lista de temas de Kafka o Usar una expresión regular de tema. Luego, selecciona o ingresa los temas de Kafka desde los que este conector consume mensajes. Estos temas se encuentran en tu clúster de Kafka asociado.

  7. En Selecciona un tema de Cloud Pub/Sub, elige el tema de Pub/Sub en el que este conector publica mensajes. El tema se muestra en el formato de nombre de recurso completo: projects/{project}/topics/{topic}.

  8. (Opcional) Configura parámetros adicionales en la sección Configurations. Aquí es donde especificarías propiedades como tasks.max, key.converter y value.converter, como se explicó en la sección anterior.

  9. Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.

  10. Haz clic en Crear.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Reemplaza lo siguiente:

    A continuación, se muestra un ejemplo de un archivo de configuración para el conector Pub/Sub Sink:

    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
    

    Reemplaza lo siguiente:

    • CPS_SINK_CONNECTOR_ID: Es el ID o el nombre del conector de Pub/Sub Sink. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka. El nombre de un conector es inmutable.

    • GMK_TOPIC_ID: Es el ID del tema de Managed Service for Apache Kafka desde el que el conector receptor de Pub/Sub lee los datos.

    • CPS_TOPIC_ID: Es el ID del tema de Pub/Sub en el que se publican los datos.

    • GCP_PROJECT_ID: Es el ID del proyecto Google Clouden el que reside tu tema de Pub/Sub.

  3. Terraform

    Puedes usar un recurso de Terraform para crear un conector.

    resource "google_managed_kafka_connector" "example-pubsub-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-pubsub-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class" = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
        "name"            = "my-pubsub-sink-connector"
        "tasks.max"       = "3"
        "topics"          = "TOPIC_NAME"
        "cps.topic"       = "CPS_TOPIC_NAME"
        "cps.project"     = "CPS_PROJECT_NAME"
        "value.converter" = "org.apache.kafka.connect.storage.StringConverter"
        "key.converter"   = "org.apache.kafka.connect.storage.StringConverter"
      }
    
      provider = google-beta
    }

    Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.

    Go

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

    Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createPubSubSinkConnector creates a Pub/Sub Sink connector.
    func createPubSubSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, valueConverter, keyConverter, cpsTopic, cpsProject, tasksMax string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "CPS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// valueConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// cpsTopic := "CPS_TOPIC_ID"
    	// cpsProject := "GCP_PROJECT_ID"
    	// tasksMax := "3"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// Pub/Sub Sink sample connector configuration
    	config := map[string]string{
    		"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    		"name":            connectorID,
    		"tasks.max":       tasksMax,
    		"topics":          topics,
    		"value.converter": valueConverter,
    		"key.converter":   keyConverter,
    		"cps.topic":       cpsTopic,
    		"cps.project":     cpsProject,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Pub/Sub sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreatePubSubSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-pubsub-sink-connector";
        String pubsubProjectId = "my-pubsub-project-id";
        String pubsubTopicName = "my-pubsub-topic";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
        String maxTasks = "3";
        String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createPubSubSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            pubsubProjectId,
            pubsubTopicName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            valueConverter,
            keyConverter);
      }
    
      public static void createPubSubSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String pubsubProjectId,
          String pubsubTopicName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String valueConverter,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("value.converter", valueConverter);
        configMap.put("key.converter", keyConverter);
        configMap.put("cps.topic", pubsubTopicName);
        configMap.put("cps.project", pubsubProjectId);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

    Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "topics": topics,
        "value.converter": value_converter,
        "key.converter": key_converter,
        "cps.topic": cps_topic,
        "cps.project": cps_project,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.