Druva Backup 로그 수집
이 문서에서는 Druva REST API에서 이벤트를 가져와 Google Cloud Storage 버킷에 작성하는 Google Cloud Run 함수를 설정한 다음 Google Cloud Storage V2를 사용하여 Google Security Operations 피드를 구성하여 Druva Backup 로그를 수집하는 방법을 설명합니다.
Druva는 엔드포인트, SaaS 애플리케이션, 엔터프라이즈 워크로드에 백업, 재해 복구, 보관 서비스를 제공하는 클라우드 기반 데이터 보호 및 관리 플랫폼입니다. 이 플랫폼은 모니터링 및 규정 준수를 위해 SIEM 솔루션과 통합할 수 있는 포괄적인 감사 추적, 백업 이벤트, 복원 활동, 보안 알림을 생성합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- 결제가 사용 설정된 Google Cloud 프로젝트
- 다음 Google Cloud API가 사용 설정되었습니다.
- Cloud Run Functions API
- Cloud Scheduler API
- Cloud Storage API
- Pub/Sub API
- IAM API
- Druva Cloud Platform Console에 대한 Druva Cloud 관리자 액세스
- API 사용자 인증 정보 생성을 위한 Druva 통합 센터 액세스
Google Cloud Storage 버킷 만들기
- Google Cloud 콘솔로 이동합니다.
- 프로젝트를 선택하거나 새 프로젝트를 만듭니다.
- 탐색 메뉴에서 Cloud Storage> 버킷으로 이동합니다.
- 버킷 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 버킷 이름 지정 전역적으로 고유한 이름 (예: druva-backup-logs)을 입력합니다.위치 유형 필요에 따라 선택 (리전, 이중 리전, 멀티 리전) 위치 Google SecOps 인스턴스에 가장 가까운 위치를 선택합니다 (예: us-central1).스토리지 클래스 Standard (자주 액세스하는 로그에 권장) 액세스 제어 균일 (권장) 보호 도구 선택사항: 객체 버전 관리 또는 보관 정책 사용 설정 만들기를 클릭합니다.
Druva API 사용자 인증 정보 수집
Cloud Run 함수가 Druva에서 이벤트를 가져오도록 하려면 OAuth 2.0 인증으로 API 사용자 인증 정보를 만들어야 합니다.
API 사용자 인증 정보 만들기
- Druva Cloud Platform Console에 로그인합니다.
- 전체 탐색 메뉴에서 통합 센터를 선택합니다.
- 왼쪽 패널에서 API 사용자 인증 정보를 클릭합니다.
- 새 사용자 인증 정보를 클릭합니다.
- 새 사용자 인증 정보 창에서 다음 세부정보를 입력합니다.
이름: 설명이 포함된 이름 (예:
Google SecOps Cloud Storage Integration)을 입력합니다. - 승인 제한을 적용하려면 다음을 실행하세요.
- 데이터 검색 및 수정에 대한 전체 액세스를 허용하려면 Druva Cloud Administrator를 선택합니다.
- 또는 제품 관리자를 선택하고 다음을 선택합니다. Cloud 관리자 (읽기 전용) 역할: 수정 권한 없이 데이터 검색 액세스만 제한합니다 (SIEM 통합에 권장).
- 저장을 클릭합니다.
API 사용자 인증 정보 기록
API 사용자 인증 정보를 만들면 사용자 인증 정보 세부정보 창이 표시됩니다.
- 클라이언트 ID 옆에 있는 복사 아이콘을 클릭하여 값을 클립보드에 복사합니다.
- 클라이언트 ID를 안전하게 저장합니다 (예:
McNkxxxx4Vicxxxx4Ldpxxxx/09Uxxxx). - 보안 비밀 키 옆에 있는 복사 아이콘을 클릭하여 값을 클립보드에 복사합니다.
보안 비밀 키를 안전하게 저장합니다 (예:
Xmcxxxx8j5xxxx6NxxxxRbRxxxxNNyPt).
서비스 계정 만들기
Cloud Run 함수가 Google Cloud Storage에 액세스할 수 있도록 전용 서비스 계정을 만듭니다.
- Google Cloud 콘솔에서 IAM 및 관리자 > 서비스 계정으로 이동합니다.
- 서비스 계정 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 서비스 계정 이름:
druva-backup-function(또는 설명이 포함된 이름)을 입력합니다. - 서비스 계정 설명:
Service account for Druva Backup Cloud Run function를 입력합니다.
- 서비스 계정 이름:
- 만들고 계속하기를 클릭합니다.
- 이 서비스 계정에 프로젝트에 대한 액세스 권한 부여 섹션에서 다음 역할을 추가합니다.
- 역할 선택을 클릭하고 스토리지 객체 관리자를 선택합니다.
- 다른 역할 추가를 클릭하고 Cloud Run 호출자를 선택합니다.
- 계속을 클릭합니다.
- 완료를 클릭합니다.
서비스 계정 이메일 (예:
druva-backup-function@PROJECT_ID.iam.gserviceaccount.com)을 기록합니다.
Pub/Sub 주제 만들기
Cloud Scheduler가 Cloud Run 함수를 트리거하는 데 사용할 Pub/Sub 주제를 만듭니다.
- Google Cloud 콘솔에서 Pub/Sub > 주제로 이동합니다.
- 주제 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주제 ID:
druva-backup-trigger를 입력합니다.
- 주제 ID:
- 기본 구독 추가를 선택 해제합니다.
만들기를 클릭합니다.
Cloud Run 함수 만들기
함수 코드 준비
OAuth 2.0 클라이언트 사용자 인증 정보를 사용하여 Druva API로 인증하고, 페이지로 나누기를 사용하여 이벤트 엔드포인트를 통해 이벤트를 검색하고, 결과를 NDJSON으로 GCS 버킷에 쓰는 Cloud Run 함수를 만듭니다.
Cloud Run 함수 배포
- Google Cloud 콘솔에서 Cloud Run 함수로 이동합니다.
- 함수 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
- 환경: 2세대 선택
- 함수 이름:
druva-backup-to-gcs를 입력합니다. - 리전: GCS 버킷에 가장 가까운 리전을 선택합니다 (예:
us-central1). - 트리거 유형: Cloud Pub/Sub를 선택합니다.
- Cloud Pub/Sub 주제:
druva-backup-trigger선택 - 서비스 계정:
druva-backup-function@PROJECT_ID.iam.gserviceaccount.com선택 - 할당된 메모리:
512 MiB - 제한 시간:
540초 - 최대 인스턴스 수:
1
다음을 클릭합니다.
런타임으로 Python 3.11을 선택합니다.
진입점을
main로 설정합니다.소스 코드 편집기에서
main.py의 내용을 다음으로 바꿉니다.import base64 import json import os import time from datetime import datetime, timezone, timedelta import requests from google.cloud import storage GCS_BUCKET = os.environ["GCS_BUCKET"] GCS_PREFIX = os.environ.get("GCS_PREFIX", "druva_backup") STATE_KEY = os.environ.get("STATE_KEY", "druva_state.json") DRUVA_BASE_URL = os.environ.get("DRUVA_BASE_URL", "apis.druva.com") CLIENT_ID = os.environ["CLIENT_ID"] CLIENT_SECRET = os.environ["CLIENT_SECRET"] MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000")) PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "500")) LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "24")) def get_oauth_token(): """Obtain OAuth 2.0 access token using client credentials grant.""" token_url = f"https://{DRUVA_BASE_URL}/token" payload = { "grant_type": "client_credentials", "scope": "read", } resp = requests.post( token_url, data=payload, auth=(CLIENT_ID, CLIENT_SECRET), timeout=30, ) resp.raise_for_status() return resp.json()["access_token"] def load_state(storage_client): """Load the persisted state (last event time and tracker) from GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") if blob.exists(): return json.loads(blob.download_as_text()) return {} def save_state(storage_client, state): """Persist state to GCS.""" bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}") blob.upload_from_string( json.dumps(state), content_type="application/json", ) def fetch_events(token, state): """Fetch events from Druva API with pagination via nextPageToken.""" events_url = f"https://{DRUVA_BASE_URL}/insync/eventmanagement/v2/events" headers = { "Authorization": f"Bearer {token}", "Accept": "application/json", } params = {"pageSize": PAGE_SIZE} tracker = state.get("tracker") last_event_time = state.get("last_event_time") if tracker: params["tracker"] = tracker elif last_event_time: params["fromTime"] = last_event_time else: lookback = datetime.now(timezone.utc) - timedelta(hours=LOOKBACK_HOURS) params["fromTime"] = lookback.strftime("%Y-%m-%dT%H:%M:%SZ") all_events = [] total_fetched = 0 while total_fetched < MAX_RECORDS: resp = requests.get( events_url, headers=headers, params=params, timeout=60, ) resp.raise_for_status() data = resp.json() events = data.get("events", []) all_events.extend(events) total_fetched += len(events) new_tracker = data.get("tracker") next_page_token = data.get("nextPageToken") if new_tracker: state["tracker"] = new_tracker if next_page_token: params["nextPageToken"] = next_page_token params.pop("tracker", None) params.pop("fromTime", None) else: break if all_events: last_ts = all_events[-1].get("eventTime", "") if last_ts: state["last_event_time"] = last_ts return all_events, state def write_events_to_gcs(storage_client, events): """Write events as NDJSON to GCS.""" if not events: return now = datetime.now(timezone.utc) filename = now.strftime("%Y%m%d_%H%M%S") + ".ndjson" blob_path = f"{GCS_PREFIX}/{now.strftime('%Y/%m/%d')}/{filename}" ndjson_lines = "\n".join(json.dumps(event) for event in events) bucket = storage_client.bucket(GCS_BUCKET) blob = bucket.blob(blob_path) blob.upload_from_string( ndjson_lines, content_type="application/x-ndjson", ) print(f"Wrote {len(events)} events to gs://{GCS_BUCKET}/{blob_path}") def main(event, context): """Cloud Run function entry point triggered by Pub/Sub.""" storage_client = storage.Client() token = get_oauth_token() state = load_state(storage_client) events, updated_state = fetch_events(token, state) write_events_to_gcs(storage_client, events) save_state(storage_client, updated_state) print(f"Completed: fetched {len(events)} events") return f"OK: {len(events)} events"requirements.txt의 내용을 다음으로 바꿉니다.requests>=2.31.0 google-cloud-storage>=2.14.0
환경 변수 구성
- Cloud Run 함수 구성에서 런타임, 빌드, 연결, 보안 설정 섹션으로 이동합니다.
런타임 환경 변수에서 다음 변수를 추가합니다.
GCS_BUCKET: GCS 버킷의 이름입니다 (예:druva-backup-logs).GCS_PREFIX: 로그 파일의 접두사 경로입니다 (예:druva_backup).STATE_KEY: 상태 파일 이름 (예:druva_state.json)DRUVA_BASE_URL: Druva API 기본 URL입니다.apis.druva.com(Druva Cloud(Standard))govcloudapis.druva.com(Druva GovCloud의 경우)
CLIENT_ID: Druva API 사용자 인증 정보의 클라이언트 IDCLIENT_SECRET: Druva API 사용자 인증 정보의 보안 키MAX_RECORDS: 호출당 가져올 최대 레코드 수입니다 (예:10000).PAGE_SIZE: API 페이지당 이벤트 수 (최대500)LOOKBACK_HOURS: 첫 번째 실행에서 되돌아볼 시간 (예:24)
배포를 클릭합니다.
배포가 완료될 때까지 기다립니다.
Cloud Scheduler 작업 만들기
정기적으로 Cloud Run 함수를 트리거하는 Cloud Scheduler 작업을 만듭니다.
- Google Cloud 콘솔에서 Cloud Scheduler로 이동합니다.
- 작업 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
- 이름:
druva-backup-scheduler을 입력합니다. - 리전: Cloud Run 함수와 동일한 리전을 선택합니다 (예:
us-central1). - 설명:
Triggers Druva Backup log collection every 30 minutes입력 - 빈도:
*/30 * * * *(30분마다) 입력 - 시간대: 선호하는 시간대를 선택합니다 (예:
UTC).
- 이름:
계속을 클릭합니다.
타겟을 구성합니다.
- 대상 유형: Pub/Sub를 선택합니다.
- Cloud Pub/Sub 주제:
druva-backup-trigger선택 - 메시지 본문:
{"trigger": "scheduled"}를 입력합니다.
만들기를 클릭합니다.
Cloud Scheduler 작업 테스트
- Cloud Scheduler 목록에서
druva-backup-scheduler를 찾습니다. - 강제 실행을 클릭하여 함수를 즉시 트리거합니다.
- 다음을 확인하여 실행을 검증합니다.
- Cloud Run Functions > druva-backup-to-gcs > Logs의 Cloud Run Functions 로그
- Cloud Storage > druva-backup-logs의 새 NDJSON 파일용 GCS 버킷
Google SecOps 서비스 계정을 가져오고 피드 구성
서비스 계정 이메일 가져오기
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Druva Backup Events). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 Druva Backup을 선택합니다.
서비스 계정 가져오기를 클릭합니다. 고유한 서비스 계정 이메일이 표시됩니다. 예를 들면 다음과 같습니다.
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com다음 단계에서 사용할 수 있도록 이 이메일 주소를 복사합니다.
피드 구성
- 다음을 클릭합니다.
다음 입력 매개변수의 값을 지정합니다.
스토리지 버킷 URL: 다음 접두사 경로를 사용하여 GCS 버킷 URI를 입력합니다.
gs://druva-backup-logs/druva_backup/소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (테스트에 권장).
- 전송된 파일 삭제: 전송이 완료되면 파일을 삭제합니다.
- 전송된 파일 및 빈 디렉터리 삭제: 전송이 완료되면 파일과 빈 디렉터리를 삭제합니다.
최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다 (기본값은 180일).
애셋 네임스페이스: 애셋 네임스페이스
수집 라벨: 이 피드의 이벤트에 적용할 라벨입니다.
다음을 클릭합니다.
확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
Google SecOps 서비스 계정에 IAM 권한 부여
Google SecOps 서비스 계정에는 Cloud Run 함수에서 작성한 로그 파일을 읽기 위해 GCS 버킷에 대한 스토리지 객체 뷰어 역할이 필요합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름 (예:
druva-backup-logs)을 클릭합니다. - 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: Google SecOps 서비스 계정 이메일 (예:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com)을 붙여넣습니다. - 역할 할당: 스토리지 객체 뷰어 선택
- 주 구성원 추가: Google SecOps 서비스 계정 이메일 (예:
저장을 클릭합니다.
UDM 매핑 테이블
| 로그 필드 | UDM 매핑 | 논리 |
|---|---|---|
| inSyncUserID, eventsGroupId, FilesMissed, FilesBackedup, TotalBackupSize, TotalBytesTransferred, facility, inSyncDataSourceID, initiator, event_type | additional.fields | 비어 있지 않은 경우 각 필드에서 생성된 라벨과 병합됨 |
| initiator | extensions.auth.type | 시작자가 이메일 정규식과 일치하는 경우 'AUTHTYPE_UNSPECIFIED'로 설정 |
| metadata.event_type | has_target_user가 true이고 has_principal이 true이면 'USER_LOGIN'으로 설정되고, has_principal이 true이고 has_target이 false이면 'STATUS_UPDATE'로 설정되며, 그 밖의 경우에는 'GENERIC_EVENT'로 설정됩니다. | |
| eventID | metadata.product_log_id | 문자열로 변환됨 |
| metadata.product_name | 'DRUVA_BACKUP'으로 설정 | |
| clientVersion | metadata.product_version | 값이 직접 복사됨 |
| inSyncDataSourceName | principal.asset.hostname | 값이 직접 복사됨 |
| ip | principal.asset.ip | IP에서 병합됨 |
| inSyncDataSourceName | principal.hostname | 값이 직접 복사됨 |
| ip | principal.ip | IP에서 병합됨 |
| clientOS | principal.platform | (?i)Linux와 일치하는 경우 'LINUX'로 설정됩니다. (?i)windows와 일치하는 경우 'WINDOWS'로 설정됩니다. (?i)mac과 일치하는 경우 'MAC'으로 설정됩니다. |
| profileName | principal.resource.name | 값이 직접 복사됨 |
| profileID | principal.resource.product_object_id | 문자열로 변환됨 |
| eventState | security_result.action | (?i)Success와 일치하면 'ALLOW', 그렇지 않으면 'BLOCK'으로 설정됩니다. |
| eventState | security_result.action_details | 값이 직접 복사됨 |
| 줄이는 것을 | security_result.severity | [0,1,2,3,LOW]에 속하는 경우 'LOW', [4,5,6,MEDIUM,SUBSTANTIAL,INFO]에 속하는 경우 'MEDIUM', [7,8,HIGH,SEVERE]에 속하는 경우 'HIGH', [9,10,VERY-HIGH,CRITICAL]에 속하는 경우 'CRITICAL'로 설정됩니다. |
| inSyncUserEmail, initiator | target.user.email_addresses | inSyncUserEmail에서 병합됨. 이메일 정규식과 일치하는 경우 이니시에이터에서도 병합됨 |
| inSyncUserName | target.user.userid | 값이 직접 복사됨 |
| metadata.vendor_name | 'DRUVA_BACKUP'으로 설정 |
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.