Develop a Python producer application
Learn how to develop a Python producer application that authenticates with a Managed Service for Apache Kafka cluster by using Application Default Credentials (ADC). ADC lets applications running on Google Cloud automatically find and use the right credentials for authenticating to Google Cloud services.
Before you start this tutorial, follow the steps in Create a cluster in Managed Service for Apache Kafka.
Before you begin
Before you start this tutorial, create a new cluster by following the steps in Create a cluster in Managed Service for Apache Kafka.
If you already have a Managed Service for Apache Kafka cluster, you can skip this step.
Set up a client VM
A producer application must run on a machine with network access to the cluster. We use a Compute Engine virtual machine instance (VM). This VM must be in the same region as the Kafka cluster. It must also be in the VPC containing the subnet that you've used in the cluster configuration.
- To create the client VM, run the following command: - gcloud compute instances create test-instance \ --scopes=https://www.googleapis.com/auth/cloud-platform \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/default \ --zone=REGION-c
- Give the Compute Engine default service account the necessary permissions to connect to the cluster and authenticate. You need to grant the Managed Kafka Client role ( - roles/managedkafka.client), the Service Account Token Creator role (- roles/iam.serviceAccountTokenCreator), and the Service Account OpenID Token Creator role (- roles/iam.serviceAccountOpenIdTokenCreator).- gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \ --role=roles/managedkafka.client gcloud projects add-iam-policy-binding PROJECT_ID\ --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \ --role=roles/iam.serviceAccountTokenCreator gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \ --role=roles/iam.serviceAccountOpenIdTokenCreator- Replace PROJECT_NUMBER with the number of the project containing the cluster. You can look up this number using - gcloud projects describe PROJECT_ID.
Create a Python producer application
- Connect to the client VM using SSH. One way to do this is to run the following command: - gcloud compute ssh --project=PROJECT_ID \ --zone=REGION-f test-instance- Replace - PROJECT_IDwith your Google Cloud project name.- For more information about connecting using SSH, see About SSH connections. 
- Install pip, a Python package manager and the virtual environment manager: - sudo apt install python3-pip -y sudo apt install python3-venv -y
- Create a new virtual environment (venv) and activate it: - python3 -m venv kafka source kafka/bin/activate
- Install the - confluent-kafkaclient and other dependencies:- pip install confluent-kafka google-auth urllib3 packaging
- Copy the following producer client code into a file called - producer.py- import confluent_kafka import argparse from tokenprovider import TokenProvider parser = argparse.ArgumentParser() parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True) parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False) parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False) args = parser.parse_args() token_provider = TokenProvider() config = { 'bootstrap.servers': args.bootstrap, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'OAUTHBEARER', 'oauth_cb': token_provider.get_token, } producer = confluent_kafka.Producer(config) def callback(error, message): if error is not None: print(error) return print("Delivered a message to {}[{}]".format(message.topic(), message.partition())) for i in range(args.num_messages): message = f"{i} hello world!".encode('utf-8') producer.produce(args.topic_name, message, callback=callback) producer.flush()
- You now need an implementation of the OAuth token provider. Save the following code in a file called - tokenprovider.py:- import base64 import datetime import http.server import json import google.auth from google.auth.transport.urllib3 import Request import urllib3 import time def encode(source): """Safe base64 encoding.""" return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=') class TokenProvider(object): """ Provides OAuth tokens from Google Cloud Application Default credentials. """ HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'}) def __init__(self, **config): self.credentials, _project = google.auth.default() self.http_client = urllib3.PoolManager() def get_credentials(self): if not self.credentials.valid: self.credentials.refresh(Request(self.http_client)) return self.credentials def get_jwt(self, creds): token_data = dict( exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(), iat=datetime.datetime.now(datetime.timezone.utc).timestamp(), iss='Google', sub=creds.service_account_email ) return json.dumps(token_data) def get_token(self, args): creds = self.get_credentials() token = '.'.join([ encode(self.HEADER), encode(self.get_jwt(creds)), encode(creds.token) ]) # compute expiry time expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc) now_utc = datetime.datetime.now(datetime.timezone.utc) expiry_seconds = (expiry_utc - now_utc).total_seconds() return token, time.time() + expiry_seconds
- You are now ready to run the application: - python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
To delete the cluster, run the 
gcloud managed-kafka clusters delete command:
gcloud managed-kafka clusters delete CLUSTER_ID --location=REGION
To delete the VM, run the 
gcloud compute instances delete command:
gcloud instances delete test-instance \
  --zone=REGION-c
What's next
- Authenticate Kafka clients with Managed Service for Apache Kafka. 
- Authentication tools and documentation Managed Service for Apache Kafka