Mengumpulkan log MFA HYPR

Didukung di:

Dokumen ini menjelaskan cara menyerap log HYPR MFA ke Google Security Operations menggunakan webhook atau Google Cloud Storage V2.

HYPR MFA adalah solusi autentikasi multi-faktor tanpa sandi yang menyediakan autentikasi tahan phishing menggunakan kunci sandi FIDO2, biometrika, dan login yang dimulai dari perangkat seluler. HYPR menggantikan sandi tradisional dengan kriptografi kunci publik yang aman untuk menghilangkan serangan berbasis kredensial sekaligus menyederhanakan autentikasi pengguna di seluruh workstation, aplikasi web, dan layanan cloud.

Sebelum memulai

Pastikan Anda memiliki prasyarat berikut:

  • Instance Google SecOps
  • Akses administratif ke HYPR Control Center
  • Hubungi Dukungan HYPR untuk mengaktifkan Hook Peristiwa Kustom untuk aplikasi RP yang ingin Anda pantau

Perbedaan metode pengumpulan

MFA HYPR mendukung dua metode untuk mengirim log ke Google Security Operations:

  • Webhook (direkomendasikan): HYPR mengirim peristiwa secara real-time ke Google Security Operations melalui Hook Peristiwa Kustom. Metode ini memberikan pengiriman peristiwa langsung dan tidak memerlukan infrastruktur tambahan.
  • Google Cloud Storage: Peristiwa HYPR dikumpulkan melalui API dan disimpan di GCS, lalu di-ingest oleh Google Security Operations. Metode ini menyediakan pemrosesan batch dan retensi data historis.

Pilih metode yang paling sesuai dengan kebutuhan Anda:

Fitur Webhook Google Cloud Storage
Latensi Real-time (detik) Batch (menit hingga jam)
Infrastruktur Tidak ada persyaratan Project GCP dengan fungsi Cloud Run
Data historis Terbatas untuk aliran peristiwa Retensi penuh di GCS
Kompleksitas penyiapan Sederhana Sedang
Biaya Minimal Biaya komputasi dan penyimpanan GCP

Opsi 1: Mengonfigurasi integrasi webhook

Membuat feed webhook di Google SecOps

Buat feed

  1. Buka Setelan SIEM > Feed.
  2. Klik Tambahkan Feed Baru.
  3. Di halaman berikutnya, klik Konfigurasi satu feed.
  4. Di kolom Nama feed, masukkan nama untuk feed (misalnya, HYPR MFA Events).
  5. Pilih Webhook sebagai Jenis sumber.
  6. Pilih HYPR MFA sebagai Jenis log.
  7. Klik Berikutnya.
  8. Tentukan nilai untuk parameter input berikut:
    • Pemisah pemisahan (opsional): Biarkan kosong. Setiap permintaan webhook berisi satu peristiwa JSON.
    • Namespace aset: Namespace aset.
    • Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini.
  9. Klik Berikutnya.
  10. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Buat dan simpan kunci rahasia

Setelah membuat feed, Anda harus membuat kunci rahasia untuk autentikasi:

  1. Di halaman detail feed, klik Buat Kunci Rahasia.
  2. Dialog akan menampilkan kunci rahasia.
  3. Salin dan simpan kunci rahasia dengan aman.

Mendapatkan URL endpoint feed

  1. Buka tab Detail untuk feed tersebut.
  2. Di bagian Endpoint Information, salin Feed endpoint URL.
  3. Format URL-nya adalah:

    https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    

    atau

    https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreate
    
  4. Simpan URL ini untuk langkah berikutnya.

  5. Klik Done.

Buat kunci Google Cloud API

Chronicle memerlukan kunci API untuk autentikasi. Buat kunci API yang dibatasi di Konsol Google Cloud.

Buat kunci API

  1. Buka halaman Credentials Google Cloud Console.
  2. Pilih project Anda (project yang terkait dengan instance Chronicle Anda).
  3. Klik Create credentials > API key.
  4. Kunci API dibuat dan ditampilkan dalam dialog.
  5. Klik Edit API key untuk membatasi kunci.

Membatasi kunci API

  1. Di halaman setelan kunci API:
    • Name: Masukkan nama deskriptif (misalnya, Chronicle Webhook API Key).
  2. Di bagian Pembatasan API:
    1. Pilih Restrict key.
    2. Di drop-down Select APIs, telusuri dan pilih Google SecOps API (atau Chronicle API).
  3. Klik Simpan.
  4. Salin nilai kunci API dari kolom Kunci API di bagian atas halaman.
  5. Simpan kunci API dengan aman.

