收集 Akamai SIEM 連接器記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Akamai SIEM Connector 記錄擷取至 Google Security Operations。Akamai SIEM 整合功能會透過 SIEM 整合 API,以 JSON 格式提供 Akamai 平台中的安全事件。這項整合功能會使用 AWS Lambda,定期從 Akamai API 擷取事件並儲存在 S3 中,供 Google SecOps 擷取。

事前準備

  • Google SecOps 執行個體
  • 具備「管理 SIEM」使用者角色的 Akamai Control Center 存取權
  • 已啟用 SIEM API 服務的 Akamai API 憑證 (讀取/寫入存取層級)
  • AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)

在 Akamai Control Center 中啟用 SIEM 整合

  1. 登入 Akamai Control Center
  2. 依序前往「WEB & DATA CENTER SECURITY」(網站和資料中心安全性) >「Security Configuration」(安全性設定)。
  3. 開啟要收集 SIEM 資料的「安全性設定」 (和適當版本)。
  4. 按一下「進階設定」,然後展開「SIEM 整合的資料收集」
  5. 按一下「開啟」即可啟用 SIEM。
  6. 選擇要匯出資料的安全政策:

    • 所有安全政策:針對違反安全設定中任何或所有安全政策的事件,傳送 SIEM 資料。
    • 特定安全性政策:從下拉式清單中選取一或多項特定安全性政策。
  7. 選用:如果您使用 Account Protector,並想加入未加密的使用者名稱,請勾選「Include username」(加入使用者名稱) 核取方塊。

  8. 選用:如要排除屬於特定保護措施類型和動作的事件,請按一下「新增例外狀況」,然後選取您不希望 SIEM 收集的保護措施和相關動作。

  9. 按一下 [儲存]

  10. 從 SIEM 整合部分複製並儲存安全設定 ID (configId)。Lambda 設定會需要這項資訊。

