Akamai Cloud Monitor 로그 수집

다음에서 지원:

이 문서에서는 Google Cloud Storage를 사용하여 Akamai Cloud Monitor (Load Balancer, Traffic Shaper, ADC) 로그를 Google Security Operations로 수집하는 방법을 설명합니다. Akamai는 JSON 이벤트를 HTTPS 엔드포인트로 푸시합니다. API Gateway + Cloud Functions 수신기는 이벤트를 GCS (JSONL, gz)에 씁니다. 파서는 JSON 로그를 UDM으로 변환합니다. JSON 페이로드에서 필드를 추출하고, 데이터 유형 변환을 실행하고, UDM 스키마와 일치하도록 필드 이름을 바꾸고, 맞춤 필드 및 URL 구성을 위한 특정 로직을 처리합니다. 또한 필드 존재 여부에 따라 오류 처리 및 조건부 로직이 통합되어 있습니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스
  • Cloud Storage API가 사용 설정된 GCP 프로젝트
  • GCS 버킷을 만들고 관리할 수 있는 권한
  • GCS 버킷의 IAM 정책을 관리할 수 있는 권한
  • Cloud Functions, Pub/Sub 주제, API Gateway를 만들 수 있는 권한
  • Akamai Control Center 및 Property Manager에 대한 권한이 있는 액세스

Google Cloud Storage 버킷 만들기

  1. Google Cloud Console로 이동합니다.
  2. 프로젝트를 선택하거나 새 프로젝트를 만듭니다.
  3. 탐색 메뉴에서 Cloud Storage> 버킷으로 이동합니다.
  4. 버킷 만들기를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.

    설정
    버킷 이름 지정 전역적으로 고유한 이름 (예: akamai-cloud-monitor)을 입력합니다.
    위치 유형 필요에 따라 선택 (리전, 이중 리전, 멀티 리전)
    위치 위치를 선택합니다 (예: us-central1).
    스토리지 클래스 Standard (자주 액세스하는 로그에 권장)
    액세스 제어 균일 (권장)
    보호 도구 선택사항: 객체 버전 관리 또는 보관 정책 사용 설정
  6. 만들기를 클릭합니다.

Akamai Cloud Monitor 구성 세부정보 수집

Akamai Control Center에서 다음 정보가 필요합니다.

  • 속성 관리자의 속성 이름
  • 수집해야 하는 필수 Cloud Monitor 데이터 세트
  • 웹훅 인증을 위한 선택적 공유 보안 비밀 토큰

Cloud 함수의 서비스 계정 만들기

Cloud 함수에는 GCS 버킷에 쓸 수 있는 권한이 있는 서비스 계정이 필요합니다.

서비스 계정 만들기

  1. GCP 콘솔에서 IAM 및 관리자 > 서비스 계정으로 이동합니다.
  2. 서비스 계정 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 서비스 계정 이름: akamai-cloud-monitor-sa을 입력합니다.
    • 서비스 계정 설명: Service account for Cloud Function to collect Akamai Cloud Monitor logs을 입력합니다.
  4. 만들고 계속하기를 클릭합니다.
  5. 이 서비스 계정에 프로젝트에 대한 액세스 권한 부여 섹션에서 다음 단계를 따르세요.
    1. 역할 선택을 클릭합니다.
    2. 스토리지 객체 관리자를 검색하여 선택합니다.
    3. + 다른 역할 추가를 클릭합니다.
    4. Cloud Run 호출자를 검색하여 선택합니다.
    5. + 다른 역할 추가를 클릭합니다.
    6. Cloud Functions 호출자를 검색하여 선택합니다.
  6. 계속을 클릭합니다.
  7. 완료를 클릭합니다.

이러한 역할은 다음 작업에 필요합니다.

  • 스토리지 객체 관리자: GCS 버킷에 로그를 쓰고 상태 파일을 관리합니다.
  • Cloud Run 호출자: Pub/Sub가 함수를 호출하도록 허용
  • Cloud Functions 호출자: 함수 호출 허용

GCS 버킷에 대한 IAM 권한 부여

