收集 Akamai SIEM 連接器記錄
本文說明如何使用 Amazon S3,將 Akamai SIEM Connector 記錄擷取至 Google Security Operations。Akamai SIEM 整合功能會透過 SIEM 整合 API,以 JSON 格式提供 Akamai 平台中的安全事件。這項整合功能會使用 AWS Lambda,定期從 Akamai API 擷取事件並儲存在 S3 中,供 Google SecOps 擷取。
事前準備
- Google SecOps 執行個體
- 具備「管理 SIEM」使用者角色的 Akamai Control Center 存取權
- 已啟用 SIEM API 服務的 Akamai API 憑證 (讀取/寫入存取層級)
- AWS 的特殊存取權 (S3、IAM、Lambda、EventBridge)
在 Akamai Control Center 中啟用 SIEM 整合
- 登入 Akamai Control Center。
- 依序前往「WEB & DATA CENTER SECURITY」(網站和資料中心安全性) >「Security Configuration」(安全性設定)。
- 開啟要收集 SIEM 資料的「安全性設定」 (和適當版本)。
- 按一下「進階設定」,然後展開「SIEM 整合的資料收集」。
- 按一下「開啟」即可啟用 SIEM。
選擇要匯出資料的安全政策:
- 所有安全政策:針對違反安全設定中任何或所有安全政策的事件,傳送 SIEM 資料。
- 特定安全性政策:從下拉式清單中選取一或多項特定安全性政策。
選用:如果您使用 Account Protector,並想加入未加密的使用者名稱,請勾選「Include username」(加入使用者名稱) 核取方塊。
選用:如要排除屬於特定保護措施類型和動作的事件,請按一下「新增例外狀況」,然後選取您不希望 SIEM 收集的保護措施和相關動作。
按一下 [儲存]。
從 SIEM 整合部分複製並儲存安全設定 ID (configId)。Lambda 設定會需要這項資訊。
為 SIEM 整合建立 Akamai API 憑證
- 登入 Akamai Control Center。
- 依序前往「帳戶管理員」>「身分與存取權」>「API 用戶端」。
- 按一下「建立 API 用戶端」。
請提供下列設定詳細資料:
- API 用戶端名稱:輸入描述性名稱 (例如
Google SecOps Poller)。 - API 服務:選取「SIEM」,並將存取層級設為「READ-WRITE」。
- API 用戶端名稱:輸入描述性名稱 (例如
按一下「建立 API 用戶端」。
複製並妥善儲存下列憑證:
- 用戶端憑證
- 用戶端密碼
- 存取權杖
- 主機 (例如
example.luna.akamaiapis.net)
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 bucket:建立 bucket
- 儲存 bucket 的「名稱」和「地區」,以供日後參考 (例如
akamai-siem-logs)。 - 請按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選 [下一步]。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「Download .csv file」(下載 .csv 檔案),儲存「Access Key」(存取金鑰) 和「Secret Access Key」(私密存取金鑰),以供日後參考。
- 按一下 [完成]。
- 選取「權限」分頁標籤。
- 在「權限政策」部分中,按一下「新增權限」。
- 選取「新增權限」。
- 選取「直接附加政策」。
- 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
- 選取政策。
- 點選 [下一步]。
- 按一下「新增權限」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
複製並貼上下列政策,然後將
akamai-siem-logs替換為您的值區名稱:政策 JSON:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::akamai-siem-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::akamai-siem-logs/akamai-siem/state.json" } ] }點選 [下一步]。
輸入政策名稱
AkamaiSIEMtoS3Policy,然後按一下「建立政策」。依序前往「IAM」>「角色」>「建立角色」。
選取「AWS 服務」。
選取「Lambda」Lambda做為用途。
點選 [下一步]。
搜尋並選取您剛建立的政策
AkamaiSIEMtoS3Policy。點選 [下一步]。
輸入角色名稱
AkamaiSIEMtoS3Role,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
請提供下列設定詳細資料:
設定 值 名稱 AkamaiSIEMtoS3Function執行階段 Python 3.13 架構 x86_64 執行角色 使用現有角色 現有角色 AkamaiSIEMtoS3Role按一下「Create function」(建立函式)。
建立函式後,開啟「程式碼」分頁,刪除存根並貼上下列程式碼:
import json import boto3 import os import urllib3 import hmac import hashlib import base64 from datetime import datetime from urllib.parse import urlparse, urljoin # Configuration from environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ.get('S3_PREFIX', 'akamai-siem/') STATE_KEY = os.environ.get('STATE_KEY', 'akamai-siem/state.json') AKAMAI_HOST = os.environ['AKAMAI_HOST'] AKAMAI_CLIENT_TOKEN = os.environ['AKAMAI_CLIENT_TOKEN'] AKAMAI_CLIENT_SECRET = os.environ['AKAMAI_CLIENT_SECRET'] AKAMAI_ACCESS_TOKEN = os.environ['AKAMAI_ACCESS_TOKEN'] AKAMAI_CONFIG_IDS = os.environ['AKAMAI_CONFIG_IDS'].split(',') LIMIT = int(os.environ.get('LIMIT', '10000')) s3_client = boto3.client('s3') http = urllib3.PoolManager() def load_state(): """Load offset state from S3""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read().decode('utf-8')) except s3_client.exceptions.NoSuchKey: return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(state): """Save offset state to S3""" try: s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2).encode('utf-8'), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}") def make_edgegrid_auth_header(url, method='GET'): """Create EdgeGrid authentication header""" timestamp = datetime.utcnow().strftime('%Y%m%dT%H:%M:%S+0000') nonce = base64.b64encode(os.urandom(16)).decode('utf-8') parsed_url = urlparse(url) relative_url = parsed_url.path if parsed_url.query: relative_url += '?' + parsed_url.query auth_header = f'EG1-HMAC-SHA256 ' \ f'client_token={AKAMAI_CLIENT_TOKEN};' \ f'access_token={AKAMAI_ACCESS_TOKEN};' \ f'timestamp={timestamp};' \ f'nonce={nonce};' data_to_sign = '\t'.join([ method, parsed_url.scheme, parsed_url.netloc, relative_url, '', # Request body for GET '', # No additional headers ]) signing_key = hmac.new( AKAMAI_CLIENT_SECRET.encode('utf-8'), timestamp.encode('utf-8'), hashlib.sha256 ).digest() auth_signature = base64.b64encode( hmac.new( signing_key, (data_to_sign + auth_header).encode('utf-8'), hashlib.sha256 ).digest() ).decode('utf-8') return auth_header + f'signature={auth_signature}' def fetch_akamai_events(config_id, offset=None): """Fetch events from Akamai SIEM API""" base_url = f'https://{AKAMAI_HOST}' endpoint = f'/siem/v1/configs/{config_id}' params = f'limit={LIMIT}' if offset: params += f'&offset={offset}' url = f'{base_url}{endpoint}?{params}' try: headers = { 'Authorization': make_edgegrid_auth_header(url) } response = http.request('GET', url, headers=headers, timeout=120) if response.status != 200: print(f"Error response {response.status}: {response.data.decode('utf-8')}") return [], offset # Parse multi-JSON response (newline-delimited JSON) lines = response.data.decode('utf-8').strip().split('\n') events = [] new_offset = offset for line in lines: if not line.strip(): continue try: obj = json.loads(line) # Check if this is offset context (metadata object with offset) if 'offset' in obj and ('total' in obj or 'responseContext' in obj): new_offset = obj.get('offset') continue # This is an event events.append(obj) except json.JSONDecodeError as e: print(f"Warning: Failed to parse line: {e}") continue return events, new_offset except Exception as e: print(f"Error fetching events for config {config_id}: {e}") return [], offset def lambda_handler(event, context): """Lambda handler - fetches Akamai events and writes to S3""" print(f"Starting Akamai SIEM fetch at {datetime.utcnow().isoformat()}Z") state = load_state() total_events = 0 for config_id in AKAMAI_CONFIG_IDS: config_id = config_id.strip() if not config_id: continue print(f"Fetching events for config: {config_id}") current_offset = state.get(config_id) events, new_offset = fetch_akamai_events(config_id, current_offset) if events: print(f"Fetched {len(events)} events for config {config_id}") # Write events to S3 as newline-delimited JSON timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S') s3_key = f'{S3_PREFIX}{config_id}/{timestamp}.json' payload = '\n'.join(json.dumps(event) for event in events) try: s3_client.put_object( Bucket=S3_BUCKET, Key=s3_key, Body=payload.encode('utf-8'), ContentType='application/json' ) print(f"Wrote {len(events)} events to s3://{S3_BUCKET}/{s3_key}") # Update offset only after successful write if new_offset: state[config_id] = new_offset total_events += len(events) except Exception as e: print(f"Error writing to S3: {e}") else: print(f"No new events for config {config_id}") # Save updated state save_state(state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Successfully processed {total_events} events', 'configs_processed': len(AKAMAI_CONFIG_IDS) }) }按一下「Deploy」(部署) 即可儲存程式碼。
依序前往「設定」>「環境變數」。
按一下 [編輯]。
針對下列各項,按一下「新增環境變數」:
環境變數
鍵 範例值 S3_BUCKETakamai-siem-logsS3_PREFIXakamai-siem/STATE_KEYakamai-siem/state.jsonAKAMAI_HOSTexample.luna.akamaiapis.netAKAMAI_CLIENT_TOKENyour-client-tokenAKAMAI_CLIENT_SECRETyour-client-secretAKAMAI_ACCESS_TOKENyour-access-tokenAKAMAI_CONFIG_IDS12345,67890LIMIT10000按一下 [儲存]。
依序前往「設定」>「一般設定」。
按一下 [編輯]。
將「Timeout」(逾時間隔) 變更為 5 分鐘 (300 秒)。
按一下 [儲存]。
建立 EventBridge 排程
- 依序前往 Amazon EventBridge > Scheduler > Create schedule。
請提供下列設定詳細資料:
- 時間表名稱:輸入
AkamaiSIEMtoS3-5min。 - 排程模式:選取「週期性排程」。
- 時間表類型:選取「以費率為準的時間表」。
- 費率運算式:輸入
5並選取「分鐘」。
- 時間表名稱:輸入
點選 [下一步]。
請提供下列設定詳細資料:
- 目標:選取「AWS Lambda Invoke」。
- Lambda 函式:選取
AkamaiSIEMtoS3Function。
點選 [下一步]。
按一下「下一步」 (略過選用設定)。
檢查並按一下「建立排程」。
在 Google SecOps 中設定動態饋給,擷取 Akamai SIEM Connector 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Akamai SIEM Connector)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「Akamai SIEM 連接器」做為「記錄類型」。
- 點選 [下一步]。
指定下列輸入參數的值:
- S3 URI:
s3://akamai-siem-logs/akamai-siem/ - 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 儲存空間存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
點選 [下一步]。
在「Finalize」(完成) 畫面中檢查新的動態饋給設定,然後按一下「Submit」(提交)。
需要其他協助嗎?向社群成員和 Google SecOps 專業人員尋求答案。