Crea un conector de Pub/Sub Source

Los conectores de origen de Pub/Sub transmiten mensajes de Pub/Sub a Kafka, lo que te permite integrar Pub/Sub con tus aplicaciones y canalizaciones de datos basadas en Kafka.

Los casos de uso de los conectores de fuente de Pub/Sub incluyen los siguientes:

  • Transferencia de datos en tiempo real Publica datos de servicios en la nube o de otras aplicaciones en Pub/Sub y, luego, replica los datos en Kafka para el procesamiento de transmisiones.

  • Arquitecturas basadas en eventos Activa el procesamiento basado en Kafka a partir de los mensajes publicados en Pub/Sub.

El conector lee mensajes de una suscripción a Pub/Sub, convierte cada mensaje en un registro de Kafka y escribe los registros en un tema de Kafka. De forma predeterminada, el conector crea registros de Kafka de la siguiente manera:

  • La clave de registro de Kafka es null.
  • El valor del registro de Kafka son los datos del mensaje de Pub/Sub como bytes.
  • Los encabezados de registro de Kafka están vacíos.

Sin embargo, puedes configurar este comportamiento. Para obtener más información, consulta Configura el conector.

Antes de comenzar

Antes de crear un conector de fuente de Pub/Sub, asegúrate de tener lo siguiente:

Roles y permisos requeridos

Para obtener los permisos que necesitas para crear un conector, pídele a tu administrador que te otorgue el rol de IAM Editor de conectores de Kafka administrados (roles/managedkafka.connectorEditor) en tu proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear un conector. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear un conector:

  • Crea un conector: managedkafka.connectors.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Otorga permisos para leer desde Pub/Sub

La cuenta de servicio de Kafka administrado debe tener permiso para leer mensajes de la suscripción a Pub/Sub. Otorga los siguientes roles de IAM a la cuenta de servicio en el proyecto que contiene la suscripción a Pub/Sub:

  • Suscriptor de Pub/Sub (roles/pubsub.subscriber)
  • Visualizador de Pub/Sub (roles/pubsub.viewer)

La cuenta de servicio de Kafka administrado tiene el siguiente formato: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com, en el que PROJECT_NUMBER es el número del proyecto del clúster de Connect.

Si tu clúster de Connect se encuentra en un proyecto diferente del clúster de Managed Service para Apache Kafka, consulta cómo crear un clúster de Connect en otro proyecto.

Crea un conector de fuente de Pub/Sub

Console

  1. En la consola de Google Cloud , ve a la página Connect Clusters.

    Ir a Connect Clusters

  2. Haz clic en el clúster de Connect en el que deseas crear el conector.

  3. Haz clic en Crear conector.

  4. Para el nombre del conector, ingresa una cadena.

    Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Managed Service para Apache Kafka.

  5. En Complemento del conector, selecciona Fuente de Pub/Sub.

  6. En la lista Suscripción a Cloud Pub/Sub, selecciona una suscripción a Pub/Sub. El conector extrae mensajes de esta suscripción. La suscripción se muestra como un nombre de recurso completo: projects/{project}/subscriptions/{subscription}.

  7. En la lista Tema de Kafka, selecciona el tema de Kafka en el que se escriben los mensajes.

  8. Opcional: En el cuadro Configurations, agrega propiedades de configuración o edita las propiedades predeterminadas. Para obtener más información, consulta Configura el conector.

  9. Selecciona la Política de reinicio de tareas. Para obtener más información, consulta la política de reinicio de tareas.

  10. Haz clic en Crear.

gcloud

  1. Ejecuta el comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Reemplaza lo siguiente:

    • CONNECTOR_ID: Es el ID o el nombre del conector. Si necesitas ayuda para asignarle un nombre a un conector, consulta los Lineamientos para asignarles nombres a los recursos de Managed Service para Apache Kafka. El nombre de un conector es inmutable.

    • LOCATION: Es la ubicación del clúster de Connect.

    • CONNECT_CLUSTER_ID: Es el ID del clúster de Connect en el que se crea el conector.

    • CONFIG_FILE: Es la ruta de acceso a un archivo de configuración YAML o JSON.

A continuación, se muestra un ejemplo de un archivo de configuración:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto Google Clouden el que reside la suscripción a Pub/Sub.

  • PUBSUB_SUBSCRIPTION_ID: Es el ID de la suscripción a Pub/Sub desde la que se extraerán los datos.

  • KAFKA_TOPIC_ID: Es el ID del tema de Kafka en el que se escriben los datos.

Se requieren las propiedades de configuración cps.project, cps.subscription y kafka.topic. Para obtener más opciones de configuración, consulta Cómo configurar el conector.

Terraform

Puedes usar un recurso de Terraform para crear un conector.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Si deseas obtener más información para aplicar o quitar una configuración de Terraform, consulta los comandos básicos de Terraform.

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Managed Service para Apache Kafka para Go.

Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación(ADC). Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java en Instala las bibliotecas cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Managed Service para Apache Kafka.

Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python en Instala las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Managed Service para Apache Kafka.

