Python プロデューサー アプリケーションを開発する

アプリケーションのデフォルト認証情報(ADC)を使用して Managed Service for Apache Kafka クラスタで認証を行う Python プロデューサー アプリケーションを開発する方法を学習します。ADC を使用すると、 Google Cloud で実行されているアプリケーションは、 Google Cloud サービスへの認証に使用する適切な認証情報を自動的に検出して使用できます。

始める前に

このチュートリアルを開始する前に、新しい Managed Service for Apache Kafka クラスタを作成します。すでにクラスタがある場合は、この手順をスキップできます。

クラスタを作成する方法

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

    [クラスタ] に移動

  2. [ 作成] をクリックします。
  3. [クラスタ名] フィールドに、クラスタの名前を入力します。
  4. [リージョン] リストで、クラスタのロケーションを選択します。
  5. [ネットワーク構成] で、クラスタがアクセスできるサブネットを構成します。
    1. [プロジェクト] で、該当するプロジェクトを選択します。
    2. [ネットワーク] で、VPC ネットワークを選択します。
    3. [サブネット] でサブネットを選択します。
    4. [完了] をクリックします。
  6. [作成] をクリックします。

[作成] をクリックすると、クラスタの状態が Creating になります。クラスタの準備が整うと、状態は Active になります。

gcloud

Kafka クラスタを作成するには、managed-kafka clusters create コマンドを実行します。

gcloud managed-kafka clusters create KAFKA_CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
--async

