Druva Backup のログを収集する
このドキュメントでは、Druva REST API からイベントを取得して Google Cloud Storage バケットに書き込む Google Cloud Run 関数を設定し、Google Cloud Storage V2 を使用して Google Security Operations フィードを構成することで、Druva Backup ログを収集する方法について説明します。
Druva は、エンドポイント、SaaS アプリケーション、エンタープライズ ワークロードのバックアップ、障害復旧、アーカイブ サービスを提供するクラウドネイティブのデータ保護および管理プラットフォームです。このプラットフォームは、モニタリングとコンプライアンスのために SIEM ソリューションと統合できる、包括的な監査証跡、バックアップ イベント、復元アクティビティ、セキュリティ アラートを生成します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス
- 課金を有効にした Google Cloud プロジェクト
- 次の Google Cloud APIs が有効になっている。
- Cloud Run functions API
- Cloud Scheduler API
- Cloud Storage API
- Pub/Sub API
- IAM API
- Druva Cloud Platform Console への Druva Cloud 管理者のアクセス権
- API 認証情報を作成するための Druva Integration Center へのアクセス
Google Cloud Storage バケットを作成する
- Google Cloud コンソールに移動します。
- プロジェクトを選択するか、新しいプロジェクトを作成します。
- ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
- [バケットを作成] をクリックします。
次の構成情報を提供してください。
設定 値 バケットに名前を付ける グローバルに一意の名前( druva-backup-logsなど)を入力します。ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。 ロケーション Google SecOps インスタンスに最も近いロケーションを選択します(例: us-central1)。ストレージ クラス Standard(アクセス頻度の高いログにおすすめ) アクセス制御 均一(推奨) 保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする [作成] をクリックします。
Druva API 認証情報を収集する
Cloud Run 関数が Druva からイベントを取得できるようにするには、OAuth 2.0 認証を使用して API 認証情報を作成する必要があります。
API 認証情報を作成する
- Druva Cloud Platform Console にログインします。
- グローバル ナビゲーション メニューから [統合センター] を選択します。
- 左側のパネルで [API 認証情報] をクリックします。
- [New Credentials] をクリックします。
- [新しい認証情報] ウィンドウで、次の詳細を入力します。
名前: わかりやすい名前を入力します(例:
Google SecOps Cloud Storage Integration)。 - 承認制限を適用するには:
- データの取得と変更への完全アクセスを許可するには、[Druva Cloud Administrator] を選択します。
- または、[プロダクト管理者] を選択して、次のいずれかを選択します。 Cloud 管理者(読み取り専用)ロール: 変更権限なしでデータ取得のみにアクセスを制限する場合(SIEM 統合におすすめ)
- [保存] をクリックします。
API 認証情報を記録する
API 認証情報を作成すると、[認証情報の詳細] ウィンドウが表示されます。
- [クライアント ID] の横にあるコピーアイコンをクリックして、値をクリップボードにコピーします。
- クライアント ID(
McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxxなど)を安全に保存します。 - [シークレット キー] の横にあるコピー アイコンをクリックして、値をクリップボードにコピーします。
シークレット キーを安全に保存します(
Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPtなど)。
サービス アカウントを作成する
Cloud Run functions が Google Cloud Storage にアクセスするための専用のサービス アカウントを作成します。
- Google Cloud コンソールで、[IAM と管理] > [サービス アカウント] に移動します。
- [サービス アカウントを作成] をクリックします。
- 次の構成の詳細を指定します。
- サービス アカウント名:
druva-backup-function(またはわかりやすい名前)を入力します。 - サービス アカウントの説明:
Service account for Druva Backup Cloud Run functionと入力します。
- サービス アカウント名:
- [作成して続行] をクリックします。
- [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
- [ロールを選択] をクリックし、[ストレージ オブジェクト管理者] を選択します。
- [別のロールを追加] をクリックし、[Cloud Run 起動元] を選択します。
- [続行] をクリックします。
- [完了] をクリックします。
サービス アカウントのメールアドレス(
druva-backup-function@PROJECT_ID.iam.gserviceaccount.comなど)を記録します。
Pub/Sub トピックの作成
Cloud Scheduler が Cloud Run functions をトリガーするために使用する Pub/Sub トピックを作成します。
- Google Cloud コンソールで、[Pub/Sub>トピック] に移動します。
- [トピックを作成] をクリックします。
- 次の構成の詳細を指定します。
- トピック ID: 「
druva-backup-trigger」と入力します。
- トピック ID: 「
- [デフォルトのサブスクリプションを追加する] のチェックボックスをオフにします。
[作成] をクリックします。
Cloud Run functions を作成する
関数コードを準備する
OAuth 2.0 クライアント認証情報を使用して Druva API で認証し、イベント エンドポイントからページネーションを使用してイベントを取得し、結果を NDJSON として GCS バケットに書き込む Cloud Run 関数を作成します。
Cloud Run 関数をデプロイする
- Google Cloud コンソールで、[Cloud Run functions] に移動します。
- [関数を作成] をクリックします。
次の構成情報を提供してください。
- 環境: [第 2 世代] を選択します。
- 関数名: 「
druva-backup-to-gcs」と入力します。 - リージョン: GCS バケットに最も近いリージョン(
us-central1など)を選択します。 - トリガーのタイプ: [Cloud Pub/Sub] を選択します。
- Cloud Pub/Sub トピック:
druva-backup-triggerを選択します。 - サービス アカウント:
druva-backup-function@PROJECT_ID.iam.gserviceaccount.comを選択します。 - 割り当てられたメモリ:
512 MiB - タイムアウト:
540秒 - [インスタンスの最大数]:
1
[次へ] をクリックします。
[ランタイム] として [Python 3.11] を選択します。
[エントリ ポイント] を
mainに設定します。[ソースコード] エディタで、
main.pyの内容を次の内容に置き換えます。import base64 import json import os import time from datetime import datetime, timezone, timedelta import requests from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup") STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json") DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com") CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500")) LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24")) def get_oauth_token(): """Obtain OAuth 2.0 access token using client credentials grant.""" token_url = f"https://{DRUVA_BASE_URL}/token" payload = { "grant_type": "client_credentials", "scope": "read", } resp = requests.post( token_url, data=payload, auth=(CLIENT_ID, CLIENT_SECRET), timeout=30, ) resp.raise_for_status() return resp.json()["access_token"] def load_state(storage_client): """Load the persisted state (last event time and tracker) from GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.download_as_text()) return {} def save_state(storage_client, state): """Persist state to GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.upload_from_string( json.dumps(state), content_type="application/json", ) def fetch_events(token, state): """Fetch events from Druva API with pagination via nextPageToken.""" events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } params = {"pageSize": PAGE_SIZE} tracker = state.get("tracker") last_event_time = state.get("last_event_time") if tracker: params["tracker"] = tracker elif last_event_time: params["fromTime"] = last_event_time else: lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS) params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ") all_events = [] total_fetched = 0 while total_fetched < MAX_RECORDS: resp = requests.get( events_url, headers=headers, params=params, timeout=60, ) resp.raise_for_status() data = resp.json() events = data.get("events", []) all_events.extend(events) total_fetched += len(events) new_tracker = data.get("tracker") next_page_token = data.get("nextPageToken") if new_tracker: state["tracker"] = new_tracker if next_page_token: params["nextPageToken"] = next_page_token params.pop("tracker", None) params.pop("fromTime", None) else: break if all_events: last_ts = all_events[-1].get("eventTime", "") if last_ts: state["last_event_time"] = last_ts return all_events, state def write_events_to_gcs(storage_client, events): """Write events as NDJSON to GCS.""" if not events: return now = datetime.now(timezone.utc) filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson" blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}" ndjson_lines = "\n".join(json.dumps(event) for event in events) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(blob_path) blob.upload_from_string( ndjson_lines, content_type="application/x-ndjson", ) print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}") def main(event, context): """Cloud Run function entry point triggered by Pub/Sub.""" storage_client = storage.Client() token = get_oauth_token() state = load_state(storage_client) events, updated_state = fetch_events(token, state) write_events_to_gcs(storage_client, events) save_state(storage_client, updated_state) print(f"Completed: fetched {len(events)} events") return f"OK: {len(events)} events"requirements.txtの内容を次のように置き換えます。requests>=2.31.0 google-cloud-storage>=2.14.0
環境変数を構成する
- Cloud Run functions の構成で、[ランタイム、ビルド、接続、セキュリティの設定] セクションに移動します。
[ランタイム環境変数] で、次の変数を追加します。
GCS_BUCKET: GCS バケットの名前(例:druva-backup-logs)GCS_PREFIX: ログファイルの接頭辞パス(例:druva_backup)STATE_KEY: 状態ファイル名(例:druva_state.json)DRUVA_BASE_URL: Druva API のベース URL:- Druva Cloud(Standard)の
apis.druva.com - Druva GovCloud の
govcloudapis.druva.com
- Druva Cloud(Standard)の
CLIENT_ID: Druva API 認証情報のクライアント IDCLIENT_SECRET: Druva API 認証情報のシークレット キーMAX_RECORDS: 呼び出しごとに取得するレコードの最大数(例:10000)PAGE_SIZE: API ページあたりのイベント数(最大500)LOOKBACK_HOURS: 初回実行時に遡って確認する時間数(例:24)
[デプロイ] をクリックします。
デプロイが正常に完了するまで待ちます。
Cloud Scheduler ジョブを作成する
Cloud Scheduler ジョブを作成して、Cloud Run functions の関数を定期的にトリガーする。
- Google Cloud コンソールで、[Cloud Scheduler] に移動します。
- [ジョブを作成] をクリックします。
次の構成情報を提供してください。
- 名前: 「
druva-backup-scheduler」と入力します。 - リージョン: Cloud Run functions と同じリージョン(
us-central1など)を選択します。 - 説明: 「
Triggers Druva Backup log collection every 30 minutes」と入力します。 - 頻度:
*/30 * * * *(30 分ごと)と入力します。 - タイムゾーン: 任意のタイムゾーンを選択します(例:
UTC)。
- 名前: 「
[続行] をクリックします。
[ターゲット] を構成します。
- ターゲット タイプ: [Pub/Sub] を選択します。
- Cloud Pub/Sub トピック:
druva-backup-triggerを選択します。 - メッセージ本文: 「
{"trigger": "scheduled"}」と入力します。
[作成] をクリックします。
Cloud Scheduler ジョブをテストする
- [Cloud Scheduler] リストで、
druva-backup-schedulerを見つけます。 - [強制実行] をクリックして、関数をすぐにトリガーします。
- 次の方法で実行を確認します。
- Cloud Run functions のログは、Cloud Run functions > druva-backup-to-gcs > Logs にあります。
- Cloud Storage > druva-backup-logs の新しい NDJSON ファイルの GCS バケット
Google SecOps サービス アカウントを取得してフィードを構成する
サービス アカウントのメールアドレスを取得する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [単一フィードを設定] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
Druva Backup Events)。 - [ソースタイプ] として [Google Cloud Storage V2] を選択します。
- [ログタイプ] として [Druva Backup] を選択します。
[サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comこのメールアドレスをコピーして、次のステップで使用します。
フィードを構成する
- [次へ] をクリックします。
次の入力パラメータの値を指定します。
ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。
gs://druva-backup-logs/druva_backup/Source deletion option: 必要に応じて削除オプションを選択します。
- なし: 転送後にファイルを削除しない(テストにおすすめ)
- 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
- 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます(デフォルトは 180 日)。
アセットの名前空間: アセットの名前空間
Ingestion labels: このフィードのイベントに適用されるラベル
[次へ] をクリックします。
[Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
Google SecOps サービス アカウントに IAM 権限を付与する
Cloud Run 関数によって書き込まれたログファイルを読み取るには、Google SecOps サービス アカウントに GCS バケットに対する Storage オブジェクト閲覧者のロールが必要です。
- [Cloud Storage] > [バケット] に移動します。
- バケット名(例:
druva-backup-logs)をクリックします。 - [権限] タブに移動します。
- [アクセス権を付与] をクリックします。
- 次の構成の詳細を指定します。
- プリンシパルの追加: Google SecOps サービス アカウントのメールアドレス(例:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)を貼り付けます。 - ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
- プリンシパルの追加: Google SecOps サービス アカウントのメールアドレス(例:
[保存] をクリックします。
UDM マッピング テーブル
| ログフィールド | UDM マッピング | ロジック |
|---|---|---|
| inSyncUserID、eventsGroupId、FilesMissed、FilesBackedup、TotalBackupSize、TotalBytesTransferred、facility、inSyncDataSourceID、initiator、event_type | additional.fields | 空でない場合は、各フィールドから作成されたラベルと統合されます |
| イニシエータ | extensions.auth.type | 開始元がメールの正規表現と一致する場合は、「AUTHTYPE_UNSPECIFIED」に設定します。 |
| metadata.event_type | has_target_user が true で has_principal が true の場合は「USER_LOGIN」、has_principal が true で has_target が false の場合は「STATUS_UPDATE」、それ以外の場合は「GENERIC_EVENT」に設定します。 | |
| eventID | metadata.product_log_id | 文字列に変換しました |
| metadata.product_name | 「DRUVA_BACKUP」に設定します。 | |
| clientVersion | metadata.product_version | 値を直接コピー |
| inSyncDataSourceName | principal.asset.hostname | 値を直接コピー |
| ip | principal.asset.ip | 統合元 IP |
| inSyncDataSourceName | principal.hostname | 値を直接コピー |
| ip | principal.ip | 統合元 IP |
| clientOS | principal.platform | (?i)Linux と一致する場合は「LINUX」、(?i)windows と一致する場合は「WINDOWS」、(?i)mac と一致する場合は「MAC」に設定します。 |
| profileName | principal.resource.name | 値を直接コピー |
| profileID | principal.resource.product_object_id | 文字列に変換しました |
| eventState | security_result.action | (?i)Success と一致する場合は「ALLOW」、それ以外の場合は「BLOCK」に設定 |
| eventState | security_result.action_details | 値を直接コピー |
| 重要度 | security_result.severity | [0,1,2,3,LOW] の場合は「LOW」、[4,5,6,MEDIUM,SUBSTANTIAL,INFO] の場合は「MEDIUM」、[7,8,HIGH,SEVERE] の場合は「HIGH」、[9,10,VERY-HIGH,CRITICAL] の場合は「CRITICAL」に設定します。 |
| inSyncUserEmail、initiator | target.user.email_addresses | inSyncUserEmail から統合。メールアドレスの正規表現に一致する場合は、開始元からも統合 |
| inSyncUserName | target.user.userid | 値を直接コピー |
| metadata.vendor_name | 「DRUVA_BACKUP」に設定します。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。