收集 Akamai SIEM 连接器日志
本文档介绍了如何使用 Amazon S3 将 Akamai SIEM 连接器日志提取到 Google Security Operations。Akamai SIEM 集成通过 SIEM 集成 API 以 JSON 格式提供来自 Akamai 平台的安全事件。此集成使用 AWS Lambda 定期从 Akamai API 中提取事件,并将其存储在 S3 中,然后 Google SecOps 会提取这些事件。
准备工作
- Google SecOps 实例
- 通过管理 SIEM 用户角色对 Akamai 控制中心进行特权访问
- 已启用 SIEM API 服务的 Akamai API 凭据(读写访问权限级别)
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
在 Akamai Control Center 中启用 SIEM 集成
- 登录 Akamai 控制中心。
- 依次前往 WEB 和数据中心安全性> 安全配置。
- 打开您要收集 SIEM 数据的安全配置(以及相应版本)。
- 点击高级设置,然后展开 SIEM 集成的数据收集。
- 点击开启以启用 SIEM。
选择要导出哪些安全政策的数据:
- 所有安全政策:针对违反安全配置中任何或所有安全政策的事件发送 SIEM 数据。
- 特定安全政策:从下拉列表中选择一项或多项特定安全政策。
可选:如果您使用账号保护器,并希望包含未加密的用户名,请选中包含用户名复选框。
可选:如果您想排除属于特定保护类型和操作的事件,请点击添加例外情况,然后选择您不希望 SIEM 收集的保护措施和关联操作。
点击保存。
复制并保存“SIEM 集成”部分中的安全配置 ID (configId)。您需要此信息才能进行 Lambda 配置。
为 SIEM 集成创建 Akamai API 凭据
- 登录 Akamai 控制中心。
- 依次前往账号管理 > 身份和访问权限 > API 客户端。
- 点击创建 API 客户端。
提供以下配置详细信息:
- API 客户端名称:输入描述性名称(例如
Google SecOps Poller)。 - API 服务:选择 SIEM,并将访问权限级别设置为 READ-WRITE。
- API 客户端名称:输入描述性名称(例如
点击创建 API 客户端。
复制并妥善保存以下凭证:
- 客户端令牌
- 客户端密钥 (Client Secret)
- Access Token
- 主机(例如
example.luna.akamaiapis.net)
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
akamai-siem-logs)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 .csv 文件以保存访问密钥和密钥,供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
复制并粘贴以下政策,然后将
akamai-siem-logs替换为您的存储分区名称:政策 JSON:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::akamai-siem-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::akamai-siem-logs/akamai-siem/state.json" } ] }点击下一步。
输入政策名称
AkamaiSIEMtoS3Policy,然后点击创建政策。依次前往 IAM > 角色 > 创建角色。
选择 AWS 服务。
选择 Lambda 作为用例。
点击下一步。
搜索并选择您刚刚创建的政策
AkamaiSIEMtoS3Policy。点击下一步。
输入角色名称
AkamaiSIEMtoS3Role,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 AkamaiSIEMtoS3Function运行时 Python 3.13 架构 x86_64 执行角色 使用现有角色 现有角色 AkamaiSIEMtoS3Role点击创建函数。
创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码:
import json import boto3 import os import urllib3 import hmac import hashlib import base64 from datetime import datetime from urllib.parse import urlparse, urljoin # Configuration from environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ.get('S3_PREFIX', 'akamai-siem/') STATE_KEY = os.environ.get('STATE_KEY', 'akamai-siem/state.json') AKAMAI_HOST = os.environ['AKAMAI_HOST'] AKAMAI_CLIENT_TOKEN = os.environ['AKAMAI_CLIENT_TOKEN'] AKAMAI_CLIENT_SECRET = os.environ['AKAMAI_CLIENT_SECRET'] AKAMAI_ACCESS_TOKEN = os.environ['AKAMAI_ACCESS_TOKEN'] AKAMAI_CONFIG_IDS = os.environ['AKAMAI_CONFIG_IDS'].split(',') LIMIT = int(os.environ.get('LIMIT', '10000')) s3_client = boto3.client('s3') http = urllib3.PoolManager() def load_state(): """Load offset state from S3""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) return json.loads(response['Body'].read().decode('utf-8')) except s3_client.exceptions.NoSuchKey: return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(state): """Save offset state to S3""" try: s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state, indent=2).encode('utf-8'), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}") def make_edgegrid_auth_header(url, method='GET'): """Create EdgeGrid authentication header""" timestamp = datetime.utcnow().strftime('%Y%m%dT%H:%M:%S+0000') nonce = base64.b64encode(os.urandom(16)).decode('utf-8') parsed_url = urlparse(url) relative_url = parsed_url.path if parsed_url.query: relative_url += '?' + parsed_url.query auth_header = f'EG1-HMAC-SHA256 ' \ f'client_token={AKAMAI_CLIENT_TOKEN};' \ f'access_token={AKAMAI_ACCESS_TOKEN};' \ f'timestamp={timestamp};' \ f'nonce={nonce};' data_to_sign = '\t'.join([ method, parsed_url.scheme, parsed_url.netloc, relative_url, '', # Request body for GET '', # No additional headers ]) signing_key = hmac.new( AKAMAI_CLIENT_SECRET.encode('utf-8'), timestamp.encode('utf-8'), hashlib.sha256 ).digest() auth_signature = base64.b64encode( hmac.new( signing_key, (data_to_sign + auth_header).encode('utf-8'), hashlib.sha256 ).digest() ).decode('utf-8') return auth_header + f'signature={auth_signature}' def fetch_akamai_events(config_id, offset=None): """Fetch events from Akamai SIEM API""" base_url = f'https://{AKAMAI_HOST}' endpoint = f'/siem/v1/configs/{config_id}' params = f'limit={LIMIT}' if offset: params += f'&offset={offset}' url = f'{base_url}{endpoint}?{params}' try: headers = { 'Authorization': make_edgegrid_auth_header(url) } response = http.request('GET', url, headers=headers, timeout=120) if response.status != 200: print(f"Error response {response.status}: {response.data.decode('utf-8')}") return [], offset # Parse multi-JSON response (newline-delimited JSON) lines = response.data.decode('utf-8').strip().split('\n') events = [] new_offset = offset for line in lines: if not line.strip(): continue try: obj = json.loads(line) # Check if this is offset context (metadata object with offset) if 'offset' in obj and ('total' in obj or 'responseContext' in obj): new_offset = obj.get('offset') continue # This is an event events.append(obj) except json.JSONDecodeError as e: print(f"Warning: Failed to parse line: {e}") continue return events, new_offset except Exception as e: print(f"Error fetching events for config {config_id}: {e}") return [], offset def lambda_handler(event, context): """Lambda handler - fetches Akamai events and writes to S3""" print(f"Starting Akamai SIEM fetch at {datetime.utcnow().isoformat()}Z") state = load_state() total_events = 0 for config_id in AKAMAI_CONFIG_IDS: config_id = config_id.strip() if not config_id: continue print(f"Fetching events for config: {config_id}") current_offset = state.get(config_id) events, new_offset = fetch_akamai_events(config_id, current_offset) if events: print(f"Fetched {len(events)} events for config {config_id}") # Write events to S3 as newline-delimited JSON timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S') s3_key = f'{S3_PREFIX}{config_id}/{timestamp}.json' payload = '\n'.join(json.dumps(event) for event in events) try: s3_client.put_object( Bucket=S3_BUCKET, Key=s3_key, Body=payload.encode('utf-8'), ContentType='application/json' ) print(f"Wrote {len(events)} events to s3://{S3_BUCKET}/{s3_key}") # Update offset only after successful write if new_offset: state[config_id] = new_offset total_events += len(events) except Exception as e: print(f"Error writing to S3: {e}") else: print(f"No new events for config {config_id}") # Save updated state save_state(state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Successfully processed {total_events} events', 'configs_processed': len(AKAMAI_CONFIG_IDS) }) }点击部署以保存代码。
依次前往配置 > 环境变量。
点击修改。
针对以下各项点击添加环境变量:
环境变量
键 示例值 S3_BUCKETakamai-siem-logsS3_PREFIXakamai-siem/STATE_KEYakamai-siem/state.jsonAKAMAI_HOSTexample.luna.akamaiapis.netAKAMAI_CLIENT_TOKENyour-client-tokenAKAMAI_CLIENT_SECRETyour-client-secretAKAMAI_ACCESS_TOKENyour-access-tokenAKAMAI_CONFIG_IDS12345,67890LIMIT10000点击保存。
依次前往配置 > 常规配置。
点击修改。
将超时更改为 5 分钟(300 秒)。
点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
提供以下配置详细信息:
- 时间安排名称:输入
AkamaiSIEMtoS3-5min。 - 安排模式:选择周期性安排。
- 安排类型:选择基于费率的安排。
- 速率表达式:输入
5,然后选择分钟。
- 时间安排名称:输入
点击下一步。
提供以下配置详细信息:
- 目标:选择 AWS Lambda 调用。
- Lambda 函数:选择
AkamaiSIEMtoS3Function。
点击下一步。
点击下一步(跳过可选设置)。
检查并点击创建时间表。
在 Google SecOps 中配置 Feed 以注入 Akamai SIEM 连接器日志
- 依次前往 SIEM 设置 > Feed。
- 点击 + 添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Akamai SIEM Connector)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Akamai SIEM 连接器作为日志类型。
- 点击下一步。
为以下输入参数指定值:
- S3 URI:
s3://akamai-siem-logs/akamai-siem/ - 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:有权访问 S3 存储分区的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
点击下一步。
在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。