Raccogliere file CSV IOC personalizzati

Supportato in:

Questo documento spiega come importare file CSV IOC personalizzati in Google Security Operations utilizzando Google Cloud Storage, quindi mappa questi campi all'UDM, gestendo vari tipi di dati come IP, domini e hash e arricchendo l'output con dettagli sulle minacce, informazioni sulle entità e livelli di gravità.

Prima di iniziare

Assicurati di disporre dei seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Un progetto GCP con l'API Cloud Storage abilitata
  • Autorizzazioni per creare e gestire bucket GCS
  • Autorizzazioni per gestire le policy IAM nei bucket GCS
  • Autorizzazioni per creare servizi Cloud Run, argomenti Pub/Sub e job Cloud Scheduler
  • Accesso a uno o più URL feed IOC CSV (HTTPS) o a un endpoint interno che pubblica CSV

Creazione di un bucket Google Cloud Storage

  1. Vai alla console Google Cloud.
  2. Seleziona il tuo progetto o creane uno nuovo.
  3. Nel menu di navigazione, vai a Cloud Storage > Bucket.
  4. Fai clic su Crea bucket.
  5. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio csv-ioc-logs).
    Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni)
    Località Seleziona la posizione (ad esempio, us-central1).
    Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente)
    Controllo dell'accesso Uniforme (consigliato)
    Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione
  6. Fai clic su Crea.

Crea un service account per la funzione Cloud Run

La funzione Cloud Run richiede un service account con autorizzazioni di scrittura nel bucket GCS e di invocazione da parte di Pub/Sub.

Crea service account

  1. Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
  2. Fai clic su Crea service account.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome del service account: inserisci csv-ioc-collector-sa.
    • Descrizione service account: inserisci Service account for Cloud Run function to collect CSV IOC files.
  4. Fai clic su Crea e continua.
  5. Nella sezione Concedi a questo service account l'accesso al progetto, aggiungi i seguenti ruoli:
    1. Fai clic su Seleziona un ruolo.
    2. Cerca e seleziona Amministratore oggetti di archiviazione.
    3. Fai clic su + Aggiungi un altro ruolo.
    4. Cerca e seleziona Cloud Run Invoker.
    5. Fai clic su + Aggiungi un altro ruolo.
    6. Cerca e seleziona Invoker di Cloud Functions.
  6. Fai clic su Continua.
  7. Fai clic su Fine.

Questi ruoli sono necessari per:

  • Storage Object Admin: scrivi i log nel bucket GCS
  • Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
  • Cloud Functions Invoker: consente la chiamata di funzioni

Concedi autorizzazioni IAM sul bucket GCS

Concedi al service account le autorizzazioni di scrittura sul bucket GCS:

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket.
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: inserisci l'email del service account (ad es. csv-ioc-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Assegna i ruoli: seleziona Storage Object Admin.
  6. Fai clic su Salva.

Crea argomento Pub/Sub

Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.

  1. Nella console GCP, vai a Pub/Sub > Argomenti.
  2. Fai clic su Crea argomento.
  3. Fornisci i seguenti dettagli di configurazione:
    • ID argomento: inserisci csv-ioc-trigger.
    • Lascia le altre impostazioni sui valori predefiniti.
  4. Fai clic su Crea.

Crea una funzione Cloud Run per raccogliere i file CSV IOC

La funzione Cloud Run viene attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i file CSV IOC dagli endpoint HTTPS e li scrive in GCS.

  1. Nella console GCP, vai a Cloud Run.
  2. Fai clic su Crea servizio.
  3. Seleziona Funzione (usa un editor in linea per creare una funzione).
  4. Nella sezione Configura, fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome servizio csv-ioc-collector
    Regione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio us-central1)
    Runtime Seleziona Python 3.12 o versioni successive
  5. Nella sezione Trigger (facoltativo):

    1. Fai clic su + Aggiungi trigger.
    2. Seleziona Cloud Pub/Sub.
    3. In Seleziona un argomento Cloud Pub/Sub, scegli l'argomento Pub/Sub (csv-ioc-trigger).
    4. Fai clic su Salva.
  6. Nella sezione Autenticazione:

    1. Seleziona Richiedi autenticazione.
    2. Controlla Identity and Access Management (IAM).
  7. Scorri verso il basso ed espandi Container, networking, sicurezza.

  8. Vai alla scheda Sicurezza:

    • Service account: seleziona il service account (csv-ioc-collector-sa).
  9. Vai alla scheda Container:

    1. Fai clic su Variabili e secret.
    2. Fai clic su + Aggiungi variabile per ogni variabile di ambiente:
    Nome variabile Valore di esempio Descrizione
    GCS_BUCKET csv-ioc-logs Nome bucket GCS
    GCS_PREFIX csv-ioc Prefisso per i file di log
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv URL HTTPS separati da virgole
    AUTH_HEADER Authorization: Bearer <token> Intestazione di autenticazione facoltativa
    TIMEOUT 60 Timeout richiesta in secondi
  10. Nella sezione Variabili e secret, scorri verso il basso fino a Richieste:

    • Timeout richiesta: inserisci 600 secondi (10 minuti).
  11. Vai alla scheda Impostazioni:

    • Nella sezione Risorse:
      • Memoria: seleziona 512 MiB o un valore superiore.
      • CPU: seleziona 1.
    • Fai clic su Fine.
  12. Nella sezione Scalabilità della revisione:

    • Numero minimo di istanze: inserisci 0.
    • Numero massimo di istanze: inserisci 100 (o modifica in base al carico previsto).
  13. Fai clic su Crea.

  14. Attendi la creazione del servizio (1-2 minuti).

  15. Dopo aver creato il servizio, si apre automaticamente l'editor di codice incorporato.

