Proofpoint Emerging Threats Pro IOC 로그 수집

다음에서 지원:

이 문서에서는 Amazon S3를 사용하여 Proofpoint Emerging Threats Pro IOC 로그를 Google Security Operations로 수집하는 방법을 설명합니다. Emerging Threats Intelligence는 카테고리, 점수, 시간 정보를 포함한 위협 인텔리전스 데이터와 함께 IP 및 도메인의 평판 목록을 CSV 형식으로 매시간 게시합니다. 파서 코드는 CSV 형식의 ET_PRO 위협 인텔리전스 데이터를 처리합니다. IP 주소, 도메인, 카테고리, 점수, 기타 관련 정보를 추출하여 표준화된 IOC 형식과 Chronicle UDM 스키마에 매핑하여 Google SecOps 내에서 추가 분석 및 사용을 지원합니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • 피드를 만들 권한이 있는 Google SecOps 인스턴스
  • 평판 목록에 액세스할 수 있는 Proofpoint ET 인텔리전스 구독
  • https://etadmin.proofpoint.com/api-access의 ET Intelligence API 키
  • AWS (S3, IAM, Lambda, EventBridge)에 대한 액세스 권한

Emerging Threats Pro 기본 요건 수집

  1. https://etadmin.proofpoint.com에서 ET Intelligence Admin Portal에 로그인합니다.
  2. API 액세스로 이동합니다.
  3. API 키를 복사하여 저장합니다.
  4. Proofpoint 담당자에게 문의하여 다음을 받으세요.
    • 자세한 IP 평판 목록 URL
    • 자세한 도메인 평판 목록 URL

ET Intelligence는 IP 및 도메인 평판 목록에 대해 별도의 CSV 파일을 제공하며, 이는 매시간 업데이트됩니다. 다음 열이 포함된 '자세한' 형식을 사용합니다. * 도메인 목록: Domain Name, Category, Score, First Seen, Last Seen, Ports * IP 목록: IP Address, Category, Score, First Seen, Last Seen, Ports

AWS S3 버킷 및 IAM 구성

S3 버킷 만들기

  1. Amazon S3 콘솔을 엽니다.
  2. 버킷 만들기를 클릭합니다.
  3. 버킷 이름: et-pro-ioc-bucket (또는 원하는 이름)을 입력합니다.
  4. 리전: 원하는 리전을 선택합니다.
  5. 버킷 만들기를 클릭합니다.

Google SecOps용 IAM 사용자 만들기

  1. IAM 콘솔을 엽니다.
  2. 사용자 > 사용자 만들기를 클릭합니다.
  3. 사용자 이름: secops-reader을 입력합니다.
  4. 다음을 클릭합니다.
  5. 정책 직접 연결을 선택합니다.
  6. 정책 만들기를 클릭합니다.
  7. JSON 편집기에 다음 정책을 입력합니다.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket",
          "Condition": {
            "StringLike": {
              "s3:prefix": ["et-pro-ioc/*"]
            }
          }
        }
      ]
    }
    
  8. 정책 이름을 SecOpsReaderPolicy로 지정합니다.

  9. 정책 만들기를 클릭합니다.

  10. 사용자 생성으로 돌아가 새로 만든 정책을 선택합니다.

  11. 다음 > 사용자 만들기를 클릭합니다.

  12. 보안용 사용자 인증 정보 탭으로 이동합니다.

  13. 액세스 키 만들기를 클릭합니다.

  14. 서드 파티 서비스를 선택합니다.

  15. 액세스 키 만들기를 클릭합니다.

  16. 사용자 인증 정보를 다운로드하여 저장합니다.

