Raccogli i log di Cisco Application Centric Infrastructure (ACI)

Supportato in:

Questo documento spiega come importare i log di Cisco Application Centric Infrastructure (ACI) in Google Security Operations. Il parser tenta innanzitutto di elaborare i log Cisco ACI in entrata come messaggi syslog utilizzando i pattern Grok. Se l'analisi syslog non va a buon fine, il messaggio viene considerato in formato JSON e analizzato di conseguenza. Infine, mappa i campi estratti al modello UDM (Unified Data Model).

Questa integrazione supporta due metodi:

  • Opzione 1: formato Syslog utilizzando l'agente Bindplane
  • Opzione 2: formato JSON utilizzando Google Cloud Storage tramite l'API REST APIC

Ogni opzione è autonoma e può essere implementata in modo indipendente in base ai requisiti dell'infrastruttura e alle preferenze di formato dei log.

Opzione 1: Syslog utilizzando l'agente Bindplane

Questa opzione configura Cisco ACI Fabric per inviare messaggi syslog a un agente Bindplane, che li inoltra a Google Security Operations per l'analisi.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Un host Windows 2016 o versioni successive o Linux con systemd
  • Se l'agente viene eseguito dietro un proxy, assicurati che le porte del firewall siano aperte in base ai requisiti dell'agente Bindplane
  • Accesso con privilegi alla console Cisco APIC

Recuperare il file di autenticazione dell'importazione di Google SecOps

  1. Vai a Impostazioni SIEM > Agenti di raccolta.
  2. Scarica il file di autenticazione importazione.
  3. Salva il file in modo sicuro sul sistema in cui verrà installato Bindplane.

Recuperare l'ID cliente Google SecOps

  1. Vai a Impostazioni SIEM > Profilo.
  2. Copia e salva l'ID cliente dalla sezione Dettagli dell'organizzazione.

Installa l'agente Bindplane

Installa l'agente Bindplane sul sistema operativo Windows o Linux seguendo le istruzioni riportate di seguito.

Installazione di finestre

  1. Apri il prompt dei comandi o PowerShell come amministratore.
  2. Esegui questo comando:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Installazione di Linux

  1. Apri un terminale con privilegi di root o sudo.
  2. Esegui questo comando:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Per ulteriori opzioni di installazione, consulta la guida all'installazione dell'agente Bindplane.

Configura l'agente Bindplane per importare Syslog e inviarlo a Google SecOps

Accedi al file di configurazione

  1. Individua il file config.yaml. In genere si trova nella directory /etc/bindplane-agent/ su Linux o nella directory di installazione su Windows.
  2. Apri il file utilizzando un editor di testo (ad esempio nano, vi o Blocco note).
  3. Modifica il file config.yaml:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'CISCO_ACI'
        raw_log_field: body
        ingestion_labels:
          service:
    
    pipelines:
      logs/source0__chronicle_w_labels-0:
        receivers:
          - udplog
        exporters:
          - chronicle/chronicle_w_labels
    
    • Sostituisci quanto segue:
      • Sostituisci la porta e l'indirizzo IP in base alle esigenze della tua infrastruttura.
      • Sostituisci <CUSTOMER_ID> con l'ID cliente effettivo.
      • Aggiorna /path/to/ingestion-authentication-file.json al percorso in cui è stato salvato il file di autenticazione.

Riavvia l'agente Bindplane per applicare le modifiche

  • Per riavviare l'agente Bindplane in Linux, esegui questo comando:

    sudo systemctl restart bindplane-agent
    
  • Per riavviare l'agente Bindplane in Windows, puoi utilizzare la console Servizi o inserire il seguente comando:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configura l'inoltro Syslog su Cisco ACI

Configura il contratto di gestione fuori banda

  1. Accedi alla console Cisco APIC.
  2. Vai a Tenant > Gestione > Contratti > Filtri.
  3. Fai clic su Crea filtro.
  4. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci syslog-udp-514.
    • Nome voce: inserisci syslog.
    • EtherType: seleziona IP.
    • Protocollo IP: seleziona UDP.
    • Intervallo porte di destinazione da: inserisci 514.
    • Intervallo porte di destinazione a: inserisci 514.
  5. Fai clic su Invia.

