收集 Rippling 活动日志

支持的平台:

本文档介绍了如何使用 Amazon S3 将 Rippling 活动日志注入到 Google Security Operations。

准备工作

请确保满足以下前提条件:

  • Google SecOps 实例。
  • Rippling 的特权访问权限(具有对公司活动的访问权限的 API 令牌)。
  • AWS(S3、Identity and Access Management (IAM)、Lambda、EventBridge)的特权访问权限。

获取 Rippling 前提条件

  1. 登录 Rippling 管理员
  2. 依次打开搜索 > API 令牌
    替代路径:设置 > 公司设置 > API 令牌
  3. 点击创建 API 令牌
  4. 提供以下配置详细信息:
    • 名称:提供一个唯一且有意义的名称(例如 Google SecOps S3 Export
    • API 版本:基本 API (v1)
    • 范围/权限:启用 company:activity:read公司活动必需)。
  5. 点击创建,并将令牌值保存在安全位置。(您将使用它作为不记名令牌)。

为 Google SecOps 配置 AWS S3 存储桶和 IAM

  1. 按照以下用户指南创建 Amazon S3 存储桶创建存储桶
  2. 保存存储桶名称区域以供日后参考(例如 rippling-activity-logs)。
  3. 按照以下用户指南创建用户创建 IAM 用户
  4. 选择创建的用户
  5. 选择安全凭据标签页。
  6. 访问密钥部分中,点击创建访问密钥
  7. 选择第三方服务作为使用情形
  8. 点击下一步
  9. 可选:添加说明标记。
  10. 点击创建访问密钥
  11. 点击下载 CSV 文件,保存访问密钥秘密访问密钥,以供日后参考。
  12. 点击完成
  13. 选择权限标签页。
  14. 权限政策部分中,点击添加权限
  15. 选择添加权限
  16. 选择直接附加政策
  17. 搜索 AmazonS3FullAccess 政策。
  18. 选择相应政策。
  19. 点击下一步
  20. 点击添加权限

为 S3 上传配置 IAM 政策和角色

  1. AWS 控制台中,依次前往 IAM > 政策
  2. 依次点击创建政策 > JSON 标签页
  3. 复制并粘贴以下政策。
  4. 政策 JSON(如果您输入了其他存储桶或前缀,请替换相应的值):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::rippling-activity-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::rippling-activity-logs/rippling/activity/state.json"
        }
      ]
    }
    ````
    
  5. 依次点击下一步 > 创建政策

  6. 依次前往 IAM > 角色 > 创建角色 > AWS 服务 > Lambda

  7. 附加新创建的政策。

  8. 将角色命名为 WriteRipplingToS3Role,然后点击创建角色

