Duo 管理者ログを収集する
このドキュメントでは、Google Cloud Storage を使用して Duo 管理者ログを Google Security Operations に取り込む方法について説明します。パーサーはログ(JSON 形式)からフィールドを抽出し、Unified Data Model(UDM)にマッピングします。さまざまな Duo アクション タイプ(ログイン、ユーザー管理、グループ管理)を個別に処理し、アクションと利用可能なデータ(ユーザーの詳細、認証要素、セキュリティ結果など)に基づいて関連する UDM フィールドに入力します。また、IP アドレスの統合、タイムスタンプの変換、エラーの処理などのデータ変換も行います。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run 関数、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- Duo テナントへの特権アクセス(Admin API アプリケーション)
Duo Admin API アプリケーションを構成する
- Duo 管理パネルにログインします。
- [Applications] > [Application Catalog] に移動します。
- Admin API アプリケーションを追加します。
- 次の値を記録します。
- インテグレーション キー(ikey)
- 秘密鍵(skey)
- API ホスト名(例:
api-XXXXXXXX.duosecurity.com)
- [権限] で、[読み取りログの付与](管理者ログを読み取るため)を有効にします。
- アプリケーションを保存します。
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( duo-admin-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込む権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
duo-admin-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect Duo administrator logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次の操作を行います。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレスを入力します。
- ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
duo-admin-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Duo Admin API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 duo-admin-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック(
duo-admin-trigger)を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウントを選択します(
duo-admin-collector-sa)。
- サービス アカウント: サービス アカウントを選択します(
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 GCS_BUCKETduo-admin-logsGCS_PREFIXduo/adminSTATE_KEYduo/admin/state.jsonDUO_IKEYDIXYZ...DUO_SKEY****************DUO_API_HOSTNAMEapi-XXXXXXXX.duosecurity.com[変数とシークレット] タブで [リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[実行環境] までスクロールします。
- [デフォルト](推奨)を選択します。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone import hmac import hashlib import base64 import email.utils import urllib.parse import time # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Duo Admin logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'duo/admin') state_key = os.environ.get('STATE_KEY', 'duo/admin/state.json') # Duo API credentials duo_ikey = os.environ.get('DUO_IKEY') duo_skey = os.environ.get('DUO_SKEY') duo_api_hostname = os.environ.get('DUO_API_HOSTNAME', '').strip() if not all([bucket_name, duo_ikey, duo_skey, duo_api_hostname]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state (last processed timestamp) state = load_state(bucket, state_key) now = int(time.time()) mintime = state.get('mintime', now - 3600) print(f'Processing logs since {mintime}') # Fetch logs from Duo Admin API page = 0 total = 0 next_mintime = mintime max_seen_ts = mintime while True: page_num = 0 data = duo_api_request( duo_ikey, duo_skey, duo_api_hostname, 'GET', '/admin/v1/logs/administrator', {'mintime': mintime} ) # Write page to GCS write_page(bucket, prefix, data, now, page) page += 1 # Extract items resp = data.get('response') items = resp if isinstance(resp, list) else (resp.get('items') if isinstance(resp, dict) else []) items = items or [] if not items: break total += len(items) # Track the newest timestamp in this batch for it in items: ts = epoch_from_item(it) if ts and ts > max_seen_ts: max_seen_ts = ts # Duo returns only the 1000 earliest events; page by advancing mintime if len(items) >= 1000 and max_seen_ts >= mintime: mintime = max_seen_ts next_mintime = max_seen_ts continue else: break # Save checkpoint: newest seen ts, or "now" if nothing new if max_seen_ts > next_mintime: save_state(bucket, state_key, {'mintime': max_seen_ts}) next_state = max_seen_ts else: save_state(bucket, state_key, {'mintime': now}) next_state = now print(f'Successfully processed {total} events across {page} pages, next_mintime: {next_state}') except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}') def write_page(bucket, prefix, payload, when, page): """Write a page of logs to GCS.""" try: timestamp_str = time.strftime('%Y/%m/%d', time.gmtime(when)) key = f"{prefix}/{timestamp_str}/duo-admin-{page:05d}.json" blob = bucket.blob(key) blob.upload_from_string( json.dumps(payload, separators=(',', ':')), content_type='application/json' ) print(f'Wrote page {page} to {key}') except Exception as e: print(f'Error writing page {page}: {str(e)}') raise def canon_params(params): """Canonicalize parameters for Duo API signature.""" parts = [] for k in sorted(params.keys()): v = params[k] if v is None: continue parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}") return "&".join(parts) def sign_request(method, host, path, params, ikey, skey): """Sign Duo API request.""" now = email.utils.formatdate() canon = "\n".join([ now, method.upper(), host.lower(), path, canon_params(params) ]) sig = hmac.new(skey.encode('utf-8'), canon.encode('utf-8'), hashlib.sha1).hexdigest() auth = base64.b64encode(f"{ikey}:{sig}".encode()).decode() return { 'Date': now, 'Authorization': f'Basic {auth}' } def duo_api_request(ikey, skey, host, method, path, params, timeout=60, max_retries=5): """Make a signed request to Duo Admin API with retry logic.""" assert host.startswith('api-') and host.endswith('.duosecurity.com'), \ "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com" qs = canon_params(params) url = f"https://{host}{path}" + (f"?{qs}" if qs else "") attempt = 0 backoff = 1.0 while True: headers = sign_request(method, host, path, params, ikey, skey) headers['Accept'] = 'application/json' try: response = http.request(method.upper(), url, headers=headers, timeout=timeout) return json.loads(response.data.decode('utf-8')) except urllib3.exceptions.HTTPError as e: # Retry on 429 or 5xx if hasattr(e, 'status') and (e.status == 429 or 500 <= e.status <= 599) and attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise except Exception as e: if attempt < max_retries: time.sleep(backoff) attempt += 1 backoff *= 2 continue raise def epoch_from_item(item): """Extract epoch timestamp from log item.""" # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts' ts_num = item.get('timestamp') if isinstance(ts_num, (int, float)): return int(ts_num) ts_iso = item.get('ts') if isinstance(ts_iso, str): try: # Accept "...Z" or with offset return int(datetime.fromisoformat(ts_iso.replace('Z', '+00:00')).timestamp()) except Exception: return None return None- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 duo-admin-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック トピックを選択する( duo-admin-trigger)メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *検索量が普通 1 時間ごと 0 * * * *標準(推奨) 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
スケジューラ ジョブをテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして手動でトリガーします。
- 数秒待ってから、[Cloud Run] > [サービス] > [duo-admin-collector] > [ログ] に移動します。
- 関数が正常に実行されたことを確認します。
- GCS バケットをチェックして、ログが書き込まれたことを確認します。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Duo Administrator Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Duo 管理者ログ] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
Duo 管理者ログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Duo Administrator Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Duo 管理者ログ] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://duo-admin-logs/duo/admin/次のように置き換えます。
duo-admin-logs: GCS バケット名。duo/admin: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
例:
- ルートバケット:
gs://company-logs/ - 接頭辞あり:
gs://company-logs/duo-logs/ - サブフォルダあり:
gs://company-logs/duo/admin/
- ルートバケット:
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | 論理 |
|---|---|---|
| アクション | metadata.product_event_type | 未加工ログのアクション フィールドの値。 |
| 降順 | metadata.description | 未加工ログの説明オブジェクトの desc フィールドの値。 |
| description._status | target.group.attribute.labels.value | 未加工ログの description オブジェクト内の _status フィールドの値(特にグループ関連のアクションを処理する場合)。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。 |
| description.desc | metadata.description | 未加工ログの説明オブジェクトの desc フィールドの値。 |
| description.email | target.user.email_addresses | 未加工ログの説明オブジェクトのメール フィールドの値。 |
| description.error | security_result.summary | 未加工ログの説明オブジェクトのエラー フィールドの値。 |
| description.factor | extensions.auth.auth_details | 未加工ログの説明オブジェクトの factor フィールドの値。 |
| description.groups.0._status | target.group.attribute.labels.value | 未加工ログの説明オブジェクト内の groups 配列の最初の要素の _status フィールドの値。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。 |
| description.groups.0.name | target.group.group_display_name | 未加工ログの説明オブジェクト内の groups 配列の最初の要素の name フィールドの値。 |
| description.ip_address | principal.ip | 未加工ログの説明オブジェクトの ip_address フィールドの値。 |
| description.name | target.group.group_display_name | 未加工ログの説明オブジェクトの name フィールドの値。 |
| description.realname | target.user.user_display_name | 未加工ログの説明オブジェクトの realname フィールドの値。 |
| description.status | target.user.attribute.labels.value | 未加工ログの説明オブジェクトのステータス フィールドの値。この値は、「status」という対応する「キー」を持つ「labels」配列内に配置されます。 |
| description.uname | target.user.email_addresses または target.user.userid | 未加工ログの説明オブジェクトの uname フィールドの値。メールアドレス形式と一致する場合は email_addresses にマッピングされ、それ以外の場合は userid にマッピングされます。 |
| ホスト | principal.hostname | 未加工ログの host フィールドの値。 |
| isotimestamp | metadata.event_timestamp.seconds | 未加工ログの isotimestamp フィールドの値。エポック秒に変換されます。 |
| オブジェクト | target.group.group_display_name | 未加工ログのオブジェクト フィールドの値。 |
| timestamp | metadata.event_timestamp.seconds | 未加工ログのタイムスタンプ フィールドの値。 |
| ユーザー名 | target.user.userid または principal.user.userid | action フィールドに「login」が含まれている場合、値は target.user.userid にマッピングされます。それ以外の場合は、principal.user.userid にマッピングされます。 |
| - | extensions.auth.mechanism | アクション フィールドに「login」が含まれている場合は、「USERNAME_PASSWORD」に設定されます。 |
| - | metadata.event_type | アクション フィールドに基づいてパーサーによって決定されます。有効な値: USER_LOGIN、GROUP_CREATION、USER_UNCATEGORIZED、GROUP_DELETION、USER_CREATION、GROUP_MODIFICATION、GENERIC_EVENT。 |
| - | metadata.product_name | 常に「DUO_ADMIN」に設定します。 |
| - | metadata.product_version | 常に「MULTI-FACTOR_AUTHENTICATION」に設定されます。 |
| - | metadata.vendor_name | 常に「DUO_SECURITY」に設定されます。 |
| - | principal.user.user_role | eventtype フィールドに「admin」が含まれている場合は、「ADMINISTRATOR」に設定します。 |
| - | security_result.action | アクション フィールドに基づいてパーサーによって決定されます。アクション フィールドに「error」が含まれている場合は「BLOCK」に設定し、それ以外の場合は「ALLOW」に設定します。 |
| - | target.group.attribute.labels.key | target.group.attribute.labels を入力する場合は、常に「status」に設定します。 |
| - | target.user.attribute.labels.key | target.user.attribute.labels を入力するときは、常に「status」に設定します。 |
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。