Crea contratto di gestione

  1. Vai a Tenant > mgmt > Contratti > Standard.
  2. Fai clic su Crea contratto.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci mgmt-syslog-contract.
    • Ambito: seleziona Contesto.
  4. Fai clic su Invia.
  5. Espandi il contratto e fai clic su Materie.
  6. Fai clic su Crea oggetto del contratto.
  7. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci syslog-subject.
    • Applica in entrambe le direzioni: seleziona questa opzione.
  8. Fai clic su Invia.
  9. Espandi l'argomento e fai clic su Filtri.
  10. Fai clic su Crea associazione filtro.
  11. Seleziona il filtro syslog-udp-514.
  12. Fai clic su Invia.

Configura gruppo di destinazione Syslog

  1. Vai ad Amministrazione > Collector di dati esterni > Destinazioni di monitoraggio > Syslog.
  2. Fai clic con il tasto destro del mouse su Syslog e seleziona Crea gruppo di destinazione di monitoraggio Syslog.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci Chronicle-Syslog-Group.
    • Stato amministratore: seleziona Attivato.
    • Formato: seleziona aci.
  4. Fai clic su Avanti.
  5. Nella finestra di dialogo Crea destinazione di monitoraggio syslog:
    • Nome: inserisci Chronicle-BindPlane.
    • Host: inserisci l'indirizzo IP del server dell'agente Bindplane.
    • Porta: inserisci 514.
    • Stato amministratore: seleziona Attivato.
    • Gravità: seleziona Informazioni (per acquisire log dettagliati).
  6. Fai clic su Invia.

Configura le policy di monitoraggio

Norme di monitoraggio di Fabric
  1. Vai a Fabric > Fabric Policies > Policies > Monitoring > Common Policy.
  2. Espandi Callhome/Smart Callhome/SNMP/Syslog/TACACS.
  3. Fai clic con il tasto destro del mouse su Syslog e seleziona Crea origine Syslog.
  4. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci Chronicle-Fabric-Syslog.
    • Log di controllo: seleziona questa opzione per includere gli eventi di controllo.
    • Eventi: seleziona questa opzione per includere gli eventi di sistema.
    • Guasti: seleziona questa opzione per includere gli eventi di guasto.
    • Log di sessione: seleziona questa opzione per includere i log di sessione.
    • Gruppo di destinazione: seleziona Chronicle-Syslog-Group.
  5. Fai clic su Invia.
Policy di monitoraggio dell'accesso
  1. Vai a Fabric > Norme di accesso > Norme > Monitoraggio > Norme predefinite.
  2. Espandi Callhome/Smart Callhome/SNMP/Syslog.
  3. Fai clic con il tasto destro del mouse su Syslog e seleziona Crea origine Syslog.
  4. Fornisci i seguenti dettagli di configurazione:
    • Nome: inserisci Chronicle-Access-Syslog.
    • Log di controllo: seleziona questa opzione per includere gli eventi di controllo.
    • Eventi: seleziona questa opzione per includere gli eventi di sistema.
    • Guasti: seleziona questa opzione per includere gli eventi di guasto.
    • Log di sessione: seleziona questa opzione per includere i log di sessione.
    • Gruppo di destinazione: seleziona Chronicle-Syslog-Group.
  5. Fai clic su Invia.

Configura il criterio per i messaggi syslog di sistema

  1. Vai a Fabric > Fabric Policies > Policies > Monitoring > Common Policy.
  2. Espandi Criteri per i messaggi Syslog.
  3. Fai clic su predefinita.
  4. Nella sezione Filtro struttura:
    • Struttura: seleziona predefinita.
    • Gravità minima: imposta Informazioni.
  5. Fai clic su Invia.

Opzione 2: JSON utilizzando Google Cloud Storage

Questa opzione utilizza l'API REST APIC per raccogliere eventi, errori e audit log in formato JSON da Cisco ACI Fabric e li archivia in Google Cloud Storage per l'importazione di Google SecOps.

Prima di iniziare

Assicurati di disporre dei seguenti prerequisiti:

  • Un'istanza Google SecOps
  • Progetto GCP con l'API Cloud Storage abilitata
  • Autorizzazioni per creare e gestire bucket GCS
  • Autorizzazioni per gestire le policy IAM nei bucket GCS
  • Autorizzazioni per creare servizi Cloud Run, argomenti Pub/Sub e job Cloud Scheduler
  • Accesso con privilegi alla console Cisco APIC

