Collecter les journaux Akamai Cloud Monitor

Compatible avec :

Ce document explique comment ingérer les journaux Akamai Cloud Monitor (Load Balancer, Traffic Shaper, ADC) dans Google Security Operations à l'aide de Google Cloud Storage. Akamai envoie des événements JSON à votre point de terminaison HTTPS. Un récepteur API Gateway + Cloud Functions écrit les événements dans GCS (JSONL, gz). L'analyseur transforme les journaux JSON en UDM. Il extrait les champs de la charge utile JSON, effectue des conversions de type de données, renomme les champs pour qu'ils correspondent au schéma UDM et gère la logique spécifique pour les champs personnalisés et la construction d'URL. Il intègre également la gestion des erreurs et la logique conditionnelle en fonction de la présence des champs.

Avant de commencer

Assurez-vous de remplir les conditions suivantes :

  • Une instance Google SecOps
  • Un projet GCP avec l'API Cloud Storage activée
  • Autorisations pour créer et gérer des buckets GCS
  • Autorisations permettant de gérer les stratégies IAM sur les buckets GCS
  • Autorisations permettant de créer des fonctions Cloud Functions, des sujets Pub/Sub et une passerelle API
  • Accès privilégié à Akamai Control Center et Property Manager

Créer un bucket Google Cloud Storage

  1. Accédez à la console Google Cloud.
  2. Sélectionnez votre projet ou créez-en un.
  3. Dans le menu de navigation, accédez à Cloud Storage> Buckets.
  4. Cliquez sur Créer un bucket.
  5. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Nommer votre bucket Saisissez un nom unique (par exemple, akamai-cloud-monitor).
    Type d'emplacement Choisissez en fonction de vos besoins (région, birégion ou multirégion).
    Emplacement Sélectionnez l'emplacement (par exemple, us-central1).
    Classe de stockage Standard (recommandé pour les journaux auxquels vous accédez fréquemment)
    Access control (Contrôle des accès) Uniforme (recommandé)
    Outils de protection Facultatif : Activer la gestion des versions des objets ou la règle de conservation
  6. Cliquez sur Créer.

Collecter les détails de configuration d'Akamai Cloud Monitor

Vous aurez besoin des informations suivantes provenant d'Akamai Control Center :

  • Nom de la propriété dans le Gestionnaire de propriétés
  • Ensembles de données Cloud Monitor à collecter
  • Jeton secret partagé facultatif pour l'authentification du webhook

Créer un compte de service pour Cloud Functions

La fonction Cloud a besoin d'un compte de service disposant des autorisations nécessaires pour écrire dans le bucket GCS.

Créer un compte de service

  1. Dans la console GCP, accédez à IAM et administration > Comptes de service.
  2. Cliquez sur Créer un compte de service.
  3. Fournissez les informations de configuration suivantes :
    • Nom du compte de service : saisissez akamai-cloud-monitor-sa.
    • Description du compte de service : saisissez Service account for Cloud Function to collect Akamai Cloud Monitor logs.
  4. Cliquez sur Créer et continuer.
  5. Dans la section Autoriser ce compte de service à accéder au projet :
    1. Cliquez sur Sélectionner un rôle.
    2. Recherchez et sélectionnez Administrateur des objets de l'espace de stockage.
    3. Cliquez sur + Ajouter un autre rôle.
    4. Recherchez et sélectionnez Demandeur Cloud Run.
    5. Cliquez sur + Ajouter un autre rôle.
    6. Recherchez et sélectionnez Demandeur Cloud Functions.
  6. Cliquez sur Continuer.
  7. Cliquez sur OK.

Ces rôles sont requis pour :

  • Administrateur des objets Storage : écrire des journaux dans le bucket GCS et gérer les fichiers d'état
  • Demandeur Cloud Run : autorise Pub/Sub à appeler la fonction
  • Demandeur Cloud Functions : autorise l'appel de fonctions

Accorder des autorisations IAM sur un bucket GCS

