Collecter les journaux Akamai Cloud Monitor
Ce document explique comment ingérer les journaux Akamai Cloud Monitor (Load Balancer, Traffic Shaper, ADC) dans Google Security Operations à l'aide de Google Cloud Storage. Akamai envoie des événements JSON à votre point de terminaison HTTPS. Un récepteur API Gateway + Cloud Functions écrit les événements dans GCS (JSONL, gz). L'analyseur transforme les journaux JSON en UDM. Il extrait les champs de la charge utile JSON, effectue des conversions de type de données, renomme les champs pour qu'ils correspondent au schéma UDM et gère la logique spécifique pour les champs personnalisés et la construction d'URL. Il intègre également la gestion des erreurs et la logique conditionnelle en fonction de la présence des champs.
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps
- Un projet GCP avec l'API Cloud Storage activée
- Autorisations pour créer et gérer des buckets GCS
- Autorisations permettant de gérer les stratégies IAM sur les buckets GCS
- Autorisations permettant de créer des fonctions Cloud Functions, des sujets Pub/Sub et une passerelle API
- Accès privilégié à Akamai Control Center et Property Manager
Créer un bucket Google Cloud Storage
- Accédez à la console Google Cloud.
- Sélectionnez votre projet ou créez-en un.
- Dans le menu de navigation, accédez à Cloud Storage> Buckets.
- Cliquez sur Créer un bucket.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Nommer votre bucket Saisissez un nom unique (par exemple, akamai-cloud-monitor).Type d'emplacement Choisissez en fonction de vos besoins (région, birégion ou multirégion). Emplacement Sélectionnez l'emplacement (par exemple, us-central1).Classe de stockage Standard (recommandé pour les journaux auxquels vous accédez fréquemment) Access control (Contrôle des accès) Uniforme (recommandé) Outils de protection Facultatif : Activer la gestion des versions des objets ou la règle de conservation Cliquez sur Créer.
Collecter les détails de configuration d'Akamai Cloud Monitor
Vous aurez besoin des informations suivantes provenant d'Akamai Control Center :
- Nom de la propriété dans le Gestionnaire de propriétés
- Ensembles de données Cloud Monitor à collecter
- Jeton secret partagé facultatif pour l'authentification du webhook
Créer un compte de service pour Cloud Functions
La fonction Cloud a besoin d'un compte de service disposant des autorisations nécessaires pour écrire dans le bucket GCS.
Créer un compte de service
- Dans la console GCP, accédez à IAM et administration > Comptes de service.
- Cliquez sur Créer un compte de service.
- Fournissez les informations de configuration suivantes :
- Nom du compte de service : saisissez
akamai-cloud-monitor-sa. - Description du compte de service : saisissez
Service account for Cloud Function to collect Akamai Cloud Monitor logs.
- Nom du compte de service : saisissez
- Cliquez sur Créer et continuer.
- Dans la section Autoriser ce compte de service à accéder au projet :
- Cliquez sur Sélectionner un rôle.
- Recherchez et sélectionnez Administrateur des objets de l'espace de stockage.
- Cliquez sur + Ajouter un autre rôle.
- Recherchez et sélectionnez Demandeur Cloud Run.
- Cliquez sur + Ajouter un autre rôle.
- Recherchez et sélectionnez Demandeur Cloud Functions.
- Cliquez sur Continuer.
- Cliquez sur OK.
Ces rôles sont requis pour :
- Administrateur des objets Storage : écrire des journaux dans le bucket GCS et gérer les fichiers d'état
- Demandeur Cloud Run : autorise Pub/Sub à appeler la fonction
- Demandeur Cloud Functions : autorise l'appel de fonctions
Accorder des autorisations IAM sur un bucket GCS
Accordez au compte de service des autorisations d'écriture sur le bucket GCS :
- Accédez à Cloud Storage > Buckets.
- Cliquez sur le nom de votre bucket.
- Accédez à l'onglet Autorisations.
- Cliquez sur Accorder l'accès.
- Fournissez les informations de configuration suivantes :
- Ajouter des comptes principaux : saisissez l'adresse e-mail du compte de service (par exemple,
akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com). - Attribuer des rôles : sélectionnez Administrateur des objets Storage.
- Ajouter des comptes principaux : saisissez l'adresse e-mail du compte de service (par exemple,
- Cliquez sur Enregistrer.
Créer une fonction Cloud pour recevoir les journaux Akamai
La fonction Cloud Functions reçoit les requêtes HTTP POST d'Akamai Cloud Monitor et écrit les journaux dans GCS.
- Dans la console GCP, accédez à Cloud Functions.
- Cliquez sur Créer une fonction.
Fournissez les informations de configuration suivantes :
Paramètre Valeur Environment Sélectionnez 2e génération. Nom de la fonction akamai-cloud-monitor-receiverRégion Sélectionnez la région correspondant à votre bucket GCS (par exemple, us-central1).Dans la section Déclencheur :
- Type de déclencheur : sélectionnez HTTPS.
- Authentification : sélectionnez Autoriser les appels non authentifiés (Akamai enverra des requêtes non authentifiées).
Cliquez sur Enregistrer pour enregistrer la configuration du déclencheur.
Développez Paramètres d'exécution, de compilation, de connexion et de sécurité.
Dans la section Environnement d'exécution :
- Mémoire allouée : sélectionnez 512 Mio.
- Délai avant expiration : saisissez
600secondes (10 minutes). - Compte de service d'exécution : sélectionnez le compte de service (
akamai-cloud-monitor-sa).
Dans la section Variables d'environnement d'exécution, cliquez sur + Ajouter une variable pour chacun des éléments suivants :
Nom de la variable Exemple de valeur GCS_BUCKETakamai-cloud-monitorGCS_PREFIXakamai/cloud-monitor/jsonINGEST_TOKENrandom-shared-secret(facultatif)Cliquez sur Suivant pour accéder à l'éditeur de code.
Dans le menu déroulant Environnement d'exécution, sélectionnez Python 3.12.
Ajouter un code de fonction
- Saisissez main dans Point d'entrée de la fonction.
Dans l'éditeur de code intégré, créez deux fichiers :
- Premier fichier : main.py:
import os import json import gzip import io import uuid import datetime as dt from google.cloud import storage import functions_framework GCS_BUCKET = os.environ.get("GCS_BUCKET") GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret storage_client = storage.Client() def _write_jsonl_gz(objs: list) -> str: """Write JSON objects to GCS as gzipped JSONL.""" timestamp = dt.datetime.utcnow() key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode()) buf.seek(0) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}{key}") blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip") return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}" def _parse_records_from_request(request) -> list: """Parse JSON records from HTTP request body.""" body = request.get_data(as_text=True) if not body: return [] try: data = json.loads(body) except Exception: # Accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] @functions_framework.http def main(request): """ Cloud Function HTTP handler for Akamai Cloud Monitor logs. Args: request: Flask request object Returns: Tuple of (response_body, status_code, headers) """ # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: token = request.args.get("token") if token != INGEST_TOKEN: return ("Forbidden", 403) records = _parse_records_from_request(request) if not records: return ("No content", 204) try: gcs_key = _write_jsonl_gz(records) response = { "ok": True, "gcs_key": gcs_key, "count": len(records) } return (json.dumps(response), 200, {"Content-Type": "application/json"}) except Exception as e: print(f"Error writing to GCS: {str(e)}") return (f"Internal server error: {str(e)}", 500)- Deuxième fichier : requirements.txt:
functions-framework==3.* google-cloud-storage==2.*Cliquez sur Déployer pour déployer la fonction.
Attendez la fin du déploiement (deux à trois minutes).
Après le déploiement, accédez à l'onglet Trigger (Déclencheur) et copiez l'URL du déclencheur. Vous utiliserez cette URL dans la configuration Akamai.
Configurer Akamai Cloud Monitor pour envoyer des journaux
- Connectez-vous à Akamai Control Center.
- Ouvrez votre propriété dans Property Manager.
- Cliquez sur Ajouter une règle > choisissez Gestion du cloud.
- Ajoutez Instrumentation Cloud Monitoring et sélectionnez les ensembles de données requis.
- Ajoutez Cloud Monitor Data Delivery.
Fournissez les informations de configuration suivantes :
- Nom d'hôte de diffusion : saisissez le nom d'hôte de l'URL du déclencheur de votre fonction Cloud (par exemple,
us-central1-your-project.cloudfunctions.net). Chemin d'accès de l'URL de distribution : saisissez le chemin d'accès à partir de l'URL du déclencheur de votre fonction Cloud, plus un jeton de requête facultatif :
- Sans jeton :
/akamai-cloud-monitor-receiver Avec le jeton :
/akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>Remplacez
<INGEST_TOKEN>par la valeur que vous avez définie dans les variables d'environnement Cloud Functions.
- Sans jeton :
- Nom d'hôte de diffusion : saisissez le nom d'hôte de l'URL du déclencheur de votre fonction Cloud (par exemple,
Cliquez sur Enregistrer.
Cliquez sur Activer pour activer la version de la propriété.
Récupérer le compte de service Google SecOps
Google SecOps utilise un compte de service unique pour lire les données de votre bucket GCS. Vous devez accorder à ce compte de service l'accès à votre bucket.
Obtenir l'adresse e-mail du compte de service
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur Add New Feed (Ajouter un flux).
- Cliquez sur Configurer un flux unique.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Akamai Cloud Monitor - GCS). - Sélectionnez Google Cloud Storage V2 comme Type de source.
- Sélectionnez Akamai Cloud Monitor comme type de journal.
Cliquez sur Obtenir un compte de service. Une adresse e-mail unique pour le compte de service s'affiche, par exemple :
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopiez cette adresse e-mail pour l'utiliser à l'étape suivante.
Accorder des autorisations IAM au compte de service Google SecOps
Le compte de service Google SecOps a besoin du rôle Lecteur des objets Storage sur votre bucket GCS.
- Accédez à Cloud Storage > Buckets.
- Cliquez sur le nom de votre bucket.
- Accédez à l'onglet Autorisations.
- Cliquez sur Accorder l'accès.
- Fournissez les informations de configuration suivantes :
- Ajouter des comptes principaux : collez l'adresse e-mail du compte de service Google SecOps.
- Attribuez des rôles : sélectionnez Lecteur des objets de l'espace de stockage.
Cliquez sur Enregistrer.
Configurer un flux dans Google SecOps pour ingérer les journaux Akamai Cloud Monitor
- Accédez à Paramètres SIEM> Flux.
- Cliquez sur Add New Feed (Ajouter un flux).
- Cliquez sur Configurer un flux unique.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple,
Akamai Cloud Monitor - GCS). - Sélectionnez Google Cloud Storage V2 comme Type de source.
- Sélectionnez Akamai Cloud Monitor comme type de journal.
- Cliquez sur Suivant.
Spécifiez les valeurs des paramètres d'entrée suivants :
URL du bucket Storage : saisissez l'URI du bucket GCS avec le préfixe du chemin d'accès :
gs://akamai-cloud-monitor/akamai/cloud-monitor/json/Remplacez :
akamai-cloud-monitor: nom de votre bucket GCS.akamai/cloud-monitor/json: préfixe du chemin d'accès où les journaux sont stockés (doit correspondre àGCS_PREFIXdans Cloud Functions).
Option de suppression de la source : sélectionnez l'option de suppression de votre choix :
- Jamais : ne supprime jamais aucun fichier après les transferts (recommandé pour les tests).
- Supprimer les fichiers transférés : supprime les fichiers après un transfert réussi.
Supprimer les fichiers transférés et les répertoires vides : supprime les fichiers et les répertoires vides après un transfert réussi.
Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.
Espace de noms de l'élément :
akamai.cloud_monitorÉtiquettes d'ingestion : des étiquettes sont ajoutées à tous les événements de ce flux (par exemple,
source=akamai_cloud_monitor,format=json).
Cliquez sur Suivant.
Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.
Exemples de journaux Akamai Cloud Monitor compatibles
JSON :
{ "UA": "-", "accLang": "-", "bytes": "3929", "cacheStatus": "1", "cliIP": "0.0.0.0", "cookie": "-", "cp": "848064", "customField": "-", "dnsLookupTimeMSec": "-", "errorCode": "-", "maxAgeSec": "31536000", "objSize": "3929", "overheadBytes": "240", "proto": "HTTPS", "queryStr": "-", "range": "-", "referer": "-", "reqEndTimeMSec": "4", "reqHost": "www.example.com", "reqId": "1ce83c03", "reqMethod": "GET", "reqPath": "assets/images/placeholder-tagline.png", "reqPort": "443", "reqTimeSec": "1622470405.760", "rspContentLen": "3929", "rspContentType": "image/png", "statusCode": "200", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1.2", "totalBytes": "4599", "transferTimeMSec": "0", "turnAroundTimeMSec": "0", "uncompressedSize": "-", "version": "1", "xForwardedFor": "-" }
Table de mappage UDM
| Champ de journal | Mappage UDM | Logique |
|---|---|---|
| accLang | network.http.user_agent | Mappé directement s'il n'est pas "-" ni une chaîne vide. |
| city | principal.location.city | Mappé directement s'il n'est pas "-" ni une chaîne vide. |
| cliIP | principal.ip | Mappé directement s'il ne s'agit pas d'une chaîne vide. |
| country | principal.location.country_or_region | Mappé directement s'il n'est pas "-" ni une chaîne vide. |
| cp | additional.fields | Mappé sous forme de paire clé/valeur avec la clé "cp". |
| customField | about.ip, about.labels, src.ip | Analysées sous forme de paires clé/valeur. Traitement spécial pour "eIp" et "pIp" afin de les mapper respectivement sur src.ip et about.ip. Les autres clés sont mappées en tant que libellés dans "À propos". |
| errorCode | security_result.summary, security_result.severity | Si elle est présente, définit security_result.severity sur "ERROR" et mappe la valeur sur security_result.summary. |
| geo.city | principal.location.city | Mappé directement si la ville est "-" ou une chaîne vide. |
| geo.country | principal.location.country_or_region | Mappé directement si le pays est "-" ou une chaîne vide. |
| geo.lat | principal.location.region_latitude | Mappé directement, converti en float. |
| geo.long | principal.location.region_longitude | Mappé directement, converti en float. |
| geo.region | principal.location.state | Mappé directement. |
| id | metadata.product_log_id | Mappé directement s'il ne s'agit pas d'une chaîne vide. |
| message.cliIP | principal.ip | Mappé directement si cliIP est une chaîne vide. |
| message.fwdHost | principal.hostname | Mappé directement. |
| message.reqHost | target.hostname, target.url | Utilisé pour construire target.url et extraire target.hostname. |
| message.reqLen | network.sent_bytes | Directement mappé, converti en entier non signé si totalBytes est vide ou "-". |
| message.reqMethod | network.http.method | Mappé directement si reqMethod est une chaîne vide. |
| message.reqPath | target.url | Ajouté à target.url. |
| message.reqPort | target.port | Mappé directement, converti en entier si reqPort est une chaîne vide. |
| message.respLen | network.received_bytes | Mappé directement, converti en entier non signé. |
| message.sslVer | network.tls.version | Mappé directement. |
| message.status | network.http.response_code | Mappé directement, converti en entier si statusCode est vide ou "-". |
| message.UA | network.http.user_agent | Mappé directement si UA est "-" ou une chaîne vide. |
| network.asnum | additional.fields | Mappé sous forme de paire clé-valeur avec la clé "asnum". |
| network.edgeIP | intermediary.ip | Mappé directement. |
| network.network | additional.fields | Mappé sous forme de paire clé-valeur avec la clé "network". |
| network.networkType | additional.fields | Mappé sous forme de paire clé-valeur avec la clé "networkType". |
| proto | network.application_protocol | Utilisé pour déterminer network.application_protocol. |
| queryStr | target.url | Ajouté à target.url s'il n'est pas défini sur "-" ou sur une chaîne vide. |
| URL de provenance | network.http.referral_url, about.hostname | Mappé directement si la valeur n'est pas "-". Le nom d'hôte extrait est mappé sur about.hostname. |
| reqHost | target.hostname, target.url | Utilisé pour construire target.url et extraire target.hostname. |
| reqId | metadata.product_log_id, network.session_id | Mappé directement si l'ID est une chaîne vide. Également mappé à network.session_id. |
| reqMethod | network.http.method | Mappé directement s'il ne s'agit pas d'une chaîne vide. |
| reqPath | target.url | Ajouté à target.url si la valeur n'est pas "-". |
| reqPort | target.port | Mappé directement, converti en entier. |
| reqTimeSec | metadata.event_timestamp, timestamp | Permet de définir le code temporel de l'événement. |
| start | metadata.event_timestamp, timestamp | Permet de définir l'horodatage de l'événement si reqTimeSec est une chaîne vide. |
| statusCode | network.http.response_code | Mappé directement, converti en entier si la valeur n'est pas "-" ou une chaîne vide. |
| tlsVersion | network.tls.version | Mappé directement. |
| totalBytes | network.sent_bytes | Mappé directement, converti en entier non signé s'il n'est pas vide ou "-". |
| type | metadata.product_event_type | Mappé directement. |
| UA | network.http.user_agent | Mappé directement s'il n'est pas "-" ni une chaîne vide. |
| version | metadata.product_version | Mappé directement. |
| xForwardedFor | principal.ip | Mappé directement s'il n'est pas "-" ni une chaîne vide. |
| (Logique de l'analyseur) | metadata.vendor_name | Défini sur "Akamai". |
| (Logique de l'analyseur) | metadata.product_name | Définissez la valeur sur "Cloud Monitor". |
| (Logique de l'analyseur) | metadata.event_type | Définissez-le sur "NETWORK_HTTP". |
| (Logique de l'analyseur) | metadata.product_version | Définissez la valeur sur "2" si la version est une chaîne vide. |
| (Logique de l'analyseur) | metadata.log_type | Définissez-le sur "AKAMAI_CLOUD_MONITOR". |
| (Logique de l'analyseur) | network.application_protocol | Déterminé à partir de proto ou message.proto. Définissez sur "HTTPS" si l'une des valeurs contient "HTTPS" (sans tenir compte de la casse), ou sur "HTTP" dans le cas contraire. |
| (Logique de l'analyseur) | security_result.severity | Définissez sur "INFORMATIONAL" si errorCode est "-" ou une chaîne vide. |
| (Logique de l'analyseur) | target.url | Construit à partir du protocole, de reqHost (ou message.reqHost), de reqPath (ou message.reqPath) et de queryStr. |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.