BeyondTrust Endpoint Privilege Management(EPM)のログを収集する

以下でサポートされています。

このドキュメントでは、Google Cloud Storage を使用して BeyondTrust Endpoint Privilege Management(EPM)ログを Google Security Operations に取り込む方法について説明します。このパーサーは、BeyondTrust Endpoint からの未加工の JSON ログデータを Chronicle UDM に準拠した構造化形式に変換することに重点を置いています。まず、さまざまなフィールドのデフォルト値を初期化し、JSON ペイロードを解析します。その後、未加工ログの特定のフィールドを event.idm.read_only_udm オブジェクト内の対応する UDM フィールドにマッピングします。

始める前に

次の前提条件を満たしていることを確認します。

  • Google SecOps インスタンス
  • Cloud Storage API が有効になっている GCP プロジェクト
  • GCS バケットを作成および管理する権限
  • GCS バケットの IAM ポリシーを管理する権限
  • Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
  • BeyondTrust Endpoint Privilege Management テナントまたは API への特権アクセス

Google Cloud Storage バケットを作成する

  1. Google Cloud Console に移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前(beyondtrust-epm-logs など)を入力します。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション ロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

BeyondTrust EPM API 認証情報を収集する

  1. 管理者として BeyondTrust Privilege Management ウェブ コンソールにログインします。
  2. [構成> 設定> API 設定] に移動します。
  3. [Create an API Account] をクリックします。
  4. 次の構成の詳細を指定します。
    • 名前: 「Google SecOps Collector」と入力します。
    • API アクセス: 必要に応じて、監査(読み取り)などのスコープを有効にします。
  5. クライアント IDクライアント シークレットをコピーして保存します。
  6. API ベース URL をコピーします。通常は https://<your-tenant>-services.pm.beyondtrustcloud.com です(これは BPT_API_URL として使用します)。

Cloud Run functions のサービス アカウントを作成する

Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。

