Harness IO 감사 로그 수집

다음에서 지원:

이 문서에서는 Amazon S3를 사용하여 Harness IO 감사 로그를 Google Security Operations로 수집하는 방법을 설명합니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스
  • 다음 권한이 있는 Harness에 대한 액세스 권한 관리
    • API 키 만들기
    • 감사 로그 액세스
    • 계정 설정 보기
  • AWS (S3, IAM, Lambda, EventBridge)에 대한 액세스 권한

Harness API 사용자 인증 정보 수집

Harness에서 API 키 만들기

  1. Harness Platform에 로그인합니다.
  2. 사용자 프로필을 클릭합니다.
  3. 내 API 키로 이동합니다.
  4. + API 키를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.
    • 이름: 설명이 포함된 이름을 입력합니다 (예: Google SecOps Integration).
    • 설명: 선택사항인 설명입니다.
  6. 저장을 클릭합니다.
  7. + 토큰을 클릭하여 새 토큰을 만듭니다.
  8. 다음 구성 세부정보를 제공합니다.
    • 이름: Chronicle Feed Token를 입력합니다.
    • 만료 설정: 적절한 만료 시간 또는 만료 없음 (프로덕션용)을 선택합니다.
  9. 토큰 생성을 클릭합니다.
  10. 토큰 값을 복사하여 안전하게 저장합니다. 이 토큰은 x-api-key 헤더 값으로 사용됩니다.

Harness 계정 ID 가져오기

  1. Harness Platform에서 URL의 Account ID를 확인합니다.
    • URL 예: https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/...
    • YOUR_ACCOUNT_ID 부분은 계정 식별자입니다.
  2. 또는 계정 설정 > 개요로 이동하여 계정 식별자를 확인합니다.
  3. Lambda 함수에서 사용할 계정 ID를 복사하여 저장합니다.

Google SecOps용 AWS S3 버킷 및 IAM 구성

  1. 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
  2. 나중에 참조할 수 있도록 버킷 이름리전을 저장합니다(예: harness-io-logs).
  3. 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
  4. 생성된 사용자를 선택합니다.
  5. 보안용 사용자 인증 정보 탭을 선택합니다.
  6. 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
  7. 사용 사례서드 파티 서비스를 선택합니다.
  8. 다음을 클릭합니다.
  9. 선택사항: 설명 태그를 추가합니다.
  10. 액세스 키 만들기를 클릭합니다.
  11. CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키보안 비밀 액세스 키를 저장합니다.
  12. 완료를 클릭합니다.
  13. 권한 탭을 선택합니다.
  14. 권한 정책 섹션에서 권한 추가를 클릭합니다.
  15. 권한 추가를 선택합니다.
  16. 정책 직접 연결을 선택합니다.
  17. AmazonS3FullAccess 정책을 검색합니다.
  18. 정책을 선택합니다.
  19. 다음을 클릭합니다.
  20. 권한 추가를 클릭합니다.

Lambda S3 업로드의 IAM 정책 및 역할 구성

  1. AWS 콘솔에서 IAM > 정책 > 정책 만들기 > JSON 탭으로 이동합니다.
  2. 다음 정책을 복사하여 붙여넣습니다.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutHarnessObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json"
        }
      ]
    }
    
    • 다른 버킷 이름을 입력한 경우 harness-io-logs을 바꿉니다.
  3. 다음을 클릭합니다.

  4. 정책 이름을 HarnessToS3Policy로 지정하고 정책 만들기를 클릭합니다.

  5. IAM > 역할 > 역할 만들기로 이동합니다.

  6. 신뢰할 수 있는 항목 유형으로 AWS 서비스를 선택합니다.

  7. 사용 사례로 Lambda를 선택합니다.

  8. 다음을 클릭합니다.

  9. 다음 정책을 검색하여 선택합니다.

    • HarnessToS3Policy (방금 만든 정책)
    • AWSLambdaBasicExecutionRole (CloudWatch Logs의 경우)
  10. 다음을 클릭합니다.

  11. 역할 이름을 HarnessAuditLambdaRole로 지정하고 역할 만들기를 클릭합니다.

