Mengumpulkan log URLScan IO

Didukung di:

Dokumen ini menjelaskan cara menyerap log URLScan IO ke Google Security Operations menggunakan Amazon S3.

Sebelum memulai

Pastikan Anda memiliki prasyarat berikut:

  • Instance Google SecOps
  • Akses istimewa ke tenant URLScan IO
  • Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)

Mendapatkan prasyarat URLScan IO

  1. Login ke URLScan IO.
  2. Klik ikon profil Anda.
  3. Pilih API Key dari menu.
  4. Jika Anda belum memiliki kunci API:
    • Klik tombol Create API Key.
    • Masukkan deskripsi untuk kunci API (misalnya, Google SecOps Integration).
    • Pilih izin untuk kunci (untuk akses hanya baca, pilih izin Baca).
    • Klik Generate API Key.
  5. Salin dan simpan detail berikut di lokasi yang aman:
    • API_KEY: String kunci API yang dibuat (format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)
    • URL Dasar API: https://urlscan.io/api/v1 (ini konstan untuk semua pengguna)
  6. Catat batas kuota API Anda:
    • Akun gratis: Dibatasi hingga 1.000 panggilan API per hari, 60 per menit
    • Akun Pro: Batas yang lebih tinggi berdasarkan tingkat langganan
  7. Jika Anda perlu membatasi penelusuran hanya pada pemindaian organisasi Anda, catat:
    • ID pengguna: Nama pengguna atau email Anda (untuk digunakan dengan filter penelusuran user:)
    • ID tim: Jika menggunakan fitur tim (untuk digunakan dengan filter penelusuran team:)

Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps

  1. Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket.
  2. Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya, urlscan-logs-bucket).
  3. Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
  4. Pilih Pengguna yang dibuat.
  5. Pilih tab Kredensial keamanan.
  6. Klik Create Access Key di bagian Access Keys.
  7. Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
  8. Klik Berikutnya.
  9. Opsional: Tambahkan tag deskripsi.
  10. Klik Create access key.
  11. Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk referensi di masa mendatang.
  12. Klik Done.
  13. Pilih tab Permissions.
  14. Klik Tambahkan izin di bagian Kebijakan izin.
  15. Pilih Tambahkan izin.
  16. Pilih Lampirkan kebijakan secara langsung.
  17. Cari kebijakan AmazonS3FullAccess.
  18. Pilih kebijakan.
  19. Klik Berikutnya.
  20. Klik Add permissions.

Mengonfigurasi kebijakan dan peran IAM untuk upload S3

  1. Di konsol AWS, buka IAM > Policies.
  2. Klik Buat kebijakan > tab JSON.
  3. Masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::urlscan-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::urlscan-logs-bucket/urlscan/state.json"
        }
      ]
    }
    
    • Ganti urlscan-logs-bucket jika Anda memasukkan nama bucket yang berbeda.
  4. Klik Berikutnya > Buat kebijakan.

  5. Buka IAM > Roles > Create role > AWS service > Lambda.

  6. Lampirkan kebijakan yang baru dibuat.

  7. Beri nama peran urlscan-lambda-role, lalu klik Buat peran.