Lambda의 IAM 역할 구성

  1. AWS 콘솔에서 IAM > 역할 > 역할 만들기로 이동합니다.
  2. AWS 서비스 > Lambda를 선택합니다.
  3. 다음을 클릭합니다.
  4. 정책 만들기를 클릭합니다.
  5. JSON 탭을 선택하고 다음을 입력합니다.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/*"
        },
        {
          "Sid": "AllowStateManagement",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::et-pro-ioc-bucket/et-pro-ioc/state.json"
        }
      ]
    }
    
  6. 정책 이름을 EtProIocLambdaPolicy로 지정합니다.

  7. 정책 만들기를 클릭합니다.

  8. 역할 생성으로 돌아가 정책을 연결합니다.

  9. 역할 이름을 EtProIocLambdaRole으로 지정합니다.

  10. 역할 만들기를 클릭합니다.

Lambda 함수 만들기

  1. AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
  2. 처음부터 작성을 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    • 함수 이름: et-pro-ioc-fetcher
    • 런타임: Python 3.13
    • 아키텍처: x86_64
    • 실행 역할: 기존 역할 사용 EtProIocLambdaRole
  4. 생성 후 코드 탭으로 이동하여 다음으로 바꿉니다.

    #!/usr/bin/env python3
    # Lambda: Fetch ET Pro IOC reputation lists and write raw CSV to S3
    import os
    import time
    import json
    from datetime import datetime
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    # Environment variables
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "et-pro-ioc/").strip("/")
    ET_API_KEY = os.environ["ET_API_KEY"]
    ET_IP_LIST_URL = os.environ["ET_IP_LIST_URL"]
    ET_DOMAIN_LIST_URL = os.environ["ET_DOMAIN_LIST_URL"]
    STATE_KEY = os.environ.get("STATE_KEY", f"{PREFIX}/state.json")
    TIMEOUT = int(os.environ.get("TIMEOUT", "120"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        """Build request with ET API authentication"""
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed")
    
        req = Request(url, method="GET")
        # ET Intelligence uses Authorization header with API key
        req.add_header("Authorization", ET_API_KEY)
        return req
    
    def fetch_with_retry(url: str, max_retries: int = 3) -> bytes:
        """Fetch URL with retry logic for rate limits"""
        for attempt in range(max_retries):
            try:
                req = _build_request(url)
                with urlopen(req, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        return response.read()
                    elif response.status == 429:
                        # Rate limited, wait and retry
                        wait_time = min(30 * (2 ** attempt), 300)
                        print(f"Rate limited, waiting {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        raise HTTPError(url, response.status, response.reason, {}, None)
            except URLError as e:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5 * (attempt + 1))
    
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    
    def save_to_s3(key: str, content: bytes):
        """Save content to S3 with appropriate content type"""
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=content,
            ContentType="text/csv"
        )
        print(f"Saved {len(content)} bytes to s3://{BUCKET}/{key}")
    
    def get_state():
        """Get last fetch state from S3"""
        try:
            response = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read())
        except:
            return {}
    
    def save_state(state: dict):
        """Save fetch state to S3"""
        s3.put_object(
            Bucket=BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state, indent=2),
            ContentType="application/json"
        )
    
    def lambda_handler(event, context):
        """Main Lambda handler"""
        print("Starting ET Pro IOC fetch")
    
        # Generate timestamp for file naming
        now = datetime.utcnow()
        timestamp = now.strftime("%Y/%m/%d/%H%M%S")
    
        results = []
        errors = []
    
        # Fetch IP reputation list
        try:
            print(f"Fetching IP reputation list...")
            ip_data = fetch_with_retry(ET_IP_LIST_URL)
            ip_key = f"{PREFIX}/ip/{timestamp}.csv"
            save_to_s3(ip_key, ip_data)
            results.append({"type": "ip", "key": ip_key, "size": len(ip_data)})
        except Exception as e:
            error_msg = f"Failed to fetch IP list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Fetch Domain reputation list
        try:
            print(f"Fetching Domain reputation list...")
            domain_data = fetch_with_retry(ET_DOMAIN_LIST_URL)
            domain_key = f"{PREFIX}/domain/{timestamp}.csv"
            save_to_s3(domain_key, domain_data)
            results.append({"type": "domain", "key": domain_key, "size": len(domain_data)})
        except Exception as e:
            error_msg = f"Failed to fetch Domain list: {str(e)}"
            print(error_msg)
            errors.append(error_msg)
    
        # Save state
        state = {
            "last_fetch": now.isoformat(),
            "results": results,
            "errors": errors
        }
        save_state(state)
    
        return {
            "statusCode": 200 if not errors else 207,
            "body": json.dumps(state)
        }
    
  5. 구성 > 일반 구성으로 이동합니다.

  6. 수정을 클릭합니다.

  7. 제한 시간5분으로 설정합니다.

  8. 저장을 클릭합니다.

환경 변수 구성

  1. 구성 > 환경 변수로 이동합니다.
  2. 수정 > 환경 변수 추가를 클릭합니다.
  3. 다음 변수를 추가합니다.

    S3_BUCKET et-pro-ioc-bucket
    S3_PREFIX et-pro-ioc
    STATE_KEY et-pro-ioc/state.json
    ET_API_KEY [Your ET API Key]
    ET_IP_LIST_URL [Your detailed IP list URL]
    ET_DOMAIN_LIST_URL [Your detailed Domain list URL]
    TIMEOUT 120
  4. 저장을 클릭합니다.

구독의 정확한 URL은 Proofpoint 담당자에게 문의하세요. 자세한 형식 URL은 일반적으로 다음 패턴을 따릅니다. * IP 목록: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-iprepdata.txt * 도메인 목록: https://rules.emergingthreatspro.com/[your-code]/reputation/detailed-domainrepdata.txt

EventBridge 일정 만들기

  1. Amazon EventBridge > Schedules > Create schedule로 이동합니다.
  2. 일정 이름: et-pro-ioc-hourly
  3. 일정 패턴: 비율 기반 일정
  4. 속도 표현식: 1시간
  5. 다음을 클릭합니다.
  6. 타겟: Lambda 함수
  7. 함수: et-pro-ioc-fetcher
  8. 나머지 단계를 통해 다음을 클릭합니다.
  9. 일정 만들기를 클릭합니다.

Google SecOps에서 피드 구성

IP 평판용 피드와 도메인 평판용 피드 등 별도의 피드 두 개를 만들어야 합니다.

IP 평판 피드 만들기

  1. SIEM 설정> 피드로 이동합니다.
  2. 새로 추가를 클릭합니다.
  3. 피드 이름 필드에 ET Pro IOC - IP Reputation를 입력합니다.
  4. 소스 유형 목록에서 Amazon S3를 선택합니다.
  5. 로그 유형으로 Emerging Threats Pro를 선택합니다.
  6. 다음을 클릭합니다.
  7. 다음 입력 파라미터의 값을 지정합니다.
    • S3 URI: s3://et-pro-ioc-bucket/et-pro-ioc/ip/
    • 소스 삭제 옵션: 환경설정에 따라 선택
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
    • 액세스 키 ID: SecOps 읽기 권한 액세스 키
    • 보안 비밀 액세스 키: SecOps 리더 보안 비밀 키
    • 애셋 네임스페이스: 애셋 네임스페이스입니다.
    • 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
  8. 다음을 클릭합니다.
  9. 검토 후 제출을 클릭합니다.

도메인 평판 피드 만들기

  1. 피드 생성 프로세스를 반복합니다.
  2. 피드 이름 필드에 ET Pro IOC - Domain Reputation를 입력합니다.
  3. 소스 유형 목록에서 Amazon S3를 선택합니다.
  4. 로그 유형으로 Emerging Threats Pro를 선택합니다.
  5. 다음을 클릭합니다.
  6. 다음 입력 파라미터의 값을 지정합니다.
    • S3 URI: s3://et-pro-ioc-bucket/et-pro-ioc/domain/
    • 소스 삭제 옵션: 환경설정에 따라 선택
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
    • 액세스 키 ID: SecOps 읽기 권한 액세스 키
    • 보안 비밀 액세스 키: SecOps 리더 보안 비밀 키
    • 애셋 네임스페이스: 애셋 네임스페이스입니다.
    • 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
  7. 다음을 클릭합니다.
  8. 검토 후 제출을 클릭합니다.

UDM 매핑 테이블

로그 필드 UDM 매핑 논리
카테고리 이 필드는 파서 로직에서 사용되지만 UDM에 직접 매핑되지는 않습니다. 조회 테이블을 통해 event.ioc.categorization 값을 결정합니다.
collection_time.nanos event.idm.entity.metadata.collected_timestamp.nanos 원시 로그에서 직접 매핑됩니다.
collection_time.seconds event.idm.entity.metadata.collected_timestamp.seconds 원시 로그에서 직접 매핑됩니다.
데이터 이 필드는 콘텐츠에 따라 여러 UDM 필드로 파싱됩니다.
first_seen event.idm.entity.metadata.interval.start_time 날짜로 파싱되어 UDM에 매핑됩니다.
first_seen event.ioc.active_timerange.start 날짜로 파싱되어 UDM에 매핑됩니다.
ip_or_domain event.idm.entity.entity.hostname Grok 패턴이 필드에서 호스트를 추출하는 경우 UDM에 매핑됩니다.
ip_or_domain event.idm.entity.entity.ip grok 패턴이 필드에서 호스트를 추출하지 않는 경우 UDM에 매핑됩니다.
ip_or_domain event.ioc.domain_and_ports.domain Grok 패턴이 필드에서 호스트를 추출하는 경우 UDM에 매핑됩니다.
ip_or_domain event.ioc.ip_and_ports.ip_address grok 패턴이 필드에서 호스트를 추출하지 않는 경우 UDM에 매핑됩니다.
last_seen event.idm.entity.metadata.interval.end_time 날짜로 파싱되어 UDM에 매핑됩니다.
last_seen event.ioc.active_timerange.end 날짜로 파싱되어 UDM에 매핑됩니다.
ports event.idm.entity.entity.labels.value 포트가 여러 개인 경우 쉼표 구분자로 파싱되고 결합되어 UDM에 매핑됩니다.
ports event.idm.entity.entity.port 포트가 하나만 있는 경우 파싱되어 UDM에 매핑됩니다.
ports event.ioc.domain_and_ports.ports grok 패턴이 필드에서 호스트를 추출하는 경우 파싱하여 UDM에 매핑합니다.
ports event.ioc.ip_and_ports.ports grok 패턴이 필드에서 호스트를 추출하지 않으면 파싱되어 UDM에 매핑됩니다.
점수 event.ioc.confidence_score 원시 로그에서 직접 매핑됩니다.
event.idm.entity.entity.labels.key 포트가 여러 개인 경우 'ports'로 설정합니다.
event.idm.entity.metadata.entity_type 그록 패턴이 ip_or_domain 필드에서 호스트를 추출하는 경우 'DOMAIN_NAME'으로 설정하고, 그렇지 않으면 'IP_ADDRESS'로 설정합니다.
event.idm.entity.metadata.threat.category 'SOFTWARE_MALICIOUS'로 설정합니다.
event.idm.entity.metadata.threat.category_details 참고표를 사용하여 category 필드에서 파생됩니다.
event.idm.entity.metadata.threat.threat_name 'ET Intelligence Rep List'로 설정합니다.
event.idm.entity.metadata.vendor_name 'ET_PRO_IOC'로 설정합니다.
event.ioc.feed_name 'ET Intelligence Rep List'로 설정합니다.
event.ioc.raw_severity '악성'으로 설정합니다.
timestamp.nanos collection_time.nanos에서 복사됨
timestamp.seconds collection_time.seconds에서 복사됨

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.