Akamai SIEM 커넥터 로그 수집

다음에서 지원:

이 문서에서는 Amazon S3를 사용하여 Akamai SIEM 커넥터 로그를 Google Security Operations로 수집하는 방법을 설명합니다. Akamai SIEM 통합은 SIEM 통합 API를 통해 Akamai 플랫폼의 보안 이벤트를 JSON 형식으로 제공합니다. 이 통합은 AWS Lambda를 사용하여 Akamai API에서 이벤트를 주기적으로 가져와 Google SecOps에서 수집하는 S3에 저장합니다.

시작하기 전에

  • Google SecOps 인스턴스
  • SIEM 관리 사용자 역할이 있는 Akamai Control Center에 대한 권한 있는 액세스
  • SIEM API 서비스가 사용 설정된 Akamai API 사용자 인증 정보 (읽기-쓰기 액세스 수준)
  • AWS (S3, IAM, Lambda, EventBridge)에 대한 액세스 권한

Akamai Control Center에서 SIEM 통합 사용 설정

  1. Akamai Control Center에 로그인합니다.
  2. 웹 및 데이터 센터 보안 > 보안 구성으로 이동합니다.
  3. SIEM 데이터를 수집할 보안 구성 (및 적절한 버전)을 엽니다.
  4. 고급 설정을 클릭하고 SIEM 통합을 위한 데이터 수집을 펼칩니다.
  5. 사용을 클릭하여 SIEM을 사용 설정합니다.
  6. 데이터를 내보낼 보안 정책을 선택합니다.

    • 모든 보안 정책: 보안 구성 내에서 일부 또는 모든 보안 정책을 위반하는 이벤트의 SIEM 데이터를 전송합니다.
    • 특정 보안 정책: 드롭다운 목록에서 특정 보안 정책을 하나 이상 선택합니다.
  7. 선택사항: 계정 보호 도구를 사용하고 암호화되지 않은 사용자 이름을 포함하려면 사용자 이름 포함 체크박스를 사용 설정합니다.

  8. 선택사항: 특정 보호 유형 및 작업에 속하는 이벤트를 제외하려면 예외 추가를 클릭하고 SIEM에서 수집하지 않을 보호 및 관련 작업을 선택합니다.

  9. 저장을 클릭합니다.

  10. SIEM 통합 섹션에서 보안 구성 ID (configId)를 복사하여 저장합니다. Lambda 구성에 필요합니다.

SIEM 통합을 위한 Akamai API 사용자 인증 정보 만들기

  1. Akamai Control Center에 로그인합니다.
  2. ACCOUNT ADMIN(계정 관리자) > Identity & access(ID 및 액세스) > API clients(API 클라이언트)로 이동합니다.
  3. API 클라이언트 만들기를 클릭합니다.
  4. 다음 구성 세부정보를 제공합니다.

    • API 클라이언트 이름: 설명이 포함된 이름 (예: Google SecOps Poller)을 입력합니다.
    • API 서비스: SIEM을 선택하고 액세스 수준을 READ-WRITE로 설정합니다.
  5. API 클라이언트 만들기를 클릭합니다.

  6. 다음 사용자 인증 정보를 복사하여 안전하게 저장합니다.

    • 클라이언트 토큰
    • 클라이언트 보안 비밀번호
    • Access Token(액세스 토큰)
    • 호스트 (예: example.luna.akamaiapis.net)

Google SecOps용 AWS S3 버킷 및 IAM 구성

  1. 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
  2. 나중에 참조할 수 있도록 버킷 이름리전을 저장합니다(예: akamai-siem-logs).
  3. 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
  4. 생성된 사용자를 선택합니다.
  5. 보안용 사용자 인증 정보 탭을 선택합니다.
  6. 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
  7. 사용 사례서드 파티 서비스를 선택합니다.
  8. 다음을 클릭합니다.
  9. 선택사항: 설명 태그를 추가합니다.
  10. 액세스 키 만들기를 클릭합니다.
  11. .csv 파일 다운로드를 클릭하여 나중에 참고할 수 있도록 액세스 키보안 비밀 액세스 키를 저장합니다.
  12. 완료를 클릭합니다.
  13. 권한 탭을 선택합니다.
  14. 권한 정책 섹션에서 권한 추가를 클릭합니다.
  15. 권한 추가를 선택합니다.
  16. 정책 직접 연결을 선택합니다.
  17. AmazonS3FullAccess 정책을 검색합니다.
  18. 정책을 선택합니다.
  19. 다음을 클릭합니다.
  20. 권한 추가를 클릭합니다.

