收集 Sentry 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Sentry 記錄檔擷取至 Google Security Operations。Sentry 會以事件、問題、效能監控資料和錯誤追蹤資訊的形式,產生作業資料。整合後,您就能將這些記錄傳送至 Google SecOps 進行分析和監控,進一步瞭解 Sentry 監控應用程式中的應用程式錯誤、效能問題和使用者互動。

事前準備

請確認您已完成下列事前準備事項:

  • Google SecOps 執行個體。
  • Sentry 租戶的特殊存取權 (具有 API 範圍的驗證權杖)。
  • AWS 的特殊權限 (S3、Identity and Access Management (IAM)、Lambda、EventBridge)。

收集 Sentry 必要條件 (ID、API 金鑰、機構 ID、權杖)

  1. 登入 Sentry。
  2. 找出機構代碼
    • 依序前往「設定」>「機構」>「設定」>「機構 ID」 (機構名稱旁邊會顯示 Slug)。
  3. 建立「驗證權杖」
    • 依序前往「設定」>「開發人員設定」>「個人權杖」
    • 按一下「建立新項目」
    • 範圍 (最低)org:readproject:readevent:read
    • 複製權杖值 (只會顯示一次)。這項資訊會做為:Authorization: Bearer <token>
  4. (如果是自行代管):請記下基準網址 (例如 https://<your-domain>);否則請使用 https://sentry.io

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 sentry-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 S3 上傳的身分與存取權管理政策和角色

  1. AWS 控制台中,依序前往「IAM」>「Policies」(政策)
  2. 按一下「建立政策」>「JSON」分頁
  3. 複製並貼上下列政策。
  4. 政策 JSON (如果您輸入的 bucket 名稱不同,請替換 sentry-logs):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::sentry-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::sentry-logs/sentry/events/state.json"
        }
      ]
    }
    
  5. 依序點選「下一步」>「建立政策」

  6. 依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。

  7. 附加新建立的政策。

  8. 為角色命名 WriteSentryToS3Role,然後按一下「建立角色」

