Pub/Sub-Senken-Connector erstellen

Pub/Sub-Senken-Connectors streamen Nachrichten aus Kafka-Themen in Pub/Sub-Themen. So können Sie Ihre Kafka-basierten Anwendungen in Pub/Sub einbinden und ereignisgesteuerte Architekturen und Echtzeitdatenverarbeitung ermöglichen.

Hinweise

Bevor Sie einen Pub/Sub-Sink-Connector erstellen, benötigen Sie Folgendes:

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für das Projekt mit dem Connect-Cluster zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Pub/Sub-Sink-Connectors benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierten Rollen enthalten die Berechtigungen, die zum Erstellen eines Pub/Sub-Sink-Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines Pub/Sub-Senken-Connectors erforderlich:

  • Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster: managedkafka.connectors.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zur Rolle Managed Kafka Connector Editor finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Wenn sich Ihr Managed Service for Apache Kafka-Cluster im selben Projekt wie der Connect-Cluster befindet, sind keine weiteren Berechtigungen erforderlich. Wenn sich der Connect-Cluster in einem anderen Projekt befindet, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.

Berechtigungen zum Veröffentlichen im Pub/Sub-Thema gewähren

Das Dienstkonto des Connect-Clusters, das dem Format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com entspricht, benötigt die Berechtigung zum Veröffentlichen von Nachrichten im Pub/Sub-Thema. Weisen Sie dazu dem Dienstkonto des Connect-Clusters im Projekt, das das Pub/Sub-Thema enthält, die Rolle „Pub/Sub-Publisher“ (roles/pubsub.publisher) zu.

Funktionsweise eines Pub/Sub-Senken-Connectors

Ein Pub/Sub-Senken-Connector ruft Nachrichten aus einem oder mehreren Kafka-Themen ab und veröffentlicht sie in einem Pub/Sub-Thema.

So kopiert der Pub/Sub-Senken-Connector Daten:

  • Der Connector nutzt Nachrichten aus einem oder mehreren Kafka-Themen im Quellcluster.

  • Der Connector schreibt Nachrichten in die Ziel-Pub/Sub-Themen-ID, die mit der Konfigurationseigenschaft cps.topic angegeben wird. Dies ist ein erforderliches Attribut.

  • Für den Connector muss auch das Google Cloud -Projekt, das das Pub/Sub-Thema enthält, mit der Konfigurationseigenschaft cps.project angegeben werden. Dies ist ein erforderliches Attribut.

  • Der Connector kann optional auch einen benutzerdefinierten Pub/Sub-Endpunkt verwenden, der mit der Property cps.endpoint angegeben wird. Der Standardendpunkt ist "pubsub.googleapis.com:443".

  • Zur Leistungsoptimierung werden Nachrichten vom Connector gepuffert, bevor sie in Pub/Sub veröffentlicht werden. Sie können maxBufferSize, maxBufferBytes, maxDelayThresholdMs, maxOutstandingRequestBytes und maxOutstandingMessages konfigurieren, um die Pufferung zu steuern.

  • Ein Kafka-Datensatz besteht aus drei Komponenten: Headern, Schlüsseln und Werten. Der Connector verwendet Schlüssel- und Wertkonverter, um die Kafka-Nachrichtendaten in das von Pub/Sub erwartete Format zu transformieren. Wenn Sie Schemas für Struktur- oder Kartenwerte verwenden, gibt die messageBodyName-Eigenschaft das Feld oder den Schlüssel an, der als Pub/Sub-Nachrichtentext verwendet werden soll.

  • Der Connector kann das Kafka-Thema, die Partition, den Offset und den Zeitstempel als Nachrichtenattribute einbeziehen, indem er die metadata.publish-Property auf true setzt.

  • Der Connector kann Kafka-Nachrichtenkopfzeilen als Pub/Sub-Nachrichtenattribute einfügen, indem die Property headers.publish auf true gesetzt wird.

  • Der Connector kann einen Sortierschlüssel für Pub/Sub-Nachrichten mit der Eigenschaft orderingKeySource einfügen. Mögliche Werte sind "none" (Standard), "key" und "partition".

  • Mit der Eigenschaft tasks.max wird die Parallelität des Connectors gesteuert. Durch Erhöhen von tasks.max kann der Durchsatz verbessert werden. Die tatsächliche Parallelität wird jedoch durch die Anzahl der Partitionen in den Kafka-Themen begrenzt.

