Raccogliere i log di URLScan IO

Supportato in:

Questo documento spiega come importare i log di URLScan IO in Google Security Operations utilizzando Amazon S3.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Accesso privilegiato al tenant URLScan IO
  • Accesso privilegiato a AWS (S3, IAM, Lambda, EventBridge)

Ottieni i prerequisiti di URLScan IO

  1. Accedi a URLScan IO.
  2. Fai clic sull'icona del profilo.
  3. Seleziona Chiave API dal menu.
  4. Se non hai ancora una chiave API:
    • Fai clic sul pulsante Crea chiave API.
    • Inserisci una descrizione per la chiave API (ad es. Google SecOps Integration).
    • Seleziona le autorizzazioni per la chiave (per l'accesso di sola lettura, seleziona le autorizzazioni Leggi).
    • Fai clic su Genera chiave API.
  5. Copia e salva in una posizione sicura i seguenti dettagli:
    • API_KEY: la stringa della chiave API generata (formato: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)
    • API Base URL: https://urlscan.io/api/v1 (questo è costante per tutti gli utenti)
  6. Prendi nota dei limiti della quota API:
    • Account senza costi: limite di 1000 chiamate API al giorno, 60 al minuto
    • Account Pro: limiti più elevati in base al livello di abbonamento
  7. Se devi limitare le ricerche solo alle scansioni della tua organizzazione, prendi nota di:
    • Identificatore utente: il tuo nome utente o indirizzo email (da utilizzare con il filtro di ricerca user:)
    • Identificatore team: se utilizzi la funzionalità dei team (da utilizzare con il filtro di ricerca team:)

Configurare il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket.
  2. Salva il nome e la regione del bucket per riferimento futuro (ad es. urlscan-logs-bucket).
  3. Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso secret per riferimento futuro.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Policy di autorizzazione.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Collega direttamente le policy.
  17. Cerca la policy AmazonS3FullAccess.
  18. Seleziona la policy.
  19. Fai clic su Avanti.
  20. Fai clic su Aggiungi autorizzazioni.

Configurare la policy e il ruolo IAM per i caricamenti S3

  1. Nella console AWS, vai a IAM > Policy.
  2. Fai clic su Crea policy > scheda JSON.
  3. Inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::urlscan-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::urlscan-logs-bucket/urlscan/state.json"
        }
      ]
    }
    
    • Sostituisci urlscan-logs-bucket se hai inserito un nome del bucket diverso.
  4. Fai clic su Avanti > Crea policy.

  5. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  6. Collega la policy appena creata.

  7. Assegna al ruolo il nome urlscan-lambda-role e fai clic su Crea ruolo.

