Pub/Sub-Quellconnector erstellen

Pub/Sub-Quell-Connectors streamen Nachrichten von Pub/Sub zu Kafka. So können Sie Pub/Sub in Ihre Kafka-basierten Anwendungen und Datenpipelines einbinden.

Der Connector liest Nachrichten aus einem Pub/Sub-Abo, konvertiert jede Nachricht in einen Kafka-Datensatz und schreibt die Datensätze in ein Kafka-Thema. Standardmäßig erstellt der Connector Kafka-Datensätze so:

  • Der Kafka-Datensatzschlüssel ist null.
  • Der Kafka-Datensatzwert sind die Pub/Sub-Nachrichtendaten als Byte.
  • Die Kafka-Datensatzheader sind leer.

Sie können dieses Verhalten jedoch konfigurieren. Weitere Informationen finden Sie unter Connector konfigurieren.

Hinweise

Bevor Sie einen Pub/Sub-Quell-Connector erstellen, benötigen Sie Folgendes:

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) für das Projekt mit dem Connect-Cluster zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Pub/Sub-Quellconnectors benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Pub/Sub-Quellconnectors erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines Pub/Sub-Quell-Connectors erforderlich:

  • Gewähren Sie die Berechtigung zum Erstellen eines Connectors für den übergeordneten Connect-Cluster: managedkafka.connectors.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zur Rolle „Managed Kafka Connector Editor“ finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Wenn sich Ihr Managed Service for Apache Kafka-Cluster im selben Projekt wie der Connect-Cluster befindet, sind keine weiteren Berechtigungen erforderlich. Wenn sich der Connect-Cluster in einem anderen Projekt befindet, lesen Sie den Abschnitt Connect-Cluster in einem anderen Projekt erstellen.

Berechtigungen zum Lesen aus Pub/Sub gewähren

Das Managed Kafka-Dienstkonto muss berechtigt sein, Nachrichten aus dem Pub/Sub-Abo zu lesen. Weisen Sie dem Dienstkonto im Projekt mit dem Pub/Sub-Abo die folgenden IAM-Rollen zu:

  • Pub/Sub-Abonnent (roles/pubsub.subscriber)
  • Pub/Sub-Betrachter (roles/pubsub.viewer)

Das Managed Kafka-Dienstkonto hat das folgende Format: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com. Ersetzen Sie PROJECT_NUMBER durch die Projektnummer.

Pub/Sub-Quell-Connector erstellen

Console

  1. Rufen Sie in der Google Cloud Console die Seite Connect Clusters auf.

    Zu „Cluster verbinden“

  2. Klicken Sie auf den Connect-Cluster, in dem Sie den Connector erstellen möchten.

  3. Klicken Sie auf Connector erstellen.

  4. Geben Sie für den Connectornamen einen String ein.

    Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka.

  5. Wählen Sie als Connector-Plug-in die Option Pub/Sub-Quelle aus.

  6. Wählen Sie in der Liste Cloud Pub/Sub-Abo ein Pub/Sub-Abo aus. Der Connector ruft Nachrichten aus diesem Abo ab. Das Abo wird als vollständiger Ressourcenname angezeigt: projects/{project}/subscriptions/{subscription}.

  7. Wählen Sie in der Liste Kafka-Thema das Kafka-Thema aus, in das Nachrichten geschrieben werden.

  8. Optional: Fügen Sie im Feld Konfigurationen Konfigurationseigenschaften hinzu oder bearbeiten Sie die Standardeigenschaften. Weitere Informationen finden Sie unter Connector konfigurieren.

  9. Wählen Sie die Richtlinie für Task-Neustart aus. Weitere Informationen finden Sie unter Richtlinie zum Neustart von Aufgaben.

  10. Klicken Sie auf Erstellen.

gcloud

  1. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka connectors create aus:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Ersetzen Sie Folgendes:

    • CONNECTOR_ID: Die ID oder der Name des Connectors. Tipps zum Benennen von Connectors finden Sie in den Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Connectors ist unveränderlich.

    • LOCATION: Der Standort des Connect-Clusters.

    • CONNECT_CLUSTER_ID: Die ID des Connect-Clusters, in dem der Connector erstellt wird.

    • CONFIG_FILE: Der Pfad zu einer YAML- oder JSON-Konfigurationsdatei.

Hier sehen Sie ein Beispiel für eine Konfigurationsdatei:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Ersetzen Sie Folgendes:

  • PROJECT_ID: Die ID des Google Cloud-Projekts, in dem sich das Pub/Sub-Abo befindet.

  • PUBSUB_SUBSCRIPTION_ID: Die ID des Pub/Sub-Abos, aus dem Daten abgerufen werden sollen.

  • KAFKA_TOPIC_ID: Die ID des Kafka-Themas, in das Daten geschrieben werden.

Die Konfigurationseigenschaften cps.project, cps.subscription und kafka.topic sind erforderlich. Weitere Konfigurationsoptionen finden Sie unter Connector konfigurieren.

Terraform

Sie können eine Terraform-Ressource verwenden, um einen Connector zu erstellen.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

Go

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Nachdem Sie einen Connector erstellt haben, können Sie ihn bearbeiten, löschen, pausieren, beenden oder neu starten.

