Duo Telephony のログを収集する

以下でサポートされています。

このドキュメントでは、Google Cloud Storage を使用して Duo Telephony ログを Google Security Operations に取り込む方法について説明します。パーサーはログからフィールドを抽出し、変換して Unified Data Model(UDM)にマッピングします。さまざまな Duo ログ形式を処理し、タイムスタンプを変換して、ユーザー情報、ネットワークの詳細、セキュリティ結果を抽出し、最終的に出力を標準化された UDM 形式に構造化します。

始める前に

次の前提条件を満たしていることを確認します。

  • Google SecOps インスタンス
  • Cloud Storage API が有効になっている GCP プロジェクト
  • GCS バケットを作成および管理する権限
  • GCS バケットの IAM ポリシーを管理する権限
  • Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
  • オーナーロールによる Duo 管理パネルへの特権アクセス

Duo の前提条件(API 認証情報)を収集する

  1. オーナーロールを持つ管理者として Duo 管理パネルにログインします。
  2. [Applications] > [Application Catalog] に移動します。
  3. カタログで Admin API のエントリを見つけます。
  4. [+ 追加] をクリックして、アプリケーションを作成します。
  5. 次の詳細をコピーして安全な場所に保存します。
    • 統合キー
    • 秘密鍵
    • API ホスト名(例: api-yyyyyyyy.duosecurity.com
  6. [権限] セクションで、[電話の読み取り] 権限のチェックボックスのみをオンにし、他のすべての権限の選択を解除します。
  7. [変更を保存] をクリックします。

権限を確認する

Admin API アプリケーションに必要な権限があることを確認するには:

  1. Duo 管理パネルにログインします。
  2. [Applications] > [Protect an Application] に移動します。
  3. Admin API アプリケーションを見つけます。
  4. アプリケーション名をクリックして詳細を表示します。
  5. [権限] セクションで、[電話の読み取り] のみがチェックされていることを確認します。
  6. 他の権限がオンになっているか、電話の読み取りがオンになっていない場合は、権限を更新して [変更を保存] をクリックします。

テスト API アクセス

  • 統合に進む前に、認証情報をテストします。

    # Replace with your actual credentials
    DUO_IKEY="your-integration-key"
    DUO_SKEY="your-secret-key"
    DUO_HOST="api-yyyyyyyy.duosecurity.com"
    
    # Test API access (requires signing - use Duo's API test tool or Python script)
    # Visit https://duo.com/docs/adminapi#testing to test your credentials
    

Google Cloud Storage バケットを作成する

  1. Google Cloud Console に移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前(duo-telephony-logs など)を入力します。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション ロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

Cloud Run functions のサービス アカウントを作成する

Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。

サービス アカウントの作成

  1. GCP Console で、[IAM と管理>サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • サービス アカウント名: 「duo-telephony-collector-sa」と入力します。
    • サービス アカウントの説明: 「Service account for Cloud Run function to collect Duo Telephony logs」と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
    1. [ロールを選択] をクリックします。
    2. [ストレージ オブジェクト管理者] を検索して選択します。
    3. [+ 別のロールを追加] をクリックします。
    4. [Cloud Run 起動元] を検索して選択します。
    5. [+ 別のロールを追加] をクリックします。
    6. [Cloud Functions 起動元] を検索して選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。

これらのロールは、次の目的で必要です。

  • Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
  • Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
  • Cloud Functions 起動元: 関数の呼び出しを許可する

GCS バケットに対する IAM 権限を付与する

GCS バケットに対する書き込み権限をサービス アカウントに付与します。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名(duo-telephony-logs など)をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: サービス アカウントのメールアドレス(例: duo-telephony-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。
    • ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
  6. [保存] をクリックします。

Pub/Sub トピックの作成

Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。

  1. GCP Console で、[Pub/Sub> トピック] に移動します。
  2. [トピックを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • トピック ID: 「duo-telephony-trigger」と入力します。
    • その他の設定はデフォルトのままにします。
  4. [作成] をクリックします。

ログを収集する Cloud Run 関数を作成する

Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Duo Telephony API からログを取得して GCS に書き込みます。

  1. GCP Console で、[Cloud Run] に移動します。
  2. [サービスを作成] をクリックします。
  3. [関数] を選択します(インライン エディタを使用して関数を作成します)。
  4. [構成] セクションで、次の構成の詳細を指定します。

    設定
    サービス名 duo-telephony-logs-collector
    リージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。
    ランタイム [Python 3.12] 以降を選択します。
  5. [トリガー(省略可)] セクションで、次の操作を行います。

    1. [+ トリガーを追加] をクリックします。
    2. [Cloud Pub/Sub] を選択します。
    3. [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(duo-telephony-trigger)を選択します。
    4. [保存] をクリックします。
  6. [認証] セクションで、次の操作を行います。

    1. [認証が必要] を選択します。
    2. Identity and Access Management(IAM)を確認します。
  7. 下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。

  8. [セキュリティ] タブに移動します。

    • サービス アカウント: サービス アカウントを選択します(duo-telephony-collector-sa)。
  9. [コンテナ] タブに移動します。

    1. [変数とシークレット] をクリックします。
    2. 環境変数ごとに [+ 変数を追加] をクリックします。
    変数名 値の例
    GCS_BUCKET duo-telephony-logs
    GCS_PREFIX duo-telephony
    STATE_KEY duo-telephony/state.json
    DUO_IKEY <your-integration-key>
    DUO_SKEY <your-secret-key>
    DUO_API_HOST api-yyyyyyyy.duosecurity.com
    MAX_ITERATIONS 10
  10. [変数とシークレット] タブで [リクエスト] まで下にスクロールします。

    • リクエストのタイムアウト: 600 秒(10 分)を入力します。
  11. [コンテナ] の [設定] タブに移動します。

    • [リソース] セクションで次の操作を行います。
      • メモリ: 512 MiB 以上を選択します。
      • CPU: [1] を選択します。
    • [完了] をクリックします。
  12. [実行環境] までスクロールします。

    • [デフォルト](推奨)を選択します。
  13. [リビジョン スケーリング] セクションで、次の操作を行います。

    • [インスタンスの最小数] に「0」と入力します。
    • インスタンスの最大数: 100 と入力します(または、予想される負荷に基づいて調整します)。
  14. [作成] をクリックします。

  15. サービスが作成されるまで待ちます(1 ~ 2 分)。

  16. サービスを作成すると、インライン コードエディタが自動的に開きます。

関数コードを追加する

  1. [エントリ ポイント] フィールドに「main」と入力します。
  2. インライン コードエディタで、次の 2 つのファイルを作成します。

    • 最初のファイル: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import hmac
    import hashlib
    import base64
    import urllib.parse
    import urllib3
    import email.utils
    from datetime import datetime, timedelta, timezone
    from typing import Dict, Any, List, Optional
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Duo telephony logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        # Get environment variables
        bucket_name = os.environ.get('GCS_BUCKET')
        prefix = os.environ.get('GCS_PREFIX', 'duo-telephony').rstrip('/')
        state_key = os.environ.get('STATE_KEY', 'duo-telephony/state.json')
        integration_key = os.environ.get('DUO_IKEY')
        secret_key = os.environ.get('DUO_SKEY')
        api_hostname = os.environ.get('DUO_API_HOST')
    
        if not all([bucket_name, integration_key, secret_key, api_hostname]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(bucket_name)
    
            # Load state
            state = load_state(bucket, state_key)
    
            # Calculate time range
            now = datetime.now(timezone.utc)
    
            if state.get('last_offset'):
                # Continue from last offset
                next_offset = state['last_offset']
                logs = []
                has_more = True
            else:
                # Start from last timestamp or 24 hours ago
                mintime = state.get('last_timestamp_ms', int((now - timedelta(hours=24)).timestamp() * 1000))
                # Apply 2-minute delay as recommended by Duo
                maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000)
                next_offset = None
                logs = []
                has_more = True
    
            # Fetch logs with pagination
            total_fetched = 0
            max_iterations = int(os.environ.get('MAX_ITERATIONS', '10'))
    
            while has_more and total_fetched < max_iterations:
                page_num = total_fetched + 1
    
                if next_offset:
                    # Use offset for pagination
                    params = {
                        'limit': '100',
                        'next_offset': next_offset
                    }
                else:
                    # Initial request with time range
                    params = {
                        'mintime': str(mintime),
                        'maxtime': str(maxtime),
                        'limit': '100',
                        'sort': 'ts:asc'
                    }
    
                # Make API request with retry logic
                response = duo_api_call_with_retry(
                    'GET',
                    api_hostname,
                    '/admin/v2/logs/telephony',
                    params,
                    integration_key,
                    secret_key
                )
    
                if 'items' in response:
                    logs.extend(response['items'])
                    total_fetched += 1
    
                # Check for more data
                if 'metadata' in response and 'next_offset' in response['metadata']:
                    next_offset = response['metadata']['next_offset']
                    state['last_offset'] = next_offset
                else:
                    has_more = False
                    state['last_offset'] = None
    
                    # Update timestamp for next run
                    if logs:
                        # Get the latest timestamp from logs (ISO 8601 format)
                        latest_ts = max([log.get('ts', '') for log in logs])
                        if latest_ts:
                            # Convert ISO timestamp to milliseconds
                            dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00'))
                            state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1
    
            # Save logs to GCS if any were fetched
            if logs:
                timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
                blob_name = f"{prefix}/telephony_{timestamp}.json"
    
                # Format logs as newline-delimited JSON
                log_data = '\n'.join(json.dumps(log) for log in logs)
    
                blob = bucket.blob(blob_name)
                blob.upload_from_string(
                    log_data,
                    content_type='application/x-ndjson'
                )
    
                print(f"Saved {len(logs)} telephony logs to gs://{bucket_name}/{blob_name}")
            else:
                print("No new telephony logs found")
    
            # Save state
            save_state(bucket, state_key, state)
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def duo_api_call_with_retry(
        method: str,
        host: str,
        path: str,
        params: Dict[str, str],
        ikey: str,
        skey: str,
        max_retries: int = 3
    ) -> Dict[str, Any]:
        """Make an authenticated API call to Duo Admin API with retry logic."""
        for attempt in range(max_retries):
            try:
                return duo_api_call(method, host, path, params, ikey, skey)
            except Exception as e:
                if '429' in str(e) or '5' in str(e)[:1]:
                    if attempt < max_retries - 1:
                        wait_time = (2 ** attempt) * 2
                        print(f"Retrying after {wait_time} seconds...")
                        import time
                        time.sleep(wait_time)
                        continue
                raise
    
    def duo_api_call(
        method: str,
        host: str,
        path: str,
        params: Dict[str, str],
        ikey: str,
        skey: str
    ) -> Dict[str, Any]:
        """Make an authenticated API call to Duo Admin API."""
        # Create canonical string for signing using RFC 2822 date format
        now = email.utils.formatdate()
        canon = [now, method.upper(), host.lower(), path]
    
        # Add parameters
        args = []
        for key in sorted(params.keys()):
            val = params[key]
            args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}")
        canon.append('&'.join(args))
    
        canon_str = '\n'.join(canon)
    
        # Sign the request
        sig = hmac.new(
            skey.encode('utf-8'),
            canon_str.encode('utf-8'),
            hashlib.sha1
        ).hexdigest()
    
        # Create authorization header
        auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8')
    
        # Build URL
        url = f"https://{host}{path}"
        if params:
            url += '?' + '&'.join(args)
    
        # Make request
        headers = {
            'Authorization': f'Basic {auth}',
            'Date': now,
            'Host': host,
            'User-Agent': 'duo-telephony-gcs-ingestor/1.0'
        }
    
        try:
            response = http.request('GET', url, headers=headers)
            data = json.loads(response.data.decode('utf-8'))
    
            if data.get('stat') == 'OK':
                return data.get('response', {})
            else:
                raise Exception(f"API error: {data.get('message', 'Unknown error')}")
        except urllib3.exceptions.HTTPError as e:
            raise Exception(f"HTTP error: {str(e)}")
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f'Warning: Could not load state: {str(e)}')
        return {}
    
    def save_state(bucket, key, state):
        """Save state to GCS."""
        try:
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state),
                content_type='application/json'
            )
        except Exception as e:
            print(f'Warning: Could not save state: {str(e)}')
    
    • 2 つ目のファイル: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. [デプロイ] をクリックして、関数を保存してデプロイします。

  4. デプロイが完了するまで待ちます(2 ~ 3 分)。

Cloud Scheduler ジョブの作成

Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。

  1. GCP Console で、[Cloud Scheduler] に移動します。
  2. [ジョブを作成] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 duo-telephony-logs-1h
    リージョン Cloud Run functions と同じリージョンを選択する
    周波数 0 * * * *(1 時間ごとに正時)
    タイムゾーン タイムゾーンを選択します(UTC を推奨)。
    ターゲット タイプ Pub/Sub
    トピック Pub/Sub トピック(duo-telephony-trigger)を選択する
    メッセージ本文 {}(空の JSON オブジェクト)
  4. [作成] をクリックします。

スケジュールの頻度のオプション

  • ログの量とレイテンシの要件に基づいて頻度を選択します。

    頻度 CRON 式 ユースケース
    5 分毎 */5 * * * * 大容量、低レイテンシ
    15 分ごと */15 * * * * 検索量が普通
    1 時間ごと 0 * * * * 標準(推奨)
    6 時間ごと 0 */6 * * * 少量、バッチ処理
    毎日 0 0 * * * 履歴データの収集

スケジューラ ジョブをテストする

  1. Cloud Scheduler コンソールで、ジョブ(duo-telephony-logs-1h)を見つけます。
  2. [強制実行] をクリックして手動でトリガーします。
  3. 数秒待ってから、[Cloud Run> サービス] に移動します。
  4. 関数名(duo-telephony-logs-collector)をクリックします。
  5. [Logs] タブをクリックします。
  6. 関数が正常に実行されたことを確認します。
  7. [Cloud Storage] > [バケット] に移動します。
  8. バケット名(duo-telephony-logs)をクリックします。
  9. プレフィックス フォルダ(duo-telephony/)に移動します。
  10. ログを含む新しい .json ファイルが作成されたことを確認します。

ログにエラーが表示された場合:

  • HTTP 401: 環境変数で API 認証情報(DUO_IKEY、DUO_SKEY、DUO_API_HOST)を確認します。
  • HTTP 403: 管理 API アプリケーションで [電話の読み取り] 権限が有効になっていることを確認します。
  • HTTP 429: レート制限 - 関数は指数バックオフで自動的に再試行されます
  • 環境変数がありません: 必要な変数がすべて Cloud Run 関数の構成で設定されていることを確認します

Google SecOps サービス アカウントを取得する

Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。

サービス アカウントのメールアドレスを取得する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Duo Telephony logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [Duo Telephony Logs] を選択します。
  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーして、次のステップで使用します。

Google SecOps サービス アカウントに IAM 権限を付与する

Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名(duo-telephony-logs など)をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
    • ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

Duo Telephony のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Duo Telephony logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [Duo Telephony Logs] を選択します。
  7. [次へ] をクリックします。
  8. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://duo-telephony-logs/duo-telephony/
      
      • 次のように置き換えます。

        • duo-telephony-logs: GCS バケット名。
        • duo-telephony: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
      • 例:

        • ルートバケット: gs://duo-telephony-logs/
        • 接頭辞あり: gs://duo-telephony-logs/duo-telephony/
    • Source deletion option: 必要に応じて削除オプションを選択します。

      • なし: 転送後にファイルを削除しません(テストにおすすめ)。
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。

    • アセットの名前空間: アセットの名前空間

    • Ingestion labels: このフィードのイベントに適用されるラベル。

  9. [次へ] をクリックします。

  10. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング 論理
コンテキスト metadata.product_event_type 未加工ログのコンテキスト フィールドから直接マッピングされます。
クレジット security_result.detection_fields.value 未加工ログの credits フィールドから直接マッピングされ、対応するキー credits を持つ detection_fields オブジェクトの下にネストされます。
eventtype security_result.detection_fields.value 未加工ログの eventtype フィールドから直接マッピングされ、対応するキー eventtype を持つ detection_fields オブジェクトの下にネストされます。
ホスト principal.hostname IP アドレスでない場合、未加工ログのホスト フィールドから直接マッピングされます。
security_result.action パーサーで静的な値「ALLOW」に設定します。
security_result.detection_fields.value パーサーで静的な値「MECHANISM_UNSPECIFIED」に設定します。
metadata.event_timestamp ISO 8601 タイムスタンプ文字列である未加工ログの ts フィールドから解析されます。
metadata.event_type 未加工ログにコンテキスト フィールドとホスト フィールドの両方が存在する場合は、「USER_UNCATEGORIZED」に設定されます。ホストのみが存在する場合は「STATUS_UPDATE」に設定されます。それ以外の場合は、「GENERIC_EVENT」に設定されます。
metadata.log_type 未加工ログの log_type フィールドから直接取得されます。
metadata.product_name パーサーで静的な値「Telephony」に設定されます。
metadata.vendor_name パーサーで静的な値「Duo」に設定されます。
電話 principal.user.phone_numbers 未加工ログの電話番号フィールドから直接マッピングされます。
電話 principal.user.userid 未加工ログの電話番号フィールドから直接マッピングされます。
security_result.severity パーサーで静的値「INFORMATIONAL」に設定します。
security_result.summary パーサーで静的な値「Duo Telephony」に設定されます。
ts metadata.event_timestamp ISO 8601 タイムスタンプ文字列である未加工ログの ts フィールドから解析されます。
type security_result.summary 未加工ログの type フィールドから直接マッピングされます。

ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。