收集 Slack 审核日志
本指南介绍了如何使用 Google Cloud Run Functions 或 Amazon S3 与 AWS Lambda 将 Slack 审核日志注入到 Google Security Operations。
准备工作
请确保满足以下前提条件:
- Google SecOps 实例。
- Slack 企业网格方案,并拥有组织所有者或管理员访问权限。
- 对以下任一内容的特权访问权限:
- Google Cloud(对于选项 1:Cloud Run Functions 和 Cloud Scheduler),或
- AWS(对于选项 2:S3、IAM、Lambda、EventBridge)。
收集 Slack 审核日志的前提条件(应用 ID、OAuth 令牌、组织 ID)
Slack 审核日志 API 需要具有 auditlogs:read 范围的用户 OAuth 令牌。此令牌必须通过在企业网格组织级层(而非工作区级层)安装应用来获取。
为审核日志创建 Slack 应用
- 使用 Enterprise Grid 组织所有者或管理员账号登录 Slack 管理控制台。
- 前往 https://api.slack.com/apps,然后依次点击创建新应用 > 从头开始。
- 提供以下配置详细信息:
- 应用名称:输入一个描述性名称(例如
Google SecOps Audit Integration)。 - 选择要在哪个工作区中开发应用:选择您的开发 Slack 工作区(组织中的任何工作区)。
- 应用名称:输入一个描述性名称(例如
- 点击创建应用。
配置 OAuth 范围
- 在左侧边栏中,前往 OAuth 和权限。
- 向下滚动到范围部分。
- 在用户令牌范围(而非聊天机器人令牌范围)下,点击添加 OAuth 范围。
- 添加范围:
auditlogs:read。
启用公开分发
- 前往左侧边栏中的管理分发。
- 在 Share Your App with Other Workspaces 下,确保所有四个部分都带有绿色对勾标记:
- 移除硬编码信息
- 启用公开分发
- 设置重定向网址
- 添加 OAuth 范围
- 点击启用公开分发。
将应用安装到企业网格组织
- 在左侧边栏中,前往 OAuth 和权限。
点击安装到组织(而非“安装到工作区”)。
查看所请求的权限,然后点击允许。
授权完成后,系统会将您重定向回“OAuth 和权限”页面。
检索凭据
- 在 Workspace 的 OAuth 令牌下,找到用户 OAuth 令牌。
复制并安全地保存以
xoxp-开头的令牌(例如xoxp-1234567890-0987654321-1234567890-abc123def456)。记下您的组织 ID:
- 前往 Slack 管理控制台。
- 依次前往设置和权限 > 组织设置。
- 复制组织 ID。
方案 1:使用 Google Cloud Run Functions 配置 Slack 审核日志导出
此选项使用 Google Cloud Run Functions 和 Cloud Scheduler 收集 Slack 审核日志,并将其直接注入到 Google SecOps 中。
设置目录
- 在本地机器上创建一个新目录,用于部署 Cloud Run 函数。
- 从 Chronicle 提取脚本 GitHub 代码库下载以下文件:
- 从 slack 文件夹中下载:
.env.ymlmain.pyrequirements.txt
- 从代码库的根目录下载整个通用目录及其所有文件:
common/__init__.pycommon/auth.pycommon/env_constants.pycommon/ingest.pycommon/status.pycommon/utils.py
- 从 slack 文件夹中下载:
- 将所有下载的文件放入部署目录。
您的目录结构应如下所示:
deployment_directory/
├─common/
│ ├─__init__.py
│ ├─auth.py
│ ├─env_constants.py
│ ├─ingest.py
│ ├─status.py
│ └─utils.py
├─.env.yml
├─main.py
└─requirements.txt
在 Google Secret Manager 中创建 Secret
- 在 Google Cloud 控制台中,依次前往安全性 > Secret Manager。
- 点击创建 Secret。
- 为 Chronicle 服务账号提供以下配置详细信息:
- 名称:输入
chronicle-service-account。 - Secret 值:粘贴您的 Google SecOps 提取身份验证 JSON 文件的内容。
- 名称:输入
- 点击创建密钥。
- 复制 Secret 资源名称,格式为:
projects/<PROJECT_ID>/secrets/chronicle-service-account/versions/latest。 - 再次点击创建 Secret 以创建第二个 Secret。
- 为 Slack 令牌提供以下配置详细信息:
- 名称:输入
slack-admin-token。 - 密钥值:粘贴您的 Slack 用户 OAuth 令牌(以
xoxp-开头)。
- 名称:输入
- 点击创建密钥。
- 复制 Secret 资源名称,格式为:
projects/<PROJECT_ID>/secrets/slack-admin-token/versions/latest。
设置所需的运行时环境变量
- 打开部署目录中的
.env.yml文件。 使用您的值配置环境变量:
CHRONICLE_CUSTOMER_ID: "<your-chronicle-customer-id>" CHRONICLE_REGION: us CHRONICLE_SERVICE_ACCOUNT: "projects/<PROJECT_ID>/secrets/chronicle-service-account/versions/latest" CHRONICLE_NAMESPACE: "" POLL_INTERVAL: "5" SLACK_ADMIN_TOKEN: "projects/<PROJECT_ID>/secrets/slack-admin-token/versions/latest"- 替换以下内容:
<your-chronicle-customer-id>:您的 Google SecOps 客户 ID。<PROJECT_ID>:您的 Google Cloud 项目 ID。- CHRONICLE_REGION:设置为您的 Google SecOps 区域。有效值:
us、asia-northeast1、asia-south1、asia-southeast1、australia-southeast1、europe、europe-west2、europe-west3、europe-west6、europe-west9、europe-west12、me-central1、me-central2、me-west1、northamerica-northeast2、southamerica-east1。 - POLL_INTERVAL:函数执行的频率间隔(以分钟为单位)。此时长必须与 Cloud Scheduler 作业间隔相同。
- 替换以下内容:
保存
.env.yml文件。
部署 Cloud Run 函数
- 在 Google Cloud 控制台中打开终端或 Cloud Shell。
前往部署目录:
cd /path/to/deployment_directory执行以下命令以部署 Cloud Run 函数:
gcloud functions deploy slack-audit-to-chronicle \ --entry-point main \ --trigger-http \ --runtime python39 \ --env-vars-file .env.yml \ --timeout 300s \ --memory 512MB \ --service-account <SERVICE_ACCOUNT_EMAIL>将
<SERVICE_ACCOUNT_EMAIL>替换为您希望 Cloud Run 函数使用的服务账号的电子邮件地址。
等待部署完成。
部署完成后,请记下输出中的函数网址。
设置 Cloud Scheduler
- 在 Google Cloud 控制台中,依次前往 Cloud Scheduler > 创建作业。
- 提供以下配置详细信息:
- 名称:输入
slack-audit-scheduler。 - 区域:选择您部署 Cloud Run 函数时所用的同一区域。
- 频率:输入
*/5 * * * *(每 5 分钟运行一次,与POLL_INTERVAL值一致)。 - 时区:选择 UTC。
- 目标类型:选择 HTTP。
- 网址:输入部署输出中的 Cloud Run 函数网址。
- HTTP 方法:选择 POST。
- 身份验证标头:选择添加 OIDC 令牌。
- 服务账号:选择与 Cloud Run 函数所用的服务账号相同的服务账号。
- 名称:输入
- 点击创建。
选项 2:使用 AWS S3 配置 Slack 审核日志导出
此选项使用 AWS Lambda 收集 Slack 审核日志并将其存储在 S3 中,然后配置 Google SecOps Feed 以注入日志。
为 Google SecOps 配置 AWS S3 存储桶和 IAM
- 按照以下用户指南创建 Amazon S3 存储桶:创建存储桶
- 保存存储桶名称和区域以供日后参考(例如
slack-audit-logs)。 - 按照以下用户指南创建用户:创建 IAM 用户。
- 选择创建的用户。
- 选择安全凭据标签页。
- 在访问密钥部分中,点击创建访问密钥。
- 选择第三方服务作为使用情形。
- 点击下一步。
- 可选:添加说明标记。
- 点击创建访问密钥。
- 点击下载 .csv 文件以保存访问密钥和密钥,供日后参考。
- 点击完成。
- 选择权限标签页。
- 在权限政策部分中,点击添加权限。
- 选择添加权限。
- 选择直接附加政策。
- 搜索 AmazonS3FullAccess 政策。
- 选择相应政策。
- 点击下一步。
- 点击添加权限。
为 S3 上传配置 IAM 政策和角色
- 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页。
- 复制并粘贴下方的政策。
政策 JSON(如果您输入了其他存储桶名称,请替换
slack-audit-logs):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::slack-audit-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::slack-audit-logs/slack/audit/state.json" } ] }点击下一步。
输入政策名称
SlackAuditS3Policy。点击创建政策。
依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda。
附加新创建的政策
SlackAuditS3Policy。将角色命名为
SlackAuditToS3Role,然后点击创建角色。
创建 Lambda 函数
- 在 AWS 控制台中,依次前往 Lambda > 函数 > 创建函数。
- 点击从头开始创作。
提供以下配置详细信息:
设置 值 名称 slack_audit_to_s3运行时 Python 3.13 架构 x86_64 执行角色 SlackAuditToS3Role点击创建函数。
创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (
slack_audit_to_s3.py)。#!/usr/bin/env python3 # Lambda: Pull Slack Audit Logs (Enterprise Grid) to S3 (JSONL format) import os, json, time, urllib.parse from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 BASE_URL = "https://api.slack.com/audit/v1/logs" TOKEN = os.environ["SLACK_AUDIT_TOKEN"] # org-level user token with auditlogs:read BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "slack/audit/") STATE_KEY = os.environ.get("STATE_KEY", "slack/audit/state.json") LIMIT = int(os.environ.get("LIMIT", "200")) # Slack recommends <= 200 MAX_PAGES = int(os.environ.get("MAX_PAGES", "20")) LOOKBACK_SEC = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # First-run window HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60")) HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3")) RETRY_AFTER_DEFAULT = int(os.environ.get("RETRY_AFTER_DEFAULT", "2")) # Optional server-side filters (comma-separated 'action' values), empty means no filter ACTIONS = os.environ.get("ACTIONS", "").strip() s3 = boto3.client("s3") def _get_state() -> dict: try: obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY) st = json.loads(obj["Body"].read() or b"{}") return {"cursor": st.get("cursor")} except Exception: return {"cursor": None} def _put_state(state: dict) -> None: body = json.dumps(state, separators=(",", ":")).encode("utf-8") s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json") def _http_get(params: dict) -> dict: qs = urllib.parse.urlencode(params, doseq=True) url = f"{BASE_URL}?{qs}" if qs else BASE_URL req = Request(url, method="GET") req.add_header("Authorization", f"Bearer {TOKEN}") req.add_header("Accept", "application/json") attempt = 0 while True: try: with urlopen(req, timeout=HTTP_TIMEOUT) as r: return json.loads(r.read().decode("utf-8")) except HTTPError as e: # Respect Retry-After on 429/5xx if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES: retry_after = 0 try: retry_after = int(e.headers.get("Retry-After", RETRY_AFTER_DEFAULT)) except Exception: retry_after = RETRY_AFTER_DEFAULT time.sleep(max(1, retry_after)) attempt += 1 continue # Re-raise other HTTP errors raise except URLError: if attempt < HTTP_RETRIES: time.sleep(RETRY_AFTER_DEFAULT) attempt += 1 continue raise def _write_page(data: dict, page_idx: int) -> str: """ Extract entries from Slack API response and write as JSONL (one event per line). Chronicle requires newline-delimited JSON, not a JSON array. """ entries = data.get("entries") or [] if not entries: # No entries to write, skip file creation return None # Convert each entry to a single-line JSON string lines = [json.dumps(entry, separators=(",", ":")) for entry in entries] # Join with newlines to create JSONL format body = "\n".join(lines).encode("utf-8") # Write to S3 ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime()) key = f"{PREFIX}{ts}-slack-audit-p{page_idx:05d}.json" s3.put_object(Bucket=BUCKET, Key=key, Body=body, ContentType="application/json") return key def lambda_handler(event=None, context=None): state = _get_state() cursor = state.get("cursor") params = {"limit": LIMIT} if ACTIONS: params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()] if cursor: params["cursor"] = cursor else: # First run (or reset): fetch a recent window by time params["oldest"] = int(time.time()) - LOOKBACK_SEC pages = 0 total = 0 last_cursor = None while pages < MAX_PAGES: data = _http_get(params) # Write entries in JSONL format written_key = _write_page(data, pages) entries = data.get("entries") or [] total += len(entries) # Cursor for next page meta = data.get("response_metadata") or {} next_cursor = meta.get("next_cursor") or data.get("next_cursor") if next_cursor: params = {"limit": LIMIT, "cursor": next_cursor} if ACTIONS: params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()] last_cursor = next_cursor pages += 1 continue break if last_cursor: _put_state({"cursor": last_cursor}) return {"ok": True, "pages": pages + (1 if total or last_cursor else 0), "entries": total, "cursor": last_cursor} if __name__ == "__main__": print(lambda_handler())依次前往配置 > 环境变量 > 修改 > 添加环境变量。
输入以下环境变量,并替换为您的值。
环境变量
键 示例值 S3_BUCKETslack-audit-logsS3_PREFIXslack/audit/STATE_KEYslack/audit/state.jsonSLACK_AUDIT_TOKENxoxp-***(具有auditlogs:read的组织级用户令牌)LIMIT200MAX_PAGES20LOOKBACK_SECONDS3600HTTP_TIMEOUT60HTTP_RETRIES3RETRY_AFTER_DEFAULT2ACTIONS(可选,CSV) user_login,app_installed点击保存。
选择配置标签页。
在常规配置面板中,点击修改。
将超时更改为 5 分钟(300 秒),然后点击保存。
创建 EventBridge 计划
- 依次前往 Amazon EventBridge > 调度器 > 创建调度。
- 提供以下配置详细信息:
- 名称:输入
slack-audit-1h。 - 周期性安排:选择基于费率的安排。
- 费率表达式:输入
1小时。 - 灵活的时间窗口:选择关闭。
- 名称:输入
- 点击下一步。
- 选择目标:
- 目标 API:选择 AWS Lambda 调用。
- Lambda 函数:选择
slack_audit_to_s3。
- 点击下一步。
- 点击下一步(跳过可选设置)。
- 检查并点击创建时间表。
(可选)为 Google SecOps 创建只读 IAM 用户和密钥
- 依次前往 AWS 控制台 > IAM > 用户 > 创建用户。
- 提供以下配置详细信息:
- 用户名:输入
secops-reader。 - 访问类型:选择以编程方式访问。
- 用户名:输入
- 点击下一步。
- 选择直接附加政策。
- 点击创建政策。
在 JSON 标签页中,粘贴以下内容:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::slack-audit-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::slack-audit-logs" } ] }点击下一步。
输入政策名称
secops-reader-policy。点击创建政策。
返回到用户创建页面,刷新政策列表,然后选择
secops-reader-policy。点击下一步。
点击创建用户。
选择已创建的用户
secops-reader。依次前往安全凭据 > 访问密钥 > 创建访问密钥。
选择第三方服务。
点击下一步。
点击创建访问密钥。
点击下载 .csv 文件以保存凭据。
在 Google SecOps 中配置 Feed 以注入 Slack 审核日志
- 依次前往 SIEM 设置 > Feed。
- 点击新增。
- 在Feed 名称字段中,输入 Feed 的名称(例如
Slack Audit Logs)。 - 选择 Amazon S3 V2 作为来源类型。
- 选择 Slack 审核作为日志类型。
- 点击下一步。
- 为以下输入参数指定值:
- S3 URI:
s3://slack-audit-logs/slack/audit/ - 来源删除选项:根据您的偏好选择删除选项。
- 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
- 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥(来自
secops-reader)。 - 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥(来自
secops-reader)。 - 资产命名空间:资产命名空间。
- 注入标签:应用于此 Feed 中事件的标签。
- S3 URI:
- 点击下一步。
- 在最终确定界面中查看新的 Feed 配置,然后点击提交。
UDM 映射表
| 日志字段 | UDM 映射 | 逻辑 |
|---|---|---|
action |
metadata.product_event_type |
直接从原始日志中的 action 字段映射。 |
actor.type |
principal.labels.value |
直接从 actor.type 字段映射,并添加了键 actor.type。 |
actor.user.email |
principal.user.email_addresses |
直接从 actor.user.email 字段映射。 |
actor.user.id |
principal.user.product_object_id |
直接从 actor.user.id 字段映射。 |
actor.user.id |
principal.user.userid |
直接从 actor.user.id 字段映射。 |
actor.user.name |
principal.user.user_display_name |
直接从 actor.user.name 字段映射。 |
actor.user.team |
principal.user.group_identifiers |
直接从 actor.user.team 字段映射。 |
context.ip_address |
principal.ip |
直接从 context.ip_address 字段映射。 |
context.location.domain |
about.resource.attribute.labels.value |
直接从 context.location.domain 字段映射,并添加了键 context.location.domain。 |
context.location.id |
about.resource.id |
直接从 context.location.id 字段映射。 |
context.location.name |
about.resource.name |
直接从 context.location.name 字段映射。 |
context.location.name |
about.resource.attribute.labels.value |
直接从 context.location.name 字段映射,并添加了键 context.location.name。 |
context.location.type |
about.resource.resource_subtype |
直接从 context.location.type 字段映射。 |
context.session_id |
network.session_id |
直接从 context.session_id 字段映射。 |
context.ua |
network.http.user_agent |
直接从 context.ua 字段映射。 |
context.ua |
network.http.parsed_user_agent |
使用 parseduseragent 过滤条件从 context.ua 字段派生的已解析的用户代理信息。 |
country |
principal.location.country_or_region |
直接从 country 字段映射。 |
date_create |
metadata.event_timestamp.seconds |
date_create 字段中的纪元时间戳会转换为时间戳对象。 |
details.inviter.email |
target.user.email_addresses |
直接从 details.inviter.email 字段映射。 |
details.inviter.id |
target.user.product_object_id |
直接从 details.inviter.id 字段映射。 |
details.inviter.name |
target.user.user_display_name |
直接从 details.inviter.name 字段映射。 |
details.inviter.team |
target.user.group_identifiers |
直接从 details.inviter.team 字段映射。 |
details.reason |
security_result.description |
直接从 details.reason 字段映射,如果是数组,则用英文逗号连接。 |
details.type |
about.resource.attribute.labels.value |
直接从 details.type 字段映射,并添加了键 details.type。 |
details.type |
security_result.summary |
直接从 details.type 字段映射。 |
entity.app.id |
target.resource.id |
直接从 entity.app.id 字段映射。 |
entity.app.name |
target.resource.name |
直接从 entity.app.name 字段映射。 |
entity.channel.id |
target.resource.id |
直接从 entity.channel.id 字段映射。 |
entity.channel.name |
target.resource.name |
直接从 entity.channel.name 字段映射。 |
entity.channel.privacy |
target.resource.attribute.labels.value |
直接从 entity.channel.privacy 字段映射,并添加了键 entity.channel.privacy。 |
entity.file.filetype |
target.resource.attribute.labels.value |
直接从 entity.file.filetype 字段映射,并添加了键 entity.file.filetype。 |
entity.file.id |
target.resource.id |
直接从 entity.file.id 字段映射。 |
entity.file.name |
target.resource.name |
直接从 entity.file.name 字段映射。 |
entity.file.title |
target.resource.attribute.labels.value |
直接从 entity.file.title 字段映射,并添加了键 entity.file.title。 |
entity.huddle.date_end |
about.resource.attribute.labels.value |
直接从 entity.huddle.date_end 字段映射,并添加了键 entity.huddle.date_end。 |
entity.huddle.date_start |
about.resource.attribute.labels.value |
直接从 entity.huddle.date_start 字段映射,并添加了键 entity.huddle.date_start。 |
entity.huddle.id |
about.resource.attribute.labels.value |
直接从 entity.huddle.id 字段映射,并添加了键 entity.huddle.id。 |
entity.huddle.participants.0 |
about.resource.attribute.labels.value |
直接从 entity.huddle.participants.0 字段映射,并添加了键 entity.huddle.participants.0。 |
entity.huddle.participants.1 |
about.resource.attribute.labels.value |
直接从 entity.huddle.participants.1 字段映射,并添加了键 entity.huddle.participants.1。 |
entity.type |
target.resource.resource_subtype |
直接从 entity.type 字段映射。 |
entity.user.email |
target.user.email_addresses |
直接从 entity.user.email 字段映射。 |
entity.user.id |
target.user.product_object_id |
直接从 entity.user.id 字段映射。 |
entity.user.name |
target.user.user_display_name |
直接从 entity.user.name 字段映射。 |
entity.user.team |
target.user.group_identifiers |
直接从 entity.user.team 字段映射。 |
entity.workflow.id |
target.resource.id |
直接从 entity.workflow.id 字段映射。 |
entity.workflow.name |
target.resource.name |
直接从 entity.workflow.name 字段映射。 |
id |
metadata.product_log_id |
直接从 id 字段映射。 |
ip |
principal.ip |
直接从 ip 字段映射。由基于 action 字段的逻辑确定。默认值为 USER_COMMUNICATION,但会根据 action 的值更改为其他值,例如 USER_CREATION、USER_LOGIN、USER_LOGOUT、USER_RESOURCE_ACCESS、USER_RESOURCE_UPDATE_PERMISSIONS 或 USER_CHANGE_PERMISSIONS。硬编码为“SLACK_AUDIT”。如果存在 date_create,则设置为“Enterprise Grid”;否则,如果存在 user_id,则设置为“Audit Logs”。硬编码为“Slack”。硬编码为“REMOTE”。如果 action 包含“user_login”或“user_logout”,则设置为“SSO”。否则,请设置为“MACHINE”。在提供的示例中未映射。默认为“ALLOW”,但如果 action 为“user_login_failed”,则设置为“BLOCK”。如果 date_create 存在,则设置为“Slack”;否则,如果 user_id 存在,则设置为“SLACK”。 |
user_agent |
network.http.user_agent |
直接从 user_agent 字段映射。 |
user_id |
principal.user.product_object_id |
直接从 user_id 字段映射。 |
username |
principal.user.product_object_id |
直接从 username 字段映射。 |
需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。