收集 Akamai Cloud Monitor 日志

支持的平台:

本文档介绍了如何使用 Google Cloud Storage 将 Akamai Cloud Monitor(负载平衡器、流量整形器、ADC)日志提取到 Google Security Operations。Akamai 将 JSON 事件推送到您的 HTTPS 端点;API Gateway + Cloud Function 接收器将事件写入 GCS (JSONL, gz)。解析器将 JSON 日志转换为 UDM。它从 JSON 载荷中提取字段,执行数据类型转换,重命名字段以匹配 UDM 架构,并处理自定义字段和网址构建的特定逻辑。它还包含基于字段存在情况的错误处理和条件逻辑。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例
  • 已启用 Cloud Storage API 的 GCP 项目
  • 创建和管理 GCS 存储分区的权限
  • 管理 GCS 存储分区的 IAM 政策的权限
  • 创建 Cloud Functions、Pub/Sub 主题和 API Gateway 的权限
  • 对 Akamai Control Center 和 Property Manager 的特权访问权限

创建 Google Cloud Storage 存储分区

  1. 前往 Google Cloud 控制台
  2. 选择您的项目或创建新项目。
  3. 在导航菜单中,依次前往 Cloud Storage > 存储分区
  4. 点击创建存储分区
  5. 提供以下配置详细信息:

    设置
    为存储分区命名 输入一个全局唯一的名称(例如 akamai-cloud-monitor
    位置类型 根据您的需求进行选择(区域级、双区域级、多区域级)
    位置 选择相应位置(例如 us-central1
    存储类别 标准(建议用于经常访问的日志)
    访问权限控制 统一(推荐)
    保护工具 可选:启用对象版本控制或保留政策
  6. 点击创建

收集 Akamai Cloud Monitor 配置详细信息

您将需要 Akamai 控制中心提供以下信息:

  • Property Manager 中的房源名称
  • 需要收集的 Cloud Monitor 数据集
  • 用于 webhook 身份验证的可选共享密钥令牌

为 Cloud Functions 函数创建服务账号

Cloud Functions 函数需要一个有权写入 GCS 存储分区的服务账号。

创建服务账号

  1. GCP 控制台中,依次前往 IAM 和管理 > 服务账号
  2. 点击创建服务账号
  3. 提供以下配置详细信息:
    • 服务账号名称:输入 akamai-cloud-monitor-sa
    • 服务账号说明:输入 Service account for Cloud Function to collect Akamai Cloud Monitor logs
  4. 点击创建并继续
  5. 向此服务账号授予对项目的访问权限部分:
    1. 点击选择角色
    2. 搜索并选择 Storage Object Admin
    3. 点击 + 添加其他角色
    4. 搜索并选择 Cloud Run Invoker
    5. 点击 + 添加其他角色
    6. 搜索并选择 Cloud Functions Invoker
  6. 点击继续
  7. 点击完成

必须拥有这些角色,才能:

  • Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
  • Cloud Run Invoker:允许 Pub/Sub 调用函数
  • Cloud Functions Invoker:允许调用函数

授予对 GCS 存储分区的 IAM 权限

向服务账号授予对 GCS 存储分区的写入权限:

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:输入服务账号电子邮件地址(例如 akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 分配角色:选择 Storage Object Admin
  6. 点击保存

创建 Cloud Functions 函数以接收 Akamai 日志

Cloud Functions 从 Akamai Cloud Monitor 接收 HTTP POST 请求,并将日志写入 GCS。

  1. GCP 控制台中,前往 Cloud Functions
  2. 点击创建函数
  3. 提供以下配置详细信息:

    设置
    环境 选择第 2 代
    函数名称 akamai-cloud-monitor-receiver
    区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1
  4. 触发器部分中:

    • 触发器类型:选择 HTTPS
    • 身份验证:选择允许未通过身份验证的调用(Akamai 将发送未通过身份验证的请求)。
  5. 点击保存以保存触发器配置。

  6. 展开运行时、构建、连接和安全设置

  7. 运行时部分中:

    • 分配的内存:选择 512 MiB
    • 超时:输入 600 秒(10 分钟)。
    • 运行时服务账号:选择服务账号 (akamai-cloud-monitor-sa)。
  8. 运行时环境变量部分中,针对以下各项点击 + 添加变量

    变量名称 示例值
    GCS_BUCKET akamai-cloud-monitor
    GCS_PREFIX akamai/cloud-monitor/json
    INGEST_TOKEN random-shared-secret(可选)
  9. 点击下一步,继续前往代码编辑器。

  10. 运行时下拉列表中,选择 Python 3.12

添加函数代码

  1. 函数入口点中输入 main
  2. 在内嵌代码编辑器中,创建两个文件:

    • 第一个文件:main.py:
    import os
    import json
    import gzip
    import io
    import uuid
    import datetime as dt
    from google.cloud import storage
    import functions_framework
    
    GCS_BUCKET = os.environ.get("GCS_BUCKET")
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/"
    INGEST_TOKEN = os.environ.get("INGEST_TOKEN")  # optional shared secret
    
    storage_client = storage.Client()
    
    def _write_jsonl_gz(objs: list) -> str:
        """Write JSON objects to GCS as gzipped JSONL."""
        timestamp = dt.datetime.utcnow()
        key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz"
    
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for o in objs:
                gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode())
        buf.seek(0)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}{key}")
        blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip")
    
        return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}"
    
    def _parse_records_from_request(request) -> list:
        """Parse JSON records from HTTP request body."""
        body = request.get_data(as_text=True)
    
        if not body:
            return []
    
        try:
            data = json.loads(body)
        except Exception:
            # Accept line-delimited JSON as pass-through
            try:
                return [json.loads(line) for line in body.splitlines() if line.strip()]
            except Exception:
                return []
    
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []
    
    @functions_framework.http
    def main(request):
        """
        Cloud Function HTTP handler for Akamai Cloud Monitor logs.
    
        Args:
            request: Flask request object
    
        Returns:
            Tuple of (response_body, status_code, headers)
        """
        # Optional shared-secret verification via query parameter (?token=...)
        if INGEST_TOKEN:
            token = request.args.get("token")
            if token != INGEST_TOKEN:
                return ("Forbidden", 403)
    
        records = _parse_records_from_request(request)
    
        if not records:
            return ("No content", 204)
    
        try:
            gcs_key = _write_jsonl_gz(records)
    
            response = {
                "ok": True,
                "gcs_key": gcs_key,
                "count": len(records)
            }
    
            return (json.dumps(response), 200, {"Content-Type": "application/json"})
    
        except Exception as e:
            print(f"Error writing to GCS: {str(e)}")
            return (f"Internal server error: {str(e)}", 500)
    
    • 第二个文件:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    
  3. 点击部署以部署该函数。

  4. 等待部署完成(2-3 分钟)。

  5. 部署完成后,前往触发器标签页,然后复制触发器网址。您将在 Akamai 配置中使用此网址。