Raccogli i prerequisiti di Cisco ACI APIC

Ottieni le credenziali APIC

  1. Accedi alla console Cisco APIC utilizzando HTTPS.
  2. Vai ad Admin > AAA (su APIC 6.0+) o Admin > Authentication > AAA (su versioni precedenti).

  3. Crea o utilizza un utente locale esistente con privilegi appropriati.

  4. Copia e salva in una posizione sicura i seguenti dettagli:

    • Nome utente APIC: utente locale con accesso in lettura ai dati di monitoraggio
    • Password API: password utente
    • URL APIC: l'URL HTTPS del tuo APIC (ad esempio, https://apic.example.com)

Creazione di un bucket Google Cloud Storage

  1. Vai alla consoleGoogle Cloud .
  2. Seleziona il tuo progetto o creane uno nuovo.
  3. Nel menu di navigazione, vai a Cloud Storage > Bucket.
  4. Fai clic su Crea bucket.
  5. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio cisco-aci-logs).
    Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni)
    Località Seleziona la posizione (ad esempio, us-central1).
    Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente)
    Controllo dell'accesso Uniforme (consigliato)
    Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione
  6. Fai clic su Crea.

Crea un service account per la funzione Cloud Run

La funzione Cloud Run richiede un service account con autorizzazioni di scrittura nel bucket GCS e di invocazione da parte di Pub/Sub.

Crea service account

  1. Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
  2. Fai clic su Crea service account.
  3. Fornisci i seguenti dettagli di configurazione:
    • Nome del service account: inserisci cisco-aci-collector-sa.
    • Descrizione service account: inserisci Service account for Cloud Run function to collect Cisco ACI logs.
  4. Fai clic su Crea e continua.
  5. Nella sezione Concedi a questo service account l'accesso al progetto, aggiungi i seguenti ruoli:
    1. Fai clic su Seleziona un ruolo.
    2. Cerca e seleziona Amministratore oggetti di archiviazione.
    3. Fai clic su + Aggiungi un altro ruolo.
    4. Cerca e seleziona Cloud Run Invoker.
    5. Fai clic su + Aggiungi un altro ruolo.
    6. Cerca e seleziona Invoker di Cloud Functions.
  6. Fai clic su Continua.
  7. Fai clic su Fine.

Questi ruoli sono necessari per:

  • Amministratore oggetti Storage: scrive i log nel bucket GCS e gestisce i file di stato
  • Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
  • Cloud Functions Invoker: consente la chiamata di funzioni

Concedi autorizzazioni IAM sul bucket GCS

Concedi al service account (cisco-aci-collector-sa) le autorizzazioni di scrittura sul bucket GCS:

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket (ad esempio cisco-aci-logs).
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: inserisci l'email del service account (ad es. cisco-aci-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Assegna i ruoli: seleziona Storage Object Admin.
  6. Fai clic su Salva.

Crea argomento Pub/Sub

Crea un argomento Pub/Sub a cui Cloud Scheduler pubblicherà e a cui la funzione Cloud Run si iscriverà.

  1. Nella console GCP, vai a Pub/Sub > Argomenti.
  2. Fai clic su Crea argomento.
  3. Fornisci i seguenti dettagli di configurazione:
    • ID argomento: inserisci cisco-aci-trigger.
    • Lascia le altre impostazioni sui valori predefiniti.
  4. Fai clic su Crea.

Crea una funzione Cloud Run per raccogliere i log

