开发 Python 提供方应用
了解如何开发一个 Python 生产者应用,该应用通过使用应用默认凭据 (ADC) 向 Managed Service for Apache Kafka 集群进行身份验证。借助 ADC,在 Google Cloud 上运行的应用可以自动查找并使用正确的凭据向 Google Cloud 服务进行身份验证。
准备工作
在开始学习本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。
如何创建集群
控制台
- 前往 Managed Service for Apache Kafka > 集群页面。
- 点击 创建。
- 在集群名称字段中,输入集群的名称。
- 在区域列表中,为集群选择一个位置。
-
对于网络配置,请配置集群可访问的子网:
- 在项目部分,选择您的项目。
- 对于网络,选择 VPC 网络。
- 对于子网,请选择相应子网。
- 点击完成。
- 点击创建。
点击创建后,集群状态为 Creating。当集群准备就绪时,状态为 Active。
gcloud
如需创建 Kafka 集群,请运行 managed-kafka clusters
create 命令。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
替换以下内容:
KAFKA_CLUSTER:Kafka 集群的名称REGION:集群的位置PROJECT_ID:您的项目 IDSUBNET_NAME:您要在其中创建集群的子网,例如default
如需了解支持的位置,请参阅 Managed Service for Apache Kafka 位置。
该命令会异步运行,并返回一个操作 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如需跟踪创建操作的进度,请使用 gcloud managed-kafka
operations describe 命令:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
当集群准备就绪时,此命令的输出内容会包含 state:
ACTIVE 条目。如需了解详情,请参阅 监控集群创建操作。
所需的角色
如需获得创建和配置客户端虚拟机所需的权限,请让管理员向您授予项目的以下 IAM 角色:
-
Compute Instance Admin (v1) (
roles/compute.instanceAdmin.v1) -
Project IAM Admin (
roles/resourcemanager.projectIamAdmin) -
Role Viewer (
roles/iam.roleViewer) -
Service Account User (
roles/iam.serviceAccountUser)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
设置客户端虚拟机
在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:
Region 绑定将多选选项设置为所有记录中 Region 的所有值。在与 Kafka 集群相同的区域中创建虚拟机。
子网。在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅查看集群的子网。
访问权限范围。为虚拟机分配
https://www.googleapis.com/auth/cloud-platform访问权限范围。此范围授权虚拟机向 Managed Kafka API 发送请求。
以下步骤展示了如何设置这些选项。
控制台
在 Google Cloud 控制台中,前往创建实例页面。
在机器配置窗格中,执行以下操作:
在名称字段中,指定实例的名称。如需了解详情,请参阅资源命名惯例。
在区域列表中,选择与 Kafka 集群相同的区域。
在可用区列表中,选择一个可用区。
在导航菜单中,点击网络。在显示的网络窗格中,执行以下操作:
前往网络接口部分。
如需展开默认网络接口,请点击 箭头。
在网络字段中,选择 VPC 网络。
在子网列表中,选择子网。
点击完成。
在导航菜单中,点击安全。在显示的安全窗格中,执行以下操作:
在访问权限范围下,选择针对每个 API 设置访问权限。
在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用。
点击创建以创建虚拟机。
gcloud
如需创建虚拟机实例,请使用 gcloud compute instances create 命令。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- REGION:创建 Kafka 集群的区域,例如
us-central1 - SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
- ZONE:创建集群的区域中的一个可用区,例如
us-central1-c
如需详细了解如何创建虚拟机,请参阅在特定子网中创建虚拟机实例。
授予 IAM 角色
向 Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:
- Managed Kafka Client (
roles/managedkafka.client) - Service Account Token Creator (
roles/iam.serviceAccountTokenCreator) Service Account OpenID Token Creator (
roles/iam.serviceAccountOpenIdTokenCreator)
控制台
在 Google Cloud 控制台中,前往 IAM 页面。
找到 Compute Engine 默认服务账号所在的行,然后点击 修改主账号。
点击添加其他角色,然后选择角色 Managed Kafka Client。 针对 Service Account Token Creator 和 Service Account OpenID Token Creator 角色重复此步骤。
点击保存。
gcloud
如需授予 IAM 角色,请使用 gcloud projects add-iam-policy-binding 命令。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
替换以下内容:
PROJECT_ID:您的项目 ID
PROJECT_NUMBER:您的项目编号
如需获取项目编号,请运行 gcloud projects describe 命令:
gcloud projects describe PROJECT_ID
如需了解详情,请参阅查找项目名称、编号和 ID。
连接到虚拟机
使用 SSH 连接到虚拟机实例。
控制台
转到虚拟机实例页面。
在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH。
gcloud
如需连接到虚拟机,请使用 gcloud compute ssh 命令。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
替换以下内容:
- VM_NAME:虚拟机的名称
- PROJECT_ID:您的项目 ID
- ZONE:您在其中创建虚拟机的可用区
首次使用 SSH 时,可能需要进行额外的配置。如需了解详情,请参阅关于 SSH 连接。
创建 Python 提供方应用
在 SSH 会话中,运行以下命令以创建生产者应用。
安装 pip(一种 Python 软件包管理器和虚拟环境管理器):
sudo apt install python3-pip -y sudo apt install python3-venv -y创建新的虚拟环境 (venv) 并将其激活:
python3 -m venv kafka source kafka/bin/activate安装
confluent-kafka客户端和其他依赖项:pip install confluent-kafka google-auth urllib3 packaging将以下生产者客户端代码复制到名为
producer.py的文件中import confluent_kafka import argparse from tokenprovider import TokenProvider parser = argparse.ArgumentParser() parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True) parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False) parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False) args = parser.parse_args() token_provider = TokenProvider() config = { 'bootstrap.servers': args.bootstrap, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'OAUTHBEARER', 'oauth_cb': token_provider.get_token, } producer = confluent_kafka.Producer(config) def callback(error, message): if error is not None: print(error) return print("Delivered a message to {}[{}]".format(message.topic(), message.partition())) for i in range(args.num_messages): message = f"{i} hello world!".encode('utf-8') producer.produce(args.topic_name, message, callback=callback) producer.flush()您现在需要实现 OAuth 令牌提供程序。将以下代码保存在名为
tokenprovider.py的文件中:import base64 import datetime import http.server import json import google.auth from google.auth.transport.urllib3 import Request import urllib3 import time def encode(source): """Safe base64 encoding.""" return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=') class TokenProvider(object): """ Provides OAuth tokens from Google Cloud Application Default credentials. """ HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'}) def __init__(self, **config): self.credentials, _project = google.auth.default() self.http_client = urllib3.PoolManager() def get_credentials(self): if not self.credentials.valid: self.credentials.refresh(Request(self.http_client)) return self.credentials def get_jwt(self, creds): token_data = dict( exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(), iat=datetime.datetime.now(datetime.timezone.utc).timestamp(), iss='Google', sub=creds.service_account_email ) return json.dumps(token_data) def get_token(self, args): creds = self.get_credentials() token = '.'.join([ encode(self.HEADER), encode(self.get_jwt(creds)), encode(creds.token) ]) # compute expiry time expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc) now_utc = datetime.datetime.now(datetime.timezone.utc) expiry_seconds = (expiry_utc - now_utc).total_seconds() return token, time.time() + expiry_seconds您现在可以运行应用了:
python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
控制台
gcloud
如需删除虚拟机,请使用
gcloud compute instances delete命令。gcloud compute instances delete VM_NAME --zone=ZONE如需删除 Kafka 集群,请使用
gcloud managed-kafka clusters delete命令。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async