開發 Python 供應端應用程式
瞭解如何開發 Python 供應端應用程式,並使用應用程式預設憑證 (ADC) 向 Managed Service for Apache Kafka 叢集驗證。ADC 可讓在 Google Cloud 上執行的應用程式自動尋找並使用正確的憑證,向 Google Cloud 服務進行驗證。
事前準備
開始本教學課程前,請先建立新的 Managed Service for Apache Kafka 叢集。如果已有叢集,可以略過這個步驟。
如何建立叢集
控制台
- 前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。
- 點選 「Create」(建立)。
- 在「Cluster name」(叢集名稱) 方塊中輸入叢集的名稱。
- 在「Region」(區域) 清單中,選取叢集的位置。
-
在「網路設定」中,設定可存取叢集的子網路:
- 在「Project」(專案) 部分,選取專案。
- 在「Network」(網路) 中選取虛擬私有雲網路。
- 在「Subnet」(子網路) 中,選取子網路。
- 按一下 [完成]。
- 點選「建立」。
按一下「建立」後,叢集狀態會顯示為 Creating。叢集準備就緒時,狀態會顯示 Active。
gcloud
如要建立 Kafka 叢集,請執行 managed-kafka clusters
create 指令。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
更改下列內容:
KAFKA_CLUSTER:Kafka 叢集名稱REGION:叢集位置PROJECT_ID:專案 IDSUBNET_NAME:要建立叢集的子網路,例如default
如要瞭解支援的位置,請參閱「 Managed Service for Apache Kafka 位置」。
這項指令會非同步執行,並傳回作業 ID:
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
如要追蹤建立作業的進度,請使用 gcloud managed-kafka
operations describe 指令:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
叢集準備就緒後,這項指令的輸出內容會包含 state:
ACTIVE 項目。詳情請參閱「 監控叢集建立作業」。
必要的角色
如要取得建立及設定用戶端 VM 所需的權限,請要求管理員在專案中授予您下列 IAM 角色:
- Compute 執行個體管理員 (v1) (
roles/compute.instanceAdmin.v1) - 專案 IAM 管理員 (
roles/resourcemanager.projectIamAdmin) -
角色檢視者 (
roles/iam.roleViewer) -
服務帳戶使用者 (
roles/iam.serviceAccountUser)
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
設定用戶端 VM
在 Compute Engine 中建立可存取 Kafka 叢集的 Linux 虛擬機器 (VM) 執行個體。設定 VM 時,請設定下列選項:
區域。在與 Kafka 叢集相同的區域中建立 VM。
子網路。在與 Kafka 叢集設定中使用的子網路相同的虛擬私有雲網路中建立 VM。詳情請參閱「查看叢集的子網路」。
存取權範圍。將
https://www.googleapis.com/auth/cloud-platform存取範圍指派給 VM。這個範圍會授權 VM 將要求傳送至 Managed Kafka API。
下列步驟說明如何設定這些選項。
控制台
前往 Google Cloud 控制台的「Create an instance」(建立執行個體) 頁面。
在「機器設定」窗格中,執行下列操作:
在「Name」(名稱) 欄位中,指定執行個體的名稱。詳情請參閱資源命名慣例。
在「Region」(區域) 清單中,選取與 Kafka 叢集相同的區域。
在「Zone」(可用區) 清單中選取可用區。
在導覽選單中,按一下「Networking」(網路)。在隨即顯示的「Networking」(網路) 窗格中,執行下列操作:
前往「網路介面」部分。
如要展開預設網路介面,請點選 箭頭。
在「Network」(網路) 欄位中,選擇虛擬私有雲網路。
在「Subnetwork」(子網路) 清單中,選取子網路。
按一下 [完成]。
按一下導覽選單中的「Security」(安全性)。在隨即顯示的「安全性」窗格中,執行下列操作:
為「Access scopes」(存取權範圍) 選取 [Set access for each API] (針對各個 API 設定存取權)。
在存取範圍清單中,找到「Cloud Platform」下拉式清單,然後選取「Enabled」。
按一下「建立」即可建立 VM。
gcloud
如要建立 VM 執行個體,請使用 gcloud compute instances create 指令。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
更改下列內容:
- VM_NAME:VM 的名稱
- PROJECT_ID:專案 ID
- REGION:您建立 Kafka 叢集的區域,例如
us-central1 - SUBNET:與叢集設定中使用的子網路位於相同虛擬私有雲網路的子網路
- ZONE:您建立叢集的區域中的可用區,例如
us-central1-c
如要進一步瞭解如何建立 VM,請參閱「在特定子網路中建立 VM 執行個體」。
授予 IAM 角色
將下列 Identity and Access Management (IAM) 角色授予 Compute Engine 預設服務帳戶:
- 代管 Kafka 用戶端 (
roles/managedkafka.client) - 服務帳戶憑證建立者 (
roles/iam.serviceAccountTokenCreator) 服務帳戶 OpenID 權杖建立者 (
roles/iam.serviceAccountOpenIdTokenCreator)
控制台
前往 Google Cloud 控制台的「IAM」(身分與存取權管理) 頁面。
找出「Compute Engine default service account」(Compute Engine 預設服務帳戶) 的資料列,然後按一下 「Edit principal」(編輯主體)。
按一下「新增其他角色」,然後選取「受管理 Kafka 用戶端」角色。 針對「服務帳戶憑證建立者」和「服務帳戶 OpenID 憑證建立者」角色重複這個步驟。
按一下 [儲存]。
gcloud
如要授予 IAM 角色,請使用 gcloud projects add-iam-policy-binding 指令。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
更改下列內容:
PROJECT_ID:專案 ID
PROJECT_NUMBER:您的專案編號
如要取得專案編號,請執行 gcloud projects describe 指令:
gcloud projects describe PROJECT_ID
詳情請參閱「尋找專案名稱、編號和 ID」。
連線至 VM
使用 SSH 連線至 VM 執行個體。
控制台
前往「VM instances」(VM 執行個體) 頁面。
在 VM 執行個體清單中找到 VM 名稱,然後按一下「SSH」SSH。
gcloud
如要連線至 VM,請使用 gcloud compute ssh 指令。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
更改下列內容:
- VM_NAME:VM 的名稱
- PROJECT_ID:專案 ID
- ZONE:您建立 VM 的可用區
首次使用 SSH 時,可能需要額外設定。詳情請參閱「關於 SSH 連線」一文。
建立 Python 供應端應用程式
在 SSH 工作階段中,執行下列指令來建立 Producer 應用程式。
安裝 pip (Python 套件管理員和虛擬環境管理員):
sudo apt install python3-pip -y sudo apt install python3-venv -y建立並啟動新的虛擬環境 (venv):
python3 -m venv kafka source kafka/bin/activate安裝
confluent-kafka用戶端和其他依附元件:pip install confluent-kafka google-auth urllib3 packaging將下列生產者用戶端程式碼複製到名為
producer.py的檔案中import confluent_kafka import argparse from tokenprovider import TokenProvider parser = argparse.ArgumentParser() parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True) parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False) parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False) args = parser.parse_args() token_provider = TokenProvider() config = { 'bootstrap.servers': args.bootstrap, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'OAUTHBEARER', 'oauth_cb': token_provider.get_token, } producer = confluent_kafka.Producer(config) def callback(error, message): if error is not None: print(error) return print("Delivered a message to {}[{}]".format(message.topic(), message.partition())) for i in range(args.num_messages): message = f"{i} hello world!".encode('utf-8') producer.produce(args.topic_name, message, callback=callback) producer.flush()您現在需要實作 OAuth 權杖供應商。將下列程式碼儲存為
tokenprovider.py檔案:import base64 import datetime import http.server import json import google.auth from google.auth.transport.urllib3 import Request import urllib3 import time def encode(source): """Safe base64 encoding.""" return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=') class TokenProvider(object): """ Provides OAuth tokens from Google Cloud Application Default credentials. """ HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'}) def __init__(self, **config): self.credentials, _project = google.auth.default() self.http_client = urllib3.PoolManager() def get_credentials(self): if not self.credentials.valid: self.credentials.refresh(Request(self.http_client)) return self.credentials def get_jwt(self, creds): token_data = dict( exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(), iat=datetime.datetime.now(datetime.timezone.utc).timestamp(), iss='Google', sub=creds.service_account_email ) return json.dumps(token_data) def get_token(self, args): creds = self.get_credentials() token = '.'.join([ encode(self.HEADER), encode(self.get_jwt(creds)), encode(creds.token) ]) # compute expiry time expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc) now_utc = datetime.datetime.now(datetime.timezone.utc) expiry_seconds = (expiry_utc - now_utc).total_seconds() return token, time.time() + expiry_seconds現在可以執行應用程式:
python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
清除所用資源
為了避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請按照下列步驟操作。
控制台
刪除 VM 執行個體。
前往「VM instances」(VM 執行個體) 頁面。
選取 VM,然後按一下「Delete」(刪除)。
刪除 Kafka 叢集。
前往「Managed Service for Apache Kafka」>「Clusters」(叢集) 頁面。
選取 Kafka 叢集,然後按一下「Delete」(刪除)。
gcloud
如要刪除 VM,請使用
gcloud compute instances delete指令。gcloud compute instances delete VM_NAME --zone=ZONE如要刪除 Kafka 叢集,請使用
gcloud managed-kafka clusters delete指令。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async