GCS 버킷에 대한 쓰기 권한을 서비스 계정에 부여합니다.

  1. Cloud Storage> 버킷으로 이동합니다.
  2. 버킷 이름을 클릭합니다.
  3. 권한 탭으로 이동합니다.
  4. 액세스 권한 부여를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.
    • 주 구성원 추가: 서비스 계정 이메일 (예: akamai-cloud-monitor-sa@PROJECT_ID.iam.gserviceaccount.com)을 입력합니다.
    • 역할 할당: 스토리지 객체 관리자를 선택합니다.
  6. 저장을 클릭합니다.

Akamai 로그를 수신하는 Cloud 함수 만들기

Cloud 함수는 Akamai Cloud Monitor에서 HTTP POST 요청을 수신하고 GCS에 로그를 작성합니다.

  1. GCP Console에서 Cloud Functions로 이동합니다.
  2. 함수 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    설정
    환경 2세대를 선택합니다.
    함수 이름 akamai-cloud-monitor-receiver
    리전 GCS 버킷과 일치하는 리전을 선택합니다 (예: us-central1).
  4. 트리거 섹션에서 다음을 수행합니다.

    • 트리거 유형: HTTPS를 선택합니다.
    • 인증: 인증되지 않은 호출 허용을 선택합니다 (Akamai에서 인증되지 않은 요청을 전송함).
  5. 저장을 클릭하여 트리거 구성을 저장합니다.

  6. 런타임, 빌드, 연결, 보안 설정을 펼칩니다.

  7. 런타임 섹션에서 다음을 수행합니다.

    • 할당된 메모리: 512MiB를 선택합니다.
    • 제한 시간: 600초 (10분)를 입력합니다.
    • 런타임 서비스 계정: 서비스 계정 (akamai-cloud-monitor-sa)을 선택합니다.
  8. 런타임 환경 변수 섹션에서 다음 각 항목에 대해 + 변수 추가를 클릭합니다.

    변수 이름 예시 값
    GCS_BUCKET akamai-cloud-monitor
    GCS_PREFIX akamai/cloud-monitor/json
    INGEST_TOKEN random-shared-secret(선택사항)
  9. 다음을 클릭하여 코드 편집기로 이동합니다.

  10. 런타임 드롭다운에서 Python 3.12를 선택합니다.

함수 코드 추가

  1. 함수 진입점main을 입력합니다.
  2. 인라인 코드 편집기에서 다음 두 파일을 만듭니다.

    • 첫 번째 파일: main.py:
    import os
    import json
    import gzip
    import io
    import uuid
    import datetime as dt
    from google.cloud import storage
    import functions_framework
    
    GCS_BUCKET = os.environ.get("GCS_BUCKET")
    GCS_PREFIX = os.environ.get("GCS_PREFIX", "akamai/cloud-monitor/json").strip("/") + "/"
    INGEST_TOKEN = os.environ.get("INGEST_TOKEN")  # optional shared secret
    
    storage_client = storage.Client()
    
    def _write_jsonl_gz(objs: list) -> str:
        """Write JSON objects to GCS as gzipped JSONL."""
        timestamp = dt.datetime.utcnow()
        key = f"{timestamp:%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz"
    
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for o in objs:
                gz.write((json.dumps(o, separators=(",", ":")) + "\n").encode())
        buf.seek(0)
    
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(f"{GCS_PREFIX}{key}")
        blob.upload_from_file(buf, content_type="application/json", content_encoding="gzip")
    
        return f"gs://{GCS_BUCKET}/{GCS_PREFIX}{key}"
    
    def _parse_records_from_request(request) -> list:
        """Parse JSON records from HTTP request body."""
        body = request.get_data(as_text=True)
    
        if not body:
            return []
    
        try:
            data = json.loads(body)
        except Exception:
            # Accept line-delimited JSON as pass-through
            try:
                return [json.loads(line) for line in body.splitlines() if line.strip()]
            except Exception:
                return []
    
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []
    
    @functions_framework.http
    def main(request):
        """
        Cloud Function HTTP handler for Akamai Cloud Monitor logs.
    
        Args:
            request: Flask request object
    
        Returns:
            Tuple of (response_body, status_code, headers)
        """
        # Optional shared-secret verification via query parameter (?token=...)
        if INGEST_TOKEN:
            token = request.args.get("token")
            if token != INGEST_TOKEN:
                return ("Forbidden", 403)
    
        records = _parse_records_from_request(request)
    
        if not records:
            return ("No content", 204)
    
        try:
            gcs_key = _write_jsonl_gz(records)
    
            response = {
                "ok": True,
                "gcs_key": gcs_key,
                "count": len(records)
            }
    
            return (json.dumps(response), 200, {"Content-Type": "application/json"})
    
        except Exception as e:
            print(f"Error writing to GCS: {str(e)}")
            return (f"Internal server error: {str(e)}", 500)
    
    • 두 번째 파일: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    
  3. 배포를 클릭하여 함수를 배포합니다.

  4. 배포가 완료될 때까지 기다립니다 (2~3분).

  5. 배포 후 트리거 탭으로 이동하여 트리거 URL을 복사합니다. 이 URL은 Akamai 구성에서 사용됩니다.

