開發 Python 供應端應用程式

瞭解如何開發 Python 供應端應用程式,並使用應用程式預設憑證 (ADC) 向 Managed Service for Apache Kafka 叢集驗證。ADC 可讓在 Google Cloud 上執行的應用程式自動尋找並使用正確的憑證,向 Google Cloud 服務進行驗證。

事前準備

開始本教學課程前,請先建立新的 Managed Service for Apache Kafka 叢集。如果已有叢集,可以略過這個步驟。

如何建立叢集

控制台

  1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 點選 「Create」(建立)
  3. 在「Cluster name」(叢集名稱) 方塊中輸入叢集的名稱。
  4. 在「Region」(區域) 清單中,選取叢集的位置。
  5. 在「網路設定」中,設定可存取叢集的子網路:
    1. 在「Project」(專案) 部分,選取專案。
    2. 在「Network」(網路) 中選取虛擬私有雲網路。
    3. 在「Subnet」(子網路) 中,選取子網路。
    4. 按一下 [完成]
  6. 點選「建立」

按一下「建立」後,叢集狀態會顯示為 Creating。叢集準備就緒時,狀態會顯示 Active

gcloud

如要建立 Kafka 叢集,請執行 managed-kafka clusters create 指令。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

更改下列內容:

  • KAFKA_CLUSTER:Kafka 叢集名稱
  • REGION:叢集位置
  • PROJECT_ID:專案 ID
  • SUBNET_NAME:要建立叢集的子網路,例如 default

如要瞭解支援的位置,請參閱「 Managed Service for Apache Kafka 位置」。

這項指令會非同步執行,並傳回作業 ID:

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

如要追蹤建立作業的進度,請使用 gcloud managed-kafka operations describe 指令:

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

叢集準備就緒後,這項指令的輸出內容會包含 state: ACTIVE 項目。詳情請參閱「 監控叢集建立作業」。

必要的角色

如要取得建立及設定用戶端 VM 所需的權限,請要求管理員在專案中授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

設定用戶端 VM

在 Compute Engine 中建立可存取 Kafka 叢集的 Linux 虛擬機器 (VM) 執行個體。設定 VM 時,請設定下列選項:

  • 區域。在與 Kafka 叢集相同的區域中建立 VM。

  • 子網路。在與 Kafka 叢集設定中使用的子網路相同的虛擬私有雲網路中建立 VM。詳情請參閱「查看叢集的子網路」。

  • 存取權範圍。將 https://www.googleapis.com/auth/cloud-platform 存取範圍指派給 VM。這個範圍會授權 VM 將要求傳送至 Managed Kafka API。

下列步驟說明如何設定這些選項。

控制台

  1. 前往 Google Cloud 控制台的「Create an instance」(建立執行個體) 頁面。

    建立執行個體

  2. 在「機器設定」窗格中,執行下列操作:

    1. 在「Name」(名稱) 欄位中,指定執行個體的名稱。詳情請參閱資源命名慣例

    2. 在「Region」(區域) 清單中,選取與 Kafka 叢集相同的區域。

    3. 在「Zone」(可用區) 清單中選取可用區。

  3. 在導覽選單中,按一下「Networking」(網路)。在隨即顯示的「Networking」(網路) 窗格中,執行下列操作:

    1. 前往「網路介面」部分。

    2. 如要展開預設網路介面,請點選 箭頭。

    3. 在「Network」(網路) 欄位中,選擇虛擬私有雲網路。

    4. 在「Subnetwork」(子網路) 清單中,選取子網路。

    5. 按一下 [完成]

  4. 按一下導覽選單中的「Security」(安全性)。在隨即顯示的「安全性」窗格中,執行下列操作:

    1. 為「Access scopes」(存取權範圍) 選取 [Set access for each API] (針對各個 API 設定存取權)

    2. 在存取範圍清單中,找到「Cloud Platform」下拉式清單,然後選取「Enabled」

  5. 按一下「建立」即可建立 VM。

gcloud

如要建立 VM 執行個體,請使用 gcloud compute instances create 指令。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

更改下列內容:

  • VM_NAME:VM 的名稱
  • PROJECT_ID:專案 ID
  • REGION:您建立 Kafka 叢集的區域,例如 us-central1
  • SUBNET:與叢集設定中使用的子網路位於相同虛擬私有雲網路的子網路
  • ZONE:您建立叢集的區域中的可用區,例如 us-central1-c

如要進一步瞭解如何建立 VM,請參閱「在特定子網路中建立 VM 執行個體」。

授予 IAM 角色

將下列 Identity and Access Management (IAM) 角色授予 Compute Engine 預設服務帳戶

  • 代管 Kafka 用戶端 (roles/managedkafka.client)
  • 服務帳戶憑證建立者 (roles/iam.serviceAccountTokenCreator)
  • 服務帳戶 OpenID 權杖建立者 (roles/iam.serviceAccountOpenIdTokenCreator)