Buat fungsi Lambda

  1. Di AWS Console, buka Lambda > Functions > Create function.
  2. Klik Buat dari awal.
  3. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama urlscan-collector
    Runtime Python 3.13
    Arsitektur x86_64
    Peran eksekusi urlscan-lambda-role
  4. Setelah fungsi dibuat, buka tab Code, hapus stub, lalu masukkan kode berikut (urlscan-collector.py):

    import json
    import os
    import boto3
    from datetime import datetime, timedelta
    import urllib3
    import base64
    
    s3 = boto3.client('s3')
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        # Environment variables
        bucket = os.environ['S3_BUCKET']
        prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        api_key = os.environ['API_KEY']
        api_base = os.environ['API_BASE']
        search_query = os.environ.get('SEARCH_QUERY', 'date:>now-1h')
        page_size = int(os.environ.get('PAGE_SIZE', '100'))
        max_pages = int(os.environ.get('MAX_PAGES', '10'))
    
        # Load state
        state = load_state(bucket, state_key)
        last_run = state.get('last_run')
    
        # Prepare search query
        if last_run:
            # Adjust search query based on last run
            search_time = datetime.fromisoformat(last_run)
            time_diff = datetime.utcnow() - search_time
            hours = int(time_diff.total_seconds() / 3600) + 1
            search_query = f'date:>now-{hours}h'
    
        # Search for scans
        headers = {'API-Key': api_key}
        all_results = []
    
        for page in range(max_pages):
            search_url = f"{api_base}/search/"
            params = {
                'q': search_query,
                'size': page_size,
                'offset': page * page_size
            }
    
            # Make search request
            response = http.request(
                'GET',
                search_url,
                fields=params,
                headers=headers
            )
    
            if response.status != 200:
                print(f"Search failed: {response.status}")
                break
    
            search_data = json.loads(response.data.decode('utf-8'))
            results = search_data.get('results', [])
    
            if not results:
                break
    
            # Fetch full result for each scan
            for result in results:
                uuid = result.get('task', {}).get('uuid')
                if uuid:
                    result_url = f"{api_base}/result/{uuid}/"
                    result_response = http.request(
                        'GET',
                        result_url,
                        headers=headers
                    )
    
                    if result_response.status == 200:
                        full_result = json.loads(result_response.data.decode('utf-8'))
                        all_results.append(full_result)
                    else:
                        print(f"Failed to fetch result for {uuid}: {result_response.status}")
    
            # Check if we have more pages
            if len(results) < page_size:
                break
    
        # Write results to S3
        if all_results:
            now = datetime.utcnow()
            file_key = f"{prefix}year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}/urlscan_{now.strftime('%Y%m%d_%H%M%S')}.json"
    
            # Create NDJSON content
            ndjson_content = '\n'.join([json.dumps(r, separators=(',', ':')) for r in all_results])
    
            # Upload to S3
            s3.put_object(
                Bucket=bucket,
                Key=file_key,
                Body=ndjson_content.encode('utf-8'),
                ContentType='application/x-ndjson'
            )
    
            print(f"Uploaded {len(all_results)} results to s3://{bucket}/{file_key}")
    
        # Update state
        state['last_run'] = datetime.utcnow().isoformat()
        save_state(bucket, state_key, state)
    
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Processed {len(all_results)} scan results',
                'location': f"s3://{bucket}/{prefix}"
            })
        }
    
    def load_state(bucket, key):
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            return json.loads(response['Body'].read())
        except s3.exceptions.NoSuchKey:
            return {}
        except Exception as e:
            print(f"Error loading state: {e}")
            return {}
    
    def save_state(bucket, key, state):
        try:
            s3.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving state: {e}")
    
  5. Buka Configuration > Environment variables.

  6. Klik Edit > Add new environment variable.

  7. Masukkan variabel lingkungan berikut, ganti dengan nilai Anda:

    Kunci Nilai contoh
    S3_BUCKET urlscan-logs-bucket
    S3_PREFIX urlscan/
    STATE_KEY urlscan/state.json
    API_KEY <your-api-key>
    API_BASE https://urlscan.io/api/v1
    SEARCH_QUERY date:>now-1h
    PAGE_SIZE 100
    MAX_PAGES 10
  8. Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).

  9. Pilih tab Configuration

  10. Di panel General configuration, klik Edit.

  11. Ubah Waktu Tunggu menjadi 5 menit (300 detik), lalu klik Simpan.

Membuat jadwal EventBridge

  1. Buka Amazon EventBridge > Scheduler > Create schedule.
  2. Berikan detail konfigurasi berikut:
    • Jadwal berulang: Tarif (1 hour).
    • Target: fungsi Lambda Anda urlscan-collector.
    • Name: urlscan-collector-1h.
  3. Klik Buat jadwal.

Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps

  1. Buka Konsol AWS > IAM > Pengguna.
  2. Klik Add users.
  3. Berikan detail konfigurasi berikut:
    • Pengguna: Masukkan secops-reader.
    • Jenis akses: Pilih Kunci akses – Akses terprogram.
  4. Klik Buat pengguna.
  5. Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
  6. Di editor JSON, masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::urlscan-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::urlscan-logs-bucket"
        }
      ]
    }
    
  7. Tetapkan nama ke secops-reader-policy.

  8. Buka Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.

  9. Buka Kredensial keamanan > Kunci akses > Buat kunci akses.

  10. Download CSV (nilai ini dimasukkan ke dalam feed).

Mengonfigurasi feed di Google SecOps untuk menyerap log URLScan IO

  1. Buka Setelan SIEM > Feed.
  2. Klik Tambahkan Feed Baru.
  3. Di kolom Nama feed, masukkan nama untuk feed (misalnya, URLScan IO logs).
  4. Pilih Amazon S3 V2 sebagai Jenis sumber.
  5. Pilih URLScan IO sebagai Jenis log.
  6. Klik Berikutnya.
  7. Tentukan nilai untuk parameter input berikut:
    • URI S3: s3://urlscan-logs-bucket/urlscan/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
    • Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
    • ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
    • Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
    • Namespace aset: Namespace aset.
    • Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
  8. Klik Berikutnya.
  9. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.