收集 Akamai Cloud Monitor 記錄
本文說明如何使用 Google Cloud Storage,將 Akamai Cloud Monitor (Load Balancer、Traffic Shaper、ADC) 記錄檔擷取至 Google Security Operations。Akamai 會將 JSON 事件推送至 HTTPS 端點;API Gateway + Cloud Function 接收器會將事件寫入 GCS (JSONL、gz)。剖析器會將 JSON 記錄轉換為 UDM。這項功能會從 JSON 酬載中擷取欄位、執行資料類型轉換、重新命名欄位以符合 UDM 結構定義,並處理自訂欄位和網址建構的特定邏輯。此外,這項功能還會根據欄位是否存在,納入錯誤處理機制和條件式邏輯。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- 已啟用 Cloud Storage API 的 GCP 專案
- 建立及管理 GCS 值區的權限
- 管理 Google Cloud Storage 值區 IAM 政策的權限
- 建立 Cloud Functions、Pub/Sub 主題和 API 閘道的權限
- Akamai Control Center 和 Property Manager 的特殊存取權
建立 Google Cloud Storage 值區
- 前往 Google Cloud 控制台。
- 選取專案或建立新專案。
- 在導覽選單中,依序前往「Cloud Storage」>「Bucket」。
- 按一下「建立值區」。
請提供下列設定詳細資料:
設定 值 為 bucket 命名 輸入全域不重複的名稱 (例如 akamai-cloud-monitor)位置類型 根據需求選擇 (區域、雙區域、多區域) 位置 選取位置 (例如 us-central1)儲存空間級別 標準 (建議用於經常存取的記錄) 存取控管 統一 (建議) 保護工具 選用:啟用物件版本管理或保留政策 點選「建立」。
收集 Akamai Cloud Monitor 設定詳細資料
您需要從 Akamai Control Center 取得下列資訊:
- 物業管理工具中的房源名稱
- 必須收集的 Cloud Monitor 資料集
- Webhook 驗證用的選填共用密鑰權杖
建立 Cloud Functions 服務帳戶
Cloud Function 需要具備 GCS bucket 寫入權限的服務帳戶。
建立服務帳戶
- 在 GCP 主控台中,依序前往「IAM & Admin」(IAM 與管理) >「Service Accounts」(服務帳戶)。
- 按一下 [Create Service Account] (建立服務帳戶)。
- 請提供下列設定詳細資料:
- 服務帳戶名稱:輸入
akamai-cloud-monitor-sa。 - 服務帳戶說明:輸入
Service account for Cloud Function to collect Akamai Cloud Monitor logs。
- 服務帳戶名稱:輸入
- 按一下「建立並繼續」。
- 在「將專案存取權授予這個服務帳戶」部分:
- 按一下「選擇角色」。
- 搜尋並選取「Storage 物件管理員」。
- 點選「+ 新增其他角色」。
- 搜尋並選取「Cloud Run Invoker」。
- 點選「+ 新增其他角色」。
- 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)。
- 按一下「繼續」。
- 按一下 [完成]。
這些角色適用於:
- Storage 物件管理員:將記錄檔寫入 GCS 值區,並管理狀態檔案
- Cloud Run 叫用者:允許 Pub/Sub 叫用函式
- Cloud Functions 叫用者:允許函式叫用
授予 GCS 值區的 IAM 權限
授予服務帳戶 GCS bucket 的寫入權限:
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱。
- 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:輸入服務帳戶電子郵件地址 (例如
akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com)。 - 指派角色:選取「Storage 物件管理員」。
- 新增主體:輸入服務帳戶電子郵件地址 (例如
- 按一下 [儲存]。
建立 Cloud 函式來接收 Akamai 記錄
Cloud 函式會接收 Akamai Cloud Monitor 的 HTTP POST 要求,並將記錄寫入 GCS。
- 前往 GCP 主控台的「Cloud Functions」。
- 按一下「Create function」(建立函式)。
請提供下列設定詳細資料:
設定 值 環境 選取「第 2 代」 函式名稱 akamai-cloud-monitor-receiver區域 選取與 GCS bucket 相符的區域 (例如 us-central1)在「Trigger」(觸發條件) 專區:
- 觸發條件類型:選取「HTTPS」。
- 驗證:選取「允許未經驗證的叫用」 (Akamai 會傳送未經驗證的要求)。
按一下「儲存」,儲存觸發條件設定。
展開「執行階段、建構作業、連線和安全性設定」。
在「執行階段」部分:
- 配置的記憶體:選取「512 MiB」。
- 逾時:輸入
600秒 (10 分鐘)。 - 執行階段服務帳戶:選取服務帳戶 (
akamai-cloud-monitor-sa)。
在「執行階段環境變數」部分,針對下列各項點選「+ 新增變數」:
變數名稱 範例值 GCS_BUCKETakamai-cloud-monitorGCS_PREFIXakamai/cloud-monitor/jsonINGEST_TOKENrandom-shared-secret(非必要)按一下「下一步」前往程式碼編輯器。
在「執行階段」下拉式選單中,選取「Python 3.12」。
新增函式程式碼
- 在「Function entry point」(函式進入點) 中輸入 main
在內嵌程式碼編輯器中建立兩個檔案:
- 第一個檔案:main.py:
import os import json import gzip import io import uuid import datetime as dt from google.cloud import storage import functions_framework GCS_BUCKET = os.environ.get("GCS_BUCKET") GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret storage_client = storage.Client() def _write_jsonl_gz(objs: list) -> str: """Write JSON objects to GCS as gzipped JSONL.""" timestamp = dt.datetime.utcnow() key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode()) buf.seek(0) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}{key}") blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip") return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}" def _parse_records_from_request(request) -> list: """Parse JSON records from HTTP request body.""" body = request.get_data(as_text=True) if not body: return [] try: data = json.loads(body) except Exception: # Accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] @functions_framework.http def main(request): """ Cloud Function HTTP handler for Akamai Cloud Monitor logs. Args: request: Flask request object Returns: Tuple of (response_body, status_code, headers) """ # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: token = request.args.get("token") if token != INGEST_TOKEN: return ("Forbidden", 403) records = _parse_records_from_request(request) if not records: return ("No content", 204) try: gcs_key = _write_jsonl_gz(records) response = { "ok": True, "gcs_key": gcs_key, "count": len(records) } return (json.dumps(response), 200, {"Content-Type": "application/json"}) except Exception as e: print(f"Error writing to GCS: {str(e)}") return (f"Internal server error: {str(e)}", 500)- 第二個檔案:requirements.txt:
functions-framework==3.* google-cloud-storage==2.*按一下「Deploy」(部署) 即可部署函式。
等待部署作業完成 (2 到 3 分鐘)。
部署完成後,請前往「觸發條件」分頁,然後複製「觸發條件網址」。您將在 Akamai 設定中使用此網址。
設定 Akamai Cloud Monitor,推送記錄
- 登入 Akamai Control Center。
- 在資源管理工具中開啟資源。
- 按一下「新增規則」> 選擇「雲端管理」。
- 新增 Cloud Monitor Instrumentation,然後選取所需的資料集。
- 新增 Cloud Monitor Data Delivery。
請提供下列設定詳細資料:
- 放送主機名稱:輸入 Cloud Functions 觸發條件網址的主機名稱 (例如
us-central1-your-project.cloudfunctions.net)。 傳送網址路徑:輸入 Cloud Functions 觸發網址的路徑,以及選用的查詢權杖:
- 沒有權杖:
/akamai-cloud-monitor-receiver 使用權杖:
/akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>將
<INGEST_TOKEN>替換為您在 Cloud Functions 環境變數中設定的值。
- 沒有權杖:
- 放送主機名稱:輸入 Cloud Functions 觸發條件網址的主機名稱 (例如
按一下 [儲存]。
按一下「啟用」,啟用資源版本。
擷取 Google SecOps 服務帳戶
Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。
取得服務帳戶電子郵件地址
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Akamai Cloud Monitor - GCS)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Akamai Cloud Monitor」做為「記錄類型」。
按一下「取得服務帳戶」。系統會顯示專屬的服務帳戶電子郵件地址,例如:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com複製這個電子郵件地址,以便在下一步中使用。
將 IAM 權限授予 Google SecOps 服務帳戶
Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。
- 依序前往「Cloud Storage」>「Buckets」。
- 按一下 bucket 名稱。
- 前往「權限」分頁標籤。
- 按一下「授予存取權」。
- 請提供下列設定詳細資料:
- 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
- 指派角色:選取「Storage 物件檢視者」。
按一下 [儲存]。
在 Google SecOps 中設定資訊提供,擷取 Akamai Cloud Monitor 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「新增動態消息」。
- 按一下「設定單一動態饋給」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Akamai Cloud Monitor - GCS)。 - 選取「Google Cloud Storage V2」做為「來源類型」。
- 選取「Akamai Cloud Monitor」做為「記錄類型」。
- 點選 [下一步]。
指定下列輸入參數的值:
儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:
gs://akamai-cloud-monitor/akamai/cloud-monitor/json/取代:
akamai-cloud-monitor:您的 GCS bucket 名稱。akamai/cloud-monitor/json:儲存記錄的前置路徑 (必須與 Cloud Function 中的GCS_PREFIX相符)。
來源刪除選項:根據偏好設定選取刪除選項:
- 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
- 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。
檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
資產命名空間:
akamai.cloud_monitor擷取標籤:標籤會新增至這個動態饋給中的所有事件 (例如
source=akamai_cloud_monitor、format=json)。
點選 [下一步]。
在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
支援的 Akamai Cloud Monitor 範例記錄
JSON:
{ "UA": "-", "accLang": "-", "bytes": "3929", "cacheStatus": "1", "cliIP": "0.0.0.0", "cookie": "-", "cp": "848064", "customField": "-", "dnsLookupTimeMSec": "-", "errorCode": "-", "maxAgeSec": "31536000", "objSize": "3929", "overheadBytes": "240", "proto": "HTTPS", "queryStr": "-", "range": "-", "referer": "-", "reqEndTimeMSec": "4", "reqHost": "www.example.com", "reqId": "1ce83c03", "reqMethod": "GET", "reqPath": "assets/images/placeholder-tagline.png", "reqPort": "443", "reqTimeSec": "1622470405.760", "rspContentLen": "3929", "rspContentType": "image/png", "statusCode": "200", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1.2", "totalBytes": "4599", "transferTimeMSec": "0", "turnAroundTimeMSec": "0", "uncompressedSize": "-", "version": "1", "xForwardedFor": "-" }
UDM 對應表
| 記錄欄位 | UDM 對應 | 邏輯 |
|---|---|---|
| accLang | network.http.user_agent | 如果不是「-」或空字串,則直接對應。 |
| 城市 | principal.location.city | 如果不是「-」或空字串,則直接對應。 |
| cliIP | principal.ip | 如果不是空字串,則直接對應。 |
| 國家/地區 | principal.location.country_or_region | 如果不是「-」或空字串,則直接對應。 |
| cp | additional.fields | 以鍵/值組合形式對應,鍵為「cp」。 |
| customField | about.ip、about.labels、src.ip | 系統會將這些值剖析為鍵/值組合。特殊處理「eIp」和「pIp」,分別對應至 src.ip 和 about.ip。其他鍵會對應為 about 內的標籤。 |
| errorCode | security_result.summary、security_result.severity | 如果存在,則將 security_result.severity 設為「ERROR」,並將值對應至 security_result.summary。 |
| geo.city | principal.location.city | 如果城市為「-」或空字串,則直接對應。 |
| geo.country | principal.location.country_or_region | 如果國家/地區為「-」或空字串,則直接對應。 |
| geo.lat | principal.location.region_latitude | 直接對應,並轉換為浮點數。 |
| geo.long | principal.location.region_longitude | 直接對應,並轉換為浮點數。 |
| geo.region | principal.location.state | 直接對應。 |
| id | metadata.product_log_id | 如果不是空字串,則直接對應。 |
| message.cliIP | principal.ip | 如果 cliIP 是空字串,則直接對應。 |
| message.fwdHost | principal.hostname | 直接對應。 |
| message.reqHost | target.hostname、target.url | 用於建構 target.url 和擷取 target.hostname。 |
| message.reqLen | network.sent_bytes | 直接對應,如果 totalBytes 為空或「-」,則會轉換為不帶正負號的整數。 |
| message.reqMethod | network.http.method | 如果 reqMethod 是空字串,則直接對應。 |
| message.reqPath | target.url | 附加至 target.url。 |
| message.reqPort | target.port | 直接對應,如果 reqPort 是空字串,則轉換為整數。 |
| message.respLen | network.received_bytes | 直接對應,轉換為不帶正負號的整數。 |
| message.sslVer | network.tls.version | 直接對應。 |
| message.status | network.http.response_code | 直接對應,如果 statusCode 為空或「-」,則轉換為整數。 |
| message.UA | network.http.user_agent | 如果 UA 為「-」或空字串,則直接對應。 |
| network.asnum | additional.fields | 以鍵/值組合形式對應,鍵為「asnum」。 |
| network.edgeIP | intermediary.ip | 直接對應。 |
| network.network | additional.fields | 以鍵/值組合形式對應,索引鍵為「network」。 |
| network.networkType | additional.fields | 以鍵/值組合形式對應,索引鍵為「networkType」。 |
| proto | network.application_protocol | 用於判斷 network.application_protocol。 |
| queryStr | target.url | 如果不是「-」或空字串,則會附加至 target.url。 |
| 參照網址 | network.http.referral_url、about.hostname | 如果不是「-」,則直接對應。擷取的主機名稱會對應至 about.hostname。 |
| reqHost | target.hostname、target.url | 用於建構 target.url 和擷取 target.hostname。 |
| reqId | metadata.product_log_id、network.session_id | 如果 ID 是空字串,則直接對應。也對應至 network.session_id。 |
| reqMethod | network.http.method | 如果不是空字串,則直接對應。 |
| reqPath | target.url | 如果不是「-」,則會附加至 target.url。 |
| reqPort | target.port | 直接對應,並轉換為整數。 |
| reqTimeSec | metadata.event_timestamp、timestamp | 用於設定事件時間戳記。 |
| start | metadata.event_timestamp、timestamp | 如果 reqTimeSec 是空白字串,可用於設定事件時間戳記。 |
| statusCode | network.http.response_code | 直接對應,如果不是「-」或空字串,則轉換為整數。 |
| tlsVersion | network.tls.version | 直接對應。 |
| totalBytes | network.sent_bytes | 直接對應,如果不是空白或「-」,則轉換為不帶正負號的整數。 |
| 類型 | metadata.product_event_type | 直接對應。 |
| UA | network.http.user_agent | 如果不是「-」或空字串,則直接對應。 |
| 版本 | metadata.product_version | 直接對應。 |
| xForwardedFor | principal.ip | 如果不是「-」或空字串,則直接對應。 |
| (剖析器邏輯) | metadata.vendor_name | 設為「Akamai」。 |
| (剖析器邏輯) | metadata.product_name | 設為「Cloud Monitor」。 |
| (剖析器邏輯) | metadata.event_type | 設為「NETWORK_HTTP」。 |
| (剖析器邏輯) | metadata.product_version | 如果版本是空字串,請設為「2」。 |
| (剖析器邏輯) | metadata.log_type | 設為「AKAMAI_CLOUD_MONITOR」。 |
| (剖析器邏輯) | network.application_protocol | 從 proto 或 message.proto 判斷。如果兩者皆包含「HTTPS」(不區分大小寫),則設為「HTTPS」,否則設為「HTTP」。 |
| (剖析器邏輯) | security_result.severity | 如果 errorCode 為「-」或空字串,請設為「INFORMATIONAL」。 |
| (剖析器邏輯) | target.url | 由通訊協定、reqHost (或 message.reqHost)、reqPath (或 message.reqPath) 和 queryStr 建構而成。 |
需要其他協助嗎?向社群成員和 Google SecOps 專業人員尋求答案。