次のように置き換えます。

  • KAFKA_CLUSTER: Kafka クラスタの名前
  • REGION: クラスタのロケーション
  • PROJECT_ID: プロジェクト ID
  • SUBNET_NAME: クラスタを作成するサブネット(例: default

サポートされているロケーションについては、 Managed Service for Apache Kafka のロケーションをご覧ください。

このコマンドは非同期で実行され、オペレーション ID を返します。

Check operation [projects/PROJECT_ID/locations/REGION/operations/OPERATION_ID] for status.

作成オペレーションの進行状況を追跡するには、gcloud managed-kafka operations describe コマンドを使用します。

gcloud managed-kafka operations describe OPERATION_ID \
  --location=REGION

クラスタの準備ができると、このコマンドの出力に state: ACTIVE エントリが含まれます。詳細については、 クラスタ作成オペレーションをモニタリングするをご覧ください。

必要なロール

クライアント VM の作成と構成に必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

クライアント VM を設定する

Kafka クラスタにアクセスできる Linux 仮想マシン(VM)インスタンスを Compute Engine に作成します。VM を構成するときに、次のオプションを設定します。

  • リージョン。Kafka クラスタと同じリージョンに VM を作成します。

  • サブネット。Kafka クラスタ構成で使用したサブネットと同じ VPC ネットワークに VM を作成します。詳細については、クラスタのサブネットを表示するをご覧ください。

  • アクセス スコープhttps://www.googleapis.com/auth/cloud-platform アクセス スコープを VM に割り当てます。このスコープにより、VM は Managed Kafka API にリクエストを送信できます。

これらのオプションを設定する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで [インスタンスの作成] ページに移動します。

    インスタンスを作成する

  2. [マシンの構成] ペインで、次の操作を行います。

    1. [名前] フィールドに、インスタンスの名前を指定します。詳細については、リソースの命名規則をご覧ください。

    2. [リージョン] リストで、Kafka クラスタと同じリージョンを選択します。

    3. [ゾーン] リストでゾーンを選択します。

  3. ナビゲーション メニューで、[ネットワーキング] をクリックします。表示された [ネットワーキング] ペインで、次の操作を行います。

    1. [ネットワーク インターフェース] セクションに移動します。

    2. デフォルトのネットワーク インターフェースを開くには、 矢印をクリックします。

    3. [ネットワーク] フィールドで、VPC ネットワークを選択します。

    4. [サブネットワーク] リストで、サブネットを選択します。

    5. [完了] をクリックします。

  4. ナビゲーション メニューで [セキュリティ] をクリックします。表示された [セキュリティ] ペインで、次の操作を行います。

    1. [アクセス スコープ] で、[各 API にアクセス権を設定] を選択します。

    2. アクセス スコープのリストで、[Cloud Platform] プルダウン リストを見つけて、[有効] を選択します。

  5. [作成] をクリックして VM を作成します。

gcloud

VM インスタンスを作成するには、gcloud compute instances create コマンドを使用します。

gcloud compute instances create VM_NAME \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET \
  --zone=ZONE

次のように置き換えます。

  • VM_NAME: VM の名前
  • PROJECT_ID: プロジェクト ID
  • REGION: Kafka クラスタを作成したリージョン(例: us-central1
  • SUBNET: クラスタ構成で使用したサブネットと同じ VPC ネットワーク内のサブネット
  • ZONE: クラスタを作成したリージョンのゾーン(us-central1-c など)

VM の作成の詳細については、特定のサブネットに VM インスタンスを作成するをご覧ください。

IAM ロールを付与する

Compute Engine のデフォルト サービス アカウントに次の Identity and Access Management(IAM)ロールを付与します。

  • マネージド Kafka クライアントroles/managedkafka.client
  • サービス アカウント トークン作成者roles/iam.serviceAccountTokenCreator
  • サービス アカウントの OpenID トークン作成者roles/iam.serviceAccountOpenIdTokenCreator

コンソール

  1. Google Cloud コンソールで、[IAM] ページに移動します。

    [IAM] に移動

  2. [Compute Engine のデフォルトのサービス アカウント] の行を見つけて、 [プリンシパルを編集します] をクリックします。

  3. [別のロールを追加] をクリックし、ロール [Managed Kafka クライアント] を選択します。サービス アカウント トークン作成者ロールとサービス アカウント OpenID トークン作成者ロールについても、この手順を繰り返します。

  4. [保存] をクリックします。

gcloud

IAM ロールを付与するには、gcloud projects add-iam-policy-binding コマンドを使用します。

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/managedkafka.client

gcloud projects add-iam-policy-binding PROJECT_ID\
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role=roles/iam.serviceAccountOpenIdTokenCreator

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID

  • PROJECT_NUMBER: プロジェクトの番号

プロジェクト番号を取得するには、gcloud projects describe コマンドを実行します。

gcloud projects describe PROJECT_ID

詳細については、プロジェクト名、番号、ID を確認するをご覧ください。

VM に接続する

SSH を使用して VM インスタンスに接続します。

コンソール

  1. [VM インスタンス] ページに移動します。

    [VM インスタンス] に移動

  2. VM インスタンスのリストで、VM 名を見つけて [SSH] をクリックします。

gcloud

VM に接続するには、gcloud compute ssh コマンドを使用します。

gcloud compute ssh VM_NAME \
  --project=PROJECT_ID \
  --zone=ZONE

次のように置き換えます。

  • VM_NAME: VM の名前
  • PROJECT_ID: プロジェクト ID
  • ZONE: VM を作成したゾーン

SSH を初めて使用する場合は、追加の構成が必要になることがあります。詳細については、SSH 接続についてをご覧ください。

Python プロデューサー アプリケーションを作成する

SSH セッションで、次のコマンドを実行してプロデューサー アプリケーションを作成します。

  1. Python パッケージ マネージャーと仮想環境マネージャーである pip をインストールします。

    sudo apt install python3-pip -y
    sudo apt install python3-venv -y
    
  2. 新しい仮想環境(venv)を作成して有効にします。

    python3 -m venv kafka
    source kafka/bin/activate
    
  3. confluent-kafka クライアントとその他の依存関係をインストールします。

    pip install confluent-kafka google-auth urllib3 packaging
    
  4. 次のプロデューサー クライアント コードを producer.py という名前のファイルにコピーします。

    import confluent_kafka
    import argparse
    from tokenprovider import TokenProvider
    
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--bootstrap-servers', dest='bootstrap', type=str, required=True)
    parser.add_argument('-t', '--topic-name', dest='topic_name', type=str, default='example-topic', required=False)
    parser.add_argument('-n', '--num_messages', dest='num_messages', type=int, default=1, required=False)
    args = parser.parse_args()
    
    token_provider = TokenProvider()
    
    config = {
        'bootstrap.servers': args.bootstrap,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'oauth_cb': token_provider.get_token,
    }
    
    producer = confluent_kafka.Producer(config)
    
    def callback(error, message):
        if error is not None:
            print(error)
            return
        print("Delivered a message to {}[{}]".format(message.topic(), message.partition()))
    
    for i in range(args.num_messages):
    
      message = f"{i} hello world!".encode('utf-8')
      producer.produce(args.topic_name, message, callback=callback)
    
    producer.flush()
    
  5. OAuth トークン プロバイダの実装が必要になりました。次のコードを tokenprovider.py という名前のファイルに保存します。

    import base64
    import datetime
    import http.server
    import json
    import google.auth
    from google.auth.transport.urllib3 import Request
    import urllib3
    import time
    
    def encode(source):
      """Safe base64 encoding."""
      return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=')
    
    class TokenProvider(object):
      """
      Provides OAuth tokens from Google Cloud Application Default credentials.
      """
      HEADER = json.dumps({'typ':'JWT', 'alg':'GOOG_OAUTH2_TOKEN'})
    
      def __init__(self, **config):
        self.credentials, _project = google.auth.default()
        self.http_client = urllib3.PoolManager()
    
      def get_credentials(self):
        if not self.credentials.valid:
          self.credentials.refresh(Request(self.http_client))
        return self.credentials
    
      def get_jwt(self, creds):
        token_data = dict(
          exp=creds.expiry.replace(tzinfo=datetime.timezone.utc).timestamp(),
          iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
          iss='Google',
          sub=creds.service_account_email
        )
        return json.dumps(token_data)
    
      def get_token(self, args):
        creds = self.get_credentials()
        token = '.'.join([
          encode(self.HEADER),
          encode(self.get_jwt(creds)),
          encode(creds.token)
        ])
    
        # compute expiry time
        expiry_utc = creds.expiry.replace(tzinfo=datetime.timezone.utc)
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        expiry_seconds = (expiry_utc - now_utc).total_seconds()
    
        return token, time.time() + expiry_seconds
    
  6. これで、アプリケーションを実行する準備が整いました。

    python producer.py -b bootstrap.CLUSTER_ID.REGION.managedkafka.PROJECT_ID.cloud.goog:9092
    

クリーンアップ

このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、次の手順を実施します。

コンソール

  1. VM インスタンスを削除します。

    1. [VM インスタンス] ページに移動します。

      [VM インスタンス] に移動

    2. VM を選択し、[削除] をクリックします。

  2. Kafka クラスタを削除します。

    1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

      [クラスタ] に移動

    2. Kafka クラスタを選択し、[削除] をクリックします。

gcloud

  1. VM を削除するには、gcloud compute instances delete コマンドを使用します。

    gcloud compute instances delete VM_NAME --zone=ZONE
    
  2. Kafka クラスタを削除するには、gcloud managed-kafka clusters delete コマンドを使用します。

    gcloud managed-kafka clusters delete CLUSTER_ID \
      --location=REGION --async
    

次のステップ

Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。