Aggiungi codice per la funzione

  1. Inserisci main in Entry point della funzione
  2. Nell'editor di codice incorporato, crea due file:

    • Primo file: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone
    import time
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch CSV IOC feeds over HTTPS and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        # Get environment variables
        bucket_name = os.environ.get('GCS_BUCKET')
        prefix = os.environ.get('GCS_PREFIX', 'csv-ioc').strip('/')
        ioc_urls_str = os.environ.get('IOC_URLS', '')
        auth_header = os.environ.get('AUTH_HEADER', '')
        timeout = int(os.environ.get('TIMEOUT', '60'))
    
        ioc_urls = [u.strip() for u in ioc_urls_str.split(',') if u.strip()]
    
        if not bucket_name:
            print('Error: GCS_BUCKET environment variable is required')
            return
    
        if not ioc_urls:
            print('Error: IOC_URLS must contain at least one HTTPS URL')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(bucket_name)
    
            run_ts = int(time.time())
            written = []
    
            for i, url in enumerate(ioc_urls):
                print(f'Processing URL {i+1}/{len(ioc_urls)}: {url}')
    
                # Build request
                req_headers = {'Accept': 'text/csv, */*'}
    
                # Add authentication header if provided
                if auth_header:
                    if ':' in auth_header:
                        k, v = auth_header.split(':', 1)
                        req_headers[k.strip()] = v.strip()
                    else:
                        req_headers['Authorization'] = auth_header.strip()
    
                # Fetch data with retries
                data = fetch_with_retries(url, req_headers, timeout)
    
                if data:
                    # Write to GCS
                    key = generate_blob_name(prefix, url, run_ts, i)
                    blob = bucket.blob(key)
                    blob.upload_from_string(data, content_type='text/csv')
    
                    written.append({
                        'url': url,
                        'gcs_key': key,
                        'bytes': len(data)
                    })
    
                    print(f'Wrote {len(data)} bytes to gs://{bucket_name}/{key}')
                else:
                    print(f'Warning: No data retrieved from {url}')
    
            print(f'Successfully processed {len(written)} URLs')
            print(json.dumps({'ok': True, 'written': written}, indent=2))
    
        except Exception as e:
            print(f'Error processing CSV IOC feeds: {str(e)}')
            raise
    
    def fetch_with_retries(url, headers, timeout, max_retries=5):
        """Fetch data from URL with retry logic for 429/5xx errors."""
        if not url.lower().startswith('https://'):
            raise ValueError('Only HTTPS URLs are allowed in IOC_URLS')
    
        attempt = 0
        backoff = 1.0
    
        while attempt < max_retries:
            try:
                response = http.request('GET', url, headers=headers, timeout=timeout)
    
                if response.status == 200:
                    return response.data.decode('utf-8')
                elif response.status == 429 or (500 <= response.status < 600):
                    print(f'Received status {response.status}, retrying in {backoff}s (attempt {attempt+1}/{max_retries})')
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                else:
                    print(f'Error: Received unexpected status {response.status} from {url}')
                    return None
    
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f'Request failed: {str(e)}, retrying in {backoff}s (attempt {attempt+1}/{max_retries})')
                    time.sleep(backoff)
                    attempt += 1
                    backoff *= 2
                else:
                    raise
    
        print(f'Max retries exceeded for {url}')
        return None
    
    def generate_blob_name(prefix, url, run_ts, idx):
        """Generate a unique blob name for the CSV file."""
        # Create a short, filesystem-safe token for the URL
        safe_url = url.replace('://', '_').replace('/', '_').replace('?', '_').replace('&', '_')[:100]
    
        # Generate timestamp-based path
        timestamp_path = time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))
    
        return f"{prefix}/{timestamp_path}-url{idx:03d}-{safe_url}.csv"
    
    • Secondo file: requirements.txt::
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.

  4. Attendi il completamento del deployment (2-3 minuti).

