Pub/Sub-Senken-Connectors streamen Nachrichten aus Kafka-Themen in Pub/Sub-Themen. So können Sie Ihre Kafka-basierten Anwendungen in Pub/Sub einbinden und ereignisgesteuerte Architekturen und Echtzeitdatenverarbeitung ermöglichen.
Hinweise
Bevor Sie einen Pub/Sub-Sink-Connector erstellen, benötigen Sie Folgendes:
Erstellen Sie einen Managed Service for Apache Kafka-Cluster für Ihren Connect-Cluster. Dies ist der primäre Kafka-Cluster, der mit dem Connect-Cluster verknüpft ist. Dies ist auch der Quellcluster, der ein Ende der Connector-Pipeline bildet.
Erstellen Sie einen Connect-Cluster, um Ihren Pub/Sub-Senken-Connector zu hosten.
Erstellen und konfigurieren Sie ein Kafka-Thema im Quellcluster. Daten werden von diesem Kafka-Thema zum Ziel-Pub/Sub-Thema verschoben.
Erforderliche Rollen und Berechtigungen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für das Projekt mit dem Connect-Cluster zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Pub/Sub-Sink-Connectors benötigen:
-
Managed Kafka Connector Editor (
roles/managedkafka.connectorEditor) -
Pub/Sub:
Pub/Sub Publisher (
roles/pubsub.publisher)
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Diese vordefinierten Rollen enthalten die Berechtigungen, die zum Erstellen eines Pub/Sub-Sink-Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:
Erforderliche Berechtigungen
Die folgenden Berechtigungen sind zum Erstellen eines Pub/Sub-Senken-Connectors erforderlich:
-
Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster:
managedkafka.connectors.create
Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.
Weitere Informationen zur Rolle Managed Kafka Connector Editor finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.
Wenn sich Ihr Managed Service for Apache Kafka-Cluster im selben Projekt wie der Connect-Cluster befindet, sind keine weiteren Berechtigungen erforderlich. Wenn sich der Connect-Cluster in einem anderen Projekt befindet, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.
Berechtigungen zum Veröffentlichen im Pub/Sub-Thema gewähren
Das Dienstkonto des Connect-Clusters, das dem Format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com entspricht, benötigt die Berechtigung zum Veröffentlichen von Nachrichten im Pub/Sub-Thema. Weisen Sie dazu dem Dienstkonto des Connect-Clusters im Projekt, das das Pub/Sub-Thema enthält, die Rolle „Pub/Sub-Publisher“ (roles/pubsub.publisher) zu.
Funktionsweise eines Pub/Sub-Senken-Connectors
Ein Pub/Sub-Senken-Connector ruft Nachrichten aus einem oder mehreren Kafka-Themen ab und veröffentlicht sie in einem Pub/Sub-Thema.
So kopiert der Pub/Sub-Senken-Connector Daten:
Der Connector nutzt Nachrichten aus einem oder mehreren Kafka-Themen im Quellcluster.
Der Connector schreibt Nachrichten in die Ziel-Pub/Sub-Themen-ID, die mit der Konfigurationseigenschaft
cps.topicangegeben wird. Dies ist ein erforderliches Attribut.Für den Connector muss auch das Google Cloud -Projekt, das das Pub/Sub-Thema enthält, mit der Konfigurationseigenschaft
cps.projectangegeben werden. Dies ist ein erforderliches Attribut.Der Connector kann optional auch einen benutzerdefinierten Pub/Sub-Endpunkt verwenden, der mit der Property
cps.endpointangegeben wird. Der Standardendpunkt ist"pubsub.googleapis.com:443".Zur Leistungsoptimierung werden Nachrichten vom Connector gepuffert, bevor sie in Pub/Sub veröffentlicht werden. Sie können
maxBufferSize,maxBufferBytes,maxDelayThresholdMs,maxOutstandingRequestBytesundmaxOutstandingMessageskonfigurieren, um die Pufferung zu steuern.Ein Kafka-Datensatz besteht aus drei Komponenten: Headern, Schlüsseln und Werten. Der Connector verwendet Schlüssel- und Wertkonverter, um die Kafka-Nachrichtendaten in das von Pub/Sub erwartete Format zu transformieren. Wenn Sie Schemas für Struktur- oder Kartenwerte verwenden, gibt die
messageBodyName-Eigenschaft das Feld oder den Schlüssel an, der als Pub/Sub-Nachrichtentext verwendet werden soll.Der Connector kann das Kafka-Thema, die Partition, den Offset und den Zeitstempel als Nachrichtenattribute einbeziehen, indem er die
metadata.publish-Property auftruesetzt.Der Connector kann Kafka-Nachrichtenkopfzeilen als Pub/Sub-Nachrichtenattribute einfügen, indem die Property
headers.publishauftruegesetzt wird.Der Connector kann einen Sortierschlüssel für Pub/Sub-Nachrichten mit der Eigenschaft
orderingKeySourceeinfügen. Mögliche Werte sind"none"(Standard),"key"und"partition".Mit der Eigenschaft
tasks.maxwird die Parallelität des Connectors gesteuert. Durch Erhöhen vontasks.maxkann der Durchsatz verbessert werden. Die tatsächliche Parallelität wird jedoch durch die Anzahl der Partitionen in den Kafka-Themen begrenzt.
Eigenschaften eines Pub/Sub-Senken-Connectors
Wenn Sie einen Pub/Sub-Senken-Connector erstellen, müssen Sie die folgenden Attribute angeben.
Connector-Name
Ein eindeutiger Name für den Connector im Connect-Cluster. Hinweise zum Benennen von Ressourcen finden Sie unter Richtlinien zum Benennen einer Managed Service for Apache Kafka-Ressource.
Typ des Connector-Plug‑ins
Wählen Sie Pub/Sub Sink als Connector-Plug-in-Typ aus. Dadurch wird die Richtung des Datenflusses von Kafka nach Pub/Sub und die verwendete Connector-Implementierung bestimmt. Wenn Sie den Connector nicht über die Benutzeroberfläche konfigurieren, müssen Sie auch die Connector-Klasse angeben.
Kafka-Themen
Die Kafka-Themen, aus denen der Connector Nachrichten abruft.
Sie können ein oder mehrere Themen angeben oder einen regulären Ausdruck verwenden, um mehrere Themen abzugleichen. Beispiel: topic.* für alle Themen, die mit „topic“ beginnen. Diese Themen müssen im Managed Service for Apache Kafka-Cluster vorhanden sein, der Ihrem Connect-Cluster zugeordnet ist.
Pub/Sub-Thema
Das vorhandene Pub/Sub-Thema, in dem der Connector Nachrichten veröffentlicht. Das Dienstkonto des Connect-Clusters muss die Rolle roles/pubsub.publisher für das Projekt des Themas haben, wie unter Vorbereitung beschrieben.
Konfiguration
In diesem Abschnitt können Sie zusätzliche, connectorspezifische Konfigurationseigenschaften angeben.
Da Daten in Kafka-Themen in verschiedenen Formaten wie Avro, JSON oder Rohbytes vorliegen können, ist die Angabe von Konvertern ein wichtiger Teil der Konfiguration. Mit Converters werden Daten aus dem Format, das in Ihren Kafka-Themen verwendet wird, in das standardisierte interne Format von Kafka Connect übersetzt. Der Pub/Sub-Senken-Connector nimmt dann diese internen Daten und wandelt sie in das von Pub/Sub benötigte Format um, bevor er sie schreibt.
Allgemeinere Informationen zur Rolle von Konvertern in Kafka Connect, zu unterstützten Konvertertypen und zu gängigen Konfigurationsoptionen finden Sie unter Konverter.
Hier sind einige Konfigurationen, die speziell für den Pub/Sub-Senken-Connector gelten:
cps.project: Gibt die Google Cloud Projekt-ID an, die das Pub/Sub-Thema enthält.cps.topic: Gibt das Pub/Sub-Thema an, in dem Daten veröffentlicht werden.cps.endpoint: Gibt den zu verwendenden Pub/Sub-Endpunkt an.
Eine Liste der für diesen Connector verfügbaren Konfigurationseigenschaften finden Sie unter Pub/Sub Sink Connector-Konfigurationen.
Pub/Sub-Senken-Connector erstellen
Bevor Sie einen Connector erstellen, lesen Sie die Dokumentation zu Eigenschaften eines Pub/Sub-Senken-Connectors.
Console
Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.
Klicken Sie auf den Connect-Cluster, für den Sie den Connector erstellen möchten.
Die Seite Clusterdetails verbinden wird angezeigt.
Klicken Sie auf Connector erstellen.
Die Seite Kafka-Connector erstellen wird angezeigt.
Geben Sie für den Connectornamen einen String ein.
Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.
Wählen Sie als Connector-Plug-in die Option Pub/Sub-Senke aus.
Wählen Sie unter Themen entweder Liste mit Kafka-Themen auswählen oder Themen-Regex verwenden aus. Wählen Sie dann die Kafka-Themen aus, aus denen dieser Connector Nachrichten abruft, oder geben Sie sie ein. Diese Themen befinden sich in Ihrem verknüpften Kafka-Cluster.
Wählen Sie unter Cloud Pub/Sub-Thema auswählen das Pub/Sub-Thema aus, in dem dieser Connector Nachrichten veröffentlicht. Das Thema wird im Format des vollständigen Ressourcennamens angezeigt:
projects/{project}/topics/{topic}.Optional: Konfigurieren Sie zusätzliche Einstellungen im Bereich Konfigurationen. Hier geben Sie Properties wie
tasks.max,key.converterundvalue.converteran, wie im vorherigen Abschnitt beschrieben.Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.
Klicken Sie auf Erstellen.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl
gcloud managed-kafka connectors createaus:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEErsetzen Sie Folgendes:
CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.
LOCATION: Der Ort, an dem Sie den Connector erstellen. Dies muss derselbe Standort sein, an dem Sie den Connect-Cluster erstellt haben.
CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.
CONFIG_FILE: Der Pfad zur YAML-Konfigurationsdatei für den BigQuery Sink-Connector.
Hier ist ein Beispiel für eine Konfigurationsdatei für den Pub/Sub-Sink-Connector:
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector" name: "CPS_SINK_CONNECTOR_ID" tasks.max: "1" topics: "GMK_TOPIC_ID" value.converter: "org.apache.kafka.connect.storage.StringConverter" key.converter: "org.apache.kafka.connect.storage.StringConverter" cps.topic: "CPS_TOPIC_ID" cps.project: "GCP_PROJECT_ID"Ersetzen Sie Folgendes:
CPS_SINK_CONNECTOR_ID: Die ID oder der Name des Pub/Sub Sink-Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.
GMK_TOPIC_ID: Die ID des Managed Service for Apache Kafka-Themas, aus dem Daten vom Pub/Sub-Senken-Connector gelesen werden.
CPS_TOPIC_ID: Die ID des Pub/Sub-Themas, in dem Daten veröffentlicht werden.
GCP_PROJECT_ID: Die ID des Google Cloud-Projekts, in dem sich Ihr Pub/Sub-Thema befindet.
Terraform
Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.
Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.
Go
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.
Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Java
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Python
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.