La funzione Cloud Run verrà attivata dai messaggi Pub/Sub di Cloud Scheduler per recuperare i log dall'API REST Cisco APIC e scriverli in GCS.

  1. Nella console GCP, vai a Cloud Run.
  2. Fai clic su Crea servizio.
  3. Seleziona Funzione (usa un editor in linea per creare una funzione).
  4. Nella sezione Configura, fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome servizio cisco-aci-collector
    Regione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio us-central1)
    Runtime Seleziona Python 3.12 o versioni successive
  5. Nella sezione Trigger (facoltativo):

    1. Fai clic su + Aggiungi trigger.
    2. Seleziona Cloud Pub/Sub.
    3. In Seleziona un argomento Cloud Pub/Sub, scegli l'argomento Pub/Sub (cisco-aci-trigger).
    4. Fai clic su Salva.
  6. Nella sezione Autenticazione:

    • Seleziona Richiedi autenticazione.
    • Seleziona Identity and Access Management (IAM).
  1. Scorri fino a Container, networking, sicurezza ed espandi la sezione.
  2. Vai alla scheda Sicurezza:
    • Service account: seleziona il service account (cisco-aci-collector-sa).
  3. Vai alla scheda Container:

    • Fai clic su Variabili e secret.
    • Fai clic su + Aggiungi variabile per ogni variabile di ambiente:

      Nome variabile Valore di esempio Descrizione
      GCS_BUCKET cisco-aci-logs Nome bucket GCS
      GCS_PREFIX cisco-aci-events Prefisso per i file di log
      STATE_KEY cisco-aci-events/state.json Percorso file di stato
      APIC_URL https://apic.example.com URL HTTPS APIC
      APIC_USERNAME your-apic-username Nome utente APIC
      APIC_PASSWORD your-apic-password Password APIC
      PAGE_SIZE 100 Record per pagina
      MAX_PAGES 10 Pagine massime per esecuzione
  4. Nella sezione Variabili e secret, scorri fino a Richieste:

    • Timeout richiesta: inserisci 300 secondi (5 minuti).
  5. Vai alla scheda Impostazioni:

    • Nella sezione Risorse:
      • Memoria: seleziona 512 MiB o un valore superiore.
      • CPU: seleziona 1.
  6. Nella sezione Scalabilità della revisione:

    • Numero minimo di istanze: inserisci 0.
    • Numero massimo di istanze: inserisci 100 (o modifica in base al carico previsto).
  7. Fai clic su Crea.

  8. Attendi la creazione del servizio (1-2 minuti).

  9. Dopo aver creato il servizio, si aprirà automaticamente l'editor di codice incorporato.