Eigenschaften eines Pub/Sub-Senken-Connectors

Wenn Sie einen Pub/Sub-Senken-Connector erstellen, müssen Sie die folgenden Attribute angeben.

Connector-Name

Ein eindeutiger Name für den Connector im Connect-Cluster. Hinweise zum Benennen von Ressourcen finden Sie unter Richtlinien zum Benennen einer Managed Service for Apache Kafka-Ressource.

Typ des Connector-Plug‑ins

Wählen Sie Pub/Sub Sink als Connector-Plug-in-Typ aus. Dadurch wird die Richtung des Datenflusses von Kafka nach Pub/Sub und die verwendete Connector-Implementierung bestimmt. Wenn Sie den Connector nicht über die Benutzeroberfläche konfigurieren, müssen Sie auch die Connector-Klasse angeben.

Kafka-Themen

Die Kafka-Themen, aus denen der Connector Nachrichten abruft. Sie können ein oder mehrere Themen angeben oder einen regulären Ausdruck verwenden, um mehrere Themen abzugleichen. Beispiel: topic.* für alle Themen, die mit „topic“ beginnen. Diese Themen müssen im Managed Service for Apache Kafka-Cluster vorhanden sein, der Ihrem Connect-Cluster zugeordnet ist.

Pub/Sub-Thema

Das vorhandene Pub/Sub-Thema, in dem der Connector Nachrichten veröffentlicht. Das Dienstkonto des Connect-Clusters muss die Rolle roles/pubsub.publisher für das Projekt des Themas haben, wie unter Vorbereitung beschrieben.

Konfiguration

In diesem Abschnitt können Sie zusätzliche, connectorspezifische Konfigurationseigenschaften angeben.

Da Daten in Kafka-Themen in verschiedenen Formaten wie Avro, JSON oder Rohbytes vorliegen können, ist die Angabe von Konvertern ein wichtiger Teil der Konfiguration. Mit Converters werden Daten aus dem Format, das in Ihren Kafka-Themen verwendet wird, in das standardisierte interne Format von Kafka Connect übersetzt. Der Pub/Sub-Senken-Connector nimmt dann diese internen Daten und wandelt sie in das von Pub/Sub benötigte Format um, bevor er sie schreibt.

Allgemeinere Informationen zur Rolle von Konvertern in Kafka Connect, zu unterstützten Konvertertypen und zu gängigen Konfigurationsoptionen finden Sie unter Konverter.

Hier sind einige Konfigurationen, die speziell für den Pub/Sub-Senken-Connector gelten:

  • cps.project: Gibt die Google Cloud Projekt-ID an, die das Pub/Sub-Thema enthält.

  • cps.topic: Gibt das Pub/Sub-Thema an, in dem Daten veröffentlicht werden.

  • cps.endpoint: Gibt den zu verwendenden Pub/Sub-Endpunkt an.

Eine Liste der für diesen Connector verfügbaren Konfigurationseigenschaften finden Sie unter Pub/Sub Sink Connector-Konfigurationen.

Pub/Sub-Senken-Connector erstellen