Para autenticarte en Managed Service para Apache Kafka, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta Configura ADC para un entorno de desarrollo local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Después de crear un conector, puedes editarlo, borrarlo, detenerlo, pausarlo o reiniciarlo.

Configura el conector

En esta sección, se describen algunas propiedades de configuración que puedes establecer en el conector.

Para obtener una lista completa de las propiedades específicas de este conector, consulta Configuraciones del conector de Pub/Sub Source.

Modo de extracción

El modo de extracción especifica cómo el conector recupera los mensajes de Pub/Sub. Se admiten los siguientes modos:

  • Modo de extracción (predeterminado) Los mensajes se recuperan en lotes. Para habilitar este modo, establece cps.streamingPull.enabled=false.. Para configurar el tamaño del lote, establece la propiedad cps.maxBatchSize.

    Para obtener más información sobre el modo de extracción, consulta la API de extracción.

  • Modo de extracción de transmisión. Permite la capacidad de procesamiento máxima y la latencia más baja cuando se recuperan mensajes de Pub/Sub. Para habilitar este modo, establece cps.streamingPull.enabled=true.

    Para obtener más información sobre el modo de extracción de transmisión, consulta la API de StreamingPull.

    Si la extracción de transmisión está habilitada, puedes ajustar el rendimiento configurando las siguientes propiedades de configuración:

    • cps.streamingPull.flowControlBytes: Es la cantidad máxima de bytes de mensajes pendientes por tarea.
    • cps.streamingPull.flowControlMessages: Es la cantidad máxima de mensajes pendientes por tarea.
    • cps.streamingPull.maxAckExtensionMs: Es la cantidad máxima de tiempo que el conector extiende la fecha límite de suscripción, en milisegundos.
    • cps.streamingPull.maxMsPerAckExtension: Es la cantidad máxima de tiempo que el conector extiende el plazo de suscripción por extensión, en milisegundos.
    • cps.streamingPull.parallelStreams: Es la cantidad de transmisiones de las que se extraerán mensajes de la suscripción.

Endpoint de Pub/Sub

De forma predeterminada, el conector usa el extremo global de Pub/Sub. Para especificar un extremo, establece la propiedad cps.endpoint en la dirección del extremo. Para obtener más información sobre los extremos, consulta Extremos de Pub/Sub.

Particiones de Kafka

De forma predeterminada, el conector escribe en una sola partición del tema. Para especificar la cantidad de particiones en las que escribe el conector, configura la propiedad kafka.partition.count. El valor no debe superar el recuento de particiones del tema.

Para especificar cómo el conector asigna mensajes a las particiones, configura la propiedad kafka.partition.scheme. Para obtener más información, consulta Configuraciones del conector de fuente de Pub/Sub.

Usuarios que generan conversiones

Establece el convertidor de claves en org.apache.kafka.connect.storage.StringConverter.

Según la configuración del conector, establece el convertidor de valores en uno de los siguientes:

  • org.apache.kafka.connect.converters.ByteArrayConverter
  • org.apache.kafka.connect.json.JsonConverter

Para obtener más información, consulta Valor del registro.

Conversión de mensajes

El conector de origen de Pub/Sub convierte los mensajes de Pub/Sub en registros de Kafka. En las siguientes secciones, se describe el proceso de conversión.

Clave de registro

El convertidor de claves debe ser org.apache.kafka.connect.storage.StringConverter.

  • De forma predeterminada, las claves de registro son null.

  • Para usar un atributo de mensaje de Pub/Sub como clave, establece kafka.key.attribute en el nombre del atributo. Por ejemplo, kafka.key.attribute=username.

  • Para usar la clave de ordenamiento de Pub/Sub como clave, establece kafka.key.attribute=orderingKey.

Encabezados de registros

De forma predeterminada, los encabezados de registro están vacíos.

Si kafka.record.headers es true, los atributos del mensaje de Pub/Sub se escriben como encabezados de registro. Para incluir la clave de ordenamiento, configura cps.makeOrderingKeyAttribute=true.

Valor del registro

Los valores de registro se escriben como arrays de bytes o como tipos struct.

Valores de registros de matrices de bytes

Si kafka.record.headers es true o el mensaje de Pub/Sub no tiene atributos personalizados, el conector escribe los datos del mensaje como un array de bytes. Establece el convertidor de valores en org.apache.kafka.connect.converters.ByteArrayConverter.

Valores de registros de Struct

Si kafka.record.headers es false y el mensaje tiene al menos un atributo personalizado, el conector escribe el valor del registro como struct. Establece el convertidor de valores en org.apache.kafka.connect.json.JsonConverter.

El objeto struct contiene los siguientes campos:

  • message: Son los datos del mensaje de Pub/Sub, en bytes.

  • Un campo para cada atributo del mensaje de Pub/Sub. Para incluir la clave de ordenamiento, configura cps.makeOrderingKeyAttribute=true.

Por ejemplo, si el mensaje tiene un atributo username, el valor del registro se verá de la siguiente manera:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Si value.converter.schemas.enable es true, struct incluye la carga útil y el esquema:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Próximos pasos

Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.