Cisco Application Centric Infrastructure (ACI) 로그 수집

다음에서 지원:

이 문서에서는 Cisco Application Centric Infrastructure (ACI) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 먼저 Grok 패턴을 사용하여 수신되는 Cisco ACI 로그를 syslog 메시지로 처리하려고 시도합니다. syslog 파싱이 실패하면 메시지가 JSON 형식이라고 가정하고 그에 따라 파싱합니다. 마지막으로 추출된 필드를 통합 데이터 모델 (UDM)에 매핑합니다.

이 통합은 다음 두 가지 방법을 지원합니다.

  • 옵션 1: Bindplane 에이전트를 사용하는 Syslog 형식
  • 옵션 2: APIC REST API를 사용하여 Google Cloud Storage를 사용하는 JSON 형식

각 옵션은 독립적이며 인프라 요구사항과 로그 형식 환경설정에 따라 독립적으로 구현할 수 있습니다.

옵션 1: Bindplane 에이전트를 사용하는 Syslog

이 옵션은 Cisco ACI 패브릭이 syslog 메시지를 Bindplane 에이전트로 전송하도록 구성합니다. Bindplane 에이전트는 메시지를 Google Security Operations로 전달하여 분석합니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스
  • systemd가 설치된 Windows 2016 이상 또는 Linux 호스트
  • 프록시 뒤에서 실행하는 경우 Bindplane 에이전트 요구사항에 따라 방화벽 포트가 열려 있는지 확인합니다.
  • Cisco APIC 콘솔에 대한 권한 있는 액세스

Google SecOps 수집 인증 파일 가져오기

  1. SIEM 설정 > 수집 에이전트로 이동합니다.
  2. 수집 인증 파일을 다운로드합니다.
  3. Bindplane이 설치될 시스템에 파일을 안전하게 저장합니다.

Google SecOps 고객 ID 가져오기

  1. SIEM 설정 > 프로필로 이동합니다.
  2. 조직 세부정보 섹션에서 고객 ID를 복사하여 저장합니다.

Bindplane 에이전트 설치

다음 안내에 따라 Windows 또는 Linux 운영체제에 Bindplane 에이전트를 설치합니다.

Windows 설치

  1. 관리자 권한으로 명령 프롬프트 또는 PowerShell을 엽니다.
  2. 다음 명령어를 실행합니다.

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux 설치

  1. 루트 또는 sudo 권한으로 터미널을 엽니다.
  2. 다음 명령어를 실행합니다.

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

추가 설치 옵션은 Bindplane 에이전트 설치 가이드를 참고하세요.

Syslog를 수집하여 Google SecOps로 전송하도록 Bindplane 에이전트 구성

구성 파일 액세스

  1. config.yaml 파일을 찾습니다. 일반적으로 Linux에서는 /etc/bindplane-agent/ 디렉터리에 있고 Windows에서는 설치 디렉터리에 있습니다.
  2. 텍스트 편집기 (예: nano, vi, 메모장)를 사용하여 파일을 엽니다.
  3. config.yaml 파일을 수정합니다.

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'CISCO_ACI'
        raw_log_field: body
        ingestion_labels:
          service:
    
    pipelines:
      logs/source0__chronicle_w_labels-0:
        receivers:
          - udplog
        exporters:
          - chronicle/chronicle_w_labels
    
    • 다음을 바꿉니다.
      • 인프라에 필요한 대로 포트와 IP 주소를 바꿉니다.
      • <CUSTOMER_ID>를 실제 고객 ID로 바꿉니다.
      • /path/to/ingestion-authentication-file.json를 인증 파일이 저장된 경로로 업데이트합니다.

Bindplane 에이전트를 다시 시작하여 변경사항 적용

  • Linux에서 Bindplane 에이전트를 다시 시작하려면 다음 명령어를 실행합니다.

    sudo systemctl restart bindplane-agent
    
  • Windows에서 Bindplane 에이전트를 다시 시작하려면 서비스 콘솔을 사용하거나 다음 명령어를 입력하면 됩니다.

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Cisco ACI에서 Syslog 전달 구성

