收集 Akamai Cloud Monitor 記錄

支援的國家/地區:

本文說明如何使用 Google Cloud Storage,將 Akamai Cloud Monitor (Load Balancer、Traffic Shaper、ADC) 記錄檔擷取至 Google Security Operations。Akamai 會將 JSON 事件推送至 HTTPS 端點;API Gateway + Cloud Function 接收器會將事件寫入 GCS (JSONL、gz)。剖析器會將 JSON 記錄轉換為 UDM。這項功能會從 JSON 酬載中擷取欄位、執行資料類型轉換、重新命名欄位以符合 UDM 結構定義,並處理自訂欄位和網址建構的特定邏輯。此外,這項功能還會根據欄位是否存在,納入錯誤處理機制和條件式邏輯。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體
  • 已啟用 Cloud Storage API 的 GCP 專案
  • 建立及管理 GCS 值區的權限
  • 管理 Google Cloud Storage 值區 IAM 政策的權限
  • 建立 Cloud Functions、Pub/Sub 主題和 API 閘道的權限
  • Akamai Control Center 和 Property Manager 的特殊存取權

建立 Google Cloud Storage 值區

  1. 前往 Google Cloud 控制台
  2. 選取專案或建立新專案。
  3. 在導覽選單中,依序前往「Cloud Storage」>「Bucket」
  4. 按一下「建立值區」
  5. 請提供下列設定詳細資料:

    設定
    為 bucket 命名 輸入全域不重複的名稱 (例如 akamai-cloud-monitor)
    位置類型 根據需求選擇 (區域、雙區域、多區域)
    位置 選取位置 (例如 us-central1)
    儲存空間級別 標準 (建議用於經常存取的記錄)
    存取控管 統一 (建議)
    保護工具 選用:啟用物件版本管理或保留政策
  6. 點選「建立」

收集 Akamai Cloud Monitor 設定詳細資料

您需要從 Akamai Control Center 取得下列資訊:

  • 物業管理工具中的房源名稱
  • 必須收集的 Cloud Monitor 資料集
  • Webhook 驗證用的選填共用密鑰權杖

建立 Cloud Functions 服務帳戶

Cloud Function 需要具備 GCS bucket 寫入權限的服務帳戶。

建立服務帳戶

  1. GCP 主控台中,依序前往「IAM & Admin」(IAM 與管理) >「Service Accounts」(服務帳戶)
  2. 按一下 [Create Service Account] (建立服務帳戶)
  3. 請提供下列設定詳細資料:
    • 服務帳戶名稱:輸入 akamai-cloud-monitor-sa
    • 服務帳戶說明:輸入 Service account for Cloud Function to collect Akamai Cloud Monitor logs
  4. 按一下「建立並繼續」
  5. 在「將專案存取權授予這個服務帳戶」部分:
    1. 按一下「選擇角色」
    2. 搜尋並選取「Storage 物件管理員」
    3. 點選「+ 新增其他角色」
    4. 搜尋並選取「Cloud Run Invoker」
    5. 點選「+ 新增其他角色」
    6. 搜尋並選取「Cloud Functions Invoker」(Cloud Functions 叫用者)
  6. 按一下「繼續」
  7. 按一下 [完成]。

這些角色適用於:

  • Storage 物件管理員:將記錄檔寫入 GCS 值區,並管理狀態檔案
  • Cloud Run 叫用者:允許 Pub/Sub 叫用函式
  • Cloud Functions 叫用者:允許函式叫用

授予 GCS 值區的 IAM 權限

授予服務帳戶 GCS bucket 的寫入權限:

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:輸入服務帳戶電子郵件地址 (例如 akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com)。
    • 指派角色:選取「Storage 物件管理員」
  6. 按一下 [儲存]

建立 Cloud 函式來接收 Akamai 記錄

