收集 Slack 审核日志

支持的平台:

本指南介绍了如何使用 Google Cloud Run Functions 或 Amazon S3 与 AWS Lambda 将 Slack 审核日志注入到 Google Security Operations。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例。
  • Slack Enterprise Grid 方案,并拥有组织所有者管理员访问权限。
  • 对以下任一内容的特权访问权限:
    • Google Cloud(对于选项 1:Cloud Run Functions 和 Cloud Scheduler),或
    • AWS(对于选项 2:S3、IAM、Lambda、EventBridge)。

收集 Slack 审核日志的前提条件(应用 ID、OAuth 令牌、组织 ID)

Slack 审核日志 API 需要具有 auditlogs:read 范围的用户 OAuth 令牌。此令牌必须通过在企业网格组织级层(而非工作区级层)安装应用来获取。

为审核日志创建 Slack 应用

  1. 使用 Enterprise Grid 组织所有者管理员账号登录 Slack 管理控制台
  2. 前往 https://api.slack.com/apps,然后依次点击创建新应用 > 从头开始
  3. 提供以下配置详细信息:
    • 应用名称:输入一个描述性名称(例如 Google SecOps Audit Integration)。
    • 选择要在哪个工作区中开发应用:选择您的开发 Slack 工作区(组织中的任何工作区)。
  4. 点击创建应用

配置 OAuth 范围

  1. 在左侧边栏中,前往 OAuth 和权限
  2. 向下滚动到范围部分。
  3. 用户令牌范围(而非“聊天机器人令牌范围”)下,点击添加 OAuth 范围
  4. 添加范围:auditlogs:read

启用公开分发

  1. 前往左侧边栏中的 Manage Distribution
  2. Share Your App with Other Workspaces 下,确保所有四个部分都带有绿色对勾标记:
    • 移除硬编码信息
    • 启用公开分发
    • 设置重定向网址
    • 添加 OAuth 范围
  3. 点击启用公开分发

将应用安装到 Enterprise Grid 组织

  1. 在左侧边栏中,前往 OAuth 和权限
  2. 点击安装到组织(而非“安装到工作区”)。

重要提示:请检查安装界面右上角的下拉菜单,确认您要安装到企业组织,而不是个人工作区。

  1. 查看所请求的权限,然后点击允许
  2. 授权完成后,系统会将您重定向回“OAuth 和权限”页面。

检索凭据

  1. Workspace 的 OAuth 令牌下,找到用户 OAuth 令牌
  2. 复制并安全地保存以 xoxp- 开头的令牌(例如 xoxp-1234567890-0987654321-1234567890-abc123def456)。

重要提示:这是 Lambda 函数或 Cloud Run 函数的 SLACK_ADMIN_TOKEN。请妥善保管。

  1. 记下您的组织 ID
    • 前往 Slack 管理控制台
    • 依次前往设置和权限 > 组织设置
    • 复制组织 ID

方案 1:使用 Google Cloud Run Functions 配置 Slack 审核日志导出

此选项使用 Google Cloud Run Functions 和 Cloud Scheduler 收集 Slack 审核日志,并将其直接注入到 Google SecOps 中。

设置目录

  1. 在本地机器上创建一个新目录,用于部署 Cloud Run 函数。
  2. Chronicle 提取脚本 GitHub 代码库下载以下文件:
    • slack 文件夹中下载:
      • .env.yml
      • main.py
      • requirements.txt
    • 从代码库的目录下载整个通用目录及其所有文件:
      • common/__init__.py
      • common/auth.py
      • common/env_constants.py
      • common/ingest.py
      • common/status.py
      • common/utils.py
  3. 将所有下载的文件放入部署目录。

您的目录结构应如下所示:

deployment_directory/
├─common/
 ├─__init__.py
 ├─auth.py
 ├─env_constants.py
 ├─ingest.py
 ├─status.py
 └─utils.py
├─.env.yml
├─main.py
└─requirements.txt

