Druva Backup のログを収集する

以下でサポートされています。

このドキュメントでは、Druva REST API からイベントを取得して Google Cloud Storage バケットに書き込む Google Cloud Run 関数を設定し、Google Cloud Storage V2 を使用して Google Security Operations フィードを構成することで、Druva Backup ログを収集する方法について説明します。

Druva は、エンドポイント、SaaS アプリケーション、エンタープライズ ワークロードのバックアップ、障害復旧、アーカイブ サービスを提供するクラウドネイティブのデータ保護および管理プラットフォームです。このプラットフォームは、モニタリングとコンプライアンスのために SIEM ソリューションと統合できる、包括的な監査証跡、バックアップ イベント、復元アクティビティ、セキュリティ アラートを生成します。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス
  • 課金を有効にした Google Cloud プロジェクト
  • 次の Google Cloud APIs が有効になっている。
    • Cloud Run functions API
    • Cloud Scheduler API
    • Cloud Storage API
    • Pub/Sub API
    • IAM API
  • Druva Cloud Platform Console への Druva Cloud 管理者のアクセス権
  • API 認証情報を作成するための Druva Integration Center へのアクセス

Google Cloud Storage バケットを作成する

  1. Google Cloud コンソールに移動します。
  2. プロジェクトを選択するか、新しいプロジェクトを作成します。
  3. ナビゲーション メニューで、[Cloud Storage > バケット] に移動します。
  4. [バケットを作成] をクリックします。
  5. 次の構成情報を提供してください。

    設定
    バケットに名前を付ける グローバルに一意の名前(druva-backup-logs など)を入力します。
    ロケーション タイプ ニーズに基づいて選択します(リージョン、デュアルリージョン、マルチリージョン)。
    ロケーション Google SecOps インスタンスに最も近いロケーションを選択します(例: us-central1)。
    ストレージ クラス Standard(アクセス頻度の高いログにおすすめ)
    アクセス制御 均一(推奨)
    保護ツール 省略可: オブジェクトのバージョニングまたは保持ポリシーを有効にする
  6. [作成] をクリックします。

Druva API 認証情報を収集する

Cloud Run 関数が Druva からイベントを取得できるようにするには、OAuth 2.0 認証を使用して API 認証情報を作成する必要があります。

API 認証情報を作成する

  1. Druva Cloud Platform Console にログインします。
  2. グローバル ナビゲーション メニューから [統合センター] を選択します。
  3. 左側のパネルで [API 認証情報] をクリックします。
  4. [New Credentials] をクリックします。
  5. [新しい認証情報] ウィンドウで、次の詳細を入力します。 名前: わかりやすい名前を入力します(例: Google SecOps Cloud Storage Integration)。
  6. 承認制限を適用するには:
    1. データの取得と変更への完全アクセスを許可するには、[Druva Cloud Administrator] を選択します。
    2. または、[プロダクト管理者] を選択して、次のいずれかを選択します。 Cloud 管理者(読み取り専用)ロール: 変更権限なしでデータ取得のみにアクセスを制限する場合(SIEM 統合におすすめ)
  7. [保存] をクリックします。

API 認証情報を記録する

API 認証情報を作成すると、[認証情報の詳細] ウィンドウが表示されます。

  1. [クライアント ID] の横にあるコピーアイコンをクリックして、値をクリップボードにコピーします。
  2. クライアント ID(McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx など)を安全に保存します。
  3. [シークレット キー] の横にあるコピー アイコンをクリックして、値をクリップボードにコピーします。
  4. シークレット キーを安全に保存します(Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt など)。

サービス アカウントを作成する