Accordez au compte de service des autorisations d'écriture sur le bucket GCS :

  1. Accédez à Cloud Storage > Buckets.
  2. Cliquez sur le nom de votre bucket.
  3. Accédez à l'onglet Autorisations.
  4. Cliquez sur Accorder l'accès.
  5. Fournissez les informations de configuration suivantes :
    • Ajouter des comptes principaux : saisissez l'adresse e-mail du compte de service (par exemple, akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Attribuer des rôles : sélectionnez Administrateur des objets Storage.
  6. Cliquez sur Enregistrer.

Créer une fonction Cloud pour recevoir les journaux Akamai

La fonction Cloud Functions reçoit les requêtes HTTP POST d'Akamai Cloud Monitor et écrit les journaux dans GCS.

  1. Dans la console GCP, accédez à Cloud Functions.
  2. Cliquez sur Créer une fonction.
  3. Fournissez les informations de configuration suivantes :

    Paramètre Valeur
    Environment Sélectionnez 2e génération.
    Nom de la fonction akamai-cloud-monitor-receiver
    Région Sélectionnez la région correspondant à votre bucket GCS (par exemple, us-central1).
  4. Dans la section Déclencheur :

    • Type de déclencheur : sélectionnez HTTPS.
    • Authentification : sélectionnez Autoriser les appels non authentifiés (Akamai enverra des requêtes non authentifiées).
  5. Cliquez sur Enregistrer pour enregistrer la configuration du déclencheur.

  6. Développez Paramètres d'exécution, de compilation, de connexion et de sécurité.

  7. Dans la section Environnement d'exécution :

    • Mémoire allouée : sélectionnez 512 Mio.
    • Délai avant expiration : saisissez 600 secondes (10 minutes).
    • Compte de service d'exécution : sélectionnez le compte de service (akamai-cloud-monitor-sa).
  8. Dans la section Variables d'environnement d'exécution, cliquez sur + Ajouter une variable pour chacun des éléments suivants :

    Nom de la variable Exemple de valeur
    GCS_BUCKET akamai-cloud-monitor
    GCS_PREFIX akamai/cloud-monitor/json
    INGEST_TOKEN random-shared-secret (facultatif)
  9. Cliquez sur Suivant pour accéder à l'éditeur de code.

  10. Dans le menu déroulant Environnement d'exécution, sélectionnez Python 3.12.

Ajouter un code de fonction

  1. Saisissez main dans Point d'entrée de la fonction.
  2. Dans l'éditeur de code intégré, créez deux fichiers :

    • Premier fichier : main.py:
    import os
    import json
    import gzip
    import io
    import uuid
    import datetime as dt
    from google.cloud import storage
    import functions_framework
    
    GCS_BUCKET = os.environ.get("GCS_BUCKET")
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/"
    INGEST_TOKEN = os.environ.get("INGEST_TOKEN")  # optional shared secret
    
    storage_client = storage.Client()
    
    def _write_jsonl_gz(objs: list) -> str:
        """Write JSON objects to GCS as gzipped JSONL."""
        timestamp = dt.datetime.utcnow()
        key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz"
    
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for o in objs:
                gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode())
        buf.seek(0)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}{key}")
        blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip")
    
        return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}"
    
    def _parse_records_from_request(request) -> list:
        """Parse JSON records from HTTP request body."""
        body = request.get_data(as_text=True)
    
        if not body:
            return []
    
        try:
            data = json.loads(body)
        except Exception:
            # Accept line-delimited JSON as pass-through
            try:
                return [json.loads(line) for line in body.splitlines() if line.strip()]
            except Exception:
                return []
    
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []
    
    @functions_framework.http
    def main(request):
        """
        Cloud Function HTTP handler for Akamai Cloud Monitor logs.
    
        Args:
            request: Flask request object
    
        Returns:
            Tuple of (response_body, status_code, headers)
        """
        # Optional shared-secret verification via query parameter (?token=...)
        if INGEST_TOKEN:
            token = request.args.get("token")
            if token != INGEST_TOKEN:
                return ("Forbidden", 403)
    
        records = _parse_records_from_request(request)
    
        if not records:
            return ("No content", 204)
    
        try:
            gcs_key = _write_jsonl_gz(records)
    
            response = {
                "ok": True,
                "gcs_key": gcs_key,
                "count": len(records)
            }
    
            return (json.dumps(response), 200, {"Content-Type": "application/json"})
    
        except Exception as e:
            print(f"Error writing to GCS: {str(e)}")
            return (f"Internal server error: {str(e)}", 500)
    
    • Deuxième fichier : requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    
  3. Cliquez sur Déployer pour déployer la fonction.

  4. Attendez la fin du déploiement (deux à trois minutes).

  5. Après le déploiement, accédez à l'onglet Trigger (Déclencheur) et copiez l'URL du déclencheur. Vous utiliserez cette URL dans la configuration Akamai.

