Digital Guardian EDR のログを収集する
このドキュメントでは、Cloud Run 関数を介して Google Cloud Storage V2 を使用して Digital Guardian EDR ログを Google Security Operations に取り込む方法について説明します。
Fortra の Digital Guardian(旧 Digital Guardian)は、包括的なデータ損失防止およびエンドポイント検出対応プラットフォームです。エンドポイント、ネットワーク、クラウド アプリケーション全体にわたって、システム、ユーザー、データ イベントの可視性を提供します。Analytics & Reporting Cloud(ARC)サービスは、包括的なデータ保護のための高度な分析、ワークフロー、レポート機能を提供します。Cloud Run 関数は OAuth 2.0 を使用して ARC Export API に対して認証を行い、エクスポート データを取得し、ブックマークを確認して次のチャンクに進み、結果を NDJSON として GCS バケットに書き込みます。Google SecOps は GCS V2 フィードを介してそれらを取り込みます。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- 次の API が有効になっている Google Cloud プロジェクト。
- Cloud Storage
- Cloud Run functions
- Cloud Scheduler
- Pub/Sub
- Cloud Build
- Cloud Storage バケット、Cloud Run functions、Pub/Sub トピック、Cloud Scheduler ジョブを作成して管理する権限
- Digital Guardian Management Console(DGMC)への特権アクセス
- Digital Guardian Analytics & Reporting Cloud(ARC)テナント設定へのアクセス
- DGMC で Cloud Services を構成するための管理者権限
- 有効な GUID を使用して DGMC で作成されたエクスポート プロファイル
Google Cloud Storage バケットを作成する
- Google Cloud コンソールに移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( digitalguardian-edr-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション Google SecOps インスタンスに最も近いロケーションを選択します(例: us-central1)。ストレージ クラス Standard(アクセス頻度の高いログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Digital Guardian API 認証情報を収集する
Cloud Run 関数が Digital Guardian ARC からエクスポート データを取得できるようにするには、API 認証情報を取得してエクスポート プロファイルを構成する必要があります。
DGMC から API 認証情報を取得する
- Digital Guardian Management Console(DGMC)にログインします。
- [システム> 構成> クラウド サービス] に移動します。
[API アクセス] セクションで、次の値を見つけて記録します。
- API アクセス ID: OAuth 2.0 認証用のクライアント ID です。
- API Access Secret: OAuth 2.0 認証用のクライアント シークレットです。
- Access Gateway Base URL: API ゲートウェイ エンドポイント(
https://accessgw-usw.msp.digitalguardian.comなど)。 - 認可サーバーの URL: OAuth 2.0 トークン エンドポイント(例:
https://authsrv.msp.digitalguardian.com/as/token.oauth2)。
エクスポート プロファイルを作成して構成する
- Digital Guardian Management Console(DGMC)で、[Admin] > [Reports] > [Export Profiles] に移動します。
- [エクスポート プロファイルを作成] をクリックするか、既存のエクスポート プロファイルを選択します。
- 次の設定でエクスポート プロファイルを構成します。
- プロファイル名: わかりやすい名前を入力します(例:
Google SecOps SIEM Integration)。 - データソース: エクスポートするデータに応じて、[イベント] または [アラート] を選択します。
- エクスポート形式: [JSON Flattened Table] を選択します(SIEM 統合に推奨)。
- フィールド: エクスポートに含めるフィールドを選択します。
- フィルタ: エクスポートするデータを制限するフィルタを構成します(省略可)。
- プロファイル名: わかりやすい名前を入力します(例:
- [保存] をクリックして、エクスポート プロファイルを作成します。
保存したら、リストでエクスポート プロファイルを見つけ、エクスポート プロファイルの URL または詳細ページから GUID をコピーします。
認証情報の記録の概要
Cloud Run functions の環境変数を構成するために、次の情報を保存します。
- クライアント ID(API アクセス ID): DGMC Cloud Services から
- クライアント シークレット(API アクセス シークレット): DGMC Cloud Services から
- 認証サーバーの URL: 例:
https://authsrv.msp.digitalguardian.com/as/token.oauth2 - Access Gateway のベース URL: 例:
https://accessgw-usw.msp.digitalguardian.com - Export Profile GUID: DGMC で作成されたエクスポート プロファイルから
テスト API へのアクセス
次のコマンドを実行して、認証情報が有効であることを確認します。
# Step 1: Obtain OAuth 2.0 access token curl -s -X POST \ -d "grant_type=client_credentials&client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&scope=client" \ "https://authsrv.msp.digitalguardian.com/as/token.oauth2"# Step 2: Test export endpoint with the access token curl -s -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \ "https://accessgw-usw.msp.digitalguardian.com/rest/1.0/export/YOUR_EXPORT_PROFILE_GUID"レスポンスが成功すると、エクスポート データを含む JSON ドキュメントが返されます。認証エラーが表示された場合は、DGMC Cloud Services で API アクセス ID とシークレットを確認します。
Cloud Run functions のサービス アカウントを作成する
- Google Cloud コンソールで、[IAM と管理] > [サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名:
digitalguardian-ingestion(またはわかりやすい名前)を入力します。 - サービス アカウントの説明: 「
Service account for Digital Guardian EDR Cloud Run function to write logs to GCS」と入力します。
- サービス アカウント名:
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- Storage オブジェクト管理者(Cloud Storage バケット内のオブジェクトの読み取り/書き込み)
- Cloud Run 起動元(Cloud Scheduler が関数を呼び出すことを許可するため)
- [続行] をクリックします。
[完了] をクリックします。
Pub/Sub トピックの作成
Cloud Scheduler は、Pub/Sub トピックを介して Cloud Run 関数をトリガーします。
- Google Cloud コンソールで、[Pub/Sub>トピック] に移動します。
- [トピックを作成] をクリックします。
- [トピック ID] フィールドに「
digitalguardian-edr-trigger」と入力します。 - デフォルト設定のままにします。
[作成] をクリックします。
Cloud Run functions を作成する
OAuth 2.0 クライアント認証情報を使用して Digital Guardian ARC に対して認証を行い、エクスポート データを取得し、ブックマークを承認して次のチャンクに進み、結果を NDJSON として GCS に書き込む Cloud Run functions の関数を作成します。
関数ソースファイルを準備する
Cloud Run 関数のデプロイ用に次の 2 つのファイルを作成します。
requirements.txt
functions-framework==3.* google-cloud-storage==2.* urllib3==2.*main.py
"""Cloud Run function to ingest Digital Guardian EDR logs into GCS.""" import json import os import time import urllib.parse from datetime import datetime, timezone import functions_framework import urllib3 from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "digitalguardian_edr") STATE_KEY = os.environ.get("STATE_KEY", "digitalguardian_edr_state.json") AUTH_SERVER_URL = os.environ["AUTH_SERVER_URL"] ARC_SERVER_URL = os.environ["ARC_SERVER_URL"] CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] EXPORT_PROFILE_GUID = os.environ["EXPORT_PROFILE_GUID"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) http = urllib3.PoolManager() gcs = storage.Client() def _get_access_token() -> str: """Obtain an OAuth 2.0 access token using client credentials grant.""" body = urllib.parse.urlencode({ "grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, "scope": "client", }) resp = http.request( "POST", AUTH_SERVER_URL, body=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if resp.status != 200: raise RuntimeError( f"OAuth token request failed: {resp.status} — " f"{resp.data.decode('utf-8')}" ) token_data = json.loads(resp.data.decode("utf-8")) return token_data["access_token"] def _arc_get(token: str, path: str, retries: int = 5) -> dict: """Execute a GET request against the ARC API with retry on 429.""" url = f"{ARC_SERVER_URL}{path}" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } backoff = 2 for attempt in range(retries): resp = http.request("GET", url, headers=headers) if resp.status == 200: return json.loads(resp.data.decode("utf-8")) if resp.status == 429: wait = backoff * (2 ** attempt) print( f"Rate limited (429). Retrying in {wait}s " f"(attempt {attempt + 1}/{retries})." ) time.sleep(wait) continue raise RuntimeError( f"ARC API error: {resp.status} — {resp.data.decode('utf-8')}" ) raise RuntimeError( "ARC API rate limit exceeded after maximum retries." ) def _arc_acknowledge(token: str) -> None: """POST to the acknowledge endpoint to advance the export bookmark.""" url = ( f"{ARC_SERVER_URL}/rest/1.0/export/" f"{EXPORT_PROFILE_GUID}/acknowledge" ) headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } resp = http.request("POST", url, headers=headers) if resp.status not in (200, 204): raise RuntimeError( f"ARC acknowledge failed: {resp.status} — " f"{resp.data.decode('utf-8')}" ) print("Export bookmark acknowledged successfully.") def _load_state() -> dict: """Load the last run state from GCS.""" bucket = gcs.bugcs.bucketUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.downlodownload_as_text return {} def _save_state(state: dict) -> None: """Persist run state to GCS.""" bucket = gcs.bugcs.bucketUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.uploadupload_from_string json.dumps(state), content_type="application/json" ) def _fetch_export(token: str) -> list: """Fetch export data from the ARC Export API.""" path = f"/rest/1.0/export/{EXPORT_PROFILE_GUID}" data = _arc_get(token, path) records = data if isinstance(data, list) else data.get("data", []) return records[:MAX_RECORDS] def _write_ndjson(records: list, run_ts: str) -> str: """Write records as NDJSON to GCS and return the blob path.""" bucket = gcs.bugcs.bucketUCKET) blob_path = ( f"{GCS_PREFIX}/year={run_ts[:4]}/month={run_ts[5:7]}/" f"day={run_ts[8:10]}/{run_ts}_export.ndjson" ) blob = bucket.blob(blob_path) ndjson = "\n".join( json.dumps(r, separators=(",", ":")) for r in records ) blob.uploadupload_from_stringn, content_type="application/x-ndjson") return blob_path @functions_framework.cloud_event def main(cloud_event): """Entry point triggered by Pub/Sub via Cloud Scheduler.""" state = _load_state() now = datetime.now(timezone.utc) print("Authenticating to Digital Guardian ARC.") token = _get_access_token() print( f"Fetching export data for profile {EXPORT_PROFILE_GUID}." ) records = _fetch_export(token) if not records: print("No new export data found.") return "OK" run_ts = now.strftime("%Y-%m-%dT%H%M%SZ") blob_path = _write_ndjson(records, run_ts) print( f"Wrote {len(records)} records to " f"gs://{GCS_BUCKET}/{blob_path}." ) _arc_acknowledge(token) state["last_run"] = now.isoformat() state["records_written"] = len(records) _save_state(state) print(f"State updated. last_run={now.isoformat()}.") return "OK"
Cloud Run 関数をデプロイする
- 両方のファイル(
main.pyとrequirements.txt)をローカル ディレクトリ(digitalguardian-function/など)に保存します。 - Cloud Shell または
gcloudCLI がインストールされているターミナルを開きます。 次のコマンドを実行して、関数をデプロイします。
gcloud functions deploy digitalguardian-edr-to-gcs \ --gen2 \ --region=us-central1 \ --runtime=python312 \ --trigger-topic=digitalguardian-edr-trigger \ --entry-point=main \ --memory=512MB \ --timeout=540s \ --service-account=digitalguardian-ingestion@PROJECT_ID.iam.gserviceaccount.com \ --set-env-vars=\ "GCS_BUCKET=digitalguardian-edr-logs",\ "GCS_PREFIX=digitalguardian_edr",\ "STATE_KEY=digitalguardian_edr_state.json",\ "AUTH_SERVER_URL=https://authsrv.msp.digitalguardian.com/as/token.oauth2",\ "ARC_SERVER_URL=https://accessgw-usw.msp.digitalguardian.com",\ "CLIENT_ID=YOUR_CLIENT_ID",\ "CLIENT_SECRET=YOUR_CLIENT_SECRET",\ "EXPORT_PROFILE_GUID=YOUR_EXPORT_PROFILE_GUID",\ "MAX_RECORDS=10000"次のプレースホルダの値を置き換えます。
PROJECT_ID: Google Cloud プロジェクト ID。digitalguardian-edr-logs: GCS バケット名。YOUR_CLIENT_ID: Digital Guardian API アクセス ID。YOUR_CLIENT_SECRET: Digital Guardian API アクセス シークレット。YOUR_EXPORT_PROFILE_GUID: DGMC のエクスポート プロファイル GUID。
関数のステータスを確認して、デプロイを確認します。
gcloud functions describe digitalguardian-edr-to-gcs --region=us-central1 --gen2
環境変数のリファレンス
| 変数 | 必須 | デフォルト | 説明 |
|---|---|---|---|
GCS_BUCKET |
○ | NDJSON 出力を保存する GCS バケット名 | |
GCS_PREFIX |
× | digitalguardian_edr |
バケット内のオブジェクト接頭辞(フォルダパス) |
STATE_KEY |
× | digitalguardian_edr_state.json |
接頭辞内の状態ファイルの Blob 名 |
AUTH_SERVER_URL |
○ | OAuth 2.0 認証サーバーの URL | |
ARC_SERVER_URL |
○ | ARC Access Gateway のベース URL | |
CLIENT_ID |
○ | DGMC からの API アクセス ID | |
CLIENT_SECRET |
○ | DGMC からの API アクセス シークレット | |
EXPORT_PROFILE_GUID |
○ | DGMC からプロファイル GUID をエクスポートする | |
MAX_RECORDS |
× | 10000 |
実行ごとに書き込むレコードの最大数 |
Cloud Scheduler ジョブを作成する
Cloud Scheduler は、Pub/Sub トピックを介して Cloud Run functions の関数を定期的にトリガーします。
- Google Cloud コンソールで、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
- 名前: 「
digitalguardian-edr-ingestion-schedule」と入力します。 - リージョン: Cloud Run functions と同じリージョン(
us-central1など)を選択します。 頻度: 「
*/5 * * * *」(5 分ごと)と入力します。タイムゾーン: 任意のタイムゾーン(
UTCなど)を選択します。
- 名前: 「
[続行] をクリックします。
[実行を構成する] セクションで、次の操作を行います。
- ターゲット タイプ: [Pub/Sub] を選択します。
- トピック:
digitalguardian-edr-triggerを選択します。 - メッセージ本文: 「
{"run": true}」と入力します。
[続行] をクリックします。
[オプションの設定を構成する] セクションで、次の操作を行います。
- 最大再試行回数:
3と入力します。 - 最小バックオフ時間: 「
5s」と入力します。 - 最大バックオフ時間: 「
60s」と入力します。
- 最大再試行回数:
[作成] をクリックします。
テストをすぐに実行するには、ジョブ名の横にあるその他アイコン(...)をクリックして、[強制実行] を選択します。
Google SecOps サービス アカウントを取得してフィードを構成する
Google SecOps は、一意のサービス アカウントを使用して GCS バケットからデータを読み取ります。このサービス アカウントにバケットへのアクセス権を付与する必要があります。
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Digital Guardian EDR Logs)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Digital Guardian EDR] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
[次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: GCS バケットの URI を入力します。
gs://digitalguardian-edr-logs/digitalguardian_edr/digitalguardian-edr-logsは、実際の GCS バケット名に置き換えます。digitalguardian_edrは、構成したGCS_PREFIX値に置き換えます。
Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しません(テストにおすすめ)。
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます(デフォルトは 180 日)。
アセットの名前空間: アセットの名前空間。
Ingestion labels: このフィードのイベントに適用されるラベル。
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
Google SecOps サービス アカウントに IAM 権限を付与する
Google SecOps サービス アカウントには、Cloud Storage バケットに対する Storage オブジェクト閲覧者ロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(例:
digitalguardian-edr-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレス(例:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)を貼り付けます。 - ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
- プリンシパルを追加: Google SecOps サービス アカウントのメールアドレス(例:
[保存] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| アプリケーション | target.application | 値を直接コピー |
| アプリケーション | target.process.command_line | ルールが .Printer. と一致する場合は %{Application} に設定されます。 |
| Bytes_Written | network.sent_bytes | 値が直接コピーされ、uinteger に変換されます |
| Category、Computer_Name、Detail | metadata.description | Category == "Policies" かつ Computer_Name が空の場合、%{Detail} に設定します。それ以外の場合は、grok_parse_failure で %{message} に設定します。 |
| Command_Line、Command_Line1 | principal.process.command_line | 末尾の引用符を削除した後の Command_Line の値(空でない場合)。空の場合は、末尾の引用符を削除した後の Command_Line1 の値 |
| Computer_Name、source | principal.hostname | computerName が空でない場合はその値、空の場合は %{source} に設定 |
| Destination_Device_Serial_Number、Destination_Device_Serial_Number1 | 引用符を処理する grok パターンを使用して抽出されます | |
| Destination_Directory、Destination_File | target.file.full_path | Destination_Directory と Destination_File の両方が空でない場合は、それらを連結した値 |
| Destination_Drive_Type | security_result.detection_fields | destination_drive_type_label(キー: Destination_Drive_Type、値: %{Destination_Drive_Type})と統合されました |
| Destination_File | target.file.names | Destination_File から統合 |
| Destination_File_Extension | target.file.mime_type | 値を直接コピー |
| Dll_SHA1_Hash | target.process.file.sha1 | 小文字に変換した直後にコピーされた値 |
| Email_Address | principal.user.email_addresses | Email_Address から統合 |
| Email_Sender、Email_Subject | network.email.from | 空でない場合は %{Email_Sender} に設定 |
| Email_Sender、Email_Subject | network.email.subject | Email_Sender が空でない場合は、件名(%{Email_Subject})から統合 |
| File_Extension | principal.process.file.mime_type | 値を直接コピー |
| IP_Address、source_ip | principal.ip | 空でない場合は source_ip から、空の場合は IP_Address から統合されます |
| Local_Port、source_port | principal.port | 空でない場合は source_port の値が整数に変換され、それ以外の場合は Local_Port の値が整数に変換されます。 |
| MD5_Checksum | target.process.file.md5 | 小文字に変換した直後にコピーされた値 |
| Network_Direction | network.direction | True の場合は INBOUND、False の場合は OUTBOUND に設定します。 |
| Process_PID | principal.process.pid | 値を直接コピー |
| Process_SHA256_Hash | target.process.file.sha256 | 小文字に変換した直後にコピーされた値 |
| Product_Version | metadata.product_version | 値を直接コピー |
| プロトコル | network.ip_protocol | ==「1」の場合、ICMP に設定 |
| Remote_Port | target.port | 値が直接コピーされ、整数に変換されます |
| ルール | security_result.rule_name | 値を直接コピー |
| ルール | metadata.event_type | .Printer. と一致する場合は PROCESS_UNCATEGORIZED に設定し、DLP.* と一致する場合は FILE_MOVE に設定します。 |
| 重大度 | security_result.severity | 整数に変換した後、3 以下の場合は LOW、6 以下の場合は MEDIUM、8 以下の場合は HIGH、10 以下の場合は CRITICAL に設定します。 |
| 重大度 | security_result.severity_details | 値を直接コピー |
| Source_Directory、Source_File | src.file.full_path | Source_Directory と Source_File の両方が空でない場合は、それらを連結した値 |
| Source_Drive_Type | security_result.detection_fields | source_drive_type_label(キー: Source_Drive_Type、値: %{Source_Drive_Type})と統合されました |
| Source_File | src.file.names | Source_File から統合されました |
| Source_File_Extension | src.file.mime_type | 値を直接コピー |
| URL_Path、http_url | target.url | http_url が空でない場合はその値、空の場合は URL_Path の値 |
| User_Name | principal.user.userid | grok 抽出後の userName の値 |
| User_Name | principal.administrative_domain | Grok 抽出後の domainName の値 |
| Was_Removable | security_result.detection_fields | was_removable_label(キー: Was_Removable、値: %{Was_Removable})と統合されました |
| Was_Source_Removable | security_result.detection_fields | was_source_removable_label(キー: Was_Source_Removable、値: %{Was_Source_Removable})と統合されました |
| computerName、destination_ip、protocol、source_ip、IP_Address、destination、userName、Process_PID、Category、Computer_Name | metadata.event_type | 最初は GENERIC_EVENT に設定します。protocol == HTTPS で(destination_ip または computerName)の場合は NETWORK_HTTP、(source_ip または IP_Address)と destination_ip の場合は NETWORK_CONNECTION、userName が空でない場合は USER_UNCATEGORIZED、Process_PID が空でない場合は SCAN_PROCESS に設定します。 |
| destination_ip | target.ip | destination_ip から統合 |
| incidents_url、matched_policies_by_severity | security_result | _sr と統合されました(rule_name: %{matched_policies_by_severity}、url_back_to_product: %{incidents_url}) |
| プロトコル | network.application_protocol | protocol == HTTP または HTTPS の場合は HTTPS に設定 |
| security_action | security_result.action | security_action から統合 |
| metadata.product_name | 「Enterprise DLP Platform」に設定 | |
| metadata.vendor_name | 「DigitalGuardian」に設定 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。