Cloud Run functions が Google Cloud Storage にアクセスするための専用のサービス アカウントを作成します。

  1. Google Cloud コンソールで、[IAM と管理] > [サービス アカウント] に移動します。
  2. [サービス アカウントを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • サービス アカウント名: druva-backup-function(またはわかりやすい名前)を入力します。
    • サービス アカウントの説明: Service account for Druva Backup Cloud Run function と入力します。
  4. [作成して続行] をクリックします。
  5. [このサービス アカウントにプロジェクトへのアクセスを許可する] セクションで、次のロールを追加します。
    1. [ロールを選択] をクリックし、[ストレージ オブジェクト管理者] を選択します。
    2. [別のロールを追加] をクリックし、[Cloud Run 起動元] を選択します。
  6. [続行] をクリックします。
  7. [完了] をクリックします。
  8. サービス アカウントのメールアドレス(druva-backup-function@PROJECT_ID.iam.gserviceaccount.com など)を記録します。

Pub/Sub トピックの作成

Cloud Scheduler が Cloud Run functions をトリガーするために使用する Pub/Sub トピックを作成します。

  1. Google Cloud コンソールで、[Pub/Sub>トピック] に移動します。
  2. [トピックを作成] をクリックします。
  3. 次の構成の詳細を指定します。
    • トピック ID: 「druva-backup-trigger」と入力します。
  4. [デフォルトのサブスクリプションを追加する] のチェックボックスをオフにします。
  5. [作成] をクリックします。

Cloud Run functions を作成する

関数コードを準備する

OAuth 2.0 クライアント認証情報を使用して Druva API で認証し、イベント エンドポイントからページネーションを使用してイベントを取得し、結果を NDJSON として GCS バケットに書き込む Cloud Run 関数を作成します。

Cloud Run 関数をデプロイする

  1. Google Cloud コンソールで、[Cloud Run functions] に移動します。
  2. [関数を作成] をクリックします。
  3. 次の構成情報を提供してください。

    • 環境: [第 2 世代] を選択します。
    • 関数名: 「druva-backup-to-gcs」と入力します。
    • リージョン: GCS バケットに最も近いリージョン(us-central1 など)を選択します。
    • トリガーのタイプ: [Cloud Pub/Sub] を選択します。
    • Cloud Pub/Sub トピック: druva-backup-trigger を選択します。
    • サービス アカウント: druva-backup-function@PROJECT_ID.iam.gserviceaccount.com を選択します。
    • 割り当てられたメモリ: 512 MiB
    • タイムアウト: 540
    • [インスタンスの最大数]: 1
  4. [次へ] をクリックします。

  5. [ランタイム] として [Python 3.11] を選択します。

  6. [エントリ ポイント] を main に設定します。

  7. [ソースコード] エディタで、main.py の内容を次の内容に置き換えます。

    import base64
    import json
    import os
    import time
    from datetime import datetime, timezone, timedelta
    
    import requests
    from google.cloud import storage
    
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup")
    STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json")
    DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com")
    CLIENT_ID = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500"))
    LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24"))
    
    def get_oauth_token():
        """Obtain OAuth 2.0 access token using client credentials grant."""
        token_url = f"https://{DRUVA_BASE_URL}/token"
        payload = {
            "grant_type": "client_credentials",
            "scope": "read",
        }
        resp = requests.post(
            token_url,
            data=payload,
            auth=(CLIENT_ID, CLIENT_SECRET),
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def load_state(storage_client):
        """Load the persisted state (last event time and tracker) from GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        if blob.exists():
            return json.loads(blob.download_as_text())
        return {}
    
    def save_state(storage_client, state):
        """Persist state to GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        blob.upload_from_string(
            json.dumps(state),
            content_type="application/json",
        )
    
    def fetch_events(token, state):
        """Fetch events from Druva API with pagination via nextPageToken."""
        events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events"
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
    
        params = {"pageSize": PAGE_SIZE}
    
        tracker = state.get("tracker")
        last_event_time = state.get("last_event_time")
    
        if tracker:
            params["tracker"] = tracker
        elif last_event_time:
            params["fromTime"] = last_event_time
        else:
            lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS)
            params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        all_events = []
        total_fetched = 0
    
        while total_fetched < MAX_RECORDS:
            resp = requests.get(
                events_url,
                headers=headers,
                params=params,
                timeout=60,
            )
            resp.raise_for_status()
            data = resp.json()
    
            events = data.get("events", [])
            all_events.extend(events)
            total_fetched += len(events)
    
            new_tracker = data.get("tracker")
            next_page_token = data.get("nextPageToken")
    
            if new_tracker:
                state["tracker"] = new_tracker
    
            if next_page_token:
                params["nextPageToken"] = next_page_token
                params.pop("tracker", None)
                params.pop("fromTime", None)
            else:
                break
    
        if all_events:
            last_ts = all_events[-1].get("eventTime", "")
            if last_ts:
                state["last_event_time"] = last_ts
    
        return all_events, state
    
    def write_events_to_gcs(storage_client, events):
        """Write events as NDJSON to GCS."""
        if not events:
            return
    
        now = datetime.now(timezone.utc)
        filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson"
        blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}"
    
        ndjson_lines = "\n".join(json.dumps(event) for event in events)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(blob_path)
        blob.upload_from_string(
            ndjson_lines,
            content_type="application/x-ndjson",
        )
        print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}")
    
    def main(event, context):
        """Cloud Run function entry point triggered by Pub/Sub."""
        storage_client = storage.Client()
    
        token = get_oauth_token()
    
        state = load_state(storage_client)
    
        events, updated_state = fetch_events(token, state)
    
        write_events_to_gcs(storage_client, events)
    
        save_state(storage_client, updated_state)
    
        print(f"Completed: fetched {len(events)} events")
        return f"OK: {len(events)} events"
    
  8. requirements.txt の内容を次のように置き換えます。

    requests>=2.31.0
    google-cloud-storage>=2.14.0
    