Creare la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome urlscan-collector
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione urlscan-lambda-role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (urlscan-collector.py):

    import json
    import os
    import boto3
    from datetime import datetime, timedelta
    import urllib3
    import base64
    
    s3 = boto3.client('s3')
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        # Environment variables
        bucket = os.environ['S3_BUCKET']
        prefix = os.environ['S3_PREFIX']
        state_key = os.environ['STATE_KEY']
        api_key = os.environ['API_KEY']
        api_base = os.environ['API_BASE']
        search_query = os.environ.get('SEARCH_QUERY', 'date:>now-1h')
        page_size = int(os.environ.get('PAGE_SIZE', '100'))
        max_pages = int(os.environ.get('MAX_PAGES', '10'))
    
        # Load state
        state = load_state(bucket, state_key)
        last_run = state.get('last_run')
    
        # Prepare search query
        if last_run:
            # Adjust search query based on last run
            search_time = datetime.fromisoformat(last_run)
            time_diff = datetime.utcnow() - search_time
            hours = int(time_diff.total_seconds() / 3600) + 1
            search_query = f'date:>now-{hours}h'
    
        # Search for scans
        headers = {'API-Key': api_key}
        all_results = []
    
        for page in range(max_pages):
            search_url = f"{api_base}/search/"
            params = {
                'q': search_query,
                'size': page_size,
                'offset': page * page_size
            }
    
            # Make search request
            response = http.request(
                'GET',
                search_url,
                fields=params,
                headers=headers
            )
    
            if response.status != 200:
                print(f"Search failed: {response.status}")
                break
    
            search_data = json.loads(response.data.decode('utf-8'))
            results = search_data.get('results', [])
    
            if not results:
                break
    
            # Fetch full result for each scan
            for result in results:
                uuid = result.get('task', {}).get('uuid')
                if uuid:
                    result_url = f"{api_base}/result/{uuid}/"
                    result_response = http.request(
                        'GET',
                        result_url,
                        headers=headers
                    )
    
                    if result_response.status == 200:
                        full_result = json.loads(result_response.data.decode('utf-8'))
                        all_results.append(full_result)
                    else:
                        print(f"Failed to fetch result for {uuid}: {result_response.status}")
    
            # Check if we have more pages
            if len(results) < page_size:
                break
    
        # Write results to S3
        if all_results:
            now = datetime.utcnow()
            file_key = f"{prefix}year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}/urlscan_{now.strftime('%Y%m%d_%H%M%S')}.json"
    
            # Create NDJSON content
            ndjson_content = '\n'.join([json.dumps(r, separators=(',', ':')) for r in all_results])
    
            # Upload to S3
            s3.put_object(
                Bucket=bucket,
                Key=file_key,
                Body=ndjson_content.encode('utf-8'),
                ContentType='application/x-ndjson'
            )
    
            print(f"Uploaded {len(all_results)} results to s3://{bucket}/{file_key}")
    
        # Update state
        state['last_run'] = datetime.utcnow().isoformat()
        save_state(bucket, state_key, state)
    
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Processed {len(all_results)} scan results',
                'location': f"s3://{bucket}/{prefix}"
            })
        }
    
    def load_state(bucket, key):
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            return json.loads(response['Body'].read())
        except s3.exceptions.NoSuchKey:
            return {}
        except Exception as e:
            print(f"Error loading state: {e}")
            return {}
    
    def save_state(bucket, key, state):
        try:
            s3.put_object(
                Bucket=bucket,
                Key=key,
                Body=json.dumps(state),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving state: {e}")
    

  5. Vai a Configurazione > Variabili di ambiente.

  6. Fai clic su Modifica > Aggiungi nuova variabile di ambiente.

  7. Inserisci le seguenti variabili di ambiente, sostituendo i valori con i tuoi:

    Chiave Valore di esempio
    S3_BUCKET urlscan-logs-bucket
    S3_PREFIX urlscan/
    STATE_KEY urlscan/state.json
    API_KEY <your-api-key>
    API_BASE https://urlscan.io/api/v1
    SEARCH_QUERY date:>now-1h
    PAGE_SIZE 100
    MAX_PAGES 10
  8. Dopo aver creato la funzione, rimani nella relativa pagina (o apri Lambda > Funzioni > tua-funzione).

  9. Seleziona la scheda Configurazione.

  10. Nel riquadro Configurazione generale , fai clic su Modifica.

  11. Modifica Timeout in 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Pianificazione ricorrente: Frequenza (1 hour).
    • Target: la tua funzione Lambda urlscan-collector.
    • Nome: urlscan-collector-1h.
  3. Fai clic su Crea pianificazione.

(Facoltativo) Creare un utente e chiavi IAM di sola lettura per Google SecOps

  1. Vai a Console AWS > IAM > Utenti.
  2. Fai clic su Aggiungi utenti.
  3. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci secops-reader.
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
  4. Fai clic su Crea utente.
  5. Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega direttamente le policy > Crea policy.
  6. Nell'editor JSON, inserisci la seguente policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::urlscan-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::urlscan-logs-bucket"
        }
      ]
    }
    
  7. Imposta il nome su secops-reader-policy.

  8. Vai a Crea policy > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  9. Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.

  10. Scarica il CSV (questi valori vengono inseriti nel feed).

Configurare un feed in Google SecOps per importare i log di URLScan IO

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Nel campo Nome feed , inserisci un nome per il feed (ad es. URLScan IO logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona URLScan IO come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://urlscan-logs-bucket/urlscan/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso secret: chiave secret utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Esamina la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.