配置 Akamai Cloud Monitor 以推送日志

  1. 登录 Akamai 控制中心
  2. Property Manager 中打开您的媒体资源。
  3. 点击添加规则 > 选择“云管理”
  4. 添加 Cloud Monitor Instrumentation 并选择所需的数据集
  5. 添加 Cloud Monitor 数据交付
  6. 提供以下配置详细信息:

    • 传送主机名:输入 Cloud Functions 函数触发器网址中的主机名(例如 us-central1-your-project.cloudfunctions.net)。
    • 交付网址路径:输入 Cloud Functions 函数触发器网址中的路径,以及可选的查询令牌:

      • 不含令牌:/akamai-cloud-monitor-receiver
      • 包含令牌:/akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>

      • <INGEST_TOKEN> 替换为您在 Cloud Functions 环境变量中设置的值。

  7. 点击保存

  8. 点击启用以启用媒体资源版本。

检索 Google SecOps 服务账号

Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。

获取服务账号电子邮件地址

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Akamai Cloud Monitor - GCS)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Akamai Cloud Monitor 作为日志类型
  7. 点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 复制此电子邮件地址,以便在下一步中使用。

向 Google SecOps 服务账号授予 IAM 权限

Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。

  1. 前往 Cloud Storage > 存储分区
  2. 点击您的存储分区名称。
  3. 前往权限标签页。
  4. 点击授予访问权限
  5. 提供以下配置详细信息:
    • 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
    • 分配角色:选择 Storage Object Viewer
  6. 点击保存