Configurer Akamai Cloud Monitor pour envoyer des journaux

  1. Connectez-vous à Akamai Control Center.
  2. Ouvrez votre propriété dans Property Manager.
  3. Cliquez sur Ajouter une règle > choisissez Gestion du cloud.
  4. Ajoutez Instrumentation Cloud Monitoring et sélectionnez les ensembles de données requis.
  5. Ajoutez Cloud Monitor Data Delivery.
  6. Fournissez les informations de configuration suivantes :

    • Nom d'hôte de diffusion : saisissez le nom d'hôte de l'URL du déclencheur de votre fonction Cloud (par exemple, us-central1-your-project.cloudfunctions.net).
    • Chemin d'accès de l'URL de distribution : saisissez le chemin d'accès à partir de l'URL du déclencheur de votre fonction Cloud, plus un jeton de requête facultatif :

      • Sans jeton : /akamai-cloud-monitor-receiver
      • Avec le jeton : /akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>

      • Remplacez <INGEST_TOKEN> par la valeur que vous avez définie dans les variables d'environnement Cloud Functions.

  7. Cliquez sur Enregistrer.

  8. Cliquez sur Activer pour activer la version de la propriété.

Récupérer le compte de service Google SecOps

Google SecOps utilise un compte de service unique pour lire les données de votre bucket GCS. Vous devez accorder à ce compte de service l'accès à votre bucket.

Obtenir l'adresse e-mail du compte de service

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur Add New Feed (Ajouter un flux).
  3. Cliquez sur Configurer un flux unique.
  4. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Akamai Cloud Monitor - GCS).
  5. Sélectionnez Google Cloud Storage V2 comme Type de source.
  6. Sélectionnez Akamai Cloud Monitor comme type de journal.
  7. Cliquez sur Obtenir un compte de service. Une adresse e-mail unique pour le compte de service s'affiche, par exemple :

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copiez cette adresse e-mail pour l'utiliser à l'étape suivante.

Accorder des autorisations IAM au compte de service Google SecOps

Le compte de service Google SecOps a besoin du rôle Lecteur des objets Storage sur votre bucket GCS.

  1. Accédez à Cloud Storage > Buckets.
  2. Cliquez sur le nom de votre bucket.
  3. Accédez à l'onglet Autorisations.
  4. Cliquez sur Accorder l'accès.
  5. Fournissez les informations de configuration suivantes :
    • Ajouter des comptes principaux : collez l'adresse e-mail du compte de service Google SecOps.
    • Attribuez des rôles : sélectionnez Lecteur des objets de l'espace de stockage.
  6. Cliquez sur Enregistrer.