環境変数を構成する

  1. Cloud Run functions の構成で、[ランタイム、ビルド、接続、セキュリティの設定] セクションに移動します。
  2. [ランタイム環境変数] で、次の変数を追加します。

    • GCS_BUCKET: GCS バケットの名前(例: druva-backup-logs
    • GCS_PREFIX: ログファイルの接頭辞パス(例: druva_backup
    • STATE_KEY: 状態ファイル名(例: druva_state.json
    • DRUVA_BASE_URL: Druva API のベース URL:
      • Druva Cloud(Standard)の apis.druva.com
      • Druva GovCloud の govcloudapis.druva.com
    • CLIENT_ID: Druva API 認証情報のクライアント ID
    • CLIENT_SECRET: Druva API 認証情報のシークレット キー
    • MAX_RECORDS: 呼び出しごとに取得するレコードの最大数(例: 10000
    • PAGE_SIZE: API ページあたりのイベント数(最大 500
    • LOOKBACK_HOURS: 初回実行時に遡って確認する時間数(例: 24
  3. [デプロイ] をクリックします。

  4. デプロイが正常に完了するまで待ちます。

Cloud Scheduler ジョブを作成する

Cloud Scheduler ジョブを作成して、Cloud Run functions の関数を定期的にトリガーする。

  1. Google Cloud コンソールで、[Cloud Scheduler] に移動します。
  2. [ジョブを作成] をクリックします。
  3. 次の構成情報を提供してください。

    • 名前: 「druva-backup-scheduler」と入力します。
    • リージョン: Cloud Run functions と同じリージョン(us-central1 など)を選択します。
    • 説明: 「Triggers Druva Backup log collection every 30 minutes」と入力します。
    • 頻度: */30 * * * *(30 分ごと)と入力します。
    • タイムゾーン: 任意のタイムゾーンを選択します(例: UTC)。
  4. [続行] をクリックします。

  5. [ターゲット] を構成します。

    • ターゲット タイプ: [Pub/Sub] を選択します。
    • Cloud Pub/Sub トピック: druva-backup-trigger を選択します。
    • メッセージ本文: 「{"trigger": "scheduled"}」と入力します。
  6. [作成] をクリックします。

Cloud Scheduler ジョブをテストする

  1. [Cloud Scheduler] リストで、druva-backup-scheduler を見つけます。
  2. [強制実行] をクリックして、関数をすぐにトリガーします。
  3. 次の方法で実行を確認します。
    • Cloud Run functions のログは、Cloud Run functions > druva-backup-to-gcs > Logs にあります。
    • Cloud Storage > druva-backup-logs の新しい NDJSON ファイルの GCS バケット

Google SecOps サービス アカウントを取得してフィードを構成する

サービス アカウントのメールアドレスを取得する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [単一フィードを設定] をクリックします。
  4. [フィード名] フィールドに、フィードの名前を入力します(例: Druva Backup Events)。
  5. [ソースタイプ] として [Google Cloud Storage V2] を選択します。
  6. [ログタイプ] として [Druva Backup] を選択します。
  7. [サービス アカウントを取得する] をクリックします。一意のサービス アカウント メールアドレスが表示されます(例:)。

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. このメールアドレスをコピーして、次のステップで使用します。

フィードを構成する

  1. [次へ] をクリックします。
  2. 次の入力パラメータの値を指定します。

    • ストレージ バケットの URL: 接頭辞パスを含む GCS バケット URI を入力します。

      gs://druva-backup-logs/druva_backup/
      
    • Source deletion option: 必要に応じて削除オプションを選択します。

      • なし: 転送後にファイルを削除しない(テストにおすすめ)
      • 転送されたファイルを削除する: 転送が完了した後にファイルを削除します。
      • 転送されたファイルと空のディレクトリを削除する: 転送が完了した後にファイルと空のディレクトリを削除します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます(デフォルトは 180 日)。

    • アセットの名前空間: アセットの名前空間

    • Ingestion labels: このフィードのイベントに適用されるラベル

  3. [次へ] をクリックします。

  4. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

Google SecOps サービス アカウントに IAM 権限を付与する

Cloud Run 関数によって書き込まれたログファイルを読み取るには、Google SecOps サービス アカウントに GCS バケットに対する Storage オブジェクト閲覧者のロールが必要です。

  1. [Cloud Storage] > [バケット] に移動します。
  2. バケット名(例: druva-backup-logs)をクリックします。
  3. [権限] タブに移動します。
  4. [アクセス権を付与] をクリックします。
  5. 次の構成の詳細を指定します。
    • プリンシパルの追加: Google SecOps サービス アカウントのメールアドレス(例: chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)を貼り付けます。
    • ロールを割り当てる: [Storage オブジェクト閲覧者] を選択します。
  6. [保存] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
inSyncUserID、eventsGroupId、FilesMissed、FilesBackedup、TotalBackupSize、TotalBytesTransferred、facility、inSyncDataSourceID、initiator、event_type additional.fields 空でない場合は、各フィールドから作成されたラベルと統合されます
イニシエータ extensions.auth.type 開始元がメールの正規表現と一致する場合は、「AUTHTYPE_UNSPECIFIED」に設定します。
metadata.event_type has_target_user が true で has_principal が true の場合は「USER_LOGIN」、has_principal が true で has_target が false の場合は「STATUS_UPDATE」、それ以外の場合は「GENERIC_EVENT」に設定します。
eventID metadata.product_log_id 文字列に変換しました
metadata.product_name 「DRUVA_BACKUP」に設定します。
clientVersion metadata.product_version 値を直接コピー
inSyncDataSourceName principal.asset.hostname 値を直接コピー
ip principal.asset.ip 統合元 IP
inSyncDataSourceName principal.hostname 値を直接コピー
ip principal.ip 統合元 IP
clientOS principal.platform (?i)Linux と一致する場合は「LINUX」、(?i)windows と一致する場合は「WINDOWS」、(?i)mac と一致する場合は「MAC」に設定します。
profileName principal.resource.name 値を直接コピー
profileID principal.resource.product_object_id 文字列に変換しました
eventState security_result.action (?i)Success と一致する場合は「ALLOW」、それ以外の場合は「BLOCK」に設定
eventState security_result.action_details 値を直接コピー
重要度 security_result.severity [0,1,2,3,LOW] の場合は「LOW」、[4,5,6,MEDIUM,SUBSTANTIAL,INFO] の場合は「MEDIUM」、[7,8,HIGH,SEVERE] の場合は「HIGH」、[9,10,VERY-HIGH,CRITICAL] の場合は「CRITICAL」に設定します。
inSyncUserEmail、initiator target.user.email_addresses inSyncUserEmail から統合。メールアドレスの正規表現に一致する場合は、開始元からも統合
inSyncUserName target.user.userid 値を直接コピー
metadata.vendor_name 「DRUVA_BACKUP」に設定します。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。