Pub/Sub-Quell-Connectors streamen Nachrichten von Pub/Sub zu Kafka. So können Sie Pub/Sub in Ihre Kafka-basierten Anwendungen und Datenpipelines einbinden.
Anwendungsfälle für Pub/Sub-Quell-Connectors:
Datenaufnahme in Echtzeit Daten aus Cloud-Diensten oder anderen Anwendungen in Pub/Sub veröffentlichen und dann zur Streamverarbeitung in Kafka replizieren.
Ereignisgesteuerte Architekturen Kafka-basierte Verarbeitung durch Nachrichten auslösen, die in Pub/Sub veröffentlicht werden.
Der Connector liest Nachrichten aus einem Pub/Sub-Abo, konvertiert jede Nachricht in einen Kafka-Datensatz und schreibt die Datensätze in ein Kafka-Thema. Standardmäßig erstellt der Connector Kafka-Datensätze so:
- Der Kafka-Datensatzschlüssel ist
null. - Der Kafka-Datensatzwert sind die Pub/Sub-Nachrichtendaten als Byte.
- Die Kafka-Datensatzheader sind leer.
Sie können dieses Verhalten jedoch konfigurieren. Weitere Informationen finden Sie unter Connector konfigurieren.
Hinweis
Bevor Sie einen Pub/Sub-Quell-Connector erstellen, benötigen Sie Folgendes:
Ein Pub/Sub-Thema mit einem Abo.
Ein Kafka-Thema im Kafka-Cluster.
Einen Connect-Cluster. Wenn Sie den Connect-Cluster erstellen, legen Sie den Managed Service for Apache Kafka-Cluster als den primären Kafka-Cluster fest.
Erforderliche Rollen und Berechtigungen
Bitten Sie Ihren Administrator, Ihnen die
Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM-Rolle für Ihr Projekt zu gewähren, um die Berechtigungen zu erhalten, die
Sie zum Erstellen eines Connectors benötigen.
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen , um die notwendigen Berechtigungen anzuzeigen, die erforderlich sind:
Erforderliche Berechtigungen
Die folgenden Berechtigungen sind zum Erstellen eines Connectors erforderlich:
-
Connector erstellen:
managedkafka.connectors.create
Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.
Berechtigungen zum Lesen aus Pub/Sub gewähren
Das Managed Kafka-Dienstkonto muss die Berechtigung haben, Nachrichten aus dem Pub/Sub-Abo zu lesen. Weisen Sie dem Dienstkonto im Projekt mit dem Pub/Sub-Abo die folgenden IAM-Rollen zu:
- Pub/Sub-Abonnent (
roles/pubsub.subscriber) - Pub/Sub-Betrachter (
roles/pubsub.viewer)
Das Managed Kafka-Dienstkonto hat das folgende Format:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com,
wobei PROJECT_NUMBER die Projektnummer des Connect-Clusters ist.
Wenn sich Ihr Connect-Cluster in einem anderen Projekt als Ihr Managed Service for Apache Kafka Cluster befindet, lesen Sie Connect-Cluster in einem anderen Projekt erstellen.
Pub/Sub-Quell-Connector erstellen
Console
Rufen Sie in der Google Cloud Console die Seite Connect-Cluster auf.
Klicken Sie auf den Connect-Cluster, in dem Sie den Connector erstellen möchten.
Klicken Sie auf Connector erstellen.
Geben Sie einen String für den Connectornamen ein.
Tipps zum Benennen von Connectors finden Sie unter Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.
Wählen Sie unter Connector-Plug-in die Option Pub/Sub-Quelle aus.
Wählen Sie in der Liste Cloud Pub/Sub-Abo ein Pub/Sub-Abo aus. Der Connector ruft Nachrichten aus diesem Abo ab. Das Abo wird als vollständiger Ressourcenname angezeigt:
projects/{project}/subscriptions/{subscription}.Wählen Sie in der Liste Kafka-Thema das Kafka-Thema aus, in das Nachrichten geschrieben werden.
Optional: Fügen Sie im Feld Konfigurationen Konfigurationseigenschaften hinzu oder bearbeiten Sie die Standardeigenschaften. Weitere Informationen finden Sie unter Connector konfigurieren.
Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie für Task-Neustart.
Klicken Sie auf Erstellen.
gcloud
Führen Sie den
gcloud managed-kafka connectors createBefehl aus:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEErsetzen Sie Folgendes:
CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie unter Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors kann nicht geändert werden.
LOCATION: Der Standort des Connect-Clusters.
CONNECT_CLUSTER_ID: Die ID des Connect Clusters, in dem der Connector erstellt wird.
CONFIG_FILE: Der Pfad zu einer YAML- oder JSON Konfigurationsdatei.
Dies ist ein Beispiel für eine Konfigurationsdatei:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Ersetzen Sie Folgendes:
PROJECT_ID: Die ID des Google Cloud Projekts, in dem sich das Pub/Sub-Abo befindet.
PUBSUB_SUBSCRIPTION_ID: Die ID des Pub/Sub-Abos, aus dem Daten abgerufen werden sollen.
KAFKA_TOPIC_ID: Die ID des Kafka-Themas, in das Daten geschrieben werden.
Die Konfigurationseigenschaften cps.project, cps.subscription und kafka.topic sind erforderlich. Weitere Konfigurationsoptionen finden Sie unter
Connector konfigurieren.
Terraform
Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.
Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.
Go
Folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Go API.
Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Java
Folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Java API.
Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Python
Folgen Sie der Einrichtungsanleitung für Python unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Python API.
Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.
Connector konfigurieren
In diesem Abschnitt werden einige Konfigurationseigenschaften beschrieben, die Sie für den Connector festlegen können.
Eine vollständige Liste der Eigenschaften, die für diesen Connector spezifisch sind, finden Sie unter Konfigurationen für Pub/Sub-Quell-Connectors.
Pull-Modus
Der Pull-Modus gibt an, wie der Connector Pub/Sub-Nachrichten abruft. Die folgenden Modi werden unterstützt:
Pull-Modus (Standard). Nachrichten werden in Batches abgerufen. Wenn Sie diesen Modus aktivieren möchten, legen Sie
cps.streamingPull.enabled=false.fest. Legen Sie die Batchgröße mit dercps.maxBatchSizeEigenschaft fest.Weitere Informationen zum Pull-Modus finden Sie unter Pull API.
Streaming-Pull-Modus. Ermöglicht den maximalen Durchsatz und die niedrigste Latenz beim Abrufen von Nachrichten aus Pub/Sub. Wenn Sie diesen Modus aktivieren möchten, legen Sie
cps.streamingPull.enabled=truefest.Weitere Informationen zum Streaming-Pull-Modus finden Sie unter StreamingPull API.
Wenn der Streaming-Pull-Modus aktiviert ist, können Sie die Leistung optimieren, indem Sie die folgenden Konfigurationseigenschaften festlegen:
cps.streamingPull.flowControlBytes: Die maximale Anzahl ausstehender Nachrichtenbytes pro Aufgabe.cps.streamingPull.flowControlMessages: Die maximale Anzahl ausstehender Nachrichten pro Aufgabe.cps.streamingPull.maxAckExtensionMs: Die maximale Zeit in Millisekunden, um die der Connector die Frist für das Abo verlängert.cps.streamingPull.maxMsPerAckExtension: Die maximale Zeit in Millisekunden, um die der Connector die Frist für das Abo pro Verlängerung verlängert.cps.streamingPull.parallelStreams: Die Anzahl der Streams, aus denen Nachrichten aus dem Abo abgerufen werden sollen.
Pub/Sub-Endpunkt
Standardmäßig verwendet der Connector den globalen Pub/Sub-Endpunkt. Wenn Sie einen Endpunkt angeben möchten, legen Sie die Eigenschaft cps.endpoint auf die Endpunktadresse fest.
Weitere Informationen zu Endpunkten finden Sie unter
Pub/Sub-Endpunkte.
Kafka-Partitionen
Standardmäßig schreibt der Connector in eine einzelne Partition im Thema. Wenn Sie angeben möchten, in wie viele Partitionen der Connector schreiben soll, legen Sie die Eigenschaft kafka.partition.count fest. Der Wert darf die Anzahl der Partitionen des Themas nicht überschreiten.
Wenn Sie angeben möchten, wie der Connector Nachrichten Partitionen zuweist, legen Sie die Eigenschaft kafka.partition.scheme fest. Weitere Informationen finden Sie unter
Konfigurationen für Pub/Sub-Quell-Connectors.
Konverter
Legen Sie den Schlüsselkonverter auf org.apache.kafka.connect.storage.StringConverter fest.
Legen Sie je nach Connectorkonfiguration den Wertkonverter auf einen der folgenden Werte fest:
org.apache.kafka.connect.converters.ByteArrayConverterorg.apache.kafka.connect.json.JsonConverter
Weitere Informationen finden Sie unter Datensatzwert.
Nachrichtenkonvertierung
Der Pub/Sub-Quell-Connector konvertiert Pub/Sub-Nachrichten in Kafka-Datensätze. In den folgenden Abschnitten wird der Konvertierungsprozess beschrieben.
Datensatzschlüssel
Der Schlüsselkonverter muss org.apache.kafka.connect.storage.StringConverter sein.
Standardmäßig sind Datensatzschlüssel
null.Wenn Sie ein Pub/Sub Nachrichtenattribut als Schlüssel verwenden möchten, legen Sie
kafka.key.attributeauf den Namen des Attributs fest. Beispiel:kafka.key.attribute=username.Wenn Sie den Pub/Sub Bestellschlüssel als Schlüssel verwenden möchten, legen Sie
kafka.key.attribute=orderingKeyfest.
Datensatzheader
Standardmäßig sind Datensatzheader leer.
Wenn kafka.record.headers auf true gesetzt ist, werden Pub/Sub-Nachrichtenattribute als Datensatzheader geschrieben. Wenn Sie den Bestellschlüssel einbeziehen möchten, legen Sie cps.makeOrderingKeyAttribute=true fest.
Datensatzwert
Datensatzwerte werden entweder als Byte-Arrays oder als struct-Typen geschrieben.
Datensatzwerte als Byte-Arrays
Wenn kafka.record.headers auf true gesetzt ist oder die Pub/Sub-Nachricht keine benutzerdefinierten Attribute hat, schreibt der Connector die Nachrichtendaten als Byte-Array.
Legen Sie den Wertkonverter auf org.apache.kafka.connect.converters.ByteArrayConverter fest.
Datensatzwerte als Structs
Wenn kafka.record.headers auf false gesetzt ist und die Nachricht mindestens ein benutzerdefiniertes Attribut hat, schreibt der Connector den Datensatzwert als struct. Legen Sie den Wertkonverter auf org.apache.kafka.connect.json.JsonConverter fest.
Das struct enthält die folgenden Felder:
message: Die Pub/Sub-Nachrichtendaten als Byte.Ein Feld für jedes Pub/Sub-Nachrichtenattribut. Wenn Sie den Bestellschlüssel einbeziehen möchten, legen Sie
cps.makeOrderingKeyAttribute=truefest.
Wenn die Nachricht beispielsweise ein username-Attribut hat, sieht der Datensatzwert so aus:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Wenn value.converter.schemas.enable auf true gesetzt ist, enthält das struct sowohl die Nutzlast als auch das Schema:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}