Configurer un flux dans Google SecOps pour ingérer les journaux Akamai Cloud Monitor

  1. Accédez à Paramètres SIEM> Flux.
  2. Cliquez sur Add New Feed (Ajouter un flux).
  3. Cliquez sur Configurer un flux unique.
  4. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Akamai Cloud Monitor - GCS).
  5. Sélectionnez Google Cloud Storage V2 comme Type de source.
  6. Sélectionnez Akamai Cloud Monitor comme type de journal.
  7. Cliquez sur Suivant.
  8. Spécifiez les valeurs des paramètres d'entrée suivants :

    • URL du bucket Storage : saisissez l'URI du bucket GCS avec le préfixe du chemin d'accès :

      gs://akamai-cloud-monitor/akamai/cloud-monitor/json/
      
      • Remplacez :

        • akamai-cloud-monitor : nom de votre bucket GCS.
        • akamai/cloud-monitor/json : préfixe du chemin d'accès où les journaux sont stockés (doit correspondre à GCS_PREFIX dans Cloud Functions).
    • Option de suppression de la source : sélectionnez l'option de suppression de votre choix :

      • Jamais : ne supprime jamais aucun fichier après les transferts (recommandé pour les tests).
      • Supprimer les fichiers transférés : supprime les fichiers après un transfert réussi.
      • Supprimer les fichiers transférés et les répertoires vides : supprime les fichiers et les répertoires vides après un transfert réussi.

    • Âge maximal des fichiers : incluez les fichiers modifiés au cours des derniers jours. La valeur par défaut est de 180 jours.

    • Espace de noms de l'élément : akamai.cloud_monitor

    • Étiquettes d'ingestion : des étiquettes sont ajoutées à tous les événements de ce flux (par exemple, source=akamai_cloud_monitor, format=json).

  9. Cliquez sur Suivant.

  10. Vérifiez la configuration de votre nouveau flux sur l'écran Finaliser, puis cliquez sur Envoyer.

Exemples de journaux Akamai Cloud Monitor compatibles

  • JSON :

    {
      "UA": "-",
      "accLang": "-",
      "bytes": "3929",
      "cacheStatus": "1",
      "cliIP": "0.0.0.0",
      "cookie": "-",
      "cp": "848064",
      "customField": "-",
      "dnsLookupTimeMSec": "-",
      "errorCode": "-",
      "maxAgeSec": "31536000",
      "objSize": "3929",
      "overheadBytes": "240",
      "proto": "HTTPS",
      "queryStr": "-",
      "range": "-",
      "referer": "-",
      "reqEndTimeMSec": "4",
      "reqHost": "www.example.com",
      "reqId": "1ce83c03",
      "reqMethod": "GET",
      "reqPath": "assets/images/placeholder-tagline.png",
      "reqPort": "443",
      "reqTimeSec": "1622470405.760",
      "rspContentLen": "3929",
      "rspContentType": "image/png",
      "statusCode": "200",
      "tlsOverheadTimeMSec": "0",
      "tlsVersion": "TLSv1.2",
      "totalBytes": "4599",
      "transferTimeMSec": "0",
      "turnAroundTimeMSec": "0",
      "uncompressedSize": "-",
      "version": "1",
      "xForwardedFor": "-"
    }
    

Table de mappage UDM