Crea job Cloud Scheduler

Cloud Scheduler pubblicherà messaggi nell'argomento Pub/Sub a intervalli regolari, attivando la funzione Cloud Run.

  1. Nella console di GCP, vai a Cloud Scheduler.
  2. Fai clic su Crea job.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome csv-ioc-collector-hourly
    Regione Seleziona la stessa regione della funzione Cloud Run
    Frequenza 0 * * * * (ogni ora, all'ora)
    Fuso orario Seleziona il fuso orario (UTC consigliato)
    Tipo di target Pub/Sub
    Argomento Seleziona l'argomento Pub/Sub (csv-ioc-trigger)
    Corpo del messaggio {} (oggetto JSON vuoto)
  4. Fai clic su Crea.

Opzioni di frequenza di pianificazione

  • Scegli la frequenza in base al volume dei log e ai requisiti di latenza:

    Frequenza Espressione cron Caso d'uso
    Ogni 5 minuti */5 * * * * Volume elevato, bassa latenza
    Ogni 15 minuti */15 * * * * Volume medio
    Ogni ora 0 * * * * Standard (consigliato)
    Ogni 6 ore 0 */6 * * * Volume basso, elaborazione batch
    Ogni giorno 0 0 * * * Raccolta dei dati storici

Testare l'integrazione

  1. Nella console Cloud Scheduler, trova il tuo job (csv-ioc-collector-hourly).
  2. Fai clic su Forza esecuzione per attivare il job manualmente.
  3. Aspetta alcuni secondi.
  4. Vai a Cloud Run > Servizi.
  5. Fai clic sul nome della funzione (csv-ioc-collector).
  6. Fai clic sulla scheda Log.
  7. Verifica che la funzione sia stata eseguita correttamente. Cerca quanto segue:

    Processing URL 1/X: https://...
    Wrote X bytes to gs://csv-ioc-logs/csv-ioc/YYYY/MM/DD/HHMMSS-url000-...csv
    Successfully processed X URLs
    
  8. Vai a Cloud Storage > Bucket.

  9. Fai clic sul nome del bucket (csv-ioc-logs).

  10. Vai alla cartella del prefisso (csv-ioc/).

  11. Verifica che siano stati creati nuovi file .csv con il timestamp corrente.

Se visualizzi errori nei log:

  • HTTP 401/403: controlla la variabile di ambiente AUTH_HEADER
  • HTTP 429: limitazione della frequenza: la funzione riproverà automaticamente con backoff
  • Variabili di ambiente mancanti: controlla che tutte le variabili richieste siano impostate
  • Sono consentiti solo URL HTTPS: verifica che IOC_URLS contenga solo URL HTTPS

Recuperare il service account Google SecOps

Google SecOps utilizza un service account univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo service account l'accesso al tuo bucket.

Recuperare l'email del service account

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, CSV Custom IOC).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona CSV Custom IOC come Tipo di log.
  7. Fai clic su Ottieni service account. Viene visualizzata un'email del service account univoca, ad esempio:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copia questo indirizzo email per utilizzarlo nel passaggio successivo.

Concedi le autorizzazioni IAM al service account Google SecOps

Il service account Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket (csv-ioc-logs).
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: incolla l'email del service account Google SecOps.
    • Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
  6. Fai clic su Salva.

Configura un feed in Google SecOps per importare file CSV di indicatori personalizzati di compromissione

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, CSV Custom IOC).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona CSV Custom IOC come Tipo di log.
  7. Fai clic su Avanti.
  8. Specifica i valori per i seguenti parametri di input:

    • URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:

      gs://csv-ioc-logs/csv-ioc/
      
      • Sostituisci:

        • csv-ioc-logs: il nome del bucket GCS.
        • csv-ioc: (facoltativo) il prefisso o il percorso della cartella in cui sono archiviati i log.
      • Esempi:

        • Bucket radice: gs://csv-ioc-logs/
        • Con prefisso: gs://csv-ioc-logs/csv-ioc/
        • Con sottocartella: gs://csv-ioc-logs/ioc-feeds/
    • Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:

      • Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
      • Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
      • Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.

    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.

    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.

    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.

  9. Fai clic su Avanti.

  10. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Funzione logica