로그를 푸시하도록 Akamai Cloud Monitor 구성

  1. Akamai Control Center에 로그인합니다.
  2. 속성 관리자에서 속성을 엽니다.
  3. 규칙 추가> 클라우드 관리 선택을 클릭합니다.
  4. Cloud Monitor Instrumentation을 추가하고 필요한 데이터 세트를 선택합니다.
  5. Cloud Monitor Data Delivery 추가
  6. 다음 구성 세부정보를 제공합니다.

    • 전송 호스트 이름: Cloud 함수 트리거 URL의 호스트 이름을 입력합니다 (예: us-central1-your-project.cloudfunctions.net).
    • 전송 URL 경로: Cloud 함수 트리거 URL의 경로와 선택적 쿼리 토큰을 입력합니다.

      • 토큰 없음: /akamai-cloud-monitor-receiver
      • 토큰 사용: /akamai-cloud-monitor-receiver?token=<INGEST_TOKEN>

      • <INGEST_TOKEN>을 Cloud Functions 환경 변수에 설정한 값으로 바꿉니다.

  7. 저장을 클릭합니다.

  8. 활성화를 클릭하여 속성 버전을 활성화합니다.

Google SecOps 서비스 계정 가져오기

Google SecOps는 고유한 서비스 계정을 사용하여 GCS 버킷에서 데이터를 읽습니다. 이 서비스 계정에 버킷에 대한 액세스 권한을 부여해야 합니다.

서비스 계정 이메일 가져오기

  1. SIEM 설정> 피드로 이동합니다.
  2. 새 피드 추가를 클릭합니다.
  3. 단일 피드 구성을 클릭합니다.
  4. 피드 이름 필드에 피드 이름을 입력합니다(예: Akamai Cloud Monitor - GCS).
  5. 소스 유형으로 Google Cloud Storage V2를 선택합니다.
  6. 로그 유형으로 Akamai Cloud Monitor를 선택합니다.
  7. 서비스 계정 가져오기를 클릭합니다. 고유한 서비스 계정 이메일이 표시됩니다. 예를 들면 다음과 같습니다.

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 다음 단계에서 사용할 수 있도록 이 이메일 주소를 복사합니다.

Google SecOps 서비스 계정에 IAM 권한 부여

Google SecOps 서비스 계정에는 GCS 버킷에 대한 스토리지 객체 뷰어 역할이 필요합니다.

  1. Cloud Storage> 버킷으로 이동합니다.
  2. 버킷 이름을 클릭합니다.
  3. 권한 탭으로 이동합니다.
  4. 액세스 권한 부여를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.
    • 주 구성원 추가: Google SecOps 서비스 계정 이메일을 붙여넣습니다.
    • 역할 할당: 스토리지 객체 뷰어를 선택합니다.
  6. 저장을 클릭합니다.

