Raccogliere i log di Akamai Cloud Monitor
Questo documento spiega come importare i log di Akamai Cloud Monitor (bilanciatore del carico, Traffic Shaper, ADC) in Google Security Operations utilizzando Google Cloud Storage. Akamai esegue il push degli eventi JSON all'endpoint HTTPS; un ricevitore API Gateway + Cloud Functions scrive gli eventi in GCS (JSONL, gz). Il parser trasforma i log JSON in UDM. Estrae i campi dal payload JSON, esegue le conversioni dei tipi di dati, rinomina i campi in modo che corrispondano allo schema UDM e gestisce la logica specifica per i campi personalizzati e la creazione degli URL. Incorpora anche la gestione degli errori e la logica condizionale in base alla presenza dei campi.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Un'istanza Google SecOps
- Un progetto GCP con l'API Cloud Storage abilitata
- Autorizzazioni per creare e gestire bucket GCS
- Autorizzazioni per gestire le policy IAM nei bucket GCS
- Autorizzazioni per creare Cloud Functions, argomenti Pub/Sub e API Gateway
- Accesso con privilegi ad Akamai Control Center e Property Manager
Creazione di un bucket Google Cloud Storage
- Vai alla console Google Cloud.
- Seleziona il tuo progetto o creane uno nuovo.
- Nel menu di navigazione, vai a Cloud Storage > Bucket.
- Fai clic su Crea bucket.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Assegna un nome al bucket Inserisci un nome univoco globale (ad esempio akamai-cloud-monitor).Tipo di località Scegli in base alle tue esigenze (regione singola, doppia regione, più regioni) Località Seleziona la posizione (ad esempio, us-central1).Classe di archiviazione Standard (consigliato per i log a cui si accede di frequente) Controllo dell'accesso Uniforme (consigliato) Strumenti di protezione (Facoltativo) Attiva il controllo delle versioni degli oggetti o la policy di conservazione Fai clic su Crea.
Raccogli i dettagli di configurazione di Akamai Cloud Monitor
Avrai bisogno delle seguenti informazioni da Akamai Control Center:
- Nome della proprietà in Gestore immobiliare
- Set di dati Cloud Monitor obbligatori da raccogliere
- Token del secret condiviso facoltativo per l'autenticazione webhook
Crea un service account per Cloud Functions
La funzione Cloud richiede un service account con autorizzazioni per scrivere nel bucket GCS.
Crea service account
- Nella console Google Cloud, vai a IAM e amministrazione > Service Accounts.
- Fai clic su Crea service account.
- Fornisci i seguenti dettagli di configurazione:
- Nome del service account: inserisci
akamai-cloud-monitor-sa. - Descrizione service account: inserisci
Service account for Cloud Function to collect Akamai Cloud Monitor logs.
- Nome del service account: inserisci
- Fai clic su Crea e continua.
- Nella sezione Concedi a questo service account l'accesso al progetto:
- Fai clic su Seleziona un ruolo.
- Cerca e seleziona Amministratore oggetti di archiviazione.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Cloud Run Invoker.
- Fai clic su + Aggiungi un altro ruolo.
- Cerca e seleziona Invoker di Cloud Functions.
- Fai clic su Continua.
- Fai clic su Fine.
Questi ruoli sono necessari per:
- Amministratore oggetti Storage: scrive i log nel bucket GCS e gestisce i file di stato
- Cloud Run Invoker: consente a Pub/Sub di richiamare la funzione
- Cloud Functions Invoker: consente la chiamata di funzioni
Concedi autorizzazioni IAM sul bucket GCS
Concedi al service account le autorizzazioni di scrittura sul bucket GCS:
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket.
- Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: inserisci l'email del service account (ad es.
akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com). - Assegna i ruoli: seleziona Storage Object Admin.
- Aggiungi entità: inserisci l'email del service account (ad es.
- Fai clic su Salva.
Crea una funzione Cloud Functions per ricevere i log Akamai
La funzione Cloud Functions riceve richieste HTTP POST da Akamai Cloud Monitor e scrive i log in GCS.
- Nella console GCP, vai a Cloud Functions.
- Fai clic su Crea funzione.
Fornisci i seguenti dettagli di configurazione:
Impostazione Valore Ambiente Seleziona 2ª gen.. Nome della funzione akamai-cloud-monitor-receiverRegione Seleziona la regione corrispondente al tuo bucket GCS (ad esempio us-central1)Nella sezione Trigger:
- Tipo di trigger: seleziona HTTPS.
- Autenticazione: seleziona Consenti chiamate non autenticate (Akamai invierà richieste non autenticate).
Fai clic su Salva per salvare la configurazione dell'attivatore.
Espandi Impostazioni di runtime, build, connessioni e sicurezza.
Nella sezione Runtime:
- Memoria allocata: seleziona 512 MiB.
- Timeout: inserisci
600secondi (10 minuti). - Service account di runtime: seleziona il service account (
akamai-cloud-monitor-sa).
Nella sezione Variabili di ambiente runtime, fai clic su + Aggiungi variabile per ciascuna delle seguenti variabili:
Nome variabile Valore di esempio GCS_BUCKETakamai-cloud-monitorGCS_PREFIXakamai/cloud-monitor/jsonINGEST_TOKEN(Facoltativo) random-shared-secretFai clic su Avanti per passare all'editor di codice.
Nel menu a discesa Runtime, seleziona Python 3.12.
Aggiungi codice per la funzione
- Inserisci main in Entry point della funzione
Nell'editor di codice incorporato, crea due file:
- Primo file: main.py:
import os import json import gzip import io import uuid import datetime as dt from google.cloud import storage import functions_framework GCS_BUCKET = os.environ.get("GCS_BUCKET") GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret storage_client = storage.Client() def _write_jsonl_gz(objs: list) -> str: """Write JSON objects to GCS as gzipped JSONL.""" timestamp = dt.datetime.utcnow() key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode()) buf.seek(0) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}{key}") blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip") return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}" def _parse_records_from_request(request) -> list: """Parse JSON records from HTTP request body.""" body = request.get_data(as_text=True) if not body: return [] try: data = json.loads(body) except Exception: # Accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] @functions_framework.http def main(request): """ Cloud Function HTTP handler for Akamai Cloud Monitor logs. Args: request: Flask request object Returns: Tuple of (response_body, status_code, headers) """ # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: token = request.args.get("token") if token != INGEST_TOKEN: return ("Forbidden", 403) records = _parse_records_from_request(request) if not records: return ("No content", 204) try: gcs_key = _write_jsonl_gz(records) response = { "ok": True, "gcs_key": gcs_key, "count": len(records) } return (json.dumps(response), 200, {"Content-Type": "application/json"}) except Exception as e: print(f"Error writing to GCS: {str(e)}") return (f"Internal server error: {str(e)}", 500)- Secondo file: requirements.txt::
functions-framework==3.* google-cloud-storage==2.*Fai clic su Esegui il deployment per eseguire il deployment della funzione.
Attendi il completamento del deployment (2-3 minuti).
Dopo il deployment, vai alla scheda Trigger e copia l'URL trigger. Utilizzerai questo URL nella configurazione di Akamai.
Configura Akamai Cloud Monitor per il push dei log
- Accedi ad Akamai Control Center.
- Apri la proprietà in Property Manager.
- Fai clic su Aggiungi regola > scegli Cloud Management.
- Aggiungi Cloud Monitor Instrumentation e seleziona i set di dati richiesti.
- Aggiungi Cloud Monitor Data Delivery.
Fornisci i seguenti dettagli di configurazione:
- Nome host di pubblicazione: inserisci il nome host dall'URL di attivazione di Cloud Functions (ad esempio,
us-central1-your-project.cloudfunctions.net). Percorso URL di pubblicazione: inserisci il percorso dall'URL di attivazione di Cloud Functions più il token di query facoltativo:
- Senza token:
/akamai-cloud-monitor-receiver Con token:
/akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>Sostituisci
<INGEST_TOKEN>con il valore impostato nelle variabili di ambiente di Cloud Functions.
- Senza token:
- Nome host di pubblicazione: inserisci il nome host dall'URL di attivazione di Cloud Functions (ad esempio,
Fai clic su Salva.
Fai clic su Attiva per attivare la versione della proprietà.
Recuperare il service account Google SecOps
Google SecOps utilizza un service account univoco per leggere i dati dal tuo bucket GCS. Devi concedere a questo service account l'accesso al tuo bucket.
Recuperare l'email del service account
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Akamai Cloud Monitor - GCS). - Seleziona Google Cloud Storage V2 come Tipo di origine.
- Seleziona Akamai Cloud Monitor come Tipo di log.
Fai clic su Ottieni service account. Viene visualizzata un'email del service account univoca, ad esempio:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopia questo indirizzo email per utilizzarlo nel passaggio successivo.
Concedi le autorizzazioni IAM al service account Google SecOps
Il service account Google SecOps deve avere il ruolo Visualizzatore oggetti Storage nel bucket GCS.
- Vai a Cloud Storage > Bucket.
- Fai clic sul nome del bucket.
- Vai alla scheda Autorizzazioni.
- Fai clic su Concedi l'accesso.
- Fornisci i seguenti dettagli di configurazione:
- Aggiungi entità: incolla l'email del service account Google SecOps.
- Assegna i ruoli: seleziona Visualizzatore oggetti Storage.
Fai clic su Salva.
Configura un feed in Google SecOps per importare i log di Akamai Cloud Monitor
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo feed.
- Fai clic su Configura un singolo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Akamai Cloud Monitor - GCS). - Seleziona Google Cloud Storage V2 come Tipo di origine.
- Seleziona Akamai Cloud Monitor come Tipo di log.
- Fai clic su Avanti.
Specifica i valori per i seguenti parametri di input:
URL del bucket di archiviazione: inserisci l'URI del bucket GCS con il percorso del prefisso:
gs://akamai-cloud-monitor/akamai/cloud-monitor/json/Sostituisci:
akamai-cloud-monitor: il nome del bucket GCS.akamai/cloud-monitor/json: il percorso del prefisso in cui sono archiviati i log (deve corrispondere aGCS_PREFIXnella Funzione Cloud).
Opzione di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze:
- Mai: non elimina mai i file dopo i trasferimenti (opzione consigliata per i test).
- Elimina file trasferiti: elimina i file dopo il trasferimento riuscito.
Elimina file trasferiti e directory vuote: elimina i file e le directory vuote dopo il trasferimento riuscito.
Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
Spazio dei nomi dell'asset:
akamai.cloud_monitorEtichette di importazione: le etichette vengono aggiunte a tutti gli eventi di questo feed (ad esempio
source=akamai_cloud_monitor,format=json).
Fai clic su Avanti.
Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Log di esempio di Akamai Cloud Monitor supportati
JSON:
{ "UA": "-", "accLang": "-", "bytes": "3929", "cacheStatus": "1", "cliIP": "0.0.0.0", "cookie": "-", "cp": "848064", "customField": "-", "dnsLookupTimeMSec": "-", "errorCode": "-", "maxAgeSec": "31536000", "objSize": "3929", "overheadBytes": "240", "proto": "HTTPS", "queryStr": "-", "range": "-", "referer": "-", "reqEndTimeMSec": "4", "reqHost": "www.example.com", "reqId": "1ce83c03", "reqMethod": "GET", "reqPath": "assets/images/placeholder-tagline.png", "reqPort": "443", "reqTimeSec": "1622470405.760", "rspContentLen": "3929", "rspContentType": "image/png", "statusCode": "200", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1.2", "totalBytes": "4599", "transferTimeMSec": "0", "turnAroundTimeMSec": "0", "uncompressedSize": "-", "version": "1", "xForwardedFor": "-" }
Tabella di mappatura UDM
| Campo log | Mappatura UDM | Funzione logica |
|---|---|---|
| accLang | network.http.user_agent | Mappato direttamente se non è "-" o una stringa vuota. |
| city | principal.location.city | Mappato direttamente se non è "-" o una stringa vuota. |
| cliIP | principal.ip | Mappato direttamente se non è una stringa vuota. |
| country | principal.location.country_or_region | Mappato direttamente se non è "-" o una stringa vuota. |
| cp | additional.fields | Mappato come coppia chiave-valore con la chiave "cp". |
| customField | about.ip, about.labels, src.ip | Analizzati come coppie chiave-valore. Gestione speciale di "eIp" e "pIp" da mappare rispettivamente a src.ip e about.ip. Altre chiavi sono mappate come etichette nella sezione Informazioni. |
| errorCode | security_result.summary, security_result.severity | Se presente, imposta security_result.severity su "ERROR" e mappa il valore a security_result.summary. |
| geo.city | principal.location.city | Mappato direttamente se la città è "-" o una stringa vuota. |
| geo.country | principal.location.country_or_region | Mappato direttamente se il paese è "-" o una stringa vuota. |
| geo.lat | principal.location.region_latitude | Mappato direttamente, convertito in float. |
| geo.long | principal.location.region_longitude | Mappato direttamente, convertito in float. |
| geo.region | principal.location.state | Mappato direttamente. |
| id | metadata.product_log_id | Mappato direttamente se non è una stringa vuota. |
| message.cliIP | principal.ip | Mappato direttamente se cliIP è una stringa vuota. |
| message.fwdHost | principal.hostname | Mappato direttamente. |
| message.reqHost | target.hostname, target.url | Utilizzato per creare target.url ed estrarre target.hostname. |
| message.reqLen | network.sent_bytes | Mappato direttamente, convertito in numero intero senza segno se totalBytes è vuoto o "-". |
| message.reqMethod | network.http.method | Mappato direttamente se reqMethod è una stringa vuota. |
| message.reqPath | target.url | Aggiunto a target.url. |
| message.reqPort | target.port | Mappato direttamente, convertito in numero intero se reqPort è una stringa vuota. |
| message.respLen | network.received_bytes | Mappato direttamente, convertito in numero intero senza segno. |
| message.sslVer | network.tls.version | Mappato direttamente. |
| message.status | network.http.response_code | Mappato direttamente, convertito in numero intero se statusCode è vuoto o "-". |
| message.UA | network.http.user_agent | Mappato direttamente se UA è "-" o una stringa vuota. |
| network.asnum | additional.fields | Mappato come coppia chiave-valore con la chiave "asnum". |
| network.edgeIP | intermediary.ip | Mappato direttamente. |
| network.network | additional.fields | Mappato come coppia chiave-valore con la chiave "network". |
| network.networkType | additional.fields | Mappato come coppia chiave-valore con la chiave "networkType". |
| proto | network.application_protocol | Utilizzato per determinare network.application_protocol. |
| queryStr | target.url | Aggiunto a target.url se non è "-" o una stringa vuota. |
| referer | network.http.referral_url, about.hostname | Mappato direttamente se non è "-". Il nome host estratto viene mappato a about.hostname. |
| reqHost | target.hostname, target.url | Utilizzato per creare target.url ed estrarre target.hostname. |
| reqId | metadata.product_log_id, network.session_id | Mappato direttamente se l'ID è una stringa vuota. Mappato anche a network.session_id. |
| reqMethod | network.http.method | Mappato direttamente se non è una stringa vuota. |
| reqPath | target.url | Aggiunto a target.url se non è "-". |
| reqPort | target.port | Mappato direttamente, convertito in numero intero. |
| reqTimeSec | metadata.event_timestamp, timestamp | Utilizzato per impostare il timestamp dell'evento. |
| start | metadata.event_timestamp, timestamp | Utilizzato per impostare il timestamp dell'evento se reqTimeSec è una stringa vuota. |
| statusCode | network.http.response_code | Mappato direttamente, convertito in numero intero se non è "-" o una stringa vuota. |
| tlsVersion | network.tls.version | Mappato direttamente. |
| totalBytes | network.sent_bytes | Mappato direttamente, convertito in numero intero senza segno se non è vuoto o "-". |
| tipo | metadata.product_event_type | Mappato direttamente. |
| UA | network.http.user_agent | Mappato direttamente se non è "-" o una stringa vuota. |
| versione | metadata.product_version | Mappato direttamente. |
| xForwardedFor | principal.ip | Mappato direttamente se non è "-" o una stringa vuota. |
| (Parser Logic) | metadata.vendor_name | Impostato su "Akamai". |
| (Parser Logic) | metadata.product_name | Imposta "Cloud Monitor". |
| (Parser Logic) | metadata.event_type | Imposta su "NETWORK_HTTP". |
| (Parser Logic) | metadata.product_version | Imposta il valore su "2" se la versione è una stringa vuota. |
| (Parser Logic) | metadata.log_type | Imposta il valore su "AKAMAI_CLOUD_MONITOR". |
| (Parser Logic) | network.application_protocol | Determinato da proto o message.proto. Imposta "HTTPS" se uno dei due contiene "HTTPS" (senza distinzione tra maiuscole e minuscole), altrimenti "HTTP". |
| (Parser Logic) | security_result.severity | Imposta "INFORMATIONAL" se errorCode è "-" o una stringa vuota. |
| (Parser Logic) | target.url | Costruito a partire da protocollo, reqHost (o message.reqHost), reqPath (o message.reqPath) e queryStr. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.