Aggiungi codice per la funzione

  1. Inserisci main nel campo Entry point (Punto di ingresso).
  2. Nell'editor di codice incorporato, crea due file:

    • Primo file: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import logging
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=60.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'cisco-aci-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'cisco-aci-events/state.json')
    APIC_URL = os.environ.get('APIC_URL')
    APIC_USERNAME = os.environ.get('APIC_USERNAME')
    APIC_PASSWORD = os.environ.get('APIC_PASSWORD')
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Cisco ACI logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, APIC_URL, APIC_USERNAME, APIC_PASSWORD]):
            logger.error('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            last_timestamp = state.get('last_timestamp')
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'
    
            logger.info(f"Starting Cisco ACI data collection for bucket: {GCS_BUCKET}")
    
            # Authenticate to APIC
            session_token = authenticate_apic(APIC_URL, APIC_USERNAME, APIC_PASSWORD)
            headers = {
                'Cookie': f'APIC-cookie={session_token}',
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            }
    
            # Data types to collect
            data_types = ['faultInst', 'eventRecord', 'aaaModLR']
            all_collected_data = []
    
            for data_type in data_types:
                logger.info(f"Collecting {data_type} data")
                collected_data = collect_aci_data(
                    APIC_URL,
                    headers,
                    data_type,
                    last_timestamp,
                    PAGE_SIZE,
                    MAX_PAGES
                )
    
                # Tag each record with its type
                for record in collected_data:
                    record['_data_type'] = data_type
    
                all_collected_data.extend(collected_data)
                logger.info(f"Collected {len(collected_data)} {data_type} records")
    
            logger.info(f"Total records collected: {len(all_collected_data)}")
    
            # Store data in GCS if any were collected
            if all_collected_data:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{GCS_PREFIX}/cisco_aci_events_{timestamp_str}.ndjson"
    
                # Convert to NDJSON format (one JSON object per line)
                ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data)
    
                # Upload to GCS
                blob = bucket.blob(s3_key)
                blob.upload_from_string(
                    ndjson_content,
                    content_type='application/x-ndjson'
                )
    
                logger.info(f"Uploaded {len(all_collected_data)} records to gs://{GCS_BUCKET}/{s3_key}")
    
                # Update state file with latest timestamp from collected data
                latest_timestamp = get_latest_timestamp_from_records(all_collected_data)
                if not latest_timestamp:
                    latest_timestamp = datetime.utcnow().isoformat() + 'Z'
    
                update_state(bucket, STATE_KEY, latest_timestamp)
            else:
                logger.info("No new log records found.")
    
            logger.info(f"Successfully processed {len(all_collected_data)} records")
    
        except Exception as e:
            logger.error(f'Error processing logs: {str(e)}')
            raise
    
    def authenticate_apic(apic_url, username, password):
        """Authenticate to APIC and return session token"""
        login_url = f"{apic_url}/api/aaaLogin.json"
        login_data = {
            "aaaUser": {
                "attributes": {
                    "name": username,
                    "pwd": password
                }
            }
        }
    
        response = http.request(
            'POST',
            login_url,
            body=json.dumps(login_data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            timeout=30
        )
    
        if response.status != 200:
            raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}")
    
        response_data = json.loads(response.data.decode('utf-8'))
        token = response_data['imdata'][0]['aaaLogin']['attributes']['token']
        logger.info("Successfully authenticated to APIC")
        return token
    
    def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages):
        """Collect data from APIC REST API with pagination"""
        all_data = []
        page = 0
    
        while page < max_pages:
            # Build API URL with pagination and time filters
            api_url = f"{apic_url}/api/class/{data_type}.json"
            params = [
                f'page-size={page_size}',
                f'page={page}',
                f'order-by={data_type}.created|asc'
            ]
    
            # Add time filter to prevent duplicates
            if last_timestamp:
                params.append(f'query-target-filter=gt({data_type}.created,"{last_timestamp}")')
    
            full_url = f"{api_url}?{'&'.join(params)}"
    
            logger.info(f"Fetching {data_type} page {page} from APIC")
    
            # Make API request
            response = http.request('GET', full_url, headers=headers, timeout=60)
    
            if response.status != 200:
                logger.error(f"API request failed: {response.status} {response.data[:256]!r}")
                break
    
            data = json.loads(response.data.decode('utf-8'))
            records = data.get('imdata', [])
    
            if not records:
                logger.info(f"No more {data_type} records found")
                break
    
            # Extract the actual data from APIC format
            extracted_records = []
            for record in records:
                if data_type in record:
                    extracted_records.append(record[data_type])
    
            all_data.extend(extracted_records)
            page += 1
    
            # If we got less than page_size records, we've reached the end
            if len(records) < page_size:
                break
    
        return all_data
    
    def get_last_timestamp(bucket, state_key):
        """Get the last run timestamp from GCS state file"""
        try:
            blob = bucket.blob(state_key)
            if blob.exists():
                state_data = blob.download_as_text()
                state = json.loads(state_data)
                return state.get('last_timestamp')
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
    
        return None
    
    def get_latest_timestamp_from_records(records):
        """Get the latest timestamp from collected records to prevent missing events"""
        if not records:
            return None
    
        latest = None
        latest_time = None
    
        for record in records:
            try:
                # Handle both direct attributes and nested structure
                attrs = record.get('attributes', record)
                created = attrs.get('created')
                modTs = attrs.get('modTs')
    
                # Use created or modTs as fallback
                timestamp = created or modTs
    
                if timestamp:
                    if latest_time is None or timestamp > latest_time:
                        latest_time = timestamp
                        latest = record
            except Exception as e:
                logger.debug(f"Error parsing timestamp from record: {e}")
                continue
    
        return latest_time
    
    def update_state(bucket, state_key, timestamp):
        """Update the state file with the current timestamp"""
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }
            blob = bucket.blob(state_key)
            blob.upload_from_string(
                json.dumps(state_data),
                content_type='application/json'
            )
            logger.info(f"Updated state file with timestamp: {timestamp}")
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            logger.warning(f"Could not load state: {e}")
    
        return {}
    
    • Secondo file: requirements.txt::
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Fai clic su Esegui il deployment per salvare la funzione ed eseguirne il deployment.

  4. Attendi il completamento del deployment (2-3 minuti).

Crea job Cloud Scheduler

Cloud Scheduler pubblica messaggi nell'argomento Pub/Sub a intervalli regolari, attivando la funzione Cloud Run.

  1. Nella console di GCP, vai a Cloud Scheduler.
  2. Fai clic su Crea job.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome cisco-aci-collector-15m
    Regione Seleziona la stessa regione della funzione Cloud Run
    Frequenza */15 * * * * (ogni 15 minuti)
    Fuso orario Seleziona il fuso orario (UTC consigliato)
    Tipo di target Pub/Sub
    Argomento Seleziona l'argomento Pub/Sub (cisco-aci-trigger)
    Corpo del messaggio {} (oggetto JSON vuoto)
  4. Fai clic su Crea.

