Swimlane Platform のログを収集する
このドキュメントでは、Google Cloud Storage を使用して Swimlane Platform ログを Google Security Operations に取り込む方法について説明します。Swimlane Platform は、アカウントとテナント全体でユーザー アクティビティ、構成の変更、システム イベントを追跡するための監査ロギング機能を提供する、セキュリティ オーケストレーション、自動化、レスポンス(SOAR)プラットフォームです。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- 監査ログにアクセスするためのアカウント管理者権限を持つ Swimlane Platform への特権アクセス
- Swimlane Platform インスタンスの URL とアカウント認証情報
Swimlane Platform の認証情報を収集する
Swimlane Platform インスタンスの URL を取得する
- Swimlane Platform インスタンスにログインします。
- ブラウザのアドレスバーからインスタンスの URL をメモします。
- 形式:
https://<region>.swimlane.app(例:https://us.swimlane.app、https://eu.swimlane.app) - 例:
https://us.swimlane.app/workspaceで Swimlane にアクセスする場合、ベース URL はhttps://us.swimlane.appです。
- 形式:
個人用アクセス トークンを作成する
- アカウント管理者として Swimlane Platform にログインします。
- [プロフィール オプション] に移動します。
- [プロフィール] をクリックして、プロフィール エディタを開きます。
- [個人用アクセス トークン] セクションに移動します。
- [Generate token] をクリックして、新しい個人用アクセス トークンを作成します。
- トークンをすぐにコピーして安全に保管します(トークンは再度表示されません)。
アカウント ID を取得する
アカウント ID がわからない場合は、Swimlane 管理者にお問い合わせください。監査ログ API パスにはアカウント ID が必要です。
統合に関する次の詳細を記録します。
- 個人用アクセス トークン(PAT): API 呼び出しの
Private-Tokenヘッダーで使用されます。 - アカウント ID: 監査ログ API パス
/api/public/audit/account/{ACCOUNT_ID}/auditlogsに必要です。 - ベース URL: Swimlane ドメイン(
https://eu.swimlane.app、https://us.swimlane.appなど)。
権限を確認する
監査ログにアクセスするために必要な権限がアカウントに付与されていることを確認するには:
- Swimlane Platform にログインします。
- アカウント管理者権限があることを確認します。
監査ログ機能にアクセスできない場合は、Swimlane 管理者にお問い合わせください。
テスト API アクセス
統合に進む前に、API 認証情報が正しく機能していることを確認します。
# Replace with your actual credentials SWIMLANE_BASE_URL="https://<region>.swimlane.app" SWIMLANE_ACCOUNT_ID="<your-account-id>" SWIMLANE_PAT_TOKEN="<your-personal-access-token>" # Test API access curl -v -X GET "${SWIMLANE_BASE_URL}/api/public/audit/account/${SWIMLANE_ACCOUNT_ID}/auditlogs?pageNumber=1&pageSize=10" \ -H "Private-Token: ${SWIMLANE_PAT_TOKEN}" \ -H "Accept: application/json"
想定されるレスポンス: 監査ログを含む JSON を含む HTTP 200。
エラーが表示された場合:
- HTTP 401: 個人用アクセス トークンが正しいことを確認する
- HTTP 403: アカウントにアカウント管理者の権限があることを確認する
- HTTP 404: アカウント ID とベース URL が正しいことを確認する
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( swimlane-auditなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
swimlane-audit-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect Swimlane Platform logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
swimlane-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
swimlane-audit-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Swimlane Platform API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 swimlane-audit-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(
swimlane-audit-trigger)を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウントを選択します(
swimlane-audit-collector-sa)。
- サービス アカウント: サービス アカウントを選択します(
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETswimlane-auditGCS バケット名 GCS_PREFIXswimlane/audit/ログファイルの接頭辞 STATE_KEYswimlane/audit/state.json状態ファイルのパス SWIMLANE_BASE_URLhttps://us.swimlane.appSwimlane Platform のベース URL SWIMLANE_PAT_TOKENyour-personal-access-tokenSwimlane の個人用アクセス トークン SWIMLANE_ACCOUNT_IDyour-account-idスイムレーン アカウント ID SWIMLANE_TENANT_LIST`` テナント ID のカンマ区切りのリスト(省略可。すべてのテナントが対象の場合は空白のままにする) INCLUDE_ACCOUNTtrueアカウント レベルのログを含める(true/false) PAGE_SIZE100ページあたりのレコード数(最大 100) LOOKBACK_HOURS24最初のルックバック期間 TIMEOUT30API リクエストのタイムアウト(秒) [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import uuid import gzip import io # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'swimlane/audit/') STATE_KEY = os.environ.get('STATE_KEY', 'swimlane/audit/state.json') SWIMLANE_BASE_URL = os.environ.get('SWIMLANE_BASE_URL', '').rstrip('/') SWIMLANE_PAT_TOKEN = os.environ.get('SWIMLANE_PAT_TOKEN') SWIMLANE_ACCOUNT_ID = os.environ.get('SWIMLANE_ACCOUNT_ID') SWIMLANE_TENANT_LIST = os.environ.get('SWIMLANE_TENANT_LIST', '') INCLUDE_ACCOUNT = os.environ.get('INCLUDE_ACCOUNT', 'true').lower() == 'true' PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) TIMEOUT = int(os.environ.get('TIMEOUT', '30')) def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Swimlane Platform logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, SWIMLANE_BASE_URL, SWIMLANE_PAT_TOKEN, SWIMLANE_ACCOUNT_ID]): print('Error: Missing required environment variables (GCS_BUCKET, SWIMLANE_BASE_URL, SWIMLANE_PAT_TOKEN, SWIMLANE_ACCOUNT_ID)') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Fetch logs records, newest_event_time = fetch_logs( base_url=SWIMLANE_BASE_URL, pat_token=SWIMLANE_PAT_TOKEN, account_id=SWIMLANE_ACCOUNT_ID, tenant_list=SWIMLANE_TENANT_LIST, include_account=INCLUDE_ACCOUNT, start_time=last_time, end_time=now, page_size=PAGE_SIZE, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as gzipped NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}{now:%Y/%m/%d}/swimlane-audit-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode='w') as gz: for record in records: gz.write((json.dumps(record, ensure_ascii=False) + '\n').encode()) buf.seek(0) blob = bucket.blob(object_key) blob.upload_from_file(buf, content_type='application/gzip') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = { 'last_event_time': last_event_time_iso, 'updated_at': datetime.now(timezone.utc).isoformat() + 'Z' } blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(base_url: str, pat_token: str, account_id: str, tenant_list: str, include_account: bool, start_time: datetime, end_time: datetime, page_size: int): """ Fetch logs from Swimlane Platform API with pagination and rate limiting. Args: base_url: Swimlane Platform base URL pat_token: Personal Access Token account_id: Swimlane account identifier tenant_list: Comma-separated tenant IDs (optional) include_account: Include account-level logs start_time: Start time for log query end_time: End time for log query page_size: Number of records per page (max 100) Returns: Tuple of (records list, newest_event_time ISO string) """ endpoint = f"{base_url}/api/public/audit/account/{account_id}/auditlogs" headers = { 'Private-Token': pat_token, 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-SwimlaneCollector/1.0' } records = [] newest_time = None page_num = 1 backoff = 1.0 while True: params = [] params.append(f"pageNumber={page_num}") params.append(f"pageSize={min(page_size, 100)}") params.append(f"fromdate={start_time.isoformat()}") params.append(f"todate={end_time.isoformat()}") if tenant_list: params.append(f"tenantList={tenant_list}") params.append(f"includeAccount={'true' if include_account else 'false'}") url = f"{endpoint}?{'&'.join(params)}" try: response = http.request('GET', url, headers=headers, timeout=TIMEOUT) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status == 401: print(f"Authentication failed (401). Verify SWIMLANE_PAT_TOKEN is correct.") return [], None if response.status == 403: print(f"Access forbidden (403). Verify account has Account Admin permissions to access audit logs.") return [], None if response.status == 400: print(f"Bad request (400). Verify account_id and query parameters are correct.") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) page_results = data.get('auditlogs', []) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: event_time = event.get('eventTime') or event.get('EventTime') if event_time: if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results has_next = data.get('next') total_count = data.get('totalCount', 0) if not has_next: print(f"Reached last page (no next link)") break # Check if we've hit the 10,000 log limit if total_count > 10000 and len(records) >= 10000: print(f"Warning: Reached Swimlane API limit of 10,000 logs. Consider narrowing the time range.") break page_num += 1 except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 swimlane-audit-schedule-15minリージョン Cloud Run functions と同じリージョンを選択する 周波数 */15 * * * *(15 分ごと)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック Pub/Sub トピック( swimlane-audit-trigger)を選択するメッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *標準(推奨) 1 時間ごと 0 * * * *検索量が普通 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
統合をテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
- 関数名(
swimlane-audit-collector)をクリックします。 - [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。次の内容を確認します。
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X events Wrote X records to gs://bucket-name/swimlane/audit/YYYY/MM/DD/swimlane-audit-UUID.json.gz Successfully processed X records[Cloud Storage] > [バケット] に移動します。
バケット名をクリックします。
プレフィックス フォルダ(
swimlane/audit/)に移動します。現在のタイムスタンプで新しい
.json.gzファイルが作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数の SWIMLANE_PAT_TOKEN を確認し、Personal Access Token が正しいことを確認します。
- HTTP 403: 監査ログにアクセスするためのアカウント管理者権限がアカウントに付与されていることを確認する
- HTTP 400: SWIMLANE_ACCOUNT_ID が正しく、クエリ パラメータが有効であることを確認します
- HTTP 404: SWIMLANE_BASE_URL と API エンドポイント パスが正しいことを確認する
- HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
- 環境変数が不足している: 必要なすべての変数(GCS_BUCKET、SWIMLANE_BASE_URL、SWIMLANE_PAT_TOKEN、SWIMLANE_ACCOUNT_ID)が設定されていることを確認します。
- 接続エラー: Swimlane Platform へのネットワーク接続とファイアウォール ルールを確認する
- 10,000 件のログ上限に関する警告: Swimlane の API 上限内に収まるように、LOOKBACK_HOURS を減らすか、Cloud Scheduler の頻度を増やします。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Swimlane Platform logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Swimlane Platform] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
Swimlane Platform のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Swimlane Platform logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Swimlane Platform] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://swimlane-audit/swimlane/audit/次のように置き換えます。
swimlane-audit: GCS バケット名。swimlane/audit/: ログが保存される接頭辞/フォルダパス。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。