在 Google Secret Manager 中创建 Secret

  1. Google Cloud 控制台中,前往安全性 > Secret Manager
  2. 点击创建 Secret
  3. Chronicle 服务账号提供以下配置详细信息:
    • 名称:输入 chronicle-service-account
    • Secret 值:粘贴您的 Google SecOps 提取身份验证 JSON 文件的内容。
  4. 点击创建密钥
  5. 复制 Secret 资源名称,格式为:projects/<PROJECT_ID>/secrets/chronicle-service-account/versions/latest
  6. 再次点击创建 Secret 以创建第二个 Secret。
  7. Slack 令牌提供以下配置详细信息:
    • 名称:输入 slack-admin-token
    • 密钥值:粘贴您的 Slack 用户 OAuth 令牌(以 xoxp- 开头)。
  8. 点击创建密钥
  9. 复制 Secret 资源名称,格式为:projects/<PROJECT_ID>/secrets/slack-admin-token/versions/latest

设置所需的运行时环境变量

  1. 打开部署目录中的 .env.yml 文件。
  2. 使用您的值配置环境变量:
CHRONICLE_CUSTOMER_ID: "<your-chronicle-customer-id>"
CHRONICLE_REGION: us
CHRONICLE_SERVICE_ACCOUNT: "projects/<PROJECT_ID>/secrets/chronicle-service-account/versions/latest"
CHRONICLE_NAMESPACE: ""
POLL_INTERVAL: "5"
SLACK_ADMIN_TOKEN: "projects/<PROJECT_ID>/secrets/slack-admin-token/versions/latest"

替换以下内容:

  • <your-chronicle-customer-id>:您的 Google SecOps 客户 ID。
  • <PROJECT_ID>:您的 Google Cloud 项目 ID。
  • CHRONICLE_REGION:设置为您的 Google SecOps 区域。有效值:usasia-northeast1asia-south1asia-southeast1australia-southeast1europeeurope-west2europe-west3europe-west6europe-west9europe-west12me-central1me-central2me-west1northamerica-northeast2southamerica-east1
  • POLL_INTERVAL:函数执行的频率间隔(以分钟为单位)。此时长必须与 Cloud Scheduler 作业间隔相同。
  1. 保存 .env.yml 文件。

部署 Cloud Run 函数

  1. 在 Google Cloud Console 中打开终端或 Cloud Shell
  2. 前往部署目录:
cd /path/to/deployment_directory
  1. 执行以下命令以部署 Cloud Run 函数:
gcloud functions deploy slack-audit-to-chronicle \
  --entry-point main \
  --trigger-http \
  --runtime python39 \
  --env-vars-file .env.yml \
  --timeout 300s \
  --memory 512MB \
  --service-account <SERVICE_ACCOUNT_EMAIL>

<SERVICE_ACCOUNT_EMAIL> 替换为您希望 Cloud Run 函数使用的服务账号的电子邮件地址。

  1. 等待部署完成。
  2. 部署完成后,请记下输出中的函数网址

设置 Cloud Scheduler

  1. Google Cloud 控制台中,依次前往 Cloud Scheduler > 创建作业
  2. 提供以下配置详细信息:
    • 名称:输入 slack-audit-scheduler
    • 区域:选择与您部署 Cloud Run 函数时所选的区域相同的区域。
    • 频率:输入 */5 * * * *(每 5 分钟运行一次,与 POLL_INTERVAL 值一致)。
    • 时区:选择 UTC
    • 目标类型:选择 HTTP
    • 网址:输入部署输出中的 Cloud Run 函数网址。
    • HTTP 方法:选择 POST
    • 身份验证标头:选择添加 OIDC 令牌
    • 服务账号:选择用于 Cloud Run 函数的同一服务账号。
  3. 点击创建

选项 2:使用 AWS S3 配置 Slack 审核日志导出