asn entity.metadata.threat.detection_fields.asn_label.value Mappato direttamente dal campo "asn".
category entity.metadata.threat.category_details Mappato direttamente dal campo "categoria".
classificazione entity.metadata.threat.category_details Aggiunto a "classificazione - " e mappato al campo "entity.metadata.threat.category_details".
colonna2 entity.entity.hostname Mappato a "entity.entity.hostname" se [category] corrisponde a ". ?ip" o ". ?proxy" e [not_ip] è true.
colonna2 entity.entity.ip Unito a "entity.entity.ip" se [category] corrisponde a ". ?ip" o ". ?proxy" e [not_ip] è false.
confidenza entity.metadata.threat.confidence_score Convertito in float e mappato al campo "entity.metadata.threat.confidence_score".
country entity.entity.location.country_or_region Mappato direttamente dal campo "Paese".
date_first entity.metadata.threat.first_discovered_time Analizzato come ISO8601 e mappato al campo "entity.metadata.threat.first_discovered_time".
date_last entity.metadata.threat.last_updated_time Analizzato come ISO8601 e mappato al campo "entity.metadata.threat.last_updated_time".
dettaglio entity.metadata.threat.summary Mappato direttamente dal campo "Dettagli".
detail2 entity.metadata.threat.description Mappato direttamente dal campo "detail2".
dominio entity.entity.hostname Mappato direttamente dal campo "dominio".
email entity.entity.user.email_addresses Unito al campo "entity.entity.user.email_addresses".
id entity.metadata.product_entity_id Aggiunto a "id - " e mappato al campo "entity.metadata.product_entity_id".
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value Mappato direttamente dal campo "import_session_id".
itype entity.metadata.threat.detection_fields.itype_label.value Mappato direttamente dal campo "itype".
lat entity.entity.location.region_latitude Convertito in float e mappato al campo "entity.entity.location.region_latitude".
Lon entity.entity.location.region_longitude Convertito in float e mappato al campo "entity.entity.location.region_longitude".
maltype entity.metadata.threat.detection_fields.maltype_label.value Mappato direttamente dal campo "maltype".
md5 entity.entity.file.md5 Mappato direttamente dal campo "md5".
media entity.metadata.threat.detection_fields.media_label.value Mappato direttamente dal campo "media".
media_type entity.metadata.threat.detection_fields.media_type_label.value Mappato direttamente dal campo "media_type".
org entity.metadata.threat.detection_fields.org_label.value Mappato direttamente dal campo "org".
resource_uri entity.entity.url Mappato a "entity.entity.url" se [itype] non corrisponde a "(ip
resource_uri entity.metadata.threat.url_back_to_product Mappato a "entity.metadata.threat.url_back_to_product" se [itype] corrisponde a "(ip
punteggio entity.metadata.threat.confidence_details Mappato direttamente dal campo "Punteggio".
gravità entity.metadata.threat.severity Convertito in maiuscolo e mappato al campo "entity.metadata.threat.severity" se corrisponde a "LOW", "MEDIUM", "HIGH" o "CRITICAL".
origine entity.metadata.threat.detection_fields.source_label.value Mappato direttamente dal campo "origine".
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value Mappato direttamente dal campo "source_feed_id".
srcip entity.entity.ip Unito a "entity.entity.ip" se [srcip] non è vuoto e non è uguale a [value].
stato entity.metadata.threat.detection_fields.state_label.value Mappato direttamente dal campo "state".
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value Mappato direttamente dal campo "trusted_circle_ids".
update_id entity.metadata.threat.detection_fields.update_id_label.value Mappato direttamente dal campo "update_id".
valore entity.entity.file.full_path Mappato a "entity.entity.file.full_path" se [category] corrisponde a ".*?file".
valore entity.entity.file.md5 Mappato a "entity.entity.file.md5" se [category] corrisponde a ".*?md5" e [value] è una stringa esadecimale di 32 caratteri.
valore entity.entity.file.sha1 Mappato a "entity.entity.file.sha1" se ([category] corrisponde a ". ?md5" e [value] è una stringa esadecimale di 40 caratteri) o ([category] corrisponde a ". ?sha1" e [value] è una stringa esadecimale di 40 caratteri).
valore entity.entity.file.sha256 Mappato a "entity.entity.file.sha256" se ([category] corrisponde a ". ?md5" e [value] è una stringa esadecimale e [file_type] non è "md5") o ([category] corrisponde a ". ?sha256" e [value] è una stringa esadecimale).
valore entity.entity.hostname Mappato a "entity.entity.hostname" se ([category] corrisponde a ". ?domain") o ([category] corrisponde a ". ?ip" o ".*?proxy" e [not_ip] è true).
valore entity.entity.url Mappato su "entity.entity.url" se ([category] corrisponde a ".*?url") o ([category] corrisponde a "url" e [resource_uri] non è vuoto).
N/D entity.metadata.collected_timestamp Completato con il timestamp dell'evento.
N/D entity.metadata.interval.end_time Imposta un valore costante di 253402300799 secondi.
N/D entity.metadata.interval.start_time Completato con il timestamp dell'evento.
N/D entity.metadata.vendor_name Impostato su un valore costante di "Custom IOC".

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.