サービス アカウントの作成

  1. GCP Console で、[IAM と管理>サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • サービス アカウント名: 「beyondtrust-epm-collector-sa」と入力します。
    • サービス アカウントの説明: 「Service account for Cloud Run function to collect BeyondTrust EPM logs」と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
    1. [ロールを選択] をクリックします。
    2. [ストレージ オブジェクト管理者] を検索して選択します。
    3. [+ 別のロールを追加] をクリックします。
    4. [Cloud Run 起動元] を検索して選択します。
    5. [+ 別のロールを追加] をクリックします。
    6. [Cloud Functions 起動元] を検索して選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。

これらのロールは、次の目的で必要です。

  • Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
  • Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
  • Cloud Functions 起動元: 関数の呼び出しを許可する

GCS バケットに対する IAM 権限を付与する

GCS バケットに対する書き込み権限をサービス アカウントに付与します。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: サービス アカウントのメールアドレス(例: beyondtrust-epm-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。
    • ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
  6. [保存] をクリックします。

Pub/Sub トピックの作成

Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。

  1. GCP Console で、[Pub/Sub> トピック] に移動します。
  2. [トピックを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • トピック ID: 「beyondtrust-epm-trigger」と入力します。
    • その他の設定はデフォルトのままにします。
  4. [作成] をクリックします。

ログを収集する Cloud Run 関数を作成する

Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、BeyondTrust EPM API からログを取得して GCS に書き込みます。

  1. GCP Console で、[Cloud Run] に移動します。
  2. [サービスを作成] をクリックします。
  3. [関数] を選択します(インライン エディタを使用して関数を作成します)。
  4. [構成] セクションで、次の構成の詳細を指定します。

    設定
    サービス名 beyondtrust-epm-collector
    リージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。
    ランタイム [Python 3.12] 以降を選択します。
  5. [トリガー(省略可)] セクションで、次の操作を行います。

    1. [+ トリガーを追加] をクリックします。
    2. [Cloud Pub/Sub] を選択します。
    3. [Cloud Pub/Sub トピックを選択してください] で、トピック beyondtrust-epm-trigger を選択します。
    4. [保存] をクリックします。
  6. [認証] セクションで、次の操作を行います。

    1. [認証が必要] を選択します。
    2. Identity and Access Management(IAM)を確認します。
  7. 下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。

  8. [セキュリティ] タブに移動します。

    • サービス アカウント: サービス アカウント beyondtrust-epm-collector-sa を選択します。
  9. [コンテナ] タブに移動します。

    1. [変数とシークレット] をクリックします。
    2. 環境変数ごとに [+ 変数を追加] をクリックします。
    変数名 値の例
    GCS_BUCKET beyondtrust-epm-logs
    GCS_PREFIX beyondtrust-epm/
    STATE_KEY beyondtrust-epm/state.json
    BPT_API_URL https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_SCOPE management-api
    RECORD_SIZE 1000
    MAX_ITERATIONS 10
  10. [変数とシークレット] タブで [リクエスト] まで下にスクロールします。

    • リクエストのタイムアウト: 600 秒(10 分)を入力します。
  11. [コンテナ] の [設定] タブに移動します。

    • [リソース] セクションで次の操作を行います。
      • メモリ: 512 MiB 以上を選択します。
      • CPU: [1] を選択します。
    • [完了] をクリックします。
  12. [実行環境] までスクロールします。

    • [デフォルト](推奨)を選択します。
  13. [リビジョン スケーリング] セクションで、次の操作を行います。

    • [インスタンスの最小数] に「0」と入力します。
    • インスタンスの最大数: 100 と入力します(または、予想される負荷に基づいて調整します)。
  14. [作成] をクリックします。

  15. サービスが作成されるまで待ちます(1 ~ 2 分)。

  16. サービスを作成すると、インライン コードエディタが自動的に開きます。

関数コードを追加する

  1. [関数のエントリ ポイント] に「main」と入力します。
  2. インライン コードエディタで、次の 2 つのファイルを作成します。

    • 最初のファイル: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    
    # Initialize HTTP client
    http = urllib3.PoolManager()
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch logs from BeyondTrust EPM API and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        # Get environment variables
        bucket_name = os.environ.get('GCS_BUCKET')
        prefix = os.environ.get('GCS_PREFIX', 'beyondtrust-epm/')
        state_key = os.environ.get('STATE_KEY', 'beyondtrust-epm/state.json')
    
        # BeyondTrust EPM API credentials
        api_url = os.environ.get('BPT_API_URL')
        client_id = os.environ.get('CLIENT_ID')
        client_secret = os.environ.get('CLIENT_SECRET')
        oauth_scope = os.environ.get('OAUTH_SCOPE', 'management-api')
        record_size = int(os.environ.get('RECORD_SIZE', '1000'))
        max_iterations = int(os.environ.get('MAX_ITERATIONS', '10'))
    
        if not all([bucket_name, api_url, client_id, client_secret]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(bucket_name)
    
            # Load state (last processed timestamp)
            state = load_state(bucket, state_key)
            last_timestamp = state.get('last_timestamp', (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ"))
    
            print(f'Processing logs since {last_timestamp}')
    
            # Get OAuth access token
            token = get_oauth_token(api_url, client_id, client_secret, oauth_scope)
    
            # Fetch audit events
            events = fetch_audit_events(api_url, token, last_timestamp, record_size, max_iterations)
    
            if events:
                # Store events in GCS
                current_timestamp = datetime.utcnow()
                filename = f"{prefix}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
                store_events_to_gcs(bucket, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                save_state(bucket, state_key, {'last_timestamp': latest_timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z'})
    
                print(f'Successfully processed {len(events)} events and stored to {filename}')
            else:
                print('No new events found')
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def get_oauth_token(api_url, client_id, client_secret, scope):
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM.
        Uses the correct endpoint: /oauth/token
        """
        token_url = f"{api_url}/oauth/token"
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"
    
        response = http.request('POST', token_url, headers=headers, body=body, timeout=urllib3.Timeout(60.0))
    
        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")
    
        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']
    
    def fetch_audit_events(api_url, access_token, last_timestamp, record_size, max_iterations):
        """
        Fetch audit events using the BeyondTrust EPM API endpoint: /management-api/v2/AuditEvents
        with query parameters for filtering and pagination
        """
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
    
        all_events = []
        current_start_date = last_timestamp
        iterations = 0
    
        # Enforce maximum RecordSize limit of 1000 based on BeyondTrust documentation
        record_size_limited = min(record_size, 1000)
    
        while iterations < max_iterations:
            iterations += 1
    
            if len(all_events) >= 10000:
                print(f"Reached maximum events limit (10000)")
                break
    
            # Use the BeyondTrust EPM API endpoint for audit events
            query_url = f"{api_url}/management-api/v2/AuditEvents"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }
    
            # Construct URL with query parameters
            query_string = '&'.join([f"{k}={v}" for k, v in params.items()])
            full_url = f"{query_url}?{query_string}"
    
            try:
                response = http.request('GET', full_url, headers=headers, timeout=urllib3.Timeout(300.0))
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', '30'))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    continue
    
                if response.status != 200:
                    raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")
    
                response_data = json.loads(response.data.decode('utf-8'))
                events = response_data.get('events', [])
    
                if not events:
                    break
    
                print(f"Page {iterations}: Retrieved {len(events)} events")
                all_events.extend(events)
    
                # If we got fewer events than RecordSize, we've reached the end
                if len(events) < record_size_limited:
                    break
    
                # For pagination, update StartDate to the timestamp of the last event
                last_event = events[-1]
                last_timestamp = extract_event_timestamp(last_event)
    
                if not last_timestamp:
                    print("Warning: Could not find timestamp in last event for pagination")
                    break
    
                # Convert to datetime and add 1 second to avoid retrieving the same event again
                try:
                    dt = parse_timestamp(last_timestamp)
                    dt = dt + timedelta(seconds=1)
                    current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
                except Exception as e:
                    print(f"Error parsing timestamp {last_timestamp}: {e}")
                    break
    
            except Exception as e:
                print(f"Error fetching page {iterations}: {e}")
                break
    
        return all_events
    
    def extract_event_timestamp(event):
        """Extract timestamp from event, checking multiple possible fields"""
        # Check common timestamp fields
        timestamp_fields = ['event.dateTime', 'event.timestamp', 'timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time', 'event.ingested']
    
        # Try nested event.dateTime first (common in BeyondTrust)
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("dateTime")
            if ts:
                return ts
            ts = event["event"].get("timestamp")
            if ts:
                return ts
    
        # Fallback to other timestamp fields
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]
    
        return None
    
    def parse_timestamp(timestamp_str):
        """Parse timestamp string to datetime object, handling various formats"""
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:  # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)
    
        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass
    
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f'Warning: Could not load state: {str(e)}')
        return {}
    
    def save_state(bucket, key, state):
        """Save state to GCS."""
        try:
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state),
                content_type='application/json'
            )
        except Exception as e:
            print(f'Warning: Could not save state: {str(e)}')
    
    def store_events_to_gcs(bucket, key, events):
        """Store events as JSONL (one JSON object per line) in GCS"""
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = '\n'.join(json.dumps(event, default=str) for event in events)
    
        blob = bucket.blob(key)
        blob.upload_from_string(jsonl_content, content_type='application/x-ndjson')
    
    def get_latest_event_timestamp(events):
        """Get the latest timestamp from the events for state tracking"""
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.isoformat() + 'Z'
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
    • 2 つ目のファイル: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. [デプロイ] をクリックして、関数を保存してデプロイします。

  4. デプロイが完了するまで待ちます(2 ~ 3 分)。

Cloud Scheduler ジョブの作成

Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。

  1. GCP Console で、[Cloud Scheduler] に移動します。
  2. [ジョブを作成] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 beyondtrust-epm-collector-hourly
    リージョン Cloud Run functions と同じリージョンを選択する
    周波数 0 * * * *(1 時間ごとに正時)
    タイムゾーン タイムゾーンを選択します(UTC を推奨)。
    ターゲット タイプ Pub/Sub
    トピック トピック beyondtrust-epm-trigger を選択します。
    メッセージ本文 {}(空の JSON オブジェクト)
  4. [作成] をクリックします。

スケジュールの頻度のオプション

  • ログの量とレイテンシの要件に基づいて頻度を選択します。

    頻度 CRON 式 ユースケース
    5 分毎 */5 * * * * 大容量、低レイテンシ
    15 分ごと */15 * * * * 検索量が普通
    1 時間ごと 0 * * * * 標準(推奨)
    6 時間ごと 0 */6 * * * 少量、バッチ処理
    毎日 0 0 * * * 履歴データの収集

スケジューラ ジョブをテストする

  1. Cloud Scheduler コンソールで、ジョブを見つけます。
  2. [強制実行] をクリックして手動でトリガーします。
  3. 数秒待ってから、[Cloud Run > サービス > beyondtrust-epm-collector > ログ] に移動します。
  4. 関数が正常に実行されたことを確認します。
  5. GCS バケットをチェックして、ログが書き込まれたことを確認します。

Google SecOps サービス アカウントを取得する

Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。

サービス アカウントのメールアドレスを取得する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: BeyondTrust EPM logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [BeyondTrust Endpoint Privilege Management] を選択します。
  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーして、次のステップで使用します。

Google SecOps サービス アカウントに IAM 権限を付与する

Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
    • ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

BeyondTrust EPM のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: BeyondTrust EPM logs)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [BeyondTrust Endpoint Privilege Management] を選択します。
  7. [次へ] をクリックします。
  8. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://beyondtrust-epm-logs/beyondtrust-epm/
      
      • 次のように置き換えます。

        • beyondtrust-epm-logs: GCS バケット名。
        • beyondtrust-epm/: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
      • 例:

        • ルートバケット: gs://beyondtrust-epm-logs/
        • 接頭辞あり: gs://beyondtrust-epm-logs/beyondtrust-epm/
    • Source deletion option: 必要に応じて削除オプションを選択します。

      • なし: 転送後にファイルを削除しません(テストにおすすめ)。
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。

    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。

    • アセットの名前空間: アセットの名前空間

    • Ingestion labels: このフィードのイベントに適用されるラベル。

  9. [次へ] をクリックします。

  10. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング 論理
agent.id principal.asset.attribute.labels.value キー agent_id を持つラベルにマッピングされます
agent.version principal.asset.attribute.labels.value キー agent_version を持つラベルにマッピングされます
ecs.version principal.asset.attribute.labels.value キー ecs_version を持つラベルにマッピングされます
event_data.reason metadata.description 未加工ログのイベントの説明
event_datas.ActionId metadata.product_log_id プロダクト固有のログ識別子
file.path principal.file.full_path イベントのフルファイルパス
headers.content_length additional.fields.value.string_value キー content_length のラベルにマッピングされます
headers.content_type additional.fields.value.string_value キー content_type のラベルにマッピングされます
headers.http_host additional.fields.value.string_value キー http_host を持つラベルにマッピングされます
headers.http_version network.application_protocol_version HTTP プロトコル バージョン
headers.request_method network.http.method HTTP リクエスト メソッド
host.hostname principal.hostname プリンシパル ホスト名
host.hostname principal.asset.hostname プリンシパル アセットのホスト名
host.ip principal.asset.ip プリンシパル アセットの IP アドレス
host.ip principal.ip プリンシパル IP アドレス
host.mac principal.mac プリンシパル MAC アドレス
host.os.platform principal.platform macOS と等しい場合は MAC に設定
host.os.version principal.platform_version OS のバージョン
labels.related_item_id metadata.product_log_id 関連アイテムの識別子
process.command_line principal.process.command_line プロセス コマンドライン
process.name additional.fields.value.string_value キー process_name を持つラベルにマッピングされます
process.parent.name additional.fields.value.string_value キー process_parent_name を持つラベルにマッピングされます
process.parent.pid principal.process.parent_process.pid 親プロセスの PID を文字列に変換したもの
process.pid principal.process.pid プロセス PID を文字列に変換
user.id principal.user.userid ユーザー識別子
user.name principal.user.user_display_name ユーザーの表示名
なし metadata.event_timestamp イベントのタイムスタンプがログエントリのタイムスタンプに設定されている
なし metadata.event_type プリンシパルがない場合は GENERIC_EVENT、それ以外の場合は STATUS_UPDATE
なし network.application_protocol http_version フィールドに HTTP が含まれている場合は HTTP に設定

ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。