Opzioni di frequenza di pianificazione

  • Scegli la frequenza in base al volume dei log e ai requisiti di latenza:

    Frequenza Espressione cron Caso d'uso
    Ogni 5 minuti */5 * * * * Volume elevato, bassa latenza
    Ogni 15 minuti */15 * * * * Volume medio (consigliato)
    Ogni ora 0 * * * * Standard
    Ogni 6 ore 0 */6 * * * Volume basso, elaborazione batch
    Ogni giorno 0 0 * * * Raccolta dei dati storici

Testare l'integrazione

  1. Nella console Cloud Scheduler, trova il tuo job (cisco-aci-collector-15m).
  2. Fai clic su Forza esecuzione per attivare il job manualmente.
  3. Aspetta alcuni secondi.
  4. Vai a Cloud Run > Servizi.
  5. Fai clic sul nome della funzione (cisco-aci-collector).
  6. Fai clic sulla scheda Log.
  7. Verifica che la funzione sia stata eseguita correttamente. Cerca quanto segue:

    Starting Cisco ACI data collection for bucket: cisco-aci-logs
    Successfully authenticated to APIC
    Collecting faultInst data
    Collected X faultInst records
    Collecting eventRecord data
    Collected X eventRecord records
    Collecting aaaModLR data
    Collected X aaaModLR records
    Total records collected: X
    Uploaded X records to gs://cisco-aci-logs/cisco-aci-events/cisco_aci_events_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Vai a Cloud Storage > Bucket.

  9. Fai clic sul nome del bucket (cisco-aci-logs).

  10. Vai alla cartella del prefisso (cisco-aci-events/).

  11. Verifica che sia stato creato un nuovo file .ndjson con il timestamp corrente.

Se visualizzi errori nei log:

  • HTTP 401: controlla le credenziali APIC nelle variabili di ambiente
  • HTTP 403: verifica che l'account APIC disponga delle autorizzazioni di lettura per le classi faultInst, eventRecord e aaaModLR
  • Errori di connessione: verifica che la funzione Cloud Run possa raggiungere l'URL APIC su TCP/443
  • Variabili di ambiente mancanti: controlla che tutte le variabili richieste siano impostate

Recuperare il service account Google SecOps

Google SecOps utilizza un service account univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo service account l'accesso al tuo bucket.

Recuperare l'email del service account

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Cisco ACI JSON logs).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona Cisco Application Centric Infrastructure come Tipo di log.
  7. Fai clic su Ottieni service account. Viene visualizzata un'email del service account univoca, ad esempio:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copia questo indirizzo email. Lo utilizzerai nel prossimo passaggio.

Concedi le autorizzazioni IAM al service account Google SecOps

Il service account Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.

  1. Vai a Cloud Storage > Bucket.
  2. Fai clic sul nome del bucket (cisco-aci-logs).
  3. Vai alla scheda Autorizzazioni.
  4. Fai clic su Concedi l'accesso.
  5. Fornisci i seguenti dettagli di configurazione:
    • Aggiungi entità: incolla l'email del service account Google SecOps.
    • Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
  6. Fai clic su Salva.

Configura un feed in Google SecOps per importare i log Cisco ACI

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo feed.
  3. Fai clic su Configura un singolo feed.
  4. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Cisco ACI JSON logs).
  5. Seleziona Google Cloud Storage V2 come Tipo di origine.
  6. Seleziona Cisco Application Centric Infrastructure come Tipo di log.
  7. Fai clic su Avanti.
  8. Specifica i valori per i seguenti parametri di input:

    • URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:

      gs://cisco-aci-logs/cisco-aci-events/
      
      • Sostituisci:
        • cisco-aci-logs: il nome del bucket GCS.
        • cisco-aci-events: il percorso del prefisso/della cartella in cui sono archiviati i log.
    • Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:

      • Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
      • Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
      • Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.

    • Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.

    • Etichette di importazione: l'etichetta applicata agli eventi di questo feed.

  9. Fai clic su Avanti.

  10. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Funzione logica
