Censys 로그 수집
이 문서에서는 Google Cloud Storage V2를 사용하여 Censys 로그를 Google Security Operations로 수집하는 방법을 설명합니다.
Censys는 API를 통해 포괄적인 공격 표면 관리 및 인터넷 인텔리전스를 제공합니다. 이 통합을 사용하면 Censys ASM에서 호스트 검색 이벤트, 위험 이벤트, 애셋 변경사항을 수집하여 분석 및 모니터링을 위해 Google SecOps로 전달할 수 있습니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- Cloud Storage API가 사용 설정된 GCP 프로젝트
- GCS 버킷을 만들고 관리할 수 있는 권한
- GCS 버킷의 IAM 정책을 관리할 수 있는 권한
- Cloud Run 서비스, Pub/Sub 주제, Cloud Scheduler 작업을 만들 수 있는 권한
- Censys ASM에 대한 액세스 권한
Censys API 사용자 인증 정보 수집
- app.censys.io에서 Censys ASM 콘솔에 로그인합니다.
- 페이지 상단의 통합으로 이동합니다.
- API 키와 조직 ID를 복사하여 저장합니다.
- API 기본 URL(
https://api.platform.censys.io)을 기록해 둡니다.
Google Cloud Storage 버킷 만들기
- Google Cloud Console로 이동합니다.
- 프로젝트를 선택하거나 새 프로젝트를 만듭니다.
- 탐색 메뉴에서 Cloud Storage> 버킷으로 이동합니다.
- 버킷 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 버킷 이름 지정 전역적으로 고유한 이름 (예: censys-logs)을 입력합니다.위치 유형 필요에 따라 선택 (리전, 이중 리전, 멀티 리전) 위치 위치를 선택합니다 (예: us-central1).스토리지 클래스 Standard (자주 액세스하는 로그에 권장) 액세스 제어 균일 (권장) 보호 도구 선택사항: 객체 버전 관리 또는 보관 정책 사용 설정 만들기를 클릭합니다.
Cloud Run 함수의 서비스 계정 만들기
Cloud Run 함수에는 GCS 버킷에 쓸 수 있는 권한이 있는 서비스 계정이 필요합니다.
서비스 계정 만들기
- GCP 콘솔에서 IAM 및 관리자 > 서비스 계정으로 이동합니다.
- 서비스 계정 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 서비스 계정 이름:
censys-data-collector-sa을 입력합니다. - 서비스 계정 설명:
Service account for Cloud Run function to collect Censys logs을 입력합니다.
- 서비스 계정 이름:
- 만들고 계속하기를 클릭합니다.
- 이 서비스 계정에 프로젝트에 대한 액세스 권한 부여 섹션에서 다음 역할을 추가합니다.
- 역할 선택을 클릭합니다.
- 스토리지 객체 관리자를 검색하여 선택합니다.
- + 다른 역할 추가를 클릭합니다.
- Cloud Run 호출자를 검색하여 선택합니다.
- + 다른 역할 추가를 클릭합니다.
- Cloud Functions 호출자를 검색하여 선택합니다.
- 계속을 클릭합니다.
- 완료를 클릭합니다.
이러한 역할은 다음 작업에 필요합니다.
- 스토리지 객체 관리자: GCS 버킷에 로그를 쓰고 상태 파일을 관리합니다.
- Cloud Run 호출자: Pub/Sub가 함수를 호출하도록 허용
- Cloud Functions 호출자: 함수 호출 허용
GCS 버킷에 대한 IAM 권한 부여
GCS 버킷에 대한 쓰기 권한을 서비스 계정에 부여합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름을 클릭합니다.
- 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: 서비스 계정 이메일 (예:
censys-data-collector-sa@PROJECT_ID.iam.gserviceaccount.com)을 입력합니다. - 역할 할당: 스토리지 객체 관리자를 선택합니다.
- 주 구성원 추가: 서비스 계정 이메일 (예:
- 저장을 클릭합니다.
게시/구독 주제 만들기
Cloud Scheduler가 게시하고 Cloud Run 함수가 구독할 Pub/Sub 주제를 만듭니다.
- GCP Console에서 Pub/Sub > 주제로 이동합니다.
- 주제 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주제 ID:
censys-data-trigger를 입력합니다. - 다른 설정은 기본값으로 둡니다.
- 주제 ID:
- 만들기를 클릭합니다.
로그를 수집하는 Cloud Run 함수 만들기
Cloud Run 함수는 Cloud Scheduler의 Pub/Sub 메시지에 의해 트리거되어 Censys ASM API에서 로그를 가져오고 GCS에 씁니다.
- GCP 콘솔에서 Cloud Run으로 이동합니다.
- 서비스 만들기를 클릭합니다.
- 함수를 선택합니다 (인라인 편집기를 사용하여 함수 만들기).
구성 섹션에서 다음 구성 세부정보를 제공합니다.
설정 값 서비스 이름 censys-data-collector리전 GCS 버킷과 일치하는 리전을 선택합니다 (예: us-central1).런타임 Python 3.12 이상 선택 트리거 (선택사항) 섹션에서 다음을 수행합니다.
- + 트리거 추가를 클릭합니다.
- Cloud Pub/Sub를 선택합니다.
- Cloud Pub/Sub 주제 선택에서 Pub/Sub 주제 (
censys-data-trigger)를 선택합니다. - 저장을 클릭합니다.
인증 섹션에서 다음을 구성합니다.
- 인증 필요를 선택합니다.
- ID 및 액세스 관리 (IAM)를 확인합니다.
- 아래로 스크롤하고 컨테이너, 네트워킹, 보안을 펼칩니다.
- 보안 탭으로 이동합니다.
- 서비스 계정: 서비스 계정 (
censys-data-collector-sa)을 선택합니다.
- 서비스 계정: 서비스 계정 (
컨테이너 탭으로 이동합니다.
- 변수 및 보안 비밀을 클릭합니다.
각 환경 변수에 대해 + 변수 추가를 클릭합니다.
변수 이름 예시 값 GCS_BUCKETcensys-logsGCS_PREFIXcensys/STATE_KEYcensys/state.jsonCENSYS_API_KEYyour-censys-api-keyCENSYS_ORG_IDyour-organization-idAPI_BASEhttps://api.platform.censys.io
변수 및 보안 비밀 탭에서 요청까지 아래로 스크롤합니다.
- 요청 제한 시간:
600초 (10분)를 입력합니다.
- 요청 제한 시간:
컨테이너의 설정 탭으로 이동합니다.
- 리소스 섹션에서 다음을 수행합니다.
- 메모리: 512MiB 이상을 선택합니다.
- CPU: 1을 선택합니다.
- 완료를 클릭합니다.
- 리소스 섹션에서 다음을 수행합니다.
실행 환경으로 스크롤합니다.
- 기본을 선택합니다 (권장).
버전 확장 섹션에서 다음을 수행합니다.
- 최소 인스턴스 수:
0를 입력합니다. - 최대 인스턴스 수:
100을 입력합니다 (또는 예상 부하에 따라 조정).
- 최소 인스턴스 수:
만들기를 클릭합니다.
서비스가 생성될 때까지 기다립니다 (1~2분).
서비스가 생성되면 인라인 코드 편집기가 자동으로 열립니다.
함수 코드 추가
- 함수 진입점에 main을 입력합니다.
인라인 코드 편집기에서 다음 두 파일을 만듭니다.
- 첫 번째 파일: main.py:
import functions_framework from google.cloud import storage import json import urllib3 import gzip import os from datetime import datetime, timedelta, timezone from typing import Dict, List, Any, Optional from urllib.parse import urlencode # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch logs from Censys ASM API and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ # Get environment variables bucket_name = os.environ.get('GCS_BUCKET') prefix = os.environ.get('GCS_PREFIX', 'censys/') state_key = os.environ.get('STATE_KEY', 'censys/state.json') censys_api_key = os.environ.get('CENSYS_API_KEY') censys_org_id = os.environ.get('CENSYS_ORG_ID') api_base = os.environ.get('API_BASE', 'https://api.platform.censys.io') if not all([bucket_name, censys_api_key, censys_org_id]): print('Error: Missing required environment variables') return try: collector = CensysCollector( bucket_name=bucket_name, prefix=prefix, state_key=state_key, api_key=censys_api_key, org_id=censys_org_id, api_base=api_base ) # Get last collection time last_collection_time = collector.get_last_collection_time() current_time = datetime.now(timezone.utc) print(f'Collecting events since {last_collection_time}') # Collect different types of events logbook_events = collector.collect_logbook_events() risk_events = collector.collect_risks_events() # Save events to GCS collector.save_events_to_gcs(logbook_events, 'logbook') collector.save_events_to_gcs(risk_events, 'risks') # Update state collector.save_collection_time(current_time) print(f'Successfully processed {len(logbook_events)} logbook events and {len(risk_events)} risk events') except Exception as e: print(f'Error processing logs: {str(e)}') raise class CensysCollector: def __init__(self, bucket_name: str, prefix: str, state_key: str, api_key: str, org_id: str, api_base: str): self.bucket_name = bucket_name self.prefix = prefix self.state_key = state_key self.headers = { 'Authorization': f'Bearer {api_key}', 'X-Organization-ID': org_id, 'Content-Type': 'application/json' } self.api_base = api_base self.bucket = storage_client.bucket(bucket_name) def get_last_collection_time(self) -> Optional[datetime]: """Get the last collection timestamp from GCS state file.""" try: blob = self.bucket.blob(self.state_key) if blob.exists(): state_data = blob.download_as_text() state = json.loads(state_data) return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z')) except Exception as e: print(f'No state file found or error reading state: {e}') return datetime.now(timezone.utc) - timedelta(hours=1) def save_collection_time(self, collection_time: datetime): """Save the current collection timestamp to GCS state file.""" state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')} blob = self.bucket.blob(self.state_key) blob.upload_from_string( json.dumps(state), content_type='application/json' ) def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]: """Collect logbook events from Censys ASM API using cursor-based pagination.""" events = [] url = f"{self.api_base}/v3/logbook" params = {} if cursor: params['cursor'] = cursor try: query_string = urlencode(params) if params else '' full_url = f"{url}?{query_string}" if query_string else url response = http.request('GET', full_url, headers=self.headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', '60')) print(f'Rate limited (429). Retrying after {retry_after}s...') import time time.sleep(retry_after) return self.collect_logbook_events(cursor) if response.status != 200: print(f'API request failed with status {response.status}: {response.data}') return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('logbook_entries', [])) # Handle cursor-based pagination next_cursor = data.get('next_cursor') if next_cursor: events.extend(self.collect_logbook_events(next_cursor)) print(f'Collected {len(events)} logbook events') return events except Exception as e: print(f'Error collecting logbook events: {e}') return [] def collect_risks_events(self) -> List[Dict[str, Any]]: """Collect risk events from Censys ASM API.""" events = [] url = f"{self.api_base}/v3/risks" try: response = http.request('GET', url, headers=self.headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', '60')) print(f'Rate limited (429). Retrying after {retry_after}s...') import time time.sleep(retry_after) return self.collect_risks_events() if response.status != 200: print(f'API request failed with status {response.status}: {response.data}') return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('risks', [])) print(f'Collected {len(events)} risk events') return events except Exception as e: print(f'Error collecting risk events: {e}') return [] def save_events_to_gcs(self, events: List[Dict[str, Any]], event_type: str): """Save events to GCS in compressed NDJSON format.""" if not events: return timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') filename = f"{self.prefix}{event_type}_{timestamp}.json.gz" try: # Convert events to newline-delimited JSON ndjson_content = '\n'.join(json.dumps(event, separators=(',', ':')) for event in events) # Compress with gzip gz_bytes = gzip.compress(ndjson_content.encode('utf-8')) blob = self.bucket.blob(filename) blob.upload_from_string( gz_bytes, content_type='application/gzip' ) print(f'Saved {len(events)} {event_type} events to {filename}') except Exception as e: print(f'Error saving {event_type} events to GCS: {e}') raise- 두 번째 파일: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0배포를 클릭하여 함수를 저장하고 배포합니다.
배포가 완료될 때까지 기다립니다 (2~3분).
Cloud Scheduler 작업 만들기
Cloud Scheduler는 일정 간격으로 Pub/Sub 주제에 메시지를 게시하여 Cloud Run 함수를 트리거합니다.
- GCP Console에서 Cloud Scheduler로 이동합니다.
- 작업 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 censys-data-collector-hourly리전 Cloud Run 함수와 동일한 리전 선택 주파수 0 * * * *(매시간 정각)시간대 시간대 선택 (UTC 권장) 타겟 유형 Pub/Sub 주제 Pub/Sub 주제 ( censys-data-trigger)를 선택합니다.메일 본문 {}(빈 JSON 객체)만들기를 클릭합니다.
일정 빈도 옵션
로그 볼륨 및 지연 시간 요구사항에 따라 빈도를 선택합니다.
빈도 크론 표현식 사용 사례 5분마다 */5 * * * *대용량, 저지연 15분마다 */15 * * * *검색량 보통 1시간마다 0 * * * *일반(권장) 6시간마다 0 */6 * * *양이 적은 일괄 처리 매일 0 0 * * *이전 데이터 수집
통합 테스트
- Cloud Scheduler 콘솔에서 작업을 찾습니다.
- 강제 실행을 클릭하여 작업을 수동으로 트리거합니다.
- 몇 초 동안 기다립니다.
- Cloud Run > 서비스로 이동합니다.
- 함수 이름 (
censys-data-collector)을 클릭합니다. - 로그 탭을 클릭합니다.
함수가 성공적으로 실행되었는지 확인합니다. 다음을 확인하세요.
Collecting events since YYYY-MM-DDTHH:MM:SS+00:00 Collected X logbook events Collected X risk events Saved X logbook events to censys/logbook_YYYYMMDD_HHMMSS.json.gz Saved X risks events to censys/risks_YYYYMMDD_HHMMSS.json.gz Successfully processed X logbook events and X risk eventsCloud Storage> 버킷으로 이동합니다.
버킷 이름을 클릭합니다.
접두사 폴더 (
censys/)로 이동합니다.새
.json.gz파일이 현재 타임스탬프로 생성되었는지 확인합니다.
로그에 오류가 표시되면 다음 단계를 따르세요.
- HTTP 401: 환경 변수에서 API 사용자 인증 정보 확인
- HTTP 403: 계정에 필요한 권한이 있는지 확인
- HTTP 429: 비율 제한 - 함수가 백오프를 사용하여 자동으로 재시도됩니다.
- 환경 변수 누락: 필수 변수가 모두 설정되었는지 확인
Google SecOps 서비스 계정 가져오기
Google SecOps는 고유한 서비스 계정을 사용하여 GCS 버킷에서 데이터를 읽습니다. 이 서비스 계정에 버킷에 대한 액세스 권한을 부여해야 합니다.
서비스 계정 이메일 가져오기
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Censys logs). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 CENSYS를 선택합니다.
- 서비스 계정 가져오기를 클릭합니다.
고유한 서비스 계정 이메일이 표시됩니다(예:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com다음 단계에서 사용할 수 있도록 이 이메일 주소를 복사합니다.
Google SecOps 서비스 계정에 IAM 권한 부여
Google SecOps 서비스 계정에는 GCS 버킷에 대한 스토리지 객체 뷰어 역할이 필요합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름을 클릭합니다.
- 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: Google SecOps 서비스 계정 이메일을 붙여넣습니다.
- 역할 할당: 스토리지 객체 뷰어를 선택합니다.
- 저장을 클릭합니다.
Censys 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Censys logs). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 CENSYS를 선택합니다.
- 다음을 클릭합니다.
다음 입력 매개변수의 값을 지정합니다.
스토리지 버킷 URL: 다음 접두사 경로를 사용하여 GCS 버킷 URI를 입력합니다.
gs://censys-logs/censys/다음과 같이 바꿉니다.
censys-logs: GCS 버킷 이름입니다.censys/: 로그가 저장되는 선택적 접두사/폴더 경로입니다 (루트의 경우 비워 둠).
예:
- 루트 버킷:
gs://censys-logs/ - 접두사 사용:
gs://censys-logs/censys/
- 루트 버킷:
소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (테스트에 권장).
- 전송된 파일 삭제: 전송이 완료되면 파일을 삭제합니다.
전송된 파일 및 빈 디렉터리 삭제: 전송이 완료되면 파일과 빈 디렉터리를 삭제합니다.
최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
애셋 네임스페이스: 애셋 네임스페이스입니다.
수집 라벨: 이 피드의 이벤트에 적용할 라벨입니다.
다음을 클릭합니다.
확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
UDM 매핑 테이블
| 로그 필드 | UDM 매핑 | 로직 |
|---|---|---|
| assetId | read_only_udm.principal.asset.hostname | assetId 필드가 IP 주소가 아닌 경우 principal.asset.hostname에 매핑됩니다. |
| assetId | read_only_udm.principal.asset.ip | assetId 필드가 IP 주소인 경우 principal.asset.ip에 매핑됩니다. |
| assetId | read_only_udm.principal.hostname | assetId 필드가 IP 주소가 아닌 경우 principal.hostname에 매핑됩니다. |
| assetId | read_only_udm.principal.ip | assetId 필드가 IP 주소인 경우 principal.ip에 매핑됩니다. |
| associatedAt | read_only_udm.security_result.detection_fields.value | associatedAt 필드가 security_result.detection_fields.value에 매핑됩니다. |
| autonomousSystem.asn | read_only_udm.additional.fields.value.string_value | autonomousSystem.asn 필드가 문자열로 변환되고 키가 'autonomousSystem_asn'인 additional.fields.value.string_value에 매핑됩니다. |
| autonomousSystem.bgpPrefix | read_only_udm.additional.fields.value.string_value | autonomousSystem.bgpPrefix 필드는 키가 'autonomousSystem_bgpPrefix'인 additional.fields.value.string_value에 매핑됩니다. |
| 배너 | read_only_udm.principal.resource.attribute.labels.value | 배너 필드는 키가 'banner'인 principal.resource.attribute.labels.value에 매핑됩니다. |
| 클라우드 | read_only_udm.metadata.vendor_name | cloud 필드가 metadata.vendor_name에 매핑됩니다. |
| comments.refUrl | read_only_udm.network.http.referral_url | comments.refUrl 필드가 network.http.referral_url에 매핑됩니다. |
| data.cve | read_only_udm.additional.fields.value.string_value | data.cve 필드가 키가 'data_cve'인 additional.fields.value.string_value에 매핑됩니다. |
| data.cvss | read_only_udm.additional.fields.value.string_value | data.cvss 필드가 키 'data_cvss'를 사용하여 additional.fields.value.string_value에 매핑됩니다. |
| data.ipAddress | read_only_udm.principal.asset.ip | data.ipAddress 필드가 assetId 필드와 같지 않으면 principal.asset.ip에 매핑됩니다. |
| data.ipAddress | read_only_udm.principal.ip | data.ipAddress 필드가 assetId 필드와 같지 않으면 principal.ip에 매핑됩니다. |
| data.location.city | read_only_udm.principal.location.city | location.city 필드가 비어 있으면 data.location.city 필드가 principal.location.city에 매핑됩니다. |
| data.location.countryCode | read_only_udm.principal.location.country_or_region | location.country 필드가 비어 있으면 data.location.countryCode 필드가 principal.location.country_or_region에 매핑됩니다. |
| data.location.latitude | read_only_udm.principal.location.region_coordinates.latitude | location.coordinates.latitude 및 location.geoCoordinates.latitude 필드가 비어 있으면 data.location.latitude 필드가 부동 소수점으로 변환되고 principal.location.region_coordinates.latitude에 매핑됩니다. |
| data.location.longitude | read_only_udm.principal.location.region_coordinates.longitude | location.coordinates.longitude 및 location.geoCoordinates.longitude 필드가 비어 있으면 data.location.longitude 필드가 부동 소수점으로 변환되고 principal.location.region_coordinates.longitude에 매핑됩니다. |
| data.location.province | read_only_udm.principal.location.state | location.province 필드가 비어 있으면 data.location.province 필드가 principal.location.state에 매핑됩니다. |
| data.mailServers | read_only_udm.additional.fields.value.list_value.values.string_value | data.mailServers 배열의 각 요소는 키가 'Mail Servers'이고 value.list_value.values.string_value가 요소 값으로 설정된 별도의 additional.fields 항목에 매핑됩니다. |
| data.names.forwardDns[].name | read_only_udm.network.dns.questions.name | data.names.forwardDns 배열의 각 요소는 name 필드가 요소의 name 필드로 설정된 별도의 network.dns.questions 항목에 매핑됩니다. |
| data.nameServers | read_only_udm.additional.fields.value.list_value.values.string_value | data.nameServers 배열의 각 요소는 키가 'Name nameServers'이고 value.list_value.values.string_value가 요소 값으로 설정된 별도의 additional.fields 항목에 매핑됩니다. |
| data.protocols[].transportProtocol | read_only_udm.network.ip_protocol | data.protocols[].transportProtocol 필드가 TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP 또는 VRRP 중 하나인 경우 network.ip_protocol에 매핑됩니다. |
| data.protocols[].transportProtocol | read_only_udm.principal.resource.attribute.labels.value | data.protocols[].transportProtocol 필드가 키 'data_protocols {index}'와 함께 principal.resource.attribute.labels.value에 매핑됩니다. |
| http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.user_agent | http.request.headers[].key 필드가 'User-Agent'인 경우 해당 http.request.headers[].value.headers.0 필드가 network.http.user_agent에 매핑됩니다. |
| http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.parsed_user_agent | http.request.headers[].key 필드가 'User-Agent'인 경우 해당 http.request.headers[].value.headers.0 필드가 사용자 에이전트 문자열로 파싱되고 network.http.parsed_user_agent에 매핑됩니다. |
| http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | http.request.headers 배열의 각 요소에 대해 key 필드는 principal.resource.attribute.labels.key에 매핑되고 value.headers.0 필드는 principal.resource.attribute.labels.value에 매핑됩니다. |
| http.request.uri | read_only_udm.principal.asset.hostname | http.request.uri 필드의 호스트 이름 부분이 추출되어 principal.asset.hostname에 매핑됩니다. |
| http.request.uri | read_only_udm.principal.hostname | http.request.uri 필드의 호스트 이름 부분이 추출되어 principal.hostname에 매핑됩니다. |
| http.response.body | read_only_udm.principal.resource.attribute.labels.value | http.response.body 필드가 키 'http_response_body'와 함께 principal.resource.attribute.labels.value에 매핑됩니다. |
| http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.target.hostname | http.response.headers[].key 필드가 'Server'인 경우 해당 http.response.headers[].value.headers.0 필드가 target.hostname에 매핑됩니다. |
| http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | http.response.headers 배열의 각 요소에 대해 key 필드는 principal.resource.attribute.labels.key에 매핑되고 value.headers.0 필드는 principal.resource.attribute.labels.value에 매핑됩니다. |
| http.response.statusCode | read_only_udm.network.http.response_code | http.response.statusCode 필드가 정수로 변환되고 network.http.response_code에 매핑됩니다. |
| ip | read_only_udm.target.asset.ip | ip 필드가 target.asset.ip에 매핑됩니다. |
| ip | read_only_udm.target.ip | ip 필드가 target.ip에 매핑됩니다. |
| isSeed | read_only_udm.additional.fields.value.string_value | isSeed 필드가 문자열로 변환되고 키 'isSeed'를 사용하여 additional.fields.value.string_value에 매핑됩니다. |
| location.city | read_only_udm.principal.location.city | location.city 필드가 principal.location.city에 매핑됩니다. |
| location.continent | read_only_udm.additional.fields.value.string_value | location.continent 필드는 키가 'location_continent'인 additional.fields.value.string_value에 매핑됩니다. |
| location.coordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | location.coordinates.latitude 필드가 부동 소수점으로 변환되고 principal.location.region_coordinates.latitude에 매핑됩니다. |
| location.coordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | location.coordinates.longitude 필드가 부동 소수점으로 변환되고 principal.location.region_coordinates.longitude에 매핑됩니다. |
| location.country | read_only_udm.principal.location.country_or_region | location.country 필드가 principal.location.country_or_region에 매핑됩니다. |
| location.geoCoordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | location.coordinates.latitude 필드가 비어 있으면 location.geoCoordinates.latitude 필드가 float로 변환되고 principal.location.region_coordinates.latitude에 매핑됩니다. |
| location.geoCoordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | location.coordinates.longitude 필드가 비어 있으면 location.geoCoordinates.longitude 필드가 float로 변환되고 principal.location.region_coordinates.longitude에 매핑됩니다. |
| location.postalCode | read_only_udm.additional.fields.value.string_value | location.postalCode 필드는 키가 '우편번호'인 additional.fields.value.string_value에 매핑됩니다. |
| location.province | read_only_udm.principal.location.state | location.province 필드가 principal.location.state에 매핑됩니다. |
| 작업 | read_only_udm.security_result.action_details | operation 필드는 security_result.action_details에 매핑됩니다. |
| perspectiveId | read_only_udm.principal.group.product_object_id | perspectiveId 필드가 principal.group.product_object_id에 매핑됩니다. |
| 포트 | read_only_udm.principal.port | 포트 필드가 정수로 변환되고 principal.port에 매핑됩니다. |
| risks[].severity, risks[].title | read_only_udm.security_result.category_details | risks[].severity 필드가 risks[].title 필드와 연결되어 security_result.category_details에 매핑됩니다. |
| serviceName | read_only_udm.network.application_protocol | serviceName 필드가 'HTTP' 또는 'HTTPS'인 경우 network.application_protocol에 매핑됩니다. |
| sourceIp | read_only_udm.principal.asset.ip | sourceIp 필드가 principal.asset.ip에 매핑됩니다. |
| sourceIp | read_only_udm.principal.ip | sourceIp 필드가 principal.ip에 매핑됩니다. |
| 타임스탬프 | read_only_udm.metadata.event_timestamp | 타임스탬프 필드는 타임스탬프로 파싱되고 metadata.event_timestamp에 매핑됩니다. |
| transportFingerprint.id | read_only_udm.metadata.product_log_id | transportFingerprint.id 필드가 문자열로 변환되고 metadata.product_log_id에 매핑됩니다. |
| transportFingerprint.raw | read_only_udm.additional.fields.value.string_value | transportFingerprint.raw 필드가 키 'transportFingerprint_raw'와 함께 additional.fields.value.string_value에 매핑됩니다. |
| 유형 | read_only_udm.metadata.product_event_type | type 필드가 metadata.product_event_type에 매핑됩니다. |
| - | read_only_udm.metadata.product_name | 'CENSYS_ASM' 값이 metadata.product_name에 할당됩니다. |
| - | read_only_udm.metadata.vendor_name | 'CENSYS' 값이 metadata.vendor_name에 할당됩니다. |
| - | read_only_udm.metadata.event_type | 이벤트 유형은 특정 필드의 존재 여부에 따라 결정됩니다. has_princ_machine_id 및 has_target_machine이 true이고 has_network_flow가 false인 경우 NETWORK_CONNECTION, has_network_flow가 true인 경우 NETWORK_DNS, has_princ_machine_id가 true인 경우 STATUS_UPDATE, 그 외의 경우 GENERIC_EVENT입니다. |
도움이 더 필요한가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.