Lambda 함수 만들기

  1. AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
  2. 처음부터 작성을 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    설정
    이름 harness-audit-to-s3
    런타임 Python 3.13
    아키텍처 x86_64
    실행 역할 HarnessAuditLambdaRole
  4. 함수 만들기를 클릭합니다.

  5. 함수가 생성되면 코드 탭을 엽니다.

  6. 기본 스텁 코드를 삭제하고 다음 Lambda 함수 코드를 입력합니다.

    • Lambda 함수 코드 (harness_audit_to_s3.py)

      #!/usr/bin/env python3
      """
      Harness.io Audit Logs to S3 Lambda
      Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion.
      """
      import os
      import json
      import time
      import uuid
      import logging
      import urllib.parse
      from datetime import datetime, timedelta, timezone
      from urllib.request import Request, urlopen
      from urllib.error import HTTPError, URLError
      import boto3
      
      # Configuration from Environment Variables
      API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/")
      ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"]
      API_KEY = os.environ["HARNESS_API_KEY"]
      BUCKET = os.environ["S3_BUCKET"]
      PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/")
      STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json")
      PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100)
      START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60"))
      
      # Optional filters (NEW)
      FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None
      FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None
      STATIC_FILTER = os.environ.get("STATIC_FILTER")  # e.g., "EXCLUDE_LOGIN_EVENTS"
      MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
      
      # AWS clients
      s3 = boto3.client("s3")
      
      # HTTP headers for Harness API
      HDRS = {
          "x-api-key": API_KEY,
          "Content-Type": "application/json",
          "Accept": "application/json",
      }
      
      # Logging configuration
      logger = logging.getLogger()
      logger.setLevel(logging.INFO)
      
      # ============================================
      # State Management Functions
      # ============================================
      def _read_state():
          """Read checkpoint state from S3."""
          try:
              obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
              state = json.loads(obj["Body"].read())
      
              since_ms = state.get("since")
              page_token = state.get("pageToken")
      
              logger.info(f"State loaded: since={since_ms}, pageToken={page_token}")
              return since_ms, page_token
      
          except s3.exceptions.NoSuchKey:
              logger.info("No state file found, starting fresh collection")
              start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK)
              since_ms = int(start_time.timestamp() * 1000)
              logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})")
              return since_ms, None
      
          except Exception as e:
              logger.error(f"Error reading state: {e}")
              raise
      
      def _write_state(since_ms: int, page_token: str = None):
          """Write checkpoint state to S3."""
          state = {
              "since": since_ms,
              "pageToken": page_token,
              "lastRun": int(time.time() * 1000),
              "lastRunISO": datetime.now(timezone.utc).isoformat()
          }
      
          try:
              s3.put_object(
                  Bucket=BUCKET,
                  Key=STATE_KEY,
                  Body=json.dumps(state, indent=2).encode(),
                  ContentType="application/json"
              )
              logger.info(f"State saved: since={since_ms}, pageToken={page_token}")
          except Exception as e:
              logger.error(f"Error writing state: {e}")
              raise
      
      # ============================================
      # Harness API Functions
      # ============================================
      def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0):
          """
          Fetch audit logs from Harness API with retry logic.
      
          API Endpoint: POST /audit/api/audits/listV2
          Documentation: https://apidocs.harness.io/audit/getauditeventlistv2
          """
          try:
              # Build URL with query parameters
              url = (
                  f"{API_BASE}/audit/api/audits/listV2"
                  f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}"
                  f"&pageSize={PAGE_SIZE}"
              )
      
              if page_token:
                  url += f"&pageToken={urllib.parse.quote(page_token)}"
      
              logger.info(f"Fetching from: {url[:100]}...")
      
              # Build request body with time filter and optional filters
              body_data = {
                  "startTime": since_ms,
                  "endTime": int(time.time() * 1000),
                  "filterType": "Audit" 
              }
      
              if FILTER_MODULES:
                  body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()]
                  logger.info(f"Applying module filter: {body_data['modules']}")
      
              if FILTER_ACTIONS:
                  body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()]
                  logger.info(f"Applying action filter: {body_data['actions']}")
      
              if STATIC_FILTER:
                  body_data["staticFilter"] = STATIC_FILTER
                  logger.info(f"Applying static filter: {STATIC_FILTER}")
      
              logger.debug(f"Request body: {json.dumps(body_data)}")
      
              # Make POST request
              req = Request(
                  url,
                  data=json.dumps(body_data).encode('utf-8'),
                  headers=HDRS,
                  method="POST"
              )
      
              resp = urlopen(req, timeout=30)
              resp_text = resp.read().decode('utf-8')
              resp_data = json.loads(resp_text)
      
              if "status" not in resp_data:
                  logger.warning(f"Response missing 'status' field: {resp_text[:200]}")
      
              # Check response status
              if resp_data.get("status") != "SUCCESS":
                  error_msg = resp_data.get("message", "Unknown error")
                  raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}")
      
              # Extract data from response structure
              data_obj = resp_data.get("data", {})
      
              if not data_obj:
                  logger.warning("Response 'data' object is empty or missing")
      
              events = data_obj.get("content", [])
              has_next = data_obj.get("hasNext", False)
              next_token = data_obj.get("pageToken")
      
              logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}")
      
              if not events and data_obj:
                  logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}")
      
              return {
                  "events": events,
                  "hasNext": has_next,
                  "pageToken": next_token
              }
      
          except HTTPError as e:
              error_body = e.read().decode() if hasattr(e, 'read') else ''
      
              if e.code == 401:
                  logger.error("Authentication failed: Invalid API key")
                  raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.")
      
              elif e.code == 403:
                  logger.error("Authorization failed: Insufficient permissions")
                  raise Exception("API key lacks required audit:read permissions")
      
              elif e.code == 429:
                  retry_after = int(e.headers.get("Retry-After", "60"))
                  logger.warning(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})")
      
                  if retry_count < MAX_RETRIES:
                      logger.info(f"Waiting {retry_after} seconds before retry...")
                      time.sleep(retry_after)
                      logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})")
                      return _fetch_harness_audits(since_ms, page_token, retry_count + 1)
                  else:
                      raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting")
      
              elif e.code == 400:
                  logger.error(f"Bad request: {error_body}")
                  raise Exception(f"Invalid request parameters: {error_body}")
      
              else:
                  logger.error(f"HTTP {e.code}: {e.reason} - {error_body}")
                  raise Exception(f"Harness API error {e.code}: {e.reason}")
      
          except URLError as e:
              logger.error(f"Network error: {e.reason}")
              raise Exception(f"Failed to connect to Harness API: {e.reason}")
      
          except json.JSONDecodeError as e:
              logger.error(f"Invalid JSON response: {e}")
              logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}")
              raise Exception("Harness API returned invalid JSON")
      
          except Exception as e:
              logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True)
              raise
      
      # ============================================
      # S3 Upload Functions
      # ============================================
      def _upload_to_s3(events: list) -> str:
          """
          Upload audit events to S3 in JSONL format.
          Each line is a complete JSON object (one event per line).
          """
          if not events:
              logger.info("No events to upload")
              return None
      
          try:
              # Create JSONL content (one JSON object per line)
              jsonl_lines = [json.dumps(event) for event in events]
              jsonl_content = "\n".join(jsonl_lines)
      
              # Generate S3 key with timestamp and UUID
              timestamp = datetime.now(timezone.utc)
              key = (
                  f"{PREFIX}/"
                  f"{timestamp:%Y/%m/%d}/"
                  f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl"
              )
      
              # Upload to S3
              s3.put_object(
                  Bucket=BUCKET,
                  Key=key,
                  Body=jsonl_content.encode('utf-8'),
                  ContentType="application/x-ndjson",
                  Metadata={
                      "event-count": str(len(events)),
                      "source": "harness-audit-lambda",
                      "collection-time": timestamp.isoformat()
                  }
              )
      
              logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}")
              return key
      
          except Exception as e:
              logger.error(f"Error uploading to S3: {e}", exc_info=True)
              raise
      
      # ============================================
      # Main Orchestration Function
      # ============================================
      def fetch_and_store():
          """
          Main function to fetch audit logs from Harness and store in S3.
          Handles pagination and state management.
          """
          logger.info("=== Harness Audit Collection Started ===")
          logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}")
      
          if FILTER_MODULES:
              logger.info(f"Module filter enabled: {FILTER_MODULES}")
          if FILTER_ACTIONS:
              logger.info(f"Action filter enabled: {FILTER_ACTIONS}")
          if STATIC_FILTER:
              logger.info(f"Static filter enabled: {STATIC_FILTER}")
      
          try:
              # Step 1: Read checkpoint state
              since_ms, page_token = _read_state()
      
              if page_token:
                  logger.info(f"Resuming pagination from saved pageToken")
              else:
                  since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc)
                  logger.info(f"Starting new collection from: {since_dt.isoformat()}")
      
              # Step 2: Collect all events with pagination
              all_events = []
              current_page_token = page_token
              page_count = 0
              max_pages = 100  # Safety limit
              has_next = True
      
              while has_next and page_count < max_pages:
                  page_count += 1
                  logger.info(f"--- Fetching page {page_count} ---")
      
                  # Fetch one page of results
                  result = _fetch_harness_audits(since_ms, current_page_token)
      
                  # Extract events
                  events = result.get("events", [])
                  all_events.extend(events)
      
                  logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})")
      
                  # Check pagination status
                  has_next = result.get("hasNext", False)
                  current_page_token = result.get("pageToken")
      
                  if not has_next:
                      logger.info("Pagination complete (hasNext=False)")
                      break
      
                  if not current_page_token:
                      logger.warning("hasNext=True but no pageToken, stopping pagination")
                      break
      
                  # Small delay between pages to avoid rate limiting
                  time.sleep(0.5)
      
              if page_count >= max_pages:
                  logger.warning(f"Reached max pages limit ({max_pages}), stopping")
      
              # Step 3: Upload collected events to S3
              if all_events:
                  s3_key = _upload_to_s3(all_events)
                  logger.info(f"Successfully uploaded {len(all_events)} total events")
              else:
                  logger.info("No new events to upload")
                  s3_key = None
      
              # Step 4: Update checkpoint state
              if not has_next:
                  # Pagination complete - update since to current time for next run
                  new_since = int(time.time() * 1000)
                  _write_state(new_since, None)
                  logger.info(f"Pagination complete, state updated with new since={new_since}")
              else:
                  # Pagination incomplete - save pageToken for continuation
                  _write_state(since_ms, current_page_token)
                  logger.info(f"Pagination incomplete, saved pageToken for next run")
      
              # Step 5: Return result
              result = {
                  "statusCode": 200,
                  "message": "Success",
                  "eventsCollected": len(all_events),
                  "pagesProcessed": page_count,
                  "paginationComplete": not has_next,
                  "s3Key": s3_key,
                  "filters": {
                      "modules": FILTER_MODULES,
                      "actions": FILTER_ACTIONS,
                      "staticFilter": STATIC_FILTER
                  }
              }
      
              logger.info(f"Collection completed: {json.dumps(result)}")
              return result
      
          except Exception as e:
              logger.error(f"Collection failed: {e}", exc_info=True)
      
              result = {
                  "statusCode": 500,
                  "message": "Error",
                  "error": str(e),
                  "errorType": type(e).__name__
              }
      
              return result
      
          finally:
              logger.info("=== Harness Audit Collection Finished ===")
      
      # ============================================
      # Lambda Handler
      # ============================================
      def lambda_handler(event, context):
          """AWS Lambda handler function."""
          return fetch_and_store()
      
      # ============================================
      # Local Testing
      # ============================================
      if __name__ == "__main__":
          # For local testing
          result = lambda_handler(None, None)
          print(json.dumps(result, indent=2))
      

  7. 배포를 클릭하여 함수 코드를 저장합니다.