為 SIEM 整合建立 Akamai API 憑證

  1. 登入 Akamai Control Center
  2. 依序前往「帳戶管理員」>「身分與存取權」>「API 用戶端」
  3. 按一下「建立 API 用戶端」
  4. 請提供下列設定詳細資料:

    • API 用戶端名稱:輸入描述性名稱 (例如 Google SecOps Poller)。
    • API 服務:選取「SIEM」,並將存取層級設為「READ-WRITE」
  5. 按一下「建立 API 用戶端」

  6. 複製並妥善儲存下列憑證:

    • 用戶端憑證
    • 用戶端密碼
    • 存取權杖
    • 主機 (例如 example.luna.akamaiapis.net)

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「地區」,以供日後參考 (例如 akamai-siem-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選 [下一步]。
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「Download .csv file」(下載 .csv 檔案),儲存「Access Key」(存取金鑰) 和「Secret Access Key」(私密存取金鑰),以供日後參考。
  12. 按一下 [完成]。
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選 [下一步]。
  20. 按一下「新增權限」

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤
  2. 複製並貼上下列政策,然後將 akamai-siem-logs 替換為您的值區名稱:

    政策 JSON

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::akamai-siem-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::akamai-siem-logs/akamai-siem/state.json"
        }
      ]
    }
    
  3. 點選 [下一步]。

  4. 輸入政策名稱 AkamaiSIEMtoS3Policy,然後按一下「建立政策」

  5. 依序前往「IAM」>「角色」>「建立角色」

  6. 選取「AWS 服務」

  7. 選取「Lambda」Lambda做為用途。

  8. 點選 [下一步]。

  9. 搜尋並選取您剛建立的政策 AkamaiSIEMtoS3Policy

  10. 點選 [下一步]。

  11. 輸入角色名稱 AkamaiSIEMtoS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 AkamaiSIEMtoS3Function
    執行階段 Python 3.13
    架構 x86_64
    執行角色 使用現有角色
    現有角色 AkamaiSIEMtoS3Role
  4. 按一下「Create function」(建立函式)

  5. 建立函式後,開啟「程式碼」分頁,刪除存根並貼上下列程式碼:

    import json
    import boto3
    import os
    import urllib3
    import hmac
    import hashlib
    import base64
    from datetime import datetime
    from urllib.parse import urlparse, urljoin
    
    # Configuration from environment variables
    S3_BUCKET = os.environ['S3_BUCKET']
    S3_PREFIX = os.environ.get('S3_PREFIX', 'akamai-siem/')
    STATE_KEY = os.environ.get('STATE_KEY', 'akamai-siem/state.json')
    
    AKAMAI_HOST = os.environ['AKAMAI_HOST']
    AKAMAI_CLIENT_TOKEN = os.environ['AKAMAI_CLIENT_TOKEN']
    AKAMAI_CLIENT_SECRET = os.environ['AKAMAI_CLIENT_SECRET']
    AKAMAI_ACCESS_TOKEN = os.environ['AKAMAI_ACCESS_TOKEN']
    AKAMAI_CONFIG_IDS = os.environ['AKAMAI_CONFIG_IDS'].split(',')
    
    LIMIT = int(os.environ.get('LIMIT', '10000'))
    
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager()
    
    def load_state():
        """Load offset state from S3"""
        try:
            response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read().decode('utf-8'))
        except s3_client.exceptions.NoSuchKey:
            return {}
        except Exception as e:
            print(f"Error loading state: {e}")
            return {}
    
    def save_state(state):
        """Save offset state to S3"""
        try:
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state, indent=2).encode('utf-8'),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving state: {e}")
    
    def make_edgegrid_auth_header(url, method='GET'):
        """Create EdgeGrid authentication header"""
        timestamp = datetime.utcnow().strftime('%Y%m%dT%H:%M:%S+0000')
        nonce = base64.b64encode(os.urandom(16)).decode('utf-8')
    
        parsed_url = urlparse(url)
        relative_url = parsed_url.path
        if parsed_url.query:
            relative_url += '?' + parsed_url.query
    
        auth_header = f'EG1-HMAC-SHA256 ' \
                    f'client_token={AKAMAI_CLIENT_TOKEN};' \
                    f'access_token={AKAMAI_ACCESS_TOKEN};' \
                    f'timestamp={timestamp};' \
                    f'nonce={nonce};'
    
        data_to_sign = '\t'.join([
            method,
            parsed_url.scheme,
            parsed_url.netloc,
            relative_url,
            '',  # Request body for GET
            '',  # No additional headers
        ])
    
        signing_key = hmac.new(
            AKAMAI_CLIENT_SECRET.encode('utf-8'),
            timestamp.encode('utf-8'),
            hashlib.sha256
        ).digest()
    
        auth_signature = base64.b64encode(
            hmac.new(
                signing_key,
                (data_to_sign + auth_header).encode('utf-8'),
                hashlib.sha256
            ).digest()
        ).decode('utf-8')
    
        return auth_header + f'signature={auth_signature}'
    
    def fetch_akamai_events(config_id, offset=None):
        """Fetch events from Akamai SIEM API"""
        base_url = f'https://{AKAMAI_HOST}'
        endpoint = f'/siem/v1/configs/{config_id}'
    
        params = f'limit={LIMIT}'
        if offset:
            params += f'&offset={offset}'
    
        url = f'{base_url}{endpoint}?{params}'
    
        try:
            headers = {
                'Authorization': make_edgegrid_auth_header(url)
            }
    
            response = http.request('GET', url, headers=headers, timeout=120)
    
            if response.status != 200:
                print(f"Error response {response.status}: {response.data.decode('utf-8')}")
                return [], offset
    
            # Parse multi-JSON response (newline-delimited JSON)
            lines = response.data.decode('utf-8').strip().split('\n')
            events = []
            new_offset = offset
    
            for line in lines:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
    
                    # Check if this is offset context (metadata object with offset)
                    if 'offset' in obj and ('total' in obj or 'responseContext' in obj):
                        new_offset = obj.get('offset')
                        continue
    
                    # This is an event
                    events.append(obj)
                except json.JSONDecodeError as e:
                    print(f"Warning: Failed to parse line: {e}")
                    continue
    
            return events, new_offset
    
        except Exception as e:
            print(f"Error fetching events for config {config_id}: {e}")
            return [], offset
    
    def lambda_handler(event, context):
        """Lambda handler - fetches Akamai events and writes to S3"""
        print(f"Starting Akamai SIEM fetch at {datetime.utcnow().isoformat()}Z")
    
        state = load_state()
        total_events = 0
    
        for config_id in AKAMAI_CONFIG_IDS:
            config_id = config_id.strip()
            if not config_id:
                continue
    
            print(f"Fetching events for config: {config_id}")
    
            current_offset = state.get(config_id)
            events, new_offset = fetch_akamai_events(config_id, current_offset)
    
            if events:
                print(f"Fetched {len(events)} events for config {config_id}")
    
                # Write events to S3 as newline-delimited JSON
                timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f'{S3_PREFIX}{config_id}/{timestamp}.json'
    
                payload = '\n'.join(json.dumps(event) for event in events)
    
                try:
                    s3_client.put_object(
                        Bucket=S3_BUCKET,
                        Key=s3_key,
                        Body=payload.encode('utf-8'),
                        ContentType='application/json'
                    )
                    print(f"Wrote {len(events)} events to s3://{S3_BUCKET}/{s3_key}")
    
                    # Update offset only after successful write
                    if new_offset:
                        state[config_id] = new_offset
                        total_events += len(events)
                except Exception as e:
                    print(f"Error writing to S3: {e}")
            else:
                print(f"No new events for config {config_id}")
    
        # Save updated state
        save_state(state)
    
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Successfully processed {total_events} events',
                'configs_processed': len(AKAMAI_CONFIG_IDS)
            })
        }
    
  6. 按一下「Deploy」(部署) 即可儲存程式碼。

  7. 依序前往「設定」>「環境變數」

  8. 按一下 [編輯]

  9. 針對下列各項,按一下「新增環境變數」

    環境變數

    範例值
    S3_BUCKET akamai-siem-logs
    S3_PREFIX akamai-siem/
    STATE_KEY akamai-siem/state.json
    AKAMAI_HOST example.luna.akamaiapis.net
    AKAMAI_CLIENT_TOKEN your-client-token
    AKAMAI_CLIENT_SECRET your-client-secret
    AKAMAI_ACCESS_TOKEN your-access-token
    AKAMAI_CONFIG_IDS 12345,67890
    LIMIT 10000
  10. 按一下 [儲存]

  11. 依序前往「設定」>「一般設定」

  12. 按一下 [編輯]

  13. 將「Timeout」(逾時間隔) 變更為 5 分鐘 (300 秒)

  14. 按一下 [儲存]

建立 EventBridge 排程

  1. 依序前往 Amazon EventBridge > Scheduler > Create schedule
  2. 請提供下列設定詳細資料:

    • 時間表名稱:輸入 AkamaiSIEMtoS3-5min
    • 排程模式:選取「週期性排程」
    • 時間表類型:選取「以費率為準的時間表」
    • 費率運算式:輸入 5 並選取「分鐘」
  3. 點選 [下一步]。

  4. 請提供下列設定詳細資料:

    • 目標:選取「AWS Lambda Invoke」
    • Lambda 函式:選取 AkamaiSIEMtoS3Function
  5. 點選 [下一步]。

  6. 按一下「下一步」 (略過選用設定)。

  7. 檢查並按一下「建立排程」

在 Google SecOps 中設定動態饋給,擷取 Akamai SIEM Connector 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Akamai SIEM Connector)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Akamai SIEM 連接器」做為「記錄類型」
  6. 點選 [下一步]。
  7. 指定下列輸入參數的值:

    • S3 URIs3://akamai-siem-logs/akamai-siem/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 儲存空間存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選 [下一步]。

  9. 在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)

需要其他協助嗎?向社群成員和 Google SecOps 專業人員尋求答案。