Pub/Sub-Quellconnector erstellen

Pub/Sub-Quell-Connectors streamen Nachrichten von Pub/Sub zu Kafka. So können Sie Pub/Sub in Ihre Kafka-basierten Anwendungen und Datenpipelines einbinden.

Anwendungsfälle für Pub/Sub-Quell-Connectors:

  • Datenaufnahme in Echtzeit Daten aus Cloud-Diensten oder anderen Anwendungen in Pub/Sub veröffentlichen und dann zur Streamverarbeitung in Kafka replizieren.

  • Ereignisgesteuerte Architekturen Kafka-basierte Verarbeitung durch Nachrichten auslösen, die in Pub/Sub veröffentlicht werden.

Der Connector liest Nachrichten aus einem Pub/Sub-Abo, konvertiert jede Nachricht in einen Kafka-Datensatz und schreibt die Datensätze in ein Kafka-Thema. Standardmäßig erstellt der Connector Kafka-Datensätze so:

  • Der Kafka-Datensatzschlüssel ist null.
  • Der Kafka-Datensatzwert sind die Pub/Sub-Nachrichtendaten als Byte.
  • Die Kafka-Datensatzheader sind leer.

Sie können dieses Verhalten jedoch konfigurieren. Weitere Informationen finden Sie unter Connector konfigurieren.

Hinweis

Bevor Sie einen Pub/Sub-Quell-Connector erstellen, benötigen Sie Folgendes:

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM-Rolle für Ihr Projekt zu gewähren, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Connectors benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Connectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen , um die notwendigen Berechtigungen anzuzeigen, die erforderlich sind:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines Connectors erforderlich:

  • Connector erstellen: managedkafka.connectors.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Berechtigungen zum Lesen aus Pub/Sub gewähren

Das Managed Kafka-Dienstkonto muss die Berechtigung haben, Nachrichten aus dem Pub/Sub-Abo zu lesen. Weisen Sie dem Dienstkonto im Projekt mit dem Pub/Sub-Abo die folgenden IAM-Rollen zu:

  • Pub/Sub-Abonnent (roles/pubsub.subscriber)
  • Pub/Sub-Betrachter (roles/pubsub.viewer)

Das Managed Kafka-Dienstkonto hat das folgende Format: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com, wobei PROJECT_NUMBER die Projektnummer des Connect-Clusters ist.

Wenn sich Ihr Connect-Cluster in einem anderen Projekt als Ihr Managed Service for Apache Kafka Cluster befindet, lesen Sie Connect-Cluster in einem anderen Projekt erstellen.

Pub/Sub-Quell-Connector erstellen

Console

  1. Rufen Sie in der Google Cloud Console die Seite Connect-Cluster auf.

    Zu den Connect-Clustern

  2. Klicken Sie auf den Connect-Cluster, in dem Sie den Connector erstellen möchten.

  3. Klicken Sie auf Connector erstellen.

  4. Geben Sie einen String für den Connectornamen ein.

    Tipps zum Benennen von Connectors finden Sie unter Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.

  5. Wählen Sie unter Connector-Plug-in die Option Pub/Sub-Quelle aus.

  6. Wählen Sie in der Liste Cloud Pub/Sub-Abo ein Pub/Sub-Abo aus. Der Connector ruft Nachrichten aus diesem Abo ab. Das Abo wird als vollständiger Ressourcenname angezeigt: projects/{project}/subscriptions/{subscription}.

  7. Wählen Sie in der Liste Kafka-Thema das Kafka-Thema aus, in das Nachrichten geschrieben werden.

  8. Optional: Fügen Sie im Feld Konfigurationen Konfigurationseigenschaften hinzu oder bearbeiten Sie die Standardeigenschaften. Weitere Informationen finden Sie unter Connector konfigurieren.

  9. Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie für Task-Neustart.

  10. Klicken Sie auf Erstellen.

gcloud

  1. Führen Sie den gcloud managed-kafka connectors create Befehl aus:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ersetzen Sie Folgendes:

    • CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie unter Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors kann nicht geändert werden.

    • LOCATION: Der Standort des Connect-Clusters.

    • CONNECT_CLUSTER_ID: Die ID des Connect Clusters, in dem der Connector erstellt wird.

    • CONFIG_FILE: Der Pfad zu einer YAML- oder JSON Konfigurationsdatei.

Dies ist ein Beispiel für eine Konfigurationsdatei:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Ersetzen Sie Folgendes:

  • PROJECT_ID: Die ID des Google Cloud Projekts, in dem sich das Pub/Sub-Abo befindet.

  • PUBSUB_SUBSCRIPTION_ID: Die ID des Pub/Sub-Abos, aus dem Daten abgerufen werden sollen.

  • KAFKA_TOPIC_ID: Die ID des Kafka-Themas, in das Daten geschrieben werden.

Die Konfigurationseigenschaften cps.project, cps.subscription und kafka.topic sind erforderlich. Weitere Konfigurationsoptionen finden Sie unter Connector konfigurieren.

Terraform

Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

Go

Folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Go API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Java API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Python API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.

Connector konfigurieren

In diesem Abschnitt werden einige Konfigurationseigenschaften beschrieben, die Sie für den Connector festlegen können.

Eine vollständige Liste der Eigenschaften, die für diesen Connector spezifisch sind, finden Sie unter Konfigurationen für Pub/Sub-Quell-Connectors.

Pull-Modus

