Raccogliere i dati di threat intelligence di Team Cymru Scout

Supportato in:

Questo documento spiega come importare i dati di intelligence sulle minacce di Team Cymru Scout in Google Security Operations utilizzando Amazon S3.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Accesso privilegiato al tenant Team Cymru Scout
  • Accesso privilegiato ad AWS (S3, IAM, Lambda, EventBridge)

Prerequisiti per Team Cymru Scout

  1. Accedi alla piattaforma Scout di Team Cymru.
  2. Vai alla pagina Chiavi API.
  3. Fai clic su pulsante Crea.
  4. Se necessario, fornisci la descrizione della chiave.
  5. Fai clic sul pulsante Crea chiave per generare la chiave API.
  6. Copia e salva in una posizione sicura i seguenti dettagli:
    • SCOUT_API_KEY: chiave di accesso all'API
    • SCOUT_BASE_URL: URL di base dell'API Scout

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket.
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, team-cymru-scout-ti).
  3. Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Norme relative alle autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Collega direttamente i criteri.
  17. Cerca i criteri AmazonS3FullAccess.
  18. Seleziona la policy.
  19. Fai clic su Avanti.
  20. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Nella console AWS, vai a IAM > Policy.
  2. Fai clic su Crea criterio > scheda JSON.
  3. Inserisci la seguente policy:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutObjects",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::team-cymru-scout-ti/*"
        },
        {
        "Sid": "AllowGetStateObject",
        "Effect": "Allow",
        "Action": "s3:GetObject",
        "Resource": "arn:aws:s3:::team-cymru-scout-ti/team-cymru/scout-ti/state.json"
        }
    ]
    }
    
    • Sostituisci team-cymru-scout-ti se hai inserito un nome bucket diverso.
  4. Fai clic su Avanti > Crea policy.

  5. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  6. Allega la policy appena creata.

  7. Assegna al ruolo il nome TeamCymruScoutToS3Role e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome team_cymru_scout_ti_to_s3
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione TeamCymruScoutToS3Role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e inserisci il seguente codice (team_cymru_scout_ti_to_s3.py):

    ```python
    #!/usr/bin/env python3
    # Lambda: Pull Team Cymru Scout Threat Intelligence exports to S3 (no transform)
    
    import os, json, time
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    S3_BUCKET    = os.environ["S3_BUCKET"]
    S3_PREFIX    = os.environ.get("S3_PREFIX", "team-cymru/scout-ti/")
    STATE_KEY    = os.environ.get("STATE_KEY", "team-cymru/scout-ti/state.json")
    WINDOW_SEC   = int(os.environ.get("WINDOW_SECONDS", "3600"))
    HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60"))
    HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3"))
    MODE         = os.environ.get("MODE", "GET").upper()
    API_HEADERS  = json.loads(os.environ.get("API_HEADERS", "{}"))
    MAX_PAGES    = int(os.environ.get("MAX_PAGES", "10"))
    
    # GET mode
    DOWNLOAD_URL_TEMPLATE = os.environ.get("DOWNLOAD_URL_TEMPLATE", "")
    # POST_JSON mode
    API_URL            = os.environ.get("API_URL", "")
    JSON_BODY_TEMPLATE = os.environ.get("JSON_BODY_TEMPLATE", "")
    
    # Team Cymru Scout specific
    SCOUT_BASE_URL = os.environ.get("SCOUT_BASE_URL", "https://api.scout.cymru.com")
    SCOUT_API_KEY  = os.environ.get("SCOUT_API_KEY", "")
    
    s3 = boto3.client("s3")
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            b = obj["Body"].read()
            return json.loads(b) if b else {}
        except Exception:
            return {}
    
    def _put_state(st: dict):
        s3.put_object(
            Bucket=S3_BUCKET, Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _http(url: str, method: str = "GET", body: bytes | None = None) -> tuple[bytes, str]:
        attempt = 0
        while True:
            try:
                req = Request(url, method=method)
                # Add headers
                headers = API_HEADERS.copy()
                if SCOUT_API_KEY and "Authorization" not in headers:
                    headers["Authorization"] = f"Bearer {SCOUT_API_KEY}"
                headers.setdefault("Accept", "application/json")
    
                for k, v in headers.items():
                    req.add_header(k, v)
    
                if body is not None:
                    req.add_header("Content-Type", "application/json")
    
                with urlopen(req, data=body, timeout=HTTP_TIMEOUT) as r:
                    return r.read(), r.headers.get("Content-Type", "application/json")
            except HTTPError as e:
                if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES:
                    delay = 1 + attempt
                    try:
                        delay = int(e.headers.get("Retry-After", delay))
                    except Exception:
                        pass
                    time.sleep(max(1, delay))
                    attempt += 1
                    continue
                raise
            except URLError:
                if attempt < HTTP_RETRIES:
                    time.sleep(1 + attempt)
                    attempt += 1
                    continue
                raise
    
    def _write(blob: bytes, ctype: str, from_ts: float, to_ts: float, page: int) -> str:
        date_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts))
        key = f"{S3_PREFIX}/{date_path}/scout_ti_{int(from_ts)}_{int(to_ts)}_p{page:03d}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=blob, ContentType=ctype or "application/json")
        return key
    
    def _next_cursor(obj: dict) -> str | None:
        if not isinstance(obj, dict):
            return None
    
        for container in (obj, obj.get("meta", {}) or {}, obj.get("metadata", {}) or {}):
            for k in ("next", "next_cursor", "nextCursor", "nextPageToken", "continuation", "cursor", "pagedResultsCookie"):
                v = container.get(k)
                if v:
                    return str(v)
        return None
    
    def _loop(from_ts: float, to_ts: float) -> dict:
        cursor, page, written = None, 0, 0
    
        while page < MAX_PAGES:
            if MODE == "GET":
                if DOWNLOAD_URL_TEMPLATE:
                    url = (DOWNLOAD_URL_TEMPLATE
                        .replace("{FROM}", _iso(from_ts))
                        .replace("{TO}", _iso(to_ts))
                        .replace("{CURSOR}", cursor or ""))
                else:
                    # Default Scout API endpoint (adjust based on actual API)
                    url = f"{SCOUT_BASE_URL}/v1/threat-intelligence?start={_iso(from_ts)}&end={_iso(to_ts)}"
                    if cursor:
                        url += f"&cursor={cursor}"
                blob, ctype = _http(url, method="GET")
            else:
                assert API_URL and JSON_BODY_TEMPLATE, "API_URL and JSON_BODY_TEMPLATE required for MODE=POST_JSON"
                body = (JSON_BODY_TEMPLATE
                        .replace("{FROM}", _iso(from_ts))
                        .replace("{TO}", _iso(to_ts))
                        .replace("{CURSOR}", cursor or "")).encode("utf-8")
                blob, ctype = _http(API_URL, method="POST", body=body)
    
            # Normalize to JSON bytes for storage
            try:
                parsed = json.loads(blob.decode("utf-8"))
                normalized = json.dumps(parsed, separators=(",", ":")).encode("utf-8")
                ctype_out = "application/json"
            except Exception:
                normalized = blob
                ctype_out = ctype or "application/octet-stream"
    
            _ = _write(normalized, ctype_out, from_ts, to_ts, page)
            written += 1
            page += 1
    
            # Follow cursor if JSON and cursor exists
            try:
                if parsed and isinstance(parsed, dict):
                    cursor = _next_cursor(parsed)
                if not cursor:
                    break
            except Exception:
                break
    
        return {"pages": page, "objects": written}
    
    def lambda_handler(event=None, context=None):
        st = _get_state()
        now = time.time()
        from_ts = st.get("last_to_ts") or (now - WINDOW_SEC)
        to_ts = now
        res = _loop(from_ts, to_ts)
        st["last_to_ts"] = to_ts
        _put_state(st)
        return {"ok": True, "window": {"from": _iso(from_ts), "to": _iso(to_ts)}, **res}
    
    if __name__ == "__main__":
        print(lambda_handler())
    ```
    
  5. Vai a Configurazione > Variabili di ambiente.

  6. Fai clic su Modifica > Aggiungi nuova variabile di ambiente.

  7. Inserisci le seguenti variabili di ambiente fornite, sostituendole con i tuoi valori.

    Chiave Valore di esempio
    S3_BUCKET team-cymru-scout-ti
    S3_PREFIX team-cymru/scout-ti/
    STATE_KEY team-cymru/scout-ti/state.json
    SCOUT_BASE_URL https://api.scout.cymru.com
    SCOUT_API_KEY your-scout-api-key
    WINDOW_SECONDS 3600
    HTTP_TIMEOUT 60
    HTTP_RETRIES 3
    MODE GET o POST_JSON
    API_HEADERS {"Authorization":"Bearer <token>","Accept":"application/json"}
    DOWNLOAD_URL_TEMPLATE (Modalità GET) Modello di URL personalizzato con {FROM}, {TO}, {CURSOR}
    API_URL URL dell'endpoint API (modalità POST_JSON)
    JSON_BODY_TEMPLATE (modalità POST_JSON) Corpo JSON con {FROM}, {TO}, {CURSOR}
    MAX_PAGES 10
  8. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).

  9. Seleziona la scheda Configurazione.

  10. Nel riquadro Configurazione generale, fai clic su Modifica.

  11. Modifica Timeout in 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Target: la tua funzione Lambda team_cymru_scout_ti_to_s3.
    • Nome: team-cymru-scout-ti-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utente IAM di sola lettura per Google SecOps

  1. Vai alla console AWS > IAM > Utenti > Aggiungi utenti.
  2. Fai clic su Add users (Aggiungi utenti).
  3. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci secops-reader.
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
  4. Fai clic su Crea utente.
  5. Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
  6. Nell'editor JSON, inserisci la seguente policy:

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::team-cymru-scout-ti/*"
        },
        {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::team-cymru-scout-ti"
        }
    ]
    }
    
  7. Imposta il nome su secops-reader-policy.

  8. Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  9. Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.

  10. Scarica il file CSV (questi valori vengono inseriti nel feed).

Configura un feed in Google SecOps per importare Team Cymru Scout Threat Intelligence

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Team Cymru Scout Threat Intelligence).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona Team Cymru Scout Threat Intelligence come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://team-cymru-scout-ti/team-cymru/scout-ti/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Formati dei log di intelligence sulle minacce di Team Cymru Scout supportati

Il parser Team Cymru Scout Threat Intelligence supporta i log nei formati KV (LEEF) e CSV.

Log di esempio di Team Cymru Scout Threat Intelligence supportati

  • JSON

    {
      "account_name": "dummy_secops_user",
      "account_type": "basic_auth",
      "used_queries": 1414,
      "remaining_queries": 48586,
      "used_queries_percentage": 2.828,
      "query_limit": 50000,
      "used_foundation_queries": 4224,
      "remaining_foundation_queries": 5776,
      "foundation_query_limit": 10000,
      "used_foundation_queries_percentage": 42.24,
      "event_type": "account_usage"
    }
    

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.