创建 Lambda 函数

  1. AWS 控制台中,依次前往 Lambda > 函数 > 创建函数
  2. 点击从头开始创作
  3. 提供以下配置详细信息:

    设置
    名称 rippling_activity_to_s3
    运行时 Python 3.13
    架构 x86_64
    执行角色 WriteRipplingToS3Role
  4. 创建函数后,打开 Code 标签页,删除桩代码并粘贴以下代码 (rippling_activity_to_s3.py)。

    #!/usr/bin/env python3
    # Lambda: Pull Rippling Company Activity logs to S3 (raw JSON, no transforms)
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from datetime import datetime, timezone, timedelta
    import boto3
    
    API_TOKEN = os.environ["RIPPLING_API_TOKEN"]
    ACTIVITY_URL = os.environ.get("RIPPLING_ACTIVITY_URL", "https://api.rippling.com/platform/api/company_activity")
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "rippling/activity/")
    STATE_KEY = os.environ.get("STATE_KEY", "rippling/activity/state.json")
    
    LIMIT = int(os.environ.get("LIMIT", "1000"))
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "10"))
    LOOKBACK_MINUTES = int(os.environ.get("LOOKBACK_MINUTES", "60"))
    END_LAG_SECONDS = int(os.environ.get("END_LAG_SECONDS", "120"))
    
    s3 = boto3.client("s3")
    
    def _headers():
        return {"Authorization": f"Bearer {API_TOKEN}", "Accept": "application/json"}
    
    def _get_state():
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            j = json.loads(obj["Body"].read())
            return {"since": j.get("since"), "next": j.get("next")}
        except Exception:
            return {"since": None, "next": None}
    
    def _put_state(since_iso, next_cursor):
        body = json.dumps({"since": since_iso, "next": next_cursor}, separators=(",", ":")).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body)
    
    def _get(url):
        req = Request(url, method="GET")
        for k, v in _headers().items():
            req.add_header(k, v)
        with urlopen(req, timeout=60) as r:
            return json.loads(r.read().decode("utf-8"))
    
    def _build_url(base, params):
        qs = urllib.parse.urlencode(params)
        return f"{base}?{qs}" if qs else base
    
    def _parse_iso(ts):
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        return datetime.fromisoformat(ts)
    
    def _iso_from_epoch(sec):
        return datetime.fromtimestamp(sec, tz=timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
    
    def _write(payload, run_ts_iso, page_index, source="company_activity"):
        day_path = _parse_iso(run_ts_iso).strftime("%Y/%m/%d")
        key = f"{S3_PREFIX.strip('/')}/{day_path}/{run_ts_iso.replace(':','').replace('-','')}-page{page_index:05d}-{source}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"))
        return key
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        run_end = datetime.now(timezone.utc) - timedelta(seconds=END_LAG_SECONDS)
        end_iso = run_end.replace(microsecond=0).isoformat().replace("+00:00", "Z")
    
        since_iso = state["since"]
        next_cursor = state["next"]
    
        if since_iso is None:
            since_iso = _iso_from_epoch(time.time() - LOOKBACK_MINUTES * 60)
        else:
            try:
                since_iso = (_parse_iso(since_iso) + timedelta(seconds=1)).replace(microsecond=0).isoformat().replace("+00:00", "Z")
            except Exception:
                since_iso = _iso_from_epoch(time.time() - LOOKBACK_MINUTES * 60)
    
        run_ts_iso = end_iso
        pages = 0
        total = 0
        newest_ts = None
        pending_next = None
    
        while pages < MAX_PAGES:
            params = {"limit": str(LIMIT)}
            if next_cursor:
                params["next"] = next_cursor
            else:
                params["startDate"] = since_iso
                params["endDate"] = end_iso
    
            url = _build_url(ACTIVITY_URL, params)
            data = _get(url)
            _write(data, run_ts_iso, pages)
    
            events = data.get("events") or data.get("items") or data.get("data") or []
            total += len(events) if isinstance(events, list) else 0
    
            if isinstance(events, list):
                for ev in events:
                    t = ev.get("timestamp") or ev.get("time") or ev.get("event_time")
                    if isinstance(t, str):
                        try:
                            dt_ts = _parse_iso(t)
                            if newest_ts is None or dt_ts > newest_ts:
                                newest_ts = dt_ts
                        except Exception:
                            pass
    
            nxt = data.get("next") or data.get("next_cursor") or None
            pages += 1
    
            if nxt:
                next_cursor = nxt
                pending_next = nxt
                continue
            else:
                pending_next = None
                break
    
        new_since_iso = (newest_ts or run_end).replace(microsecond=0).isoformat().replace("+00:00", "Z")
        _put_state(new_since_iso, pending_next)
    
        return {"ok": True, "pages": pages, "events": total, "since": new_since_iso, "next": pending_next}
    
  5. 依次前往配置 > 环境变量

  6. 依次点击修改 > 添加新的环境变量

  7. 输入下表中提供的环境变量,并将示例值替换为您的值。

    环境变量

    示例值
    S3_BUCKET rippling-activity-logs
    S3_PREFIX rippling/activity/
    STATE_KEY rippling/activity/state.json
    RIPPLING_API_TOKEN your-api-token
    RIPPLING_ACTIVITY_URL https://api.rippling.com/platform/api/company_activity
    LIMIT 1000
    MAX_PAGES 10
    LOOKBACK_MINUTES 60
    END_LAG_SECONDS 120
  8. 创建函数后,请停留在其页面上(或依次打开 Lambda > 函数 > 您的函数)。

  9. 选择配置标签页。

  10. 常规配置面板中,点击修改

  11. 超时更改为 5 分钟(300 秒),然后点击保存

创建 EventBridge 计划

  1. 依次前往 Amazon EventBridge > 调度器 > 创建调度
  2. 提供以下配置详细信息:
    • 周期性安排费率 (1 hour)。
    • 目标:您的 Lambda 函数 rippling_activity_to_s3
    • 名称rippling-activity-logs-1h
  3. 点击创建时间表

(可选)为 Google SecOps 创建只读 IAM 用户和密钥

  1. AWS 控制台中,依次前往 IAM > 用户 > 添加用户
  2. 点击 Add users(添加用户)。
  3. 提供以下配置详细信息:
    • 用户:输入 secops-reader
    • 访问类型:选择访问密钥 - 以程序化方式访问
  4. 点击创建用户
  5. 附加最低限度的读取政策(自定义):依次选择用户 > secops-reader > 权限 > 添加权限 > 直接附加政策 > 创建政策
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::rippling-activity-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::rippling-activity-logs"
        }
      ]
    }
    
  7. 名称 = secops-reader-policy

  8. 依次点击创建政策 > 搜索/选择 > 下一步 > 添加权限

  9. secops-reader 创建访问密钥:安全凭据 > 访问密钥

  10. 点击创建访问密钥

  11. 下载 .CSV。(您需要将这些值粘贴到 Feed 中)。

在 Google SecOps 中配置 Feed 以注入 Rippling 活动日志

  1. 依次前往 SIEM 设置> Feed
  2. 点击 + 添加新 Feed
  3. Feed 名称字段中,输入 Feed 的名称(例如 Rippling Activity Logs)。
  4. 选择 Amazon S3 V2 作为来源类型
  5. 选择 Rippling 活动日志作为日志类型
  6. 点击下一步
  7. 为以下输入参数指定值:
    • S3 URIs3://rippling-activity-logs/rippling/activity/
    • 来源删除选项:根据您的偏好选择删除选项。
    • 文件存在时间上限:包含在过去指定天数内修改的文件。默认值为 180 天。
    • 访问密钥 ID:有权访问 S3 存储桶的用户访问密钥。
    • 私有访问密钥:具有 S3 存储桶访问权限的用户私有密钥。
    • 资源命名空间rippling.activity
    • 可选:提取标签:添加提取标签。
  8. 点击下一步
  9. 最终确定界面中查看新的 Feed 配置,然后点击提交

需要更多帮助?从社区成员和 Google SecOps 专业人士那里获得解答。