Druva Backup 로그 수집

다음에서 지원:

이 문서에서는 Druva REST API에서 이벤트를 가져와 Google Cloud Storage 버킷에 작성하는 Google Cloud Run 함수를 설정한 다음 Google Cloud Storage V2를 사용하여 Google Security Operations 피드를 구성하여 Druva Backup 로그를 수집하는 방법을 설명합니다.

Druva는 엔드포인트, SaaS 애플리케이션, 엔터프라이즈 워크로드에 백업, 재해 복구, 보관 서비스를 제공하는 클라우드 기반 데이터 보호 및 관리 플랫폼입니다. 이 플랫폼은 모니터링 및 규정 준수를 위해 SIEM 솔루션과 통합할 수 있는 포괄적인 감사 추적, 백업 이벤트, 복원 활동, 보안 알림을 생성합니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스
  • 결제가 사용 설정된 Google Cloud 프로젝트
  • 다음 Google Cloud API가 사용 설정되었습니다.
    • Cloud Run Functions API
    • Cloud Scheduler API
    • Cloud Storage API
    • Pub/Sub API
    • IAM API
  • Druva Cloud Platform Console에 대한 Druva Cloud 관리자 액세스
  • API 사용자 인증 정보 생성을 위한 Druva 통합 센터 액세스

Google Cloud Storage 버킷 만들기

  1. Google Cloud 콘솔로 이동합니다.
  2. 프로젝트를 선택하거나 새 프로젝트를 만듭니다.
  3. 탐색 메뉴에서 Cloud Storage> 버킷으로 이동합니다.
  4. 버킷 만들기를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.

    설정
    버킷 이름 지정 전역적으로 고유한 이름 (예: druva-backup-logs)을 입력합니다.
    위치 유형 필요에 따라 선택 (리전, 이중 리전, 멀티 리전)
    위치 Google SecOps 인스턴스에 가장 가까운 위치를 선택합니다 (예: us-central1).
    스토리지 클래스 Standard (자주 액세스하는 로그에 권장)
    액세스 제어 균일 (권장)
    보호 도구 선택사항: 객체 버전 관리 또는 보관 정책 사용 설정
  6. 만들기를 클릭합니다.

Druva API 사용자 인증 정보 수집

Cloud Run 함수가 Druva에서 이벤트를 가져오도록 하려면 OAuth 2.0 인증으로 API 사용자 인증 정보를 만들어야 합니다.

API 사용자 인증 정보 만들기

  1. Druva Cloud Platform Console에 로그인합니다.
  2. 전체 탐색 메뉴에서 통합 센터를 선택합니다.
  3. 왼쪽 패널에서 API 사용자 인증 정보를 클릭합니다.
  4. 새 사용자 인증 정보를 클릭합니다.
  5. 새 사용자 인증 정보 창에서 다음 세부정보를 입력합니다. 이름: 설명이 포함된 이름 (예: Google SecOps Cloud Storage Integration)을 입력합니다.
  6. 승인 제한을 적용하려면 다음을 실행하세요.
    1. 데이터 검색 및 수정에 대한 전체 액세스를 허용하려면 Druva Cloud Administrator를 선택합니다.
    2. 또는 제품 관리자를 선택하고 다음을 선택합니다. Cloud 관리자 (읽기 전용) 역할: 수정 권한 없이 데이터 검색 액세스만 제한합니다 (SIEM 통합에 권장).
  7. 저장을 클릭합니다.

API 사용자 인증 정보 기록

API 사용자 인증 정보를 만들면 사용자 인증 정보 세부정보 창이 표시됩니다.

  1. 클라이언트 ID 옆에 있는 복사 아이콘을 클릭하여 값을 클립보드에 복사합니다.
  2. 클라이언트 ID를 안전하게 저장합니다 (예: McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx).
  3. 보안 비밀 키 옆에 있는 복사 아이콘을 클릭하여 값을 클립보드에 복사합니다.
  4. 보안 비밀 키를 안전하게 저장합니다 (예: Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt).

서비스 계정 만들기

