Cloud Storage-Senkenconnector erstellen

Mit Cloud Storage-Senken-Connectors können Sie Daten aus Ihren Kafka-Themen in Cloud Storage-Buckets streamen. Dies ist nützlich, um große Datenmengen kostengünstig und skalierbar zu speichern und zu verarbeiten.

Hinweise

Bevor Sie einen Cloud Storage-Sink-Connector erstellen, benötigen Sie Folgendes:

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Cloud Storage-Sink-Connectors benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Cloud Storage-Sink-Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines Cloud Storage-Sink-Connectors erforderlich:

  • Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster: managedkafka.connectors.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zur Rolle Managed Kafka Connector Editor finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Wenn sich Ihr Managed Service for Apache Kafka-Cluster im selben Projekt wie der Connect-Cluster befindet, sind keine weiteren Berechtigungen erforderlich. Wenn sich der Connect-Cluster in einem anderen Projekt befindet, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.

Berechtigungen zum Schreiben in den Cloud Storage-Bucket gewähren

Das Connect-Cluster-Dienstkonto, das dem Format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com entspricht, benötigt die folgenden Cloud Storage-Berechtigungen:

  • storage.objects.create
  • storage.objects.delete

Weisen Sie dazu dem Dienstkonto des Connect-Clusters die Rolle Storage-Objekt-Nutzer (roles/storage.objectUser) für das Projekt mit dem Cloud Storage-Bucket zu.

Funktionsweise eines Cloud Storage-Senken-Connectors

Ein Cloud Storage-Senken-Connector ruft Daten aus einem oder mehreren Kafka-Themen ab und schreibt diese Daten in Objekte in einem einzelnen Cloud Storage-Bucket.

So kopiert der Cloud Storage Sink-Connector Daten:

  • Der Connector empfängt Nachrichten aus einem oder mehreren Kafka-Themen im Quellcluster.

  • Der Connector schreibt die Daten in den Ziel-Cloud Storage-Bucket, den Sie in der Connectorkonfiguration angegeben haben.

  • Der Connector formatiert die Daten beim Schreiben in den Cloud Storage-Bucket anhand bestimmter Attribute in der Connector-Konfiguration. Die Ausgabedateien sind standardmäßig im CSV-Format. Sie können die Eigenschaft format.output.type konfigurieren, um verschiedene Ausgabeformate wie JSON anzugeben.

  • Der Connector benennt auch die Dateien, die in den Cloud Storage-Bucket geschrieben werden. Mit den Attributen file.name.prefix und file.name.template können Sie die Dateinamen anpassen. Sie können beispielsweise den Namen des Kafka-Themas oder Nachrichtenschlüssel in den Dateinamen einfügen.

  • Ein Kafka-Datensatz besteht aus drei Komponenten: Headern, Schlüsseln und Werten.

    • Sie können Header in die Ausgabedatei einfügen, indem Sie format.output.fields auf „include headers“ (Header einfügen) festlegen. Beispiel: format.output.fields=value,headers.

    • Sie können Schlüssel in die Ausgabedatei aufnehmen, indem Sie format.output.fields auf key festlegen. Beispiel: format.output.fields=key,value,headers.

      Schlüssel können auch verwendet werden, um Datensätze zu gruppieren, indem key in die file.name.template-Property aufgenommen wird.

  • Sie können Werte standardmäßig in die Ausgabedatei aufnehmen, da format.output.fields standardmäßig auf value festgelegt ist.

  • Der Connector schreibt die konvertierten und formatierten Daten in den angegebenen Cloud Storage-Bucket.

  • Der Connector komprimiert die im Cloud Storage-Bucket gespeicherten Dateien, wenn Sie die Dateikomprimierung mit der Eigenschaft file.compression.type konfigurieren.

  • Konverterkonfigurationen werden durch das Attribut format.output.type eingeschränkt.

    • Wenn format.output.type beispielsweise auf csv festgelegt ist, muss der Schlüssel-Converter org.apache.kafka.connect.converters.ByteArrayConverter oder org.apache.kafka.connect.storage.StringConverter und der Wert-Converter org.apache.kafka.connect.converters.ByteArrayConverter sein.

    • Wenn format.output.type auf json gesetzt ist, werden Wert- und Schlüssel-Schema nicht zusammen mit den Daten in die Ausgabedatei geschrieben, auch wenn die Eigenschaft value.converter.schemas.enable auf „true“ gesetzt ist.

  • Mit der Eigenschaft tasks.max wird die Parallelität des Connectors gesteuert. Durch Erhöhen von tasks.max kann der Durchsatz verbessert werden. Die tatsächliche Parallelität wird jedoch durch die Anzahl der Partitionen in den Kafka-Themen begrenzt.

