Duo エンティティ コンテキスト ログを収集する
このドキュメントでは、Google Cloud Storage を使用して Duo エンティティ コンテキスト データを Google Security Operations に取り込む方法について説明します。パーサーは、まず未加工の JSON からフィールドを抽出し、それらのフィールドを UDM 属性にマッピングすることで、JSON ログを統合データモデル(UDM)に変換します。ユーザーとアセットの情報、ソフトウェアの詳細、セキュリティ ラベルなど、さまざまなデータ シナリオを処理し、UDM スキーマ内で包括的に表現します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Duo テナントへの特権アクセス(アプリケーションを管理するのに十分な管理者権限を持つ Admin API アプリケーション)
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
Duo Admin API アプリケーションを構成する
- Duo 管理パネルにログインします。
- [Applications] > [Protect an Application] に移動します。
- [Admin API] を検索し、[保護] をクリックします。
- 次の値を記録します。
- インテグレーション キー(ikey)
- 秘密鍵(skey)
- API ホスト名(例:
api-XXXXXXXX.duosecurity.com)
- [権限] で、[リソースの付与 - 読み取り](ユーザー、グループ、電話、エンドポイント、トークン、WebAuthn 認証情報を読み取るため)を有効にします。
[保存] をクリックします。
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( duo-contextなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
バケット名とリージョンを保存して、後で参照できるようにします。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
duo-entity-context-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect Duo entity context data」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: GCS バケットにログを書き込む
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
duo-entity-context-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
duo-entity-context-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
エンティティ コンテキスト データを収集する Cloud Run 関数を作成する
Cloud Run functions の関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Duo Admin API からエンティティ コンテキスト データを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 duo-entity-context-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(
duo-entity-context-trigger)を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウントを選択します(
duo-entity-context-sa)。
- サービス アカウント: サービス アカウントを選択します(
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 GCS_BUCKETduo-contextGCS_PREFIXduo/context/DUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.comLIMIT100RESOURCESusers,groups,phones,endpoints,tokens,webauthncredentials[変数とシークレット] セクションで、[リクエスト] までスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[実行環境] までスクロールします。
- [デフォルト](推奨)を選択します。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import time import hmac import hashlib import base64 import email.utils import urllib.parse from urllib.request import Request, urlopen # Environment variables DUO_IKEY = os.environ["DUO_IKEY"] DUO_SKEY = os.environ["DUO_SKEY"] DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip() GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "duo/context/") # Default resources can be adjusted via ENV RESOURCES = [r.strip() for r in os.environ.get("RESOURCES", "users,groups,phones,endpoints,tokens,webauthncredentials,desktop_authenticators").split(",") if r.strip()] # Duo paging: default 100; max varies by endpoint LIMIT = int(os.environ.get("LIMIT", "100")) # Initialize Storage client storage_client = storage.Client() def _canon_params(params: dict) -> str: """RFC3986 encoding with '~' unescaped, keys sorted lexicographically.""" if not params: return "" parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue ks = urllib.parse.quote(str(k), safe="~") vs = urllib.parse.quote(str(v), safe="~") parts.append(f"{ks}={vs}") return "&".join(parts) def _sign(method: str, host: str, path: str, params: dict) -> dict: """Construct Duo Admin API Authorization + Date headers (HMAC-SHA1).""" now = email.utils.formatdate() canon = "\n".join([ now, method.upper(), host.lower(), path, _canon_params(params) ]) sig = hmac.new( DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1 ).hexdigest() auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode("utf-8")).decode("utf-8") return { "Date": now, "Authorization": f"Basic {auth}" } def _call(method: str, path: str, params: dict) -> dict: host = DUO_API_HOSTNAME assert host.startswith("api-") and host.endswith(".duosecurity.com"), \ "DUO_API_HOSTNAME must be e.g. api-XXXXXXXX.duosecurity.com" qs = _canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if method.upper() == "GET" and qs else "") req = Request(url, method=method.upper()) for k, v in _sign(method, host, path, params).items(): req.add_header(k, v) with urlopen(req, timeout=60) as r: return json.loads(r.read().decode("utf-8")) def _write_json(obj: dict, when: float, resource: str, page: int) -> str: bucket = storage_client.bucket(GCS_BUCKET) prefix = GCS_PREFIX.strip("/") + "/" if GCS_PREFIX else "" key = f"{prefix}{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-{resource}-{page:05d}.json" blob = bucket.blob(key) blob.upload_from_string( json.dumps(obj, separators=(",", ":")), content_type="application/json" ) return key def _fetch_resource(resource: str) -> dict: """Fetch all pages for a list endpoint using limit/offset + metadata.next_offset.""" path = f"/admin/v1/{resource}" offset = 0 page = 0 now = time.time() total_items = 0 while True: params = {"limit": LIMIT, "offset": offset} data = _call("GET", path, params) _write_json(data, now, resource, page) page += 1 resp = data.get("response") # most endpoints return a list; if not a list, count as 1 object page if isinstance(resp, list): total_items += len(resp) elif resp is not None: total_items += 1 meta = data.get("metadata") or {} next_offset = meta.get("next_offset") if next_offset is None: break # Duo returns next_offset as int try: offset = int(next_offset) except Exception: break return { "resource": resource, "pages": page, "objects": total_items } @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Duo entity context data and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ results = [] for res in RESOURCES: print(f"Fetching resource: {res}") result = _fetch_resource(res) results.append(result) print(f"Completed {res}: {result['pages']} pages, {result['objects']} objects") print(f"All resources fetched successfully: {results}")- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.*[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 duo-entity-context-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック Pub/Sub トピック( duo-entity-context-trigger)を選択するメッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
データの鮮度要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 1 時間ごと 0 * * * *標準(推奨) 2 時間ごと 0 */2 * * *ある程度新しい 6 時間ごと 0 */6 * * *更新頻度が低い 毎日 0 0 * * *最小限の更新
スケジューラ ジョブをテストする
- Cloud Scheduler コンソールで、ジョブ(
duo-entity-context-hourly)を見つけます。 - [強制実行] をクリックして手動でトリガーします。
- 数秒待ってから、[Cloud Run> サービス> duo-entity-context-collector > ログ] に移動します。
- 関数が正常に実行されたことを確認します。
- GCS バケットをチェックして、エンティティ コンテキスト データが書き込まれたことを確認します。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Duo Entity Context)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] で [Duo エンティティ コンテキスト データ] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
Duo エンティティ コンテキスト データを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Duo Entity Context)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] で [Duo エンティティ コンテキスト データ] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://duo-context/duo/context/次のように置き換えます。
duo-context: GCS バケット名。duo/context/: ログが保存される接頭辞/フォルダパス(GCS_PREFIX環境変数と一致する必要があります)。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | 論理 |
|---|---|---|
| 有効 | entity.asset.deployment_status | 「activated」が false の場合は「DECOMISSIONED」に設定し、それ以外の場合は「ACTIVE」に設定します。 |
| browsers.browser_family | entity.asset.software.name | 未加工ログの「browsers」配列から抽出されます。 |
| browsers.browser_version | entity.asset.software.version | 未加工ログの「browsers」配列から抽出されます。 |
| device_name | entity.asset.hostname | 未加工ログから直接マッピングされます。 |
| disk_encryption_status | entity.asset.attribute.labels.key: 「disk_encryption_status」、entity.asset.attribute.labels.value | 未加工ログから直接マッピングされ、小文字に変換されます。 |
| メール | entity.user.email_addresses | 「@」が含まれている場合は未加工ログから直接マッピングされます。それ以外の場合は、「@」が含まれている場合は「username」または「username1」が使用されます。 |
| 暗号化 | entity.asset.attribute.labels.key: 「暗号化済み」、entity.asset.attribute.labels.value | 未加工ログから直接マッピングされ、小文字に変換されます。 |
| epkey | entity.asset.product_object_id | 存在する場合は「product_object_id」として使用されます。存在しない場合は「phone_id」または「token_id」が使用されます。 |
| fingerprint | entity.asset.attribute.labels.key: 「指紋」、entity.asset.attribute.labels.value | 未加工ログから直接マッピングされ、小文字に変換されます。 |
| firewall_status | entity.asset.attribute.labels.key: 「firewall_status」、entity.asset.attribute.labels.value | 未加工ログから直接マッピングされ、小文字に変換されます。 |
| hardware_uuid | entity.asset.asset_id | 存在する場合は「asset_id」として使用され、存在しない場合は「user_id」が使用されます。 |
| last_seen | entity.asset.last_discover_time | ISO8601 タイムスタンプとして解析され、マッピングされます。 |
| モデル | entity.asset.hardware.model | 未加工ログから直接マッピングされます。 |
| 数値 | entity.user.phone_numbers | 未加工ログから直接マッピングされます。 |
| os_family | entity.asset.platform_software.platform | 値に基づいて、「WINDOWS」、「LINUX」、「MAC」にマッピングされます(大文字と小文字を区別しない)。 |
| os_version | entity.asset.platform_software.platform_version | 未加工ログから直接マッピングされます。 |
| password_status | entity.asset.attribute.labels.key: 「password_status」、entity.asset.attribute.labels.value | 未加工ログから直接マッピングされ、小文字に変換されます。 |
| phone_id | entity.asset.product_object_id | 「epkey」がない場合は「product_object_id」として使用し、それ以外の場合は「token_id」を使用します。 |
| security_agents.security_agent | entity.asset.software.name | 未加工ログの「security_agents」配列から抽出されます。 |
| security_agents.version | entity.asset.software.version | 未加工ログの「security_agents」配列から抽出されます。 |
| timestamp | entity.metadata.collected_timestamp | metadata オブジェクト内の collected_timestamp フィールドに入力します。 |
| token_id | entity.asset.product_object_id | 「epkey」と「phone_id」が存在しない場合、「product_object_id」として使用されます。 |
| trusted_endpoint | entity.asset.attribute.labels.key: 「trusted_endpoint」、entity.asset.attribute.labels.value | 未加工ログから直接マッピングされ、小文字に変換されます。 |
| type | entity.asset.type | 未加工ログの「type」に「mobile」が含まれている場合(大文字と小文字を区別しない)は「MOBILE」に設定し、それ以外の場合は「LAPTOP」に設定します。 |
| user_id | entity.asset.asset_id | 'hardware_uuid' が存在しない場合、'asset_id' として使用されます。 |
| users.email | entity.user.email_addresses | 「users」配列の最初のユーザーで「@」が含まれている場合は、「email_addresses」として使用されます。 |
| users.username | entity.user.userid | 「@」の前のユーザー名を抽出し、'users' 配列の最初のユーザーである場合は 'userid' として使用します。 |
| entity.metadata.vendor_name | 「Duo」 | |
| entity.metadata.product_name | 「Duo エンティティ コンテキスト データ」 | |
| entity.metadata.entity_type | アセット | |
| entity.relations.entity_type | ユーザー | |
| entity.relations.relationship | 所有 |
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。