Lambda 환경 변수 구성

  1. Lambda 함수 페이지에서 구성 탭을 선택합니다.
  2. 왼쪽 사이드바에서 환경 변수를 클릭합니다.
  3. 수정을 클릭합니다.
  4. 다음 각 항목에 대해 환경 변수 추가를 클릭합니다.

    필수 환경 변수:

    설명
    HARNESS_ACCOUNT_ID Harness 계정 ID Harness의 계정 식별자
    HARNESS_API_KEY API 키 토큰 audit:read 권한이 있는 토큰
    S3_BUCKET harness-io-logs S3 버킷 이름
    S3_PREFIX harness/audit S3 객체의 접두사
    STATE_KEY harness/audit/state.json S3의 상태 파일 경로

    선택적 환경 변수:

    기본값 설명
    HARNESS_API_BASE https://app.harness.io Harness API 기본 URL
    PAGE_SIZE 50 페이지당 이벤트 수 (최대 100개)
    START_MINUTES_BACK 60 초기 확인 기간(분)
    FILTER_MODULES 없음 쉼표로 구분된 모듈 (예: CD,CI,CE)
    FILTER_ACTIONS 없음 쉼표로 구분된 작업 (예: CREATE,UPDATE,DELETE)
    STATIC_FILTER 없음 사전 정의된 필터: EXCLUDE_LOGIN_EVENTS 또는 EXCLUDE_SYSTEM_EVENTS
    MAX_RETRIES 3 비율 제한의 최대 재시도 횟수
  5. 저장을 클릭합니다.

