Produce and consume messages with the Kafka command-line tools

Learn how to use the Kafka command-line tools to connect to a Managed Service for Apache Kafka cluster, produce messages, and consume messages.

Before you begin

Before you start this tutorial, create a new cluster by following the steps in Create a cluster in Managed Service for Apache Kafka.

If you already have a Managed Service for Apache Kafka cluster, you can skip this step.

Create a client VM

Set up a client on a Compute Engine instance that can access the VPC containing the subnet where the Kafka cluster is reachable.

  1. Create a Compute Engine instance in a zone in the same region as the Kafka cluster. The instance must also be in the VPC that contains the subnet you used in the cluster configuration. For example, the following command creates a Compute Engine instance called test-instance:

    gcloud compute instances create test-instance \
        --scopes=https://www.googleapis.com/auth/cloud-platform \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \
        --zone=REGION-c
    

    For more information about creating a VM, see Create a VM instance in a specific subnet.

  2. Give the Compute Engine default service account the necessary permissions to connect to the cluster and authenticate. You need to grant the Managed Kafka Client role (roles/managedkafka.client), the Service Account Token Creator role (roles/iam.serviceAccountTokenCreator), and the Service Account OpenID Token Creator role (roles/iam.serviceAccountOpenIdTokenCreator).

    gcloud projects add-iam-policy-binding PROJECT_ID \
       --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
       --role=roles/managedkafka.client
    
    gcloud projects add-iam-policy-binding PROJECT_ID \
       --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
       --role=roles/iam.serviceAccountTokenCreator
    
    gcloud projects add-iam-policy-binding PROJECT_ID \
       --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
       --role=roles/iam.serviceAccountOpenIdTokenCreator
    

    Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using gcloud projects describe PROJECT_ID. For more information, see Find the project name, number, and ID.
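
    The three role bindings above differ only in the role name, so they can be generated in a loop. The following is a minimal sketch that only prints the commands (a dry run) so that you can review them before running them yourself. The function names compute_sa_member and print_role_bindings, and the placeholder values my-project and 123456789012, are illustrative assumptions and not part of gcloud.

```shell
#!/usr/bin/env bash
# Build the IAM member string for the Compute Engine default service account.
compute_sa_member() {
  local project_number="$1"
  printf 'serviceAccount:%s-compute@developer.gserviceaccount.com\n' "$project_number"
}

# Print (do not run) one add-iam-policy-binding command per required role.
print_role_bindings() {
  local project_id="$1" project_number="$2" role
  for role in \
      roles/managedkafka.client \
      roles/iam.serviceAccountTokenCreator \
      roles/iam.serviceAccountOpenIdTokenCreator; do
    printf 'gcloud projects add-iam-policy-binding %s --member="%s" --role=%s\n' \
      "$project_id" "$(compute_sa_member "$project_number")" "$role"
  done
}

# Example with placeholder values; replace them with your own project ID and number.
print_role_bindings my-project 123456789012
```

    To get only the project number in a script, gcloud projects describe PROJECT_ID --format="value(projectNumber)" prints the number on its own line.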

Connect to the VM

Use SSH to connect to the VM that you created in the previous section. For example, with the Google Cloud CLI:

    gcloud compute ssh test-instance --project=PROJECT_ID --zone=REGION-c

Additional configuration might be required the first time you use SSH. For more information about connecting using SSH, see About SSH connections.

Install the Kafka command-line tools

  1. Install Java, which the Kafka command-line tools require, and wget, which is used to download dependencies. The following command assumes a Debian Linux environment.

    sudo apt-get install default-jre wget
    
  2. Install the Kafka command-line tools on the VM.

    wget -O kafka_2.13-3.7.2.tgz https://dlcdn.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
    tar xfz kafka_2.13-3.7.2.tgz
    
  3. Set the following environment variables:

    export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
    export PATH=$PATH:$KAFKA_HOME/bin
    export CLASSPATH=$CLASSPATH:$KAFKA_HOME/libs/release-and-dependencies/*:$KAFKA_HOME/libs/release-and-dependencies/dependency/*
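
Before continuing, it can help to confirm that Java and the Kafka scripts are actually reachable from the shell. The following is a minimal sketch under the assumption that the environment variables above were set in the current session; the helper name check_tools is hypothetical.

```shell
#!/usr/bin/env bash
# Report every listed command that is not on PATH.
# Returns 0 if all are found, 1 if at least one is missing.
check_tools() {
  local missing=0 tool
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "missing: $tool" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Commands that the rest of this tutorial relies on.
if check_tools java kafka-topics.sh kafka-console-producer.sh kafka-console-consumer.sh; then
  echo "Kafka command-line tools are on PATH"
else
  echo "Re-check KAFKA_HOME and PATH" >&2
fi
```

If a Kafka script is reported as missing, the most likely cause is that KAFKA_HOME was exported from a different directory than the one where the archive was extracted.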
    

Set up authentication

Set up the Managed Service for Apache Kafka authentication library.

  1. Download the dependencies and install them locally. Because the Kafka command-line tools look for Java dependencies in the libs directory of the Kafka installation directory, add these dependencies there.

    wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
    sudo apt-get install unzip
    unzip -n -j release-and-dependencies.zip -d $KAFKA_HOME/libs/
    
  2. Set up the client machine configuration properties.

    cat <<EOF > client.properties
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    EOF
    

    This configuration sets up a Kafka client to do the following:

  • Use SASL_SSL for secure communication with the Kafka cluster.

  • Employ OAuth 2.0 bearer tokens for authentication.

  • Use a Google Cloud-specific login callback handler to obtain OAuth 2.0 tokens.
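
A missing property in client.properties typically shows up later as an authentication failure, so a quick check of the file can save time. The following is a minimal sketch that verifies the four keys written above are present; the function name check_client_properties is hypothetical, and the check looks only at key names, not at their values.

```shell
#!/usr/bin/env bash
# Return 0 only if every required key starts a line in the given file as "key=".
# The dots in key names are treated as regex wildcards, which is acceptable here.
check_client_properties() {
  local file="$1" key status=0
  for key in security.protocol sasl.mechanism \
             sasl.login.callback.handler.class sasl.jaas.config; do
    if ! grep -q "^${key}=" "$file"; then
      echo "missing property: $key" >&2
      status=1
    fi
  done
  return "$status"
}

# Only run the check when the file exists in the current directory.
if [ -f client.properties ]; then
  if check_client_properties client.properties; then
    echo "client.properties contains all expected keys"
  else
    echo "client.properties is incomplete" >&2
  fi
fi
```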

Send and consume messages

Run these commands on the client machine.

  1. Set the project ID and cluster ID as environment variables.

    export PROJECT_ID=PROJECT_ID
    export CLUSTER_ID=CLUSTER_ID
    

    Replace the following:

    • PROJECT_ID with the ID of the project.
    • CLUSTER_ID with the name of the cluster.

  2. Set the bootstrap address as an environment variable.

    export BOOTSTRAP=bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092

    Replace REGION with the region of the cluster. For more information, see Get the bootstrap address.

  3. List the topics in the cluster.

    kafka-topics.sh --list \
    --bootstrap-server $BOOTSTRAP \
    --command-config client.properties
    
  4. Write a message to a topic.

    echo "hello world" | kafka-console-producer.sh --topic KAFKA_TOPIC_NAME \
    --bootstrap-server $BOOTSTRAP --producer.config client.properties
    

    Replace KAFKA_TOPIC_NAME with the name of the topic.

  5. Consume messages from the topic.

    kafka-console-consumer.sh --topic KAFKA_TOPIC_NAME --from-beginning \
    --bootstrap-server $BOOTSTRAP --consumer.config client.properties
    

    To stop consuming messages, press Ctrl+C.

  6. Run a producer performance test.

    kafka-producer-perf-test.sh --topic KAFKA_TOPIC_NAME --num-records 1000000 \
    --throughput -1 --print-metrics --record-size 1024 \
    --producer-props bootstrap.servers=$BOOTSTRAP --producer.config client.properties
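
The bootstrap address set in step 2 follows a fixed pattern, so it can also be composed from variables instead of being typed by hand. The following is a minimal sketch; the function name bootstrap_address, the REGION variable, and the fallback values my-cluster, us-central1, and my-project are assumptions for illustration and are not set elsewhere in this tutorial.

```shell
#!/usr/bin/env bash
# Compose the bootstrap address in the format shown in step 2.
bootstrap_address() {
  local cluster_id="$1" region="$2" project_id="$3"
  printf 'bootstrap.%s.%s.managedkafka.%s.cloud.goog:9092\n' \
    "$cluster_id" "$region" "$project_id"
}

# Use values already exported in the session, or fall back to placeholders.
CLUSTER_ID="${CLUSTER_ID:-my-cluster}"
REGION="${REGION:-us-central1}"
PROJECT_ID="${PROJECT_ID:-my-project}"

BOOTSTRAP="$(bootstrap_address "$CLUSTER_ID" "$REGION" "$PROJECT_ID")"
export BOOTSTRAP
echo "$BOOTSTRAP"
```

Composing the address this way keeps it consistent with the PROJECT_ID and CLUSTER_ID values exported in step 1.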
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

  1. To delete the cluster, run the gcloud managed-kafka clusters delete command:

    gcloud managed-kafka clusters delete CLUSTER_ID --location=REGION

What's next

Apache Kafka® is a registered trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.