Crea un conector de Pub/Sub Source

Los conectores de origen de Pub/Sub transmiten mensajes de Pub/Sub a Kafka. Esto te permite integrar Pub/Sub con tus aplicaciones y canalizaciones de datos basadas en Kafka.

El conector lee mensajes de una suscripción a Pub/Sub, convierte cada mensaje en un registro de Kafka y escribe los registros en un tema de Kafka. De forma predeterminada, el conector crea registros de Kafka de la siguiente manera:

  • La clave de registro de Kafka es null.
  • El valor del registro de Kafka son los datos del mensaje de Pub/Sub como bytes.
  • Los encabezados de registro de Kafka están vacíos.

Sin embargo, puedes configurar este comportamiento. Para obtener más información, consulta Configura el conector.

Antes de comenzar

Antes de crear un conector de fuente de Pub/Sub, asegúrate de tener lo siguiente:

Roles y permisos requeridos

Para obtener los permisos que necesitas para crear un conector de Pub/Sub Source, pídele a tu administrador que te otorgue el rol de IAM de editor de Managed Kafka Connector (roles/managedkafka.connectorEditor) en el proyecto que contiene el clúster de Connect. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear un conector de Pub/Sub Source. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear un conector de fuente de Pub/Sub:

  • Otorga el permiso para crear un conector en el clúster de Connect principal: managedkafka.connectors.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Para obtener más información sobre el rol de editor de conectores de Kafka administrados, consulta Roles predefinidos de Managed Service for Apache Kafka.

Si tu clúster de Servicio administrado para Apache Kafka se encuentra en el mismo proyecto que el clúster de Connect, no se requieren más permisos. Si el clúster de Connect se encuentra en un proyecto diferente, consulta Crea un clúster de Connect en un proyecto diferente.

Otorga permisos para leer desde Pub/Sub

La cuenta de servicio de Kafka administrado debe tener permiso para leer mensajes de la suscripción a Pub/Sub. Otorga los siguientes roles de IAM a la cuenta de servicio en el proyecto que contiene la suscripción a Pub/Sub:

  • Suscriptor de Pub/Sub (roles/pubsub.subscriber)
  • Visualizador de Pub/Sub (roles/pubsub.viewer)

La cuenta de servicio de Kafka administrado tiene el siguiente formato: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com. Reemplaza PROJECT_NUMBER por el número del proyecto.

Crea un conector de fuente de Pub/Sub

Console

  1. En la consola de Google Cloud , ve a la página Connect Clusters.

    Ir a Connect Clusters

  2. Haz clic en el clúster de Connect en el que deseas crear el conector.

  3. Haz clic en Crear conector.

  4. Para el nombre del conector, ingresa una cadena.

    Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Servicio administrado para Apache Kafka.

  5. En Complemento del conector, selecciona Fuente de Pub/Sub.

  6. En la lista Suscripción a Cloud Pub/Sub, selecciona una suscripción a Pub/Sub. El conector extrae mensajes de esta suscripción. La suscripción se muestra como un nombre de recurso completo: projects/{project}/subscriptions/{subscription}.

  7. En la lista Tema de Kafka, selecciona el tema de Kafka en el que se escriben los mensajes.

  8. Opcional: En el cuadro Configurations, agrega propiedades de configuración o edita las propiedades predeterminadas. Para obtener más información, consulta Configura el conector.

  9. Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.

  10. Haz clic en Crear.

gcloud

  1. Ejecuta el comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Reemplaza lo siguiente:

A continuación, se muestra un ejemplo de un archivo de configuración:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto Google Clouden el que reside la suscripción a Pub/Sub.

  • PUBSUB_SUBSCRIPTION_ID: Es el ID de la suscripción a Pub/Sub desde la que se extraerán los datos.

  • KAFKA_TOPIC_ID: Es el ID del tema de Kafka en el que se escriben los datos.

Se requieren las propiedades de configuración cps.project, cps.subscription y kafka.topic. Para obtener más opciones de configuración, consulta Cómo configurar el conector.

Terraform

Puedes usar un recurso de Terraform para crear un conector.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka en Go.

Para autenticarte en Managed Service for Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service for Apache Kafka.

Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service for Apache Kafka.

Para autenticarte en el servicio administrado para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.

Configura el conector

