Pub/Sub Source connectors stream messages from Pub/Sub to Kafka. This lets you integrate Pub/Sub with your Kafka-based applications and data pipelines.
The connector reads messages from a Pub/Sub subscription, converts each message into a Kafka record, and writes the records to a Kafka topic. By default, the connector creates Kafka records as follows:
- The Kafka record key is null.
- The Kafka record value is the Pub/Sub message body as bytes.
- The Kafka record headers are empty.
However, you can configure this behavior. For more information, see Configure the connector.
Before you begin
Before creating a Pub/Sub Source connector, ensure you have the following:
- A Pub/Sub topic with a subscription.
- A Kafka topic within the Kafka cluster.
- A Connect cluster. When you create the Connect cluster, set the Managed Service for Apache Kafka cluster as the primary Kafka cluster.
Required roles and permissions
To get the permissions that you need to create a Pub/Sub Source connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on the project containing the Connect cluster.
For more information about granting roles, see Manage access to projects, folders, and organizations.
This predefined role contains the permissions required to create a Pub/Sub Source connector. The following permission is required:
- Create a connector on the parent Connect cluster: managedkafka.connectors.create
You might also be able to get these permissions with custom roles or other predefined roles.
For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.
If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the Connect cluster is in a different project, see Create a Connect cluster in a different project.
Grant permissions to read from Pub/Sub
The Managed Kafka service account must have permission to read messages from the Pub/Sub subscription. Grant the following IAM roles to the service account on the project containing the Pub/Sub subscription:
- Pub/Sub Subscriber (roles/pubsub.subscriber)
- Pub/Sub Viewer (roles/pubsub.viewer)
The Managed Kafka service account has the following format:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com.
Replace PROJECT_NUMBER with the project number.
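For example, you can grant these roles with the Google Cloud CLI. The following commands are a sketch; PUBSUB_PROJECT_ID is a placeholder for the ID of the project that contains the Pub/Sub subscription:

gcloud projects add-iam-policy-binding PUBSUB_PROJECT_ID \
    --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
    --role="roles/pubsub.subscriber"

gcloud projects add-iam-policy-binding PUBSUB_PROJECT_ID \
    --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
    --role="roles/pubsub.viewer"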
Create a Pub/Sub Source connector
Console
In the Google Cloud console, go to the Connect Clusters page.
Click the Connect cluster where you want to create the connector.
Click Create connector.
For the connector name, enter a string.
For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource.
For Connector plugin, select Pub/Sub Source.
In the Cloud Pub/Sub subscription list, select a Pub/Sub subscription. The connector pulls messages from this subscription. The subscription is displayed as a full resource name: projects/{project}/subscriptions/{subscription}.
In the Kafka topic list, select the Kafka topic where messages are written.
Optional: In the Configurations box, add configuration properties or edit the default properties. For more information, see Configure the connector.
Select the Task restart policy. For more information, see Task restart policy.
Click Create.
gcloud
Run the gcloud managed-kafka connectors create command:

gcloud managed-kafka connectors create CONNECTOR_ID \
    --location=LOCATION \
    --connect-cluster=CONNECT_CLUSTER_ID \
    --config-file=CONFIG_FILE

Replace the following:
CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.
LOCATION: The location of the Connect cluster.
CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.
CONFIG_FILE: The path to a YAML or JSON configuration file.
Here is an example of a configuration file:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Replace the following:
PROJECT_ID: The ID of the Google Cloud project where the Pub/Sub subscription resides.
PUBSUB_SUBSCRIPTION_ID: The ID of the Pub/Sub subscription to pull data from.
KAFKA_TOPIC_ID: The ID of the Kafka topic where data is written.
The cps.project, cps.subscription, and kafka.topic configuration
properties are required. For additional configuration options, see
Configure the connector.
Terraform
You can use a Terraform resource to create a connector.
To learn how to apply or remove a Terraform configuration, see Basic Terraform commands.
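For example, the following is a minimal sketch that assumes the google_managed_kafka_connector resource in the Terraform provider for Google Cloud; the resource and argument names are assumptions, so check the provider reference for the exact schema. The configs map mirrors the configuration file shown in the gcloud tab:

resource "google_managed_kafka_connector" "example" {
  project         = "PROJECT_ID"
  location        = "us-central1"
  connect_cluster = "CONNECT_CLUSTER_ID"
  connector_id    = "example-pubsub-source-connector"

  # Same properties as the example configuration file.
  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "cps.project"      = "PROJECT_ID"
    "cps.subscription" = "PUBSUB_SUBSCRIPTION_ID"
    "kafka.topic"      = "KAFKA_TOPIC_ID"
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
    "tasks.max"        = "3"
  }
}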
Go
Before trying this sample, follow the Go setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Go API reference documentation.
To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials (ADC). For more information, see Set up ADC for a local development environment.
Java
Before trying this sample, follow the Java setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Java API reference documentation.
To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.
Python
Before trying this sample, follow the Python setup instructions in Install the client libraries. For more information, see the Managed Service for Apache Kafka Python API reference documentation.
To authenticate to Managed Service for Apache Kafka, set up Application Default Credentials. For more information, see Set up ADC for a local development environment.
After you create a connector, you can edit, delete, pause, stop, or restart the connector.
Configure the connector
This section describes some configuration properties that you can set on the connector.
For a complete list of the properties that are specific to this connector, see Pub/Sub Source connector configs.
Pull mode
The pull mode specifies how the connector retrieves Pub/Sub messages. The following modes are supported:
- Pull mode (default). Messages are pulled in batches. To enable this mode, set cps.streamingPull.enabled=false. To configure the batch size, set the cps.maxBatchSize property. For more information about pull mode, see Pull API.
- Streaming pull mode. Provides the maximum throughput and lowest latency when retrieving messages from Pub/Sub. To enable this mode, set cps.streamingPull.enabled=true. For more information about streaming pull mode, see StreamingPull API.
If streaming pull is enabled, you can tune the performance by setting the following configuration properties:
- cps.streamingPull.flowControlBytes: The maximum number of outstanding message bytes per task.
- cps.streamingPull.flowControlMessages: The maximum number of outstanding messages per task.
- cps.streamingPull.maxAckExtensionMs: The maximum amount of time the connector extends the acknowledgment deadline, in milliseconds.
- cps.streamingPull.maxMsPerAckExtension: The maximum amount of time the connector extends the acknowledgment deadline per extension, in milliseconds.
- cps.streamingPull.parallelStreams: The number of streams to pull messages from the subscription.
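For example, the following configuration file snippet enables streaming pull with illustrative tuning values; the numbers are placeholders, not recommendations:

cps.streamingPull.enabled: "true"
cps.streamingPull.parallelStreams: "2"
cps.streamingPull.flowControlMessages: "1000"
cps.streamingPull.flowControlBytes: "104857600"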
Pub/Sub endpoint
By default, the connector uses the global Pub/Sub endpoint. To specify a different endpoint, set the cps.endpoint property to the endpoint address.
For more information about endpoints, see
Pub/Sub endpoints.
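For example, to use a regional Pub/Sub endpoint, you might set the following. The us-central1 region is illustrative, and the host:port format is an assumption based on the connector's default endpoint value:

cps.endpoint: "us-central1-pubsub.googleapis.com:443"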
Kafka record key
The connector sets the Kafka record keys as follows:
- By default, record keys are null.
- To use a Pub/Sub message attribute as the key, set kafka.key.attribute to the name of the attribute. For example, kafka.key.attribute=username.
- To use the Pub/Sub ordering key as the key, set kafka.key.attribute=orderingKey.
The key converter must be org.apache.kafka.connect.storage.StringConverter.
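For example, the following snippet keys each record by a hypothetical username message attribute:

kafka.key.attribute: "username"
key.converter: "org.apache.kafka.connect.storage.StringConverter"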
Kafka record headers
By default, record headers are empty.
To include Pub/Sub message attributes as headers, set
kafka.record.headers=true. To include the ordering key as an attribute, also
set cps.makeOrderingKeyAttribute=true.
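For example, the following snippet writes each message attribute, including the ordering key, as a record header:

kafka.record.headers: "true"
cps.makeOrderingKeyAttribute: "true"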
Kafka record value
If kafka.record.headers=false and the Pub/Sub message has
one or more custom attributes, the connector writes the record value as a
struct with the following fields:
- A field named "message" whose value is the Pub/Sub message body, stored as bytes.
- One field for each Pub/Sub message attribute. To include the ordering key as an attribute, set cps.makeOrderingKeyAttribute=true.
Set the value converter to org.apache.kafka.connect.json.JsonConverter to
convert the struct value.
Otherwise, if kafka.record.headers=true or the message has no custom
attributes, the connector writes the message data directly as a byte array.
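For example, the following snippet configures the connector to emit struct values as JSON. The value.converter.schemas.enable property is a standard Kafka Connect JsonConverter option; false is shown here only as an illustration of omitting the embedded schema:

kafka.record.headers: "false"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"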
Kafka partitions
By default, the connector writes to a single partition in the topic. To specify
how many partitions the connector writes to, set the kafka.partition.count
property. The value must not exceed the topic's
partition count.
To specify how the connector assigns messages to partitions, set the
kafka.partition.scheme property. For more information, see
Pub/Sub Source connector configs.
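For example, the following snippet spreads messages across four partitions. The round_robin value is one of the schemes this connector supports; see the connector configs reference for the full list:

kafka.partition.count: "4"
kafka.partition.scheme: "round_robin"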