Clients can connect to Managed Service for Apache Kafka clusters by using the open source Apache Kafka API. All connections must be encrypted using TLS; plaintext communication is not supported. Authentication is handled through one of two supported mechanisms, each with a different credential type: SASL or mTLS.
This document describes how to authenticate using the SASL method. Clients authenticate using the credentials of an authorized Identity and Access Management principal, such as a service account. Managed Service for Apache Kafka manages the server-side broker certificates for all connections.
Before you begin
Learn more about the following:
Grant the Managed Kafka client role to the service account
You must grant the roles/managedkafka.client
IAM role on the project containing the cluster to the service
account that you are going to use to connect to the cluster.
The Managed Kafka client role includes the permission
managedkafka.clusters.connect required for all connections.
To grant the Managed Kafka client role to the service account,
follow these steps:
Console
- In the Google Cloud console, go to the IAM page.
 Go to IAM
- Check that the project is set to the consumer project that the Managed Service for Apache Kafka client would be accessing.
- Click Grant access.
- In the new page, for Add Principals, enter the email address of the service account that you are using.
- For Assign roles, select the Managed Kafka client role.
- Click Save.
gcloud CLI
- 
    
    In the Google Cloud console, activate Cloud Shell. At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize. 
- 
        Run the gcloud projects add-iam-policy-bindingcommand:gcloud projects add-iam-policy-binding PROJECT_ID \ --member serviceAccount:SERVICE_ACCOUNT_EMAIL \ --role roles/managedkafka.client Replace the following: - 
                PROJECT_ID is the project ID. 
- 
                SERVICE_ACCOUNT_EMAIL is the email address of the service account. 
 
- 
                
Configure the Kafka client for authenticating to Google Cloud
You can authenticate Kafka clients to Google Cloud by using one of the following mechanisms:
OAUTHBEARER (Recommended): This mechanism requires using Application Default Credentials (ADC). ADC is a strategy used by the authentication libraries to automatically find credentials based on the application environment. For more information about where ADC looks for credentials and in what order, see How Application Default Credentials works.
SASL/PLAIN: This mechanism requires using a username and password that can be derived from a service account key JSON file, or an access token.
In general, OAUTHBEARER is the recommended option. However, SASL/PLAIN might be a more convenient mechanism for testing.
OAuthBearer authentication
For information about how to authenticate to the open source Kafka API, see the documentation on GitHub.
SASL/PLAIN authentication
Managed Service for Apache Kafka supports SASL/PLAIN authentication with either a service account key JSON file, or an access token.
Service account key JSON file
This method is applicable to all Kafka clients.
- Download a service account key JSON file for the service account that you intend to use for your client. 
- Encode the service account file by using base64-encode to use as your authentication string. Assume the filename as - my_service_account_key.json.- On Linux or macOS systems, use the - base64command (often installed by default) as follows:- base64 -w 0 < my_service_account_key.json > password.txt- This command performs the following actions: - base64 < my_service_account_key.json: Reads the contents of the file named- my_service_account_key.json.
- Encodes the file's contents using base64 encoding. Base64 encoding is a way to represent binary data (such as JSON data in your service account file) as ASCII text. This is often used for transmitting data over channels that are designed for text. 
- > password.txt: Redirects the output of the- base64command (the base64 encoded version of your service account file) into a new file named- password.txt.
 
- You can use the contents of the password file for authentication with the following parameters. - security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="SERVICE_ACCOUNT_EMAIL_ADDRESS" \ password="CONTENTS_OF_BASE64_ENCODED_PASSWORD_FILE";- Replace the following: - SERVICE_ACCOUNT_EMAIL_ADDRESS: The email address of the service account that you want to use for authentication.
- CONTENTS_OF_BASE64_ENCODED_PASSWORD_FILE: The contents of the base64-encoded password file that you obtained in the previous step. This must be a single line.
 
When authenticating incoming connections to the cluster, Managed Service for Apache Kafka checks the following:
- The provided username matches the service account whose key is used in the password. 
- The provided service account principal has the permission - managedkafka.clusters.connect(included in the- roles/managedkafka.clientIAM role) on the cluster.
Access token
- Get an access token for the principal that you want to use for authentication. For example, get an access token for the current gcloud CLI principal: - gcloud auth print-access-token
- You can use the access token for authentication with the following parameters. - security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="PRINCIPAL_EMAIL_ADDRESS" \ password="ACCESS_TOKEN_VALUE";- Replace the following: - PRINCIPAL_EMAIL_ADDRESS: The email address of the principal that you used to obtain the access token.
- ACCESS_TOKEN_VALUE: The access token value that you obtained in the previous step.
 
When authenticating incoming connections to the cluster, Managed Service for Apache Kafka checks the following:
- The access token is valid and has not expired. 
- The provided username matches the principal email that the access token is associated with. 
- The access token's principal has the permission - managedkafka.clusters.connect(included in the- roles/managedkafka.clientIAM role) on the cluster.
Workload Identity Federation for GKE
Managed Service for Apache Kafka supports authentication to the open source Apache Kafka API using Workload Identity Federation for GKE. Authentication is supported for both SASL/PLAIN and SASL/OAUTHBEARER.
To use Workload Identity Federation for GKE with Managed Service for Apache Kafka you must comply with the following requirements:
- Use GKE version 1.31.1-gke.1241000or higher.
- Annotate your Kubernetes service account with - iam.gke.io/return-principal-id-as-email: "true". For example:- apiVersion: v1 kind: ServiceAccount metadata: name: kafka-service-account annotations: iam.gke.io/return-principal-id-as-email: "true"
- If you use the local auth server, check that you also use version - 2.40.3or higher of the google-auth package.
Verify the GKE principal has the permission
managedkafka.clusters.connect (included in the roles/managedkafka.client
IAM role).
Managed Service for Apache Kafka does not support authentication to the open source Apache Kafka API using Fleet workload identity. As an alternative, you can link your Kubernetes service account to an IAM service account.
Troubleshoot authentication errors
If a Managed Service for Apache Kafka client cannot authenticate to Managed Service for Apache Kafka, you see an error message similar to the following:
Exception in thread "main" java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.SaslAuthenticationException:
Authentication failed: Invalid username or password
To resolve the issue, check for the following causes:
- The password is malformed, and doesn't represent a valid service account key JSON blob when base64 decoded, or a valid access token. 
- The authenticating principal doesn't have the - managedkafka.clusters.connectpermission on the cluster.
- The provided username doesn't match the credential's principal. 
If a client experiences frequent disconnections every 30 minutes, this can be
due to the client not supporting periodic re-authentication.
Managed Service for Apache Kafka brokers require clients to re-authenticate every
30 minutes, enforced by the connections.max.reauth.max broker property.
Verify that your Kafka client library version is 2.2.0 or later, and supports
re-authentication.