Crea un connettore sink Pub/Sub

I connettori di sink Pub/Sub trasmettono i messaggi dagli argomenti Kafka agli argomenti Pub/Sub. In questo modo puoi integrare le tue applicazioni basate su Kafka con Pub/Sub, facilitando le architetture basate su eventi e l'elaborazione dei dati in tempo reale.

Prima di iniziare

Prima di creare un connettore di sink Pub/Sub, assicurati di disporre di quanto segue:

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per creare un connettore sink Pub/Sub, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto contenente il cluster Connect:

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questi ruoli predefiniti contengono le autorizzazioni necessarie per creare un connettore sink Pub/Sub. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un connettore di sink Pub/Sub sono necessarie le seguenti autorizzazioni:

  • Concedi l'autorizzazione per creare un connettore nel cluster di connessione principale: managedkafka.connectors.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per saperne di più sul ruolo Editor connettore Kafka gestito, vedi Ruoli predefiniti di Managed Service per Apache Kafka.

Se il cluster Managed Service per Apache Kafka si trova nello stesso progetto del cluster di connessione, non sono necessarie ulteriori autorizzazioni. Se il cluster Connect si trova in un progetto diverso, consulta Crea un cluster Connect in un progetto diverso.

Concedi le autorizzazioni per pubblicare nell'argomento Pub/Sub

Il account di servizio del cluster Connect, che segue il formato service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, richiede l'autorizzazione per pubblicare messaggi nell'argomento Pub/Sub. A questo scopo, concedi il ruolo Pub/Sub Publisher (roles/pubsub.publisher) al account di servizio del cluster Connect nel progetto contenente l'argomento Pub/Sub.

Come funziona un connettore di sink Pub/Sub

Un connettore di sink Pub/Sub recupera i messaggi da uno o più argomenti Kafka e li pubblica in un argomento Pub/Sub.

Ecco una suddivisione dettagliata di come il connettore di sink Pub/Sub copia i dati:

  • Il connettore utilizza i messaggi di uno o più argomenti Kafka all'interno del cluster di origine.

  • Il connettore scrive i messaggi nell'ID argomento Pub/Sub di destinazione specificato utilizzando la proprietà di configurazione cps.topic. Questa è una proprietà obbligatoria.

  • Il connettore richiede anche che il progetto Google Cloud contenente l'argomento Pub/Sub venga specificato utilizzando la proprietà di configurazione cps.project. Questa è una proprietà obbligatoria.

  • Il connettore può anche utilizzare facoltativamente un endpoint Pub/Sub personalizzato specificato utilizzando la proprietà cps.endpoint. L'endpoint predefinito è "pubsub.googleapis.com:443".

  • Per ottimizzare il rendimento, il connettore memorizza nel buffer i messaggi prima di pubblicarli su Pub/Sub. Puoi configurare maxBufferSize, maxBufferBytes, maxDelayThresholdMs, maxOutstandingRequestBytes e maxOutstandingMessages per controllare il buffering.

  • Un record Kafka è composto da tre componenti: intestazioni, chiavi e valori. Il connettore utilizza i convertitori di chiavi e valori per trasformare i dati dei messaggi Kafka nel formato previsto da Pub/Sub. Quando utilizzi schemi di valori struct o map, la proprietà messageBodyName specifica il campo o la chiave da utilizzare come corpo del messaggio Pub/Sub.

  • Il connettore può includere l'argomento, la partizione, l'offset e il timestamp Kafka come attributi del messaggio utilizzando la proprietà metadata.publish impostata su true.

  • Il connettore può includere le intestazioni dei messaggi Kafka come attributi dei messaggi Pub/Sub utilizzando la proprietà headers.publish impostata su true.

  • Il connettore può includere una chiave di ordinamento per i messaggi Pub/Sub utilizzando la proprietà orderingKeySource. Le opzioni per il suo valore includono "none" (predefinito), "key" e "partition".

  • La proprietà tasks.max controlla il livello di parallelismo per il connettore. L'aumento di tasks.max può migliorare la velocità effettiva, ma il parallelismo effettivo è limitato dal numero di partizioni negli argomenti Kafka.