S3 업로드용 IAM 정책 및 역할 구성

  1. AWS 콘솔에서 IAM > 정책 > 정책 생성 > JSON 탭으로 이동합니다.
  2. 다음 정책을 복사하여 붙여넣고 akamai-siem-logs을 버킷 이름으로 바꿉니다.

    정책 JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::akamai-siem-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::akamai-siem-logs/akamai-siem/state.json"
        }
      ]
    }
    
  3. 다음을 클릭합니다.

  4. 정책 이름 AkamaiSIEMtoS3Policy을 입력하고 정책 만들기를 클릭합니다.

  5. IAM > 역할 > 역할 만들기로 이동합니다.

  6. AWS 서비스를 선택합니다.

  7. 사용 사례로 Lambda를 선택합니다.

  8. 다음을 클릭합니다.

  9. 방금 만든 정책 AkamaiSIEMtoS3Policy를 검색하여 선택합니다.

  10. 다음을 클릭합니다.

  11. 역할 이름 AkamaiSIEMtoS3Role을 입력하고 역할 만들기를 클릭합니다.

Lambda 함수 만들기

  1. AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
  2. 처음부터 작성을 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    설정
    이름 AkamaiSIEMtoS3Function
    런타임 Python 3.13
    아키텍처 x86_64
    실행 역할 기존 역할 사용
    기존 역할 AkamaiSIEMtoS3Role
  4. 함수 만들기를 클릭합니다.

  5. 함수가 생성되면 코드 탭을 열고 스텁을 삭제한 후 다음 코드를 붙여넣습니다.

    import json
    import boto3
    import os
    import urllib3
    import hmac
    import hashlib
    import base64
    from datetime import datetime
    from urllib.parse import urlparse, urljoin
    
    # Configuration from environment variables
    S3_BUCKET = os.environ['S3_BUCKET']
    S3_PREFIX = os.environ.get('S3_PREFIX', 'akamai-siem/')
    STATE_KEY = os.environ.get('STATE_KEY', 'akamai-siem/state.json')
    
    AKAMAI_HOST = os.environ['AKAMAI_HOST']
    AKAMAI_CLIENT_TOKEN = os.environ['AKAMAI_CLIENT_TOKEN']
    AKAMAI_CLIENT_SECRET = os.environ['AKAMAI_CLIENT_SECRET']
    AKAMAI_ACCESS_TOKEN = os.environ['AKAMAI_ACCESS_TOKEN']
    AKAMAI_CONFIG_IDS = os.environ['AKAMAI_CONFIG_IDS'].split(',')
    
    LIMIT = int(os.environ.get('LIMIT', '10000'))
    
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager()
    
    def load_state():
        """Load offset state from S3"""
        try:
            response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(response['Body'].read().decode('utf-8'))
        except s3_client.exceptions.NoSuchKey:
            return {}
        except Exception as e:
            print(f"Error loading state: {e}")
            return {}
    
    def save_state(state):
        """Save offset state to S3"""
        try:
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state, indent=2).encode('utf-8'),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Error saving state: {e}")
    
    def make_edgegrid_auth_header(url, method='GET'):
        """Create EdgeGrid authentication header"""
        timestamp = datetime.utcnow().strftime('%Y%m%dT%H:%M:%S+0000')
        nonce = base64.b64encode(os.urandom(16)).decode('utf-8')
    
        parsed_url = urlparse(url)
        relative_url = parsed_url.path
        if parsed_url.query:
            relative_url += '?' + parsed_url.query
    
        auth_header = f'EG1-HMAC-SHA256 ' \
                    f'client_token={AKAMAI_CLIENT_TOKEN};' \
                    f'access_token={AKAMAI_ACCESS_TOKEN};' \
                    f'timestamp={timestamp};' \
                    f'nonce={nonce};'
    
        data_to_sign = '\t'.join([
            method,
            parsed_url.scheme,
            parsed_url.netloc,
            relative_url,
            '',  # Request body for GET
            '',  # No additional headers
        ])
    
        signing_key = hmac.new(
            AKAMAI_CLIENT_SECRET.encode('utf-8'),
            timestamp.encode('utf-8'),
            hashlib.sha256
        ).digest()
    
        auth_signature = base64.b64encode(
            hmac.new(
                signing_key,
                (data_to_sign + auth_header).encode('utf-8'),
                hashlib.sha256
            ).digest()
        ).decode('utf-8')
    
        return auth_header + f'signature={auth_signature}'
    
    def fetch_akamai_events(config_id, offset=None):
        """Fetch events from Akamai SIEM API"""
        base_url = f'https://{AKAMAI_HOST}'
        endpoint = f'/siem/v1/configs/{config_id}'
    
        params = f'limit={LIMIT}'
        if offset:
            params += f'&offset={offset}'
    
        url = f'{base_url}{endpoint}?{params}'
    
        try:
            headers = {
                'Authorization': make_edgegrid_auth_header(url)
            }
    
            response = http.request('GET', url, headers=headers, timeout=120)
    
            if response.status != 200:
                print(f"Error response {response.status}: {response.data.decode('utf-8')}")
                return [], offset
    
            # Parse multi-JSON response (newline-delimited JSON)
            lines = response.data.decode('utf-8').strip().split('\n')
            events = []
            new_offset = offset
    
            for line in lines:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
    
                    # Check if this is offset context (metadata object with offset)
                    if 'offset' in obj and ('total' in obj or 'responseContext' in obj):
                        new_offset = obj.get('offset')
                        continue
    
                    # This is an event
                    events.append(obj)
                except json.JSONDecodeError as e:
                    print(f"Warning: Failed to parse line: {e}")
                    continue
    
            return events, new_offset
    
        except Exception as e:
            print(f"Error fetching events for config {config_id}: {e}")
            return [], offset
    
    def lambda_handler(event, context):
        """Lambda handler - fetches Akamai events and writes to S3"""
        print(f"Starting Akamai SIEM fetch at {datetime.utcnow().isoformat()}Z")
    
        state = load_state()
        total_events = 0
    
        for config_id in AKAMAI_CONFIG_IDS:
            config_id = config_id.strip()
            if not config_id:
                continue
    
            print(f"Fetching events for config: {config_id}")
    
            current_offset = state.get(config_id)
            events, new_offset = fetch_akamai_events(config_id, current_offset)
    
            if events:
                print(f"Fetched {len(events)} events for config {config_id}")
    
                # Write events to S3 as newline-delimited JSON
                timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f'{S3_PREFIX}{config_id}/{timestamp}.json'
    
                payload = '\n'.join(json.dumps(event) for event in events)
    
                try:
                    s3_client.put_object(
                        Bucket=S3_BUCKET,
                        Key=s3_key,
                        Body=payload.encode('utf-8'),
                        ContentType='application/json'
                    )
                    print(f"Wrote {len(events)} events to s3://{S3_BUCKET}/{s3_key}")
    
                    # Update offset only after successful write
                    if new_offset:
                        state[config_id] = new_offset
                        total_events += len(events)
                except Exception as e:
                    print(f"Error writing to S3: {e}")
            else:
                print(f"No new events for config {config_id}")
    
        # Save updated state
        save_state(state)
    
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Successfully processed {total_events} events',
                'configs_processed': len(AKAMAI_CONFIG_IDS)
            })
        }
    
  6. 배포를 클릭하여 코드를 저장합니다.

  7. 구성 > 환경 변수로 이동합니다.

  8. 수정을 클릭합니다.

  9. 다음 각 항목에 대해 환경 변수 추가를 클릭합니다.

    환경 변수

    예시 값
    S3_BUCKET akamai-siem-logs
    S3_PREFIX akamai-siem/
    STATE_KEY akamai-siem/state.json
    AKAMAI_HOST example.luna.akamaiapis.net
    AKAMAI_CLIENT_TOKEN your-client-token
    AKAMAI_CLIENT_SECRET your-client-secret
    AKAMAI_ACCESS_TOKEN your-access-token
    AKAMAI_CONFIG_IDS 12345,67890
    LIMIT 10000
  10. 저장을 클릭합니다.

  11. 구성 > 일반 구성으로 이동합니다.

  12. 수정을 클릭합니다.

  13. 제한 시간5분 (300초)으로 변경합니다.

  14. 저장을 클릭합니다.

