开发 Python 提供方应用

了解如何开发一个 Python 生产者应用,该应用通过使用应用默认凭据 (ADC) 向 Managed Service for Apache Kafka 集群进行身份验证。借助 ADC,在 Google Cloud 上运行的应用可以自动查找并使用正确的凭据向 Google Cloud 服务进行身份验证。

准备工作

在开始学习本教程之前,请创建一个新的 Managed Service for Apache Kafka 集群。如果您已有集群,则可以跳过此步骤。

如何创建集群

控制台

  1. 前往 Managed Service for Apache Kafka > 集群页面。

    转到“集群”

  2. 点击 创建
  3. 集群名称字段中,输入集群的名称。
  4. 区域列表中,为集群选择一个位置。
  5. 对于网络配置,请配置集群可访问的子网:
    1. 项目部分,选择您的项目。
    2. 对于网络,选择 VPC 网络。
    3. 对于子网,请选择相应子网。
    4. 点击完成
  6. 点击创建

点击创建后,集群状态为 Creating。当集群准备就绪时,状态为 Active

gcloud

如需创建 Kafka 集群,请运行 managed-kafka clusters create 命令。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

替换以下内容:

  • KAFKA_CLUSTER:Kafka 集群的名称
  • REGION:集群的位置
  • PROJECT_ID:您的项目 ID
  • SUBNET_NAME:您要在其中创建集群的子网,例如 default

如需了解支持的位置,请参阅 Managed Service for Apache Kafka 位置

该命令会异步运行,并返回一个操作 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如需跟踪创建操作的进度,请使用 gcloud managed-kafka operations describe 命令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

当集群准备就绪时,此命令的输出内容会包含 state: ACTIVE 条目。如需了解详情,请参阅 监控集群创建操作

所需的角色

如需获得创建和配置客户端虚拟机所需的权限,请让管理员向您授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

设置客户端虚拟机

在 Compute Engine 中创建一个可以访问 Kafka 集群的 Linux 虚拟机 (VM) 实例。配置虚拟机时,请设置以下选项:

  • Region 绑定将多选选项设置为所有记录中 Region 的所有值。在与 Kafka 集群相同的区域中创建虚拟机。

  • 子网。在与您在 Kafka 集群配置中使用的子网相同的 VPC 网络中创建虚拟机。如需了解详情,请参阅查看集群的子网

  • 访问权限范围。为虚拟机分配 https://www.googleapis.com/auth/cloud-platform 访问权限范围。此范围授权虚拟机向 Managed Kafka API 发送请求。

以下步骤展示了如何设置这些选项。

控制台

  1. 在 Google Cloud 控制台中,前往创建实例页面。

    创建实例

  2. 机器配置窗格中,执行以下操作:

    1. 名称字段中,指定实例的名称。如需了解详情,请参阅资源命名惯例

    2. 区域列表中,选择与 Kafka 集群相同的区域。

    3. 可用区列表中,选择一个可用区。

  3. 在导航菜单中,点击网络。在显示的网络窗格中,执行以下操作:

    1. 前往网络接口部分。

    2. 如需展开默认网络接口,请点击 箭头。

    3. 网络字段中,选择 VPC 网络。

    4. 子网列表中,选择子网。

    5. 点击完成

  4. 在导航菜单中,点击安全。在显示的安全窗格中,执行以下操作:

    1. 访问权限范围下,选择针对每个 API 设置访问权限

    2. 在访问权限范围列表中,找到 Cloud Platform 下拉列表,然后选择已启用

  5. 点击创建以创建虚拟机。

gcloud

如需创建虚拟机实例,请使用 gcloud compute instances create 命令。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

替换以下内容:

  • VM_NAME:虚拟机的名称
  • PROJECT_ID:您的项目 ID
  • REGION:创建 Kafka 集群的区域,例如 us-central1
  • SUBNET:与您在集群配置中使用的子网位于同一 VPC 网络中的子网
  • ZONE:创建集群的区域中的一个可用区,例如 us-central1-c

如需详细了解如何创建虚拟机,请参阅在特定子网中创建虚拟机实例

授予 IAM 角色

Compute Engine 默认服务账号授予以下 Identity and Access Management (IAM) 角色:

  • Managed Kafka Client (roles/managedkafka.client)
  • Service Account Token Creator (roles/iam.serviceAccountTokenCreator)
  • Service Account OpenID Token Creator (roles/iam.serviceAccountOpenIdTokenCreator)

