Crea un connettore di origine Pub/Sub

I connettori di origine Pub/Sub trasmettono i messaggi da Pub/Sub a Kafka. In questo modo puoi integrare Pub/Sub con le tue applicazioni basate su Kafka e pipeline di dati.

Il connettore legge i messaggi da una sottoscrizione Pub/Sub, converte ogni messaggio in un record Kafka e scrive i record in un argomento Kafka. Per impostazione predefinita, il connettore crea record Kafka nel seguente modo:

  • La chiave di registrazione Kafka è null.
  • Il valore del record Kafka sono i dati del messaggio Pub/Sub in byte.
  • Le intestazioni del record Kafka sono vuote.

Tuttavia, puoi configurare questo comportamento. Per saperne di più, consulta Configura il connettore.

Prima di iniziare

Prima di creare un connettore di origine Pub/Sub, assicurati di disporre di quanto segue:

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per creare un connettore di origine Pub/Sub, chiedi all'amministratore di concederti il ruolo IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) nel progetto contenente il cluster Connect. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un connettore di origine Pub/Sub. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un connettore di origine Pub/Sub sono necessarie le seguenti autorizzazioni:

  • Concedi l'autorizzazione per creare un connettore nel cluster di connessione principale: managedkafka.connectors.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per maggiori informazioni sul ruolo Editor connettore Kafka gestito, vedi Ruoli predefiniti di Managed Service per Apache Kafka.

Se il cluster Managed Service per Apache Kafka si trova nello stesso progetto del cluster di connessione, non sono necessarie ulteriori autorizzazioni. Se il cluster Connect si trova in un progetto diverso, consulta Crea un cluster Connect in un progetto diverso.

Concedi le autorizzazioni per leggere da Pub/Sub

Il account di servizio Kafka gestito deve disporre dell'autorizzazione di lettura dei messaggi dalla sottoscrizione Pub/Sub. Concedi i seguenti ruoli IAM al account di servizio nel progetto contenente l'abbonamento Pub/Sub:

  • Pub/Sub Subscriber (roles/pubsub.subscriber)
  • Visualizzatore Pub/Sub (roles/pubsub.viewer)

Il account di servizio Managed Kafka ha il seguente formato: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com. Sostituisci PROJECT_NUMBER con il numero del progetto.

Crea un connettore di origine Pub/Sub

Console

  1. Nella console Google Cloud , vai alla pagina Connetti cluster.

    Vai a Connetti cluster

  2. Fai clic sul cluster di connessione in cui vuoi creare il connettore.

  3. Fai clic su Crea connettore.

  4. Per il nome del connettore, inserisci una stringa.

    Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.

  5. Per Plug-in connettore, seleziona Origine Pub/Sub.

  6. Nell'elenco Sottoscrizione Cloud Pub/Sub, seleziona una sottoscrizione Pub/Sub. Il connettore recupera i messaggi da questo abbonamento. L'abbonamento viene visualizzato come nome completo della risorsa: projects/{project}/subscriptions/{subscription}.

  7. Nell'elenco Argomento Kafka, seleziona l'argomento Kafka in cui vengono scritti i messaggi.

  8. (Facoltativo) Nella casella Configurazioni, aggiungi le proprietà di configurazione o modifica le proprietà predefinite. Per saperne di più, consulta Configura il connettore.

  9. Seleziona la policy di riavvio attività. Per saperne di più, consulta le norme sul riavvio delle attività.

  10. Fai clic su Crea.

gcloud

  1. Esegui il comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Sostituisci quanto segue:

    • CONNECTOR_ID: L'ID o il nome del connettore. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.

    • LOCATION: la posizione del cluster Connect.

    • CONNECT_CLUSTER_ID: l'ID del cluster Connect in cui viene creato il connettore.

    • CONFIG_FILE: il percorso di un file di configurazione YAML o JSON.

Ecco un esempio di file di configurazione:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui si trova l'abbonamento Pub/Sub.

  • PUBSUB_SUBSCRIPTION_ID: l'ID dell'abbonamento Pub/Sub da cui estrarre i dati.

  • KAFKA_TOPIC_ID: l'ID dell'argomento Kafka in cui vengono scritti i dati.

Le proprietà di configurazione cps.project, cps.subscription e kafka.topic sono obbligatorie. Per ulteriori opzioni di configurazione, vedi Configurare il connettore.

Terraform

Puoi utilizzare una risorsa Terraform per creare un connettore.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Dopo aver creato un connettore, puoi modificarlo, eliminarlo, metterlo in pausa, arrestarlo o riavviarlo.

