Symantec WSS 로그 수집

다음에서 지원:

이 문서에서는 Amazon S3를 사용하여 Symantec Web Security Service (WSS) 로그를 Google Security Operations로 수집하는 방법을 설명합니다. 파서는 먼저 로그 메시지를 JSON으로 파싱하려고 시도합니다. 이 과정이 실패하면 점점 더 구체적인 grok 패턴을 사용하여 원시 텍스트에서 필드를 추출하고 궁극적으로 추출된 데이터를 통합 데이터 모델 (UDM)에 매핑합니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스입니다.
  • Symantec Web Security Service에 대한 권한 있는 액세스
  • AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge)에 대한 권한 있는 액세스

Symantec WSS 필수사항 (ID, API 키, 조직 ID, 토큰) 수집

  1. 관리자로 Symantec Web Security Service Portal에 로그인합니다.
  2. 계정 > API 사용자 인증 정보로 이동합니다.
  3. 추가를 클릭합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • API 이름: 설명이 포함된 이름 (예: Google SecOps Integration)을 입력합니다.
    • 설명: API 사용자 인증 정보에 대한 설명을 입력합니다.
  5. 저장을 클릭하고 생성된 API 사용자 인증 정보를 안전하게 복사합니다.
  6. WSS 포털 URL동기화 API 엔드포인트를 기록합니다.
  7. 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
    • WSS_API_USERNAME.
    • WSS_API_PASSWORD.
    • WSS_SYNC_URL.

Google SecOps용 AWS S3 버킷 및 IAM 구성

  1. 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
  2. 나중에 참조할 수 있도록 버킷 이름리전을 저장합니다 (예: symantec-wss-logs).
  3. 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
  4. 생성된 사용자를 선택합니다.
  5. 보안 사용자 인증 정보 탭을 선택합니다.
  6. 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
  7. 사용 사례서드 파티 서비스를 선택합니다.
  8. 다음을 클릭합니다.
  9. 선택사항: 설명 태그를 추가합니다.
  10. 액세스 키 만들기를 클릭합니다.
  11. CSV 파일 다운로드를 클릭하여 향후 참조할 수 있도록 액세스 키비밀 액세스 키를 저장합니다.
  12. 완료를 클릭합니다.
  13. 권한 탭을 선택합니다.
  14. 권한 정책 섹션에서 권한 추가를 클릭합니다.
  15. 권한 추가를 선택합니다.
  16. 정책 직접 연결을 선택합니다.
  17. AmazonS3FullAccess 정책을 검색합니다.
  18. 정책을 선택합니다.
  19. 다음을 클릭합니다.
  20. 권한 추가를 클릭합니다.

S3 업로드용 IAM 정책 및 역할 구성

  1. AWS 콘솔에서 IAM > 정책으로 이동합니다.
  2. 정책 만들기 > JSON 탭을 클릭합니다.
  3. 다음 정책을 복사하여 붙여넣습니다.
  4. 정책 JSON (다른 버킷 이름을 입력한 경우 symantec-wss-logs 대체):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::symantec-wss-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::symantec-wss-logs/symantec/wss/state.json"
        }
      ]
    }
    
  5. 다음 > 정책 만들기를 클릭합니다.

  6. IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.

  7. 새로 만든 정책을 연결합니다.

  8. 역할 이름을 SymantecWssToS3Role로 지정하고 역할 만들기를 클릭합니다.