Lambda 제한 시간 및 메모리 구성

  1. Lambda 함수 페이지에서 구성 탭을 선택합니다.
  2. 왼쪽 사이드바에서 일반 구성을 클릭합니다.
  3. 수정을 클릭합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 메모리: 256 MB (권장)
    • 제한 시간: 5 min 0 sec (300초)
  5. 저장을 클릭합니다.

EventBridge 일정 만들기

  1. Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
  2. 다음 구성 세부정보를 제공합니다.
    • 일정 이름: harness-audit-hourly를 입력합니다.
    • 설명: 선택사항인 설명입니다.
  3. 다음을 클릭합니다.
  4. 일정 패턴에서 반복 일정을 선택합니다.
  5. 요금 기반 일정을 선택합니다.
  6. 다음 구성 세부정보를 제공합니다.
    • 요율 표현식: 1 hour를 입력합니다.
  7. 다음을 클릭합니다.
  8. 타겟에서 다음 구성 세부정보를 제공합니다.
    • 타겟 API: AWS Lambda 호출을 선택합니다.
    • Lambda 함수: 함수 harness-audit-to-s3를 선택합니다.
  9. 다음을 클릭합니다.
  10. 일정 구성을 검토합니다.
  11. 일정 만들기를 클릭합니다.

Google SecOps용 읽기 전용 IAM 사용자 만들기