Configura il connettore

Questa sezione descrive alcune proprietà di configurazione che puoi impostare sul connettore.

Per un elenco completo delle proprietà specifiche di questo connettore, vedi Configurazioni del connettore di origine Pub/Sub.

Modalità pull

La modalità pull specifica il modo in cui il connettore recupera i messaggi Pub/Sub. Sono supportate le seguenti modalità:

  • Modalità pull (impostazione predefinita). I messaggi vengono recuperati in batch. Per attivare questa modalità, imposta cps.streamingPull.enabled=false.. Per configurare le dimensioni del batch, imposta la proprietà cps.maxBatchSize.

    Per saperne di più sulla modalità pull, consulta API Pull.

  • Modalità di pull dello streaming. Consente la massima velocità effettiva e la latenza più bassa durante il recupero dei messaggi da Pub/Sub. Per attivare questa modalità, imposta cps.streamingPull.enabled=true.

    Per saperne di più sulla modalità pull di streaming, consulta l'API StreamingPull.

    Se il pull dello streaming è attivato, puoi ottimizzare le prestazioni impostando le seguenti proprietà di configurazione:

    • cps.streamingPull.flowControlBytes: il numero massimo di byte di messaggi in attesa per attività.
    • cps.streamingPull.flowControlMessages: il numero massimo di messaggi in attesa per attività.
    • cps.streamingPull.maxAckExtensionMs: il periodo di tempo massimo in cui il connettore estende il termine di iscrizione, in millisecondi.
    • cps.streamingPull.maxMsPerAckExtension: il periodo di tempo massimo per cui il connettore estende la scadenza dell'abbonamento per estensione, in millisecondi.
    • cps.streamingPull.parallelStreams: Il numero di stream da cui estrarre i messaggi dall'abbonamento.

Endpoint Pub/Sub

Per impostazione predefinita, il connettore utilizza l'endpoint Pub/Sub globale. Per specificare un endpoint, imposta la proprietà cps.endpoint sull'indirizzo dell'endpoint. Per saperne di più sugli endpoint, consulta Endpoint Pub/Sub.

Record Kafka

Il connettore di origine Pub/Sub converte i messaggi Pub/Sub in record Kafka. Le sezioni seguenti descrivono il processo di conversione.

Chiave di registrazione

Il convertitore di chiavi deve essere org.apache.kafka.connect.storage.StringConverter.

  • Per impostazione predefinita, le chiavi dei record sono null.

  • Per utilizzare un attributo del messaggio Pub/Sub come chiave, imposta kafka.key.attribute sul nome dell'attributo. Ad esempio kafka.key.attribute=username.

  • Per utilizzare la chiave di ordinamento di Pub/Sub come chiave, imposta kafka.key.attribute=orderingKey.

Intestazioni dei record

Per impostazione predefinita, le intestazioni dei record sono vuote.

Se kafka.record.headers è true, gli attributi del messaggio Pub/Sub vengono scritti come intestazioni del record. Per includere la chiave di ordinamento, imposta cps.makeOrderingKeyAttribute=true.

Valore del record

Se kafka.record.headers è true o il messaggio Pub/Sub non ha attributi personalizzati, il valore del record sono i dati del messaggio, come array di byte. Imposta il convertitore di valori su org.apache.kafka.connect.converters.ByteArrayConverter.

Altrimenti, se kafka.record.headers è false e il messaggio ha almeno un attributo personalizzato, il connettore scrive il valore del record come struct. Imposta il convertitore di valori su org.apache.kafka.connect.json.JsonConverter.

struct contiene i seguenti campi:

  • message: I dati del messaggio Pub/Sub, in byte.

  • Un campo per ogni attributo del messaggio Pub/Sub. Per includere la chiave di ordinamento, imposta cps.makeOrderingKeyAttribute=true.

Ad esempio, supponendo che il messaggio abbia un attributo username:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Se value.converter.schemas.enable è true, struct include sia il payload sia lo schema:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Partizioni Kafka

Per impostazione predefinita, il connettore scrive in una singola partizione dell'argomento. Per specificare a quante partizioni scrive il connettore, imposta la proprietà kafka.partition.count. Il valore non deve superare il numero di partizioni dell'argomento.

Per specificare come il connettore assegna i messaggi alle partizioni, imposta la proprietà kafka.partition.scheme. Per ulteriori informazioni, consulta la sezione Configurazioni del connettore di origine Pub/Sub.

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.