控制台

  1. 在 Google Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 找到 Compute Engine 默认服务账号所在的行,然后点击 修改主账号

  3. 点击添加其他角色,然后选择角色 Managed Kafka Client。 针对 Service Account Token CreatorService Account OpenID Token Creator 角色重复此步骤。

  4. 点击保存

gcloud

如需授予 IAM 角色,请使用 gcloud projects add-iam-policy-binding 命令。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

替换以下内容:

  • PROJECT_ID:您的项目 ID

  • PROJECT_NUMBER:您的项目编号

如需获取项目编号,请运行 gcloud projects describe 命令:

gcloud projects describe PROJECT_ID

如需了解详情,请参阅查找项目名称、编号和 ID

连接到虚拟机

使用 SSH 连接到虚拟机实例。

控制台

  1. 转到虚拟机实例页面。

    进入“虚拟机实例”

  2. 在虚拟机实例列表中,找到虚拟机名称,然后点击 SSH

gcloud

如需连接到虚拟机,请使用 gcloud compute ssh 命令。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

替换以下内容:

  • VM_NAME:虚拟机的名称
  • PROJECT_ID:您的项目 ID
  • ZONE:您在其中创建虚拟机的可用区

首次使用 SSH 时,可能需要进行额外的配置。如需了解详情,请参阅关于 SSH 连接

创建 Python 提供方应用

在 SSH 会话中,运行以下命令以创建生产者应用。

  1. 安装 pip(一种 Python 软件包管理器和虚拟环境管理器):

    sudo apt install python3-pip -y
    sudo apt install python3-venv -y
    
  2. 创建新的虚拟环境 (venv) 并将其激活:

    python3 -m venv kafka
    source kafka/bin/activate
    
  3. 安装 confluent-kafka 客户端和其他依赖项:

    pip install confluent-kafka google-auth urllib3 packaging
    
  4. 将以下生产者客户端代码复制到名为 producer.py 的文件中

    import confluent_kafka
    import argparse
    from tokenprovider import TokenProvider
    
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True)
    parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False)
    parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False)
    args = parser.parse_args()
    
    token_provider = TokenProvider()
    
    config = {
        'bootstrap.servers': args.bootstrap,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'oauth_cb': token_provider.get_token,
    }
    
    producer = confluent_kafka.Producer(config)
    
    def callback(error, message):
        if error is not None:
            print(error)
            return
        print("Delivered a message to {}[{}]".format(message.topic(), message.partition()))
    
    for i in range(args.num_messages):
    
      message = f"{i} hello world!".encode('utf-8')
      producer.produce(args.topic_name, message, callback=callback)
    
    producer.flush()
    
  5. 您现在需要实现 OAuth 令牌提供程序。将以下代码保存在名为 tokenprovider.py 的文件中:

    import base64
    import datetime
    import http.server
    import json
    import google.auth
    from google.auth.transport.urllib3 import Request
    import urllib3
    import time
    
    def encode(source):
      """Safe base64 encoding."""
      return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=')
    
    class TokenProvider(object):
      """
      Provides OAuth tokens from Google Cloud Application Default credentials.
      """
      HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'})
    
      def __init__(self, **config):
        self.credentials, _project = google.auth.default()
        self.http_client = urllib3.PoolManager()
    
      def get_credentials(self):
        if not self.credentials.valid:
          self.credentials.refresh(Request(self.http_client))
        return self.credentials
    
      def get_jwt(self, creds):
        token_data = dict(
          exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(),
          iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
          iss='Google',
          sub=creds.service_account_email
        )
        return json.dumps(token_data)
    
      def get_token(self, args):
        creds = self.get_credentials()
        token = '.'.join([
          encode(self.HEADER),
          encode(self.get_jwt(creds)),
          encode(creds.token)
        ])
    
        # compute expiry time
        expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc)
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        expiry_seconds = (expiry_utc - now_utc).total_seconds()
    
        return token, time.time() + expiry_seconds
    
  6. 您现在可以运行应用了:

    python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

控制台

  1. 删除虚拟机实例。

    1. 转到虚拟机实例页面。

      进入“虚拟机实例”

    2. 选择虚拟机,然后点击删除

  2. 删除 Kafka 集群。

    1. 前往 Managed Service for Apache Kafka > 集群页面。

      转到“集群”

    2. 选择 Kafka 集群,然后点击删除

gcloud

  1. 如需删除虚拟机,请使用 gcloud compute instances delete 命令。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. 如需删除 Kafka 集群,请使用 gcloud managed-kafka clusters delete 命令。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。