您可以将 Managed Service for Apache Kafka 集群配置为使用双向 TLS (mTLS) 对 Kafka 客户端进行身份验证。此方法使用 Certificate Authority Service (CA Service) 中的客户端证书作为身份验证的基础。此选项可替代使用 Identity and Access Management (IAM) 主账号的默认 SASL 机制。
使用 mTLS 时,必须使用 Kafka ACL 配置授权。如需了解基本概念,请参阅以下文档:
准备工作
在配置 mTLS 身份验证之前,请完成以下操作:
确认集群是否符合条件。验证您是否已创建 Managed Service for Apache Kafka 集群(创建时间在 2025 年 6 月 24 日之后)。只有这些集群支持 mTLS 身份验证。如需验证集群的创建日期,请使用
gcloud managed-kafka clusters describe命令,或在控制台中查看集群的详情页面。配置 CA Service。设置您打算用于颁发客户端证书的 CA 服务和 CA 池。您必须在 CA 池中创建根证书和从属证书。
创建 CA 池。记下 CA 池 ID。
如需详细了解如何创建 CA 池,请参阅创建 CA 池。
为池创建并启用根 CA。
如需详细了解如何为池启用根 CA,请参阅创建根 CA。
创建并启用一个或多个从属 CA。我们建议使用从属 CA 颁发客户端证书,而不是直接使用根 CA。
如需详细了解如何创建从属 CA,请参阅创建从属证书授权机构。
所需的角色和权限
如需配置 mTLS,您需要确保您(用户)和 Managed Service for Apache Kafka 服务代理都拥有必要的 IAM 权限。无论您是创建新集群还是更新现有集群来配置 mTLS,这都适用。
用户权限
如需创建或配置 Managed Service for Apache Kafka 集群以使用 mTLS,您需要拥有创建或更新集群资源的权限。为此,请让管理员为您授予包含集群的项目的 Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) 角色。
此预定义角色包含 managedkafka.clusters.create 和 managedkafka.clusters.update 权限。借助这些权限,您可以创建新集群或修改现有集群,以添加 mTLS 所需的 CA Service (CA) 池配置。
您无需对 CA Service 资源拥有单独的权限,即可在 Kafka 集群上配置 mTLS,前提是您拥有 CA 池的完整资源路径。不过,如需在Google Cloud 控制台中查看、创建或管理 CA 池,您需要 CA Service 特有的其他角色,例如 CA Service Admin (roles/privateca.admin) 或 CA Service Operator (roles/privateca.operator)。
服务代理权限
为了使 mTLS 集成正常运行,Managed Service for Apache Kafka 服务代理需要获得访问指定 CA 池的权限。服务代理是 Google 为您的项目代管式服务账号。
如果您的 Managed Service for Apache Kafka 集群和 CA 池位于同一项目中,则服务代理默认具有必要的权限。系统会自动为项目中的服务代理授予
managedkafka.serviceAgent角色,该角色包含所需的privateca.caPools.get权限。如果您的 CA 池与 Managed Service for Apache Kafka 集群不在同一项目中,您必须手动向服务代理授予访问权限。 向包含 CA 池的项目中的服务代理授予 Private CA Pool Reader (
roles/privateca.poolReader) 角色。
所需权限摘要
如需查看所需的确切权限,请展开即可下部分。
向服务代理授予对 CA 池的访问权限
如果您的 CA Service CA 池和 Managed Service for Apache Kafka 集群位于不同的 Google Cloud 项目中,您必须向集群的服务代理授予访问 CA 池的权限。Managed Service for Apache Kafka 服务代理的名称为 service-CLUSTER_PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com。
在包含 CA 的单个池级别(推荐)或项目中的所有池级别,向 Managed Service for Apache Kafka 服务代理授予 CA Pool Reader (roles/privateca.poolReader) 角色。此角色可提供必要的 privateca.caPools.get 权限。
单个 CA 池
建议向单个 CA 池授予权限,因为这种做法遵循最小权限原则。
运行
gcloud privateca pools add-iam-policy-binding 命令:
gcloud privateca pools add-iam-policy-binding CA_POOL_ID \ --location=CA_POOL_LOCATION \ --member='serviceAccount:service-CLUSTER_PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com' \ --role='roles/privateca.poolReader'
替换以下内容:
-
CA_POOL_ID:您要授予访问权限的 CA 池的 ID。例如
test-mtls-pool1。 CA_POOL_LOCATION:CA 池所在的 Google Cloud 区域。例如
us-central1。-
CLUSTER_PROJECT_NUMBER:包含 Managed Service for Apache Kafka 集群的项目的项目编号。例如
12341234123。
所有 CA 池
或者,您也可以在项目级层设置政策,以授予服务代理访问项目内所有 CA 池的权限。
运行
gcloud projects add-iam-policy-binding 命令:
gcloud projects add-iam-policy-binding CA_PROJECT_ID \ --member='serviceAccount:service-CLUSTER_PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com' \ --role='roles/privateca.poolReader'
替换以下内容:
-
CA_PROJECT_ID:您要授予访问权限的 CA 池所在项目的 ID。例如
test-cas-project。 -
CLUSTER_PROJECT_NUMBER:包含 Managed Service for Apache Kafka 集群的项目的项目编号。例如
12341234123。
在集群上启用 mTLS
如需启用 mTLS,请为集群提供一个或多个 CA Service CA 池的资源名称,以用于客户端身份验证。您可以在创建新集群时执行此操作,也可以更新 2025 年 6 月 24 日之后创建的现有集群。
提供 CA 池标识符后,该服务会自动从指定的池下载 CA 证书,并将其安装到集群中每个代理的信任库中。
控制台
您可以在创建新集群期间启用 mTLS,也可以通过修改现有集群来启用 mTLS。
在新集群上
在 Google Cloud 控制台中,前往集群页面。
- 选择创建。
系统会打开创建 Kafka 集群页面。
- 按照 创建集群中的步骤操作。
- 在执行最后一步之前,找到可选的 mTLS 配置部分。
- 以
projects/PROJECT_ID/LOCATION/LOCATION/caPools/POOL_ID格式输入 CA 池的完整资源名称。 - 如需添加更多,请点击添加 CA 池。您最多可以添加 10 个 CA 池。
- (可选)输入任何主账号映射规则。
- 点击创建以创建已启用 mTLS 的集群。
在现有集群上
gcloud
在新集群上
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
-
运行带有
--mtls-ca-pools标志的gcloud managed-kafka clusters create命令。在此示例中,配置了两个 CA 池。gcloud managed-kafka clusters create CLUSTER_ID \ --location=LOCATION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/locations/LOCATION/subnetworks/SUBNET_ID \ --mtls-ca-pools=projects/PROJECT_ID/locations/LOCATION/caPools/POOL_ID_1,projects/PROJECT_ID/locations/LOCATION/caPools/POOL_ID_2
替换以下内容:
-
CLUSTER_ID:集群的 ID 或名称。
如需详细了解如何命名集群,请参阅 Managed Service for Apache Kafka 资源命名指南。 例如
test-mtls-cluster。 -
LOCATION:集群的位置。
如需详细了解支持的位置,请参阅 Managed Service for Apache Kafka 支持的位置。 例如
us-central1。 -
SUBNETS:要连接的子网列表。 使用英文逗号分隔多个子网值。
子网的格式为
projects/PROJECT_ID/locations/LOCATION/subnetworks/SUBNET_ID。 -
POOL_ID_2:第二个 CA 池的 ID。 例如
test-mtls-pool2。
POOL_ID_1:第一个 CA 池的 ID。
例如 test-mtls-pool1。
在现有集群上
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
-
使用
gcloud managed-kafka clusters update命令。此命令会覆盖当前配置的整个池集。提供所需 CA 池的完整列表。在此示例中,配置了两个 CA 池。gcloud managed-kafka clusters update CLUSTER_ID \ --location=LOCATION \ --mtls-ca-pools=projects/PROJECT_ID/locations/LOCATION/caPools/POOL_ID_1,projects/PROJECT_ID/locations/LOCATION/caPools/POOL_ID_2
替换以下内容:
-
CLUSTER_ID:集群的 ID 或名称。
如需详细了解如何命名集群,请参阅 Managed Service for Apache Kafka 资源命名指南。 例如
test-mtls-cluster。 -
LOCATION:集群的位置。
如需详细了解支持的位置,请参阅 Managed Service for Apache Kafka 支持的位置。 例如
us-central1。 -
POOL_ID_2:第二个 CA 池的 ID。 例如
test-mtls-pool2。
POOL_ID_1:第一个 CA 池的 ID。
例如 test-mtls-pool1。
配置正文名称映射
当客户端通过 mTLS 进行身份验证时,默认情况下,Kafka 主账号派生自证书的主题可分辨名称 (DN),格式为 User:CN=...,OU=...,O=...,L=...,ST=...,C=...。创建映射规则,将证书的主题 DN 转换为便于在 Kafka ACL 中使用的别名。
如需详细了解主题 DN 的格式,请参阅 Apache Kafka 文档中的自定义 SSL 用户名。
这些转换由 ssl.principal.mapping.rules Kafka 代理属性定义,该属性使用正则表达式提取证书正文的部分内容并重新设置其格式。
例如,您可以应用规则将完整的 Subject DN 转换为别名,如下所示:
证书主题 DN:
CN=order-processing-app,OU=marketing,O=company,C=US映射规则:
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.-]*).*$/$1/L,DEFAULT生成的 Kafka 主账号:
order-processing-app
此规则示例从证书主题中提取公用名 (CN),并将其用作 Kafka 中的正文名称。
如需使用 Google Cloud CLI 在集群上设置映射规则,请按以下步骤操作。 使用控制台时,您可以在创建或更新集群时设置映射规则。
如需更新集群,请将
gcloud managed-kafka clusters update命令与--ssl-principal-mapping-rules标志结合使用。gcloud managed-kafka clusters update CLUSTER_ID \ --location=REGION \ --ssl-principal-mapping-rules='MAPPING_RULE'替换以下内容:
CLUSTER_ID:您要创建的 Managed Service for Apache Kafka 集群的 ID。例如 -test-kafka-cluster。REGION:要在其中创建集群的 Google Cloud 区域。例如us-central1。MAPPING_RULE*:您要应用的映射规则。例如:RULE:^.*[Cc][Nn]=([a-zA-Z0-9.-]*).*$/$1/L,DEFAULT。
如需详细了解如何编写映射规则,请参阅 Apache Kafka 文档中的授权和 ACL。
为 mTLS 主账号配置 Kafka ACL
默认情况下,任何使用有效的 mTLS 证书成功通过身份验证的客户端都会被授予对集群的完全访问权限。为了强制执行最小权限原则,您必须创建 Kafka ACL,以定义 mTLS 主账号的特定权限。mTLS 客户端的主账号是其证书的主题 DN(或映射的别名),并以 User: 为前缀。
如需创建 Kafka ACL,您需要拥有 Managed Kafka ACL Editor (roles/managedkafka.aclEditor) IAM 角色。
假设您有一个由其证书标识的应用,该应用会向 orders-topic 生成消息并从 analytics-topic 中使用消息。应用的主账号在通过映射规则简化后为 order-processing-app。创建 Kafka ACL 时,您必须在正文前添加 User: 前缀。
将
WRITEACL 应用到集群。运行gcloud managed-kafka acls add-entry命令,以授予对orders-topic的WRITE权限。gcloud managed-kafka acls add-entry topic/orders-topic \ --cluster=CLUSTER_ID \ --location=REGION \ --principal=User:order-processing-app \ --operation=WRITE \ --permission-type=ALLOW \ --host="*"替换以下内容:
CLUSTER_ID:您要创建的 Managed Service for Apache Kafka 集群的 ID。例如 -test-kafka-cluster。REGION:要在其中创建集群的 Google Cloud 区域。例如us-central1。
将
READACL 应用到集群。运行gcloud managed-kafka acls add-entry命令,以授予对analytics-topic的READ权限。gcloud managed-kafka acls add-entry topic/analytics-topic \ --cluster=CLUSTER_ID \ --location=REGION \ --principal=User:order-processing-app \ --operation=READ \ --permission-type=ALLOW \ --host="*"
应用这些 ACL 后,order-processing-app 客户端仅具有您授予的特定权限。如需详细了解如何创建 ACL,请参阅创建 Kafka ACL。
配置 Kafka 客户端
在集群上配置 mTLS 后,您必须配置客户端应用以使用此方法进行身份验证。此流程包括创建客户端证书和配置客户端的属性以使用该证书。
在客户端计算机上创建并下载客户端证书。运行
gcloud privateca certificates create命令,从您为集群配置的某个 CA 池中颁发新证书。此命令会将证书
client-cert.pem及其私钥client-key.pem下载到您的本地环境。gcloud privateca certificates create CERTIFICATE_ID \ --project=PROJECT_ID \ --issuer-location=REGION \ --issuer-pool=POOL_ID \ --ca=CA_NAME \ --generate-key \ --dns-san="client.example.com" \ --subject="CN=test-client-app" \ --key-output-file=client-key.pem \ --cert-output-file=client-cert.pem替换以下内容:
CERTIFICATE_ID:证书对象的唯一名称。例如order-app-cert。PROJECT_ID:包含 CA 池的项目的 ID。例如test-project-12345。REGION:CA 池所在的区域。例如us-central1。POOL_ID:用于签发证书的 CA 池的 ID。例如test-mtls-pool1。CA_NAME:池中证书授权机构的名称。例如test-sub-ca。--dns-san="client.example.com":DNS 主题备用名称。 您可以使用与客户相关的任何值。--subject="CN=test-client-app":主题 DN。除非您已配置正文名称映射规则,否则此名称将用作 mTLS 正文。
查看客户端证书、查看证书主题,并验证
ssl.principal.mapping.rules。运行gcloud privateca certificates describe命令:gcloud privateca certificates describe CERTIFICATE_ID \ --issuer-pool=POOL_ID \ --issuer-location=REGION替换以下内容:
CERTIFICATE_ID:证书对象的唯一名称。例如order-app-cert。POOL_ID:您从中签发证书的 CA 池的 ID。例如test-mtls-pool1。REGION:CA 池所在的区域。例如us-central1。
输出类似于以下内容:
certificateDescription: aiaIssuingCertificateUrls: - http://privateca-content-68e092f4-0000-288c-95cf-30fd3814648c.storage.googleapis.com/a6553d092bbedd752e34/ca.crt authorityKeyId: keyId: 9568aa9d2baa11a097addc2e24adeaebea0d6a2a certFingerprint: sha256Hash: 230e52b8411fd094048fca194fc6cf80e41b3e8561298aec3519e13cb1fd05eb ... subjectDescription: hexSerialNumber: 2107b74cf5a814043a38a87eeb6cd7c7891a5f lifetime: P30D notAfterTime: '2025-07-13T15:34:43Z' notBeforeTime: '2025-06-13T15:34:44Z' subject: commonName: test-client-app subjectAltName: dnsNames: - client.example.com ...创建 Java 密钥库。将证书和私钥合并到一个
PKCS#12文件中,然后将其导入到 Java 密钥库 (.jks) 文件中。# Create a password for the keystore export KEYSTORE_PASSWORD="KEYSTORE_PASSWORD" # Combine the key and cert into a PKCS#12 file openssl pkcs12 -export -inkey client-key.pem -in client-cert.pem \ -name client -out client-keystore.p12 -password "pass:$KEYSTORE_PASSWORD" # Import the PKCS#12 file into a Java KeyStore keytool -importkeystore -srckeystore client-keystore.p12 -srcstoretype pkcs12 \ -destkeystore client-keystore.jks -srcstorepass "$KEYSTORE_PASSWORD" -deststorepass "$KEYSTORE_PASSWORD"您可以运行以下命令来验证密钥是否已成功存储:
keytool -v -list -keystore client-keystore.jks -storepass "$KEYSTORE_PASSWORD"输出类似于以下内容:
Keystore type: JKS Keystore provider: SUN Your keystore contains 1 entry Alias name: client Creation date: Jun 13, 2024 Entry type: Private key entry Certificate chain length: 1 Certificate[1]: Owner: CN=test-client-app Issuer: CN=test-sub-ca ...请注意,
Owner行显示证书主题 DN。默认情况下,Kafka 会将 Kafka 主账号设置为以下确切格式:CN=...,OU=...,O=...,L=...,ST=...,C=...。如需了解详情,请参阅 Apache Kafka 文档中的授权和 ACL。配置 Kafka 客户端属性和引导地址。在 Kafka 客户端应用中,设置以下属性以使用密钥库进行 SSL 连接。此外,请务必使用正确的引导地址和端口
9192。如需详细了解如何设置客户端,请参阅快速入门:创建 Managed Service for Apache Kafka 集群并连接客户端。security.protocol=SSL ssl.keystore.location=KEYSTORE_LOCATION ssl.keystore.password=KEYSTORE_PASSWORD bootstrap.servers=CLUSTER_BOOTSTRAP_ADDRESS替换以下内容:
KEYSTORE_LOCATION:.jks文件的路径。KEYSTORE_PASSWORD:密钥库的密码。CLUSTER_BOOTSTRAP_ADDRESS:集群的引导地址。如需查找引导地址,请参阅查看集群详细信息。 请务必添加端口号9192。
保护客户端配置
上述示例涉及在本地存储私钥和密码,因此我们不建议将其用于生产环境。对于生产环境,请务必妥善处理客户端密钥。包括以下选项:
将密钥库及其密码作为密文存储在 Google Cloud Secret Manager 中,并在运行时在应用代码中检索它们。
如果您在 GKE 上部署应用,请使用 Secret Manager 插件在运行时将 Secret 装载到应用的文件系统中。
监控 mTLS
您可以使用 Cloud Monitoring 和 Cloud Logging 中的指标和日志监控 mTLS 证书更新的运行状况。
如需主动监控 mTLS 证书更新的健康状况,请在 Monitoring 中使用 managedkafka.googleapis.com/mtls_truststore_update_count 指标。此指标用于统计信任库更新尝试次数,并包含 STATUS 标签,该标签可以是 SUCCESS 或失败原因(例如 CA_POOL_FETCH_ERROR)。
Managed Service for Apache Kafka 服务会尝试每小时为每个集群刷新一次信任库。我们建议您创建一个提醒,当此指标报告的持续错误计数超过 3 小时时触发该提醒,因为这可能表明存在需要人工干预的配置错误。
信任库更新会消耗 Certificate Authority Service API 的配额。请务必了解以下几点:
更新过程会调用
FetchCaCerts方法,该方法受API requests per minute per region配额的限制。此配额用量归因于包含所引用 CA 池的项目,而不是 Managed Service for Apache Kafka 项目。
默认限额为每个区域每秒 400 次查询 (QPS)。鉴于每个集群每小时一次请求的频率较低,这些信任库更新不太可能导致您超出此配额。
您还可以通过查看 Logging 中的日志来跟踪信任库更新。 查找以下日志条目,确认更新成功:
Managed Service for Apache Kafka updated the mTLS trust storeAdded root CA certificate to trust store