Snyk Group Issues ログを収集する

以下でサポートされています。

このドキュメントでは、Google Cloud Storage を使用して Snyk Group Issues ログを Google Security Operations に取り込む方法について説明します。Snyk は、組織がオープンソースの依存関係、コンテナ イメージ、Infrastructure as Code 構成、アプリケーション コードの脆弱性を検出して修正するのに役立つデベロッパー セキュリティ プラットフォームです。Snyk Group Issues では、Snyk Group 内のすべてのプロジェクトにわたるセキュリティの脆弱性とライセンスの問題を可視化できます。

始める前に

次の前提条件を満たしていることを確認します。

  • Google SecOps インスタンス
  • Cloud Storage API が有効になっている GCP プロジェクト
  • GCS バケットを作成および管理する権限
  • GCS バケットの IAM ポリシーを管理する権限
  • Cloud Run 関数、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
  • Snyk グループへの特権アクセス(読み取りアクセス権のある API トークン、グループ ID)
  • API トークンを持つユーザーに割り当てられた Snyk グループ管理者ロール(ユーザーはグループ監査ログとグループの問題を表示できる必要があります)

Google Cloud Storage バケットを作成する

  1. Google Cloud Console に移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前(snyk-group-logs など)を入力します。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション ロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

Snyk グループ ID と API トークンを収集する

Snyk API トークンを取得する

  1. https://app.snyk.io で Snyk UI にログインします。
  2. [アカウント設定] > [API トークン] に移動します。
  3. [生成] をクリックして API トークンを生成します。
  4. トークンをコピーして安全な場所に保存し、後で SNYK_TOKEN として使用します。

