Mengumpulkan log URLScan IO
Dokumen ini menjelaskan cara menyerap log URLScan IO ke Google Security Operations menggunakan Amazon S3.
Sebelum memulai
Pastikan Anda memiliki prasyarat berikut:
- Instance Google SecOps
- Akses istimewa ke tenant URLScan IO
- Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)
Mendapatkan prasyarat URLScan IO
- Login ke URLScan IO.
- Klik ikon profil Anda.
- Pilih API Key dari menu.
- Jika Anda belum memiliki kunci API:
- Klik tombol Create API Key.
- Masukkan deskripsi untuk kunci API (misalnya,
Google SecOps Integration). - Pilih izin untuk kunci (untuk akses hanya baca, pilih izin Baca).
- Klik Generate API Key.
- Salin dan simpan detail berikut di lokasi yang aman:
- API_KEY: String kunci API yang dibuat (format:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) - URL Dasar API:
https://urlscan.io/api/v1(ini konstan untuk semua pengguna)
- API_KEY: String kunci API yang dibuat (format:
- Catat batas kuota API Anda:
- Akun gratis: Dibatasi hingga 1.000 panggilan API per hari, 60 per menit
- Akun Pro: Batas yang lebih tinggi berdasarkan tingkat langganan
- Jika Anda perlu membatasi penelusuran hanya pada pemindaian organisasi Anda, catat:
- ID pengguna: Nama pengguna atau email Anda (untuk digunakan dengan filter penelusuran
user:) - ID tim: Jika menggunakan fitur tim (untuk digunakan dengan filter penelusuran
team:)
- ID pengguna: Nama pengguna atau email Anda (untuk digunakan dengan filter penelusuran
Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps
- Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket.
- Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya,
urlscan-logs-bucket). - Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
- Pilih Pengguna yang dibuat.
- Pilih tab Kredensial keamanan.
- Klik Create Access Key di bagian Access Keys.
- Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
- Klik Berikutnya.
- Opsional: Tambahkan tag deskripsi.
- Klik Create access key.
- Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk referensi di masa mendatang.
- Klik Done.
- Pilih tab Permissions.
- Klik Tambahkan izin di bagian Kebijakan izin.
- Pilih Tambahkan izin.
- Pilih Lampirkan kebijakan secara langsung.
- Cari kebijakan AmazonS3FullAccess.
- Pilih kebijakan.
- Klik Berikutnya.
- Klik Add permissions.
Mengonfigurasi kebijakan dan peran IAM untuk upload S3
- Di konsol AWS, buka IAM > Policies.
- Klik Buat kebijakan > tab JSON.
Masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::urlscan-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::urlscan-logs-bucket/urlscan/state.json" } ] }- Ganti
urlscan-logs-bucketjika Anda memasukkan nama bucket yang berbeda.
- Ganti
Klik Berikutnya > Buat kebijakan.
Buka IAM > Roles > Create role > AWS service > Lambda.
Lampirkan kebijakan yang baru dibuat.
Beri nama peran
urlscan-lambda-role, lalu klik Buat peran.
Buat fungsi Lambda
- Di AWS Console, buka Lambda > Functions > Create function.
- Klik Buat dari awal.
Berikan detail konfigurasi berikut:
Setelan Nilai Nama urlscan-collectorRuntime Python 3.13 Arsitektur x86_64 Peran eksekusi urlscan-lambda-roleSetelah fungsi dibuat, buka tab Code, hapus stub, lalu masukkan kode berikut (
urlscan-collector.py):import json import os import boto3 from datetime import datetime, timedelta import urllib3 import base64 s3 = boto3.client('s3') http = urllib3.PoolManager() def lambda_handler(event, context): # Environment variables bucket = os.environ['S3_BUCKET'] prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_key = os.environ['API_KEY'] api_base = os.environ['API_BASE'] search_query = os.environ.get('SEARCH_QUERY', 'date:>now-1h') page_size = int(os.environ.get('PAGE_SIZE', '100')) max_pages = int(os.environ.get('MAX_PAGES', '10')) # Load state state = load_state(bucket, state_key) last_run = state.get('last_run') # Prepare search query if last_run: # Adjust search query based on last run search_time = datetime.fromisoformat(last_run) time_diff = datetime.utcnow() - search_time hours = int(time_diff.total_seconds() / 3600) + 1 search_query = f'date:>now-{hours}h' # Search for scans headers = {'API-Key': api_key} all_results = [] for page in range(max_pages): search_url = f"{api_base}/search/" params = { 'q': search_query, 'size': page_size, 'offset': page * page_size } # Make search request response = http.request( 'GET', search_url, fields=params, headers=headers ) if response.status != 200: print(f"Search failed: {response.status}") break search_data = json.loads(response.data.decode('utf-8')) results = search_data.get('results', []) if not results: break # Fetch full result for each scan for result in results: uuid = result.get('task', {}).get('uuid') if uuid: result_url = f"{api_base}/result/{uuid}/" result_response = http.request( 'GET', result_url, headers=headers ) if result_response.status == 200: full_result = json.loads(result_response.data.decode('utf-8')) all_results.append(full_result) else: print(f"Failed to fetch result for {uuid}: {result_response.status}") # Check if we have more pages if len(results) < page_size: break # Write results to S3 if all_results: now = datetime.utcnow() file_key = f"{prefix}year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}/urlscan_{now.strftime('%Y%m%d_%H%M%S')}.json" # Create NDJSON content ndjson_content = '\n'.join([json.dumps(r, separators=(',', ':')) for r in all_results]) # Upload to S3 s3.put_object( Bucket=bucket, Key=file_key, Body=ndjson_content.encode('utf-8'), ContentType='application/x-ndjson' ) print(f"Uploaded {len(all_results)} results to s3://{bucket}/{file_key}") # Update state state['last_run'] = datetime.utcnow().isoformat() save_state(bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Processed {len(all_results)} scan results', 'location': f"s3://{bucket}/{prefix}" }) } def load_state(bucket, key): try: response = s3.get_object(Bucket=bucket, Key=key) return json.loads(response['Body'].read()) except s3.exceptions.NoSuchKey: return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(bucket, key, state): try: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}")Buka Configuration > Environment variables.
Klik Edit > Add new environment variable.
Masukkan variabel lingkungan berikut, ganti dengan nilai Anda:
Kunci Nilai contoh S3_BUCKETurlscan-logs-bucketS3_PREFIXurlscan/STATE_KEYurlscan/state.jsonAPI_KEY<your-api-key>API_BASEhttps://urlscan.io/api/v1SEARCH_QUERYdate:>now-1hPAGE_SIZE100MAX_PAGES10Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).
Pilih tab Configuration
Di panel General configuration, klik Edit.
Ubah Waktu Tunggu menjadi 5 menit (300 detik), lalu klik Simpan.
Membuat jadwal EventBridge
- Buka Amazon EventBridge > Scheduler > Create schedule.
- Berikan detail konfigurasi berikut:
- Jadwal berulang: Tarif (
1 hour). - Target: fungsi Lambda Anda
urlscan-collector. - Name:
urlscan-collector-1h.
- Jadwal berulang: Tarif (
- Klik Buat jadwal.
Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps
- Buka Konsol AWS > IAM > Pengguna.
- Klik Add users.
- Berikan detail konfigurasi berikut:
- Pengguna: Masukkan
secops-reader. - Jenis akses: Pilih Kunci akses – Akses terprogram.
- Pengguna: Masukkan
- Klik Buat pengguna.
- Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
Di editor JSON, masukkan kebijakan berikut:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::urlscan-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::urlscan-logs-bucket" } ] }Tetapkan nama ke
secops-reader-policy.Buka Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.
Buka Kredensial keamanan > Kunci akses > Buat kunci akses.
Download CSV (nilai ini dimasukkan ke dalam feed).
Mengonfigurasi feed di Google SecOps untuk menyerap log URLScan IO
- Buka Setelan SIEM > Feed.
- Klik Tambahkan Feed Baru.
- Di kolom Nama feed, masukkan nama untuk feed (misalnya,
URLScan IO logs). - Pilih Amazon S3 V2 sebagai Jenis sumber.
- Pilih URLScan IO sebagai Jenis log.
- Klik Berikutnya.
- Tentukan nilai untuk parameter input berikut:
- URI S3:
s3://urlscan-logs-bucket/urlscan/ - Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
- Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
- ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
- Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
- Namespace aset: Namespace aset.
- Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
- URI S3:
- Klik Berikutnya.
- Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.
Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.