대역 외 관리 계약 구성

  1. Cisco APIC 콘솔에 로그인합니다.
  2. 테넌트 > 관리 > 계약 > 필터로 이동합니다.
  3. 필터 만들기를 클릭합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 이름syslog-udp-514를 입력합니다.
    • 항목 이름: syslog를 입력합니다.
    • EtherType: IP를 선택합니다.
    • IP 프로토콜: UDP를 선택합니다.
    • 대상 포트 범위 시작: 514을 입력합니다.
    • 대상 포트 범위: 514를 입력합니다.
  5. 제출을 클릭합니다.

관리 계약 만들기

  1. 테넌트 > 관리 > 계약 > 표준으로 이동합니다.
  2. 계약 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 이름mgmt-syslog-contract를 입력합니다.
    • 범위: 컨텍스트를 선택합니다.
  4. 제출을 클릭합니다.
  5. 계약을 펼치고 과목을 클릭합니다.
  6. 계약 주제 만들기를 클릭합니다.
  7. 다음 구성 세부정보를 제공합니다.
    • 이름syslog-subject를 입력합니다.
    • 양방향 적용: 이 옵션을 선택합니다.
  8. 제출을 클릭합니다.
  9. 주제를 펼치고 필터를 클릭합니다.
  10. 필터 바인딩 만들기를 클릭합니다.
  11. syslog-udp-514 필터를 선택합니다.
  12. 제출을 클릭합니다.

Syslog 대상 그룹 구성

  1. 관리자 > 외부 데이터 수집기 > 모니터링 대상 > Syslog로 이동합니다.
  2. Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Monitoring Destination Group(Syslog 모니터링 대상 그룹 만들기)을 선택합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 이름Chronicle-Syslog-Group를 입력합니다.
    • 관리자 상태: 사용을 선택합니다.
    • 형식: aci를 선택합니다.
  4. 다음을 클릭합니다.
  5. 시스템로그 모니터링 대상 만들기 대화상자에서 다음을 수행합니다.
    • 이름Chronicle-BindPlane를 입력합니다.
    • 호스트: Bindplane 에이전트 서버의 IP 주소를 입력합니다.
    • 포트: 514를 입력합니다.
    • 관리자 상태: 사용을 선택합니다.
    • 심각도: 정보를 선택하여 자세한 로그를 캡처합니다.
  6. 제출을 클릭합니다.

모니터링 정책 구성

패브릭 모니터링 정책
  1. 패브릭 > 패브릭 정책 > 정책 > 모니터링 > 공통 정책으로 이동합니다.
  2. Callhome/Smart Callhome/SNMP/Syslog/TACACS를 펼칩니다.
  3. Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Source를 선택합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 이름Chronicle-Fabric-Syslog를 입력합니다.
    • 감사 로그: 감사 이벤트를 포함하는지 확인합니다.
    • 이벤트: 시스템 이벤트를 포함하려면 선택합니다.
    • 오류: 오류 이벤트를 포함할지 선택합니다.
    • 세션 로그: 세션 로그를 포함하려면 선택합니다.
    • 대상 그룹: Chronicle-Syslog-Group를 선택합니다.
  5. 제출을 클릭합니다.
액세스 모니터링 정책
  1. 패브릭 > 액세스 정책 > 정책 > 모니터링 > 기본 정책으로 이동합니다.
  2. Callhome/Smart Callhome/SNMP/Syslog를 펼칩니다.
  3. Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Source를 선택합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 이름Chronicle-Access-Syslog를 입력합니다.
    • 감사 로그: 감사 이벤트를 포함하는지 확인합니다.
    • 이벤트: 시스템 이벤트를 포함하려면 선택합니다.
    • 오류: 오류 이벤트를 포함할지 선택합니다.
    • 세션 로그: 세션 로그를 포함하려면 선택합니다.
    • 대상 그룹: Chronicle-Syslog-Group를 선택합니다.
  5. 제출을 클릭합니다.

시스템 Syslog 메시지 정책 구성

  1. 패브릭 > 패브릭 정책 > 정책 > 모니터링 > 공통 정책으로 이동합니다.
  2. Syslog Messages Policies를 펼칩니다.
  3. 기본값을 클릭합니다.
  4. 시설 필터 섹션에서 다음을 수행합니다.
    • 시설: 기본값을 선택합니다.
    • 최소 심각도: 정보로 변경합니다.
  5. 제출을 클릭합니다.

옵션 2: Google Cloud Storage를 사용하는 JSON

