Keeper Enterprise Security のログを収集する
このドキュメントでは、Google Cloud Storage V2 を使用して Keeper Enterprise Security ログを Google Security Operations に取り込む方法について説明します。
Keeper Enterprise Security は、認証情報、シークレット、センシティブ データを保護するエンタープライズ パスワード マネージャーと特権アクセス管理プラットフォームです。Reporting and Alerts API を介して、Vault へのアクセス、パスワードの変更、共有イベント、管理アクションの監査ログを生成します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- 管理者権限を持つ Keeper 管理コンソールへの特権アクセス
- 高度なレポートとアラート モジュール(ARAM)が有効になっている Keeper Enterprise プランまたは Keeper Business プラン
Google Cloud Storage バケットを作成する
- Google Cloud コンソールに移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前を入力します(例: keeper-audit-logs)。ロケーション タイプ ニーズに応じて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション 場所を選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Keeper Enterprise Security API の認証情報を収集する
API キーを作成
- Keeper 管理コンソールにログインします。
- [管理] > [レポートとアラート] に移動します。
- [API キー] をクリックするか、API キー管理セクションに移動します。
- [Create API Key] をクリックします。
- API キーの名前(
Google Security Operations Integrationなど)を入力します。 - 次の詳細をコピーして安全な場所に保存します。
- API キー: 生成された API キーの値
- 秘密鍵: 秘密鍵ファイル(トークン生成に使用)をダウンロードします。
管理コンソールで Keeper Enterprise のエンタープライズ ID を確認します。
API ベース URL を特定する
Keeper API のベース URL は、データセンターのリージョンによって異なります。
地域 API ベース URL 米国 https://keepersecurity.comEU https://keepersecurity.euAU https://keepersecurity.com.auCA https://keepersecurity.caJP https://keepersecurity.jpUS GovCloud https://govcloud.keepersecurity.us
権限を確認する
アカウントに必要な権限があることを確認するには:
- Keeper 管理コンソールにログインします。
- [管理] > [レポートとアラート] に移動します。
- [レポートとアラート] ダッシュボードと [API キー] セクションが表示されていれば、必要な権限が付与されています。
- このオプションが表示されない場合は、Keeper 管理者に連絡して ARAM モジュールを有効にし、管理者権限を付与してもらってください。
テスト API へのアクセス
統合に進む前に、認証情報をテストします。
# Replace with your actual credentials KEEPER_API_KEY="your-api-key" KEEPER_BASE="https://keepersecurity.com" # Test API access - get audit events (requires signed token) # Note: Keeper uses a signed JWT for authentication. # Refer to Keeper Commander CLI for testing: keeper audit-report --format json --limit 1
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を入力します。
- サービス アカウント名: 「
keeper-logs-collector-sa」と入力します。 - サービス アカウントの説明:
Service account for Cloud Run function to collect Keeper Enterprise Security logsと入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の操作に必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可します
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(例:
keeper-audit-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を入力します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
keeper-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [ストレージ オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions の関数がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を入力します。
- トピック ID: 「
keeper-logs-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、Keeper Reporting and Alerts API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 keeper-logs-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック
keeper-logs-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
keeper-logs-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETkeeper-audit-logsGCS バケット名 GCS_PREFIXkeeperログファイルの接頭辞 STATE_KEYkeeper/state.json状態ファイルのパス KEEPER_API_BASEhttps://keepersecurity.comKeeper API のベース URL KEEPER_API_KEYyour-api-keyKeeper API キー KEEPER_PRIVATE_KEYbase64-encoded-private-keyBase64 でエンコードされた秘密鍵 KEEPER_ENTERPRISE_ID12345エンタープライズ ID MAX_RECORDS5000実行あたりの最大レコード数 PAGE_SIZE10001 ページあたりのレコード数 LOOKBACK_HOURS24最初のルックバック期間 [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: 1 を選択します。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数]: 「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数]: 「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスが作成されると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [エントリ ポイント] フィールドに「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
最初のファイル - main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 import hashlib import hmac # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'keeper') STATE_KEY = os.environ.get('STATE_KEY', 'keeper/state.json') KEEPER_API_BASE = os.environ.get('KEEPER_API_BASE', 'https://keepersecurity.com') KEEPER_API_KEY = os.environ.get('KEEPER_API_KEY') KEEPER_PRIVATE_KEY = os.environ.get('KEEPER_PRIVATE_KEY') KEEPER_ENTERPRISE_ID = os.environ.get('KEEPER_ENTERPRISE_ID') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '5000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) def to_unix_millis(dt: datetime) -> int: """Convert datetime to Unix epoch milliseconds.""" if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) return int(dt.timestamp() * 1000) def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) def get_auth_token(): """ Generate authentication token for Keeper Reporting API. Uses HMAC-SHA512 signing with the private key. """ api_base = KEEPER_API_BASE.rstrip('/') token_url = f"{api_base}/api/rest/enterprise/auth/token" # Build token request timestamp = str(int(time.time())) message = f"{KEEPER_API_KEY}:{timestamp}" # Decode private key from base64 private_key_bytes = base64.b64decode(KEEPER_PRIVATE_KEY) # Sign with HMAC-SHA512 signature = hmac.new( private_key_bytes, message.encode('utf-8'), hashlib.sha512 ).digest() signature_b64 = base64.b64encode(signature).decode('utf-8') body = json.dumps({ 'api_key': KEEPER_API_KEY, 'timestamp': timestamp, 'signature': signature_b64, 'enterprise_id': KEEPER_ENTERPRISE_ID }) headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } backoff = 1.0 for attempt in range(3): response = http.request('POST', token_url, body=body, headers=headers) if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429) on token request. Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue if response.status != 200: raise RuntimeError(f"Failed to get auth token: {response.status} - {response.data.decode('utf-8')}") data = json.loads(response.data.decode('utf-8')) return data.get('token', data.get('access_token')) raise RuntimeError("Failed to get auth token after 3 retries") @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Keeper Enterprise Security audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, KEEPER_API_KEY, KEEPER_PRIVATE_KEY, KEEPER_ENTERPRISE_ID]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Get auth token token = get_auth_token() # Fetch logs records, newest_event_time = fetch_logs( token=token, start_time=last_time, end_time=now, page_size=PAGE_SIZE, max_records=MAX_RECORDS, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time_iso} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(token: str, start_time: datetime, end_time: datetime, page_size: int, max_records: int): """ Fetch audit event logs from Keeper Reporting and Alerts API with pagination and rate limiting. Args: token: Authentication token start_time: Start time for log query end_time: End time for log query page_size: Number of records per page max_records: Maximum total records to fetch Returns: Tuple of (records list, newest_event_time ISO string) """ api_base = KEEPER_API_BASE.rstrip('/') endpoint = f"{api_base}/api/rest/enterprise/audit-events" headers = { 'Authorization': f'Bearer {token}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-KeeperCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 # Convert to Unix epoch seconds for Keeper API start_epoch = int(start_time.timestamp()) end_epoch = int(end_time.timestamp()) cursor = None while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records})") break # Build request body body = { 'start_time': start_epoch, 'end_time': end_epoch, 'limit': min(page_size, max_records - len(records)), 'enterprise_id': KEEPER_ENTERPRISE_ID } if cursor: body['cursor'] = cursor try: response = http.request( 'POST', endpoint, body=json.dumps(body), headers=headers ) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) page_results = data.get('audit_events', data.get('events', [])) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: event_ts = event.get('timestamp') or event.get('created') if event_ts: if isinstance(event_ts, (int, float)): event_dt = datetime.fromtimestamp(event_ts, tz=timezone.utc) event_time = event_dt.isoformat() else: event_time = str(event_ts) if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results cursor = data.get('cursor') or data.get('next_cursor') if not cursor: print("No more pages (no next cursor)") break except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time2 つ目のファイル - requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0
[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 keeper-logs-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨) ターゲット タイプ Pub/Sub トピック トピック keeper-logs-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
| 頻度 | CRON 式 | ユースケース |
|---|---|---|
| 5 分おき | */5 * * * * |
大容量、低レイテンシ |
| 15 分ごと | */15 * * * * |
検索ボリュームが普通 |
| 1 時間ごと | 0 * * * * |
標準(推奨) |
| 6 時間ごと | 0 */6 * * * |
少量、バッチ処理 |
| 毎日 | 0 0 * * * |
履歴データの収集 |
統合をテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
keeper-logs-collectorをクリックします。- [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。以下のものを探します。
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X events Wrote X records to gs://keeper-audit-logs/keeper/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X records[Cloud Storage] > [バケット] に移動します。
バケット名(
keeper-audit-logs)をクリックします。keeper/フォルダに移動します。現在のタイムスタンプで新しい
.ndjsonファイルが作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数で API 認証情報を確認する
- HTTP 403: Keeper 管理コンソールで、アカウントに必要な ARAM 権限があることを確認します
- HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
Keeper Enterprise Security のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Keeper Enterprise Security Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Keeper Enterprise Security] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウントのメールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comそのメールアドレスをコピーします。
[次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://keeper-audit-logs/keeper/- 次のように置き換えます。
keeper-audit-logs: GCS バケット名。keeper: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
- 次のように置き換えます。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 過去の日数内に変更されたファイルを含めます(デフォルトは 180 日)。
アセットの名前空間: アセットの名前空間
Ingestion labels: このフィードのイベントに適用されるラベル
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者のロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を入力します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
- [保存] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| logData.channel | extensions.auth.type | ["SSO", "MACHINE", "VPN", "PHYSICAL", "TACACS"] のいずれかの場合は大文字の値に設定し、それ以外の場合は「AUTHTYPE_UNSPECIFIED」に設定します。 |
| logData.timestamp | metadata.event_timestamp | ISO8601 形式を使用して変換 |
| logData.audit_event、logData.remote_address | metadata.event_type | audit_event が「login」と一致し、remote_address が空でない場合は「USER_LOGIN」に設定します。remote_address が空でない場合は「STATUS_UPDATE」に設定します。それ以外の場合は「GENERIC_EVENT」に設定します。 |
| logData.audit_event | metadata.product_event_type | 値を直接コピーしました |
| logData.enterprise_id | metadata.product_log_id | 文字列に変換しました |
| metadata.product_name | 「KEEPER」に設定 | |
| metadata.vendor_name | 「KEEPER」に設定 | |
| logData.client_version | network.http.parsed_user_agent | 値を直接コピーしてから parseduseragent に変換 |
| logData.client_version | network.http.user_agent | 値を直接コピーしました |
| logData.remote_address | principal.asset.ip | 値を直接コピーしました |
| logData.remote_address | principal.ip | 値を直接コピーしました |
| logData.category | security_result.category_details | ソースから統合 |
| logData.shared_folder_uid、logData.folder_uid | security_result.detection_fields | キー「shared_folder_uid」と「folder_uid」を持つラベルとして統合されました |
| logData.email | target.user.email_addresses | ソースから統合 |
| logData.username | target.user.userid | 値を直接コピーしました |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。