收集 Akamai Cloud Monitor 日志
本文档介绍了如何使用 Google Cloud Storage 将 Akamai Cloud Monitor(负载平衡器、流量整形器、ADC)日志提取到 Google Security Operations。Akamai 将 JSON 事件推送到您的 HTTPS 端点;API Gateway + Cloud Function 接收器将事件写入 GCS (JSONL, gz)。解析器将 JSON 日志转换为 UDM。它从 JSON 载荷中提取字段,执行数据类型转换,重命名字段以匹配 UDM 架构,并处理自定义字段和网址构建的特定逻辑。它还包含基于字段存在情况的错误处理和条件逻辑。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 已启用 Cloud Storage API 的 GCP 项目
- 创建和管理 GCS 存储分区的权限
- 管理 GCS 存储分区的 IAM 政策的权限
- 创建 Cloud Functions、Pub/Sub 主题和 API Gateway 的权限
- 对 Akamai Control Center 和 Property Manager 的特权访问权限
创建 Google Cloud Storage 存储分区
- 前往 Google Cloud 控制台。
- 选择您的项目或创建新项目。
- 在导航菜单中,依次前往 Cloud Storage > 存储分区。
- 点击创建存储分区。
提供以下配置详细信息:
设置 值 为存储分区命名 输入一个全局唯一的名称(例如 akamai-cloud-monitor)位置类型 根据您的需求进行选择(区域级、双区域级、多区域级) 位置 选择相应位置(例如 us-central1)存储类别 标准(建议用于经常访问的日志) 访问权限控制 统一(推荐) 保护工具 可选:启用对象版本控制或保留政策 点击创建。
收集 Akamai Cloud Monitor 配置详细信息
您将需要 Akamai 控制中心提供以下信息:
- Property Manager 中的房源名称
- 需要收集的 Cloud Monitor 数据集
- 用于 webhook 身份验证的可选共享密钥令牌
为 Cloud Functions 函数创建服务账号
Cloud Functions 函数需要一个有权写入 GCS 存储分区的服务账号。
创建服务账号
- 在 GCP 控制台中,依次前往 IAM 和管理 > 服务账号。
- 点击创建服务账号。
- 提供以下配置详细信息:
- 服务账号名称:输入
akamai-cloud-monitor-sa。 - 服务账号说明:输入
Service account for Cloud Function to collect Akamai Cloud Monitor logs。
- 服务账号名称:输入
- 点击创建并继续。
- 在向此服务账号授予对项目的访问权限部分:
- 点击选择角色。
- 搜索并选择 Storage Object Admin。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Run Invoker。
- 点击 + 添加其他角色。
- 搜索并选择 Cloud Functions Invoker。
- 点击继续。
- 点击完成。
必须拥有这些角色,才能:
- Storage Object Admin:将日志写入 GCS 存储分区并管理状态文件
- Cloud Run Invoker:允许 Pub/Sub 调用函数
- Cloud Functions Invoker:允许调用函数
授予对 GCS 存储分区的 IAM 权限
向服务账号授予对 GCS 存储分区的写入权限:
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:输入服务账号电子邮件地址(例如
akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 分配角色:选择 Storage Object Admin。
- 添加主账号:输入服务账号电子邮件地址(例如
- 点击保存。
创建 Cloud Functions 函数以接收 Akamai 日志
Cloud Functions 从 Akamai Cloud Monitor 接收 HTTP POST 请求,并将日志写入 GCS。
- 在 GCP 控制台中,前往 Cloud Functions。
- 点击创建函数。
提供以下配置详细信息:
设置 值 环境 选择第 2 代 函数名称 akamai-cloud-monitor-receiver区域 选择与您的 GCS 存储分区匹配的区域(例如 us-central1)在触发器部分中:
- 触发器类型:选择 HTTPS。
- 身份验证:选择允许未通过身份验证的调用(Akamai 将发送未通过身份验证的请求)。
点击保存以保存触发器配置。
展开运行时、构建、连接和安全设置。
在运行时部分中:
- 分配的内存:选择 512 MiB。
- 超时:输入
600秒(10 分钟)。 - 运行时服务账号:选择服务账号 (
akamai-cloud-monitor-sa)。
在运行时环境变量部分中,针对以下各项点击 + 添加变量:
变量名称 示例值 GCS_BUCKETakamai-cloud-monitorGCS_PREFIXakamai/cloud-monitor/jsonINGEST_TOKENrandom-shared-secret(可选)点击下一步,继续前往代码编辑器。
在运行时下拉列表中,选择 Python 3.12。
添加函数代码
- 在函数入口点中输入 main
在内嵌代码编辑器中,创建两个文件:
- 第一个文件:main.py::
import os import json import gzip import io import uuid import datetime as dt from google.cloud import storage import functions_framework GCS_BUCKET = os.environ.get("GCS_BUCKET") GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret storage_client = storage.Client() def _write_jsonl_gz(objs: list) -> str: """Write JSON objects to GCS as gzipped JSONL.""" timestamp = dt.datetime.utcnow() key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode()) buf.seek(0) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}{key}") blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip") return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}" def _parse_records_from_request(request) -> list: """Parse JSON records from HTTP request body.""" body = request.get_data(as_text=True) if not body: return [] try: data = json.loads(body) except Exception: # Accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] @functions_framework.http def main(request): """ Cloud Function HTTP handler for Akamai Cloud Monitor logs. Args: request: Flask request object Returns: Tuple of (response_body, status_code, headers) """ # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: token = request.args.get("token") if token != INGEST_TOKEN: return ("Forbidden", 403) records = _parse_records_from_request(request) if not records: return ("No content", 204) try: gcs_key = _write_jsonl_gz(records) response = { "ok": True, "gcs_key": gcs_key, "count": len(records) } return (json.dumps(response), 200, {"Content-Type": "application/json"}) except Exception as e: print(f"Error writing to GCS: {str(e)}") return (f"Internal server error: {str(e)}", 500)- 第二个文件:requirements.txt::
functions-framework==3.* google-cloud-storage==2.*点击部署以部署该函数。
等待部署完成(2-3 分钟)。
部署完成后,前往触发器标签页,然后复制触发器网址。您将在 Akamai 配置中使用此网址。
配置 Akamai Cloud Monitor 以推送日志
- 登录 Akamai 控制中心。
- 在 Property Manager 中打开您的媒体资源。
- 点击添加规则 > 选择“云管理”。
- 添加 Cloud Monitor Instrumentation 并选择所需的数据集。
- 添加 Cloud Monitor 数据交付。
提供以下配置详细信息:
- 传送主机名:输入 Cloud Functions 函数触发器网址中的主机名(例如
us-central1-your-project.cloudfunctions.net)。 交付网址路径:输入 Cloud Functions 函数触发器网址中的路径,以及可选的查询令牌:
- 不含令牌:
/akamai-cloud-monitor-receiver 包含令牌:
/akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>将
<INGEST_TOKEN>替换为您在 Cloud Functions 环境变量中设置的值。
- 不含令牌:
- 传送主机名:输入 Cloud Functions 函数触发器网址中的主机名(例如
点击保存。
点击启用以启用媒体资源版本。
检索 Google SecOps 服务账号
Google SecOps 使用唯一的服务账号从您的 GCS 存储分区中读取数据。您必须授予此服务账号对您的存储分区的访问权限。
获取服务账号电子邮件地址
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Akamai Cloud Monitor - GCS)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Akamai Cloud Monitor 作为日志类型。
点击获取服务账号。系统会显示一个唯一的服务账号电子邮件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com复制此电子邮件地址,以便在下一步中使用。
向 Google SecOps 服务账号授予 IAM 权限
Google SecOps 服务账号需要对您的 GCS 存储分区具有 Storage Object Viewer 角色。
- 前往 Cloud Storage > 存储分区。
- 点击您的存储分区名称。
- 前往权限标签页。
- 点击授予访问权限。
- 提供以下配置详细信息:
- 添加主账号:粘贴 Google SecOps 服务账号电子邮件地址。
- 分配角色:选择 Storage Object Viewer。
点击保存。
在 Google SecOps 中配置 Feed 以提取 Akamai Cloud Monitor 日志
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 点击配置单个 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Akamai Cloud Monitor - GCS)。 - 选择 Google Cloud Storage V2 作为来源类型。
- 选择 Akamai Cloud Monitor 作为日志类型。
- 点击下一步。
为以下输入参数指定值:
存储分区网址:输入带有前缀路径的 GCS 存储分区 URI:
gs://akamai-cloud-monitor/akamai/cloud-monitor/json/将
akamai-cloud-monitor:您的 GCS 存储分区名称。akamai/cloud-monitor/json:存储日志的前缀路径(必须与 Cloud Functions 中的GCS_PREFIX匹配)。
来源删除选项:根据您的偏好选择删除选项:
- 永不:永不删除转移后的任何文件(建议用于测试)。
- 删除已转移的文件:在成功转移后删除文件。
删除已转移的文件和空目录:在成功转移后删除文件和空目录。
文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
资源命名空间:
akamai.cloud_monitor注入标签:标签会添加到相应 Feed 中的所有事件(例如
source=akamai_cloud_monitor、format=json)。
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
支持的 Akamai Cloud Monitor 示例日志
JSON:
{ "UA": "-", "accLang": "-", "bytes": "3929", "cacheStatus": "1", "cliIP": "0.0.0.0", "cookie": "-", "cp": "848064", "customField": "-", "dnsLookupTimeMSec": "-", "errorCode": "-", "maxAgeSec": "31536000", "objSize": "3929", "overheadBytes": "240", "proto": "HTTPS", "queryStr": "-", "range": "-", "referer": "-", "reqEndTimeMSec": "4", "reqHost": "www.example.com", "reqId": "1ce83c03", "reqMethod": "GET", "reqPath": "assets/images/placeholder-tagline.png", "reqPort": "443", "reqTimeSec": "1622470405.760", "rspContentLen": "3929", "rspContentType": "image/png", "statusCode": "200", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1.2", "totalBytes": "4599", "transferTimeMSec": "0", "turnAroundTimeMSec": "0", "uncompressedSize": "-", "version": "1", "xForwardedFor": "-" }
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
| accLang | network.http.user_agent | 如果不是“-”或空字符串,则直接映射。 |
| city | principal.location.city | 如果不是“-”或空字符串,则直接映射。 |
| cliIP | principal.ip | 如果不是空字符串,则直接映射。 |
| country | principal.location.country_or_region | 如果不是“-”或空字符串,则直接映射。 |
| cp | additional.fields | 映射为键值对,键为“cp”。 |
| customField | about.ip、about.labels、src.ip | 解析为键值对。特殊处理“eIp”和“pIp”,以分别映射到 src.ip 和 about.ip。其他键会映射为“关于”中的标签。 |
| errorCode | security_result.summary、security_result.severity | 如果存在,则将 security_result.severity 设置为“ERROR”,并将该值映射到 security_result.summary。 |
| geo.city | principal.location.city | 如果城市为“-”或空字符串,则直接映射。 |
| geo.country | principal.location.country_or_region | 如果国家/地区为“-”或空字符串,则直接映射。 |
| geo.lat | principal.location.region_latitude | 直接映射,转换为浮点数。 |
| geo.long | principal.location.region_longitude | 直接映射,转换为浮点数。 |
| geo.region | principal.location.state | 直接映射。 |
| id | metadata.product_log_id | 如果不是空字符串,则直接映射。 |
| message.cliIP | principal.ip | 如果 cliIP 为空字符串,则直接映射。 |
| message.fwdHost | principal.hostname | 直接映射。 |
| message.reqHost | target.hostname、target.url | 用于构建 target.url 和提取 target.hostname。 |
| message.reqLen | network.sent_bytes | 直接映射,如果 totalBytes 为空或“-”,则转换为无符号整数。 |
| message.reqMethod | network.http.method | 如果 reqMethod 为空字符串,则直接映射。 |
| message.reqPath | target.url | 附加到 target.url。 |
| message.reqPort | target.port | 直接映射,如果 reqPort 为空字符串,则转换为整数。 |
| message.respLen | network.received_bytes | 直接映射,转换为无符号整数。 |
| message.sslVer | network.tls.version | 直接映射。 |
| message.status | network.http.response_code | 直接映射,如果 statusCode 为空或“-”,则转换为整数。 |
| message.UA | network.http.user_agent | 如果 UA 为“-”或空字符串,则直接映射。 |
| network.asnum | additional.fields | 映射为键值对,键为“asnum”。 |
| network.edgeIP | intermediary.ip | 直接映射。 |
| network.network | additional.fields | 映射为键值对,键为“network”。 |
| network.networkType | additional.fields | 映射为键值对,键为“networkType”。 |
| proto | network.application_protocol | 用于确定 network.application_protocol。 |
| queryStr | target.url | 如果不是“-”或空字符串,则附加到 target.url。 |
| 引荐来源网址 | network.http.referral_url, about.hostname | 如果不是“-”,则直接映射。提取的主机名会映射到 about.hostname。 |
| reqHost | target.hostname、target.url | 用于构建 target.url 和提取 target.hostname。 |
| reqId | metadata.product_log_id、network.session_id | 如果 ID 为空字符串,则直接映射。还映射到 network.session_id。 |
| reqMethod | network.http.method | 如果不是空字符串,则直接映射。 |
| reqPath | target.url | 如果不是“-”,则附加到 target.url。 |
| reqPort | target.port | 直接映射,转换为整数。 |
| reqTimeSec | metadata.event_timestamp, timestamp | 用于设置事件时间戳。 |
| start | metadata.event_timestamp, timestamp | 如果 reqTimeSec 为空字符串,则用于设置事件时间戳。 |
| statusCode | network.http.response_code | 直接映射,如果不是“-”或空字符串,则转换为整数。 |
| tlsVersion | network.tls.version | 直接映射。 |
| totalBytes | network.sent_bytes | 直接映射,如果非空或“-”,则转换为无符号整数。 |
| 类型 | metadata.product_event_type | 直接映射。 |
| UA | network.http.user_agent | 如果不是“-”或空字符串,则直接映射。 |
| 版本 | metadata.product_version | 直接映射。 |
| xForwardedFor | principal.ip | 如果不是“-”或空字符串,则直接映射。 |
| (解析器逻辑) | metadata.vendor_name | 设置为“Akamai”。 |
| (解析器逻辑) | metadata.product_name | 设置为“云监控”。 |
| (解析器逻辑) | metadata.event_type | 设置为“NETWORK_HTTP”。 |
| (解析器逻辑) | metadata.product_version | 如果版本为空字符串,则设置为“2”。 |
| (解析器逻辑) | metadata.log_type | 设置为“AKAMAI_CLOUD_MONITOR”。 |
| (解析器逻辑) | network.application_protocol | 根据 proto 或 message.proto 确定。如果其中任一字符串包含“HTTPS”(不区分大小写),则设置为“HTTPS”,否则设置为“HTTP”。 |
| (解析器逻辑) | security_result.severity | 如果 errorCode 为“-”或空字符串,则设置为“INFORMATIONAL”。 |
| (解析器逻辑) | target.url | 由协议、reqHost(或 message.reqHost)、reqPath(或 message.reqPath)和 queryStr 构成。 |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。