Cloud Run 함수가 Google Cloud Storage에 액세스할 수 있도록 전용 서비스 계정을 만듭니다.

  1. Google Cloud 콘솔에서 IAM 및 관리자 > 서비스 계정으로 이동합니다.
  2. 서비스 계정 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 서비스 계정 이름: druva-backup-function (또는 설명이 포함된 이름)을 입력합니다.
    • 서비스 계정 설명: Service account for Druva Backup Cloud Run function를 입력합니다.
  4. 만들고 계속하기를 클릭합니다.
  5. 이 서비스 계정에 프로젝트에 대한 액세스 권한 부여 섹션에서 다음 역할을 추가합니다.
    1. 역할 선택을 클릭하고 스토리지 객체 관리자를 선택합니다.
    2. 다른 역할 추가를 클릭하고 Cloud Run 호출자를 선택합니다.
  6. 계속을 클릭합니다.
  7. 완료를 클릭합니다.
  8. 서비스 계정 이메일 (예: druva-backup-function@PROJECT_ID.iam.gserviceaccount.com)을 기록합니다.

Pub/Sub 주제 만들기

Cloud Scheduler가 Cloud Run 함수를 트리거하는 데 사용할 Pub/Sub 주제를 만듭니다.

  1. Google Cloud 콘솔에서 Pub/Sub > 주제로 이동합니다.
  2. 주제 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 주제 ID: druva-backup-trigger를 입력합니다.
  4. 기본 구독 추가를 선택 해제합니다.
  5. 만들기를 클릭합니다.

Cloud Run 함수 만들기

함수 코드 준비

OAuth 2.0 클라이언트 사용자 인증 정보를 사용하여 Druva API로 인증하고, 페이지로 나누기를 사용하여 이벤트 엔드포인트를 통해 이벤트를 검색하고, 결과를 NDJSON으로 GCS 버킷에 쓰는 Cloud Run 함수를 만듭니다.