Cloud 函式會接收 Akamai Cloud Monitor 的 HTTP POST 要求,並將記錄寫入 GCS。

  1. 前往 GCP 主控台的「Cloud Functions」
  2. 按一下「Create function」(建立函式)
  3. 請提供下列設定詳細資料:

    設定
    環境 選取「第 2 代」
    函式名稱 akamai-cloud-monitor-receiver
    區域 選取與 GCS bucket 相符的區域 (例如 us-central1)
  4. 在「Trigger」(觸發條件) 專區:

    • 觸發條件類型:選取「HTTPS」
    • 驗證:選取「允許未經驗證的叫用」 (Akamai 會傳送未經驗證的要求)。
  5. 按一下「儲存」,儲存觸發條件設定。

  6. 展開「執行階段、建構作業、連線和安全性設定」

  7. 在「執行階段」部分:

    • 配置的記憶體:選取「512 MiB」
    • 逾時:輸入 600 秒 (10 分鐘)。
    • 執行階段服務帳戶:選取服務帳戶 (akamai-cloud-monitor-sa)。
  8. 在「執行階段環境變數」部分,針對下列各項點選「+ 新增變數」

    變數名稱 範例值
    GCS_BUCKET akamai-cloud-monitor
    GCS_PREFIX akamai/cloud-monitor/json
    INGEST_TOKEN random-shared-secret (非必要)
  9. 按一下「下一步」前往程式碼編輯器。

  10. 在「執行階段」下拉式選單中,選取「Python 3.12」

新增函式程式碼

  1. 在「Function entry point」(函式進入點) 中輸入 main
  2. 在內嵌程式碼編輯器中建立兩個檔案:

    • 第一個檔案:main.py:
    import os
    import json
    import gzip
    import io
    import uuid
    import datetime as dt
    from google.cloud import storage
    import functions_framework
    
    GCS_BUCKET = os.environ.get("GCS_BUCKET")
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/"
    INGEST_TOKEN = os.environ.get("INGEST_TOKEN")  # optional shared secret
    
    storage_client = storage.Client()
    
    def _write_jsonl_gz(objs: list) -> str:
        """Write JSON objects to GCS as gzipped JSONL."""
        timestamp = dt.datetime.utcnow()
        key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz"
    
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for o in objs:
                gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode())
        buf.seek(0)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}{key}")
        blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip")
    
        return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}"
    
    def _parse_records_from_request(request) -> list:
        """Parse JSON records from HTTP request body."""
        body = request.get_data(as_text=True)
    
        if not body:
            return []
    
        try:
            data = json.loads(body)
        except Exception:
            # Accept line-delimited JSON as pass-through
            try:
                return [json.loads(line) for line in body.splitlines() if line.strip()]
            except Exception:
                return []
    
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []
    
    @functions_framework.http
    def main(request):
        """
        Cloud Function HTTP handler for Akamai Cloud Monitor logs.
    
        Args:
            request: Flask request object
    
        Returns:
            Tuple of (response_body, status_code, headers)
        """
        # Optional shared-secret verification via query parameter (?token=...)
        if INGEST_TOKEN:
            token = request.args.get("token")
            if token != INGEST_TOKEN:
                return ("Forbidden", 403)
    
        records = _parse_records_from_request(request)
    
        if not records:
            return ("No content", 204)
    
        try:
            gcs_key = _write_jsonl_gz(records)
    
            response = {
                "ok": True,
                "gcs_key": gcs_key,
                "count": len(records)
            }
    
            return (json.dumps(response), 200, {"Content-Type": "application/json"})
    
        except Exception as e:
            print(f"Error writing to GCS: {str(e)}")
            return (f"Internal server error: {str(e)}", 500)
    
    • 第二個檔案:requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    
  3. 按一下「Deploy」(部署) 即可部署函式。

  4. 等待部署作業完成 (2 到 3 分鐘)。

  5. 部署完成後,請前往「觸發條件」分頁,然後複製「觸發條件網址」。您將在 Akamai 設定中使用此網址。

設定 Akamai Cloud Monitor,推送記錄

  1. 登入 Akamai Control Center
  2. 資源管理工具中開啟資源。
  3. 按一下「新增規則」> 選擇「雲端管理」
  4. 新增 Cloud Monitor Instrumentation,然後選取所需的資料集
  5. 新增 Cloud Monitor Data Delivery
  6. 請提供下列設定詳細資料:

    • 放送主機名稱:輸入 Cloud Functions 觸發條件網址的主機名稱 (例如 us-central1-your-project.cloudfunctions.net)。
    • 傳送網址路徑:輸入 Cloud Functions 觸發網址的路徑,以及選用的查詢權杖:

      • 沒有權杖:/akamai-cloud-monitor-receiver
      • 使用權杖:/akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>

      • <INGEST_TOKEN> 替換為您在 Cloud Functions 環境變數中設定的值。

  7. 按一下 [儲存]

  8. 按一下「啟用」,啟用資源版本。