在 Google SecOps 中配置 Feed 以提取 Akamai Cloud Monitor 日志

  1. 依次前往 SIEM 设置 > Feed
  2. 点击添加新 Feed
  3. 点击配置单个 Feed
  4. Feed 名称字段中,输入 Feed 的名称(例如 Akamai Cloud Monitor - GCS)。
  5. 选择 Google Cloud Storage V2 作为来源类型
  6. 选择 Akamai Cloud Monitor 作为日志类型
  7. 点击下一步
  8. 为以下输入参数指定值:

    • 存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:

      gs://akamai-cloud-monitor/akamai/cloud-monitor/json/
      
        • akamai-cloud-monitor:您的 GCS 存储分区名称。
        • akamai/cloud-monitor/json:存储日志的前缀路径(必须与 Cloud Functions 中的 GCS_PREFIX 匹配)。
    • 来源删除选项:根据您的偏好选择删除选项:

      • 永不:永不删除转移后的任何文件(建议用于测试)。
      • 删除已转移的文件:在成功转移后删除文件。
      • 删除已转移的文件和空目录:在成功转移后删除文件和空目录。

    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。

    • 资源命名空间akamai.cloud_monitor

    • 注入标签:标签会添加到相应 Feed 中的所有事件(例如 source=akamai_cloud_monitorformat=json)。

  9. 点击下一步

  10. 最终确定界面中查看新的 Feed 配置,然后点击提交

支持的 Akamai Cloud Monitor 示例日志

  • JSON

    {
      "UA": "-",
      "accLang": "-",
      "bytes": "3929",
      "cacheStatus": "1",
      "cliIP": "0.0.0.0",
      "cookie": "-",
      "cp": "848064",
      "customField": "-",
      "dnsLookupTimeMSec": "-",
      "errorCode": "-",
      "maxAgeSec": "31536000",
      "objSize": "3929",
      "overheadBytes": "240",
      "proto": "HTTPS",
      "queryStr": "-",
      "range": "-",
      "referer": "-",
      "reqEndTimeMSec": "4",
      "reqHost": "www.example.com",
      "reqId": "1ce83c03",
      "reqMethod": "GET",
      "reqPath": "assets/images/placeholder-tagline.png",
      "reqPort": "443",
      "reqTimeSec": "1622470405.760",
      "rspContentLen": "3929",
      "rspContentType": "image/png",
      "statusCode": "200",
      "tlsOverheadTimeMSec": "0",
      "tlsVersion": "TLSv1.2",
      "totalBytes": "4599",
      "transferTimeMSec": "0",
      "turnAroundTimeMSec": "0",
      "uncompressedSize": "-",
      "version": "1",
      "xForwardedFor": "-"
    }
    

UDM 映射表