EventBridge 일정 만들기

  1. Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
  2. 다음 구성 세부정보를 제공합니다.

    • 일정 이름: AkamaiSIEMtoS3-5min를 입력합니다.
    • 일정 패턴: 반복 일정을 선택합니다.
    • 일정 유형: 요율 기반 일정을 선택합니다.
    • 요금 표현식: 5을 입력하고 을 선택합니다.
  3. 다음을 클릭합니다.

  4. 다음 구성 세부정보를 제공합니다.

    • 타겟: AWS Lambda 호출을 선택합니다.
    • Lambda 함수: AkamaiSIEMtoS3Function를 선택합니다.
  5. 다음을 클릭합니다.

  6. 다음을 클릭합니다 (선택적 설정은 건너뜁니다).

  7. 검토한 후 일정 만들기를 클릭합니다.

Akamai SIEM 커넥터 로그를 수집하도록 Google SecOps에서 피드 구성

  1. SIEM 설정> 피드로 이동합니다.
  2. + 새 피드 추가를 클릭합니다.
  3. 피드 이름 필드에 피드 이름을 입력합니다(예: Akamai SIEM Connector).
  4. 소스 유형으로 Amazon S3 V2를 선택합니다.
  5. 로그 유형으로 Akamai SIEM 커넥터를 선택합니다.
  6. 다음을 클릭합니다.
  7. 다음 입력 매개변수의 값을 지정합니다.

    • S3 URI: s3://akamai-siem-logs/akamai-siem/
    • 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
    • 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키
    • 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
    • 애셋 네임스페이스: 애셋 네임스페이스입니다.
    • 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
  8. 다음을 클릭합니다.

  9. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

도움이 더 필요한가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.