Connector konfigurieren

In diesem Abschnitt werden einige Konfigurationseigenschaften beschrieben, die Sie für den Connector festlegen können.

Eine vollständige Liste der Eigenschaften, die für diesen Connector spezifisch sind, finden Sie unter Pub/Sub-Quellconnector-Konfigurationen.

Pull-Modus

Der Pull-Modus gibt an, wie der Connector Pub/Sub-Nachrichten abruft. Die folgenden Modi werden unterstützt:

  • Pull-Modus (Standard): Nachrichten werden in Batches abgerufen. Um diesen Modus zu aktivieren, legen Sie cps.streamingPull.enabled=false. fest. Um die Batchgröße zu konfigurieren, legen Sie die Eigenschaft cps.maxBatchSize fest.

    Weitere Informationen zum Pull-Modus finden Sie unter Pull API.

  • Streaming-Pull-Modus. Ermöglicht den maximalen Durchsatz und die niedrigste Latenz beim Abrufen von Nachrichten aus Pub/Sub. Setzen Sie cps.streamingPull.enabled=true, um diesen Modus zu aktivieren.

    Weitere Informationen zum Streaming-Pull-Modus finden Sie unter StreamingPull API.

    Wenn das Streaming von Pull-Vorgängen aktiviert ist, können Sie die Leistung durch Festlegen der folgenden Konfigurationseigenschaften optimieren:

    • cps.streamingPull.flowControlBytes: Die maximale Anzahl ausstehender Message-Bytes pro Aufgabe.
    • cps.streamingPull.flowControlMessages: Die maximale Anzahl ausstehender Nachrichten pro Aufgabe.
    • cps.streamingPull.maxAckExtensionMs: Die maximale Zeit, um die der Connector die Frist für das Abonnieren verlängert, in Millisekunden.
    • cps.streamingPull.maxMsPerAckExtension: Die maximale Zeit, um die der Connector die Anmeldefrist pro Verlängerung verlängert, in Millisekunden.
    • cps.streamingPull.parallelStreams: Die Anzahl der Streams, aus denen Nachrichten aus dem Abo abgerufen werden sollen.

Pub/Sub-Endpunkt

Standardmäßig verwendet der Connector den globalen Pub/Sub-Endpunkt. Um einen Endpunkt anzugeben, legen Sie die Eigenschaft cps.endpoint auf die Endpunktadresse fest. Weitere Informationen zu Endpunkten finden Sie unter Pub/Sub-Endpunkte.

Kafka-Datensätze

Der Quell-Connector von Pub/Sub konvertiert Pub/Sub-Nachrichten in Kafka-Datensätze. In den folgenden Abschnitten wird der Konvertierungsprozess beschrieben.

Eintragsschlüssel

Der Schlüsselkonverter muss org.apache.kafka.connect.storage.StringConverter sein.

  • Standardmäßig sind Datensatzschlüssel null.

  • Wenn Sie ein Pub/Sub-Nachrichtenattribut als Schlüssel verwenden möchten, legen Sie kafka.key.attribute auf den Namen des Attributs fest. Beispiel: kafka.key.attribute=username.

  • Wenn Sie den Pub/Sub-Bestellschlüssel als Schlüssel verwenden möchten, legen Sie kafka.key.attribute=orderingKey fest.

Datensatz-Header

Standardmäßig sind die Datensatzüberschriften leer.

Wenn kafka.record.headers gleich true ist, werden Pub/Sub-Nachrichtenattribute als Datensatzheader geschrieben. Legen Sie cps.makeOrderingKeyAttribute=true fest, um den Reihenfolgeschlüssel einzuschließen.

Datensatzwert

Wenn kafka.record.headers gleich true ist oder die Pub/Sub-Nachricht keine benutzerdefinierten Attribute hat, ist der Datensatzwert die Nachrichtendaten als Byte-Array. Legen Sie den Wertkonverter auf org.apache.kafka.connect.converters.ByteArrayConverter fest.

Andernfalls, wenn kafka.record.headers gleich false ist und die Nachricht mindestens ein benutzerdefiniertes Attribut hat, schreibt der Connector den Datensatzwert als struct. Legen Sie den Wertkonverter auf org.apache.kafka.connect.json.JsonConverter fest.

struct enthält die folgenden Felder:

  • message: Die Pub/Sub-Nachrichtendaten als Byte.

  • Ein Feld für jedes Pub/Sub-Nachrichtenattribut. Legen Sie cps.makeOrderingKeyAttribute=true fest, um den Sortierschlüssel einzuschließen.

Angenommen, die Nachricht hat das Attribut username:

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Wenn value.converter.schemas.enable true ist, enthält struct sowohl die Nutzlast als auch das Schema:

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Kafka-Partitionen

Standardmäßig schreibt der Connector in eine einzelne Partition im Thema. Wenn Sie angeben möchten, in wie viele Partitionen der Connector schreibt, legen Sie die Property kafka.partition.count fest. Der Wert darf die Anzahl der Partitionen des Themas nicht überschreiten.

Mit der Eigenschaft kafka.partition.scheme können Sie festlegen, wie der Connector Nachrichten Partitionen zuweist. Weitere Informationen finden Sie unter Pub/Sub-Quellconnector-Konfigurationen.

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.