Lambda 함수 만들기

  1. AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
  2. 처음부터 작성을 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    설정
    이름 symantec_wss_to_s3
    런타임 Python 3.13
    아키텍처 x86_64
    실행 역할 SymantecWssToS3Role
  4. 함수가 생성되면 코드 탭을 열고 스텁을 삭제한 후 다음 코드 (symantec_wss_to_s3.py)를 붙여넣습니다.

    #!/usr/bin/env python3
    # Lambda: Pull Symantec WSS logs and store raw payloads to S3
    # - Time window via millisecond timestamps for WSS Sync API.
    # - Preserves vendor-native format (CSV/JSON/ZIP).
    # - Retries with exponential backoff; unique S3 keys to avoid overwrites.
    
    import os, json, time, uuid
    from urllib.request import Request, urlopen
    from urllib.error import URLError, HTTPError
    
    import boto3
    
    S3_BUCKET   = os.environ["S3_BUCKET"]
    S3_PREFIX   = os.environ.get("S3_PREFIX", "symantec/wss/")
    STATE_KEY   = os.environ.get("STATE_KEY", "symantec/wss/state.json")
    WINDOW_SEC  = int(os.environ.get("WINDOW_SECONDS", "3600"))  # default 1h
    HTTP_TIMEOUT= int(os.environ.get("HTTP_TIMEOUT", "60"))
    WSS_SYNC_URL = os.environ.get("WSS_SYNC_URL", "https://portal.threatpulse.com/reportpod/logs/sync")
    API_USERNAME = os.environ["WSS_API_USERNAME"]
    API_PASSWORD = os.environ["WSS_API_PASSWORD"]
    TOKEN_PARAM  = os.environ.get("WSS_TOKEN_PARAM", "none")
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
    USER_AGENT  = os.environ.get("USER_AGENT", "symantec-wss-to-s3/1.0")
    
    s3 = boto3.client("s3")
    
    def _load_state():
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            return {}
    
    def _save_state(st):
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _ms_timestamp(ts: float) -> int:
        """Convert Unix timestamp to milliseconds for WSS API"""
        return int(ts * 1000)
    
    def _fetch_wss_logs(start_ms: int, end_ms: int) -> tuple[bytes, str, str]:
        # WSS Sync API parameters
        params = f"startDate={start_ms}&endDate={end_ms}&token={TOKEN_PARAM}"
        url = f"{WSS_SYNC_URL}?{params}"
    
        attempt = 0
        while True:
            req = Request(url, method="GET")
            req.add_header("User-Agent", USER_AGENT)
            req.add_header("X-APIUsername", API_USERNAME)
            req.add_header("X-APIPassword", API_PASSWORD)
    
            try:
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    blob = r.read()
                    content_type = r.headers.get("Content-Type", "application/octet-stream")
                    content_encoding = r.headers.get("Content-Encoding", "")
                    return blob, content_type, content_encoding
            except (HTTPError, URLError) as e:
                attempt += 1
                print(f"HTTP error on attempt {attempt}: {e}")
                if attempt > MAX_RETRIES:
                    raise
                # exponential backoff with jitter
                time.sleep(min(60, 2 ** attempt) + (time.time() % 1))
    
    def _determine_extension(content_type: str, content_encoding: str) -> str:
        """Determine file extension based on content type and encoding"""
        if "zip" in content_type.lower():
            return ".zip"
        if "gzip" in content_type.lower() or content_encoding.lower() == "gzip":
            return ".gz"
        if "json" in content_type.lower():
            return ".json"
        if "csv" in content_type.lower():
            return ".csv"
        return ".bin"
    
    def _put_wss_data(blob: bytes, content_type: str, content_encoding: str, from_ts: float, to_ts: float) -> str:
        # Create unique S3 key for WSS data
        ts_path = time.strftime("%Y/%m/%d", time.gmtime(to_ts))
        uniq = f"{int(time.time()*1e6)}_{uuid.uuid4().hex[:8]}"
        ext = _determine_extension(content_type, content_encoding)
        key = f"{S3_PREFIX}{ts_path}/symantec_wss_{int(from_ts)}_{int(to_ts)}_{uniq}{ext}"
    
        s3.put_object(
            Bucket=S3_BUCKET, 
            Key=key, 
            Body=blob, 
            ContentType=content_type,
            Metadata={
                'source': 'symantec-wss',
                'from_timestamp': str(int(from_ts)),
                'to_timestamp': str(int(to_ts)),
                'content_encoding': content_encoding
            }
        )
        return key
    
    def lambda_handler(event=None, context=None):
        st = _load_state()
        now = time.time()
        from_ts = float(st.get("last_to_ts") or (now - WINDOW_SEC))
        to_ts = now
    
        # Convert to milliseconds for WSS API
        start_ms = _ms_timestamp(from_ts)
        end_ms = _ms_timestamp(to_ts)
    
        print(f"Fetching Symantec WSS logs from {start_ms} to {end_ms}")
    
        blob, content_type, content_encoding = _fetch_wss_logs(start_ms, end_ms)
    
        print(f"Retrieved {len(blob)} bytes with content-type: {content_type}")
        if content_encoding:
            print(f"Content encoding: {content_encoding}")
    
        key = _put_wss_data(blob, content_type, content_encoding, from_ts, to_ts)
    
        st["last_to_ts"] = to_ts
        st["last_successful_run"] = now
        _save_state(st)
    
        return {
            "statusCode": 200,
            "body": {
                "success": True, 
                "s3_key": key, 
                "content_type": content_type,
                "content_encoding": content_encoding,
                "from_timestamp": from_ts,
                "to_timestamp": to_ts,
                "bytes_retrieved": len(blob)
            }
        }
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. 구성 > 환경 변수로 이동합니다.

  6. 수정 > 새 환경 변수 추가를 클릭합니다.

  7. 다음 표에 제공된 환경 변수를 입력하고 예시 값을 실제 값으로 바꿉니다.

    환경 변수

    예시 값
    S3_BUCKET symantec-wss-logs
    S3_PREFIX symantec/wss/
    STATE_KEY symantec/wss/state.json
    WINDOW_SECONDS 3600
    HTTP_TIMEOUT 60
    MAX_RETRIES 3
    USER_AGENT symantec-wss-to-s3/1.0
    WSS_SYNC_URL https://portal.threatpulse.com/reportpod/logs/sync
    WSS_API_USERNAME your-api-username (2단계에서)
    WSS_API_PASSWORD your-api-password (2단계에서)
    WSS_TOKEN_PARAM none
  8. 함수가 생성되면 해당 페이지에 머무르거나 Lambda > 함수 > your-function을 엽니다.

  9. 구성 탭을 선택합니다.

  10. 일반 구성 패널에서 수정을 클릭합니다.

  11. 제한 시간5분 (300초)으로 변경하고 저장을 클릭합니다.