Eigenschaften eines Cloud Storage-Senken-Connectors

Geben Sie beim Erstellen eines Cloud Storage-Senken-Connectors die folgenden Attribute an.

Connector-Name

Der Name oder die ID des Connectors. Hinweise zum Benennen der Ressource finden Sie unter Richtlinien zum Benennen einer Managed Service for Apache Kafka-Ressource. Der Name ist unveränderlich.

Typ des Connector-Plug‑ins

Wählen Sie in derGoogle Cloud -Konsole Cloud Storage Sink als Connector-Plug-in-Typ aus. Wenn Sie den Connector nicht über die Benutzeroberfläche konfigurieren, müssen Sie auch die Connector-Klasse angeben.

Themen

Die Kafka-Themen, aus denen der Connector Nachrichten abruft. Sie können ein oder mehrere Themen angeben oder einen regulären Ausdruck verwenden, um mehrere Themen abzugleichen. Beispiel: topic.* für alle Themen, die mit „topic“ beginnen. Diese Themen müssen im Managed Service for Apache Kafka-Cluster vorhanden sein, der Ihrem Connect-Cluster zugeordnet ist.

Cloud Storage-Bucket

Wählen Sie den Cloud Storage-Bucket aus, in dem die Daten gespeichert werden, oder erstellen Sie ihn.

Konfiguration

In diesem Abschnitt können Sie zusätzliche, connectorspezifische Konfigurationseigenschaften für den Cloud Storage-Sink-Connector angeben.

Da Daten in Kafka-Themen in verschiedenen Formaten wie Avro, JSON oder Rohbytes vorliegen können, ist die Angabe von Konvertern ein wichtiger Teil der Konfiguration. Mit Converters werden Daten aus dem Format, das in Ihren Kafka-Themen verwendet wird, in das standardisierte interne Format von Kafka Connect übersetzt. Der Cloud Storage-Sink-Connector nimmt dann diese internen Daten und transformiert sie in das Format, das für Ihren Cloud Storage-Bucket erforderlich ist, bevor er sie schreibt.

Allgemeinere Informationen zur Rolle von Konvertern in Kafka Connect, zu unterstützten Konvertertypen und zu gängigen Konfigurationsoptionen finden Sie unter Konverter.

Hier sind einige Konfigurationen, die speziell für den Cloud Storage-Sink-Connector gelten:

  • gcs.credentials.default: Gibt an, ob Google Cloud Anmeldedaten Google Cloud automatisch aus der Ausführungsumgebung erkannt werden sollen. Muss auf true festgelegt sein.

  • gcs.bucket.name: Gibt den Namen des Cloud Storage-Buckets an, in den Daten geschrieben werden. darf nicht leer bleiben

  • file.compression.type: Legt den Komprimierungstyp für Dateien fest, die im Cloud Storage-Bucket gespeichert sind. Beispiele sind gzip, snappy, zstd und none. Der Standardwert ist none.

  • file.name.prefix: Das Präfix, das dem Namen jeder Datei hinzugefügt werden soll, die im Cloud Storage-Bucket gespeichert wird. In der Standardeinstellung ist dieser Wert leer.

  • format.output.type: Der Typ des Datenformats, das zum Schreiben von Daten in die Cloud Storage-Ausgabedateien verwendet wird. Unterstützte Werte sind csv, json, jsonl und parquet. Der Standardwert ist csv.

Eine Liste der für diesen Connector verfügbaren Konfigurationseigenschaften finden Sie unter Cloud Storage Sink Connector-Konfigurationen.

Cloud Storage-Senken-Connector erstellen