日志字段 UDM 映射 逻辑
accLang network.http.user_agent 如果不是“-”或空字符串,则直接映射。
city principal.location.city 如果不是“-”或空字符串,则直接映射。
cliIP principal.ip 如果不是空字符串,则直接映射。
country principal.location.country_or_region 如果不是“-”或空字符串,则直接映射。
cp additional.fields 映射为键值对,键为“cp”。
customField about.ip、about.labels、src.ip 解析为键值对。特殊处理“eIp”和“pIp”,以分别映射到 src.ip 和 about.ip。其他键会映射为“关于”中的标签。
errorCode security_result.summary、security_result.severity 如果存在,则将 security_result.severity 设置为“ERROR”,并将该值映射到 security_result.summary。
geo.city principal.location.city 如果城市为“-”或空字符串,则直接映射。
geo.country principal.location.country_or_region 如果国家/地区为“-”或空字符串,则直接映射。
geo.lat principal.location.region_latitude 直接映射,转换为浮点数。
geo.long principal.location.region_longitude 直接映射,转换为浮点数。
geo.region principal.location.state 直接映射。
id metadata.product_log_id 如果不是空字符串,则直接映射。
message.cliIP principal.ip 如果 cliIP 为空字符串,则直接映射。
message.fwdHost principal.hostname 直接映射。
message.reqHost target.hostname、target.url 用于构建 target.url 和提取 target.hostname。
message.reqLen network.sent_bytes 直接映射,如果 totalBytes 为空或“-”,则转换为无符号整数。
message.reqMethod network.http.method 如果 reqMethod 为空字符串,则直接映射。
message.reqPath target.url 附加到 target.url。
message.reqPort target.port 直接映射,如果 reqPort 为空字符串,则转换为整数。
message.respLen network.received_bytes 直接映射,转换为无符号整数。
message.sslVer network.tls.version 直接映射。
message.status network.http.response_code 直接映射,如果 statusCode 为空或“-”,则转换为整数。
message.UA network.http.user_agent 如果 UA 为“-”或空字符串,则直接映射。
network.asnum additional.fields 映射为键值对,键为“asnum”。
network.edgeIP intermediary.ip 直接映射。
network.network additional.fields 映射为键值对,键为“network”。
network.networkType additional.fields 映射为键值对,键为“networkType”。
proto network.application_protocol 用于确定 network.application_protocol。
queryStr target.url 如果不是“-”或空字符串,则附加到 target.url。
引荐来源网址 network.http.referral_url, about.hostname 如果不是“-”,则直接映射。提取的主机名会映射到 about.hostname。
reqHost target.hostname、target.url 用于构建 target.url 和提取 target.hostname。
reqId metadata.product_log_id、network.session_id 如果 ID 为空字符串,则直接映射。还映射到 network.session_id。
reqMethod network.http.method 如果不是空字符串,则直接映射。
reqPath target.url 如果不是“-”,则附加到 target.url。
reqPort target.port 直接映射,转换为整数。
reqTimeSec metadata.event_timestamp, timestamp 用于设置事件时间戳。
start metadata.event_timestamp, timestamp 如果 reqTimeSec 为空字符串,则用于设置事件时间戳。
statusCode network.http.response_code 直接映射,如果不是“-”或空字符串,则转换为整数。
tlsVersion network.tls.version 直接映射。
totalBytes network.sent_bytes 直接映射,如果非空或“-”,则转换为无符号整数。
类型 metadata.product_event_type 直接映射。
UA network.http.user_agent 如果不是“-”或空字符串,则直接映射。
版本 metadata.product_version 直接映射。
xForwardedFor principal.ip 如果不是“-”或空字符串,则直接映射。
(解析器逻辑) metadata.vendor_name 设置为“Akamai”。
(解析器逻辑) metadata.product_name 设置为“云监控”。
(解析器逻辑) metadata.event_type 设置为“NETWORK_HTTP”。
(解析器逻辑) metadata.product_version 如果版本为空字符串,则设置为“2”。
(解析器逻辑) metadata.log_type 设置为“AKAMAI_CLOUD_MONITOR”。
(解析器逻辑) network.application_protocol 根据 proto 或 message.proto 确定。如果其中任一字符串包含“HTTPS”(不区分大小写),则设置为“HTTPS”,否则设置为“HTTP”。
(解析器逻辑) security_result.severity 如果 errorCode 为“-”或空字符串,则设置为“INFORMATIONAL”。
(解析器逻辑) target.url 由协议、reqHost(或 message.reqHost)、reqPath(或 message.reqPath)和 queryStr 构成。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。