Mengonfigurasi Hook Peristiwa Kustom MFA HYPR

Buat URL webhook dengan header

HYPR mendukung header kustom untuk autentikasi. Gunakan metode autentikasi header untuk keamanan yang lebih baik.

  • URL endpoint (tanpa parameter):

    <ENDPOINT_URL>
    
  • Header:

    x-goog-chronicle-auth: <API_KEY>
    x-chronicle-auth: <SECRET_KEY>
    
    • Ganti:
      • <ENDPOINT_URL>: URL endpoint feed dari langkah sebelumnya.
      • <API_KEY>: Kunci Google Cloud API yang Anda buat.
      • <SECRET_KEY>: Kunci rahasia dari pembuatan feed Chronicle.

Siapkan konfigurasi JSON Custom Event Hook

  • Hook Peristiwa Kustom HYPR dikonfigurasi menggunakan JSON. Siapkan konfigurasi JSON berikut, dengan mengganti nilai placeholder:

    {
      "name": "Chronicle SIEM Integration",
      "eventType": "ALL",
      "invocationEndpoint": "<ENDPOINT_URL>",
      "httpMethod": "POST",
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
    • Ganti:

      • <ENDPOINT_URL>: URL endpoint feed Chronicle.
      • <API_KEY>: Kunci Google Cloud API.
      • <SECRET_KEY>: Kunci rahasia Chronicle.
    • Parameter konfigurasi:

    • name: Nama deskriptif untuk hook peristiwa (misalnya, Chronicle SIEM Integration).

    • eventType: Setel ke ALL untuk mengirim semua peristiwa HYPR, atau tentukan tag peristiwa tertentu seperti AUTHENTICATION, REGISTRATION, atau ACCESS_TOKEN.

    • invocationEndpoint: URL endpoint feed Chronicle.

    • httpMethod: Tetapkan ke POST.

    • authType: Setel ke API_KEY untuk autentikasi kunci API.

    • apiKeyName: Nama header untuk kunci API (x-goog-chronicle-auth).

    • apiKeyValue: Nilai kunci Google Cloud API.

    • headerParameters: Header tambahan termasuk Content-Type: application/json dan kunci rahasia Chronicle di header x-chronicle-auth.

Membuat Hook Peristiwa Kustom di HYPR Control Center

  1. Login ke HYPR Control Center sebagai administrator.
  2. Di menu navigasi sebelah kiri, klik Integrations.
  3. Di halaman Integrasi, klik Tambahkan Integrasi Baru.
  4. HYPR Control Center menampilkan integrasi yang tersedia.
  5. Klik kartu di bagian Hook Peristiwa untuk Peristiwa Kustom.
  6. Klik Add New Event Hook.
  7. Pada dialog Add New Event Hook, tempelkan konten JSON yang Anda siapkan ke dalam kolom teks.
  8. Klik Tambahkan Hook Peristiwa.
  9. HYPR Control Center akan kembali ke halaman Event Hooks.

Hook Peristiwa Kustom kini dikonfigurasi dan akan mulai mengirimkan peristiwa ke Google SecOps.

Memverifikasi bahwa webhook berfungsi

Memeriksa status hook peristiwa HYPR Control Center

  1. Login ke HYPR Control Center.
  2. Buka Integrations.
  3. Klik integrasi Peristiwa Kustom.
  4. Di tabel Hook Peristiwa, pastikan hook peristiwa Anda tercantum.
  5. Klik nama hook peristiwa untuk melihat detailnya.
  6. Verifikasi bahwa konfigurasi sesuai dengan setelan Anda.

Memeriksa status feed Chronicle

  1. Buka SIEM Settings > Feeds di Chronicle.
  2. Temukan feed webhook Anda.
  3. Periksa kolom Status (seharusnya Aktif).
  4. Periksa jumlah Peristiwa yang diterima (harus bertambah).
  5. Periksa stempel waktu Terakhir berhasil pada (seharusnya baru).

Memverifikasi log di Chronicle

  1. Buka Penelusuran > Penelusuran UDM.
  2. Gunakan kueri berikut:

    metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"
    
  3. Sesuaikan rentang waktu ke 1 jam terakhir.

  4. Pastikan peristiwa muncul di hasil.

Referensi metode autentikasi

Hook Peristiwa Kustom HYPR mendukung beberapa metode autentikasi. Metode yang direkomendasikan untuk Chronicle adalah autentikasi kunci API dengan header kustom.

  • Konfigurasi:

    {
      "authType": "API_KEY",
      "authParams": {
        "apiKeyAuthParameters": {
          "apiKeyName": "x-goog-chronicle-auth",
          "apiKeyValue": "<API_KEY>"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            },
            {
              "key": "x-chronicle-auth",
              "value": "<SECRET_KEY>",
              "isValueSecret": true
            }
          ]
        }
      }
    }
    
  • Kelebihan:

    • Kunci dan rahasia API dikirim di header (lebih aman daripada parameter URL).
    • Mendukung beberapa header autentikasi.
    • Header tidak dicatat dalam log akses server web.