이 옵션은 APIC REST API를 사용하여 Cisco ACI 패브릭에서 JSON 형식 이벤트, 결함, 감사 로그를 수집하고 Google SecOps 수집을 위해 Google Cloud Storage에 저장합니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스
  • Cloud Storage API가 사용 설정된 GCP 프로젝트
  • GCS 버킷을 만들고 관리할 수 있는 권한
  • GCS 버킷의 IAM 정책을 관리할 수 있는 권한
  • Cloud Run 서비스, Pub/Sub 주제, Cloud Scheduler 작업을 만들 수 있는 권한
  • Cisco APIC 콘솔에 대한 권한 있는 액세스

Cisco ACI APIC 기본 요건 수집

APIC 사용자 인증 정보 가져오기

  1. HTTPS를 사용하여 Cisco APIC 콘솔에 로그인합니다.
  2. 관리 > AAA (APIC 6.0 이상) 또는 관리 > 인증 > AAA (이전 버전)로 이동합니다.

  3. 적절한 권한이 있는 로컬 사용자를 만들거나 기존 사용자를 사용합니다.

  4. 다음 세부정보를 복사하여 안전한 위치에 저장합니다.

    • APIC 사용자 이름: 모니터링 데이터에 대한 읽기 액세스 권한이 있는 로컬 사용자
    • APIC 비밀번호: 사용자 비밀번호
    • APIC URL: APIC의 HTTPS URL (예: https://apic.example.com)

Google Cloud Storage 버킷 만들기

  1. Google Cloud 콘솔로 이동합니다.
  2. 프로젝트를 선택하거나 새 프로젝트를 만듭니다.
  3. 탐색 메뉴에서 Cloud Storage> 버킷으로 이동합니다.
  4. 버킷 만들기를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.

    설정
    버킷 이름 지정 전역적으로 고유한 이름 (예: cisco-aci-logs)을 입력합니다.
    위치 유형 필요에 따라 선택 (리전, 이중 리전, 멀티 리전)
    위치 위치를 선택합니다 (예: us-central1).
    스토리지 클래스 Standard (자주 액세스하는 로그에 권장)
    액세스 제어 균일 (권장)
    보호 도구 선택사항: 객체 버전 관리 또는 보관 정책 사용 설정
  6. 만들기를 클릭합니다.

Cloud Run 함수의 서비스 계정 만들기

Cloud Run 함수에는 GCS 버킷에 쓸 수 있고 Pub/Sub에서 호출할 수 있는 권한이 있는 서비스 계정이 필요합니다.

서비스 계정 만들기

  1. GCP 콘솔에서 IAM 및 관리자 > 서비스 계정으로 이동합니다.
  2. 서비스 계정 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 서비스 계정 이름: cisco-aci-collector-sa을 입력합니다.
    • 서비스 계정 설명: Service account for Cloud Run function to collect Cisco ACI logs을 입력합니다.
  4. 만들고 계속하기를 클릭합니다.
  5. 이 서비스 계정에 프로젝트에 대한 액세스 권한 부여 섹션에서 다음 역할을 추가합니다.
    1. 역할 선택을 클릭합니다.
    2. 스토리지 객체 관리자를 검색하여 선택합니다.
    3. + 다른 역할 추가를 클릭합니다.
    4. Cloud Run 호출자를 검색하여 선택합니다.
    5. + 다른 역할 추가를 클릭합니다.
    6. Cloud Functions 호출자를 검색하여 선택합니다.
  6. 계속을 클릭합니다.
  7. 완료를 클릭합니다.

이러한 역할은 다음 작업에 필요합니다.

  • 스토리지 객체 관리자: GCS 버킷에 로그를 쓰고 상태 파일을 관리합니다.
  • Cloud Run 호출자: Pub/Sub가 함수를 호출하도록 허용
  • Cloud Functions 호출자: 함수 호출 허용

GCS 버킷에 대한 IAM 권한 부여

서비스 계정 (cisco-aci-collector-sa)에 GCS 버킷에 대한 쓰기 권한을 부여합니다.

  1. Cloud Storage> 버킷으로 이동합니다.
  2. 버킷 이름 (예: cisco-aci-logs)을 클릭합니다.
  3. 권한 탭으로 이동합니다.
  4. 액세스 권한 부여를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.
    • 주 구성원 추가: 서비스 계정 이메일 (예: cisco-aci-collector-sa@PROJECT_ID.iam.gserviceaccount.com)을 입력합니다.
    • 역할 할당: 스토리지 객체 관리자를 선택합니다.
  6. 저장을 클릭합니다.

게시/구독 주제 만들기

Cloud Scheduler가 게시하고 Cloud Run 함수가 구독할 Pub/Sub 주제를 만듭니다.

  1. GCP Console에서 Pub/Sub > 주제로 이동합니다.
  2. 주제 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 주제 ID: cisco-aci-trigger를 입력합니다.
    • 다른 설정은 기본값으로 둡니다.
  4. 만들기를 클릭합니다.

로그를 수집하는 Cloud Run 함수 만들기

Cloud Run 함수는 Cloud Scheduler의 Pub/Sub 메시지에 의해 트리거되어 Cisco APIC REST API에서 로그를 가져와 GCS에 씁니다.

  1. GCP 콘솔에서 Cloud Run으로 이동합니다.
  2. 서비스 만들기를 클릭합니다.
  3. 함수를 선택합니다 (인라인 편집기를 사용하여 함수 만들기).
  4. 구성 섹션에서 다음 구성 세부정보를 제공합니다.

    설정
    서비스 이름 cisco-aci-collector
    리전 GCS 버킷과 일치하는 리전을 선택합니다 (예: us-central1).
    런타임 Python 3.12 이상 선택
  5. 트리거 (선택사항) 섹션에서 다음을 수행합니다.

    1. + 트리거 추가를 클릭합니다.
    2. Cloud Pub/Sub를 선택합니다.
    3. Cloud Pub/Sub 주제 선택에서 Pub/Sub 주제 (cisco-aci-trigger)를 선택합니다.
    4. 저장을 클릭합니다.
  6. 인증 섹션에서 다음을 구성합니다.

    • 인증 필요를 선택합니다.
    • Identity and Access Management (IAM)를 선택합니다.
  1. 컨테이너, 네트워킹, 보안으로 스크롤하여 펼칩니다.
  2. 보안 탭으로 이동합니다.
    • 서비스 계정: 서비스 계정 (cisco-aci-collector-sa)을 선택합니다.
  3. 컨테이너 탭으로 이동합니다.

    • 변수 및 보안 비밀을 클릭합니다.
    • 각 환경 변수에 대해 + 변수 추가를 클릭합니다.

      변수 이름 예시 값 설명
      GCS_BUCKET cisco-aci-logs GCS 버킷 이름
      GCS_PREFIX cisco-aci-events 로그 파일의 접두사
      STATE_KEY cisco-aci-events/state.json 상태 파일 경로
      APIC_URL https://apic.example.com APIC HTTPS URL
      APIC_USERNAME your-apic-username APIC 사용자 이름
      APIC_PASSWORD your-apic-password APIC 비밀번호
      PAGE_SIZE 100 페이지당 레코드 수
      MAX_PAGES 10 실행당 최대 페이지 수
  4. 변수 및 보안 비밀 섹션에서 요청으로 스크롤합니다.

    • 요청 제한 시간: 300초 (5분)를 입력합니다.
  5. 설정 탭으로 이동합니다.

    • 리소스 섹션에서 다음을 수행합니다.
      • 메모리: 512MiB 이상을 선택합니다.
      • CPU: 1을 선택합니다.
  6. 버전 확장 섹션에서 다음을 수행합니다.

    • 최소 인스턴스 수: 0를 입력합니다.
    • 최대 인스턴스 수: 100을 입력합니다 (또는 예상 부하에 따라 조정).
  7. 만들기를 클릭합니다.

  8. 서비스가 생성될 때까지 기다립니다 (1~2분).

  9. 서비스가 생성되면 인라인 코드 편집기가 자동으로 열립니다.

함수 코드 추가

  1. 진입점 필드에 main을 입력합니다.
  2. 인라인 코드 편집기에서 다음 두 파일을 만듭니다.

    • 첫 번째 파일: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import logging
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=60.0),
        retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'cisco-aci-events')
    STATE_KEY = os.environ.get('STATE_KEY', 'cisco-aci-events/state.json')
    APIC_URL = os.environ.get('APIC_URL')
    APIC_USERNAME = os.environ.get('APIC_USERNAME')
    APIC_PASSWORD = os.environ.get('APIC_PASSWORD')
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
    @functions_framework.cloud_event
    def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Cisco ACI logs and write to GCS.
    
        Args:
            cloud_event: CloudEvent object containing Pub/Sub message
        """
    
        if not all([GCS_BUCKET, APIC_URL, APIC_USERNAME, APIC_PASSWORD]):
            logger.error('Error: Missing required environment variables')
            return
    
        try:
            # Get GCS bucket
            bucket = storage_client.bucket(GCS_BUCKET)
    
            # Load state
            state = load_state(bucket, STATE_KEY)
    
            # Determine time window
            last_timestamp = state.get('last_timestamp')
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'
    
            logger.info(f"Starting Cisco ACI data collection for bucket: {GCS_BUCKET}")
    
            # Authenticate to APIC
            session_token = authenticate_apic(APIC_URL, APIC_USERNAME, APIC_PASSWORD)
            headers = {
                'Cookie': f'APIC-cookie={session_token}',
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            }
    
            # Data types to collect
            data_types = ['faultInst', 'eventRecord', 'aaaModLR']
            all_collected_data = []
    
            for data_type in data_types:
                logger.info(f"Collecting {data_type} data")
                collected_data = collect_aci_data(
                    APIC_URL,
                    headers,
                    data_type,
                    last_timestamp,
                    PAGE_SIZE,
                    MAX_PAGES
                )
    
                # Tag each record with its type
                for record in collected_data:
                    record['_data_type'] = data_type
    
                all_collected_data.extend(collected_data)
                logger.info(f"Collected {len(collected_data)} {data_type} records")
    
            logger.info(f"Total records collected: {len(all_collected_data)}")
    
            # Store data in GCS if any were collected
            if all_collected_data:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{GCS_PREFIX}/cisco_aci_events_{timestamp_str}.ndjson"
    
                # Convert to NDJSON format (one JSON object per line)
                ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data)
    
                # Upload to GCS
                blob = bucket.blob(s3_key)
                blob.upload_from_string(
                    ndjson_content,
                    content_type='application/x-ndjson'
                )
    
                logger.info(f"Uploaded {len(all_collected_data)} records to gs://{GCS_BUCKET}/{s3_key}")
    
                # Update state file with latest timestamp from collected data
                latest_timestamp = get_latest_timestamp_from_records(all_collected_data)
                if not latest_timestamp:
                    latest_timestamp = datetime.utcnow().isoformat() + 'Z'
    
                update_state(bucket, STATE_KEY, latest_timestamp)
            else:
                logger.info("No new log records found.")
    
            logger.info(f"Successfully processed {len(all_collected_data)} records")
    
        except Exception as e:
            logger.error(f'Error processing logs: {str(e)}')
            raise
    
    def authenticate_apic(apic_url, username, password):
        """Authenticate to APIC and return session token"""
        login_url = f"{apic_url}/api/aaaLogin.json"
        login_data = {
            "aaaUser": {
                "attributes": {
                    "name": username,
                    "pwd": password
                }
            }
        }
    
        response = http.request(
            'POST',
            login_url,
            body=json.dumps(login_data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            timeout=30
        )
    
        if response.status != 200:
            raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}")
    
        response_data = json.loads(response.data.decode('utf-8'))
        token = response_data['imdata'][0]['aaaLogin']['attributes']['token']
        logger.info("Successfully authenticated to APIC")
        return token
    
    def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages):
        """Collect data from APIC REST API with pagination"""
        all_data = []
        page = 0
    
        while page < max_pages:
            # Build API URL with pagination and time filters
            api_url = f"{apic_url}/api/class/{data_type}.json"
            params = [
                f'page-size={page_size}',
                f'page={page}',
                f'order-by={data_type}.created|asc'
            ]
    
            # Add time filter to prevent duplicates
            if last_timestamp:
                params.append(f'query-target-filter=gt({data_type}.created,"{last_timestamp}")')
    
            full_url = f"{api_url}?{'&'.join(params)}"
    
            logger.info(f"Fetching {data_type} page {page} from APIC")
    
            # Make API request
            response = http.request('GET', full_url, headers=headers, timeout=60)
    
            if response.status != 200:
                logger.error(f"API request failed: {response.status} {response.data[:256]!r}")
                break
    
            data = json.loads(response.data.decode('utf-8'))
            records = data.get('imdata', [])
    
            if not records:
                logger.info(f"No more {data_type} records found")
                break
    
            # Extract the actual data from APIC format
            extracted_records = []
            for record in records:
                if data_type in record:
                    extracted_records.append(record[data_type])
    
            all_data.extend(extracted_records)
            page += 1
    
            # If we got less than page_size records, we've reached the end
            if len(records) < page_size:
                break
    
        return all_data
    
    def get_last_timestamp(bucket, state_key):
        """Get the last run timestamp from GCS state file"""
        try:
            blob = bucket.blob(state_key)
            if blob.exists():
                state_data = blob.download_as_text()
                state = json.loads(state_data)
                return state.get('last_timestamp')
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
    
        return None
    
    def get_latest_timestamp_from_records(records):
        """Get the latest timestamp from collected records to prevent missing events"""
        if not records:
            return None
    
        latest = None
        latest_time = None
    
        for record in records:
            try:
                # Handle both direct attributes and nested structure
                attrs = record.get('attributes', record)
                created = attrs.get('created')
                modTs = attrs.get('modTs')
    
                # Use created or modTs as fallback
                timestamp = created or modTs
    
                if timestamp:
                    if latest_time is None or timestamp > latest_time:
                        latest_time = timestamp
                        latest = record
            except Exception as e:
                logger.debug(f"Error parsing timestamp from record: {e}")
                continue
    
        return latest_time
    
    def update_state(bucket, state_key, timestamp):
        """Update the state file with the current timestamp"""
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }
            blob = bucket.blob(state_key)
            blob.upload_from_string(
                json.dumps(state_data),
                content_type='application/json'
            )
            logger.info(f"Updated state file with timestamp: {timestamp}")
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
    
    def load_state(bucket, key):
        """Load state from GCS."""
        try:
            blob = bucket.blob(key)
            if blob.exists():
                state_data = blob.download_as_text()
                return json.loads(state_data)
        except Exception as e:
            logger.warning(f"Could not load state: {e}")
    
        return {}
    
    • 두 번째 파일: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. 배포를 클릭하여 함수를 저장하고 배포합니다.

  4. 배포가 완료될 때까지 기다립니다 (2~3분).

Cloud Scheduler 작업 만들기

Cloud Scheduler는 일정 간격으로 Pub/Sub 주제에 메시지를 게시하여 Cloud Run 함수를 트리거합니다.

  1. GCP Console에서 Cloud Scheduler로 이동합니다.
  2. 작업 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    설정
    이름 cisco-aci-collector-15m
    리전 Cloud Run 함수와 동일한 리전 선택
    주파수 */15 * * * * (15분마다)
    시간대 시간대 선택 (UTC 권장)
    타겟 유형 Pub/Sub
    주제 Pub/Sub 주제 (cisco-aci-trigger)를 선택합니다.
    메일 본문 {} (빈 JSON 객체)
  4. 만들기를 클릭합니다.

일정 빈도 옵션

  • 로그 볼륨 및 지연 시간 요구사항에 따라 빈도를 선택합니다.

    빈도 크론 표현식 사용 사례
    5분마다 */5 * * * * 대용량, 저지연
    15분마다 */15 * * * * 검색량 보통 (권장)
    1시간마다 0 * * * * 표준
    6시간마다 0 */6 * * * 양이 적은 일괄 처리
    매일 0 0 * * * 이전 데이터 수집

통합 테스트

  1. Cloud Scheduler 콘솔에서 작업을 찾습니다 (cisco-aci-collector-15m).
  2. 강제 실행을 클릭하여 작업을 수동으로 트리거합니다.
  3. 몇 초 동안 기다립니다.
  4. Cloud Run > 서비스로 이동합니다.
  5. 함수 이름 (cisco-aci-collector)을 클릭합니다.
  6. 로그 탭을 클릭합니다.
  7. 함수가 성공적으로 실행되었는지 확인합니다. 다음을 확인하세요.

    Starting Cisco ACI data collection for bucket: cisco-aci-logs
    Successfully authenticated to APIC
    Collecting faultInst data
    Collected X faultInst records
    Collecting eventRecord data
    Collected X eventRecord records
    Collecting aaaModLR data
    Collected X aaaModLR records
    Total records collected: X
    Uploaded X records to gs://cisco-aci-logs/cisco-aci-events/cisco_aci_events_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Cloud Storage> 버킷으로 이동합니다.

  9. 버킷 이름 (cisco-aci-logs)을 클릭합니다.

  10. 접두사 폴더 (cisco-aci-events/)로 이동합니다.

  11. 현재 타임스탬프를 사용하여 새 .ndjson 파일이 생성되었는지 확인합니다.

로그에 오류가 표시되면 다음 단계를 따르세요.

  • HTTP 401: 환경 변수에서 APIC 사용자 인증 정보 확인
  • HTTP 403: APIC 계정에 faultInst, eventRecord, aaaModLR 클래스에 대한 읽기 권한이 있는지 확인
  • 연결 오류: Cloud Run 함수가 TCP/443에서 APIC URL에 연결할 수 있는지 확인
  • 환경 변수 누락: 필수 변수가 모두 설정되었는지 확인

Google SecOps 서비스 계정 가져오기

Google SecOps는 고유한 서비스 계정을 사용하여 GCS 버킷에서 데이터를 읽습니다. 이 서비스 계정에 버킷에 대한 액세스 권한을 부여해야 합니다.

서비스 계정 이메일 가져오기

  1. SIEM 설정> 피드로 이동합니다.
  2. 새 피드 추가를 클릭합니다.
  3. 단일 피드 구성을 클릭합니다.
  4. 피드 이름 필드에 피드 이름을 입력합니다(예: Cisco ACI JSON logs).
  5. 소스 유형으로 Google Cloud Storage V2를 선택합니다.
  6. 로그 유형으로 Cisco Application Centric Infrastructure를 선택합니다.
  7. 서비스 계정 가져오기를 클릭합니다. 고유한 서비스 계정 이메일이 표시됩니다. 예를 들면 다음과 같습니다.

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. 이 이메일 주소를 복사합니다. 다음 단계에서 사용합니다.

Google SecOps 서비스 계정에 IAM 권한 부여

Google SecOps 서비스 계정에는 GCS 버킷에 대한 스토리지 객체 뷰어 역할이 필요합니다.

  1. Cloud Storage> 버킷으로 이동합니다.
  2. 버킷 이름 (cisco-aci-logs)을 클릭합니다.
  3. 권한 탭으로 이동합니다.
  4. 액세스 권한 부여를 클릭합니다.
  5. 다음 구성 세부정보를 제공합니다.
    • 주 구성원 추가: Google SecOps 서비스 계정 이메일을 붙여넣습니다.
    • 역할 할당: 스토리지 객체 뷰어를 선택합니다.
  6. 저장을 클릭합니다.

Cisco ACI 로그를 수집하도록 Google SecOps에서 피드 구성

  1. SIEM 설정> 피드로 이동합니다.
  2. 새 피드 추가를 클릭합니다.
  3. 단일 피드 구성을 클릭합니다.
  4. 피드 이름 필드에 피드 이름을 입력합니다(예: Cisco ACI JSON logs).
  5. 소스 유형으로 Google Cloud Storage V2를 선택합니다.
  6. 로그 유형으로 Cisco Application Centric Infrastructure를 선택합니다.
  7. 다음을 클릭합니다.
  8. 다음 입력 매개변수의 값을 지정합니다.

    • 스토리지 버킷 URL: 다음 접두사 경로를 사용하여 GCS 버킷 URI를 입력합니다.

      gs://cisco-aci-logs/cisco-aci-events/
      
      • 다음과 같이 바꿉니다.
        • cisco-aci-logs: GCS 버킷 이름입니다.
        • cisco-aci-events: 로그가 저장되는 접두사/폴더 경로입니다.
    • 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.

      • 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (테스트에 권장).
      • 전송된 파일 삭제: 전송이 완료되면 파일을 삭제합니다.
      • 전송된 파일 및 빈 디렉터리 삭제: 전송이 완료되면 파일과 빈 디렉터리를 삭제합니다.
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.

    • 애셋 네임스페이스: 애셋 네임스페이스입니다.

    • 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.

  9. 다음을 클릭합니다.

  10. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

UDM 매핑 테이블

로그 필드 UDM 매핑 로직
@timestamp read_only_udm.metadata.event_timestamp 값은 원시 로그 필드 '@timestamp'에서 가져와 타임스탬프로 파싱됩니다.
aci_tag read_only_udm.metadata.product_log_id 값은 원시 로그 필드 'aci_tag'에서 가져옵니다.
cisco_timestamp - 매핑되지 않음
DIP read_only_udm.target.ip 값은 원시 로그 필드 'DIP'에서 가져옵니다.
DPort read_only_udm.target.port 값은 원시 로그 필드 'DPort'에서 가져와 정수로 변환됩니다.
설명 read_only_udm.security_result.description 값은 원시 로그 필드 'description'에서 가져옵니다.
fault_cause read_only_udm.additional.fields.value.string_value 값은 원시 로그 필드 'fault_cause'에서 가져옵니다. 키가 'Fault Cause'로 설정됩니다.
hostname read_only_udm.principal.hostname 값은 원시 로그 필드 'hostname'에서 가져옵니다.
lifecycle_state read_only_udm.metadata.product_event_type 값은 원시 로그 필드 'lifecycle_state'에서 가져옵니다.
log.source.address - 매핑되지 않음
logstash.collect.host - 매핑되지 않음
logstash.collect.timestamp read_only_udm.metadata.collected_timestamp 값은 원시 로그 필드 'logstash.collect.timestamp'에서 가져와 타임스탬프로 파싱됩니다.
logstash.ingest.host read_only_udm.intermediary.hostname 값은 원시 로그 필드 'logstash.ingest.host'에서 가져옵니다.
logstash.irm_environment read_only_udm.additional.fields.value.string_value 값은 원시 로그 필드 'logstash.irm_environment'에서 가져옵니다. 키가 'IRM_Environment'로 설정됩니다.
logstash.irm_region read_only_udm.additional.fields.value.string_value 값은 원시 로그 필드 'logstash.irm_region'에서 가져옵니다. 키가 'IRM_Region'으로 설정됩니다.
logstash.irm_site read_only_udm.additional.fields.value.string_value 값은 원시 로그 필드 'logstash.irm_site'에서 가져옵니다. 키가 'IRM_Site'로 설정됩니다.
logstash.process.host read_only_udm.intermediary.hostname 값은 원시 로그 필드 'logstash.process.host'에서 가져옵니다.
메시지 - 매핑되지 않음
message_class - 매핑되지 않음
message_code - 매핑되지 않음
message_content - 매핑되지 않음
message_dn - 매핑되지 않음
message_type read_only_udm.metadata.product_event_type 값은 대괄호를 삭제한 후 원시 로그 필드 'message_type'에서 가져옵니다.
node_link read_only_udm.principal.process.file.full_path 값은 원시 로그 필드 'node_link'에서 가져옵니다.
PktLen read_only_udm.network.received_bytes 값은 원시 로그 필드 'PktLen'에서 가져와 부호 없는 정수로 변환됩니다.
프로그램 - 매핑되지 않음
Proto read_only_udm.network.ip_protocol 값은 원시 로그 필드 'Proto'에서 가져와 정수로 변환되고 해당 IP 프로토콜 이름 (예: 6 -> TCP).
SIP read_only_udm.principal.ip 값은 원시 로그 필드 'SIP'에서 가져옵니다.
SPort read_only_udm.principal.port 값은 원시 로그 필드 'SPort'에서 가져와 정수로 변환됩니다.
syslog_facility - 매핑되지 않음
syslog_facility_code - 매핑되지 않음
syslog_host read_only_udm.principal.ip, read_only_udm.observer.ip 값은 원시 로그 필드 'syslog_host'에서 가져옵니다.
syslog_prog - 매핑되지 않음
syslog_severity read_only_udm.security_result.severity_details 값은 원시 로그 필드 'syslog_severity'에서 가져옵니다.
syslog_severity_code read_only_udm.security_result.severity 값은 원시 로그 필드 'syslog_severity_code'에서 가져와 해당 심각도 수준에 매핑됩니다. 5, 6, 7 -> INFORMATIONAL; 3, 4 -> MEDIUM; 0, 1, 2 -> HIGH
syslog5424_pri - 매핑되지 않음
Vlan-Id read_only_udm.principal.resource.id 값은 원시 로그 필드 'Vlan-Id'에서 가져옵니다.
- read_only_udm.metadata.event_type 로직: 'SIP' 또는 'hostname'이 있고 'Proto'가 있으면 'NETWORK_CONNECTION'으로 설정합니다. 'SIP', 'hostname', 'syslog_host'가 있는 경우 'STATUS_UPDATE'로 설정합니다. 그 외의 경우에는 'GENERIC_EVENT'로 설정합니다.
- read_only_udm.metadata.log_type 로직: 'CISCO_ACI'로 설정합니다.
- read_only_udm.metadata.vendor_name 로직: 'Cisco'로 설정됩니다.
- read_only_udm.metadata.product_name 논리: 'ACI'로 설정합니다.

도움이 더 필요한가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.