Mit Cloud Storage-Senken-Connectors können Sie Daten aus Ihren Kafka-Themen in Cloud Storage-Buckets streamen. Dies ist nützlich, um große Datenmengen kostengünstig und skalierbar zu speichern und zu verarbeiten.
Hinweise
Bevor Sie einen Cloud Storage-Sink-Connector erstellen, benötigen Sie Folgendes:
Erstellen Sie einen Managed Service for Apache Kafka-Cluster für Ihren Connect-Cluster. Dies ist der primäre Kafka-Cluster, der mit dem Connect-Cluster verknüpft ist. Dies ist auch der Quellcluster, der ein Ende der Connector-Pipeline bildet.
Erstellen Sie einen Connect-Cluster, um den Cloud Storage-Senken-Connector zu hosten.
Erstellen Sie einen Cloud Storage-Bucket zum Speichern der aus Kafka gestreamten Daten.
Erstellen und konfigurieren Sie ein Kafka-Thema im Quellcluster. Daten werden von diesem Kafka-Thema in den Cloud Storage-Ziel-Bucket verschoben.
Erforderliche Rollen und Berechtigungen
Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Cloud Storage-Sink-Connectors benötigen.
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Cloud Storage-Sink-Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:
Erforderliche Berechtigungen
Die folgenden Berechtigungen sind zum Erstellen eines Cloud Storage-Sink-Connectors erforderlich:
-
Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster:
managedkafka.connectors.create
Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.
Weitere Informationen zur Rolle Managed Kafka Connector Editor finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.
Wenn sich Ihr Managed Service for Apache Kafka-Cluster im selben Projekt wie der Connect-Cluster befindet, sind keine weiteren Berechtigungen erforderlich. Wenn sich der Connect-Cluster in einem anderen Projekt befindet, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.
Berechtigungen zum Schreiben in den Cloud Storage-Bucket gewähren
Das Connect-Cluster-Dienstkonto, das dem Format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com entspricht, benötigt die folgenden Cloud Storage-Berechtigungen:
storage.objects.createstorage.objects.delete
Weisen Sie dazu dem Dienstkonto des Connect-Clusters die Rolle Storage-Objekt-Nutzer (roles/storage.objectUser) für das Projekt mit dem Cloud Storage-Bucket zu.
Funktionsweise eines Cloud Storage-Senken-Connectors
Ein Cloud Storage-Senken-Connector ruft Daten aus einem oder mehreren Kafka-Themen ab und schreibt diese Daten in Objekte in einem einzelnen Cloud Storage-Bucket.
So kopiert der Cloud Storage Sink-Connector Daten:
Der Connector empfängt Nachrichten aus einem oder mehreren Kafka-Themen im Quellcluster.
Der Connector schreibt die Daten in den Ziel-Cloud Storage-Bucket, den Sie in der Connectorkonfiguration angegeben haben.
Der Connector formatiert die Daten beim Schreiben in den Cloud Storage-Bucket anhand bestimmter Attribute in der Connector-Konfiguration. Die Ausgabedateien sind standardmäßig im CSV-Format. Sie können die Eigenschaft
format.output.typekonfigurieren, um verschiedene Ausgabeformate wie JSON anzugeben.Der Connector benennt auch die Dateien, die in den Cloud Storage-Bucket geschrieben werden. Mit den Attributen
file.name.prefixundfile.name.templatekönnen Sie die Dateinamen anpassen. Sie können beispielsweise den Namen des Kafka-Themas oder Nachrichtenschlüssel in den Dateinamen einfügen.Ein Kafka-Datensatz besteht aus drei Komponenten: Headern, Schlüsseln und Werten.
Sie können Header in die Ausgabedatei einfügen, indem Sie
format.output.fieldsauf „include headers“ (Header einfügen) festlegen. Beispiel:format.output.fields=value,headers.Sie können Schlüssel in die Ausgabedatei aufnehmen, indem Sie
format.output.fieldsaufkeyfestlegen. Beispiel:format.output.fields=key,value,headers.Schlüssel können auch verwendet werden, um Datensätze zu gruppieren, indem
keyin diefile.name.template-Property aufgenommen wird.
Sie können Werte standardmäßig in die Ausgabedatei aufnehmen, da
format.output.fieldsstandardmäßig aufvaluefestgelegt ist.Der Connector schreibt die konvertierten und formatierten Daten in den angegebenen Cloud Storage-Bucket.
Der Connector komprimiert die im Cloud Storage-Bucket gespeicherten Dateien, wenn Sie die Dateikomprimierung mit der Eigenschaft
file.compression.typekonfigurieren.Konverterkonfigurationen werden durch das Attribut
format.output.typeeingeschränkt.Wenn
format.output.typebeispielsweise aufcsvfestgelegt ist, muss der Schlüssel-Converterorg.apache.kafka.connect.converters.ByteArrayConverteroderorg.apache.kafka.connect.storage.StringConverterund der Wert-Converterorg.apache.kafka.connect.converters.ByteArrayConvertersein.Wenn
format.output.typeaufjsongesetzt ist, werden Wert- und Schlüssel-Schema nicht zusammen mit den Daten in die Ausgabedatei geschrieben, auch wenn die Eigenschaftvalue.converter.schemas.enableauf „true“ gesetzt ist.
Mit der Eigenschaft
tasks.maxwird die Parallelität des Connectors gesteuert. Durch Erhöhen vontasks.maxkann der Durchsatz verbessert werden. Die tatsächliche Parallelität wird jedoch durch die Anzahl der Partitionen in den Kafka-Themen begrenzt.
Eigenschaften eines Cloud Storage-Senken-Connectors
Geben Sie beim Erstellen eines Cloud Storage-Senken-Connectors die folgenden Attribute an.
Connector-Name
Der Name oder die ID des Connectors. Hinweise zum Benennen der Ressource finden Sie unter Richtlinien zum Benennen einer Managed Service for Apache Kafka-Ressource. Der Name ist unveränderlich.
Typ des Connector-Plug‑ins
Wählen Sie in derGoogle Cloud -Konsole Cloud Storage Sink als Connector-Plug-in-Typ aus. Wenn Sie den Connector nicht über die Benutzeroberfläche konfigurieren, müssen Sie auch die Connector-Klasse angeben.
Themen
Die Kafka-Themen, aus denen der Connector Nachrichten abruft.
Sie können ein oder mehrere Themen angeben oder einen regulären Ausdruck verwenden, um mehrere Themen abzugleichen. Beispiel: topic.* für alle Themen, die mit „topic“ beginnen. Diese Themen müssen im Managed Service for Apache Kafka-Cluster vorhanden sein, der Ihrem Connect-Cluster zugeordnet ist.
Cloud Storage-Bucket
Wählen Sie den Cloud Storage-Bucket aus, in dem die Daten gespeichert werden, oder erstellen Sie ihn.
Konfiguration
In diesem Abschnitt können Sie zusätzliche, connectorspezifische Konfigurationseigenschaften für den Cloud Storage-Sink-Connector angeben.
Da Daten in Kafka-Themen in verschiedenen Formaten wie Avro, JSON oder Rohbytes vorliegen können, ist die Angabe von Konvertern ein wichtiger Teil der Konfiguration. Mit Converters werden Daten aus dem Format, das in Ihren Kafka-Themen verwendet wird, in das standardisierte interne Format von Kafka Connect übersetzt. Der Cloud Storage-Sink-Connector nimmt dann diese internen Daten und transformiert sie in das Format, das für Ihren Cloud Storage-Bucket erforderlich ist, bevor er sie schreibt.
Allgemeinere Informationen zur Rolle von Konvertern in Kafka Connect, zu unterstützten Konvertertypen und zu gängigen Konfigurationsoptionen finden Sie unter Konverter.
Hier sind einige Konfigurationen, die speziell für den Cloud Storage-Sink-Connector gelten:
gcs.credentials.default: Gibt an, ob Google Cloud Anmeldedaten Google Cloud automatisch aus der Ausführungsumgebung erkannt werden sollen. Muss auftruefestgelegt sein.gcs.bucket.name: Gibt den Namen des Cloud Storage-Buckets an, in den Daten geschrieben werden. darf nicht leer bleibenfile.compression.type: Legt den Komprimierungstyp für Dateien fest, die im Cloud Storage-Bucket gespeichert sind. Beispiele sindgzip,snappy,zstdundnone. Der Standardwert istnone.file.name.prefix: Das Präfix, das dem Namen jeder Datei hinzugefügt werden soll, die im Cloud Storage-Bucket gespeichert wird. In der Standardeinstellung ist dieser Wert leer.format.output.type: Der Typ des Datenformats, das zum Schreiben von Daten in die Cloud Storage-Ausgabedateien verwendet wird. Unterstützte Werte sindcsv,json,jsonlundparquet. Der Standardwert istcsv.
Eine Liste der für diesen Connector verfügbaren Konfigurationseigenschaften finden Sie unter Cloud Storage Sink Connector-Konfigurationen.
Cloud Storage-Senken-Connector erstellen
Bevor Sie einen Connector erstellen, lesen Sie die Dokumentation zu Eigenschaften eines Cloud Storage-Senken-Connectors.
Console
Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.
Klicken Sie auf den Connect-Cluster, für den Sie den Connector erstellen möchten.
Die Seite Clusterdetails verbinden wird angezeigt.
Klicken Sie auf Connector erstellen.
Die Seite Kafka-Connector erstellen wird angezeigt.
Geben Sie für den Connectornamen einen String ein.
Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.
Wählen Sie für Connector-Plug-in die Option Cloud Storage Sink aus.
Geben Sie die Themen an, aus denen Sie Daten streamen können.
Wählen Sie den Storage-Bucket aus, in dem die Daten gespeichert werden sollen.
Optional: Konfigurieren Sie im Abschnitt Konfiguration zusätzliche Einstellungen.
Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.
Klicken Sie auf Erstellen.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl
gcloud managed-kafka connectors createaus:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEErsetzen Sie Folgendes:
CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.
LOCATION: Der Ort, an dem Sie den Connector erstellen. Dies muss derselbe Standort sein, an dem Sie den Connect-Cluster erstellt haben.
CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.
CONFIG_FILE: Der Pfad zur YAML-Konfigurationsdatei für den BigQuery Sink-Connector.
Hier ist ein Beispiel für eine Konfigurationsdatei für den Cloud Storage-Sink-Connector:
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"Ersetzen Sie Folgendes:
GMK_TOPIC_ID: Die ID des Managed Service for Apache Kafka-Themas, aus dem die Daten in den Cloud Storage-Senken-Connector fließen.
GCS_BUCKET_NAME: Der Name des Cloud Storage-Bucket, der als Senke für die Pipeline dient.
GCS_SINK_CONNECTOR_ID: Die ID oder der Name des Cloud Storage-Sink-Connectors. Tipps zum Benennen eines Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.
Terraform
Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.
Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.
Go
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.
Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Java
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Python
Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.
Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.