Criar um conector de origem do Pub/Sub

Os conectores de origem do Pub/Sub transmitem mensagens do Pub/Sub para o Kafka, o que permite integrar o Pub/Sub aos aplicativos e pipelines de dados baseados no Kafka.

Os casos de uso dos conectores de origem do Pub/Sub incluem o seguinte:

  • Ingestão de dados em tempo real. Publique dados de serviços de nuvem ou outros aplicativos no Pub/Sub e replique os dados no Kafka para processamento de stream.

  • Arquiteturas orientadas a eventos. Acione o processamento baseado no Kafka a partir de mensagens publicadas no Pub/Sub.

O conector lê mensagens de uma assinatura do Pub/Sub, converte cada mensagem em um registro do Kafka e grava os registros em um tópico do Kafka. Por padrão, o conector cria registros do Kafka da seguinte maneira:

  • A chave de registro do Kafka é null.
  • O valor do registro do Kafka são os dados da mensagem do Pub/Sub como bytes.
  • Os cabeçalhos de registro do Kafka estão vazios.

No entanto, é possível configurar esse comportamento. Para mais informações, consulte Configurar o conector.

Antes de começar

Antes de criar um conector de origem do Pub/Sub, verifique se você tem o seguinte:

Papéis e permissões necessárias

Para ter as permissões necessárias para criar um conector, peça ao administrador para conceder a você o papel do IAM de Editor de conector do Kafka gerenciado (roles/managedkafka.connectorEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar um conector. Para acessar as permissões exatas que são necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar um conector:

  • Criar um conector: managedkafka.connectors.create

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Conceder permissões para ler do Pub/Sub

A conta de serviço do Kafka gerenciado precisa ter permissão para ler mensagens da assinatura do Pub/Sub. Conceda os seguintes papéis do IAM à conta de serviço no projeto que contém a assinatura do Pub/Sub:

  • Assinante do Pub/Sub (roles/pubsub.subscriber)
  • Leitor do Pub/Sub (roles/pubsub.viewer)

A conta de serviço do Kafka gerenciado tem o seguinte formato: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com, em que PROJECT_NUMBER é o número do projeto do cluster do Connect.

Se o cluster do Connect estiver em um projeto diferente do cluster do Serviço Gerenciado para Apache Kafka cluster, consulte Criar um cluster do Connect em um projeto diferente.

Criar um conector de origem do Pub/Sub

Console

  1. Noconsole, acesse a página Clusters do Connect. Google Cloud

    Acessar clusters do Connect

  2. Clique no cluster do Connect em que você quer criar o conector.

  3. Clique em Criar conector.

  4. Para o nome do conector, insira uma string.

    Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço Gerenciado para Apache Kafka.

  5. Em Plug-in do conector, selecione Origem do Pub/Sub.

  6. Na lista Assinatura do Cloud Pub/Sub, selecione uma assinatura do Pub/Sub. O conector extrai mensagens dessa assinatura. A assinatura é exibida como um nome de recurso completo: projects/{project}/subscriptions/{subscription}.

  7. Na lista Tópico do Kafka, selecione o tópico do Kafka em que as mensagens são gravadas.

  8. Opcional: na caixa Configurações, adicione propriedades de configuração ou edite as propriedades padrão. Para mais informações, consulte Configurar o conector.

  9. Selecione a Política de reinicialização da tarefa. Para mais informações, consulte Política de reinicialização da tarefa.

  10. Clique em Criar.

gcloud

  1. Execute o gcloud managed-kafka connectors create comando:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Substitua:

    • CONNECTOR_ID: o ID ou nome do conector. Para conferir as diretrizes de nomeação de um conector, consulte Diretrizes de nomeação de um recurso do Serviço Gerenciado para Apache Kafka. O nome de um conector não pode ser modificado.

    • LOCATION: o local do cluster do Connect.

    • CONNECT_CLUSTER_ID: o ID do cluster do Connect em que o conector é criado.

    • CONFIG_FILE: o caminho para um arquivo de configuração YAML ou JSON.

Veja um exemplo de um arquivo de configuração:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Substitua:

  • PROJECT_ID: o ID do Google Cloud projeto em que a assinatura do Pub/Sub reside.

  • PUBSUB_SUBSCRIPTION_ID: o ID da assinatura do Pub/Sub para extrair dados.

  • KAFKA_TOPIC_ID: o ID do tópico do Kafka em que os dados são gravados.

As propriedades de configuração cps.project, cps.subscription e kafka.topic são obrigatórias. Para outras opções de configuração, consulte Configurar o conector.

Terraform

É possível usar um recurso do Terraform para criar um conector.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Para saber como aplicar ou remover uma configuração do Terraform, consulte Comandos básicos do Terraform.

Go

Antes de testar este exemplo, siga as instruções de configuração do Go em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do Serviço Gerenciado para Apache Kafka.

Para autenticar o Serviço Gerenciado para Apache Kafka, configure o Application Default Credentials(ADC). Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Antes de testar este exemplo, siga as instruções de configuração do Java em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Java do Serviço Gerenciado para Apache Kafka.

Para autenticar o Serviço Gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Antes de testar este exemplo, siga as instruções de configuração do Python em Instalar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Python do Serviço Gerenciado para Apache Kafka.

Para autenticar o Serviço Gerenciado para Apache Kafka, configure o Application Default Credentials. Para mais informações, consulte Configurar o ADC para um ambiente de desenvolvimento local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Depois de criar um conector, é possível editar, excluir, pausar, interromper ou reiniciar o conector.

Configurar o conector

Esta seção descreve algumas propriedades de configuração que podem ser definidas no conector.

Para uma lista completa das propriedades específicas desse conector, consulte Configurações do conector de origem do Pub/Sub.

Modo pull

O modo pull especifica como o conector recupera mensagens do Pub/Sub. Os seguintes modos são compatíveis:

  • Modo pull (padrão). As mensagens são extraídas em lotes. Para ativar esse modo, defina cps.streamingPull.enabled=false. Para configurar o tamanho do lote, defina a propriedade cps.maxBatchSize.

    Para mais informações sobre o modo pull, consulte API Pull.

  • Modo Streaming Pull. Ativa a capacidade de processamento máxima e a latência mais baixa ao recuperar mensagens do Pub/Sub. Para ativar esse modo, defina cps.streamingPull.enabled=true.

    Para mais informações sobre o modo Streaming Pull, consulte API StreamingPull.

    Se o Streaming Pull estiver ativado, é possível ajustar a performance definindo as seguintes propriedades de configuração:

    • cps.streamingPull.flowControlBytes: o número máximo de bytes de mensagens pendentes por tarefa.
    • cps.streamingPull.flowControlMessages: o número máximo de mensagens pendentes por tarefa.
    • cps.streamingPull.maxAckExtensionMs: o tempo máximo que o conector estende o prazo de assinatura, em milissegundos.
    • cps.streamingPull.maxMsPerAckExtension: o tempo máximo que o conector estende o prazo de assinatura por extensão, em milissegundos.
    • cps.streamingPull.parallelStreams: o número de streams para extrair mensagens da assinatura.

Endpoint do Pub/Sub

Por padrão, o conector usa o endpoint global do Pub/Sub. Para especificar um endpoint, defina a propriedade cps.endpoint como o endereço do endpoint. Para mais informações sobre endpoints, consulte Endpoints do Pub/Sub.

Partições do Kafka

Por padrão, o conector grava em uma única partição no tópico. Para especificar em quantas partições o conector grava, defina a propriedade kafka.partition.count. O valor não pode exceder a contagem de partições do tópico .

Para especificar como o conector atribui mensagens a partições, defina a propriedade kafka.partition.scheme. Para mais informações, consulte Configurações do conector de origem do Pub/Sub.

Conversões

Defina o conversor de chaves como org.apache.kafka.connect.storage.StringConverter.

Dependendo da configuração do conector, defina o conversor de valores como um dos seguintes:

  • org.apache.kafka.connect.converters.ByteArrayConverter
  • org.apache.kafka.connect.json.JsonConverter

Para mais informações, consulte Valor do registro.

Conversão de mensagem

O conector de origem do Pub/Sub converte mensagens do Pub/Sub em registros do Kafka. As seções a seguir descrevem o processo de conversão.

Chave de registro

O conversor de chaves precisa ser org.apache.kafka.connect.storage.StringConverter.

  • Por padrão, as chaves de registro são null.

  • Para usar um atributo de mensagem do Pub/Sub como chave, defina kafka.key.attribute como o nome do atributo. Por exemplo, kafka.key.attribute=username.

  • Para usar a chave de ordem do Pub/Sub como chave, defina kafka.key.attribute=orderingKey.

Cabeçalhos de registro

Por padrão, os cabeçalhos de registro estão vazios.

Se kafka.record.headers for true, os atributos de mensagem do Pub/Sub serão gravados como cabeçalhos de registro. Para incluir a chave de ordem, defina cps.makeOrderingKeyAttribute=true.

Valor do registro

Os valores de registro são gravados como matrizes de bytes ou como tipos struct.

Valores de registro de matriz de bytes

Se kafka.record.headers for true ou se a mensagem do Pub/Sub não tiver atributos personalizados, o conector gravará os dados da mensagem como uma matriz de bytes. Defina o conversor de valores como org.apache.kafka.connect.converters.ByteArrayConverter.

Valores de registro de struct

Se kafka.record.headers for false e a mensagem tiver pelo menos um atributo personalizado, o conector gravará o valor do registro como um struct. Defina o conversor de valores como org.apache.kafka.connect.json.JsonConverter.

O struct contém os seguintes campos:

  • message: os dados da mensagem do Pub/Sub, como bytes.

  • Um campo para cada atributo de mensagem do Pub/Sub. Para incluir a chave de ordem, defina cps.makeOrderingKeyAttribute=true.

Por exemplo, se a mensagem tiver um atributo username, o valor do registro será semelhante ao seguinte:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Se value.converter.schemas.enable for true, o struct incluirá a carga útil e o esquema:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.