擷取 Google SecOps 服務帳戶

Google SecOps 會使用專屬服務帳戶,從 GCS bucket 讀取資料。您必須授予這個服務帳戶值區存取權。

取得服務帳戶電子郵件地址

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Akamai Cloud Monitor - GCS)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Akamai Cloud Monitor」做為「記錄類型」
  7. 按一下「取得服務帳戶」。系統會顯示專屬的服務帳戶電子郵件地址,例如:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 複製這個電子郵件地址,以便在下一步中使用。

將 IAM 權限授予 Google SecOps 服務帳戶

Google SecOps 服務帳戶需要 GCS bucket 的「Storage 物件檢視者」角色。

  1. 依序前往「Cloud Storage」>「Buckets」
  2. 按一下 bucket 名稱。
  3. 前往「權限」分頁標籤。
  4. 按一下「授予存取權」
  5. 請提供下列設定詳細資料:
    • 新增主體:貼上 Google SecOps 服務帳戶電子郵件地址。
    • 指派角色:選取「Storage 物件檢視者」
  6. 按一下 [儲存]

在 Google SecOps 中設定資訊提供,擷取 Akamai Cloud Monitor 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「新增動態消息」
  3. 按一下「設定單一動態饋給」
  4. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Akamai Cloud Monitor - GCS)。
  5. 選取「Google Cloud Storage V2」做為「來源類型」
  6. 選取「Akamai Cloud Monitor」做為「記錄類型」
  7. 點選 [下一步]。
  8. 指定下列輸入參數的值:

    • 儲存空間 bucket URL:輸入 GCS bucket URI,並加上前置路徑:

      gs://akamai-cloud-monitor/akamai/cloud-monitor/json/
      
      • 取代:

        • akamai-cloud-monitor:您的 GCS bucket 名稱。
        • akamai/cloud-monitor/json:儲存記錄的前置路徑 (必須與 Cloud Function 中的 GCS_PREFIX 相符)。
    • 來源刪除選項:根據偏好設定選取刪除選項:

      • 永不:移轉後一律不刪除任何檔案 (建議用於測試)。
      • 刪除已轉移的檔案:成功轉移檔案後刪除檔案。
      • 刪除已轉移的檔案和空白目錄:成功轉移後刪除檔案和空白目錄。

    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。

    • 資產命名空間akamai.cloud_monitor

    • 擷取標籤:標籤會新增至這個動態饋給中的所有事件 (例如 source=akamai_cloud_monitorformat=json)。

  9. 點選 [下一步]。

  10. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

支援的 Akamai Cloud Monitor 範例記錄

  • JSON:

    {
      "UA": "-",
      "accLang": "-",
      "bytes": "3929",
      "cacheStatus": "1",
      "cliIP": "0.0.0.0",
      "cookie": "-",
      "cp": "848064",
      "customField": "-",
      "dnsLookupTimeMSec": "-",
      "errorCode": "-",
      "maxAgeSec": "31536000",
      "objSize": "3929",
      "overheadBytes": "240",
      "proto": "HTTPS",
      "queryStr": "-",
      "range": "-",
      "referer": "-",
      "reqEndTimeMSec": "4",
      "reqHost": "www.example.com",
      "reqId": "1ce83c03",
      "reqMethod": "GET",
      "reqPath": "assets/images/placeholder-tagline.png",
      "reqPort": "443",
      "reqTimeSec": "1622470405.760",
      "rspContentLen": "3929",
      "rspContentType": "image/png",
      "statusCode": "200",
      "tlsOverheadTimeMSec": "0",
      "tlsVersion": "TLSv1.2",
      "totalBytes": "4599",
      "transferTimeMSec": "0",
      "turnAroundTimeMSec": "0",
      "uncompressedSize": "-",
      "version": "1",
      "xForwardedFor": "-"
    }
    

UDM 對應表