Der Pull-Modus gibt an, wie der Connector Pub/Sub-Nachrichten abruft. Die folgenden Modi werden unterstützt:

  • Pull-Modus (Standard). Nachrichten werden in Batches abgerufen. Wenn Sie diesen Modus aktivieren möchten, legen Sie cps.streamingPull.enabled=false. fest. Legen Sie die Batchgröße mit der cps.maxBatchSize Eigenschaft fest.

    Weitere Informationen zum Pull-Modus finden Sie unter Pull API.

  • Streaming-Pull-Modus. Ermöglicht den maximalen Durchsatz und die niedrigste Latenz beim Abrufen von Nachrichten aus Pub/Sub. Wenn Sie diesen Modus aktivieren möchten, legen Sie cps.streamingPull.enabled=true fest.

    Weitere Informationen zum Streaming-Pull-Modus finden Sie unter StreamingPull API.

    Wenn der Streaming-Pull-Modus aktiviert ist, können Sie die Leistung optimieren, indem Sie die folgenden Konfigurationseigenschaften festlegen:

    • cps.streamingPull.flowControlBytes: Die maximale Anzahl ausstehender Nachrichtenbytes pro Aufgabe.
    • cps.streamingPull.flowControlMessages: Die maximale Anzahl ausstehender Nachrichten pro Aufgabe.
    • cps.streamingPull.maxAckExtensionMs: Die maximale Zeit in Millisekunden, um die der Connector die Frist für das Abo verlängert.
    • cps.streamingPull.maxMsPerAckExtension: Die maximale Zeit in Millisekunden, um die der Connector die Frist für das Abo pro Verlängerung verlängert.
    • cps.streamingPull.parallelStreams: Die Anzahl der Streams, aus denen Nachrichten aus dem Abo abgerufen werden sollen.

Pub/Sub-Endpunkt

Standardmäßig verwendet der Connector den globalen Pub/Sub-Endpunkt. Wenn Sie einen Endpunkt angeben möchten, legen Sie die Eigenschaft cps.endpoint auf die Endpunktadresse fest. Weitere Informationen zu Endpunkten finden Sie unter Pub/Sub-Endpunkte.

Kafka-Partitionen

Standardmäßig schreibt der Connector in eine einzelne Partition im Thema. Wenn Sie angeben möchten, in wie viele Partitionen der Connector schreiben soll, legen Sie die Eigenschaft kafka.partition.count fest. Der Wert darf die Anzahl der Partitionen des Themas nicht überschreiten.

Wenn Sie angeben möchten, wie der Connector Nachrichten Partitionen zuweist, legen Sie die Eigenschaft kafka.partition.scheme fest. Weitere Informationen finden Sie unter Konfigurationen für Pub/Sub-Quell-Connectors.

Konverter

Legen Sie den Schlüsselkonverter auf org.apache.kafka.connect.storage.StringConverter fest.

Legen Sie je nach Connectorkonfiguration den Wertkonverter auf einen der folgenden Werte fest:

  • org.apache.kafka.connect.converters.ByteArrayConverter
  • org.apache.kafka.connect.json.JsonConverter

Weitere Informationen finden Sie unter Datensatzwert.

Nachrichtenkonvertierung

Der Pub/Sub-Quell-Connector konvertiert Pub/Sub-Nachrichten in Kafka-Datensätze. In den folgenden Abschnitten wird der Konvertierungsprozess beschrieben.

Datensatzschlüssel

Der Schlüsselkonverter muss org.apache.kafka.connect.storage.StringConverter sein.

  • Standardmäßig sind Datensatzschlüssel null.

  • Wenn Sie ein Pub/Sub Nachrichtenattribut als Schlüssel verwenden möchten, legen Sie kafka.key.attribute auf den Namen des Attributs fest. Beispiel: kafka.key.attribute=username.

  • Wenn Sie den Pub/Sub Bestellschlüssel als Schlüssel verwenden möchten, legen Sie kafka.key.attribute=orderingKey fest.

Datensatzheader

Standardmäßig sind Datensatzheader leer.

Wenn kafka.record.headers auf true gesetzt ist, werden Pub/Sub-Nachrichtenattribute als Datensatzheader geschrieben. Wenn Sie den Bestellschlüssel einbeziehen möchten, legen Sie cps.makeOrderingKeyAttribute=true fest.

Datensatzwert

Datensatzwerte werden entweder als Byte-Arrays oder als struct-Typen geschrieben.

Datensatzwerte als Byte-Arrays

Wenn kafka.record.headers auf true gesetzt ist oder die Pub/Sub-Nachricht keine benutzerdefinierten Attribute hat, schreibt der Connector die Nachrichtendaten als Byte-Array. Legen Sie den Wertkonverter auf org.apache.kafka.connect.converters.ByteArrayConverter fest.

Datensatzwerte als Structs

Wenn kafka.record.headers auf false gesetzt ist und die Nachricht mindestens ein benutzerdefiniertes Attribut hat, schreibt der Connector den Datensatzwert als struct. Legen Sie den Wertkonverter auf org.apache.kafka.connect.json.JsonConverter fest.

Das struct enthält die folgenden Felder:

  • message: Die Pub/Sub-Nachrichtendaten als Byte.

  • Ein Feld für jedes Pub/Sub-Nachrichtenattribut. Wenn Sie den Bestellschlüssel einbeziehen möchten, legen Sie cps.makeOrderingKeyAttribute=true fest.

Wenn die Nachricht beispielsweise ein username-Attribut hat, sieht der Datensatzwert so aus:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Wenn value.converter.schemas.enable auf true gesetzt ist, enthält das struct sowohl die Nutzlast als auch das Schema:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder ihrer Tochtergesellschaften in den USA und/oder anderen Ländern.