此选项使用 AWS Lambda 收集 Slack 审核日志并将其存储在 S3 中,然后配置 Google SecOps Feed 以注入日志。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 slack-audit-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 .csv 文件以保存访问密钥密钥,供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. 在 AWS 控制台中,依次前往 IAM > 政策 > 创建政策 > JSON 标签页
  2. 复制并粘贴下方的政策。
  3. 政策 JSON(如果您输入了其他存储桶名称,请替换 slack-audit-logs):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowPutObjects",
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::slack-audit-logs/*"
    },
    {
      "Sid": "AllowGetStateObject",
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::slack-audit-logs/slack/audit/state.json"
    }
  ]
}
  1. 点击下一步
  2. 输入政策名称 SlackAuditS3Policy
  3. 点击创建政策
  4. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda
  5. 附加新创建的政策 SlackAuditS3Policy
  6. 将角色命名为 SlackAuditToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:
设置
名称 slack_audit_to_s3
运行时 Python 3.13
架构 x86_64
执行角色 SlackAuditToS3Role
  1. 点击创建函数
  2. 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (slack_audit_to_s3.py)。
#!/usr/bin/env python3
# Lambda: Pull Slack Audit Logs (Enterprise Grid) to S3 (JSONL format)

import os, json, time, urllib.parse
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
import boto3

BASE_URL = "https://api.slack.com/audit/v1/logs"

TOKEN        = os.environ["SLACK_AUDIT_TOKEN"]  # org-level user token with auditlogs:read
BUCKET       = os.environ["S3_BUCKET"]
PREFIX       = os.environ.get("S3_PREFIX", "slack/audit/")
STATE_KEY    = os.environ.get("STATE_KEY", "slack/audit/state.json")
LIMIT        = int(os.environ.get("LIMIT", "200"))             # Slack recommends <= 200
MAX_PAGES    = int(os.environ.get("MAX_PAGES", "20"))
LOOKBACK_SEC = int(os.environ.get("LOOKBACK_SECONDS", "3600")) # First-run window
HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60"))
HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3"))
RETRY_AFTER_DEFAULT = int(os.environ.get("RETRY_AFTER_DEFAULT", "2"))
# Optional server-side filters (comma-separated 'action' values), empty means no filter
ACTIONS      = os.environ.get("ACTIONS", "").strip()

s3 = boto3.client("s3")


def _get_state() -> dict:
    try:
        obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
        st = json.loads(obj["Body"].read() or b"{}")
        return {"cursor": st.get("cursor")}
    except Exception:
        return {"cursor": None}


def _put_state(state: dict) -> None:
    body = json.dumps(state, separators=(",", ":")).encode("utf-8")
    s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")


def _http_get(params: dict) -> dict:
    qs  = urllib.parse.urlencode(params, doseq=True)
    url = f"{BASE_URL}?{qs}" if qs else BASE_URL
    req = Request(url, method="GET")
    req.add_header("Authorization", f"Bearer {TOKEN}")
    req.add_header("Accept", "application/json")

    attempt = 0
    while True:
        try:
            with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            # Respect Retry-After on 429/5xx
            if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES:
                retry_after = 0
                try:
                    retry_after = int(e.headers.get("Retry-After", RETRY_AFTER_DEFAULT))
                except Exception:
                    retry_after = RETRY_AFTER_DEFAULT
                time.sleep(max(1, retry_after))
                attempt += 1
                continue
            # Re-raise other HTTP errors
            raise
        except URLError:
            if attempt < HTTP_RETRIES:
                time.sleep(RETRY_AFTER_DEFAULT)
                attempt += 1
                continue
            raise


def _write_page(data: dict, page_idx: int) -> str:
    """
    Extract entries from Slack API response and write as JSONL (one event per line).
    Chronicle requires newline-delimited JSON, not a JSON array.
    """
    entries = data.get("entries") or []
    
    if not entries:
        # No entries to write, skip file creation
        return None
    
    # Convert each entry to a single-line JSON string
    lines = [json.dumps(entry, separators=(",", ":")) for entry in entries]
    
    # Join with newlines to create JSONL format
    body = "\n".join(lines).encode("utf-8")
    
    # Write to S3
    ts  = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
    key = f"{PREFIX}{ts}-slack-audit-p{page_idx:05d}.json"
    s3.put_object(Bucket=BUCKET, Key=key, Body=body, ContentType="application/json")
    
    return key


def lambda_handler(event=None, context=None):
    state  = _get_state()
    cursor = state.get("cursor")

    params = {"limit": LIMIT}
    if ACTIONS:
        params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()]
    if cursor:
        params["cursor"] = cursor
    else:
        # First run (or reset): fetch a recent window by time
        params["oldest"] = int(time.time()) - LOOKBACK_SEC

    pages = 0
    total = 0
    last_cursor = None

    while pages < MAX_PAGES:
        data = _http_get(params)
        
        # Write entries in JSONL format
        written_key = _write_page(data, pages)

        entries = data.get("entries") or []
        total += len(entries)

        # Cursor for next page
        meta = data.get("response_metadata") or {}
        next_cursor = meta.get("next_cursor") or data.get("next_cursor")
        if next_cursor:
            params = {"limit": LIMIT, "cursor": next_cursor}
            if ACTIONS:
                params["action"] = [a.strip() for a in ACTIONS.split(",") if a.strip()]
            last_cursor = next_cursor
            pages += 1
            continue
        break

    if last_cursor:
        _put_state({"cursor": last_cursor})

    return {"ok": True, "pages": pages + (1 if total or last_cursor else 0), "entries": total, "cursor": last_cursor}


if __name__ == "__main__":
    print(lambda_handler())

  1. 依次前往配置 > 环境变量 > 修改 > 添加环境变量
  2. 输入以下环境变量,并将其替换为您的值。

环境变量

示例值
S3_BUCKET slack-audit-logs
S3_PREFIX slack/audit/
STATE_KEY slack/audit/state.json
SLACK_AUDIT_TOKEN xoxp-***(具有 auditlogs:read 的组织级用户令牌)
LIMIT 200
MAX_PAGES 20
LOOKBACK_SECONDS 3600
HTTP_TIMEOUT 60
HTTP_RETRIES 3
RETRY_AFTER_DEFAULT 2
ACTIONS (可选,CSV) user_login,app_installed
  1. 点击保存
  2. 选择配置标签页。
  3. 常规配置面板中,点击修改
  4. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 名称:输入 slack-audit-1h
    • 周期性安排:选择基于费率的安排
    • 费率表达式:输入 1 小时。
    • 灵活的时间窗口:选择关闭
  3. 点击下一步
  4. 选择目标
    • 目标 API:选择 AWS Lambda 调用
    • Lambda 函数:选择 slack_audit_to_s3
  5. 点击下一步
  6. 点击下一步(跳过可选设置)。
  7. 检查并点击创建时间表

(可选)为 Google SecOps 创建只读 IAM 用户和密钥

  1. 依次前往 AWS 控制台 > IAM > 用户 > 创建用户
  2. 提供以下配置详细信息:
    • 用户名:输入 secops-reader
    • 访问类型:选择以编程方式访问
  3. 点击下一步
  4. 选择直接附加政策
  5. 点击创建政策
  6. JSON 标签页中,粘贴以下内容:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::slack-audit-logs/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::slack-audit-logs"
    }
  ]
}
  1. 点击下一步
  2. 输入政策名称 secops-reader-policy
  3. 点击创建政策
  4. 返回到用户创建页面,刷新政策列表,然后选择 secops-reader-policy
  5. 点击下一步
  6. 点击创建用户
  7. 选择已创建的用户 secops-reader
  8. 依次前往安全凭据 > 访问密钥 > 创建访问密钥
  9. 选择第三方服务
  10. 点击下一步
  11. 点击创建访问密钥
  12. 点击下载 .csv 文件以保存凭据。

在 Google SecOps 中配置 Feed 以注入 Slack 审核日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击新增
  3. Feed 名称字段中,输入 Feed 的名称(例如 Slack Audit Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Slack 审核作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://slack-audit-logs/slack/audit/
    • 源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥(来自 secops-reader)。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥(来自 secops-reader)。
    • 资产命名空间资产命名空间
    • 注入标签:应用于此 Feed 中事件的标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

UDM 映射表

日志字段 UDM 映射 逻辑
action metadata.product_event_type 直接从原始日志中的 action 字段映射。
actor.type principal.labels.value 直接从 actor.type 字段映射,并添加了键 actor.type
actor.user.email principal.user.email_addresses 直接从 actor.user.email 字段映射。
actor.user.id principal.user.product_object_id 直接从 actor.user.id 字段映射。
actor.user.id principal.user.userid 直接从 actor.user.id 字段映射。
actor.user.name principal.user.user_display_name 直接从 actor.user.name 字段映射。
actor.user.team principal.user.group_identifiers 直接从 actor.user.team 字段映射。
context.ip_address principal.ip 直接从 context.ip_address 字段映射。
context.location.domain about.resource.attribute.labels.value 直接从 context.location.domain 字段映射,并添加了键 context.location.domain
context.location.id about.resource.id 直接从 context.location.id 字段映射。
context.location.name about.resource.name 直接从 context.location.name 字段映射。
context.location.name about.resource.attribute.labels.value 直接从 context.location.name 字段映射,并添加了键 context.location.name
context.location.type about.resource.resource_subtype 直接从 context.location.type 字段映射。
context.session_id network.session_id 直接从 context.session_id 字段映射。
context.ua network.http.user_agent 直接从 context.ua 字段映射。
context.ua network.http.parsed_user_agent 使用 parseduseragent 过滤条件从 context.ua 字段派生的已解析的用户代理信息。
country principal.location.country_or_region 直接从 country 字段映射。
date_create metadata.event_timestamp.seconds date_create 字段中的纪元时间戳会转换为时间戳对象。
details.inviter.email target.user.email_addresses 直接从 details.inviter.email 字段映射。
details.inviter.id target.user.product_object_id 直接从 details.inviter.id 字段映射。
details.inviter.name target.user.user_display_name 直接从 details.inviter.name 字段映射。
details.inviter.team target.user.group_identifiers 直接从 details.inviter.team 字段映射。
details.reason security_result.description 直接从 details.reason 字段映射,如果是数组,则用英文逗号连接。
details.type about.resource.attribute.labels.value 直接从 details.type 字段映射,并添加了键 details.type
details.type security_result.summary 直接从 details.type 字段映射。
entity.app.id target.resource.id 直接从 entity.app.id 字段映射。
entity.app.name target.resource.name 直接从 entity.app.name 字段映射。
entity.channel.id target.resource.id 直接从 entity.channel.id 字段映射。
entity.channel.name target.resource.name 直接从 entity.channel.name 字段映射。
entity.channel.privacy target.resource.attribute.labels.value 直接从 entity.channel.privacy 字段映射,并添加了键 entity.channel.privacy
entity.file.filetype target.resource.attribute.labels.value 直接从 entity.file.filetype 字段映射,并添加了键 entity.file.filetype
entity.file.id target.resource.id 直接从 entity.file.id 字段映射。
entity.file.name target.resource.name 直接从 entity.file.name 字段映射。
entity.file.title target.resource.attribute.labels.value 直接从 entity.file.title 字段映射,并添加了键 entity.file.title
entity.huddle.date_end about.resource.attribute.labels.value 直接从 entity.huddle.date_end 字段映射,并添加了键 entity.huddle.date_end
entity.huddle.date_start about.resource.attribute.labels.value 直接从 entity.huddle.date_start 字段映射,并添加了键 entity.huddle.date_start
entity.huddle.id about.resource.attribute.labels.value 直接从 entity.huddle.id 字段映射,并添加了键 entity.huddle.id
entity.huddle.participants.0 about.resource.attribute.labels.value 直接从 entity.huddle.participants.0 字段映射,并添加了键 entity.huddle.participants.0
entity.huddle.participants.1 about.resource.attribute.labels.value 直接从 entity.huddle.participants.1 字段映射,并添加了键 entity.huddle.participants.1
entity.type target.resource.resource_subtype 直接从 entity.type 字段映射。
entity.user.email target.user.email_addresses 直接从 entity.user.email 字段映射。
entity.user.id target.user.product_object_id 直接从 entity.user.id 字段映射。
entity.user.name target.user.user_display_name 直接从 entity.user.name 字段映射。
entity.user.team target.user.group_identifiers 直接从 entity.user.team 字段映射。
entity.workflow.id target.resource.id 直接从 entity.workflow.id 字段映射。
entity.workflow.name target.resource.name 直接从 entity.workflow.name 字段映射。
id metadata.product_log_id 直接从 id 字段映射。
ip principal.ip 直接从 ip 字段映射。由基于 action 字段的逻辑确定。默认值为 USER_COMMUNICATION,但会根据 action 的值更改为其他值,例如 USER_CREATIONUSER_LOGINUSER_LOGOUTUSER_RESOURCE_ACCESSUSER_RESOURCE_UPDATE_PERMISSIONSUSER_CHANGE_PERMISSIONS。硬编码为“SLACK_AUDIT”。如果存在 date_create,则设置为“Enterprise Grid”;否则,如果存在 user_id,则设置为“Audit Logs”。硬编码为“Slack”。硬编码为“REMOTE”。如果 action 包含“user_login”或“user_logout”,则设置为“SSO”。否则,设置为“MACHINE”。在提供的示例中未映射。默认为“ALLOW”,但如果 action 为“user_login_failed”,则设置为“BLOCK”。如果 date_create 存在,则设置为“Slack”;否则,如果 user_id 存在,则设置为“SLACK”。
user_agent network.http.user_agent 直接从 user_agent 字段映射。
user_id principal.user.product_object_id 直接从 user_id 字段映射。
username principal.user.product_object_id 直接从 username 字段映射。

需要更多帮助?获得社区成员和 Google SecOps 专业人士的解答。