@timestamp read_only_udm.metadata.event_timestamp Il valore viene estratto dal campo di log non elaborato "@timestamp" e analizzato come timestamp.
aci_tag read_only_udm.metadata.product_log_id Il valore viene estratto dal campo del log non elaborato "aci_tag".
cisco_timestamp - Non mappato.
DIP read_only_udm.target.ip Il valore viene estratto dal campo del log non elaborato "DIP".
DPort read_only_udm.target.port Il valore viene estratto dal campo del log non elaborato "DPort" e convertito in numero intero.
descrizione read_only_udm.security_result.description Il valore viene estratto dal campo del log non elaborato "description".
fault_cause read_only_udm.additional.fields.value.string_value Il valore viene estratto dal campo del log non elaborato "fault_cause". La chiave è impostata su "Fault Cause".
nome host read_only_udm.principal.hostname Il valore viene estratto dal campo log non elaborato "hostname".
lifecycle_state read_only_udm.metadata.product_event_type Il valore viene estratto dal campo del log non elaborato "lifecycle_state".
log.source.address - Non mappato.
logstash.collect.host - Non mappato.
logstash.collect.timestamp read_only_udm.metadata.collected_timestamp Il valore viene estratto dal campo log grezzo "logstash.collect.timestamp" e analizzato come timestamp.
logstash.ingest.host read_only_udm.intermediary.hostname Il valore viene estratto dal campo log non elaborato "logstash.ingest.host".
logstash.irm_environment read_only_udm.additional.fields.value.string_value Il valore viene estratto dal campo del log non elaborato "logstash.irm_environment". La chiave è impostata su "IRM_Environment".
logstash.irm_region read_only_udm.additional.fields.value.string_value Il valore viene estratto dal campo log non elaborato "logstash.irm_region". La chiave è impostata su "IRM_Region".
logstash.irm_site read_only_udm.additional.fields.value.string_value Il valore viene estratto dal campo log grezzo "logstash.irm_site". La chiave è impostata su "IRM_Site".
logstash.process.host read_only_udm.intermediary.hostname Il valore viene estratto dal campo del log non elaborato "logstash.process.host".
messaggio - Non mappato.
message_class - Non mappato.
message_code - Non mappato.
message_content - Non mappato.
message_dn - Non mappato.
message_type read_only_udm.metadata.product_event_type Il valore viene estratto dal campo del log non elaborato "message_type" dopo la rimozione delle parentesi quadre.
node_link read_only_udm.principal.process.file.full_path Il valore viene estratto dal campo del log non elaborato "node_link".
PktLen read_only_udm.network.received_bytes Il valore viene estratto dal campo del log non elaborato "PktLen" e convertito in un numero intero senza segno.
programma - Non mappato.
Proto read_only_udm.network.ip_protocol Il valore viene estratto dal campo del log non elaborato "Proto", convertito in numero intero e mappato al nome del protocollo IP corrispondente (ad es. 6 -> TCP).
SIP read_only_udm.principal.ip Il valore viene estratto dal campo del log non elaborato "SIP".
SPort read_only_udm.principal.port Il valore viene estratto dal campo del log non elaborato "SPort" e convertito in numero intero.
syslog_facility - Non mappato.
syslog_facility_code - Non mappato.
syslog_host read_only_udm.principal.ip, read_only_udm.observer.ip Il valore viene estratto dal campo del log non elaborato "syslog_host".
syslog_prog - Non mappato.
syslog_severity read_only_udm.security_result.severity_details Il valore viene estratto dal campo del log non elaborato "syslog_severity".
syslog_severity_code read_only_udm.security_result.severity Il valore viene estratto dal campo del log non elaborato "syslog_severity_code" e mappato al livello di gravità corrispondente: 5, 6, 7 -> INFORMATIONAL; 3, 4 -> MEDIUM; 0, 1, 2 -> HIGH.
syslog5424_pri - Non mappato.
Vlan-Id read_only_udm.principal.resource.id Il valore viene estratto dal campo del log non elaborato "Vlan-Id".
- read_only_udm.metadata.event_type Logica: se sono presenti "SIP" o "hostname" e "Proto", imposta il valore su "NETWORK_CONNECTION". Altrimenti, se è presente "SIP", "hostname" o "syslog_host", imposta "STATUS_UPDATE". In caso contrario, impostalo su "GENERIC_EVENT".
- read_only_udm.metadata.log_type Logic: imposta su "CISCO_ACI".
- read_only_udm.metadata.vendor_name Logic: impostalo su "Cisco".
- read_only_udm.metadata.product_name Logica: imposta su "ACI".

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.