Create a Pub/Sub Source connector

Pub/Sub Source connectors stream messages from Pub/Sub to Kafka. This lets you integrate Pub/Sub with your Kafka-based applications and data pipelines.

The connector reads messages from a Pub/Sub subscription, converts each message into a Kafka record, and writes the records to a Kafka topic. By default, the connector creates Kafka records as follows:

  • The Kafka record key is null.
  • The Kafka record value is the Pub/Sub message body as bytes.
  • The Kafka record headers are empty.

However, you can configure this behavior. For more information, see Configure the connector.

Before you begin

Before creating a Pub/Sub Source connector, complete the following prerequisites:

Required roles and permissions

To get the permissions that you need to create a Pub/Sub Source connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on the project containing the Connect cluster. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create a Pub/Sub Source connector. The exact permissions that are required are listed in the following Required permissions section:

Required permissions

The following permissions are required to create a Pub/Sub Source connector:

  • Create a connector on the parent Connect cluster: managedkafka.connectors.create

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.

If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the Connect cluster is in a different project, refer to Create a Connect Cluster in a different project.

Grant permissions to read from Pub/Sub

The Managed Kafka service account must have permission to read messages from the Pub/Sub subscription. Grant the following IAM roles to the service account on the project containing the Pub/Sub subscription:

  • Pub/Sub Subscriber (roles/pubsub.subscriber)
  • Pub/Sub Viewer (roles/pubsub.viewer)

The Managed Kafka service account has the following format: service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com. Replace PROJECT_NUMBER with the project number.
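
For example, a minimal sketch of granting both roles at the project level with the Google Cloud CLI might look like the following. PUBSUB_PROJECT_ID is a placeholder for the project that contains the subscription, and PROJECT_NUMBER is the project number described above.

# Sketch only: grant the Managed Kafka service account read access to Pub/Sub.
gcloud projects add-iam-policy-binding PUBSUB_PROJECT_ID \
    --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
    --role="roles/pubsub.subscriber"

gcloud projects add-iam-policy-binding PUBSUB_PROJECT_ID \
    --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
    --role="roles/pubsub.viewer"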

Create a Pub/Sub Source connector

Console

  1. In the Google Cloud console, go to the Connect Clusters page.

    Go to Connect Clusters

  2. Click the Connect cluster where you want to create the connector.

  3. Click Create connector.

  4. For the connector name, enter a string.

    For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource.

  5. For Connector plugin, select Pub/Sub Source.

  6. In the Cloud Pub/Sub subscription list, select a Pub/Sub subscription. The connector pulls messages from this subscription. The subscription is displayed as a full resource name: projects/{project}/subscriptions/{subscription}.

  7. In the Kafka topic list, select the Kafka topic where messages are written.

  8. Optional: In the Configurations box, add configuration properties or edit the default properties. For more information, see Configure the connector.

  9. Select the Task restart policy. For more information, see Task restart policy.

  10. Click Create.

gcloud

  1. Run the gcloud managed-kafka connectors create command:

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Replace the following:

    • CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

    • LOCATION: The location of the Connect cluster.

    • CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.

    • CONFIG_FILE: The path to a YAML or JSON configuration file.

Here is an example of a configuration file:

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Replace the following:

  • PROJECT_ID: The ID of the Google Cloud project where the Pub/Sub subscription resides.

  • PUBSUB_SUBSCRIPTION_ID: The ID of the Pub/Sub subscription to pull data from.

  • KAFKA_TOPIC_ID: The ID of the Kafka topic where data is written.

The cps.project, cps.subscription, and kafka.topic configuration properties are required. For additional configuration options, see Configure the connector.

Terraform

You can use a Terraform resource to create a connector.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

To learn how to apply or remove a Terraform configuration, see Basic Terraform commands.

Go

Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.

To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
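
# TODO(developer): Update with your config values before running. For example:
# project_id = "my-project-id"
# region = "us-central1"
# connect_cluster_id = "my-connect-cluster"
# connector_id = "my-pubsub-source-connector"
# kafka_topic = "GMK_TOPIC_ID"
# cps_subscription = "CPS_SUBSCRIPTION_ID"
# cps_project = "GCP_PROJECT_ID"
# tasks_max = "3"
# value_converter = "org.apache.kafka.connect.converters.ByteArrayConverter"
# key_converter = "org.apache.kafka.connect.storage.StringConverter"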

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    # CreateConnector is handled synchronously and returns the created connector.
    response = connect_client.create_connector(request=request)
    print("Created Connector:", response.name)
except GoogleAPICallError as e:
    print(f"The request failed with error: {e}")

After you create a connector, you can edit, delete, pause, stop, or restart the connector.

Configure the connector

This section describes some configuration properties that you can set on the connector.

For a complete list of the properties that are specific to this connector, see Pub/Sub Source connector configs.

Pull mode

The pull mode specifies how the connector retrieves Pub/Sub messages. The following modes are supported:

  • Pull mode (default). Messages are pulled in batches. To enable this mode, set cps.streamingPull.enabled=false. To configure the batch size, set the cps.maxBatchSize property.

    For more information about pull mode, see Pull API.

  • Streaming pull mode. Provides the highest throughput and lowest latency when retrieving messages from Pub/Sub. To enable this mode, set cps.streamingPull.enabled=true.

    For more information about streaming pull mode, see StreamingPull API.

    If streaming pull is enabled, you can tune the performance by setting the following configuration properties, as shown in the sample after this list:

    • cps.streamingPull.flowControlBytes: The maximum number of outstanding message bytes per task.
    • cps.streamingPull.flowControlMessages: The maximum number of outstanding messages per task.
    • cps.streamingPull.maxAckExtensionMs: The maximum amount of time the connector extends the acknowledgment deadline, in milliseconds.
    • cps.streamingPull.maxMsPerAckExtension: The maximum amount of time the connector extends the acknowledgment deadline per extension, in milliseconds.
    • cps.streamingPull.parallelStreams: The number of parallel streams the connector uses to pull messages from the subscription.
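
For example, a configuration sketch that enables streaming pull and tunes flow control might add properties like the following to the connector configuration file. The numeric values are illustrative placeholders, not recommendations.

cps.streamingPull.enabled: "true"
cps.streamingPull.parallelStreams: "2"
cps.streamingPull.flowControlMessages: "1000"
cps.streamingPull.flowControlBytes: "104857600"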

Pub/Sub endpoint

By default, the connector uses the global Pub/Sub endpoint. To specify an endpoint, set the cps.endpoint property to the endpoint address. For more information about endpoints, see Pub/Sub endpoints.
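
For example, assuming you want the connector to use a regional endpoint in us-central1, a configuration sketch might set the property as follows. Confirm the exact endpoint address for your region in the Pub/Sub endpoints documentation.

cps.endpoint: "us-central1-pubsub.googleapis.com:443"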

Kafka record key

The connector sets the Kafka record keys as follows:

  • By default, record keys are null.

  • To use a Pub/Sub message attribute as the key, set kafka.key.attribute to the name of the attribute. For example, kafka.key.attribute=username.

  • To use the Pub/Sub ordering key as the key, set kafka.key.attribute=orderingKey.

The key converter must be org.apache.kafka.connect.storage.StringConverter.
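
For example, to key each record by a username attribute, as in the example above, the configuration might include:

kafka.key.attribute: "username"
key.converter: "org.apache.kafka.connect.storage.StringConverter"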

Kafka record headers

By default, record headers are empty.

To include Pub/Sub message attributes as headers, set kafka.record.headers=true. To include the ordering key as an attribute, also set cps.makeOrderingKeyAttribute=true.
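
For example, to expose message attributes, including the ordering key, as Kafka record headers, the configuration might include:

kafka.record.headers: "true"
cps.makeOrderingKeyAttribute: "true"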

Kafka record value

If kafka.record.headers=false and the Pub/Sub message has one or more custom attributes, the connector writes the record value as a struct with the following fields:

  • A field named "message" whose value is the Pub/Sub message body, stored as bytes.

  • One field for each Pub/Sub message attribute. To include the ordering key as an attribute, set cps.makeOrderingKeyAttribute=true.

Set the value converter to org.apache.kafka.connect.json.JsonConverter to convert the struct value.

Otherwise, if kafka.record.headers=true or the message has no custom attributes, the connector writes the message data directly as a byte array.
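
For example, a configuration sketch that keeps attributes in the record value and serializes the struct as JSON might include the following. The schemas.enable property is a standard Kafka Connect JsonConverter option, shown here as an assumption about the output format you want.

kafka.record.headers: "false"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"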

Kafka partitions

By default, the connector writes to a single partition in the topic. To specify how many partitions the connector writes to, set the kafka.partition.count property. The value must not exceed the topic's partition count.

To specify how the connector assigns messages to partitions, set the kafka.partition.scheme property. For more information, see Pub/Sub Source connector configs.
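
For example, to spread records across four partitions, a configuration sketch might include the following. The round_robin scheme name is an assumption here; see the Pub/Sub Source connector configs reference for the supported values.

kafka.partition.count: "4"
kafka.partition.scheme: "round_robin"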

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.