記錄欄位 UDM 對應 邏輯
accLang network.http.user_agent 如果不是「-」或空字串,則直接對應。
城市 principal.location.city 如果不是「-」或空字串,則直接對應。
cliIP principal.ip 如果不是空字串,則直接對應。
國家/地區 principal.location.country_or_region 如果不是「-」或空字串,則直接對應。
cp additional.fields 以鍵/值組合形式對應,鍵為「cp」。
customField about.ip、about.labels、src.ip 系統會將這些值剖析為鍵/值組合。特殊處理「eIp」和「pIp」,分別對應至 src.ip 和 about.ip。其他鍵會對應為 about 內的標籤。
errorCode security_result.summary、security_result.severity 如果存在,則將 security_result.severity 設為「ERROR」,並將值對應至 security_result.summary。
geo.city principal.location.city 如果城市為「-」或空字串,則直接對應。
geo.country principal.location.country_or_region 如果國家/地區為「-」或空字串,則直接對應。
geo.lat principal.location.region_latitude 直接對應,並轉換為浮點數。
geo.long principal.location.region_longitude 直接對應,並轉換為浮點數。
geo.region principal.location.state 直接對應。
id metadata.product_log_id 如果不是空字串,則直接對應。
message.cliIP principal.ip 如果 cliIP 是空字串,則直接對應。
message.fwdHost principal.hostname 直接對應。
message.reqHost target.hostname、target.url 用於建構 target.url 和擷取 target.hostname。
message.reqLen network.sent_bytes 直接對應,如果 totalBytes 為空或「-」,則會轉換為不帶正負號的整數。
message.reqMethod network.http.method 如果 reqMethod 是空字串,則直接對應。
message.reqPath target.url 附加至 target.url。
message.reqPort target.port 直接對應,如果 reqPort 是空字串,則轉換為整數。
message.respLen network.received_bytes 直接對應,轉換為不帶正負號的整數。
message.sslVer network.tls.version 直接對應。
message.status network.http.response_code 直接對應,如果 statusCode 為空或「-」,則轉換為整數。
message.UA network.http.user_agent 如果 UA 為「-」或空字串,則直接對應。
network.asnum additional.fields 以鍵/值組合形式對應,鍵為「asnum」。
network.edgeIP intermediary.ip 直接對應。
network.network additional.fields 以鍵/值組合形式對應,索引鍵為「network」。
network.networkType additional.fields 以鍵/值組合形式對應,索引鍵為「networkType」。
proto network.application_protocol 用於判斷 network.application_protocol。
queryStr target.url 如果不是「-」或空字串,則會附加至 target.url。
參照網址 network.http.referral_url、about.hostname 如果不是「-」,則直接對應。擷取的主機名稱會對應至 about.hostname。
reqHost target.hostname、target.url 用於建構 target.url 和擷取 target.hostname。
reqId metadata.product_log_id、network.session_id 如果 ID 是空字串,則直接對應。也對應至 network.session_id。
reqMethod network.http.method 如果不是空字串,則直接對應。
reqPath target.url 如果不是「-」,則會附加至 target.url。
reqPort target.port 直接對應,並轉換為整數。
reqTimeSec metadata.event_timestamp、timestamp 用於設定事件時間戳記。
start metadata.event_timestamp、timestamp 如果 reqTimeSec 是空白字串,可用於設定事件時間戳記。
statusCode network.http.response_code 直接對應,如果不是「-」或空字串,則轉換為整數。
tlsVersion network.tls.version 直接對應。
totalBytes network.sent_bytes 直接對應,如果不是空白或「-」,則轉換為不帶正負號的整數。
類型 metadata.product_event_type 直接對應。
UA network.http.user_agent 如果不是「-」或空字串,則直接對應。
版本 metadata.product_version 直接對應。
xForwardedFor principal.ip 如果不是「-」或空字串,則直接對應。
(剖析器邏輯) metadata.vendor_name 設為「Akamai」。
(剖析器邏輯) metadata.product_name 設為「Cloud Monitor」。
(剖析器邏輯) metadata.event_type 設為「NETWORK_HTTP」。
(剖析器邏輯) metadata.product_version 如果版本是空字串,請設為「2」。
(剖析器邏輯) metadata.log_type 設為「AKAMAI_CLOUD_MONITOR」。
(剖析器邏輯) network.application_protocol 從 proto 或 message.proto 判斷。如果兩者皆包含「HTTPS」(不區分大小寫),則設為「HTTPS」,否則設為「HTTP」。
(剖析器邏輯) security_result.severity 如果 errorCode 為「-」或空字串,請設為「INFORMATIONAL」。
(剖析器邏輯) target.url 由通訊協定、reqHost (或 message.reqHost)、reqPath (或 message.reqPath) 和 queryStr 建構而成。

需要其他協助嗎?向社群成員和 Google SecOps 專業人員尋求答案。