建立 Lambda 函式

  1. AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」
  2. 按一下「從頭開始撰寫」
  3. 請提供下列設定詳細資料:

    設定
    名稱 sentry_to_s3
    執行階段 Python 3.13
    架構 x86_64
    執行角色 WriteSentryToS3Role
  4. 建立函式後,開啟「程式碼」分頁,刪除存根並貼上以下程式碼 (sentry_to_s3.py)。

    #!/usr/bin/env python3
    # Lambda: Pull Sentry project events (raw JSON) to S3 using Link "previous" cursor for duplicate-safe polling
    
    import os, json, time
    from urllib.request import Request, urlopen
    from urllib.parse import urlencode, urlparse, parse_qs
    import boto3
    
    ORG = os.environ["SENTRY_ORG"].strip()
    TOKEN = os.environ["SENTRY_AUTH_TOKEN"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "sentry/events/")
    STATE_KEY = os.environ.get("STATE_KEY", "sentry/events/state.json")
    BASE = os.environ.get("SENTRY_API_BASE", "https://sentry.io").rstrip("/")
    MAX_PROJECTS = int(os.environ.get("MAX_PROJECTS", "100"))
    MAX_PAGES_PER_PROJECT = int(os.environ.get("MAX_PAGES_PER_PROJECT", "5"))
    
    s3 = boto3.client("s3")
    HDRS = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json", "User-Agent": "chronicle-s3-sentry-lambda/1.0"}
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            raw = obj["Body"].read()
            return json.loads(raw) if raw else {"projects": {}}
        except Exception:
            return {"projects": {}}
    
    def _put_state(state: dict):
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"))
    
    def _req(path: str, params: dict | None = None):
        url = f"{BASE}{path}"
        if params:
            url = f"{url}?{urlencode(params)}"
        req = Request(url, method="GET", headers=HDRS)
        with urlopen(req, timeout=60) as r:
            data = json.loads(r.read().decode("utf-8"))
            link = r.headers.get("Link")
            return data, link
    
    def _parse_link(link_header: str | None):
        """Return (prev_cursor, prev_has_more, next_cursor, next_has_more)."""
        if not link_header:
            return None, False, None, False
        prev_cursor, next_cursor = None, None
        prev_more, next_more = False, False
        parts = [p.strip() for p in link_header.split(",")]
        for p in parts:
            if "<" not in p or ">" not in p:
                continue
            url = p.split("<", 1)[1].split(">", 1)[0]
            rel = "previous" if 'rel="previous"' in p else ("next" if 'rel="next"' in p else None)
            has_more = 'results="true"' in p
            try:
                q = urlparse(url).query
                cur = parse_qs(q).get("cursor", [None])[0]
            except Exception:
                cur = None
            if rel == "previous":
                prev_cursor, prev_more = cur, has_more
            elif rel == "next":
                next_cursor, next_more = cur, has_more
        return prev_cursor, prev_more, next_cursor, next_more
    
    def _write_page(project_slug: str, payload: object, page_idx: int) -> str:
        ts = time.gmtime()
        key = f"{S3_PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d', ts)}/sentry-{project_slug}-{page_idx:05d}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"))
        return key
    
    def list_projects(max_projects: int):
        projects, cursor = [], None
        while len(projects) < max_projects:
            params = {"cursor": cursor} if cursor else {}
            data, link = _req(f"/api/0/organizations/{ORG}/projects/", params)
            for p in data:
                slug = p.get("slug")
                if slug:
                    projects.append(slug)
                    if len(projects) >= max_projects:
                        break
            # advance pagination
            _, _, next_cursor, next_more = _parse_link(link)
            cursor = next_cursor if next_more else None
            if not next_more:
                break
        return projects
    
    def fetch_project_events(project_slug: str, start_prev_cursor: str | None):
        # If we have a stored "previous" cursor, poll forward (newer) until no more results.
        # If not (first run), fetch the latest page, then optionally follow "next" (older) for initial backfill up to the limit.
        pages = 0
        total = 0
        latest_prev_cursor_to_store = None
    
        def _one(cursor: str | None):
            nonlocal pages, total, latest_prev_cursor_to_store
            params = {"cursor": cursor} if cursor else {}
            data, link = _req(f"/api/0/projects/{ORG}/{project_slug}/events/", params)
            _write_page(project_slug, data, pages)
            total += len(data) if isinstance(data, list) else 0
            prev_c, prev_more, next_c, next_more = _parse_link(link)
            # capture the most recent "previous" cursor observed to store for the next run
            latest_prev_cursor_to_store = prev_c or latest_prev_cursor_to_store
            pages += 1
            return prev_c, prev_more, next_c, next_more
    
        if start_prev_cursor:
            # Poll new pages toward "previous" until no more
            cur = start_prev_cursor
            while pages < MAX_PAGES_PER_PROJECT:
                prev_c, prev_more, _, _ = _one(cur)
                if not prev_more:
                    break
                cur = prev_c
        else:
            # First run: start at newest, then (optionally) backfill a few older pages
            prev_c, _, next_c, next_more = _one(None)
            cur = next_c
            while next_more and pages < MAX_PAGES_PER_PROJECT:
                _, _, next_c, next_more = _one(cur)
                cur = next_c
    
        return {"project": project_slug, "pages": pages, "written": total, "store_prev_cursor": latest_prev_cursor_to_store}
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        state.setdefault("projects", {})
    
        projects = list_projects(MAX_PROJECTS)
        summary = []
        for slug in projects:
            start_prev = state["projects"].get(slug, {}).get("prev_cursor")
            res = fetch_project_events(slug, start_prev)
            if res.get("store_prev_cursor"):
                state["projects"][slug] = {"prev_cursor": res["store_prev_cursor"]}
            summary.append(res)
    
        _put_state(state)
        return {"ok": True, "projects": len(projects), "summary": summary}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. 依序前往「設定」>「環境變數」

  6. 依序點選「編輯」> 新增環境變數

  7. 輸入下表提供的環境變數,並將範例值換成您的值。

    環境變數

    範例值 說明
    S3_BUCKET sentry-logs 要儲存資料的 S3 bucket 名稱。
    S3_PREFIX sentry/events/ 物件的選用 S3 前置字串 (子資料夾)。
    STATE_KEY sentry/events/state.json 選用狀態/檢查點檔案金鑰。
    SENTRY_ORG your-org-slug Sentry 機構代碼。
    SENTRY_AUTH_TOKEN sntrys_************************ Sentry 驗證權杖,且具有 org:read、project:read、event:read。
    SENTRY_API_BASE https://sentry.io Sentry API 基本網址 (自架式:https://<your-domain>)。
    MAX_PROJECTS 100 要處理的專案數量上限。
    MAX_PAGES_PER_PROJECT 5 每次執行專案的頁數上限。
  8. 建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。

  9. 選取「設定」分頁標籤。

  10. 在「一般設定」面板中,按一下「編輯」

  11. 將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)

建立 EventBridge 排程

  1. 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」
  2. 提供下列設定詳細資料:
    • 週期性時間表費率 (1 hour)。
    • 目標:您的 Lambda 函式 sentry_to_s3
    • 名稱sentry-1h
  3. 按一下「建立時間表」

(選用) 為 Google SecOps 建立唯讀 IAM 使用者和金鑰

  1. AWS 控制台中,依序前往「IAM」>「Users」
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者:輸入 secops-reader
    • 存取類型:選取「存取金鑰 - 程式輔助存取」
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::sentry-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::sentry-logs"
        }
      ]
    }
    
  7. Name = secops-reader-policy

  8. 依序點選「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. secops-reader 建立存取金鑰:依序點選「安全憑證」>「存取金鑰」

  10. 按一下「建立存取金鑰」

  11. 下載 .CSV。(您會將這些值貼到動態饋給中)。

在 Google SecOps 中設定動態饋給,擷取 Sentry 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Sentry Logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Sentry」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://sentry-logs/sentry/events/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。