控制台

  1. 前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。

    前往「身分與存取權管理」頁面

  2. 找出「Compute Engine default service account」(Compute Engine 預設服務帳戶) 的資料列,然後按一下 「Edit principal」(編輯主體)

  3. 按一下「新增其他角色」,然後選取「受管理 Kafka 用戶端」角色。 針對「服務帳戶憑證建立者」和「服務帳戶 OpenID 憑證建立者」角色重複這個步驟。

  4. 按一下 [儲存]

gcloud

如要授予 IAM 角色,請使用 gcloud projects add-iam-policy-binding 指令。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

更改下列內容:

  • PROJECT_ID:專案 ID

  • PROJECT_NUMBER:您的專案編號

如要取得專案編號,請執行 gcloud projects describe 指令:

gcloud projects describe PROJECT_ID

詳情請參閱「尋找專案名稱、編號和 ID」。

連線至 VM

使用 SSH 連線至 VM 執行個體。

控制台

  1. 前往「VM instances」(VM 執行個體) 頁面。

    前往 VM 執行個體

  2. 在 VM 執行個體清單中找到 VM 名稱,然後按一下「SSH」SSH

gcloud

如要連線至 VM,請使用 gcloud compute ssh 指令。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

更改下列內容:

  • VM_NAME:VM 的名稱
  • PROJECT_ID:專案 ID
  • ZONE:您建立 VM 的可用區

首次使用 SSH 時,可能需要額外設定。詳情請參閱「關於 SSH 連線」一文。

建立 Python 供應端應用程式

在 SSH 工作階段中,執行下列指令來建立 Producer 應用程式。

  1. 安裝 pip (Python 套件管理員和虛擬環境管理員):

    sudo apt install python3-pip -y
    sudo apt install python3-venv -y
    
  2. 建立並啟動新的虛擬環境 (venv):

    python3 -m venv kafka
    source kafka/bin/activate
    
  3. 安裝 confluent-kafka 用戶端和其他依附元件:

    pip install confluent-kafka google-auth urllib3 packaging
    
  4. 將下列生產者用戶端程式碼複製到名為 producer.py 的檔案中

    import confluent_kafka
    import argparse
    from tokenprovider import TokenProvider
    
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True)
    parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False)
    parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False)
    args = parser.parse_args()
    
    token_provider = TokenProvider()
    
    config = {
        'bootstrap.servers': args.bootstrap,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'oauth_cb': token_provider.get_token,
    }
    
    producer = confluent_kafka.Producer(config)
    
    def callback(error, message):
        if error is not None:
            print(error)
            return
        print("Delivered a message to {}[{}]".format(message.topic(), message.partition()))
    
    for i in range(args.num_messages):
    
      message = f"{i} hello world!".encode('utf-8')
      producer.produce(args.topic_name, message, callback=callback)
    
    producer.flush()
    
  5. 您現在需要實作 OAuth 權杖供應商。將下列程式碼儲存為 tokenprovider.py 檔案:

    import base64
    import datetime
    import http.server
    import json
    import google.auth
    from google.auth.transport.urllib3 import Request
    import urllib3
    import time
    
    def encode(source):
      """Safe base64 encoding."""
      return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=')
    
    class TokenProvider(object):
      """
      Provides OAuth tokens from Google Cloud Application Default credentials.
      """
      HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'})
    
      def __init__(self, **config):
        self.credentials, _project = google.auth.default()
        self.http_client = urllib3.PoolManager()
    
      def get_credentials(self):
        if not self.credentials.valid:
          self.credentials.refresh(Request(self.http_client))
        return self.credentials
    
      def get_jwt(self, creds):
        token_data = dict(
          exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(),
          iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
          iss='Google',
          sub=creds.service_account_email
        )
        return json.dumps(token_data)
    
      def get_token(self, args):
        creds = self.get_credentials()
        token = '.'.join([
          encode(self.HEADER),
          encode(self.get_jwt(creds)),
          encode(creds.token)
        ])
    
        # compute expiry time
        expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc)
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        expiry_seconds = (expiry_utc - now_utc).total_seconds()
    
        return token, time.time() + expiry_seconds
    
  6. 現在可以執行應用程式:

    python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

清除所用資源

為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請按照下列步驟操作。

控制台

  1. 刪除 VM 執行個體。

    1. 前往「VM instances」(VM 執行個體) 頁面。

      前往 VM 執行個體

    2. 選取 VM,然後按一下「Delete」(刪除)

  2. 刪除 Kafka 叢集。

    1. 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。

      前往「Clusters」(叢集)

    2. 選取 Kafka 叢集,然後按一下「Delete」(刪除)

gcloud

  1. 如要刪除 VM,請使用 gcloud compute instances delete 指令。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. 如要刪除 Kafka 叢集,請使用 gcloud managed-kafka clusters delete 指令。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。