Snyk Group Issues ログを収集する
このドキュメントでは、Google Cloud Storage を使用して Snyk Group Issues ログを Google Security Operations に取り込む方法について説明します。Snyk は、組織がオープンソースの依存関係、コンテナ イメージ、Infrastructure as Code 構成、アプリケーション コードの脆弱性を検出して修正するのに役立つデベロッパー セキュリティ プラットフォームです。Snyk Group Issues では、Snyk Group 内のすべてのプロジェクトにわたるセキュリティの脆弱性とライセンスの問題を可視化できます。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run 関数、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- Snyk グループへの特権アクセス(読み取りアクセス権のある API トークン、グループ ID)
- API トークンを持つユーザーに割り当てられた Snyk グループ管理者ロール(ユーザーはグループ監査ログとグループの問題を表示できる必要があります)
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( snyk-group-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Snyk グループ ID と API トークンを収集する
Snyk API トークンを取得する
- https://app.snyk.io で Snyk UI にログインします。
- [アカウント設定] > [API トークン] に移動します。
- [生成] をクリックして API トークンを生成します。
トークンをコピーして安全な場所に保存し、後で
SNYK_TOKENとして使用します。
Snyk グループ ID を取得する
- Snyk UI で、グループに切り替えます。
- [グループ設定] に移動します。
- URL(
https://app.snyk.io/group/<GROUP_ID>/...)からグループ ID をコピーして保存し、後でGROUP_IDとして使用します。
グループ管理者ロールを割り当てる
- Snyk UI で、[Group settings > Members] に移動します。
- API トークンに関連付けられているユーザーを見つけます。
ユーザーにグループ管理者ロールを割り当てます。
メモ API エンドポイント
REST API のベース エンドポイントはリージョンによって異なります。Snyk リージョンを特定し、対応する REST ベース URL をメモします。
地域 REST ベース URL SNYK-US-01 https://api.snyk.io/restSNYK-US-02 https://api.us.snyk.io/restSNYK-EU-01 https://api.eu.snyk.io/restSNYK-AU-01 https://api.au.snyk.io/restこの REST ベース URL は、Cloud Run functions の構成で
API_BASEとして使用します。関数コードは、このベース URL に/groups/{group_id}/audit_logs/searchなどのパスを追加して、完全なエンドポイント URL を構築します。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込む権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
snyk-logs-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect Snyk Group logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次の操作を行います。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
snyk-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
snyk-logs-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Snyk Group API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 snyk-group-logs-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック
snyk-logs-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
snyk-logs-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 GCS_BUCKETsnyk-group-logsGCS_PREFIXsnyk/group/STATE_KEYsnyk/group/state.jsonSNYK_TOKENyour-snyk-api-tokenGROUP_IDyour-group-uuidAPI_BASEhttps://api.snyk.io/restSNYK_AUDIT_API_VERSION2024-10-15SNYK_ISSUES_API_VERSION2024-10-15AUDIT_PAGE_SIZE100ISSUES_PAGE_LIMIT100MAX_PAGES20LOOKBACK_SECONDS3600[変数とシークレット] タブで [リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[実行環境] まで下にスクロールします。
- [デフォルト](推奨)を選択します。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import time import urllib.parse from urllib.request import Request, urlopen from urllib.parse import urlparse, parse_qs from urllib.error import HTTPError # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch logs from Snyk Group API and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'snyk/group/').strip() state_key = os.environ.get('STATE_KEY', 'snyk/group/state.json').strip() # Snyk API credentials api_base = os.environ.get('API_BASE', 'https://api.snyk.io/rest').rstrip('/') snyk_token = os.environ.get('SNYK_TOKEN').strip() group_id = os.environ.get('GROUP_ID').strip() # Page sizes & limits audit_size = int(os.environ.get('AUDIT_PAGE_SIZE', '100')) issues_limit = int(os.environ.get('ISSUES_PAGE_LIMIT', '100')) max_pages = int(os.environ.get('MAX_PAGES', '20')) # API versions audit_api_version = os.environ.get('SNYK_AUDIT_API_VERSION', '2024-10-15').strip() issues_api_version = os.environ.get('SNYK_ISSUES_API_VERSION', '2024-10-15').strip() # First-run lookback lookback_seconds = int(os.environ.get('LOOKBACK_SECONDS', '3600')) if not all([bucket_name, snyk_token, group_id]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state state = load_state(bucket, state_key) print('Starting Snyk Group logs collection') # Pull audit logs audit_res = pull_audit_logs( bucket, prefix, state, api_base, snyk_token, group_id, audit_api_version, audit_size, max_pages, lookback_seconds ) print(f"Audit logs: {audit_res}") # Pull issues issues_res = pull_issues( bucket, prefix, state, api_base, snyk_token, group_id, issues_api_version, issues_limit, max_pages ) print(f"Issues: {issues_res}") # Save state save_state(bucket, state_key, state) print('Successfully completed Snyk Group logs collection') except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, separators=(',', ':')), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}') def _iso(ts): """Convert timestamp to ISO format.""" return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(ts)) def _http_get(url, headers): """Make HTTP GET request with retry logic.""" req = Request(url, method='GET', headers=headers) try: with urlopen(req, timeout=60) as r: return json.loads(r.read().decode('utf-8')) except HTTPError as e: if e.code in (429, 500, 502, 503, 504): delay = int(e.headers.get('Retry-After', '1')) time.sleep(max(1, delay)) with urlopen(req, timeout=60) as r2: return json.loads(r2.read().decode('utf-8')) raise def _write_page(bucket, prefix, kind, payload): """Write page to GCS.""" ts = time.gmtime() key = f"{prefix.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json" blob = bucket.blob(key) blob.upload_from_string( json.dumps(payload, separators=(',', ':')), content_type='application/json' ) return key def _next_href(links): """Extract next href from links.""" if not links: return None nxt = links.get('next') if not nxt: return None if isinstance(nxt, str): return nxt if isinstance(nxt, dict): return nxt.get('href') return None def pull_audit_logs(bucket, prefix, state, api_base, snyk_token, group_id, audit_api_version, audit_size, max_pages, lookback_seconds): """Pull audit logs from Snyk Group API.""" headers = { 'Authorization': f'token {snyk_token}', 'Accept': 'application/vnd.api+json', } cursor = state.get('audit_cursor') pages = 0 total = 0 base = f"{api_base}/groups/{group_id}/audit_logs/search" params = { 'version': audit_api_version, 'size': audit_size } if cursor: params['cursor'] = cursor else: now = time.time() params['from'] = _iso(now - lookback_seconds) params['to'] = _iso(now) while pages < max_pages: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url, headers) _write_page(bucket, prefix, 'audit', payload) data_items = (payload.get('data') or {}).get('items') or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get('links')) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get('cursor') or [None])[0] if not cur: break params = { 'version': audit_api_version, 'size': audit_size, 'cursor': cur } state['audit_cursor'] = cur pages += 1 return { 'pages': pages + 1 if total else pages, 'items': total, 'cursor': state.get('audit_cursor') } def pull_issues(bucket, prefix, state, api_base, snyk_token, group_id, issues_api_version, issues_limit, max_pages): """Pull issues from Snyk Group API.""" headers = { 'Authorization': f'token {snyk_token}', 'Accept': 'application/vnd.api+json', } cursor = state.get('issues_cursor') pages = 0 total = 0 base = f"{api_base}/groups/{group_id}/issues" params = { 'version': issues_api_version, 'limit': issues_limit } if cursor: params['starting_after'] = cursor while pages < max_pages: url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}" payload = _http_get(url, headers) _write_page(bucket, prefix, 'issues', payload) data_items = payload.get('data') or [] if isinstance(data_items, list): total += len(data_items) nxt = _next_href(payload.get('links')) if not nxt: break q = parse_qs(urlparse(nxt).query) cur = (q.get('starting_after') or [None])[0] if not cur: break params = { 'version': issues_api_version, 'limit': issues_limit, 'starting_after': cur } state['issues_cursor'] = cur pages += 1 return { 'pages': pages + 1 if total else pages, 'items': total, 'cursor': state.get('issues_cursor') } ```
2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.*
[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 snyk-group-logs-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック トピック snyk-logs-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *検索量が普通 1 時間ごと 0 * * * *標準(推奨) 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
スケジューラ ジョブをテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして手動でトリガーします。
- 数秒待ってから、[Cloud Run> サービス> snyk-group-logs-collector > ログ] に移動します。
- 関数が正常に実行されたことを確認します。
- GCS バケットをチェックして、ログが書き込まれたことを確認します。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Snyk Group Audit/Issues)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Snyk Group level audit/issues logs] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
Snyk Group のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Snyk Group Audit/Issues)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Snyk Group level audit/issues logs] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://snyk-group-logs/snyk/group/次のように置き換えます。
snyk-group-logs: GCS バケット名。snyk/group/: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
例:
- ルートバケット:
gs://company-logs/ - 接頭辞あり:
gs://company-logs/snyk-logs/ - サブフォルダあり:
gs://company-logs/snyk/group/
- ルートバケット:
- Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
- 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アセットの名前空間: アセットの名前空間(
snyk.groupなど)。 - Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。