Bevor Sie einen Connector erstellen, lesen Sie die Dokumentation zu Eigenschaften eines Pub/Sub-Senken-Connectors.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.

    Zu „Cluster verbinden“

  2. Klicken Sie auf den Connect-Cluster, für den Sie den Connector erstellen möchten.

    Die Seite Clusterdetails verbinden wird angezeigt.

  3. Klicken Sie auf Connector erstellen.

    Die Seite Kafka-Connector erstellen wird angezeigt.

  4. Geben Sie für den Connectornamen einen String ein.

    Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.

  5. Wählen Sie als Connector-Plug-in die Option Pub/Sub-Senke aus.

  6. Wählen Sie unter Themen entweder Liste mit Kafka-Themen auswählen oder Themen-Regex verwenden aus. Wählen Sie dann die Kafka-Themen aus, aus denen dieser Connector Nachrichten abruft, oder geben Sie sie ein. Diese Themen befinden sich in Ihrem verknüpften Kafka-Cluster.

  7. Wählen Sie unter Cloud Pub/Sub-Thema auswählen das Pub/Sub-Thema aus, in dem dieser Connector Nachrichten veröffentlicht. Das Thema wird im Format des vollständigen Ressourcennamens angezeigt: projects/{project}/topics/{topic}.

  8. Optional: Konfigurieren Sie zusätzliche Einstellungen im Bereich Konfigurationen. Hier geben Sie Properties wie tasks.max, key.converter und value.converter an, wie im vorherigen Abschnitt beschrieben.

  9. Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.

  10. Klicken Sie auf Erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka connectors create aus:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ersetzen Sie Folgendes:

    • CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.

    • LOCATION: Der Ort, an dem Sie den Connector erstellen. Dies muss derselbe Standort sein, an dem Sie den Connect-Cluster erstellt haben.

    • CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.

    • CONFIG_FILE: Der Pfad zur YAML-Konfigurationsdatei für den BigQuery Sink-Connector.

    Hier ist ein Beispiel für eine Konfigurationsdatei für den Pub/Sub-Sink-Connector:

    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
    

    Ersetzen Sie Folgendes:

    • CPS_SINK_CONNECTOR_ID: Die ID oder der Name des Pub/Sub Sink-Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.

    • GMK_TOPIC_ID: Die ID des Managed Service for Apache Kafka-Themas, aus dem Daten vom Pub/Sub-Senken-Connector gelesen werden.

    • CPS_TOPIC_ID: Die ID des Pub/Sub-Themas, in dem Daten veröffentlicht werden.

    • GCP_PROJECT_ID: Die ID des Google Cloud-Projekts, in dem sich Ihr Pub/Sub-Thema befindet.

  3. Terraform

    Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.

    resource "google_managed_kafka_connector" "example-pubsub-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-pubsub-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class" = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
        "name"            = "my-pubsub-sink-connector"
        "tasks.max"       = "3"
        "topics"          = "TOPIC_NAME"
        "cps.topic"       = "CPS_TOPIC_NAME"
        "cps.project"     = "CPS_PROJECT_NAME"
        "value.converter" = "org.apache.kafka.connect.storage.StringConverter"
        "key.converter"   = "org.apache.kafka.connect.storage.StringConverter"
      }
    
      provider = google-beta
    }

    Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

    Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createPubSubSinkConnector creates a Pub/Sub Sink connector.
    func createPubSubSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, valueConverter, keyConverter, cpsTopic, cpsProject, tasksMax string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "CPS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// valueConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// cpsTopic := "CPS_TOPIC_ID"
    	// cpsProject := "GCP_PROJECT_ID"
    	// tasksMax := "3"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// Pub/Sub Sink sample connector configuration
    	config := map[string]string{
    		"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    		"name":            connectorID,
    		"tasks.max":       tasksMax,
    		"topics":          topics,
    		"value.converter": valueConverter,
    		"key.converter":   keyConverter,
    		"cps.topic":       cpsTopic,
    		"cps.project":     cpsProject,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Pub/Sub sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreatePubSubSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-pubsub-sink-connector";
        String pubsubProjectId = "my-pubsub-project-id";
        String pubsubTopicName = "my-pubsub-topic";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
        String maxTasks = "3";
        String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createPubSubSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            pubsubProjectId,
            pubsubTopicName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            valueConverter,
            keyConverter);
      }
    
      public static void createPubSubSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String pubsubProjectId,
          String pubsubTopicName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String valueConverter,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("value.converter", valueConverter);
        configMap.put("key.converter", keyConverter);
        configMap.put("cps.topic", pubsubTopicName);
        configMap.put("cps.project", pubsubProjectId);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "topics": topics,
        "value.converter": value_converter,
        "key.converter": key_converter,
        "cps.topic": cps_topic,
        "cps.project": cps_project,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.