Cloud Run 함수 배포

  1. Google Cloud 콘솔에서 Cloud Run 함수로 이동합니다.
  2. 함수 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    • 환경: 2세대 선택
    • 함수 이름: druva-backup-to-gcs를 입력합니다.
    • 리전: GCS 버킷에 가장 가까운 리전을 선택합니다 (예: us-central1).
    • 트리거 유형: Cloud Pub/Sub를 선택합니다.
    • Cloud Pub/Sub 주제: druva-backup-trigger 선택
    • 서비스 계정: druva-backup-function@PROJECT_ID.iam.gserviceaccount.com 선택
    • 할당된 메모리: 512 MiB
    • 제한 시간: 540
    • 최대 인스턴스 수: 1
  4. 다음을 클릭합니다.

  5. 런타임으로 Python 3.11을 선택합니다.

  6. 진입점main로 설정합니다.

  7. 소스 코드 편집기에서 main.py의 내용을 다음으로 바꿉니다.

    import base64
    import json
    import os
    import time
    from datetime import datetime, timezone, timedelta
    
    import requests
    from google.cloud import storage
    
    GCS_BUCKET = os.environ["GCS_BUCKET"]
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup")
    STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json")
    DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com")
    CLIENT_ID = os.environ["CLIENT_ID"]
    CLIENT_SECRET = os.environ["CLIENT_SECRET"]
    MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500"))
    LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24"))
    
    def get_oauth_token():
        """Obtain OAuth 2.0 access token using client credentials grant."""
        token_url = f"https://{DRUVA_BASE_URL}/token"
        payload = {
            "grant_type": "client_credentials",
            "scope": "read",
        }
        resp = requests.post(
            token_url,
            data=payload,
            auth=(CLIENT_ID, CLIENT_SECRET),
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["access_token"]
    
    def load_state(storage_client):
        """Load the persisted state (last event time and tracker) from GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        if blob.exists():
            return json.loads(blob.download_as_text())
        return {}
    
    def save_state(storage_client, state):
        """Persist state to GCS."""
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
        blob.upload_from_string(
            json.dumps(state),
            content_type="application/json",
        )
    
    def fetch_events(token, state):
        """Fetch events from Druva API with pagination via nextPageToken."""
        events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events"
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
    
        params = {"pageSize": PAGE_SIZE}
    
        tracker = state.get("tracker")
        last_event_time = state.get("last_event_time")
    
        if tracker:
            params["tracker"] = tracker
        elif last_event_time:
            params["fromTime"] = last_event_time
        else:
            lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS)
            params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ")
    
        all_events = []
        total_fetched = 0
    
        while total_fetched < MAX_RECORDS:
            resp = requests.get(
                events_url,
                headers=headers,
                params=params,
                timeout=60,
            )
            resp.raise_for_status()
            data = resp.json()
    
            events = data.get("events", [])
            all_events.extend(events)
            total_fetched += len(events)
    
            new_tracker = data.get("tracker")
            next_page_token = data.get("nextPageToken")
    
            if new_tracker:
                state["tracker"] = new_tracker
    
            if next_page_token:
                params["nextPageToken"] = next_page_token
                params.pop("tracker", None)
                params.pop("fromTime", None)
            else:
                break
    
        if all_events:
            last_ts = all_events[-1].get("eventTime", "")
            if last_ts:
                state["last_event_time"] = last_ts
    
        return all_events, state
    
    def write_events_to_gcs(storage_client, events):
        """Write events as NDJSON to GCS."""
        if not events:
            return
    
        now = datetime.now(timezone.utc)
        filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson"
        blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}"
    
        ndjson_lines = "\n".join(json.dumps(event) for event in events)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(blob_path)
        blob.upload_from_string(
            ndjson_lines,
            content_type="application/x-ndjson",
        )
        print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}")
    
    def main(event, context):
        """Cloud Run function entry point triggered by Pub/Sub."""
        storage_client = storage.Client()
    
        token = get_oauth_token()
    
        state = load_state(storage_client)
    
        events, updated_state = fetch_events(token, state)
    
        write_events_to_gcs(storage_client, events)
    
        save_state(storage_client, updated_state)
    
        print(f"Completed: fetched {len(events)} events")
        return f"OK: {len(events)} events"
    
  8. requirements.txt의 내용을 다음으로 바꿉니다.

    requests>=2.31.0
    google-cloud-storage>=2.14.0
    

환경 변수 구성

  1. Cloud Run 함수 구성에서 런타임, 빌드, 연결, 보안 설정 섹션으로 이동합니다.
  2. 런타임 환경 변수에서 다음 변수를 추가합니다.

    • GCS_BUCKET: GCS 버킷의 이름입니다 (예: druva-backup-logs).
    • GCS_PREFIX: 로그 파일의 접두사 경로입니다 (예: druva_backup).
    • STATE_KEY: 상태 파일 이름 (예: druva_state.json)
    • DRUVA_BASE_URL: Druva API 기본 URL입니다.
      • apis.druva.com(Druva Cloud(Standard))
      • govcloudapis.druva.com(Druva GovCloud의 경우)
    • CLIENT_ID: Druva API 사용자 인증 정보의 클라이언트 ID
    • CLIENT_SECRET: Druva API 사용자 인증 정보의 보안 키
    • MAX_RECORDS: 호출당 가져올 최대 레코드 수입니다 (예: 10000).
    • PAGE_SIZE: API 페이지당 이벤트 수 (최대 500)
    • LOOKBACK_HOURS: 첫 번째 실행에서 되돌아볼 시간 (예: 24)
  3. 배포를 클릭합니다.

  4. 배포가 완료될 때까지 기다립니다.

Cloud Scheduler 작업 만들기

정기적으로 Cloud Run 함수를 트리거하는 Cloud Scheduler 작업을 만듭니다.

  1. Google Cloud 콘솔에서 Cloud Scheduler로 이동합니다.
  2. 작업 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    • 이름: druva-backup-scheduler을 입력합니다.
    • 리전: Cloud Run 함수와 동일한 리전을 선택합니다 (예: us-central1).
    • 설명: Triggers Druva Backup log collection every 30 minutes 입력
    • 빈도: */30 * * * * (30분마다) 입력
    • 시간대: 선호하는 시간대를 선택합니다 (예: UTC).
  4. 계속을 클릭합니다.

  5. 타겟을 구성합니다.

    • 대상 유형: Pub/Sub를 선택합니다.
    • Cloud Pub/Sub 주제: druva-backup-trigger 선택
    • 메시지 본문: {"trigger": "scheduled"}를 입력합니다.
  6. 만들기를 클릭합니다.

Cloud Scheduler 작업 테스트

  1. Cloud Scheduler 목록에서 druva-backup-scheduler를 찾습니다.
  2. 강제 실행을 클릭하여 함수를 즉시 트리거합니다.
  3. 다음을 확인하여 실행을 검증합니다.
    • Cloud Run Functions > druva-backup-to-gcs > Logs의 Cloud Run Functions 로그
    • Cloud Storage > druva-backup-logs의 새 NDJSON 파일용 GCS 버킷

Google SecOps 서비스 계정을 가져오고 피드 구성

서비스 계정 이메일 가져오기

  1. SIEM 설정> 피드로 이동합니다.
  2. 새 피드 추가를 클릭합니다.
  3. 단일 피드 구성을 클릭합니다.
  4. 피드 이름 필드에 피드 이름을 입력합니다(예: Druva Backup Events).
  5. 소스 유형으로 Google Cloud Storage V2를 선택합니다.
  6. 로그 유형으로 Druva Backup을 선택합니다.
  7. 서비스 계정 가져오기를 클릭합니다. 고유한 서비스 계정 이메일이 표시됩니다. 예를 들면 다음과 같습니다.

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 다음 단계에서 사용할 수 있도록 이 이메일 주소를 복사합니다.

피드 구성

  1. 다음을 클릭합니다.
  2. 다음 입력 매개변수의 값을 지정합니다.

    • 스토리지 버킷 URL: 다음 접두사 경로를 사용하여 GCS 버킷 URI를 입력합니다.

      gs://druva-backup-logs/druva_backup/
      
    • 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.

      • 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (테스트에 권장).
      • 전송된 파일 삭제: 전송이 완료되면 파일을 삭제합니다.
      • 전송된 파일 및 빈 디렉터리 삭제: 전송이 완료되면 파일과 빈 디렉터리를 삭제합니다.
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다 (기본값은 180일).

    • 애셋 네임스페이스: 애셋 네임스페이스

    • 수집 라벨: 이 피드의 이벤트에 적용할 라벨입니다.

  3. 다음을 클릭합니다.

  4. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

Google SecOps 서비스 계정에 IAM 권한 부여

Google SecOps 서비스 계정에는 Cloud Run 함수에서 작성한 로그 파일을 읽기 위해 GCS 버킷에 대한 스토리지 객체 뷰어 역할이 필요합니다.

  1. Cloud Storage> 버킷으로 이동합니다.
  2. 버킷 이름 (예: druva-backup-logs)을 클릭합니다.
  3. 권한 탭으로 이동합니다.
  4. 액세스 권한 부여를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.
    • 주 구성원 추가: Google SecOps 서비스 계정 이메일 (예: chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)을 붙여넣습니다.
    • 역할 할당: 스토리지 객체 뷰어 선택
  6. 저장을 클릭합니다.

UDM 매핑 테이블

로그 필드 UDM 매핑 논리
inSyncUserID, eventsGroupId, FilesMissed, FilesBackedup, TotalBackupSize, TotalBytesTransferred, facility, inSyncDataSourceID, initiator, event_type additional.fields 비어 있지 않은 경우 각 필드에서 생성된 라벨과 병합됨
initiator extensions.auth.type 시작자가 이메일 정규식과 일치하는 경우 'AUTHTYPE_UNSPECIFIED'로 설정
metadata.event_type has_target_user가 true이고 has_principal이 true이면 'USER_LOGIN'으로 설정되고, has_principal이 true이고 has_target이 false이면 'STATUS_UPDATE'로 설정되며, 그 밖의 경우에는 'GENERIC_EVENT'로 설정됩니다.
eventID metadata.product_log_id 문자열로 변환됨
metadata.product_name 'DRUVA_BACKUP'으로 설정
clientVersion metadata.product_version 값이 직접 복사됨
inSyncDataSourceName principal.asset.hostname 값이 직접 복사됨
ip principal.asset.ip IP에서 병합됨
inSyncDataSourceName principal.hostname 값이 직접 복사됨
ip principal.ip IP에서 병합됨
clientOS principal.platform (?i)Linux와 일치하는 경우 'LINUX'로 설정됩니다. (?i)windows와 일치하는 경우 'WINDOWS'로 설정됩니다. (?i)mac과 일치하는 경우 'MAC'으로 설정됩니다.
profileName principal.resource.name 값이 직접 복사됨
profileID principal.resource.product_object_id 문자열로 변환됨
eventState security_result.action (?i)Success와 일치하면 'ALLOW', 그렇지 않으면 'BLOCK'으로 설정됩니다.
eventState security_result.action_details 값이 직접 복사됨
줄이는 것을 security_result.severity [0,1,2,3,LOW]에 속하는 경우 'LOW', [4,5,6,MEDIUM,SUBSTANTIAL,INFO]에 속하는 경우 'MEDIUM', [7,8,HIGH,SEVERE]에 속하는 경우 'HIGH', [9,10,VERY-HIGH,CRITICAL]에 속하는 경우 'CRITICAL'로 설정됩니다.
inSyncUserEmail, initiator target.user.email_addresses inSyncUserEmail에서 병합됨. 이메일 정규식과 일치하는 경우 이니시에이터에서도 병합됨
inSyncUserName target.user.userid 값이 직접 복사됨
metadata.vendor_name 'DRUVA_BACKUP'으로 설정

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.