DigiCert の監査ログを収集する
このドキュメントでは、Google Cloud Storage を使用して DigiCert 監査ログを Google Security Operations に取り込む方法について説明します。DigiCert CertCentral は、証明書のライフサイクル管理プラットフォームです。証明書のオペレーション、ユーザー アクティビティ、管理アクションの監査ログを提供します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- DigiCert CertCentral への特権アクセス(管理者ロールの API キー)
Google Cloud Storage バケットを作成する
- Google Cloud Console に移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( digicert-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション ロケーションを選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
DigiCert API 認証情報を収集する
DigiCert API キーを取得する
- DigiCert CertCentral にログインします。
- [アカウント] > [API キー] に移動します。
- [Create API Key] をクリックします。
- 次の構成の詳細を指定します。
- 名前: わかりやすい名前を入力します(例:
Chronicle Integration)。 - ロール: [管理者] を選択します。
- 名前: わかりやすい名前を入力します(例:
- [作成] をクリックします。
- API キー(
X-DC-DEVKEY)をコピーして保存します。この値は再度表示されることはありません。
DigiCert レポート ID を取得する
- DigiCert CertCentral で、[Reports > Report Library] に移動します。
- [レポートを作成] をクリックします。
- 次の構成の詳細を指定します。
- レポートのタイプ: [監査ログ] を選択します。
- 形式: [JSON] を選択します。
- 名前: わかりやすい名前を入力します(例:
Chronicle Audit Logs)。
- [作成] をクリックします。
レポート ID(UUID 形式)をコピーして保存します。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名: 「
digicert-logs-collector-sa」と入力します。 - サービス アカウントの説明: 「
Service account for Cloud Run function to collect DigiCert audit logs」と入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の目的で必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可する
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
digicert-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [Storage オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
digicert-audit-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、DigiCert API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 digicert-audit-logs-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、Pub/Sub トピック(
digicert-audit-trigger)を選択します。 - [保存] をクリックします。
[認証] セクションで、次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウントを選択します(
digicert-logs-collector-sa)。
- サービス アカウント: サービス アカウントを選択します(
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 GCS_BUCKETdigicert-logsGCS_PREFIXdigicert/logsSTATE_KEYdigicert/logs/state.jsonDIGICERT_API_KEYxxxxxxxxxxxxxxxxxxxxxxxxDIGICERT_REPORT_ID88de5e19-ec57-4d70-865d-df953b062574REQUEST_TIMEOUT30POLL_INTERVAL10MAX_WAIT_SECONDS300[変数とシークレット] タブで [リクエスト] まで下にスクロールします。
- リクエスト タイムアウト:
900秒(15 分)を入力します。
- リクエスト タイムアウト:
[コンテナ] の [設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: [1] を選択します。
- [完了] をクリックします。
- [リソース] セクションで次の操作を行います。
[実行環境] までスクロールします。
- [デフォルト](推奨)を選択します。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数] に「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数] に「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスを作成すると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [関数のエントリ ポイント] に「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- 最初のファイル: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import io import gzip import zipfile import uuid # Initialize HTTP client http = urllib3.PoolManager() # Initialize Storage client storage_client = storage.Client() API_BASE = "https://api.digicert.com/reports/v1" USER_AGENT = "secops-digicert-reports/1.0" @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch DigiCert audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'digicert/logs').rstrip('/') state_key = os.environ.get('STATE_KEY', f'{prefix}/state.json') api_key = os.environ.get('DIGICERT_API_KEY') report_id = os.environ.get('DIGICERT_REPORT_ID') max_wait = int(os.environ.get('MAX_WAIT_SECONDS', '300')) poll_int = int(os.environ.get('POLL_INTERVAL', '10')) timeout = int(os.environ.get('REQUEST_TIMEOUT', '30')) if not all([bucket_name, api_key, report_id]): print('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(bucket_name) # Load state state = load_state(bucket, state_key) last_run = state.get('last_run_id') # Start report run started = datetime.now(timezone.utc) start_report_run(api_key, report_id, timeout) # Wait for report to be ready run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int) # Skip if same run as last time if last_run and last_run == run_id: print(f'Skipping duplicate run: {run_id}') return # Get report data rows = get_json_rows(api_key, report_id, run_id, timeout) # Write to GCS key = write_ndjson_gz(bucket, prefix, rows, run_id) # Update state save_state(bucket, state_key, { 'last_run_id': run_id, 'last_success_at': datetime.now(timezone.utc).isoformat(), 'last_s3_key': key, 'rows_count': len(rows) }) print(f'Successfully processed {len(rows)} logs to {key}') except Exception as e: print(f'Error processing logs: {str(e)}') raise def http_request(method, url, api_key, body=None, timeout=30, max_retries=5): """Make HTTP request with retry logic.""" headers = { 'X-DC-DEVKEY': api_key, 'Content-Type': 'application/json', 'User-Agent': USER_AGENT } attempt, backoff = 0, 1.0 while True: try: response = http.request( method, url, headers=headers, body=body, timeout=timeout ) status = response.status # Retry on server errors if 500 <= status <= 599 and attempt < max_retries: attempt += 1 time.sleep(backoff) backoff *= 2 continue # Retry on rate limit if status == 429 and attempt < max_retries: retry_after = response.headers.get('Retry-After') delay = float(retry_after) if retry_after and retry_after.isdigit() else backoff attempt += 1 time.sleep(delay) backoff *= 2 continue if status not in (200, 201): raise RuntimeError(f'HTTP {status}: {response.data[:200]}') return status, response.headers, response.data except urllib3.exceptions.HTTPError as e: if attempt < max_retries: attempt += 1 time.sleep(backoff) backoff *= 2 continue raise def start_report_run(api_key, report_id, timeout): """Start a new report run.""" status, _, body = http_request( 'POST', f'{API_BASE}/report/{report_id}/run', api_key, b'{}', timeout ) if status not in (200, 201): raise RuntimeError(f'Start run failed: {status} {body[:200]}') def list_report_history(api_key, status_filter=None, report_type=None, limit=100, timeout=30): """List report history.""" params = { 'limit': str(limit), 'offset': '0', 'sort_by': 'report_start_date', 'sort_direction': 'DESC' } if status_filter: params['status'] = status_filter if report_type: params['report_type'] = report_type query_string = '&'.join([f'{k}={v}' for k, v in params.items()]) url = f'{API_BASE}/report/history?{query_string}' status, _, body = http_request('GET', url, api_key, timeout=timeout) if status != 200: raise RuntimeError(f'History failed: {status} {body[:200]}') return json.loads(body.decode('utf-8')) def find_ready_run(api_key, report_id, started_not_before, timeout, max_wait_seconds, poll_interval): """Find a ready report run.""" deadline = time.time() + max_wait_seconds while time.time() < deadline: hist = list_report_history( api_key, status_filter='READY', limit=200, timeout=timeout ).get('report_history', []) for item in hist: if item.get('report_identifier') != report_id: continue if not item.get('report_run_identifier'): continue try: rsd = datetime.strptime( item.get('report_start_date', ''), '%Y-%m-%d %H:%M:%S' ).replace(tzinfo=timezone.utc) except Exception: rsd = started_not_before if rsd + timedelta(seconds=60) >= started_not_before: return item['report_run_identifier'] time.sleep(poll_interval) raise TimeoutError('READY run not found in time') def get_json_rows(api_key, report_id, run_id, timeout): """Get JSON rows from report.""" status, headers, body = http_request( 'GET', f'{API_BASE}/report/{report_id}/{run_id}/json', api_key, timeout=timeout ) if status != 200: raise RuntimeError(f'Get JSON failed: {status} {body[:200]}') # Check if response is ZIP content_type = headers.get('content-type', '').lower() if 'application/zip' in content_type or body[:2] == b'PK': with zipfile.ZipFile(io.BytesIO(body)) as zf: json_files = [n for n in zf.namelist() if n.lower().endswith('.json')] if not json_files: raise RuntimeError('ZIP has no JSON') rows = json.loads(zf.read(json_files[0]).decode('utf-8')) else: rows = json.loads(body.decode('utf-8')) if not isinstance(rows, list): raise RuntimeError('Unexpected JSON format') return rows def write_ndjson_gz(bucket, prefix, rows, run_id): """Write NDJSON gzipped file to GCS.""" ts = datetime.now(timezone.utc).strftime('%Y/%m/%d/%H%M%S') key = f'{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz' buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode='wb') as gz: for r in rows: gz.write((json.dumps(r, separators=(',', ':')) + '\n').encode('utf-8')) blob = bucket.blob(key) blob.upload_from_string( buf.getvalue(), content_type='application/x-ndjson', content_encoding='gzip' ) return key def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f'Warning: Could not load state: {str(e)}') return {} def save_state(bucket, key, state): """Save state to GCS.""" try: blob = bucket.blob(key) blob.upload_from_string( json.dumps(state), content_type='application/json' ) except Exception as e: print(f'Warning: Could not save state: {str(e)}')- 2 つ目のファイル: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 digicert-audit-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨)。 ターゲット タイプ Pub/Sub トピック Pub/Sub トピック( digicert-audit-trigger)を選択するメッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
頻度 CRON 式 ユースケース 5 分毎 */5 * * * *大容量、低レイテンシ 15 分ごと */15 * * * *検索量が普通 1 時間ごと 0 * * * *標準(推奨) 6 時間ごと 0 */6 * * *少量、バッチ処理 毎日 0 0 * * *履歴データの収集
スケジューラ ジョブをテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして手動でトリガーします。
- 数秒待ってから、[Cloud Run] > [サービス] > [digicert-audit-logs-collector] > [ログ] に移動します。
- 関数が正常に実行されたことを確認します。
- GCS バケットをチェックして、ログが書き込まれたことを確認します。
Google SecOps サービス アカウントを取得する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
DigiCert Audit Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Digicert] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [ストレージ オブジェクト閲覧者] を選択します。
[保存] をクリックします。
DigiCert のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
DigiCert Audit Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Digicert] を選択します。
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://digicert-logs/digicert/logs/次のように置き換えます。
digicert-logs: GCS バケット名。digicert/logs: ログが保存される接頭辞/フォルダパス。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
ご不明な点がございましたら、コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。