Mengumpulkan log Telepon Duo
Dokumen ini menjelaskan cara menyerap log Telepon Duo ke Google Security Operations menggunakan Amazon S3. Parser mengekstrak kolom dari log, mentransformasi, dan memetakannya ke Model Data Terpadu (UDM). Proses ini menangani berbagai format log Duo, mengonversi stempel waktu, mengekstrak informasi pengguna, detail jaringan, dan hasil keamanan, lalu menyusun output ke dalam format UDM standar.
Sebelum memulai
Pastikan Anda memenuhi prasyarat berikut:
- Instance Google SecOps.
- Akses istimewa ke Panel Admin Duo dengan peran Pemilik.
- Akses istimewa ke AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).
Kumpulkan prasyarat Duo (kredensial API)
- Login ke Panel Admin Duo sebagai administrator dengan peran Pemilik.
- Buka Applications > Application Catalog.
- Temukan entri untuk Admin API dalam katalog.
- Klik + Tambahkan untuk membuat aplikasi.
- Salin dan simpan detail berikut di lokasi yang aman:
- Kunci Integrasi
- Secret Key
- Nama Host API (misalnya,
api-yyyyyyyy.duosecurity.com
)
- Di bagian Permissions, batalkan pilihan semua opsi izin kecuali Grant read log.
- Klik Simpan Perubahan.
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
duo-telephony-logs
). - Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: Tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download file .CSV untuk menyimpan Kunci Akses dan Kunci Akses Rahasia untuk referensi di masa mendatang.
- Klik Selesai.
- Pilih tab Izin.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung.
- Cari kebijakan AmazonS3FullAccess.
- Pilih kebijakan.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Di konsol AWS, buka IAM > Policies.
- Klik Buat kebijakan > tab JSON.
- Salin dan tempel kebijakan berikut.
Policy JSON (ganti
duo-telephony-logs
jika Anda memasukkan nama bucket yang berbeda):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::duo-telephony-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::duo-telephony-logs/duo-telephony/state.json" } ] }
Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles > Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat.
Beri nama peran
duo-telephony-lambda-role
, lalu klik Buat peran.
Buat fungsi Lambda
- Di Konsol AWS, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama duo-telephony-logs-collector
Runtime Python 3.13 Arsitektur x86_64 Peran eksekusi duo-telephony-lambda-role
Setelah fungsi dibuat, buka tab Code, hapus stub, dan tempelkan kode berikut (
duo-telephony-logs-collector.py
).import json import boto3 import os import hmac import hashlib import base64 import urllib.parse import urllib.request import email.utils from datetime import datetime, timedelta, timezone from typing import Dict, Any, List, Optional from botocore.exceptions import ClientError s3 = boto3.client('s3') def lambda_handler(event, context): """ Lambda function to fetch Duo telephony logs and store them in S3. """ try: # Get configuration from environment variables bucket_name = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'].rstrip('/') state_key = os.environ['STATE_KEY'] integration_key = os.environ['DUO_IKEY'] secret_key = os.environ['DUO_SKEY'] api_hostname = os.environ['DUO_API_HOST'] # Load state state = load_state(bucket_name, state_key) # Calculate time range now = datetime.now(timezone.utc) if state.get('last_offset'): # Continue from last offset next_offset = state['last_offset'] logs = [] has_more = True else: # Start from last timestamp or 24 hours ago mintime = state.get('last_timestamp_ms', int((now - timedelta(hours=24)).timestamp() * 1000)) # Apply 2-minute delay as recommended by Duo maxtime = int((now - timedelta(minutes=2)).timestamp() * 1000) next_offset = None logs = [] has_more = True # Fetch logs with pagination total_fetched = 0 max_iterations = int(os.environ.get('MAX_ITERATIONS', '10')) while has_more and total_fetched < max_iterations: if next_offset: # Use offset for pagination params = { 'limit': '1000', 'next_offset': next_offset } else: # Initial request with time range params = { 'mintime': str(mintime), 'maxtime': str(maxtime), 'limit': '1000', 'sort': 'ts:asc' } # Make API request with retry logic response = duo_api_call_with_retry( 'GET', api_hostname, '/admin/v2/logs/telephony', params, integration_key, secret_key ) if 'items' in response: logs.extend(response['items']) total_fetched += 1 # Check for more data if 'metadata' in response and 'next_offset' in response['metadata']: next_offset = response['metadata']['next_offset'] state['last_offset'] = next_offset else: has_more = False state['last_offset'] = None # Update timestamp for next run if logs: # Get the latest timestamp from logs latest_ts = max([log.get('ts', '') for log in logs]) if latest_ts: # Convert ISO timestamp to milliseconds dt = datetime.fromisoformat(latest_ts.replace('Z', '+00:00')) state['last_timestamp_ms'] = int(dt.timestamp() * 1000) + 1 else: has_more = False # Save logs to S3 if any were fetched if logs: timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') key = f"{s3_prefix}/telephony_{timestamp}.json" # Format logs as newline-delimited JSON log_data = '\n'.join(json.dumps(log) for log in logs) s3.put_object( Bucket=bucket_name, Key=key, Body=log_data.encode('utf-8'), ContentType='application/x-ndjson' ) print(f"Saved {len(logs)} telephony logs to s3://{bucket_name}/{key}") else: print("No new telephony logs found") # Save state save_state(bucket_name, state_key, state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Successfully processed {len(logs)} telephony logs', 'logs_count': len(logs) }) } except Exception as e: print(f"Error: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) } def duo_api_call_with_retry(method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str, max_retries: int = 3) -> Dict[str, Any]: """ Make an authenticated API call to Duo Admin API with retry logic. """ for attempt in range(max_retries): try: return duo_api_call(method, host, path, params, ikey, skey) except Exception as e: if '429' in str(e) or '5' in str(e)[:1]: # Rate limit or server error if attempt < max_retries - 1: wait_time = (2 ** attempt) * 2 # Exponential backoff print(f"Retrying after {wait_time} seconds...") import time time.sleep(wait_time) continue raise def duo_api_call(method: str, host: str, path: str, params: Dict[str, str], ikey: str, skey: str) -> Dict[str, Any]: """ Make an authenticated API call to Duo Admin API. """ # Create canonical string for signing using RFC 2822 date format now = email.utils.formatdate() canon = [now, method.upper(), host.lower(), path] # Add parameters args = [] for key in sorted(params.keys()): val = params[key] args.append(f"{urllib.parse.quote(key, '~')}={urllib.parse.quote(val, '~')}") canon.append('&'.join(args)) canon_str = '\n'.join(canon) # Sign the request sig = hmac.new( skey.encode('utf-8'), canon_str.encode('utf-8'), hashlib.sha1 ).hexdigest() # Create authorization header auth = base64.b64encode(f"{ikey}:{sig}".encode('utf-8')).decode('utf-8') # Build URL url = f"https://{host}{path}" if params: url += '?' + '&'.join(args) # Make request req = urllib.request.Request(url) req.add_header('Authorization', f'Basic {auth}') req.add_header('Date', now) req.add_header('Host', host) req.add_header('User-Agent', 'duo-telephony-s3-ingestor/1.0') try: with urllib.request.urlopen(req, timeout=30) as response: data = json.loads(response.read().decode('utf-8')) if data.get('stat') == 'OK': return data.get('response', {}) else: raise Exception(f"API error: {data.get('message', 'Unknown error')}") except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') raise Exception(f"HTTP error {e.code}: {error_body}") def load_state(bucket: str, key: str) -> Dict[str, Any]: """Load state from S3.""" try: response = s3.get_object(Bucket=bucket, Key=key) return json.loads(response['Body'].read().decode('utf-8')) except ClientError as e: if e.response.get('Error', {}).get('Code') in ('NoSuchKey', '404'): return {} print(f"Error loading state: {e}") return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(bucket: str, key: str, state: Dict[str, Any]): """Save state to S3.""" try: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state).encode('utf-8'), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}")
Buka Configuration > Environment variables.
Klik Edit > Tambahkan variabel lingkungan baru.
Masukkan variabel lingkungan yang diberikan dalam tabel berikut, dengan mengganti nilai contoh dengan nilai Anda.
Variabel lingkungan
Kunci Nilai contoh S3_BUCKET
duo-telephony-logs
S3_PREFIX
duo-telephony/
STATE_KEY
duo-telephony/state.json
DUO_IKEY
<your-integration-key>
DUO_SKEY
<your-secret-key>
DUO_API_HOST
api-yyyyyyyy.duosecurity.com
MAX_ITERATIONS
10
Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > duo-telephony-logs-collector).
Pilih tab Configuration
Di panel Konfigurasi umum, klik Edit.
Ubah Waktu Tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour
). - Target: fungsi Lambda Anda
duo-telephony-logs-collector
. - Name:
duo-telephony-logs-1h
.
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
(Opsional) Buat pengguna & kunci IAM hanya baca untuk Google SecOps
- Buka Konsol AWS > IAM > Pengguna.
- Klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna: Masukkan
secops-reader
. - Jenis akses: Pilih Kunci akses – Akses terprogram.
- Pengguna: Masukkan
- Klik Buat pengguna.
- Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::duo-telephony-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::duo-telephony-logs" } ] }
Nama =
secops-reader-policy
.Klik Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.
Buat kunci akses untuk
secops-reader
: Kredensial keamanan > Kunci akses.Klik Create access key.
Download
.CSV
. (Anda akan menempelkan nilai ini ke feed).
Mengonfigurasi feed di Google SecOps untuk memproses log Telepon Duo
- Buka Setelan SIEM > Feed.
- Klik + Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
Duo Telephony logs
). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih Log Telepon Duo sebagai Jenis log.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3:
s3://duo-telephony-logs/duo-telephony/
- Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: Namespace aset.
- Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
- URI S3:
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Tabel pemetaan UDM
Kolom log | Pemetaan UDM | Logika |
---|---|---|
context |
metadata.product_event_type |
Dipetakan langsung dari kolom context dalam log mentah. |
credits |
security_result.detection_fields.value |
Dipetakan langsung dari kolom credits dalam log mentah, bertingkat di bawah objek detection_fields dengan kunci credits yang sesuai. |
eventtype |
security_result.detection_fields.value |
Dipetakan langsung dari kolom eventtype dalam log mentah, bertingkat di bawah objek detection_fields dengan kunci eventtype yang sesuai. |
host |
principal.hostname |
Dipetakan langsung dari kolom host dalam log mentah jika bukan alamat IP. Ditetapkan ke nilai statis "ALLOW" di parser. Ditetapkan ke nilai statis "MECHANISM_UNSPECIFIED" di parser. Diuraikan dari kolom timestamp dalam log mentah, yang merepresentasikan detik sejak epoch. Ditetapkan ke "USER_UNCATEGORIZED" jika kolom context dan host ada dalam log mentah. Disetel ke "STATUS_UPDATE" jika hanya host yang ada. Jika tidak, tetapkan ke "GENERIC_EVENT". Diambil langsung dari kolom log_type log mentah. Disetel ke nilai statis "Telephony" di parser. Disetel ke nilai statis "Duo" di parser. |
phone |
principal.user.phone_numbers |
Dipetakan langsung dari kolom phone dalam log mentah. |
phone |
principal.user.userid |
Dipetakan langsung dari kolom phone dalam log mentah. Ditetapkan ke nilai statis "INFORMATIONAL" di parser. Disetel ke nilai statis "Duo Telephony" di parser. |
timestamp |
metadata.event_timestamp |
Diuraikan dari kolom timestamp dalam log mentah, yang merepresentasikan detik sejak epoch. |
type |
security_result.summary |
Dipetakan langsung dari kolom type dalam log mentah. |
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.