Python プロデューサー アプリケーションを開発する
アプリケーションのデフォルト認証情報(ADC)を使用して Managed Service for Apache Kafka クラスタで認証を行う Python プロデューサー アプリケーションを開発する方法について説明します。ADC を使用すると、 Google Cloud で実行されているアプリケーションは、 サービスを認証するための適切な認証情報を自動的に検出して Google Cloud 使用できます。
始める前に
このチュートリアルを開始する前に、新しい Managed Service for Apache Kafka クラスタを作成します。すでにクラスタがある場合は、この手順をスキップできます。
クラスタを作成する方法
コンソール
- [Managed Service for Apache Kafka] > [Clusters] ページに移動します。
- [Create] をクリックします。
- [クラスタ名] フィールドに、クラスタの名前を入力します。
- [リージョン] リストで、クラスタのロケーションを選択します。
-
[**ネットワーク構成**] で、クラスタにアクセスできるサブネットを構成します。
- [プロジェクト] で、該当するプロジェクトを選択します。
- [**ネットワーク**] で、VPC ネットワークを選択します。
- [**サブネット**] で、サブネットを選択します。
- [完了] をクリックします。
- [作成] をクリックします。
[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備ができると、状態は Active になります。
gcloud
Kafka クラスタを作成するには、
managed-kafka clusters
create コマンドを実行します。
gcloud managed-kafka clusters create KAFKA_CLUSTER \ --location=REGION \ --cpu=3 \ --memory=3GiB \ --subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --async
次のように置き換えます。
KAFKA_CLUSTER: Kafka クラスタの名前REGION: クラスタのロケーションPROJECT_ID: プロジェクト IDSUBNET_NAME: クラスタを作成するサブネット(例:default)
サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。
このコマンドは非同期で実行され、オペレーション ID を返します。
Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.
作成オペレーションの進行状況を追跡するには、
gcloud managed-kafka
operations describe コマンドを使用します:
gcloud managed-kafka operations describe OPERATION_ID \ --location=REGION
クラスタの準備ができると、このコマンドの出力に state:
ACTIVE エントリが含まれます。詳細については、
クラスタ作成オペレーションをモニタリングするをご覧ください。
必要なロール
クライアント VM の作成と構成に必要な権限を取得するには、プロジェクトに対する次の IAM ロールの付与を管理者に依頼してください。
- Compute インスタンス管理者(v1) (
roles/compute.instanceAdmin.v1) - プロジェクト IAM 管理者 (
roles/resourcemanager.projectIamAdmin) - ロールの閲覧者 (
roles/iam.roleViewer) - サービス アカウント ユーザー (
roles/iam.serviceAccountUser)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
クライアント VM を設定する
Kafka クラスタにアクセスできる Linux 仮想マシン(VM)インスタンスを Compute Engine に作成します。VM を構成するときは、次のオプションを設定します。
リージョン 。Kafka クラスタと同じリージョンに VM を作成します。
サブネット 。Kafka クラスタの構成で使用したサブネットと同じ VPC ネットワークに VM を作成します。詳細については、 クラスタのサブネットを表示するをご覧ください。
アクセス スコープ 。
https://www.googleapis.com/auth/cloud-platformアクセス スコープを VM に割り当てます。このスコープにより、VM は Managed Kafka API にリクエストを送信できます。
次の手順では、これらのオプションを設定する方法について説明します。
コンソール
コンソールで [インスタンスの作成] ページに移動します。 Google Cloud
[マシンの構成] ペインで、次の操作を行います。
[名前] フィールドに、インスタンスの名前を指定します。詳細については、 リソースの命名規則をご覧ください。
[リージョン] リストで、Kafka クラスタと同じリージョンを選択します。
[ゾーン] リストで、ゾーンを選択します。
ナビゲーション メニューで、[ネットワーキング] をクリックします。表示された [ネットワーキング] ペインで、次の操作を行います。
[ネットワーク インターフェース] セクションに移動します。
デフォルトのネットワーク インターフェースを開くには、 矢印をクリックします。
[ネットワーク] フィールドで、VPC ネットワークを選択します。
[サブネットワーク] リストで、サブネットを選択します。
[完了] をクリックします。
ナビゲーション メニューで [セキュリティ] をクリックします。表示された [セキュリティ] ペインで、次の操作を行います。
[**アクセス スコープ**] で、[**各 API にアクセス権を設定**] を選択します。
アクセス スコープのリストで、[Cloud Platform] プルダウン リストを見つけて [有効] を選択します。
[作成] をクリックして VM を作成します。
gcloud
VM インスタンスを作成するには、
gcloud compute instances create
コマンドを使用します。
gcloud compute instances create VM_NAME \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
--zone=ZONE
次のように置き換えます。
- VM_NAME: VM の名前
- PROJECT_ID: プロジェクト ID
- REGION: Kafka クラスタを作成したリージョン(例:
us-central1) - SUBNET: クラスタ構成で使用したサブネットと同じ VPC ネットワーク内のサブネット
- ZONE: クラスタを作成したリージョン内のゾーン(例:
us-central1-c)
VM の作成の詳細については、 特定のサブネットに VM インスタンスを作成するをご覧ください。
IAM ロールを付与する
Compute Engine のデフォルトのサービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。
- マネージド Kafka クライアント (
roles/managedkafka.client) - サービス アカウント トークン作成者 (
roles/iam.serviceAccountTokenCreator) サービス アカウント OpenID トークン作成者 (
roles/iam.serviceAccountOpenIdTokenCreator)
コンソール
Google Cloud コンソールで、[IAM] ページに移動します。
[Compute Engine のデフォルトのサービス アカウント] の行を見つけて、 [プリンシパルを編集します] をクリックします。
[別のロールを追加] をクリックし、ロール [マネージド Kafka クライアント] を選択します。 [サービス アカウント トークン作成者] ロールと [サービス アカウント OpenID トークン作成者] ロールについても、この手順を繰り返します。
[保存] をクリックします。
gcloud
IAM ロールを付与するには、
gcloud projects add-iam-policy-binding
コマンドを使用します。
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
gcloud projects add-iam-policy-binding PROJECT_ID\
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/iam.serviceAccountOpenIdTokenCreator
次のように置き換えます。
PROJECT_ID: プロジェクト ID
PROJECT_NUMBER: プロジェクトの番号
プロジェクト番号を取得するには、
gcloud projects describe コマンドを実行します。
gcloud projects describe PROJECT_ID
詳細については、 プロジェクト名、番号、ID を確認するをご覧ください。
VM に接続する
VM インスタンスに SSH で接続します。
コンソール
[VM インスタンス] ページに移動します。
VM インスタンスのリストで、VM 名を見つけて [SSH] をクリックします。
gcloud
VM に接続するには、
gcloud compute ssh コマンドを使用します。
gcloud compute ssh VM_NAME \
--project=PROJECT_ID \
--zone=ZONE
次のように置き換えます。
- VM_NAME: VM の名前
- PROJECT_ID: プロジェクト ID
- ZONE: VM を作成したゾーン
SSH を初めて使用する場合は、追加の構成が必要になることがあります。詳細については、SSH 接続についてをご覧ください。
Python プロデューサー アプリケーションを作成する
SSH セッションから次のコマンドを実行して、プロデューサー アプリケーションを作成します。
Python パッケージ マネージャーと仮想環境マネージャーである pip をインストールします。
sudo apt install python3-pip -y sudo apt install python3-venv -y新しい仮想環境(venv)を作成して有効にします。
python3 -m venv kafka source kafka/bin/activateconfluent-kafkaクライアントとその他の依存関係をインストールします。pip install confluent-kafka google-auth urllib3 packaging次のプロデューサー クライアント コードを
producer.pyという名前のファイルにコピーします。import confluent_kafka import argparse from tokenprovider import TokenProvider parser = argparse.ArgumentParser() parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True) parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False) parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False) args = parser.parse_args() token_provider = TokenProvider() config = { 'bootstrap.servers': args.bootstrap, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'OAUTHBEARER', 'oauth_cb': token_provider.get_token, } producer = confluent_kafka.Producer(config) def callback(error, message): if error is not None: print(error) return print("Delivered a message to {}[{}]".format(message.topic(), message.partition())) for i in range(args.num_messages): message = f"{i} hello world!".encode('utf-8') producer.produce(args.topic_name, message, callback=callback) producer.flush()OAuth トークン プロバイダの実装が必要になりました。次のコードを
tokenprovider.pyという名前のファイルに保存します。import base64 import datetime import http.server import json import google.auth from google.auth.transport.urllib3 import Request import urllib3 import time def encode(source): """Safe base64 encoding.""" return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=') class TokenProvider(object): """ Provides OAuth tokens from Google Cloud Application Default credentials. """ HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'}) def __init__(self, **config): self.credentials, _project = google.auth.default() self.http_client = urllib3.PoolManager() def get_credentials(self): if not self.credentials.valid: self.credentials.refresh(Request(self.http_client)) return self.credentials def get_jwt(self, creds): token_data = dict( exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(), iat=datetime.datetime.now(datetime.timezone.utc).timestamp(), iss='Google', sub=creds.service_account_email ) return json.dumps(token_data) def get_token(self, args): creds = self.get_credentials() token = '.'.join([ encode(self.HEADER), encode(self.get_jwt(creds)), encode(creds.token) ]) # compute expiry time expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc) now_utc = datetime.datetime.now(datetime.timezone.utc) expiry_seconds = (expiry_utc - now_utc).total_seconds() return token, time.time() + expiry_secondsこれで、アプリケーションを実行する準備ができました。
python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
クリーンアップ
このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、 次の手順を実施します。
コンソール
VM インスタンスを削除します。
[VM インスタンス] ページに移動します。
VM を選択して [削除] をクリックします。
Kafka クラスタを削除します。
[Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
Kafka クラスタを選択して [削除] をクリックします。
gcloud
VM を削除するには、
gcloud compute instances deleteコマンドを使用します。gcloud compute instances delete VM_NAME --zone=ZONEKafka クラスタを削除するには、
gcloud managed-kafka clusters deleteコマンドを使用します。gcloud managed-kafka clusters delete CLUSTER_ID \ --location=REGION --async