이 IAM 사용자를 통해 Google SecOps가 S3 버킷에서 로그를 읽을 수 있습니다.

  1. AWS 콘솔 > IAM > 사용자 > 사용자 만들기로 이동합니다.
  2. 다음 구성 세부정보를 제공합니다.
    • 사용자 이름: chronicle-s3-reader를 입력합니다.
  3. 다음을 클릭합니다.
  4. 정책 직접 연결을 선택합니다.
  5. 정책 만들기를 클릭합니다.
  6. JSON 탭을 선택합니다.
  7. 다음 정책을 붙여넣습니다.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListBucket"
          ],
          "Resource": "arn:aws:s3:::harness-io-logs",
          "Condition": {
            "StringLike": {
              "s3:prefix": "harness/audit/*"
            }
          }
        }
      ]
    }
    
  8. 다음을 클릭합니다.

  9. ChronicleHarnessS3ReadPolicy 정책의 이름을 지정합니다.

  10. 정책 만들기를 클릭합니다.

  11. 사용자 생성 탭으로 돌아가 정책 목록을 새로고침합니다.

  12. ChronicleHarnessS3ReadPolicy을 검색하여 선택합니다.

  13. 다음을 클릭합니다.

  14. 검토하고 사용자 만들기를 클릭합니다.

리더 사용자의 액세스 키 만들기

  1. IAM 사용자 페이지에서 chronicle-s3-reader 사용자를 선택합니다.
  2. 보안용 사용자 인증 정보 탭을 선택합니다.
  3. 액세스 키 만들기를 클릭합니다.
  4. 사용 사례로 서드 파티 서비스를 선택합니다.
  5. 다음을 클릭합니다.
  6. 선택사항: 설명 태그를 추가합니다.
  7. 액세스 키 만들기를 클릭합니다.
  8. CSV 파일 다운로드를 클릭하여 액세스 키 ID와 보안 비밀 액세스 키를 저장합니다.
  9. 완료를 클릭합니다.

Harness IO 로그를 수집하도록 Google SecOps에서 피드 구성

  1. SIEM 설정> 피드로 이동합니다.
  2. 새로 추가를 클릭합니다.
  3. 다음 페이지에서 단일 피드 구성을 클릭합니다.
  4. 피드 이름 필드에 피드 이름을 입력합니다(예: Harness Audit Logs).
  5. 소스 유형으로 Amazon S3 V2를 선택합니다.
  6. 로그 유형으로 Harness IO를 선택합니다.
  7. 다음을 클릭합니다.
  8. 다음 입력 매개변수의 값을 지정합니다.

    • S3 URI: 접두사 경로를 사용하여 S3 버킷 URI를 입력합니다. s3://harness-io-logs/harness/audit/
    • 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.

      • 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (처음에는 권장).
      • 성공 시: 전송이 성공한 후 모든 파일과 빈 디렉터리를 삭제합니다.
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.

    • 액세스 키 ID: chronicle-s3-reader 사용자의 액세스 키 ID를 입력합니다.

    • 보안 비밀 액세스 키: chronicle-s3-reader 사용자의 보안 비밀 액세스 키를 입력합니다.

    • 애셋 네임스페이스: 애셋 네임스페이스입니다. harness.audit를 입력합니다.

    • 수집 라벨: 이 피드의 이벤트에 적용할 선택적 라벨입니다.

  9. 다음을 클릭합니다.

  10. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.