Autentikasi Dasar

  • Konfigurasi:

    {
      "authType": "BASIC",
      "authParams": {
        "basicAuthParameters": {
          "username": "your-username",
          "password": "your-password"
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • Kasus penggunaan: Saat sistem target memerlukan Autentikasi Dasar HTTP.

Kredensial Klien OAuth 2.0

  • Konfigurasi:

    {
      "authType": "OAUTH_CLIENT_CREDENTIALS",
      "authParams": {
        "oauthParameters": {
          "clientParameters": {
            "clientId": "your-client-id",
            "clientSecret": "your-client-secret"
          },
          "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token",
          "httpMethod": "POST",
          "oauthHttpParameters": {
            "bodyParameters": [
              {
                "key": "scope",
                "value": "api://your-api/.default",
                "isValueSecret": false
              },
              {
                "key": "grant_type",
                "value": "client_credentials",
                "isValueSecret": false
              }
            ]
          }
        },
        "invocationHttpParameters": {
          "headerParameters": [
            {
              "key": "Content-Type",
              "value": "application/json",
              "isValueSecret": false
            }
          ]
        }
      }
    }
    
    • Kasus penggunaan: Saat sistem target memerlukan autentikasi OAuth 2.0.

Jenis peristiwa dan pemfilteran

Peristiwa HYPR dikelompokkan menggunakan parameter eventTags. Anda dapat mengonfigurasi Hook Peristiwa Kustom untuk mengirim semua peristiwa atau memfilter menurut jenis peristiwa tertentu.

Tag peristiwa

  • AUTENTIKASI: Peristiwa autentikasi pengguna (login, buka kunci).
  • REGISTRATION: Peristiwa pendaftaran perangkat (memasangkan perangkat seluler, kunci keamanan).
  • ACCESS_TOKEN: Peristiwa pembuatan dan penggunaan token akses.
  • AUDIT: Peristiwa log audit (tindakan administratif, perubahan konfigurasi).

Mengonfigurasi pemfilteran peristiwa

Untuk mengirim hanya jenis peristiwa tertentu, ubah parameter eventType dalam konfigurasi JSON:

  • Mengirim semua peristiwa:

    {
      "eventType": "ALL"
    }
    
  • Kirim hanya peristiwa autentikasi:

    {
      "eventType": "AUTHENTICATION"
    }
    
  • Kirim hanya peristiwa pendaftaran:

    {
      "eventType": "REGISTRATION"
    }
    

Opsi 2: Mengonfigurasi integrasi Google Cloud Storage

Prasyarat tambahan untuk integrasi GCS

Selain prasyarat yang tercantum di bagian "Sebelum memulai", Anda memerlukan:

  • Project GCP dengan Cloud Storage API diaktifkan
  • Izin untuk membuat dan mengelola bucket GCS
  • Izin untuk mengelola kebijakan IAM di bucket GCS
  • Izin untuk membuat layanan Cloud Run, topik Pub/Sub, dan tugas Cloud Scheduler
  • Kredensial HYPR API (hubungi Dukungan HYPR untuk mendapatkan akses API)

Membuat bucket Google Cloud Storage

  1. Buka Konsol Google Cloud.
  2. Pilih project Anda atau buat project baru.
  3. Di menu navigasi, buka Cloud Storage > Buckets.
  4. Klik Create bucket.
  5. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Beri nama bucket Anda Masukkan nama yang unik secara global (misalnya, hypr-mfa-logs)
    Location type Pilih berdasarkan kebutuhan Anda (Region, Dual-region, Multi-region)
    Location Pilih lokasi (misalnya, us-central1)
    Kelas penyimpanan Standar (direkomendasikan untuk log yang sering diakses)
    Access control Seragam (direkomendasikan)
    Alat perlindungan Opsional: Aktifkan pembuatan versi objek atau kebijakan retensi
  6. Klik Create.

Kumpulkan kredensial HYPR API

Hubungi Dukungan HYPR untuk mendapatkan kredensial API guna mengakses data peristiwa HYPR. Anda akan memerlukan:

  • URL Dasar API: URL instance HYPR Anda (misalnya, https://your-tenant.hypr.com)
  • Token API: Token autentikasi untuk akses API
  • ID Aplikasi RP: ID aplikasi Pihak Tepercaya yang akan dipantau

Buat akun layanan untuk Cloud Run Function

Fungsi Cloud Run memerlukan akun layanan dengan izin untuk menulis ke bucket GCS dan dipanggil oleh Pub/Sub.

Membuat akun layanan

  1. Di GCP Console, buka IAM & Admin > Service Accounts.
  2. Klik Create Service Account.
  3. Berikan detail konfigurasi berikut:
    • Nama akun layanan: Masukkan hypr-logs-collector-sa.
    • Deskripsi akun layanan: Masukkan Service account for Cloud Run function to collect HYPR MFA logs.
  4. Klik Create and Continue.
  5. Di bagian Grant this service account access to project, tambahkan peran berikut:
    1. Klik Pilih peran.
    2. Telusuri dan pilih Storage Object Admin.
    3. Klik + Add another role.
    4. Telusuri dan pilih Cloud Run Invoker.
    5. Klik + Add another role.
    6. Telusuri dan pilih Cloud Functions Invoker.
  6. Klik Lanjutkan.
  7. Klik Done.

Peran ini diperlukan untuk:

  • Storage Object Admin: Menulis log ke bucket GCS dan mengelola file status
  • Cloud Run Invoker: Mengizinkan Pub/Sub memanggil fungsi
  • Cloud Functions Invoker: Mengizinkan pemanggilan fungsi

Memberikan izin IAM pada bucket GCS

Beri akun layanan (hypr-logs-collector-sa) izin tulis di bucket GCS:

  1. Buka Cloud Storage > Buckets.
  2. Klik nama bucket Anda (misalnya, hypr-mfa-logs).
  3. Buka tab Izin.
  4. Klik Grant access.
  5. Berikan detail konfigurasi berikut:
    • Tambahkan prinsipal: Masukkan email akun layanan (misalnya, hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Tetapkan peran: Pilih Storage Object Admin.
  6. Klik Simpan.

Membuat topik Pub/Sub

Buat topik Pub/Sub yang akan dipublikasikan oleh Cloud Scheduler dan akan dilanggan oleh fungsi Cloud Run.

  1. Di GCP Console, buka Pub/Sub > Topics.
  2. Klik Create topic.
  3. Berikan detail konfigurasi berikut:
    • ID Topik: Masukkan hypr-logs-trigger.
    • Biarkan setelan lainnya tetap default.
  4. Klik Create.

Membuat fungsi Cloud Run untuk mengumpulkan log

Fungsi Cloud Run akan dipicu oleh pesan Pub/Sub dari Cloud Scheduler untuk mengambil log dari HYPR API dan menuliskannya ke GCS.

  1. Di GCP Console, buka Cloud Run.
  2. Klik Create service.
  3. Pilih Function (gunakan editor inline untuk membuat fungsi).
  4. Di bagian Konfigurasi, berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama layanan hypr-logs-collector
    Region Pilih region yang cocok dengan bucket GCS Anda (misalnya, us-central1)
    Runtime Pilih Python 3.12 atau yang lebih baru
  5. Di bagian Pemicu (opsional):

    1. Klik + Tambahkan pemicu.
    2. Pilih Cloud Pub/Sub.
    3. Di Select a Cloud Pub/Sub topic, pilih topik Pub/Sub (hypr-logs-trigger).
    4. Klik Simpan.
  6. Di bagian Authentication:

    1. Pilih Wajibkan autentikasi.
    2. Periksa Identity and Access Management (IAM).
  7. Scroll ke bawah dan luaskan Containers, Networking, Security.

  8. Buka tab Security:

    • Akun layanan: Pilih akun layanan (hypr-logs-collector-sa).
  9. Buka tab Containers:

    1. Klik Variabel & Secret.
    2. Klik + Tambahkan variabel untuk setiap variabel lingkungan:
    Nama Variabel Nilai Contoh Deskripsi
    GCS_BUCKET hypr-mfa-logs Nama bucket GCS
    GCS_PREFIX hypr-events Awalan untuk file log
    STATE_KEY hypr-events/state.json Jalur file status
    HYPR_API_URL https://your-tenant.hypr.com URL dasar HYPR API
    HYPR_API_TOKEN your-api-token Token autentikasi HYPR API
    HYPR_RP_APP_ID your-rp-app-id ID aplikasi RP HYPR
    MAX_RECORDS 1000 Jumlah maksimum data per proses
    PAGE_SIZE 100 Data per halaman
    LOOKBACK_HOURS 24 Periode lihat balik awal
  10. Di bagian Variables & Secrets, scroll ke bawah ke Requests:

    • Waktu tunggu permintaan: Masukkan 600 detik (10 menit).
  11. Buka tab Setelan:

    • Di bagian Materi:
      • Memori: Pilih 512 MiB atau yang lebih tinggi.
      • CPU: Pilih 1.
  12. Di bagian Penskalaan revisi:

    • Jumlah minimum instance: Masukkan 0.
    • Jumlah maksimum instance: Masukkan 100 (atau sesuaikan berdasarkan perkiraan beban).
  13. Klik Create.

  14. Tunggu hingga layanan dibuat (1-2 menit).

  15. Setelah layanan dibuat, editor kode inline akan terbuka secara otomatis.

Menambahkan kode fungsi

  1. Masukkan main di kolom Entry point.
  2. Di editor kode inline, buat dua file:

    • File pertama: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json')
    HYPR_API_URL = os.environ.get('HYPR_API_URL')
    HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN')
    HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    
    def to_unix_millis(dt: datetime) -> int:
        """Convert datetime to Unix epoch milliseconds."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        dt = dt.astimezone(timezone.utc)
        return int(dt.timestamp() * 1000)
    
    def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]):
            print('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            now = datetime.now(timezone.utc)
            last_time = None
    
            if isinstance(state, dict) and state.get("last_event_time"):
                try:
                    last_time = parse_datetime(state["last_event_time"])
                    # Overlap by 2 minutes to catch any delayed events
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
    
            if last_time is None:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
    
            print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
    
            # Convert to Unix milliseconds for HYPR API
            start_millis = to_unix_millis(last_time)
            end_millis = to_unix_millis(now)
    
            # Fetch logs
            records, newest_event_time = fetch_logs(
                api_url=HYPR_API_URL,
                api_token=HYPR_API_TOKEN,
                rp_app_id=HYPR_RP_APP_ID,
                start_time_ms=start_millis,
                end_time_ms=end_millis,
                page_size=PAGE_SIZE,
                max_records=MAX_RECORDS,
            )
    
            if not records:
                print("No new log records found.")
                save_state(bucket, STATE_KEY, now.isoformat())
                return
    
            # Write to GCS as NDJSON
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
    
            ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
            print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
    
            # Update state with newest event time
            if newest_event_time:
                save_state(bucket, STATE_KEY, newest_event_time)
            else:
                save_state(bucket, STATE_KEY, now.isoformat())
    
            print(f"Successfully processed {len(records)} records")
    
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
    
        return {}
    
    def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
            state = {'last_event_time': last_event_time_iso}
            blob = bucket.blob(key)
            blob.upload_from_string(
                json.dumps(state, indent=2),
                content_type='application/json'
            )
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int):
        """
        Fetch logs from HYPR API with pagination and rate limiting.
    
        Args:
            api_url: HYPR API base URL
            api_token: HYPR API authentication token
            rp_app_id: HYPR RP application ID
            start_time_ms: Start time in Unix milliseconds
            end_time_ms: End time in Unix milliseconds
            page_size: Number of records per page
            max_records: Maximum total records to fetch
    
        Returns:
            Tuple of (records list, newest_event_time ISO string)
        """
        # Clean up API URL
        base_url = api_url.rstrip('/')
    
        endpoint = f"{base_url}/rp/api/versioned/events"
    
        # Bearer token authentication
        headers = {
            'Authorization': f'Bearer {api_token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'User-Agent': 'GoogleSecOps-HYPRCollector/1.0'
        }
    
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
    
        # Offset-based pagination
        start_index = 0
    
        while True:
            page_num += 1
    
            if len(records) >= max_records:
                print(f"Reached max_records limit ({max_records})")
                break
    
            # Build request parameters
            params = []
            params.append(f"rpAppId={rp_app_id}")
            params.append(f"startDate={start_time_ms}")
            params.append(f"endDate={end_time_ms}")
            params.append(f"start={start_index}")
            params.append(f"limit={min(page_size, max_records - len(records))}")
            url = f"{endpoint}?{'&'.join(params)}"
    
            try:
                response = http.request('GET', url, headers=headers)
    
                # Handle rate limiting with exponential backoff
                if response.status == 429:
                    retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
                    print(f"Rate limited (429). Retrying after {retry_after}s...")
                    time.sleep(retry_after)
                    backoff = min(backoff * 2, 30.0)
                    continue
    
                backoff = 1.0
    
                if response.status != 200:
                    print(f"HTTP Error: {response.status}")
                    response_text = response.data.decode('utf-8')
                    print(f"Response body: {response_text}")
                    return [], None
    
                data = json.loads(response.data.decode('utf-8'))
    
                # Extract results
                page_results = data.get('data', [])
    
                if not page_results:
                    print(f"No more results (empty page)")
                    break
    
                print(f"Page {page_num}: Retrieved {len(page_results)} events")
                records.extend(page_results)
    
                # Track newest event time
                for event in page_results:
                    try:
                        # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds
                        event_time_ms = event.get('LOGGEDTIMEINUTC')
                        if event_time_ms:
                            event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
                            event_time = event_dt.isoformat()
                            if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                                newest_time = event_time
                    except Exception as e:
                        print(f"Warning: Could not parse event time: {e}")
    
                # Check for more results
                current_size = data.get('size', 0)
                if current_size < page_size:
                    print(f"Reached last page (size={current_size} < limit={page_size})")
                    break
    
                start_index += current_size
    
            except Exception as e:
                print(f"Error fetching logs: {e}")
                return [], None
    
        print(f"Retrieved {len(records)} total records from {page_num} pages")
        return records, newest_time
    
    • File kedua: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Klik Deploy untuk menyimpan dan men-deploy fungsi.

  4. Tunggu hingga deployment selesai (2-3 menit).

Buat tugas Cloud Scheduler

Cloud Scheduler akan memublikasikan pesan ke topik Pub/Sub (hypr-logs-trigger) secara berkala, yang memicu fungsi Cloud Run.

  1. Di GCP Console, buka Cloud Scheduler.
  2. Klik Create Job.
  3. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama hypr-logs-collector-hourly
    Region Pilih region yang sama dengan fungsi Cloud Run
    Frekuensi 0 * * * * (setiap jam, tepat pada waktunya)
    Zona Waktu Pilih zona waktu (UTC direkomendasikan)
    Jenis target Pub/Sub
    Topik Pilih topik Pub/Sub (hypr-logs-trigger)
    Isi pesan {} (objek JSON kosong)
  4. Klik Create.

Opsi frekuensi jadwal

Pilih frekuensi berdasarkan volume log dan persyaratan latensi:

Frekuensi Ekspresi Cron Kasus Penggunaan
Setiap 5 menit */5 * * * * Volume tinggi, latensi rendah
Setiap 15 menit */15 * * * * Volume sedang
Setiap jam 0 * * * * Standar (direkomendasikan)
Setiap 6 jam 0 */6 * * * Volume rendah, pemrosesan batch
Harian 0 0 * * * Pengumpulan data historis

Menguji integrasi

  1. Di konsol Cloud Scheduler, temukan tugas Anda (hypr-logs-collector-hourly).
  2. Klik Force run untuk memicu tugas secara manual.
  3. Tunggu beberapa detik.
  4. Buka Cloud Run > Services.
  5. Klik nama fungsi Anda (hypr-logs-collector).
  6. Klik tab Logs.
  7. Pastikan fungsi berhasil dieksekusi. Cari:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X events
    Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Buka Cloud Storage > Buckets.

  9. Klik nama bucket Anda (misalnya, hypr-mfa-logs).

  10. Buka folder awalan (misalnya, hypr-events/).

  11. Pastikan file .ndjson baru dibuat dengan stempel waktu saat ini.

Jika Anda melihat error dalam log:

  • HTTP 401: Periksa kredensial API di variabel lingkungan
  • HTTP 403: Pastikan token HYPR API memiliki izin yang diperlukan dan ID Aplikasi RP sudah benar
  • HTTP 429: Pembatasan kecepatan - fungsi akan otomatis mencoba lagi dengan penundaan
  • Variabel lingkungan tidak ada: Periksa apakah semua variabel yang diperlukan telah ditetapkan

Mengambil akun layanan Google SecOps

Google SecOps menggunakan akun layanan unik untuk membaca data dari bucket GCS Anda. Anda harus memberi akun layanan ini akses ke bucket Anda.

Mengonfigurasi feed di Google SecOps untuk memproses log MFA HYPR

  1. Buka Setelan SIEM > Feed.
  2. Klik Tambahkan Feed Baru.
  3. Klik Konfigurasi satu feed.
  4. Di kolom Nama feed, masukkan nama untuk feed (misalnya, HYPR MFA Logs from GCS).
  5. Pilih Google Cloud Storage V2 sebagai Source type.
  6. Pilih HYPR MFA sebagai Jenis log.

  7. Klik Get Service Account. Email akun layanan yang unik akan ditampilkan, misalnya:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Salin alamat email ini untuk digunakan di langkah berikutnya.

  9. Klik Berikutnya.

  10. Tentukan nilai untuk parameter input berikut:

    • URL bucket penyimpanan: Masukkan URI bucket GCS dengan jalur awalan:

      gs://hypr-mfa-logs/hypr-events/
      
      • Ganti:
        • hypr-mfa-logs: Nama bucket GCS Anda.
        • hypr-events: Awalan/jalur folder opsional tempat log disimpan (biarkan kosong untuk root).
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda:

      • Jangan pernah: Tidak pernah menghapus file apa pun setelah transfer (direkomendasikan untuk pengujian).
      • Hapus file yang ditransfer: Menghapus file setelah transfer berhasil.
      • Hapus file yang ditransfer dan direktori kosong: Menghapus file dan direktori kosong setelah transfer berhasil.

    • Usia File Maksimum: Menyertakan file yang diubah dalam beberapa hari terakhir. Defaultnya adalah 180 hari.

    • Namespace aset: Namespace aset.

    • Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini.

  11. Klik Berikutnya.

  12. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Memberikan izin IAM ke akun layanan Google SecOps

Akun layanan Google SecOps memerlukan peran Storage Object Viewer di bucket GCS Anda.

  1. Buka Cloud Storage > Buckets.
  2. Klik nama bucket Anda (misalnya, hypr-mfa-logs).
  3. Buka tab Izin.
  4. Klik Grant access.
  5. Berikan detail konfigurasi berikut:
    • Add principals: Tempel email akun layanan Google SecOps.
    • Tetapkan peran: Pilih Storage Object Viewer.
  6. Klik Simpan.

Tabel pemetaan UDM

Kolom Log Pemetaan UDM Logika
extensions.auth.type Jenis autentikasi (misalnya, SSO, MFA)
metadata.event_type Jenis acara (misalnya, USER_LOGIN, NETWORK_CONNECTION)
EVENTNAME metadata.product_event_type Jenis peristiwa khusus produk
ID metadata.product_log_id ID log khusus produk
USERAGENT network.http.parsed_user_agent Agen pengguna HTTP yang diuraikan
USERAGENT network.http.user_agent String agen pengguna HTTP
SESSIONID network.session_id ID sesi
DEVICEMODEL principal.asset.hardware.model Model hardware aset
COMPANION,MACHINEDOMAIN principal.asset.hostname Nama host aset
REMOTEIP principal.asset.ip Alamat IP aset
DEVICEID principal.asset_id ID unik untuk aset
COMPANION,MACHINEDOMAIN principal.hostname Nama host yang terkait dengan akun utama
REMOTEIP principal.ip Alamat IP yang terkait dengan prinsipal
DEVICEOS principal.platform Platform (misalnya, WINDOWS, LINUX)
DEVICEOSVERSION principal.platform_version Versi platform
BERHASIL security_result.action Tindakan yang dilakukan oleh sistem keamanan (misalnya, IZINKAN, BLOKIR)
PESAN security_result.description Deskripsi hasil keamanan
MACHINEUSERNAME target.user.user_display_name Nama tampilan pengguna
FIDOUSER target.user.userid ID Pengguna
metadata.product_name Nama produk
metadata.vendor_name Nama vendor/perusahaan

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.