En esta sección, se describen algunas propiedades de configuración que puedes establecer en el conector.

Para obtener una lista completa de las propiedades específicas de este conector, consulta Configuraciones del conector de Pub/Sub Source.

Modo de extracción

El modo de extracción especifica cómo el conector recupera los mensajes de Pub/Sub. Se admiten los siguientes modos:

  • Modo de extracción (predeterminado) Los mensajes se recuperan en lotes. Para habilitar este modo, establece cps.streamingPull.enabled=false.. Para configurar el tamaño del lote, establece la propiedad cps.maxBatchSize.

    Para obtener más información sobre el modo de extracción, consulta la API de extracción.

  • Modo de extracción de transmisión. Permite obtener el máximo rendimiento y la latencia más baja cuando se recuperan mensajes de Pub/Sub. Para habilitar este modo, establece cps.streamingPull.enabled=true.

    Para obtener más información sobre el modo de extracción de transmisión, consulta la API de StreamingPull.

    Si la extracción de transmisión está habilitada, puedes ajustar el rendimiento configurando las siguientes propiedades de configuración:

    • cps.streamingPull.flowControlBytes: Es la cantidad máxima de bytes de mensajes pendientes por tarea.
    • cps.streamingPull.flowControlMessages: Es la cantidad máxima de mensajes pendientes por tarea.
    • cps.streamingPull.maxAckExtensionMs: Es la cantidad máxima de tiempo que el conector extiende la fecha límite de suscripción, en milisegundos.
    • cps.streamingPull.maxMsPerAckExtension: Es la cantidad máxima de tiempo que el conector extiende el plazo de suscripción por extensión, en milisegundos.
    • cps.streamingPull.parallelStreams: Es la cantidad de transmisiones de las que se extraerán mensajes de la suscripción.

Endpoint de Pub/Sub

De forma predeterminada, el conector usa el extremo global de Pub/Sub. Para especificar un extremo, establece la propiedad cps.endpoint en la dirección del extremo. Para obtener más información sobre los extremos, consulta Extremos de Pub/Sub.

Registros de Kafka

El conector de origen de Pub/Sub convierte los mensajes de Pub/Sub en registros de Kafka. En las siguientes secciones, se describe el proceso de conversión.

Clave de registro

El convertidor de claves debe ser org.apache.kafka.connect.storage.StringConverter.

  • De forma predeterminada, las claves de registro son null.

  • Para usar un atributo del mensaje de Pub/Sub como clave, establece kafka.key.attribute en el nombre del atributo. Por ejemplo, kafka.key.attribute=username.

  • Para usar la clave de ordenamiento de Pub/Sub como clave, establece kafka.key.attribute=orderingKey.

Encabezados de registros

De forma predeterminada, los encabezados de registro están vacíos.

Si kafka.record.headers es true, los atributos del mensaje de Pub/Sub se escriben como encabezados de registro. Para incluir la clave de ordenamiento, configura cps.makeOrderingKeyAttribute=true.

Valor del registro

Si kafka.record.headers es true o el mensaje de Pub/Sub no tiene atributos personalizados, el valor del registro son los datos del mensaje, como un array de bytes. Establece el convertidor de valores en org.apache.kafka.connect.converters.ByteArrayConverter.

De lo contrario, si kafka.record.headers es false y el mensaje tiene al menos un atributo personalizado, el conector escribe el valor del registro como struct. Establece el convertidor de valores en org.apache.kafka.connect.json.JsonConverter.

struct contiene los siguientes campos:

  • message: Son los datos del mensaje de Pub/Sub, en bytes.

  • Un campo para cada atributo del mensaje de Pub/Sub. Para incluir la clave de ordenamiento, configura cps.makeOrderingKeyAttribute=true.

Por ejemplo, supongamos que el mensaje tiene un atributo username:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Si value.converter.schemas.enable es true, struct incluye la carga útil y el esquema:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Particiones de Kafka

De forma predeterminada, el conector escribe en una sola partición del tema. Para especificar la cantidad de particiones en las que escribe el conector, configura la propiedad kafka.partition.count. El valor no debe superar el recuento de particiones del tema.

Para especificar cómo el conector asigna mensajes a las particiones, configura la propiedad kafka.partition.scheme. Para obtener más información, consulta Configuraciones del conector de fuente de Pub/Sub.

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.