NetDocuments のログを収集する
このドキュメントでは、Google Cloud Storage V2 を使用して NetDocuments ログを Google Security Operations に取り込む方法について説明します。
NetDocuments は、法務および専門サービス組織向けに設計されたクラウドベースのドキュメント管理プラットフォームです。NetDocuments REST API を介してドキュメントのアクセス、変更、共有、管理アクティビティを追跡する監査ログを生成します。
始める前に
次の前提条件を満たしていることを確認します。
- Google SecOps インスタンス
- Cloud Storage API が有効になっている GCP プロジェクト
- GCS バケットを作成および管理する権限
- GCS バケットの IAM ポリシーを管理する権限
- Cloud Run サービス、Pub/Sub トピック、Cloud Scheduler ジョブを作成する権限
- NetDocuments への特権アクセス(リポジトリ管理者または組織管理者ロール)
- OAuth2 認証情報(クライアント ID、クライアント シークレット、更新トークン)を含む NetDocuments アプリケーション登録
Google Cloud Storage バケットを作成する
- Google Cloud コンソールに移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前を入力します(例: netdocuments-audit-logs)。ロケーション タイプ ニーズに応じて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション 場所を選択します(例: us-central1)。ストレージ クラス Standard(頻繁にアクセスされるログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
NetDocuments API 認証情報を収集する
アプリケーションの登録
- 管理者アカウントで NetDocuments 開発者ポータルにログインします。
- [アプリケーション] に移動し、[アプリケーションの登録] をクリックします。
- アプリケーションの名前を入力します(例:
Google Security Operations Integration)。 - [アプリケーションの種類] を [サーバー(機密)] に設定します。
- 次の認証情報をメモします。
- クライアント ID: OAuth2 クライアント識別子
- クライアント シークレット: OAuth2 クライアント シークレット
更新トークンを生成する
- NetDocuments OAuth2 認証フローを使用して更新トークンを取得します。
- 必要なスコープを使用してアプリケーションを承認します。
read(ドキュメントと監査ログへの読み取りアクセス)full(組織で必要な場合はフルアクセス)
OAuth2 認可コードフローを完了し、更新トークンを安全に保存します。
API ベース URL を特定する
NetDocuments API のベース URL は、データセンターのリージョンによって異なります。
| 地域 | API ベース URL |
|---|---|
| 米国 | https://api.netdocuments.com/v2 |
| EU | https://api.eu.netdocuments.com/v2 |
| AU | https://api.au.netdocuments.com/v2 |
権限を確認する
アカウントに必要な権限があることを確認するには:
- NetDocuments 管理ポータルにログインします。
- [リポジトリ管理者] > [アクティビティ ログ] に移動します。
- アクティビティ ログ セクションが表示されている場合は、必要な権限が付与されています。
- このオプションが表示されない場合は、NetDocuments 管理者に連絡して、リポジトリ管理者または組織管理者のアクセス権を付与してもらってください。
Cloud Run functions のサービス アカウントを作成する
Cloud Run 関数には、GCS バケットに書き込み、Pub/Sub によって呼び出される権限を持つサービス アカウントが必要です。
サービス アカウントの作成
- GCP Console で、[IAM と管理>サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を入力します。
- サービス アカウント名: 「
netdocuments-logs-collector-sa」と入力します。 - サービス アカウントの説明:
Service account for Cloud Run function to collect NetDocuments logsと入力します。
- サービス アカウント名: 「
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックします。
- [ストレージ オブジェクト管理者] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Run 起動元] を検索して選択します。
- [+ 別のロールを追加] をクリックします。
- [Cloud Functions 起動元] を検索して選択します。
- [続行] をクリックします。
- [完了] をクリックします。
これらのロールは、次の操作に必要です。
- Storage オブジェクト管理者: ログを GCS バケットに書き込み、状態ファイルを管理する
- Cloud Run 起動元: Pub/Sub が関数を呼び出すことを許可します
- Cloud Functions 起動元: 関数の呼び出しを許可する
GCS バケットに対する IAM 権限を付与する
GCS バケットに対する書き込み権限をサービス アカウントに付与します。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(例:
netdocuments-audit-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を入力します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
netdocuments-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)を入力します。 - ロールを割り当てる: [ストレージ オブジェクト管理者] を選択します。
- プリンシパルを追加: サービス アカウントのメールアドレス(例:
- [保存] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler がパブリッシュし、Cloud Run functions の関数がサブスクライブする Pub/Sub トピックを作成します。
- GCP Console で、[Pub/Sub> トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を入力します。
- トピック ID: 「
netdocuments-logs-trigger」と入力します。 - その他の設定はデフォルトのままにします。
- トピック ID: 「
- [作成] をクリックします。
ログを収集する Cloud Run 関数を作成する
Cloud Run 関数は、Cloud Scheduler からの Pub/Sub メッセージによってトリガーされ、NetDocuments REST API からログを取得して GCS に書き込みます。
- GCP Console で、[Cloud Run] に移動します。
- [サービスを作成] をクリックします。
- [関数] を選択します(インライン エディタを使用して関数を作成します)。
[構成] セクションで、次の構成の詳細を指定します。
設定 値 サービス名 netdocuments-logs-collectorリージョン GCS バケットと一致するリージョンを選択します(例: us-central1)。ランタイム [Python 3.12] 以降を選択します。 [トリガー(省略可)] セクションで、次の操作を行います。
- [+ トリガーを追加] をクリックします。
- [Cloud Pub/Sub] を選択します。
- [Cloud Pub/Sub トピックを選択してください] で、トピック
netdocuments-logs-triggerを選択します。 - [保存] をクリックします。
[認証] セクションで次の操作を行います。
- [認証が必要] を選択します。
- Identity and Access Management(IAM)を確認します。
下にスクロールして、[コンテナ、ネットワーキング、セキュリティ] を開きます。
[セキュリティ] タブに移動します。
- サービス アカウント: サービス アカウント
netdocuments-logs-collector-saを選択します。
- サービス アカウント: サービス アカウント
[コンテナ] タブに移動します。
- [変数とシークレット] をクリックします。
- 環境変数ごとに [+ 変数を追加] をクリックします。
変数名 値の例 説明 GCS_BUCKETnetdocuments-audit-logsGCS バケット名 GCS_PREFIXnetdocumentsログファイルの接頭辞 STATE_KEYnetdocuments/state.json状態ファイルのパス ND_API_BASEhttps://api.netdocuments.com/v2NetDocuments API のベース URL ND_CLIENT_IDyour-client-idOAuth2 クライアント ID ND_CLIENT_SECRETyour-client-secretOAuth2 クライアント シークレット ND_REFRESH_TOKENyour-refresh-tokenOAuth2 更新トークン MAX_RECORDS5000実行あたりの最大レコード数 PAGE_SIZE10001 ページあたりのレコード数 LOOKBACK_HOURS24最初のルックバック期間 [変数とシークレット] セクションで、[リクエスト] まで下にスクロールします。
- リクエストのタイムアウト:
600秒(10 分)を入力します。
- リクエストのタイムアウト:
[設定] タブに移動します。
- [リソース] セクションで次の操作を行います。
- メモリ: 512 MiB 以上を選択します。
- CPU: 1 を選択します。
- [リソース] セクションで次の操作を行います。
[リビジョン スケーリング] セクションで、次の操作を行います。
- [インスタンスの最小数]: 「
0」と入力します。 - インスタンスの最大数:
100と入力します(または、予想される負荷に基づいて調整します)。
- [インスタンスの最小数]: 「
[作成] をクリックします。
サービスが作成されるまで待ちます(1 ~ 2 分)。
サービスが作成されると、インライン コードエディタが自動的に開きます。
関数コードを追加する
- [エントリ ポイント] フィールドに「main」と入力します。
インライン コードエディタで、次の 2 つのファイルを作成します。
- main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'netdocuments') STATE_KEY = os.environ.get('STATE_KEY', 'netdocuments/state.json') ND_API_BASE = os.environ.get('ND_API_BASE', 'https://api.netdocuments.com/v2') ND_CLIENT_ID = os.environ.get('ND_CLIENT_ID') ND_CLIENT_SECRET = os.environ.get('ND_CLIENT_SECRET') ND_REFRESH_TOKEN = os.environ.get('ND_REFRESH_TOKEN') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '5000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) def to_unix_millis(dt: datetime) -> int: """Convert datetime to Unix epoch milliseconds.""" if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt = dt.astimezone(timezone.utc) return int(dt.timestamp() * 1000) def parse_datetime(value: str) -> datetime: """Parse ISO datetime string to datetime object.""" if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value) def get_access_token(): """ Obtain an OAuth2 access token using the refresh token. """ token_url = "https://api.netdocuments.com/v1/OAuth" # Build Basic auth header credentials = base64.b64encode( f"{ND_CLIENT_ID}:{ND_CLIENT_SECRET}".encode('utf-8') ).decode('utf-8') headers = { 'Authorization': f'Basic {credentials}', 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json' } body = f"grant_type=refresh_token&refresh_token={ND_REFRESH_TOKEN}" backoff = 1.0 for attempt in range(3): response = http.request('POST', token_url, body=body, headers=headers) if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429) on token request. Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue if response.status != 200: raise RuntimeError(f"Failed to get access token: {response.status} - {response.data.decode('utf-8')}") data = json.loads(response.data.decode('utf-8')) return data['access_token'] raise RuntimeError("Failed to get access token after 3 retries") @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch NetDocuments audit logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, ND_CLIENT_ID, ND_CLIENT_SECRET, ND_REFRESH_TOKEN]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_time = None if isinstance(state, dict) and state.get("last_event_time"): try: last_time = parse_datetime(state["last_event_time"]) # Overlap by 2 minutes to catch any delayed events last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") if last_time is None: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") # Get access token token = get_access_token() # Fetch logs records, newest_event_time = fetch_logs( token=token, start_time=last_time, end_time=now, page_size=PAGE_SIZE, max_records=MAX_RECORDS, ) if not records: print("No new log records found.") save_state(bucket, STATE_KEY, now.isoformat()) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state with newest event time if newest_event_time: save_state(bucket, STATE_KEY, newest_event_time) else: save_state(bucket, STATE_KEY, now.isoformat()) print(f"Successfully processed {len(records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_event_time_iso: str): """Save the last event timestamp to GCS state file.""" try: state = {'last_event_time': last_event_time_iso} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(token: str, start_time: datetime, end_time: datetime, page_size: int, max_records: int): """ Fetch audit logs from NetDocuments REST API with pagination and rate limiting. Args: token: OAuth2 access token start_time: Start time for log query end_time: End time for log query page_size: Number of records per page max_records: Maximum total records to fetch Returns: Tuple of (records list, newest_event_time ISO string) """ api_base = ND_API_BASE.rstrip('/') endpoint = f"{api_base}/AuditLog/search" headers = { 'Authorization': f'Bearer {token}', 'Accept': 'application/json', 'Content-Type': 'application/json', 'User-Agent': 'GoogleSecOps-NetDocumentsCollector/1.0' } records = [] newest_time = None page_num = 0 backoff = 1.0 offset = 0 while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records})") break # Build request body body = { 'startDate': start_time.strftime('%Y-%m-%dT%H:%M:%SZ'), 'endDate': end_time.strftime('%Y-%m-%dT%H:%M:%SZ'), '$top': min(page_size, max_records - len(records)), '$skip': offset } try: response = http.request( 'POST', endpoint, body=json.dumps(body), headers=headers ) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [], None data = json.loads(response.data.decode('utf-8')) page_results = data.get('results', data.get('value', [])) if not page_results: print(f"No more results (empty page)") break print(f"Page {page_num}: Retrieved {len(page_results)} events") records.extend(page_results) # Track newest event time for event in page_results: try: event_ts = event.get('date') or event.get('timestamp') or event.get('created') if event_ts: event_time = str(event_ts) if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time): newest_time = event_time except Exception as e: print(f"Warning: Could not parse event time: {e}") # Check for more results offset += len(page_results) if len(page_results) < page_size: print("No more pages (partial page received)") break except Exception as e: print(f"Error fetching logs: {e}") return [], None print(f"Retrieved {len(records)} total records from {page_num} pages") return records, newest_time- requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0[デプロイ] をクリックして、関数を保存してデプロイします。
デプロイが完了するまで待ちます(2 ~ 3 分)。
Cloud Scheduler ジョブの作成
Cloud Scheduler は、定期的に Pub/Sub トピックにメッセージをパブリッシュし、Cloud Run functions の関数をトリガーします。
- GCP Console で、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
設定 値 名前 netdocuments-logs-collector-hourlyリージョン Cloud Run functions と同じリージョンを選択する 周波数 0 * * * *(1 時間ごとに正時)タイムゾーン タイムゾーンを選択します(UTC を推奨) ターゲット タイプ Pub/Sub トピック トピック netdocuments-logs-triggerを選択します。メッセージ本文 {}(空の JSON オブジェクト)[作成] をクリックします。
スケジュールの頻度のオプション
ログの量とレイテンシの要件に基づいて頻度を選択します。
| 頻度 | CRON 式 | ユースケース |
|---|---|---|
| 5 分おき | */5 * * * * |
大容量、低レイテンシ |
| 15 分ごと | */15 * * * * |
検索ボリュームが普通 |
| 1 時間ごと | 0 * * * * |
標準(推奨) |
| 6 時間ごと | 0 */6 * * * |
少量、バッチ処理 |
| 毎日 | 0 0 * * * |
履歴データの収集 |
統合をテストする
- Cloud Scheduler コンソールで、ジョブを見つけます。
- [強制実行] をクリックして、ジョブを手動でトリガーします。
- 数秒待ちます。
- Cloud Run > サービスに移動します。
netdocuments-logs-collectorをクリックします。- [Logs] タブをクリックします。
関数が正常に実行されたことを確認します。以下のものを探します。
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Page 1: Retrieved X events Wrote X records to gs://netdocuments-audit-logs/netdocuments/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X records[Cloud Storage] > [バケット] に移動します。
バケット名(
netdocuments-audit-logs)をクリックします。netdocuments/フォルダに移動します。現在のタイムスタンプで新しい
.ndjsonファイルが作成されたことを確認します。
ログにエラーが表示された場合:
- HTTP 401: 環境変数で OAuth2 認証情報を確認する
- HTTP 403: アカウントに必要なリポジトリ管理者または組織管理者の権限があることを確認する
- HTTP 429: レート制限 - 関数はバックオフで自動的に再試行されます
- 環境変数が不足している: 必要な変数がすべて設定されていることを確認します
NetDocuments のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
NetDocuments Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [NetDocuments] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウントのメールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーします。
[次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://netdocuments-audit-logs/netdocuments/- 次のように置き換えます。
netdocuments-audit-logs: GCS バケット名。netdocuments: ログが保存されるオプションの接頭辞/フォルダパス(ルートの場合は空のままにします)。
- 次のように置き換えます。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 過去の日数内に変更されたファイルを含めます(デフォルトは 180 日)。
アセットの名前空間: アセットの名前空間
Ingestion labels: このフィードのイベントに適用されるラベル
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、GCS バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名をクリックします。
- [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を入力します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレスを貼り付けます。
- ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
- [保存] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| description、name | metadata.description | イベントの説明(人が読める形式)。 |
| 日付 | metadata.event_timestamp | イベントが発生した時刻。 |
| metadata.event_type | イベントタイプ。 | |
| source.id | principal.asset.hostname | プリンシパルに関連付けられたアセットのホスト名。 |
| ホスト | principal.asset.ip | プリンシパルに関連付けられたアセットの IP アドレス。 |
| source.id | principal.hostname | プリンシパルに関連付けられたホスト名。 |
| ホスト | principal.ip | プリンシパルに関連付けられた IP アドレス。 |
| source.name | principal.resource.attribute.labels | プリンシパルのリソースのラベルのマップ。 |
| user.email | principal.user.email_addresses | ユーザーに関連付けられているメールアドレス。 |
| user.memberType | principal.user.role_name | ユーザーのロール名。 |
| user.name | principal.user.user_display_name | ユーザーの表示名。 |
| user.id | principal.user.userid | プリンシパルのユーザー ID。 |
| storageObject.fileExtension | target.file.mime_type | ターゲット ファイルの MIME タイプ。 |
| storageObject.name | target.file.names | ターゲット ファイルの名前。 |
| storageObject.size | target.file.size | ターゲット ファイルのサイズ(バイト単位)。 |
| storageObject.version、storageObject.collabSpace、storageObject.NetBinder、storageObject.cabinet.name、storageObject.cabinet.id | target.resource.attribute.labels | ターゲット リソースのラベルのマップ。 |
| storageObject.docId | target.resource.product_object_id | ターゲット リソースのプロダクト固有のオブジェクト ID。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。