Snyk グループ ID を取得する

  1. Snyk UI で、グループに切り替えます。
  2. [グループ設定] に移動します。
  3. URL(https://app.snyk.io/group/<GROUP_ID>/...)からグループ ID をコピーして保存し、後で GROUP_ID として使用します。

グループ管理者ロールを割り当てる

  1. Snyk UI で、[Group settings > Members] に移動します。
  2. API トークンに関連付けられているユーザーを見つけます。
  3. ユーザーにグループ管理者ロールを割り当てます。

メモ API エンドポイント

  • REST API のベース エンドポイントはリージョンによって異なります。Snyk リージョンを特定し、対応する REST ベース URL をメモします。

    地域 REST ベース URL
    SNYK-US-01 https://api.snyk.io/rest
    SNYK-US-02 https://api.us.snyk.io/rest
    SNYK-EU-01 https://api.eu.snyk.io/rest
    SNYK-AU-01 https://api.au.snyk.io/rest

    この REST ベース URL は、Cloud Run functions の構成で API_BASE として使用します。関数コードは、このベース URL に /groups/{group_id}/audit_logs/search などのパスを追加して、完全なエンドポイント URL を構築します。

Cloud Run functions のサービス アカウントを作成する

Cloud Run 関数には、GCS バケットに書き込む権限を持つサービス アカウントが必要です。

サービス アカウントの作成

  1. GCP Console で、[IAM と管理>サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • サービス アカウント名: 「snyk-logs-collector-sa」と入力します。
    • サービス アカウントの説明: 「Service account for Cloud Run function to collect Snyk Group logs」と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次の操作を行います。
    1. [ロールを選択] をクリックします。
    2. [ストレージ オブジェクト管理者] を検索して選択します。
    3. [+ 別のロールを追加] をクリックします。
    4. [Cloud Run 起動元] を検索して選択します。
    5. [+ 別のロールを追加] をクリックします。
    6. [Cloud Functions 起動元] を検索して選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。

これらのロールは、次の目的で必要です。

  • Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
  • Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
  • Cloud Functions 起動元: 関数の呼び出しを許可する

GCS バケットに対する IAM 権限を付与する

GCS バケットに対する書き込み権限をサービス アカウントに付与します。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: サービス アカウントのメールアドレス(例: snyk-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。
    • ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
  6. [保存] をクリックします。

Pub/Sub トピックの作成

Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。

  1. GCP Console で、[Pub/Sub> トピック] に移動します。
  2. [トピックを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • トピック ID: 「snyk-logs-trigger」と入力します。
    • その他の設定はデフォルトのままにします。
  4. [作成] をクリックします。

ログを収集する Cloud Run 関数を作成する

Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Snyk Group API からログを取得して GCS に書き込みます。

  1. GCP Console で、[Cloud Run] に移動します。
  2. [サービスを作成] をクリックします。
  3. [関数] を選択します(インライン エディタを使用して関数を作成します)。
  4. [構成] セクションで、次の構成の詳細を指定します。

    設定
    サービス名 snyk-group-logs-collector
    リージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。
    ランタイム [Python 3.12] 以降を選択します。
  5. [トリガー(省略可)] セクションで、次の操作を行います。

    1. [+ トリガーを追加] をクリックします。
    2. [Cloud Pub/Sub] を選択します。
    3. [Cloud Pub/Sub トピックを選択してください] で、トピック snyk-logs-trigger を選択します。
    4. [保存] をクリックします。
  6. [認証] セクションで、次の操作を行います。

    1. [認証が必要] を選択します。
    2. Identity and Access Management(IAM)を確認します。
  7. 下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。

  8. [セキュリティ] タブに移動します。

    • サービス アカウント: サービス アカウント snyk-logs-collector-sa を選択します。
  9. [コンテナ] タブに移動します。

    1. [変数とシークレット] をクリックします。
    2. 環境変数ごとに [+ 変数を追加] をクリックします。
    変数名 値の例
    GCS_BUCKET snyk-group-logs
    GCS_PREFIX snyk/group/
    STATE_KEY snyk/group/state.json
    SNYK_TOKEN your-snyk-api-token
    GROUP_ID your-group-uuid
    API_BASE https://api.snyk.io/rest
    SNYK_AUDIT_API_VERSION 2024-10-15
    SNYK_ISSUES_API_VERSION 2024-10-15
    AUDIT_PAGE_SIZE 100
    ISSUES_PAGE_LIMIT 100
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
  10. [変数とシークレット] タブで [リクエスト] まで下にスクロールします。

    • リクエストのタイムアウト: 600 秒(10 分)を入力します。
  11. [コンテナ] の [設定] タブに移動します。

    • [リソース] セクションで次の操作を行います。
      • メモリ: 512 MiB 以上を選択します。
      • CPU: [1] を選択します。
    • [完了] をクリックします。
  12. [実行環境] まで下にスクロールします。

    • [デフォルト](推奨)を選択します。
  13. [リビジョン スケーリング] セクションで、次の操作を行います。

    • [インスタンスの最小数] に「0」と入力します。
    • インスタンスの最大数: 100 と入力します(または、予想される負荷に基づいて調整します)。
  14. [作成] をクリックします。

  15. サービスが作成されるまで待ちます(1 ~ 2 分)。

  16. サービスを作成すると、インライン コードエディタが自動的に開きます。

関数コードを追加する

  1. [関数のエントリ ポイント] に「main」と入力します。
  2. インライン コードエディタで、次の 2 つのファイルを作成します。

    • 最初のファイル: main.py:

          import functions_framework
          from google.cloud import storage
          import json
          import os
          import time
          import urllib.parse
          from urllib.request import Request, urlopen
          from urllib.parse import urlparse, parse_qs
          from urllib.error import HTTPError
      
          # Initialize Storage client
          storage_client = storage.Client()
      
          @functions_framework.cloud_event
          def main(cloud_event):
              """
              Cloud Run function triggered by Pub/Sub to fetch logs from Snyk Group API and write to GCS.
      
              Args:
                  cloud_event: CloudEvent object containing Pub/Sub message
              """
      
              # Get environment variables
              bucket_name = os.environ.get('GCS_BUCKET')
              prefix = os.environ.get('GCS_PREFIX', 'snyk/group/').strip()
              state_key = os.environ.get('STATE_KEY', 'snyk/group/state.json').strip()
      
              # Snyk API credentials
              api_base = os.environ.get('API_BASE', 'https://api.snyk.io/rest').rstrip('/')
              snyk_token = os.environ.get('SNYK_TOKEN').strip()
              group_id = os.environ.get('GROUP_ID').strip()
      
              # Page sizes & limits
              audit_size = int(os.environ.get('AUDIT_PAGE_SIZE', '100'))
              issues_limit = int(os.environ.get('ISSUES_PAGE_LIMIT', '100'))
              max_pages = int(os.environ.get('MAX_PAGES', '20'))
      
              # API versions
              audit_api_version = os.environ.get('SNYK_AUDIT_API_VERSION', '2024-10-15').strip()
              issues_api_version = os.environ.get('SNYK_ISSUES_API_VERSION', '2024-10-15').strip()
      
              # First-run lookback
              lookback_seconds = int(os.environ.get('LOOKBACK_SECONDS', '3600'))
      
              if not all([bucket_name, snyk_token, group_id]):
                  print('Error: Missing required environment variables')
                  return
      
              try:
                  # Get GCS bucket
                  bucket = storage_client.bucket(bucket_name)
      
                  # Load state
                  state = load_state(bucket, state_key)
      
                  print('Starting Snyk Group logs collection')
      
                  # Pull audit logs
                  audit_res = pull_audit_logs(
                      bucket, prefix, state, api_base, snyk_token, group_id,
                      audit_api_version, audit_size, max_pages, lookback_seconds
                  )
                  print(f"Audit logs: {audit_res}")
      
                  # Pull issues
                  issues_res = pull_issues(
                      bucket, prefix, state, api_base, snyk_token, group_id,
                      issues_api_version, issues_limit, max_pages
                  )
                  print(f"Issues: {issues_res}")
      
                  # Save state
                  save_state(bucket, state_key, state)
      
                  print('Successfully completed Snyk Group logs collection')
      
              except Exception as e:
                  print(f'Error processing logs: {str(e)}')
                  raise
      
          def load_state(bucket, key):
              """Load state from GCS."""
              try:
                  blob = bucket.blob(key)
                  if blob.exists():
                      state_data = blob.download_as_text()
                      return json.loads(state_data)
              except Exception as e:
                  print(f'Warning: Could not load state: {str(e)}')
              return {}
      
          def save_state(bucket, key, state):
              """Save state to GCS."""
              try:
                  blob = bucket.blob(key)
                  blob.upload_from_string(
                      json.dumps(state, separators=(',', ':')),
                      content_type='application/json'
                  )
              except Exception as e:
                  print(f'Warning: Could not save state: {str(e)}')
      
          def _iso(ts):
              """Convert timestamp to ISO format."""
              return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(ts))
      
          def _http_get(url, headers):
              """Make HTTP GET request with retry logic."""
              req = Request(url, method='GET', headers=headers)
              try:
                  with urlopen(req, timeout=60) as r:
                      return json.loads(r.read().decode('utf-8'))
              except HTTPError as e:
                  if e.code in (429, 500, 502, 503, 504):
                      delay = int(e.headers.get('Retry-After', '1'))
                      time.sleep(max(1, delay))
                      with urlopen(req, timeout=60) as r2:
                          return json.loads(r2.read().decode('utf-8'))
                  raise
      
          def _write_page(bucket, prefix, kind, payload):
              """Write page to GCS."""
              ts = time.gmtime()
              key = f"{prefix.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"
              blob = bucket.blob(key)
              blob.upload_from_string(
                  json.dumps(payload, separators=(',', ':')),
                  content_type='application/json'
              )
              return key
      
          def _next_href(links):
              """Extract next href from links."""
              if not links:
                  return None
              nxt = links.get('next')
              if not nxt:
                  return None
              if isinstance(nxt, str):
                  return nxt
              if isinstance(nxt, dict):
                  return nxt.get('href')
              return None
      
          def pull_audit_logs(bucket, prefix, state, api_base, snyk_token, group_id,
                              audit_api_version, audit_size, max_pages, lookback_seconds):
              """Pull audit logs from Snyk Group API."""
              headers = {
                  'Authorization': f'token {snyk_token}',
                  'Accept': 'application/vnd.api+json',
              }
      
              cursor = state.get('audit_cursor')
              pages = 0
              total = 0
              base = f"{api_base}/groups/{group_id}/audit_logs/search"
              params = {
                  'version': audit_api_version,
                  'size': audit_size
              }
      
              if cursor:
                  params['cursor'] = cursor
              else:
                  now = time.time()
                  params['from'] = _iso(now - lookback_seconds)
                  params['to'] = _iso(now)
      
              while pages < max_pages:
                  url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
                  payload = _http_get(url, headers)
                  _write_page(bucket, prefix, 'audit', payload)
      
                  data_items = (payload.get('data') or {}).get('items') or []
                  if isinstance(data_items, list):
                      total += len(data_items)
      
                  nxt = _next_href(payload.get('links'))
                  if not nxt:
                      break
      
                  q = parse_qs(urlparse(nxt).query)
                  cur = (q.get('cursor') or [None])[0]
                  if not cur:
                      break
      
                  params = {
                      'version': audit_api_version,
                      'size': audit_size,
                      'cursor': cur
                  }
                  state['audit_cursor'] = cur
                  pages += 1
      
              return {
                  'pages': pages + 1 if total else pages,
                  'items': total,
                  'cursor': state.get('audit_cursor')
              }
      
          def pull_issues(bucket, prefix, state, api_base, snyk_token, group_id,
                          issues_api_version, issues_limit, max_pages):
              """Pull issues from Snyk Group API."""
              headers = {
                  'Authorization': f'token {snyk_token}',
                  'Accept': 'application/vnd.api+json',
              }
      
              cursor = state.get('issues_cursor')
              pages = 0
              total = 0
              base = f"{api_base}/groups/{group_id}/issues"
              params = {
                  'version': issues_api_version,
                  'limit': issues_limit
              }
      
              if cursor:
                  params['starting_after'] = cursor
      
              while pages < max_pages:
                  url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
                  payload = _http_get(url, headers)
                  _write_page(bucket, prefix, 'issues', payload)
      
                  data_items = payload.get('data') or []
                  if isinstance(data_items, list):
                      total += len(data_items)
      
                  nxt = _next_href(payload.get('links'))
                  if not nxt:
                      break
      
                  q = parse_qs(urlparse(nxt).query)
                  cur = (q.get('starting_after') or [None])[0]
                  if not cur:
                      break
      
                  params = {
                      'version': issues_api_version,
                      'limit': issues_limit,
                      'starting_after': cur
                  }
                  state['issues_cursor'] = cur
                  pages += 1
      
              return {
                  'pages': pages + 1 if total else pages,
                  'items': total,
                  'cursor': state.get('issues_cursor')
              }
          ```
      
    • 2 つ目のファイル: requirements.txt:

      functions-framework==3.*
      google-cloud-storage==2.*
      
  3. [デプロイ] をクリックして、関数を保存してデプロイします。

  4. デプロイが完了するまで待ちます(2 ~ 3 分)。

Cloud Scheduler ジョブの作成

Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。

  1. GCP Console で、[Cloud Scheduler] に移動します。
  2. [ジョブを作成] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 snyk-group-logs-hourly
    リージョン Cloud Run functions と同じリージョンを選択する
    周波数 0 * * * *(1 時間ごとに正時)
    タイムゾーン タイムゾーンを選択します(UTC を推奨)。
    ターゲット タイプ Pub/Sub
    トピック トピック snyk-logs-trigger を選択します。
    メッセージ本文 {}(空の JSON オブジェクト)
  4. [作成] をクリックします。

スケジュールの頻度のオプション

  • ログの量とレイテンシの要件に基づいて頻度を選択します。

    頻度 CRON 式 ユースケース
    5 分毎 */5 * * * * 大容量、低レイテンシ
    15 分ごと */15 * * * * 検索量が普通
    1 時間ごと 0 * * * * 標準(推奨)
    6 時間ごと 0 */6 * * * 少量、バッチ処理
    毎日 0 0 * * * 履歴データの収集

スケジューラ ジョブをテストする

  1. Cloud Scheduler コンソールで、ジョブを見つけます。
  2. [強制実行] をクリックして手動でトリガーします。
  3. 数秒待ってから、[Cloud Run> サービス> snyk-group-logs-collector > ログ] に移動します。
  4. 関数が正常に実行されたことを確認します。
  5. GCS バケットをチェックして、ログが書き込まれたことを確認します。

Google SecOps サービス アカウントを取得する

Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。

サービス アカウントのメールアドレスを取得する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Snyk Group Audit/Issues)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [Snyk Group level audit/issues logs] を選択します。
  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーして、次のステップで使用します。

Google SecOps サービス アカウントに IAM 権限を付与する

Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
    • ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

Snyk Group のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Snyk Group Audit/Issues)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [Snyk Group level audit/issues logs] を選択します。
  7. [次へ] をクリックします。
  8. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://snyk-group-logs/snyk/group/
      
      • 次のように置き換えます。

        • snyk-group-logs: GCS バケット名。
        • snyk/group/: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
      • 例:

        • ルートバケット: gs://company-logs/
        • 接頭辞あり: gs://company-logs/snyk-logs/
        • サブフォルダあり: gs://company-logs/snyk/group/
    • Source deletion option: 必要に応じて削除オプションを選択します。
      • なし: 転送後にファイルを削除しません(テストにおすすめ)。
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アセットの名前空間: アセットの名前空間snyk.group など)。
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  9. [次へ] をクリックします。

  10. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。