Produce and consume messages with the Kafka command-line tools
Learn how to use the Kafka command-line tools to connect to a Managed Service for Apache Kafka cluster, produce messages, and consume messages.
Before you begin
Before you start this tutorial, create a new cluster by following the steps in Create a cluster in Managed Service for Apache Kafka.
If you already have a Managed Service for Apache Kafka cluster, you can skip this step.
Create a client VM
Set up a client on a Compute Engine instance that can access the VPC containing the subnet where the Kafka cluster is reachable.
Create a Compute Engine instance in a zone that is in the same region as the Kafka cluster. The instance must also be in a VPC containing the subnet that you've used in the cluster configuration. For example, the following command creates a Compute Engine instance called test-instance:

gcloud compute instances create test-instance \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \
  --zone=REGION-c

For more information about creating a VM, see Create a VM instance in a specific subnet.
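Optionally, check that the instance is up before you continue. One way to do this with gcloud:

gcloud compute instances describe test-instance \
  --zone=REGION-c --format="value(status)"

The command prints RUNNING once the VM is ready.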
Give the Compute Engine default service account the necessary permissions to connect to the cluster and authenticate. You need to grant the Managed Kafka Client role (roles/managedkafka.client), the Service Account Token Creator role (roles/iam.serviceAccountTokenCreator), and the Service Account OpenID Token Creator role (roles/iam.serviceAccountOpenIdTokenCreator).

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using gcloud projects describe PROJECT_ID. For more information, see Find the project name, number, and ID.
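To double-check that the bindings took effect, you can list the roles granted to the service account. One way to do this with gcloud:

gcloud projects get-iam-policy PROJECT_ID \
  --flatten="bindings[].members" \
  --filter="bindings.members:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --format="value(bindings.role)"

The output should include the three roles granted above.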
Connect to the VM
Use SSH to connect to the VM that you created in the previous step, for example, by using the Google Cloud CLI:
gcloud compute ssh test-instance --project=PROJECT_ID --zone=REGION-c
Additional configuration might be required for first-time SSH usage. For more information about connecting using SSH, see About SSH connections.
Install the Kafka command-line tools
Install Java to run the Kafka command-line tools, and wget to help download dependencies. The following commands assume you are using a Debian Linux environment.

sudo apt-get install default-jre wget

Install the Kafka command-line tools on the VM.
wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz

Set the following environment variables:
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
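These exports last only for the current shell session; if you reconnect over SSH, run them again. As a quick sanity check that Java and the Kafka tools are on your PATH, you can run:

java -version
kafka-topics.sh --version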
Set up authentication
Set up the Managed Service for Apache Kafka authentication library.
Download the dependencies and install them locally. Since the Kafka command-line tools look for Java dependencies in the libs directory of the Kafka installation directory, we add these dependencies there.

wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/

Set up the client machine configuration properties.
cat <<EOF > client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

This code configures a Kafka client for the following settings:
Use SASL_SSL for secure communication with the Kafka cluster.
Employ OAuth 2.0 bearer tokens for authentication.
Use a Google Cloud-specific login callback handler to obtain OAuth 2.0 tokens.
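The login callback handler obtains tokens for the VM's attached service account. As an optional sanity check that the VM has a service account attached, you can query the Compute Engine metadata server; this check is an extra step, not required by the client itself:

curl -H "Metadata-Flavor: Google" \
  "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/email"

The command prints the email of the Compute Engine default service account.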
Send and consume messages
Run these commands on the client machine.
Set the project ID and cluster ID as environment variables.
export PROJECT_ID=PROJECT_ID
export CLUSTER_ID=CLUSTER_ID

Replace the following:

PROJECT_ID with the name of the project.
CLUSTER_ID with the name of the new cluster.
Set the bootstrap address as an environment variable.
export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
For more information, see Get the bootstrap address.
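The remaining steps assume the cluster has at least one topic. If you need to create one first, here is a minimal sketch using the Kafka tools; the partition and replication settings are example values to adjust for your cluster:

kafka-topics.sh --create --topic KAFKA_TOPIC_NAME \
  --partitions 1 --replication-factor 3 \
  --bootstrap-server $BOOTSTRAP \
  --command-config client.properties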
List the topics in the cluster.
kafka-topics.sh --list \
  --bootstrap-server $BOOTSTRAP \
  --command-config client.properties

Write a message to a topic.
echo "hello world" | kafka-console-producer.sh --topic KAFKA_TOPIC_NAME \ --bootstrap-server $BOOTSTRAP --producer.config client.propertiesReplace KAFKA_TOPIC_NAME with the name of the topic.
Consume messages from the topic.
kafka-console-consumer.sh --topic KAFKA_TOPIC_NAME --from-beginning \
  --bootstrap-server $BOOTSTRAP --consumer.config client.properties

To stop consuming messages, enter Ctrl+C.
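By default, the console consumer uses a randomly generated consumer group, so consumed offsets are not reused between runs. To track offsets across restarts, you can pass an explicit group name; in this sketch, test-group is an arbitrary placeholder:

kafka-console-consumer.sh --topic KAFKA_TOPIC_NAME \
  --group test-group \
  --bootstrap-server $BOOTSTRAP --consumer.config client.properties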
Run a producer performance test.
kafka-producer-perf-test.sh --topic KAFKA_TOPIC_NAME --num-records 1000000 \
  --throughput -1 --print-metrics --record-size 1024 \
  --producer-props bootstrap.servers=$BOOTSTRAP \
  --producer.config client.properties
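You can also measure read throughput with the consumer performance tool that ships alongside the producer test. A sketch that reads back the records produced above:

kafka-consumer-perf-test.sh --topic KAFKA_TOPIC_NAME \
  --messages 1000000 \
  --bootstrap-server $BOOTSTRAP --consumer.config client.properties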
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
To delete the cluster, run the gcloud managed-kafka clusters delete command:

gcloud managed-kafka clusters delete CLUSTER_ID --location=REGION
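If you created the test-instance VM only for this tutorial, you can delete it as well:

gcloud compute instances delete test-instance --zone=REGION-c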