收集 网址Scan IO 日志
支持的平台:
Google SecOps
SIEM
本文档介绍了如何使用 Amazon S3 将网址 Scan IO 日志注入到 Google Security Operations。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例
- 对 网址Scan IO 租户的特权访问权限
- 对 AWS(S3、IAM、Lambda、EventBridge)的特权访问权限
获取 网址Scan IO 前提条件
- 登录 网址Scan IO。
- 点击您的个人资料图标。
- 从菜单中选择 API 密钥。
- 如果您还没有 API 密钥,请执行以下操作:
- 点击创建 API 密钥按钮。
- 输入 API 密钥的说明(例如
Google SecOps Integration)。 - 为密钥选择权限(对于只读访问权限,请选择读取权限)。
- 点击 Generate API Key。
- 复制以下详细信息并将其保存在安全的位置:
- API_KEY:生成的 API 密钥字符串(格式:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) - API 基准网址:
https://urlscan.io/api/v1(所有用户都一样)
- API_KEY:生成的 API 密钥字符串(格式:
- 请注意您的 API 配额限制:
- 免费账号:每天最多可进行 1, 000 次 API 调用,每分钟最多可进行 60 次 API 调用
- 专业版账号:根据订阅方案提供更高的上限
- 如果您需要将搜索范围限制为仅搜索组织的扫描结果,请记下以下内容:
- 用户标识符:您的用户名或电子邮件地址(用于
user:搜索过滤条件) - 团队标识符:如果使用团队功能(与
team:搜索过滤条件搭配使用)
- 用户标识符:您的用户名或电子邮件地址(用于
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶。
- 保存存储桶名称和区域以供日后参考(例如
urlscan-logs-bucket)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 CSV 文件,保存访问密钥和不公开的访问密钥以供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策。
- 依次点击创建政策 > JSON 标签页。
输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::urlscan-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::urlscan-logs-bucket/urlscan/state.json" } ] }- 如果您输入了其他存储桶名称,请替换
urlscan-logs-bucket。
- 如果您输入了其他存储桶名称,请替换
依次点击下一步 > 创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策。
将角色命名为
urlscan-lambda-role,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 urlscan-collector运行时 Python 3.13 架构 x86_64 执行角色 urlscan-lambda-role创建函数后,打开 Code 标签页,删除桩代码并输入以下代码 (
urlscan-collector.py):import json import os import boto3 from datetime import datetime, timedelta import urllib3 import base64 s3 = boto3.client('s3') http = urllib3.PoolManager() def lambda_handler(event, context): # Environment variables bucket = os.environ['S3_BUCKET'] prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] api_key = os.environ['API_KEY'] api_base = os.environ['API_BASE'] search_query = os.environ.get('SEARCH_QUERY', 'date:>now-1h') page_size = int(os.environ.get('PAGE_SIZE', '100')) max_pages = int(os.environ.get('MAX_PAGES', '10')) # Load state state = load_state(bucket, state_key) last_run = state.get('last_run') # Prepare search query if last_run: # Adjust search query based on last run search_time = datetime.fromisoformat(last_run) time_diff = datetime.utcnow() - search_time hours = int(time_diff.total_seconds() / 3600) + 1 search_query = f'date:>now-{hours}h' # Search for scans headers = {'API-Key': api_key} all_results = [] for page in range(max_pages): search_url = f"{api_base}/search/" params = { 'q': search_query, 'size': page_size, 'offset': page * page_size } # Make search request response = http.request( 'GET', search_url, fields=params, headers=headers ) if response.status != 200: print(f"Search failed: {response.status}") break search_data = json.loads(response.data.decode('utf-8')) results = search_data.get('results', []) if not results: break # Fetch full result for each scan for result in results: uuid = result.get('task', {}).get('uuid') if uuid: result_url = f"{api_base}/result/{uuid}/" result_response = http.request( 'GET', result_url, headers=headers ) if result_response.status == 200: full_result = json.loads(result_response.data.decode('utf-8')) all_results.append(full_result) else: print(f"Failed to fetch result for {uuid}: {result_response.status}") # Check if we have more pages if len(results) < page_size: break # Write results to S3 if all_results: now = datetime.utcnow() file_key = f"{prefix}year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}/urlscan_{now.strftime('%Y%m%d_%H%M%S')}.json" # Create NDJSON content ndjson_content = '\n'.join([json.dumps(r, separators=(',', ':')) for r in all_results]) # Upload to S3 s3.put_object( Bucket=bucket, Key=file_key, Body=ndjson_content.encode('utf-8'), ContentType='application/x-ndjson' ) print(f"Uploaded {len(all_results)} results to s3://{bucket}/{file_key}") # Update state state['last_run'] = datetime.utcnow().isoformat() save_state(bucket, state_key, state) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Processed {len(all_results)} scan results', 'location': f"s3://{bucket}/{prefix}" }) } def load_state(bucket, key): try: response = s3.get_object(Bucket=bucket, Key=key) return json.loads(response['Body'].read()) except s3.exceptions.NoSuchKey: return {} except Exception as e: print(f"Error loading state: {e}") return {} def save_state(bucket, key, state): try: s3.put_object( Bucket=bucket, Key=key, Body=json.dumps(state), ContentType='application/json' ) except Exception as e: print(f"Error saving state: {e}")依次前往配置 > 环境变量。
依次点击修改 > 添加新的环境变量。
输入以下环境变量,并替换为您的值:
键 示例值 S3_BUCKETurlscan-logs-bucketS3_PREFIXurlscan/STATE_KEYurlscan/state.jsonAPI_KEY<your-api-key>API_BASEhttps://urlscan.io/api/v1SEARCH_QUERYdate:>now-1hPAGE_SIZE100MAX_PAGES10创建函数后,停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 周期性安排:费率 (
1 hour)。 - 目标:您的 Lambda 函数
urlscan-collector。 - 名称:
urlscan-collector-1h。
- 周期性安排:费率 (
- 点击创建时间表。
可选:为 Google SecOps 创建只读 IAM 用户和密钥
- 前往 AWS 控制台 > IAM > 用户。
- 点击 Add users(添加用户)。
- 提供以下配置详细信息:
- 用户:输入
secops-reader。 - 访问类型:选择访问密钥 - 以程序化方式访问。
- 用户:输入
- 点击创建用户。
- 附加最低限度的读取政策(自定义):用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策。
在 JSON 编辑器中,输入以下政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::urlscan-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::urlscan-logs-bucket" } ] }将名称设置为
secops-reader-policy。依次前往创建政策 > 搜索/选择 > 下一步 > 添加权限。
依次前往安全凭据 > 访问密钥 > 创建访问密钥。
下载 CSV(这些值会输入到 Feed 中)。
在 Google SecOps 中配置 Feed 以注入网址 Scan IO 日志
- 依次前往 SIEM 设置 > Feed。
- 点击添加新 Feed。
- 在Feed 名称字段中,输入 Feed 的名称(例如
URLScan IO logs)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 网址Scan IO 作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://urlscan-logs-bucket/urlscan/ - 源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
- 私有访问密钥:有权访问 S3 存储桶的用户私有密钥。
- 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。