EventBridge 일정 만들기

  1. Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
  2. 다음 구성 세부정보를 제공합니다.
    • 반복 일정: 요금 (1 hour)
    • 타겟: Lambda 함수 symantec_wss_to_s3
    • 이름: symantec-wss-1h.
  3. 일정 만들기를 클릭합니다.

(선택사항) Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기

  1. AWS 콘솔에서 IAM > 사용자로 이동합니다.
  2. Add users를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 사용자: secops-reader를 입력합니다.
    • 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
  4. 사용자 만들기를 클릭합니다.
  5. 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::symantec-wss-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::symantec-wss-logs"
        }
      ]
    }
    
  7. 이름 = secops-reader-policy

  8. 정책 만들기 > 검색/선택 > 다음 > 권한 추가를 클릭합니다.

  9. secops-reader의 액세스 키를 만듭니다(보안 사용자 인증 정보 > 액세스 키).

  10. 액세스 키 만들기를 클릭합니다.

  11. CSV를 다운로드합니다. (이 값은 피드에 붙여넣습니다.)

Symantec WSS 로그를 수집하도록 Google SecOps에서 피드 구성

  1. SIEM 설정> 피드로 이동합니다.
  2. + 새 피드 추가를 클릭합니다.
  3. 피드 이름 필드에 피드 이름을 입력합니다 (예: Symantec WSS logs).
  4. 소스 유형으로 Amazon S3 V2를 선택합니다.
  5. 로그 유형으로 Symantec WSS를 선택합니다.
  6. 다음을 클릭합니다.
  7. 다음 입력 파라미터의 값을 지정합니다.
    • S3 URI: s3://symantec-wss-logs/symantec/wss/
    • 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
    • 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키입니다.
    • 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
    • 애셋 네임스페이스: 애셋 네임스페이스입니다.
    • 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
  8. 다음을 클릭합니다.
  9. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

UDM 매핑 테이블