Bevor Sie einen Connector erstellen, lesen Sie die Dokumentation zu Eigenschaften eines Cloud Storage-Senken-Connectors.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.

    Zu „Cluster verbinden“

  2. Klicken Sie auf den Connect-Cluster, für den Sie den Connector erstellen möchten.

    Die Seite Clusterdetails verbinden wird angezeigt.

  3. Klicken Sie auf Connector erstellen.

    Die Seite Kafka-Connector erstellen wird angezeigt.

  4. Geben Sie für den Connectornamen einen String ein.

    Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.

  5. Wählen Sie für Connector-Plug-in die Option Cloud Storage Sink aus.

  6. Geben Sie die Themen an, aus denen Sie Daten streamen können.

  7. Wählen Sie den Storage-Bucket aus, in dem die Daten gespeichert werden sollen.

  8. Optional: Konfigurieren Sie im Abschnitt Konfiguration zusätzliche Einstellungen.

  9. Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.

  10. Klicken Sie auf Erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka connectors create aus:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ersetzen Sie Folgendes:

    • CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.

    • LOCATION: Der Ort, an dem Sie den Connector erstellen. Dies muss derselbe Standort sein, an dem Sie den Connect-Cluster erstellt haben.

    • CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.

    • CONFIG_FILE: Der Pfad zur YAML-Konfigurationsdatei für den BigQuery Sink-Connector.

    Hier ist ein Beispiel für eine Konfigurationsdatei für den Cloud Storage-Sink-Connector:

    connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    gcs.bucket.name: "GCS_BUCKET_NAME"
    gcs.credentials.default: "true"
    format.output.type: "json"
    name: "GCS_SINK_CONNECTOR_ID"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    

    Ersetzen Sie Folgendes:

    • GMK_TOPIC_ID: Die ID des Managed Service for Apache Kafka-Themas, aus dem die Daten in den Cloud Storage-Senken-Connector fließen.

    • GCS_BUCKET_NAME: Der Name des Cloud Storage-Bucket, der als Senke für die Pipeline dient.

    • GCS_SINK_CONNECTOR_ID: Die ID oder der Name des Cloud Storage-Sink-Connectors. Tipps zum Benennen eines Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.

  3. Terraform

    Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.

    resource "google_managed_kafka_connector" "example-cloud-storage-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-gcs-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class"                = "io.aiven.kafka.connect.gcs.GcsSinkConnector"
        "tasks.max"                      = "3"
        "topics"                         = "GMK_TOPIC_ID"
        "gcs.bucket.name"                = "GCS_BUCKET_NAME"
        "gcs.credentials.default"        = "true"
        "format.output.type"             = "json"
        "name"                           = "my-gcs-sink-connector"
        "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
        "value.converter.schemas.enable" = "false"
        "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
      }
      provider = google-beta
    }

    Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

    Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createCloudStorageSinkConnector creates a Cloud Storage Sink connector.
    func createCloudStorageSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, gcsBucketName, tasksMax, formatOutputType, valueConverter, valueConverterSchemasEnable, keyConverter, gcsCredentialsDefault string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "GCS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// gcsBucketName := "GCS_BUCKET_NAME"
    	// tasksMax := "3"
    	// formatOutputType := "json"
    	// valueConverter := "org.apache.kafka.connect.json.JsonConverter"
    	// valueConverterSchemasEnable := "false"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// gcsCredentialsDefault := "true"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	config := map[string]string{
    		"connector.class":                "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    		"tasks.max":                      tasksMax,
    		"topics":                         topics,
    		"gcs.bucket.name":                gcsBucketName,
    		"gcs.credentials.default":        gcsCredentialsDefault,
    		"format.output.type":             formatOutputType,
    		"name":                           connectorID,
    		"value.converter":                valueConverter,
    		"value.converter.schemas.enable": valueConverterSchemasEnable,
    		"key.converter":                  keyConverter,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Cloud Storage sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateCloudStorageSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-gcs-sink-connector";
        String bucketName = "my-gcs-bucket";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
        String maxTasks = "3";
        String gcsCredentialsDefault = "true";
        String formatOutputType = "json";
        String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
        String valueSchemasEnable = "false";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createCloudStorageSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            bucketName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            gcsCredentialsDefault,
            formatOutputType,
            valueConverter,
            valueSchemasEnable,
            keyConverter);
      }
    
      public static void createCloudStorageSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String bucketName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String gcsCredentialsDefault,
          String formatOutputType,
          String valueConverter,
          String valueSchemasEnable,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("gcs.bucket.name", bucketName);
        configMap.put("gcs.credentials.default", gcsCredentialsDefault);
        configMap.put("format.output.type", formatOutputType);
        configMap.put("name", connectorId);
        configMap.put("value.converter", valueConverter);
        configMap.put("value.converter.schemas.enable", valueSchemasEnable);
        configMap.put("key.converter", keyConverter);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": tasks_max,
        "topics": topics,
        "gcs.bucket.name": gcs_bucket_name,
        "gcs.credentials.default": "true",
        "format.output.type": format_output_type,
        "name": connector_id,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "key.converter": key_converter,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.