Akamai Cloud Monitor 로그를 수집하도록 Google SecOps에서 피드 구성

  1. SIEM 설정> 피드로 이동합니다.
  2. 새 피드 추가를 클릭합니다.
  3. 단일 피드 구성을 클릭합니다.
  4. 피드 이름 필드에 피드 이름을 입력합니다(예: Akamai Cloud Monitor - GCS).
  5. 소스 유형으로 Google Cloud Storage V2를 선택합니다.
  6. 로그 유형으로 Akamai Cloud Monitor를 선택합니다.
  7. 다음을 클릭합니다.
  8. 다음 입력 매개변수의 값을 지정합니다.

    • 스토리지 버킷 URL: 다음 접두사 경로를 사용하여 GCS 버킷 URI를 입력합니다.

      gs://akamai-cloud-monitor/akamai/cloud-monitor/json/
      
      • 다음과 같이 바꿉니다.

        • akamai-cloud-monitor: GCS 버킷 이름입니다.
        • akamai/cloud-monitor/json: 로그가 저장되는 접두사 경로입니다 (Cloud 함수의 GCS_PREFIX와 일치해야 함).
    • 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.

      • 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (테스트에 권장).
      • 전송된 파일 삭제: 전송이 완료되면 파일을 삭제합니다.
      • 전송된 파일 및 빈 디렉터리 삭제: 전송이 완료되면 파일과 빈 디렉터리를 삭제합니다.

    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.

    • 애셋 네임스페이스: akamai.cloud_monitor

    • 수집 라벨: 이 피드의 모든 이벤트에 라벨이 추가됩니다 (예: source=akamai_cloud_monitor, format=json).

  9. 다음을 클릭합니다.

  10. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

지원되는 Akamai Cloud Monitor 샘플 로그

  • JSON:

    {
      "UA": "-",
      "accLang": "-",
      "bytes": "3929",
      "cacheStatus": "1",
      "cliIP": "0.0.0.0",
      "cookie": "-",
      "cp": "848064",
      "customField": "-",
      "dnsLookupTimeMSec": "-",
      "errorCode": "-",
      "maxAgeSec": "31536000",
      "objSize": "3929",
      "overheadBytes": "240",
      "proto": "HTTPS",
      "queryStr": "-",
      "range": "-",
      "referer": "-",
      "reqEndTimeMSec": "4",
      "reqHost": "www.example.com",
      "reqId": "1ce83c03",
      "reqMethod": "GET",
      "reqPath": "assets/images/placeholder-tagline.png",
      "reqPort": "443",
      "reqTimeSec": "1622470405.760",
      "rspContentLen": "3929",
      "rspContentType": "image/png",
      "statusCode": "200",
      "tlsOverheadTimeMSec": "0",
      "tlsVersion": "TLSv1.2",
      "totalBytes": "4599",
      "transferTimeMSec": "0",
      "turnAroundTimeMSec": "0",
      "uncompressedSize": "-",
      "version": "1",
      "xForwardedFor": "-"
    }
    

UDM 매핑 테이블