로그 필드 UDM 매핑 논리
category_id read_only_udm.metadata.product_event_type category_id가 1이면 read_only_udm.metadata.product_event_type이 Security로 설정됩니다. category_id가 5이면 read_only_udm.metadata.product_event_type이 Policy로 설정됩니다.
collector_device_ip read_only_udm.principal.ip, read_only_udm.principal.asset.ip collector_device_ip 필드의 값
connection.bytes_download read_only_udm.network.received_bytes connection.bytes_download 필드 값이 정수로 변환됨
connection.bytes_upload read_only_udm.network.sent_bytes integer로 변환된 connection.bytes_upload 필드의 값
connection.dst_ip read_only_udm.target.ip connection.dst_ip 필드의 값
connection.dst_location.country read_only_udm.target.location.country_or_region connection.dst_location.country 필드의 값
connection.dst_name read_only_udm.target.hostname connection.dst_name 필드 값
connection.dst_port read_only_udm.target.port connection.dst_port 필드 값이 정수로 변환됨
connection.http_status read_only_udm.network.http.response_code connection.http_status 필드 값이 정수로 변환됨
connection.http_user_agent read_only_udm.network.http.user_agent connection.http_user_agent 필드의 값
connection.src_ip read_only_udm.principal.ip, read_only_udm.src.ip connection.src_ip 필드의 값입니다. src_ip 또는 collector_device_ip가 비어 있지 않으면 read_only_udm.src.ip에 매핑됩니다.
connection.tls.version read_only_udm.network.tls.version_protocol connection.tls.version 필드의 값
connection.url.host read_only_udm.target.hostname connection.url.host 필드의 값
connection.url.method read_only_udm.network.http.method connection.url.method 필드의 값
connection.url.path read_only_udm.target.url connection.url.path 필드의 값
connection.url.text read_only_udm.target.url connection.url.text 필드의 값
cs_connection_negotiated_cipher read_only_udm.network.tls.cipher cs_connection_negotiated_cipher 필드의 값
cs_icap_status read_only_udm.security_result.description cs_icap_status 필드의 값
device_id read_only_udm.target.resource.id, read_only_udm.target.resource.product_object_id device_id 필드의 값
device_ip read_only_udm.intermediary.ip, read_only_udm.intermediary.asset.ip device_ip 필드의 값
device_time read_only_udm.metadata.collected_timestamp, read_only_udm.metadata.event_timestamp device_time 필드의 값이 문자열로 변환됩니다. when이 비어 있으면 read_only_udm.metadata.event_timestamp에 매핑됩니다.
호스트 이름 read_only_udm.principal.hostname, read_only_udm.principal.asset.hostname 호스트 이름 필드의 값
log_time read_only_udm.metadata.event_timestamp log_time 필드 값이 타임스탬프로 변환되었습니다. when 및 device_time이 비어 있으면 read_only_udm.metadata.event_timestamp에 매핑됩니다.
msg_desc read_only_udm.metadata.description msg_desc 필드의 값
os_details read_only_udm.target.asset.platform_software.platform, read_only_udm.target.asset.platform_software.platform_version os_details 필드의 값입니다. os_details가 비어 있지 않으면 파싱되어 os_name과 os_ver이 추출됩니다. os_name에 Windows이 포함되어 있으면 read_only_udm.target.asset.platform_software.platform이 WINDOWS로 설정됩니다. os_ver가 read_only_udm.target.asset.platform_software.platform_version에 매핑됨
product_data.cs(Referer) read_only_udm.network.http.referral_url product_data.cs(Referer) 필드의 값
product_data.r-supplier-country read_only_udm.principal.location.country_or_region product_data.r-supplier-country 필드의 값
product_data.s-supplier-ip read_only_udm.intermediary.ip, read_only_udm.intermediary.asset.ip product_data.s-supplier-ip 필드의 값
product_data.x-bluecoat-application-name read_only_udm.target.application product_data.x-bluecoat-application-name 필드의 값
product_data.x-bluecoat-transaction-uuid read_only_udm.metadata.product_log_id product_data.x-bluecoat-transaction-uuid 필드의 값
product_data.x-client-agent-sw read_only_udm.observer.platform_version product_data.x-client-agent-sw 필드의 값
product_data.x-client-agent-type read_only_udm.observer.application product_data.x-client-agent-type 필드의 값
product_data.x-client-device-id read_only_udm.target.resource.type, read_only_udm.target.resource.id, read_only_udm.target.resource.product_object_id 비어 있지 않으면 read_only_udm.target.resource.type이 DEVICE로 설정됩니다. product_data.x-client-device-id 필드 값이 read_only_udm.target.resource.id 및 read_only_udm.target.resource.product_object_id에 매핑됩니다.
product_data.x-client-device-name read_only_udm.src.hostname, read_only_udm.src.asset.hostname product_data.x-client-device-name 필드의 값
product_data.x-cs-client-ip-country read_only_udm.target.location.country_or_region product_data.x-cs-client-ip-country 필드의 값
product_data.x-cs-connection-negotiated-cipher read_only_udm.network.tls.cipher product_data.x-cs-connection-negotiated-cipher 필드의 값
product_data.x-cs-connection-negotiated-ssl-version read_only_udm.network.tls.version_protocol product_data.x-cs-connection-negotiated-ssl-version 필드의 값
product_data.x-exception-id read_only_udm.security_result.summary product_data.x-exception-id 필드의 값
product_data.x-rs-certificate-hostname read_only_udm.network.tls.client.server_name product_data.x-rs-certificate-hostname 필드의 값
product_data.x-rs-certificate-hostname-categories read_only_udm.security_result.category_details product_data.x-rs-certificate-hostname-categories 필드의 값
product_data.x-rs-certificate-observed-errors read_only_udm.network.tls.server.certificate.issuer product_data.x-rs-certificate-observed-errors 필드의 값
product_data.x-rs-certificate-validate-status read_only_udm.network.tls.server.certificate.subject product_data.x-rs-certificate-validate-status 필드의 값
product_name read_only_udm.metadata.product_name product_name 필드의 값
product_ver read_only_udm.metadata.product_version product_ver 필드의 값
proxy_connection.src_ip read_only_udm.intermediary.ip, read_only_udm.intermediary.asset.ip proxy_connection.src_ip 필드의 값
received_bytes read_only_udm.network.received_bytes received_bytes 필드 값이 정수로 변환됨
ref_uid read_only_udm.metadata.product_log_id ref_uid 필드의 값
s_action read_only_udm.metadata.description s_action 필드의 값
sent_bytes read_only_udm.network.sent_bytes sent_bytes 필드 값이 정수로 변환됨
severity_id read_only_udm.security_result.severity severity_id가 1 또는 2인 경우 read_only_udm.security_result.severity가 LOW로 설정됩니다. severity_id가 3 또는 4인 경우 read_only_udm.security_result.severity가 MEDIUM로 설정됩니다. severity_id가 5 또는 6이면 read_only_udm.security_result.severity가 HIGH로 설정됩니다.
supplier_country read_only_udm.principal.location.country_or_region supplier_country 필드의 값
target_ip read_only_udm.target.ip, read_only_udm.target.asset.ip target_ip 필드의 값
user.full_name read_only_udm.principal.user.user_display_name user.full_name 필드의 값
user.name read_only_udm.principal.user.user_display_name user.name 필드의 값
user_name read_only_udm.principal.user.user_display_name user_name 필드의 값
uuid read_only_udm.metadata.product_log_id uuid 필드의 값
when read_only_udm.metadata.event_timestamp 필드가 타임스탬프로 변환된 시점의 값
read_only_udm.metadata.event_type 호스트 이름이 비어 있고 connection.dst_ip가 비어 있지 않은 경우 NETWORK_UNCATEGORIZED로 설정됩니다. 호스트 이름이 비어 있지 않으면 SCAN_NETWORK로 설정됩니다. has_principal 및 has_target가 true이면 NETWORK_CONNECTION로 설정합니다. has_principal이 true이고 has_target이 false이면 STATUS_UPDATE로 설정합니다. has_principal 및 has_target가 false이면 GENERIC_EVENT로 설정합니다.
read_only_udm.metadata.log_type 항상 SYMANTEC_WSS로 설정
read_only_udm.metadata.vendor_name 항상 SYMANTEC로 설정
read_only_udm.security_result.action product_data.sc-filter_result가 OBSERVED 또는 PROXIED인 경우 ALLOW로 설정합니다. product_data.sc-filter_result가 DENIED이면 BLOCK로 설정합니다.
read_only_udm.security_result.action_details product_data.sc-filter_result 필드의 값
read_only_udm.target.resource.type product_data.x-client-device-id가 비어 있지 않으면 DEVICE로 설정

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.