Mengumpulkan log MFA HYPR
Dokumen ini menjelaskan cara menyerap log HYPR MFA ke Google Security Operations menggunakan webhook atau Google Cloud Storage V2.
HYPR MFA adalah solusi autentikasi multi-faktor tanpa sandi yang menyediakan autentikasi tahan phishing menggunakan kunci sandi FIDO2, biometrika, dan login yang dimulai dari perangkat seluler. HYPR menggantikan sandi tradisional dengan kriptografi kunci publik yang aman untuk menghilangkan serangan berbasis kredensial sekaligus menyederhanakan autentikasi pengguna di seluruh workstation, aplikasi web, dan layanan cloud.
Sebelum memulai
Pastikan Anda memiliki prasyarat berikut:
- Instance Google SecOps
- Akses administratif ke HYPR Control Center
- Hubungi Dukungan HYPR untuk mengaktifkan Hook Peristiwa Kustom untuk aplikasi RP yang ingin Anda pantau
Perbedaan metode pengumpulan
MFA HYPR mendukung dua metode untuk mengirim log ke Google Security Operations:
- Webhook (direkomendasikan): HYPR mengirim peristiwa secara real-time ke Google Security Operations melalui Hook Peristiwa Kustom. Metode ini memberikan pengiriman peristiwa langsung dan tidak memerlukan infrastruktur tambahan.
- Google Cloud Storage: Peristiwa HYPR dikumpulkan melalui API dan disimpan di GCS, lalu di-ingest oleh Google Security Operations. Metode ini menyediakan pemrosesan batch dan retensi data historis.
Pilih metode yang paling sesuai dengan kebutuhan Anda:
| Fitur | Webhook | Google Cloud Storage |
|---|---|---|
| Latensi | Real-time (detik) | Batch (menit hingga jam) |
| Infrastruktur | Tidak ada persyaratan | Project GCP dengan fungsi Cloud Run |
| Data historis | Terbatas untuk aliran peristiwa | Retensi penuh di GCS |
| Kompleksitas penyiapan | Sederhana | Sedang |
| Biaya | Minimal | Biaya komputasi dan penyimpanan GCP |
Opsi 1: Mengonfigurasi integrasi webhook
Membuat feed webhook di Google SecOps
Buat feed
- Buka Setelan SIEM > Feed.
- Klik Tambahkan Feed Baru.
- Di halaman berikutnya, klik Konfigurasi satu feed.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
HYPR MFA Events). - Pilih Webhook sebagai Jenis sumber.
- Pilih HYPR MFA sebagai Jenis log.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- Pemisah pemisahan (opsional): Biarkan kosong. Setiap permintaan webhook berisi satu peristiwa JSON.
- Namespace aset: Namespace aset.
- Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini.
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Buat dan simpan kunci rahasia
Setelah membuat feed, Anda harus membuat kunci rahasia untuk autentikasi:
- Di halaman detail feed, klik Buat Kunci Rahasia.
- Dialog akan menampilkan kunci rahasia.
- Salin dan simpan kunci rahasia dengan aman.
Mendapatkan URL endpoint feed
- Buka tab Detail untuk feed tersebut.
- Di bagian Endpoint Information, salin Feed endpoint URL.
Format URL-nya adalah:
https://malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreateatau
https://<REGION>-malachiteingestion-pa.googleapis.com/v2/unstructuredlogentries:batchCreateSimpan URL ini untuk langkah berikutnya.
Klik Done.
Buat kunci Google Cloud API
Chronicle memerlukan kunci API untuk autentikasi. Buat kunci API yang dibatasi di Konsol Google Cloud.
Buat kunci API
- Buka halaman Credentials Google Cloud Console.
- Pilih project Anda (project yang terkait dengan instance Chronicle Anda).
- Klik Create credentials > API key.
- Kunci API dibuat dan ditampilkan dalam dialog.
- Klik Edit API key untuk membatasi kunci.
Membatasi kunci API
- Di halaman setelan kunci API:
- Name: Masukkan nama deskriptif (misalnya,
Chronicle Webhook API Key).
- Name: Masukkan nama deskriptif (misalnya,
- Di bagian Pembatasan API:
- Pilih Restrict key.
- Di drop-down Select APIs, telusuri dan pilih Google SecOps API (atau Chronicle API).
- Klik Simpan.
- Salin nilai kunci API dari kolom Kunci API di bagian atas halaman.
- Simpan kunci API dengan aman.
Mengonfigurasi Hook Peristiwa Kustom MFA HYPR
Buat URL webhook dengan header
HYPR mendukung header kustom untuk autentikasi. Gunakan metode autentikasi header untuk keamanan yang lebih baik.
URL endpoint (tanpa parameter):
<ENDPOINT_URL>Header:
x-goog-chronicle-auth: <API_KEY> x-chronicle-auth: <SECRET_KEY>- Ganti:
<ENDPOINT_URL>: URL endpoint feed dari langkah sebelumnya.<API_KEY>: Kunci Google Cloud API yang Anda buat.<SECRET_KEY>: Kunci rahasia dari pembuatan feed Chronicle.
- Ganti:
Siapkan konfigurasi JSON Custom Event Hook
Hook Peristiwa Kustom HYPR dikonfigurasi menggunakan JSON. Siapkan konfigurasi JSON berikut, dengan mengganti nilai placeholder:
{ "name": "Chronicle SIEM Integration", "eventType": "ALL", "invocationEndpoint": "<ENDPOINT_URL>", "httpMethod": "POST", "authType": "API_KEY", "authParams": { "apiKeyAuthParameters": { "apiKeyName": "x-goog-chronicle-auth", "apiKeyValue": "<API_KEY>" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false }, { "key": "x-chronicle-auth", "value": "<SECRET_KEY>", "isValueSecret": true } ] } } }Ganti:
<ENDPOINT_URL>: URL endpoint feed Chronicle.<API_KEY>: Kunci Google Cloud API.<SECRET_KEY>: Kunci rahasia Chronicle.
Parameter konfigurasi:
name: Nama deskriptif untuk hook peristiwa (misalnya,
Chronicle SIEM Integration).eventType: Setel ke
ALLuntuk mengirim semua peristiwa HYPR, atau tentukan tag peristiwa tertentu sepertiAUTHENTICATION,REGISTRATION, atauACCESS_TOKEN.invocationEndpoint: URL endpoint feed Chronicle.
httpMethod: Tetapkan ke
POST.authType: Setel ke
API_KEYuntuk autentikasi kunci API.apiKeyName: Nama header untuk kunci API (
x-goog-chronicle-auth).apiKeyValue: Nilai kunci Google Cloud API.
headerParameters: Header tambahan termasuk
Content-Type: application/jsondan kunci rahasia Chronicle di headerx-chronicle-auth.
Membuat Hook Peristiwa Kustom di HYPR Control Center
- Login ke HYPR Control Center sebagai administrator.
- Di menu navigasi sebelah kiri, klik Integrations.
- Di halaman Integrasi, klik Tambahkan Integrasi Baru.
- HYPR Control Center menampilkan integrasi yang tersedia.
- Klik kartu di bagian Hook Peristiwa untuk Peristiwa Kustom.
- Klik Add New Event Hook.
- Pada dialog Add New Event Hook, tempelkan konten JSON yang Anda siapkan ke dalam kolom teks.
- Klik Tambahkan Hook Peristiwa.
- HYPR Control Center akan kembali ke halaman Event Hooks.
Hook Peristiwa Kustom kini dikonfigurasi dan akan mulai mengirimkan peristiwa ke Google SecOps.
Memverifikasi bahwa webhook berfungsi
Memeriksa status hook peristiwa HYPR Control Center
- Login ke HYPR Control Center.
- Buka Integrations.
- Klik integrasi Peristiwa Kustom.
- Di tabel Hook Peristiwa, pastikan hook peristiwa Anda tercantum.
- Klik nama hook peristiwa untuk melihat detailnya.
- Verifikasi bahwa konfigurasi sesuai dengan setelan Anda.
Memeriksa status feed Chronicle
- Buka SIEM Settings > Feeds di Chronicle.
- Temukan feed webhook Anda.
- Periksa kolom Status (seharusnya Aktif).
- Periksa jumlah Peristiwa yang diterima (harus bertambah).
- Periksa stempel waktu Terakhir berhasil pada (seharusnya baru).
Memverifikasi log di Chronicle
- Buka Penelusuran > Penelusuran UDM.
Gunakan kueri berikut:
metadata.vendor_name = "HYPR" AND metadata.product_name = "MFA"Sesuaikan rentang waktu ke 1 jam terakhir.
Pastikan peristiwa muncul di hasil.
Referensi metode autentikasi
Hook Peristiwa Kustom HYPR mendukung beberapa metode autentikasi. Metode yang direkomendasikan untuk Chronicle adalah autentikasi kunci API dengan header kustom.
Autentikasi Kunci API (Direkomendasikan untuk Chronicle)
Konfigurasi:
{ "authType": "API_KEY", "authParams": { "apiKeyAuthParameters": { "apiKeyName": "x-goog-chronicle-auth", "apiKeyValue": "<API_KEY>" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false }, { "key": "x-chronicle-auth", "value": "<SECRET_KEY>", "isValueSecret": true } ] } } }Kelebihan:
- Kunci dan rahasia API dikirim di header (lebih aman daripada parameter URL).
- Mendukung beberapa header autentikasi.
- Header tidak dicatat dalam log akses server web.
Autentikasi Dasar
Konfigurasi:
{ "authType": "BASIC", "authParams": { "basicAuthParameters": { "username": "your-username", "password": "your-password" }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false } ] } } }- Kasus penggunaan: Saat sistem target memerlukan Autentikasi Dasar HTTP.
Kredensial Klien OAuth 2.0
Konfigurasi:
{ "authType": "OAUTH_CLIENT_CREDENTIALS", "authParams": { "oauthParameters": { "clientParameters": { "clientId": "your-client-id", "clientSecret": "your-client-secret" }, "authorizationEndpoint": "https://login.example.com/oauth2/v2.0/token", "httpMethod": "POST", "oauthHttpParameters": { "bodyParameters": [ { "key": "scope", "value": "api://your-api/.default", "isValueSecret": false }, { "key": "grant_type", "value": "client_credentials", "isValueSecret": false } ] } }, "invocationHttpParameters": { "headerParameters": [ { "key": "Content-Type", "value": "application/json", "isValueSecret": false } ] } } }- Kasus penggunaan: Saat sistem target memerlukan autentikasi OAuth 2.0.
Jenis peristiwa dan pemfilteran
Peristiwa HYPR dikelompokkan menggunakan parameter eventTags. Anda dapat mengonfigurasi Hook Peristiwa Kustom untuk mengirim semua peristiwa atau memfilter menurut jenis peristiwa tertentu.
Tag peristiwa
- AUTENTIKASI: Peristiwa autentikasi pengguna (login, buka kunci).
- REGISTRATION: Peristiwa pendaftaran perangkat (memasangkan perangkat seluler, kunci keamanan).
- ACCESS_TOKEN: Peristiwa pembuatan dan penggunaan token akses.
- AUDIT: Peristiwa log audit (tindakan administratif, perubahan konfigurasi).
Mengonfigurasi pemfilteran peristiwa
Untuk mengirim hanya jenis peristiwa tertentu, ubah parameter eventType dalam konfigurasi JSON:
Mengirim semua peristiwa:
{ "eventType": "ALL" }Kirim hanya peristiwa autentikasi:
{ "eventType": "AUTHENTICATION" }Kirim hanya peristiwa pendaftaran:
{ "eventType": "REGISTRATION" }
Opsi 2: Mengonfigurasi integrasi Google Cloud Storage
Prasyarat tambahan untuk integrasi GCS
Selain prasyarat yang tercantum di bagian "Sebelum memulai", Anda memerlukan:
- Project GCP dengan Cloud Storage API diaktifkan
- Izin untuk membuat dan mengelola bucket GCS
- Izin untuk mengelola kebijakan IAM di bucket GCS
- Izin untuk membuat layanan Cloud Run, topik Pub/Sub, dan tugas Cloud Scheduler
- Kredensial HYPR API (hubungi Dukungan HYPR untuk mendapatkan akses API)
Membuat bucket Google Cloud Storage
- Buka Konsol Google Cloud.
- Pilih project Anda atau buat project baru.
- Di menu navigasi, buka Cloud Storage > Buckets.
- Klik Create bucket.
Berikan detail konfigurasi berikut:
Setelan Nilai Beri nama bucket Anda Masukkan nama yang unik secara global (misalnya, hypr-mfa-logs)Location type Pilih berdasarkan kebutuhan Anda (Region, Dual-region, Multi-region) Location Pilih lokasi (misalnya, us-central1)Kelas penyimpanan Standar (direkomendasikan untuk log yang sering diakses) Access control Seragam (direkomendasikan) Alat perlindungan Opsional: Aktifkan pembuatan versi objek atau kebijakan retensi Klik Create.
Kumpulkan kredensial HYPR API
Hubungi Dukungan HYPR untuk mendapatkan kredensial API guna mengakses data peristiwa HYPR. Anda akan memerlukan:
- URL Dasar API: URL instance HYPR Anda (misalnya,
https://your-tenant.hypr.com) - Token API: Token autentikasi untuk akses API
- ID Aplikasi RP: ID aplikasi Pihak Tepercaya yang akan dipantau
Buat akun layanan untuk Cloud Run Function
Fungsi Cloud Run memerlukan akun layanan dengan izin untuk menulis ke bucket GCS dan dipanggil oleh Pub/Sub.
Membuat akun layanan
- Di GCP Console, buka IAM & Admin > Service Accounts.
- Klik Create Service Account.
- Berikan detail konfigurasi berikut:
- Nama akun layanan: Masukkan
hypr-logs-collector-sa. - Deskripsi akun layanan: Masukkan
Service account for Cloud Run function to collect HYPR MFA logs.
- Nama akun layanan: Masukkan
- Klik Create and Continue.
- Di bagian Grant this service account access to project, tambahkan peran berikut:
- Klik Pilih peran.
- Telusuri dan pilih Storage Object Admin.
- Klik + Add another role.
- Telusuri dan pilih Cloud Run Invoker.
- Klik + Add another role.
- Telusuri dan pilih Cloud Functions Invoker.
- Klik Lanjutkan.
- Klik Done.
Peran ini diperlukan untuk:
- Storage Object Admin: Menulis log ke bucket GCS dan mengelola file status
- Cloud Run Invoker: Mengizinkan Pub/Sub memanggil fungsi
- Cloud Functions Invoker: Mengizinkan pemanggilan fungsi
Memberikan izin IAM pada bucket GCS
Beri akun layanan (hypr-logs-collector-sa) izin tulis di bucket GCS:
- Buka Cloud Storage > Buckets.
- Klik nama bucket Anda (misalnya,
hypr-mfa-logs). - Buka tab Izin.
- Klik Grant access.
- Berikan detail konfigurasi berikut:
- Tambahkan prinsipal: Masukkan email akun layanan (misalnya,
hypr-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Tetapkan peran: Pilih Storage Object Admin.
- Tambahkan prinsipal: Masukkan email akun layanan (misalnya,
- Klik Simpan.
Membuat topik Pub/Sub
Buat topik Pub/Sub yang akan dipublikasikan oleh Cloud Scheduler dan akan dilanggan oleh fungsi Cloud Run.
- Di GCP Console, buka Pub/Sub > Topics.
- Klik Create topic.
- Berikan detail konfigurasi berikut:
- ID Topik: Masukkan
hypr-logs-trigger. - Biarkan setelan lainnya tetap default.
- ID Topik: Masukkan
- Klik Create.
Membuat fungsi Cloud Run untuk mengumpulkan log
Fungsi Cloud Run akan dipicu oleh pesan Pub/Sub dari Cloud Scheduler untuk mengambil log dari HYPR API dan menuliskannya ke GCS.
- Di GCP Console, buka Cloud Run.
- Klik Create service.
- Pilih Function (gunakan editor inline untuk membuat fungsi).
Di bagian Konfigurasi, berikan detail konfigurasi berikut:
Setelan Nilai Nama layanan hypr-logs-collectorRegion Pilih region yang cocok dengan bucket GCS Anda (misalnya, us-central1)Runtime Pilih Python 3.12 atau yang lebih baru Di bagian Pemicu (opsional):
- Klik + Tambahkan pemicu.
- Pilih Cloud Pub/Sub.
- Di Select a Cloud Pub/Sub topic, pilih topik Pub/Sub (
hypr-logs-trigger). - Klik Simpan.
Di bagian Authentication:
- Pilih Wajibkan autentikasi.
- Periksa Identity and Access Management (IAM).
Scroll ke bawah dan luaskan Containers, Networking, Security.
Buka tab Security:
- Akun layanan: Pilih akun layanan (
hypr-logs-collector-sa).
- Akun layanan: Pilih akun layanan (
Buka tab Containers:
- Klik Variabel & Secret.
- Klik + Tambahkan variabel untuk setiap variabel lingkungan:
Nama Variabel Nilai Contoh Deskripsi GCS_BUCKEThypr-mfa-logsNama bucket GCS GCS_PREFIXhypr-eventsAwalan untuk file log STATE_KEYhypr-events/state.jsonJalur file status HYPR_API_URLhttps://your-tenant.hypr.comURL dasar HYPR API HYPR_API_TOKENyour-api-tokenToken autentikasi HYPR API HYPR_RP_APP_IDyour-rp-app-idID aplikasi RP HYPR MAX_RECORDS1000Jumlah maksimum data per proses PAGE_SIZE100Data per halaman LOOKBACK_HOURS24Periode lihat balik awal Di bagian Variables & Secrets, scroll ke bawah ke Requests:
- Waktu tunggu permintaan: Masukkan
600detik (10 menit).
- Waktu tunggu permintaan: Masukkan
Buka tab Setelan:
- Di bagian Materi:
- Memori: Pilih 512 MiB atau yang lebih tinggi.
- CPU: Pilih 1.
- Di bagian Materi:
Di bagian Penskalaan revisi:
- Jumlah minimum instance: Masukkan
0. - Jumlah maksimum instance: Masukkan
100(atau sesuaikan berdasarkan perkiraan beban).
- Jumlah minimum instance: Masukkan
Klik Create.
Tunggu hingga layanan dibuat (1-2 menit).
Setelah layanan dibuat, editor kode inline akan terbuka secara otomatis.
Menambahkan kode fungsi
- Masukkan main di kolom Entry point.
Di editor kode inline, buat dua file:
- File pertama: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'hypr-events') STATE_KEY = os.environ.get('STATE_KEY', 'hypr-events/state.json') HYPR_API_URL = os.environ.get('HYPR_API_URL') HYPR_API_TOKEN = os.environ.get('HYPR_API_TOKEN') HYPR_RP_APP_ID = os.environ.get('HYPR_RP_APP_ID') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) def to_unix_millis(dt: datetime) -> int: """Convert datetime to Unix epoch milliseconds.""" if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) return int(dt.timestamp() * 1000) def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch HYPR MFA logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, HYPR_API_URL, HYPR_API_TOKEN, HYPR_RP_APP_ID]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Convert to Unix milliseconds for HYPR API start_millis = to_unix_millis(last_time) end_millis = to_unix_millis(now) # Fetch logs records, newest_event_time = fetch_logs( api_url=HYPR_API_URL, api_token=HYPR_API_TOKEN, rp_app_id=HYPR_RP_APP_ID, start_time_ms=start_millis, end_time_ms=end_millis, page_size=PAGE_SIZE, max_records=MAX_RECORDS, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time_iso} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(api_url: str, api_token: str, rp_app_id: str, start_time_ms: int, end_time_ms: int, page_size: int, max_records: int): """ Fetch logs from HYPR API with pagination and rate limiting. Args: api_url: HYPR API base URL api_token: HYPR API authentication token rp_app_id: HYPR RP application ID start_time_ms: Start time in Unix milliseconds end_time_ms: End time in Unix milliseconds page_size: Number of records per page max_records: Maximum total records to fetch Returns: Tuple of (records list, newest_event_time ISO string) """ # Clean up API URL base_url = api_url.rstrip('/') endpoint = f"{base_url}/rp/api/versioned/events" # Bearer token authentication headers = { 'Authorization': f'Bearer {api_token}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-HYPRCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 # Offset-based pagination start_index = 0 while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records})") break # Build request parameters params = [] params.append(f"rpAppId={rp_app_id}") params.append(f"startDate={start_time_ms}") params.append(f"endDate={end_time_ms}") params.append(f"start={start_index}") params.append(f"limit={min(page_size, max_records - len(records))}") url = f"{endpoint}?{'&'.join(params)}" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) # Extract results page_results = data.get('data', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: # HYPR uses LOGGEDTIMEINUTC field with Unix milliseconds event_time_ms = event.get('LOGGEDTIMEINUTC') if event_time_ms: event_dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc) event_time = event_dt.isoformat() if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results current_size = data.get('size', 0) if current_size < page_size: print(f"Reached last page (size={current_size} < limit={page_size})") break start_index += current_size except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time- File kedua: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0Klik Deploy untuk menyimpan dan men-deploy fungsi.
Tunggu hingga deployment selesai (2-3 menit).
Buat tugas Cloud Scheduler
Cloud Scheduler akan memublikasikan pesan ke topik Pub/Sub (hypr-logs-trigger) secara berkala, yang memicu fungsi Cloud Run.
- Di GCP Console, buka Cloud Scheduler.
- Klik Create Job.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama hypr-logs-collector-hourlyRegion Pilih region yang sama dengan fungsi Cloud Run Frekuensi 0 * * * *(setiap jam, tepat pada waktunya)Zona Waktu Pilih zona waktu (UTC direkomendasikan) Jenis target Pub/Sub Topik Pilih topik Pub/Sub ( hypr-logs-trigger)Isi pesan {}(objek JSON kosong)Klik Create.
Opsi frekuensi jadwal
Pilih frekuensi berdasarkan volume log dan persyaratan latensi:
| Frekuensi | Ekspresi Cron | Kasus Penggunaan |
|---|---|---|
| Setiap 5 menit | */5 * * * * |
Volume tinggi, latensi rendah |
| Setiap 15 menit | */15 * * * * |
Volume sedang |
| Setiap jam | 0 * * * * |
Standar (direkomendasikan) |
| Setiap 6 jam | 0 */6 * * * |
Volume rendah, pemrosesan batch |
| Harian | 0 0 * * * |
Pengumpulan data historis |
Menguji integrasi
- Di konsol Cloud Scheduler, temukan tugas Anda (
hypr-logs-collector-hourly). - Klik Force run untuk memicu tugas secara manual.
- Tunggu beberapa detik.
- Buka Cloud Run > Services.
- Klik nama fungsi Anda (
hypr-logs-collector). - Klik tab Logs.
Pastikan fungsi berhasil dieksekusi. Cari:
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X events Wrote X records to gs://bucket-name/prefix/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsBuka Cloud Storage > Buckets.
Klik nama bucket Anda (misalnya,
hypr-mfa-logs).Buka folder awalan (misalnya,
hypr-events/).Pastikan file
.ndjsonbaru dibuat dengan stempel waktu saat ini.
Jika Anda melihat error dalam log:
- HTTP 401: Periksa kredensial API di variabel lingkungan
- HTTP 403: Pastikan token HYPR API memiliki izin yang diperlukan dan ID Aplikasi RP sudah benar
- HTTP 429: Pembatasan kecepatan - fungsi akan otomatis mencoba lagi dengan penundaan
- Variabel lingkungan tidak ada: Periksa apakah semua variabel yang diperlukan telah ditetapkan
Mengambil akun layanan Google SecOps
Google SecOps menggunakan akun layanan unik untuk membaca data dari bucket GCS Anda. Anda harus memberi akun layanan ini akses ke bucket Anda.
Mengonfigurasi feed di Google SecOps untuk memproses log MFA HYPR
- Buka Setelan SIEM > Feed.
- Klik Tambahkan Feed Baru.
- Klik Konfigurasi satu feed.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
HYPR MFA Logs from GCS). - Pilih Google Cloud Storage V2 sebagai Source type.
Pilih HYPR MFA sebagai Jenis log.
Klik Get Service Account. Email akun layanan yang unik akan ditampilkan, misalnya:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comSalin alamat email ini untuk digunakan di langkah berikutnya.
Klik Berikutnya.
Tentukan nilai untuk parameter input berikut:
URL bucket penyimpanan: Masukkan URI bucket GCS dengan jalur awalan:
gs://hypr-mfa-logs/hypr-events/- Ganti:
hypr-mfa-logs: Nama bucket GCS Anda.hypr-events: Awalan/jalur folder opsional tempat log disimpan (biarkan kosong untuk root).
- Ganti:
Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda:
- Jangan pernah: Tidak pernah menghapus file apa pun setelah transfer (direkomendasikan untuk pengujian).
- Hapus file yang ditransfer: Menghapus file setelah transfer berhasil.
Hapus file yang ditransfer dan direktori kosong: Menghapus file dan direktori kosong setelah transfer berhasil.
Usia File Maksimum: Menyertakan file yang diubah dalam beberapa hari terakhir. Defaultnya adalah 180 hari.
Namespace aset: Namespace aset.
Label penyerapan: Label yang akan diterapkan ke peristiwa dari feed ini.
Klik Berikutnya.
Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Memberikan izin IAM ke akun layanan Google SecOps
Akun layanan Google SecOps memerlukan peran Storage Object Viewer di bucket GCS Anda.
- Buka Cloud Storage > Buckets.
- Klik nama bucket Anda (misalnya,
hypr-mfa-logs). - Buka tab Izin.
- Klik Grant access.
- Berikan detail konfigurasi berikut:
- Add principals: Tempel email akun layanan Google SecOps.
- Tetapkan peran: Pilih Storage Object Viewer.
- Klik Simpan.
Tabel pemetaan UDM
| Kolom Log | Pemetaan UDM | Logika |
|---|---|---|
| extensions.auth.type | Jenis autentikasi (misalnya, SSO, MFA) | |
| metadata.event_type | Jenis acara (misalnya, USER_LOGIN, NETWORK_CONNECTION) | |
| EVENTNAME | metadata.product_event_type | Jenis peristiwa khusus produk |
| ID | metadata.product_log_id | ID log khusus produk |
| USERAGENT | network.http.parsed_user_agent | Agen pengguna HTTP yang diuraikan |
| USERAGENT | network.http.user_agent | String agen pengguna HTTP |
| SESSIONID | network.session_id | ID sesi |
| DEVICEMODEL | principal.asset.hardware.model | Model hardware aset |
| COMPANION,MACHINEDOMAIN | principal.asset.hostname | Nama host aset |
| REMOTEIP | principal.asset.ip | Alamat IP aset |
| DEVICEID | principal.asset_id | ID unik untuk aset |
| COMPANION,MACHINEDOMAIN | principal.hostname | Nama host yang terkait dengan akun utama |
| REMOTEIP | principal.ip | Alamat IP yang terkait dengan prinsipal |
| DEVICEOS | principal.platform | Platform (misalnya, WINDOWS, LINUX) |
| DEVICEOSVERSION | principal.platform_version | Versi platform |
| BERHASIL | security_result.action | Tindakan yang dilakukan oleh sistem keamanan (misalnya, IZINKAN, BLOKIR) |
| PESAN | security_result.description | Deskripsi hasil keamanan |
| MACHINEUSERNAME | target.user.user_display_name | Nama tampilan pengguna |
| FIDOUSER | target.user.userid | ID Pengguna |
| metadata.product_name | Nama produk | |
| metadata.vendor_name | Nama vendor/perusahaan |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.