로그 필드 UDM 매핑 로직
accLang network.http.user_agent '-' 또는 빈 문자열이 아닌 경우 직접 매핑됩니다.
city principal.location.city '-' 또는 빈 문자열이 아닌 경우 직접 매핑됩니다.
cliIP principal.ip 빈 문자열이 아닌 경우 직접 매핑됩니다.
국가 principal.location.country_or_region '-' 또는 빈 문자열이 아닌 경우 직접 매핑됩니다.
cp additional.fields 키가 'cp'인 키-값 쌍으로 매핑됩니다.
customField about.ip, about.labels, src.ip 키-값 쌍으로 파싱됩니다. 각각 src.ip 및 about.ip에 매핑하기 위한 'eIp' 및 'pIp'의 특별 처리 다른 키는 about 내에서 라벨로 매핑됩니다.
errorCode security_result.summary, security_result.severity 값이 있으면 security_result.severity를 'ERROR'로 설정하고 값을 security_result.summary에 매핑합니다.
geo.city principal.location.city 도시가 '-'이거나 빈 문자열인 경우 직접 매핑됩니다.
geo.country principal.location.country_or_region 국가가 '-'이거나 빈 문자열인 경우 직접 매핑됩니다.
geo.lat principal.location.region_latitude 직접 매핑되고 float로 변환됩니다.
geo.long principal.location.region_longitude 직접 매핑되고 float로 변환됩니다.
geo.region principal.location.state 직접 매핑됩니다.
id metadata.product_log_id 빈 문자열이 아닌 경우 직접 매핑됩니다.
message.cliIP principal.ip cliIP가 빈 문자열인 경우 직접 매핑됩니다.
message.fwdHost principal.hostname 직접 매핑됩니다.
message.reqHost target.hostname, target.url target.url을 구성하고 target.hostname을 추출하는 데 사용됩니다.
message.reqLen network.sent_bytes 직접 매핑되며 totalBytes가 비어 있거나 '-'인 경우 부호 없는 정수로 변환됩니다.
message.reqMethod network.http.method reqMethod가 빈 문자열인 경우 직접 매핑됩니다.
message.reqPath target.url target.url에 추가됩니다.
message.reqPort target.port 직접 매핑되며 reqPort가 빈 문자열인 경우 정수로 변환됩니다.
message.respLen network.received_bytes 직접 매핑되며 부호 없는 정수로 변환됩니다.
message.sslVer network.tls.version 직접 매핑됩니다.
message.status network.http.response_code 직접 매핑되며 statusCode가 비어 있거나 '-'인 경우 정수로 변환됩니다.
message.UA network.http.user_agent UA가 '-' 또는 빈 문자열인 경우 직접 매핑됩니다.
network.asnum additional.fields 키가 'asnum'인 키-값 쌍으로 매핑됩니다.
network.edgeIP intermediary.ip 직접 매핑됩니다.
network.network additional.fields 키가 'network'인 키-값 쌍으로 매핑됩니다.
network.networkType additional.fields 'networkType' 키가 있는 키-값 쌍으로 매핑됩니다.
proto network.application_protocol network.application_protocol을 확인하는 데 사용됩니다.
queryStr target.url '-' 또는 빈 문자열이 아닌 경우 target.url에 추가됩니다.
리퍼러 network.http.referral_url, about.hostname '-'가 아닌 경우 직접 매핑됩니다. 추출된 호스트 이름은 about.hostname에 매핑됩니다.
reqHost target.hostname, target.url target.url을 구성하고 target.hostname을 추출하는 데 사용됩니다.
reqId metadata.product_log_id, network.session_id ID가 빈 문자열인 경우 직접 매핑됩니다. network.session_id에도 매핑됩니다.
reqMethod network.http.method 빈 문자열이 아닌 경우 직접 매핑됩니다.
reqPath target.url '-'가 아닌 경우 target.url에 추가됩니다.
reqPort target.port 직접 매핑되고 정수로 변환됩니다.
reqTimeSec metadata.event_timestamp, timestamp 이벤트 타임스탬프를 설정하는 데 사용됩니다.
start metadata.event_timestamp, timestamp reqTimeSec이 빈 문자열인 경우 이벤트 타임스탬프를 설정하는 데 사용됩니다.
statusCode network.http.response_code 직접 매핑되며, '-' 또는 빈 문자열이 아닌 경우 정수로 변환됩니다.
tlsVersion network.tls.version 직접 매핑됩니다.
totalBytes network.sent_bytes 직접 매핑되며, 비어 있지 않거나 '-'인 경우 부호 없는 정수로 변환됩니다.
유형 metadata.product_event_type 직접 매핑됩니다.
UA network.http.user_agent '-' 또는 빈 문자열이 아닌 경우 직접 매핑됩니다.
version metadata.product_version 직접 매핑됩니다.
xForwardedFor principal.ip '-' 또는 빈 문자열이 아닌 경우 직접 매핑됩니다.
(파서 로직) metadata.vendor_name 'Akamai'로 설정합니다.
(파서 로직) metadata.product_name 'Cloud Monitor'로 설정합니다.
(파서 로직) metadata.event_type 'NETWORK_HTTP'로 설정합니다.
(파서 로직) metadata.product_version 버전이 빈 문자열인 경우 '2'로 설정됩니다.
(파서 로직) metadata.log_type 'AKAMAI_CLOUD_MONITOR'로 설정합니다.
(파서 로직) network.application_protocol 프로토 또는 message.proto에서 결정됩니다. 둘 중 하나에 'HTTPS' (대소문자 구분 안 함)가 포함된 경우 'HTTPS'로 설정하고, 그렇지 않은 경우 'HTTP'로 설정합니다.
(파서 로직) security_result.severity errorCode가 '-' 또는 빈 문자열인 경우 'INFORMATIONAL'로 설정됩니다.
(파서 로직) target.url 프로토콜, reqHost (또는 message.reqHost), reqPath (또는 message.reqPath), queryStr에서 구성됩니다.

도움이 더 필요한가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.