Sentry ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Sentry ログを Google Security Operations に取り込む方法について説明します。Sentry は、イベント、問題、パフォーマンス モニタリング データ、エラー トラッキング情報という形式で運用データを生成します。この統合により、これらのログを Google SecOps に送信して分析とモニタリングを行うことができます。これにより、Sentry でモニタリングされているアプリケーション内のアプリケーション エラー、パフォーマンスの問題、ユーザー操作を可視化できます。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス。
  • Sentry テナントへの特権アクセス(API スコープを含む認証トークン)。
  • AWS(S3、Identity and Access Management(IAM)、Lambda、EventBridge)への特権アクセス。

Sentry の前提条件(ID、API キー、組織 ID、トークン)を収集する

  1. Sentry にログインします。
  2. 組織のスラッグを確認します。
    • [設定] > [組織] > [設定] > [組織 ID](組織名の横にスラッグが表示されます)に移動します。
  3. 認証トークンを作成します。
    • [Settings] > [Developer Settings] > [Personal Tokens] に移動します。
    • [Create New] をクリックします。
    • スコープ(最小): org:readproject:readevent:read
    • トークンの値(1 回のみ表示)をコピーします。これは Authorization: Bearer <token> として使用されます。
  4. (セルフホストの場合)ベース URLhttps://<your-domain> など)をメモします。それ以外の場合は https://sentry.io を使用します。

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: sentry-logs)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションの [権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索します。
  18. ポリシーを選択します。
  19. [次へ] をクリックします。
  20. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM] > [ポリシー] に移動します。
  2. [ポリシーを作成> [JSON] タブ] をクリックします。
  3. 次のポリシーをコピーして貼り付けます。
  4. ポリシー JSON(別のバケット名を入力した場合は sentry-logs を置き換えます):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::sentry-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::sentry-logs/sentry/events/state.json"
        }
      ]
    }
    
  5. [次へ] > [ポリシーを作成] をクリックします。

  6. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  7. 新しく作成したポリシーを関連付けます。

  8. ロールに「WriteSentryToS3Role」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 sentry_to_s3
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール WriteSentryToS3Role
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(sentry_to_s3.py)を貼り付けます。

    #!/usr/bin/env python3
    # Lambda: Pull Sentry project events (raw JSON) to S3 using Link "previous" cursor for duplicate-safe polling
    
    import os, json, time
    from urllib.request import Request, urlopen
    from urllib.parse import urlencode, urlparse, parse_qs
    import boto3
    
    ORG = os.environ["SENTRY_ORG"].strip()
    TOKEN = os.environ["SENTRY_AUTH_TOKEN"].strip()
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "sentry/events/")
    STATE_KEY = os.environ.get("STATE_KEY", "sentry/events/state.json")
    BASE = os.environ.get("SENTRY_API_BASE", "https://sentry.io").rstrip("/")
    MAX_PROJECTS = int(os.environ.get("MAX_PROJECTS", "100"))
    MAX_PAGES_PER_PROJECT = int(os.environ.get("MAX_PAGES_PER_PROJECT", "5"))
    
    s3 = boto3.client("s3")
    HDRS = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json", "User-Agent": "chronicle-s3-sentry-lambda/1.0"}
    
    def _get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            raw = obj["Body"].read()
            return json.loads(raw) if raw else {"projects": {}}
        except Exception:
            return {"projects": {}}
    
    def _put_state(state: dict):
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state, separators=(",", ":")).encode("utf-8"))
    
    def _req(path: str, params: dict | None = None):
        url = f"{BASE}{path}"
        if params:
            url = f"{url}?{urlencode(params)}"
        req = Request(url, method="GET", headers=HDRS)
        with urlopen(req, timeout=60) as r:
            data = json.loads(r.read().decode("utf-8"))
            link = r.headers.get("Link")
            return data, link
    
    def _parse_link(link_header: str | None):
        """Return (prev_cursor, prev_has_more, next_cursor, next_has_more)."""
        if not link_header:
            return None, False, None, False
        prev_cursor, next_cursor = None, None
        prev_more, next_more = False, False
        parts = [p.strip() for p in link_header.split(",")]
        for p in parts:
            if "<" not in p or ">" not in p:
                continue
            url = p.split("<", 1)[1].split(">", 1)[0]
            rel = "previous" if 'rel="previous"' in p else ("next" if 'rel="next"' in p else None)
            has_more = 'results="true"' in p
            try:
                q = urlparse(url).query
                cur = parse_qs(q).get("cursor", [None])[0]
            except Exception:
                cur = None
            if rel == "previous":
                prev_cursor, prev_more = cur, has_more
            elif rel == "next":
                next_cursor, next_more = cur, has_more
        return prev_cursor, prev_more, next_cursor, next_more
    
    def _write_page(project_slug: str, payload: object, page_idx: int) -> str:
        ts = time.gmtime()
        key = f"{S3_PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d', ts)}/sentry-{project_slug}-{page_idx:05d}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"))
        return key
    
    def list_projects(max_projects: int):
        projects, cursor = [], None
        while len(projects) < max_projects:
            params = {"cursor": cursor} if cursor else {}
            data, link = _req(f"/api/0/organizations/{ORG}/projects/", params)
            for p in data:
                slug = p.get("slug")
                if slug:
                    projects.append(slug)
                    if len(projects) >= max_projects:
                        break
            # advance pagination
            _, _, next_cursor, next_more = _parse_link(link)
            cursor = next_cursor if next_more else None
            if not next_more:
                break
        return projects
    
    def fetch_project_events(project_slug: str, start_prev_cursor: str | None):
        # If we have a stored "previous" cursor, poll forward (newer) until no more results.
        # If not (first run), fetch the latest page, then optionally follow "next" (older) for initial backfill up to the limit.
        pages = 0
        total = 0
        latest_prev_cursor_to_store = None
    
        def _one(cursor: str | None):
            nonlocal pages, total, latest_prev_cursor_to_store
            params = {"cursor": cursor} if cursor else {}
            data, link = _req(f"/api/0/projects/{ORG}/{project_slug}/events/", params)
            _write_page(project_slug, data, pages)
            total += len(data) if isinstance(data, list) else 0
            prev_c, prev_more, next_c, next_more = _parse_link(link)
            # capture the most recent "previous" cursor observed to store for the next run
            latest_prev_cursor_to_store = prev_c or latest_prev_cursor_to_store
            pages += 1
            return prev_c, prev_more, next_c, next_more
    
        if start_prev_cursor:
            # Poll new pages toward "previous" until no more
            cur = start_prev_cursor
            while pages < MAX_PAGES_PER_PROJECT:
                prev_c, prev_more, _, _ = _one(cur)
                if not prev_more:
                    break
                cur = prev_c
        else:
            # First run: start at newest, then (optionally) backfill a few older pages
            prev_c, _, next_c, next_more = _one(None)
            cur = next_c
            while next_more and pages < MAX_PAGES_PER_PROJECT:
                _, _, next_c, next_more = _one(cur)
                cur = next_c
    
        return {"project": project_slug, "pages": pages, "written": total, "store_prev_cursor": latest_prev_cursor_to_store}
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        state.setdefault("projects", {})
    
        projects = list_projects(MAX_PROJECTS)
        summary = []
        for slug in projects:
            start_prev = state["projects"].get(slug, {}).get("prev_cursor")
            res = fetch_project_events(slug, start_prev)
            if res.get("store_prev_cursor"):
                state["projects"][slug] = {"prev_cursor": res["store_prev_cursor"]}
            summary.append(res)
    
        _put_state(state)
        return {"ok": True, "projects": len(projects), "summary": summary}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. [構成] > [環境変数] に移動します。

  6. [編集>新しい環境変数を追加] をクリックします。

  7. 次の表に示す環境変数を入力し、サンプル値を自分の値に置き換えます。

    環境変数

    キー 値の例 説明
    S3_BUCKET sentry-logs データが保存される S3 バケットの名前。
    S3_PREFIX sentry/events/ オブジェクトの省略可能な S3 接頭辞(サブフォルダ)。
    STATE_KEY sentry/events/state.json 状態/チェックポイント ファイルのキー(省略可)。
    SENTRY_ORG your-org-slug Sentry 組織のスラッグ。
    SENTRY_AUTH_TOKEN sntrys_************************ org:read、project:read、event:read を含む Sentry 認証トークン。
    SENTRY_API_BASE https://sentry.io Sentry API のベース URL(セルフホスト: https://<your-domain>)。
    MAX_PROJECTS 100 処理するプロジェクトの最大数。
    MAX_PAGES_PER_PROJECT 5 1 回の実行あたりのプロジェクトあたりの最大ページ数。
  8. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  9. [CONFIGURATION] タブを選択します。

  10. [全般設定] パネルで、[編集] をクリックします。

  11. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数 sentry_to_s3
    • 名前: sentry-1h
  3. [スケジュールを作成] をクリックします。

(省略可)Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソールで、[IAM] > [ユーザー] に移動します。
  2. [ユーザーを追加] をクリックします。
  3. 次の構成の詳細を入力します。
    • ユーザー: 「secops-reader」と入力します。
    • アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
  4. [ユーザーを作成] をクリックします。
  5. 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::sentry-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::sentry-logs"
        }
      ]
    }
    
  7. 名前 = secops-reader-policy

  8. [ポリシーを作成> 検索/選択> 次へ> 権限を追加] をクリックします。

  9. secops-reader のアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] に移動します。

  10. [アクセスキーを作成] をクリックします。

  11. .CSV をダウンロードします。(これらの値はフィードに貼り付けます)。

Sentry のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Sentry Logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Sentry] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://sentry-logs/sentry/events/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。