Proprietà di un connettore di sink Pub/Sub

Quando crei un connettore di sink Pub/Sub, devi specificare le seguenti proprietà.

Nome connettore

Un nome univoco per il connettore all'interno del cluster di connessione. Per le linee guida per l'assegnazione dei nomi alle risorse, consulta le linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.

Tipo di plug-in del connettore

Seleziona Sink Pub/Sub come tipo di plug-in del connettore. Questo determina la direzione del flusso di dati, ovvero da Kafka a Pub/Sub, e l'implementazione specifica del connettore utilizzata. Se non utilizzi l'interfaccia utente per configurare il connettore, devi specificare anche la classe del connettore.

Argomenti Kafka

Gli argomenti Kafka da cui il connettore utilizza i messaggi. Puoi specificare uno o più argomenti oppure utilizzare un'espressione regolare per corrispondere a più argomenti. Ad esempio, topic.* per trovare tutti gli argomenti che iniziano con "topic". Questi argomenti devono esistere all'interno del cluster Managed Service per Apache Kafka associato al tuo cluster di connessione.

Argomento Pub/Sub

L'argomento Pub/Sub esistente a cui il connettore pubblica i messaggi. Assicurati che il account di servizio del cluster Connect abbia il ruolo roles/pubsub.publisher nel progetto dell'argomento, come descritto in Prima di iniziare.

Configurazione

Questa sezione ti consente di specificare proprietà di configurazione aggiuntive specifiche del connettore.

Poiché i dati negli argomenti Kafka possono essere in vari formati come Avro, JSON o byte non elaborati, una parte fondamentale della configurazione consiste nello specificare i convertitori. I convertitori traducono i dati dal formato utilizzato negli argomenti Kafka nel formato interno standardizzato di Kafka Connect. Il connettore sink Pub/Sub prende questi dati interni e li trasforma nel formato richiesto da Pub/Sub prima di scriverli.

Per informazioni più generali sul ruolo dei convertitori in Kafka Connect, sui tipi di convertitori supportati e sulle opzioni di configurazione comuni, consulta la sezione Convertitori.

Ecco alcune configurazioni specifiche per il connettore di sink Pub/Sub:

  • cps.project: specifica l' Google Cloud ID progetto che contiene l'argomento Pub/Sub.

  • cps.topic: specifica l'argomento Pub/Sub a cui vengono pubblicati i dati.

  • cps.endpoint: specifica l'endpoint Pub/Sub da utilizzare.

Per un elenco delle proprietà di configurazione disponibili specifiche per questo connettore, consulta la sezione Configurazioni del connettore sink Pub/Sub.

Crea un connettore di sink Pub/Sub

Prima di creare un connettore, consulta la documentazione relativa alle proprietà di un connettore di sink Pub/Sub.