Champ de journal Mappage UDM Logique
accLang network.http.user_agent Mappé directement s'il n'est pas "-" ni une chaîne vide.
city principal.location.city Mappé directement s'il n'est pas "-" ni une chaîne vide.
cliIP principal.ip Mappé directement s'il ne s'agit pas d'une chaîne vide.
country principal.location.country_or_region Mappé directement s'il n'est pas "-" ni une chaîne vide.
cp additional.fields Mappé sous forme de paire clé/valeur avec la clé "cp".
customField about.ip, about.labels, src.ip Analysées sous forme de paires clé/valeur. Traitement spécial pour "eIp" et "pIp" afin de les mapper respectivement sur src.ip et about.ip. Les autres clés sont mappées en tant que libellés dans "À propos".
errorCode security_result.summary, security_result.severity Si elle est présente, définit security_result.severity sur "ERROR" et mappe la valeur sur security_result.summary.
geo.city principal.location.city Mappé directement si la ville est "-" ou une chaîne vide.
geo.country principal.location.country_or_region Mappé directement si le pays est "-" ou une chaîne vide.
geo.lat principal.location.region_latitude Mappé directement, converti en float.
geo.long principal.location.region_longitude Mappé directement, converti en float.
geo.region principal.location.state Mappé directement.
id metadata.product_log_id Mappé directement s'il ne s'agit pas d'une chaîne vide.
message.cliIP principal.ip Mappé directement si cliIP est une chaîne vide.
message.fwdHost principal.hostname Mappé directement.
message.reqHost target.hostname, target.url Utilisé pour construire target.url et extraire target.hostname.
message.reqLen network.sent_bytes Directement mappé, converti en entier non signé si totalBytes est vide ou "-".
message.reqMethod network.http.method Mappé directement si reqMethod est une chaîne vide.
message.reqPath target.url Ajouté à target.url.
message.reqPort target.port Mappé directement, converti en entier si reqPort est une chaîne vide.
message.respLen network.received_bytes Mappé directement, converti en entier non signé.
message.sslVer network.tls.version Mappé directement.
message.status network.http.response_code Mappé directement, converti en entier si statusCode est vide ou "-".
message.UA network.http.user_agent Mappé directement si UA est "-" ou une chaîne vide.
network.asnum additional.fields Mappé sous forme de paire clé-valeur avec la clé "asnum".
network.edgeIP intermediary.ip Mappé directement.
network.network additional.fields Mappé sous forme de paire clé-valeur avec la clé "network".
network.networkType additional.fields Mappé sous forme de paire clé-valeur avec la clé "networkType".
proto network.application_protocol Utilisé pour déterminer network.application_protocol.
queryStr target.url Ajouté à target.url s'il n'est pas défini sur "-" ou sur une chaîne vide.
URL de provenance network.http.referral_url, about.hostname Mappé directement si la valeur n'est pas "-". Le nom d'hôte extrait est mappé sur about.hostname.
reqHost target.hostname, target.url Utilisé pour construire target.url et extraire target.hostname.
reqId metadata.product_log_id, network.session_id Mappé directement si l'ID est une chaîne vide. Également mappé à network.session_id.
reqMethod network.http.method Mappé directement s'il ne s'agit pas d'une chaîne vide.
reqPath target.url Ajouté à target.url si la valeur n'est pas "-".
reqPort target.port Mappé directement, converti en entier.
reqTimeSec metadata.event_timestamp, timestamp Permet de définir le code temporel de l'événement.
start metadata.event_timestamp, timestamp Permet de définir l'horodatage de l'événement si reqTimeSec est une chaîne vide.
statusCode network.http.response_code Mappé directement, converti en entier si la valeur n'est pas "-" ou une chaîne vide.
tlsVersion network.tls.version Mappé directement.
totalBytes network.sent_bytes Mappé directement, converti en entier non signé s'il n'est pas vide ou "-".
type metadata.product_event_type Mappé directement.
UA network.http.user_agent Mappé directement s'il n'est pas "-" ni une chaîne vide.
version metadata.product_version Mappé directement.
xForwardedFor principal.ip Mappé directement s'il n'est pas "-" ni une chaîne vide.
(Logique de l'analyseur) metadata.vendor_name Défini sur "Akamai".
(Logique de l'analyseur) metadata.product_name Définissez la valeur sur "Cloud Monitor".
(Logique de l'analyseur) metadata.event_type Définissez-le sur "NETWORK_HTTP".
(Logique de l'analyseur) metadata.product_version Définissez la valeur sur "2" si la version est une chaîne vide.
(Logique de l'analyseur) metadata.log_type Définissez-le sur "AKAMAI_CLOUD_MONITOR".
(Logique de l'analyseur) network.application_protocol Déterminé à partir de proto ou message.proto. Définissez sur "HTTPS" si l'une des valeurs contient "HTTPS" (sans tenir compte de la casse), ou sur "HTTP" dans le cas contraire.
(Logique de l'analyseur) security_result.severity Définissez sur "INFORMATIONAL" si errorCode est "-" ou une chaîne vide.
(Logique de l'analyseur) target.url Construit à partir du protocole, de reqHost (ou message.reqHost), de reqPath (ou message.reqPath) et de queryStr.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.