Pub/Sub-Quell-Connectors streamen Nachrichten von Pub/Sub zu Kafka. So können Sie Pub/Sub in Ihre Kafka-basierten Anwendungen und Datenpipelines einbinden.
Der Connector liest Nachrichten aus einem Pub/Sub-Abo, konvertiert jede Nachricht in einen Kafka-Datensatz und schreibt die Datensätze in ein Kafka-Thema. Standardmäßig erstellt der Connector Kafka-Datensätze so:
- Der Kafka-Datensatzschlüssel ist
null. - Der Kafka-Datensatzwert sind die Pub/Sub-Nachrichtendaten als Byte.
- Die Kafka-Datensatzheader sind leer.
Sie können dieses Verhalten jedoch konfigurieren. Weitere Informationen finden Sie unter Connector konfigurieren.
Hinweise
Bevor Sie einen Pub/Sub-Quell-Connector erstellen, benötigen Sie Folgendes:
Ein Pub/Sub-Thema mit einem Abo.
Ein Kafka-Thema im Kafka-Cluster.
Ein Connect-Cluster. Wenn Sie den Connect-Cluster erstellen, legen Sie den Managed Service for Apache Kafka-Cluster als primären Kafka-Cluster fest.
Erforderliche Rollen und Berechtigungen
Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) für das Projekt mit dem Connect-Cluster zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Pub/Sub-Quellconnectors benötigen.
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Pub/Sub-Quellconnectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:
Erforderliche Berechtigungen
Die folgenden Berechtigungen sind zum Erstellen eines Pub/Sub-Quell-Connectors erforderlich:
-
Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster:
managedkafka.connectors.create
Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.
Weitere Informationen zur Rolle „Managed Kafka Connector Editor“ finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.
Wenn sich Ihr Managed Service for Apache Kafka-Cluster im selben Projekt wie der Connect-Cluster befindet, sind keine weiteren Berechtigungen erforderlich. Wenn sich der Connect-Cluster in einem anderen Projekt befindet, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.
Berechtigungen zum Lesen aus Pub/Sub gewähren
Das Managed Kafka-Dienstkonto muss berechtigt sein, Nachrichten aus dem Pub/Sub-Abo zu lesen. Weisen Sie dem Dienstkonto im Projekt mit dem Pub/Sub-Abo die folgenden IAM-Rollen zu:
- Pub/Sub-Abonnent (
roles/pubsub.subscriber) - Pub/Sub-Betrachter (
roles/pubsub.viewer)
Das Managed Kafka-Dienstkonto hat das folgende Format:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com.
Ersetzen Sie PROJECT_NUMBER durch die Projektnummer.
Pub/Sub-Quell-Connector erstellen
Console
Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.
Klicken Sie auf den Connect-Cluster, in dem Sie den Connector erstellen möchten.
Klicken Sie auf Connector erstellen.
Geben Sie für den Connectornamen einen String ein.
Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.
Wählen Sie als Connector-Plug-in die Option Pub/Sub-Quelle aus.
Wählen Sie in der Liste Cloud Pub/Sub-Abo ein Pub/Sub-Abo aus. Der Connector ruft Nachrichten aus diesem Abo ab. Das Abo wird als vollständiger Ressourcenname angezeigt:
projects/{project}/subscriptions/{subscription}.Wählen Sie in der Liste Kafka-Thema das Kafka-Thema aus, in das Nachrichten geschrieben werden.
Optional: Fügen Sie im Feld Konfigurationen Konfigurationseigenschaften hinzu oder bearbeiten Sie die Standardeigenschaften. Weitere Informationen finden Sie unter Connector konfigurieren.
Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.
Klicken Sie auf Erstellen.
gcloud
Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl
gcloud managed-kafka connectors createaus:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEErsetzen Sie Folgendes:
CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.
LOCATION: Der Standort des Connect-Clusters.
CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.
CONFIG_FILE: Der Pfad zu einer YAML- oder JSON-Konfigurationsdatei.
Hier sehen Sie ein Beispiel für eine Konfigurationsdatei:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Ersetzen Sie Folgendes:
PROJECT_ID: Die ID des Google Cloud-Projekts, in dem sich das Pub/Sub-Abo befindet.
PUBSUB_SUBSCRIPTION_ID: Die ID des Pub/Sub-Abos, aus dem Daten abgerufen werden sollen.
KAFKA_TOPIC_ID: Die ID des Kafka-Themas, in das Daten geschrieben werden.
Die Konfigurationseigenschaften cps.project, cps.subscription und kafka.topic sind erforderlich. Weitere Konfigurationsoptionen finden Sie unter Connector konfigurieren.
Terraform
Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.
Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.
Go
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.
Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Java
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Python
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.
Connector konfigurieren
In diesem Abschnitt werden einige Konfigurationseigenschaften beschrieben, die Sie für den Connector festlegen können.
Eine vollständige Liste der Eigenschaften, die für diesen Connector spezifisch sind, finden Sie unter Pub/Sub-Quellconnector-Konfigurationen.
Pull-Modus
Der Pull-Modus gibt an, wie der Connector Pub/Sub-Nachrichten abruft. Die folgenden Modi werden unterstützt:
Pull-Modus (Standard): Nachrichten werden in Batches abgerufen. Um diesen Modus zu aktivieren, legen Sie
cps.streamingPull.enabled=false.fest. Um die Batchgröße zu konfigurieren, legen Sie die Eigenschaftcps.maxBatchSizefest.Weitere Informationen zum Pull-Modus finden Sie unter Pull API.
Streaming-Pull-Modus. Ermöglicht den maximalen Durchsatz und die niedrigste Latenz beim Abrufen von Nachrichten aus Pub/Sub. Setzen Sie
cps.streamingPull.enabled=true, um diesen Modus zu aktivieren.Weitere Informationen zum Streaming-Pull-Modus finden Sie unter StreamingPull API.
Wenn das Streaming von Pull-Vorgängen aktiviert ist, können Sie die Leistung durch Festlegen der folgenden Konfigurationseigenschaften optimieren:
cps.streamingPull.flowControlBytes: Die maximale Anzahl ausstehender Message-Bytes pro Aufgabe.cps.streamingPull.flowControlMessages: Die maximale Anzahl ausstehender Nachrichten pro Aufgabe.cps.streamingPull.maxAckExtensionMs: Die maximale Zeit, um die der Connector die Frist für das Abonnieren verlängert, in Millisekunden.cps.streamingPull.maxMsPerAckExtension: Die maximale Zeit, um die der Connector die Anmeldefrist pro Verlängerung verlängert, in Millisekunden.cps.streamingPull.parallelStreams: Die Anzahl der Streams, aus denen Nachrichten aus dem Abo abgerufen werden sollen.
Pub/Sub-Endpunkt
Standardmäßig verwendet der Connector den globalen Pub/Sub-Endpunkt. Um einen Endpunkt anzugeben, legen Sie die Eigenschaft cps.endpoint auf die Endpunktadresse fest.
Weitere Informationen zu Endpunkten finden Sie unter Pub/Sub-Endpunkte.
Kafka-Datensätze
Der Quell-Connector von Pub/Sub konvertiert Pub/Sub-Nachrichten in Kafka-Datensätze. In den folgenden Abschnitten wird der Konvertierungsprozess beschrieben.
Eintragsschlüssel
Der Schlüsselkonverter muss org.apache.kafka.connect.storage.StringConverter sein.
Standardmäßig sind Datensatzschlüssel
null.Wenn Sie ein Pub/Sub-Nachrichtenattribut als Schlüssel verwenden möchten, legen Sie
kafka.key.attributeauf den Namen des Attributs fest. Beispiel:kafka.key.attribute=username.Wenn Sie den Pub/Sub-Bestellschlüssel als Schlüssel verwenden möchten, legen Sie
kafka.key.attribute=orderingKeyfest.
Datensatz-Header
Standardmäßig sind die Datensatzüberschriften leer.
Wenn kafka.record.headers gleich true ist, werden Pub/Sub-Nachrichtenattribute als Datensatzheader geschrieben. Legen Sie cps.makeOrderingKeyAttribute=true fest, um den Reihenfolgeschlüssel einzuschließen.
Datensatzwert
Wenn kafka.record.headers gleich true ist oder die Pub/Sub-Nachricht keine benutzerdefinierten Attribute hat, ist der Datensatzwert die Nachrichtendaten als Byte-Array.
Legen Sie den Wertkonverter auf org.apache.kafka.connect.converters.ByteArrayConverter fest.
Andernfalls, wenn kafka.record.headers gleich false ist und die Nachricht mindestens ein benutzerdefiniertes Attribut hat, schreibt der Connector den Datensatzwert als struct. Legen Sie den Wertkonverter auf org.apache.kafka.connect.json.JsonConverter fest.
struct enthält die folgenden Felder:
message: Die Pub/Sub-Nachrichtendaten als Byte.Ein Feld für jedes Pub/Sub-Nachrichtenattribut. Legen Sie
cps.makeOrderingKeyAttribute=truefest, um den Sortierschlüssel einzuschließen.
Angenommen, die Nachricht hat das Attribut username:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Wenn value.converter.schemas.enable true ist, enthält struct sowohl die Nutzlast als auch das Schema:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Kafka-Partitionen
Standardmäßig schreibt der Connector in eine einzelne Partition im Thema. Wenn Sie angeben möchten, in wie viele Partitionen der Connector schreibt, legen Sie die Property kafka.partition.count fest. Der Wert darf die Anzahl der Partitionen des Themas nicht überschreiten.
Mit der Eigenschaft kafka.partition.scheme können Sie festlegen, wie der Connector Nachrichten Partitionen zuweist. Weitere Informationen finden Sie unter Pub/Sub-Quellconnector-Konfigurationen.