Console

  1. Nella console Google Cloud , vai alla pagina Connetti cluster.

    Vai a Connetti cluster

  2. Fai clic sul cluster di connessione per il quale vuoi creare il connettore.

    Viene visualizzata la pagina Dettagli cluster di connessione.

  3. Fai clic su Crea connettore.

    Viene visualizzata la pagina Crea connettore Kafka.

  4. Per il nome del connettore, inserisci una stringa.

    Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka.

  5. Per Plug-in connettore, seleziona Sink Pub/Sub.

  6. In Argomenti, scegli Seleziona un elenco di argomenti Kafka o Utilizza un'espressione regolare per gli argomenti. Dopodiché, seleziona o inserisci gli argomenti Kafka da cui questo connettore consuma i messaggi. Questi argomenti si trovano nel tuo cluster Kafka associato.

  7. In Seleziona un argomento Cloud Pub/Sub, scegli l'argomento Pub/Sub a cui questo connettore pubblica i messaggi. L'argomento viene visualizzato nel formato del nome completo della risorsa: projects/{project}/topics/{topic}.

  8. (Facoltativo) Configura impostazioni aggiuntive nella sezione Configurazioni. È qui che devi specificare proprietà come tasks.max, key.converter e value.converter, come descritto nella sezione precedente.

  9. Seleziona la policy di riavvio attività. Per saperne di più, consulta le norme sul riavvio delle attività.

  10. Fai clic su Crea.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud managed-kafka connectors create:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Sostituisci quanto segue:

    • CONNECTOR_ID: L'ID o il nome del connettore. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.

    • LOCATION: la località in cui crei il connettore. Deve essere la stessa località in cui hai creato il cluster di connessione.

    • CONNECT_CLUSTER_ID: l'ID del cluster Connect in cui viene creato il connettore.

    • CONFIG_FILE: il percorso del file di configurazione YAML per il connettore BigQuery Sink.

    Ecco un esempio di file di configurazione per il connettore di sink Pub/Sub:

    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
    

    Sostituisci quanto segue:

    • CPS_SINK_CONNECTOR_ID: l'ID o il nome del connettore Pub/Sub Sink. Per le linee guida su come assegnare un nome a un connettore, consulta Linee guida per assegnare un nome a una risorsa Managed Service per Apache Kafka. Il nome di un connettore è immutabile.

    • GMK_TOPIC_ID: l'ID dell'argomento Managed Service per Apache Kafka da cui vengono letti i dati dal connettore di sink Pub/Sub.

    • CPS_TOPIC_ID: l'ID dell'argomento Pub/Sub a cui vengono pubblicati i dati.

    • GCP_PROJECT_ID: l'ID del progetto Google Cloud in cui si trova l'argomento Pub/Sub.

  3. Terraform

    Puoi utilizzare una risorsa Terraform per creare un connettore.

    resource "google_managed_kafka_connector" "example-pubsub-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-pubsub-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class" = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
        "name"            = "my-pubsub-sink-connector"
        "tasks.max"       = "3"
        "topics"          = "TOPIC_NAME"
        "cps.topic"       = "CPS_TOPIC_NAME"
        "cps.project"     = "CPS_PROJECT_NAME"
        "value.converter" = "org.apache.kafka.connect.storage.StringConverter"
        "key.converter"   = "org.apache.kafka.connect.storage.StringConverter"
      }
    
      provider = google-beta
    }

    Per scoprire come applicare o rimuovere una configurazione Terraform, consulta Comandi Terraform di base.

    Go

    Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Go di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione(ADC). Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createPubSubSinkConnector creates a Pub/Sub Sink connector.
    func createPubSubSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, valueConverter, keyConverter, cpsTopic, cpsProject, tasksMax string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "CPS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// valueConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// cpsTopic := "CPS_TOPIC_ID"
    	// cpsProject := "GCP_PROJECT_ID"
    	// tasksMax := "3"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// Pub/Sub Sink sample connector configuration
    	config := map[string]string{
    		"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    		"name":            connectorID,
    		"tasks.max":       tasksMax,
    		"topics":          topics,
    		"value.converter": valueConverter,
    		"key.converter":   keyConverter,
    		"cps.topic":       cpsTopic,
    		"cps.project":     cpsProject,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Pub/Sub sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreatePubSubSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-pubsub-sink-connector";
        String pubsubProjectId = "my-pubsub-project-id";
        String pubsubTopicName = "my-pubsub-topic";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
        String maxTasks = "3";
        String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createPubSubSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            pubsubProjectId,
            pubsubTopicName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            valueConverter,
            keyConverter);
      }
    
      public static void createPubSubSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String pubsubProjectId,
          String pubsubTopicName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String valueConverter,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("value.converter", valueConverter);
        configMap.put("key.converter", keyConverter);
        configMap.put("cps.topic", pubsubTopicName);
        configMap.put("cps.project", pubsubProjectId);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Installare le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Managed Service per Apache Kafka.

    Per eseguire l'autenticazione in Managed Service per Apache Kafka, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configura ADC per un ambiente di sviluppo locale.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "topics": topics,
        "value.converter": value_converter,
        "key.converter": key_converter,
        "cps.topic": cps_topic,
        "cps.project": cps_project,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Dopo aver creato un connettore, puoi modificarlo, eliminarlo, metterlo in pausa, arrestarlo o riavviarlo.

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.