Cisco Application Centric Infrastructure (ACI) 로그 수집

다음에서 지원:

이 문서에서는 Cisco Application Centric Infrastructure (ACI) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 먼저 Grok 패턴을 사용하여 수신되는 Cisco ACI 로그를 syslog 메시지로 처리하려고 시도합니다. syslog 파싱이 실패하면 메시지가 JSON 형식이라고 가정하고 그에 따라 파싱합니다. 마지막으로 추출된 필드를 통합 데이터 모델 (UDM)에 매핑합니다.

이 통합은 다음 두 가지 방법을 지원합니다.

  • 옵션 1: Bindplane 에이전트를 통한 Syslog 형식
  • 옵션 2: APIC REST API를 사용하여 AWS S3를 통한 JSON 형식

각 옵션은 독립적이며 인프라 요구사항 및 로그 형식 환경설정에 따라 독립적으로 구현할 수 있습니다.

옵션 1: Bindplane 에이전트를 통한 시스템 로그

이 옵션은 Cisco ACI 패브릭이 시스템 로그 메시지를 Bindplane 에이전트로 전송하도록 구성합니다. Bindplane 에이전트는 이를 Chronicle로 전달하여 분석합니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스
  • systemd가 설치된 Windows 2016 이상 또는 Linux 호스트
  • 프록시 뒤에서 실행하는 경우 Bindplane 에이전트 요구사항에 따라 방화벽 포트가 열려 있는지 확인합니다.
  • Cisco APIC 콘솔에 대한 권한 있는 액세스

Google SecOps 수집 인증 파일 가져오기

  1. Google SecOps 콘솔에 로그인합니다.
  2. SIEM 설정 > 수집 에이전트로 이동합니다.
  3. 수집 인증 파일을 다운로드합니다. Bindplane이 설치될 시스템에 파일을 안전하게 저장합니다.

Google SecOps 고객 ID 가져오기

  1. Google SecOps 콘솔에 로그인합니다.
  2. SIEM 설정 > 프로필로 이동합니다.
  3. 조직 세부정보 섹션에서 고객 ID를 복사하여 저장합니다.

Bindplane 에이전트 설치

다음 안내에 따라 Windows 또는 Linux 운영체제에 Bindplane 에이전트를 설치합니다.

Windows 설치

  1. 명령 프롬프트 또는 PowerShell을 관리자로 엽니다.
  2. 다음 명령어를 실행합니다.

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Linux 설치

  1. 루트 또는 sudo 권한으로 터미널을 엽니다.
  2. 다음 명령어를 실행합니다.

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

추가 설치 리소스

Syslog를 수집하여 Google SecOps로 전송하도록 Bindplane 에이전트 구성

  1. 구성 파일에 액세스합니다.

    1. config.yaml 파일을 찾습니다. 일반적으로 Linux에서는 /etc/bindplane-agent/ 디렉터리에 있고 Windows에서는 설치 디렉터리에 있습니다.
    2. 텍스트 편집기 (예: nano, vi, 메모장)를 사용하여 파일을 엽니다.
  2. 다음과 같이 config.yaml 파일을 수정합니다.

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'CISCO_ACI'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • 인프라에 필요한 대로 포트와 IP 주소를 바꿉니다.
    • <customer_id>를 실제 고객 ID로 바꿉니다.
    • Google SecOps 수집 인증 파일 가져오기 섹션에서 인증 파일이 저장된 경로로 /path/to/ingestion-authentication-file.json를 업데이트합니다.

Bindplane 에이전트를 다시 시작하여 변경사항 적용

  • Linux에서 Bindplane 에이전트를 다시 시작하려면 다음 명령어를 실행합니다.

    sudo systemctl restart bindplane-agent
    
  • Windows에서 Bindplane 에이전트를 다시 시작하려면 서비스 콘솔을 사용하거나 다음 명령어를 입력하면 됩니다.

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Cisco ACI에서 Syslog 전달 구성

대역 외 관리 계약 구성

  1. Cisco APIC 콘솔에 로그인합니다.
  2. 테넌트 > 관리 > 계약 > 필터로 이동합니다.
  3. 필터 만들기를 클릭합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 이름: syslog-udp-514를 입력합니다.
    • 항목 이름: syslog를 입력합니다.
    • EtherType: IP를 선택합니다.
    • IP Protocol(IP 프로토콜): UDP를 선택합니다.
    • 대상 포트 범위 시작: 514을 입력합니다.
    • 대상 포트 범위: 514를 입력합니다.
  5. 제출을 클릭합니다.

관리 계약 만들기

  1. 테넌트 > 관리 > 계약 > 표준으로 이동합니다.
  2. 계약 만들기를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 이름: mgmt-syslog-contract를 입력합니다.
    • 범위: 컨텍스트를 선택합니다.
  4. 제출을 클릭합니다.
  5. 계약을 펼치고 과목을 클릭합니다.
  6. 계약 주제 만들기를 클릭합니다.
  7. 다음 구성 세부정보를 제공합니다.
    • 이름: syslog-subject를 입력합니다.
    • 양방향 적용: 이 옵션을 선택합니다.
  8. 제출을 클릭합니다.
  9. 주제를 펼치고 필터를 클릭합니다.
  10. 필터 바인딩 만들기를 클릭합니다.
  11. 앞에서 만든 syslog-udp-514 필터를 선택합니다.
  12. 제출을 클릭합니다.

Syslog 대상 그룹 구성

  1. 관리자 > 외부 데이터 수집기 > 모니터링 대상 > Syslog로 이동합니다.
  2. Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Monitoring Destination Group(Syslog 모니터링 대상 그룹 만들기)을 선택합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 이름: Chronicle-Syslog-Group를 입력합니다.
    • 관리자 상태: 사용을 선택합니다.
    • 형식: aci를 선택합니다.
  4. 다음을 클릭합니다.
  5. 시스템로그 모니터링 대상 만들기 대화상자에서 다음을 수행합니다.
    • 이름: Chronicle-BindPlane를 입력합니다.
    • 호스트: Bindplane 에이전트 서버의 IP 주소를 입력합니다.
    • 포트: 514를 입력합니다.
    • 관리자 상태: 사용을 선택합니다.
    • 심각도: 정보를 선택하여 자세한 로그를 캡처합니다.
  6. 제출을 클릭합니다.

모니터링 정책 구성

패브릭 모니터링 정책

  1. 패브릭 > 패브릭 정책 > 정책 > 모니터링 > 공통 정책으로 이동합니다.
  2. Callhome/Smart Callhome/SNMP/Syslog/TACACS를 펼칩니다.
  3. Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Source를 선택합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 이름: Chronicle-Fabric-Syslog를 입력합니다.
    • 감사 로그: 감사 이벤트를 포함하는지 확인합니다.
    • 이벤트: 시스템 이벤트를 포함하려면 선택합니다.
    • 오류: 오류 이벤트를 포함할지 선택합니다.
    • 세션 로그: 세션 로그를 포함하려면 선택합니다.
    • 대상 그룹: Chronicle-Syslog-Group를 선택합니다.
  5. 제출을 클릭합니다.

액세스 모니터링 정책

  1. 패브릭 > 액세스 정책 > 정책 > 모니터링 > 기본 정책으로 이동합니다.
  2. Callhome/Smart Callhome/SNMP/Syslog를 펼칩니다.
  3. Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Source를 선택합니다.
  4. 다음 구성 세부정보를 제공합니다.
    • 이름: Chronicle-Access-Syslog를 입력합니다.
    • 감사 로그: 감사 이벤트를 포함하는지 확인합니다.
    • 이벤트: 시스템 이벤트를 포함하려면 선택합니다.
    • 오류: 오류 이벤트를 포함할지 선택합니다.
    • 세션 로그: 세션 로그를 포함하려면 선택합니다.
    • 대상 그룹: Chronicle-Syslog-Group를 선택합니다.
  5. 제출을 클릭합니다.

시스템 Syslog 메시지 정책 구성

  1. 패브릭 > 패브릭 정책 > 정책 > 모니터링 > 공통 정책으로 이동합니다.
  2. Syslog Messages Policies를 펼칩니다.
  3. 기본값을 클릭합니다.
  4. 시설 필터 섹션에서 다음을 수행합니다.
    • 시설: 기본값을 선택합니다.
    • 최소 심각도: 정보로 변경합니다.
  5. 제출을 클릭합니다.

옵션 2: AWS S3를 통한 JSON

이 옵션은 APIC REST API를 사용하여 Cisco ACI 패브릭에서 JSON 형식의 이벤트, 결함, 감사 로그를 수집하고 SecOps 수집을 위해 AWS S3에 저장합니다.

시작하기 전에

  • Google SecOps 인스턴스
  • Cisco APIC 콘솔에 대한 권한 있는 액세스
  • AWS (S3, IAM, Lambda, EventBridge)에 대한 액세스 권한

Cisco ACI APIC 기본 요건 (ID, API 키, 조직 ID, 토큰) 수집

  1. HTTPS를 사용하여 Cisco APIC 콘솔에 로그인합니다.
  2. 관리 > AAA (APIC 6.0 이상) 또는 관리 > 인증 > AAA (이전 버전)로 이동합니다.
    • 참고: APIC 6.0(1)부터 AAA 메뉴 경로가 변경되었습니다.
  3. 적절한 권한이 있는 로컬 사용자를 만들거나 기존 사용자를 사용합니다.
  4. 다음 세부정보를 복사하여 안전한 위치에 저장합니다.
    • APIC 사용자 이름: 모니터링 데이터에 대한 읽기 액세스 권한이 있는 로컬 사용자
    • APIC 비밀번호: 사용자 비밀번호
    • APIC URL: APIC의 HTTPS URL (예: https://apic.example.com)

Google SecOps용 AWS S3 버킷 및 IAM 구성

  1. 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
  2. 나중에 참조할 수 있도록 버킷 이름리전을 저장합니다(예: cisco-aci-logs).
  3. 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
  4. 생성된 사용자를 선택합니다.
  5. 보안용 사용자 인증 정보 탭을 선택합니다.
  6. 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
  7. 사용 사례서드 파티 서비스를 선택합니다.
  8. 다음을 클릭합니다.
  9. 선택사항: 설명 태그를 추가합니다.
  10. 액세스 키 만들기를 클릭합니다.
  11. CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키보안 비밀 액세스 키를 저장합니다.
  12. 완료를 클릭합니다.
  13. 권한 탭을 선택합니다.
  14. 권한 정책 섹션에서 권한 추가를 클릭합니다.
  15. 권한 추가를 선택합니다.
  16. 정책 직접 연결을 선택합니다.
  17. AmazonS3FullAccess 정책을 검색합니다.
  18. 정책을 선택합니다.
  19. 다음을 클릭합니다.
  20. 권한 추가를 클릭합니다.

S3 업로드용 IAM 정책 및 역할 구성

  1. AWS 콘솔에서 IAM > 정책으로 이동합니다.
  2. 정책 만들기 > JSON 탭을 클릭합니다.
  3. 다음 정책을 입력합니다.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-aci-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-aci-logs/cisco-aci-events/state.json"
        }
      ]
    }
    
    • 다른 버킷 이름을 입력한 경우 cisco-aci-logs을 바꿉니다.
  4. 다음 > 정책 만들기를 클릭합니다.

  5. IAM > 역할 > 역할 생성 > AWS 서비스 > Lambda로 이동합니다.

  6. 새로 만든 정책과 AWSLambdaBasicExecutionRole 관리형 정책을 연결합니다.

  7. 역할 이름을 cisco-aci-lambda-role로 지정하고 역할 만들기를 클릭합니다.

Lambda 함수 만들기

  1. AWS 콘솔에서 Lambda > 함수 > 함수 만들기로 이동합니다.
  2. 처음부터 작성을 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.

    • 이름: cisco-aci-events-collector
    • 런타임: Python 3.13
    • 아키텍처: x86_64
    • 실행 역할: cisco-aci-lambda-role
  4. 함수를 만든 후 코드 탭을 열고 스텁을 삭제하고 다음 코드를 입력합니다 (cisco-aci-events-collector.py).

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta
    import os
    import logging
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # AWS S3 client and HTTP pool manager
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager()
    
    def lambda_handler(event, context):
        """
        AWS Lambda handler to fetch Cisco ACI events, faults, and audit logs and store them in S3
        """
    
        try:
            # Get environment variables
            s3_bucket = os.environ['S3_BUCKET']
            s3_prefix = os.environ['S3_PREFIX']
            state_key = os.environ['STATE_KEY']
            apic_url = os.environ['APIC_URL']
            apic_username = os.environ['APIC_USERNAME']
            apic_password = os.environ['APIC_PASSWORD']
    
            # Optional parameters
            page_size = int(os.environ.get('PAGE_SIZE', '100'))
            max_pages = int(os.environ.get('MAX_PAGES', '10'))
    
            logger.info(f"Starting Cisco ACI data collection for bucket: {s3_bucket}")
    
            # Get last run timestamp from state file
            last_timestamp = get_last_timestamp(s3_bucket, state_key)
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'
    
            # Authenticate to APIC
            session_token = authenticate_apic(apic_url, apic_username, apic_password)
    
            headers = {
                'Cookie': f'APIC-cookie={session_token}',
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            }
    
            # Data types to collect
            data_types = ['faultInst', 'eventRecord', 'aaaModLR']
            all_collected_data = []
    
            for data_type in data_types:
                logger.info(f"Collecting {data_type} data")
                collected_data = collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages)
    
                # Tag each record with its type
                for record in collected_data:
                    record['_data_type'] = data_type
    
                all_collected_data.extend(collected_data)
                logger.info(f"Collected {len(collected_data)} {data_type} records")
    
            logger.info(f"Total records collected: {len(all_collected_data)}")
    
            # Store data in S3 if any were collected
            if all_collected_data:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{s3_prefix}cisco_aci_events_{timestamp_str}.ndjson"
    
                # Convert to NDJSON format (one JSON object per line)
                ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data)
    
                # Upload to S3
                s3_client.put_object(
                    Bucket=s3_bucket,
                    Key=s3_key,
                    Body=ndjson_content.encode('utf-8'),
                    ContentType='application/x-ndjson'
                )
    
                logger.info(f"Uploaded {len(all_collected_data)} records to s3://{s3_bucket}/{s3_key}")
    
            # Update state file with latest timestamp from collected data
            latest_timestamp = get_latest_timestamp_from_records(all_collected_data)
            if not latest_timestamp:
                latest_timestamp = datetime.utcnow().isoformat() + 'Z'
    
            update_state(s3_bucket, state_key, latest_timestamp)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Success',
                    'total_records_collected': len(all_collected_data),
                    'data_types_collected': data_types
                })
            }
    
        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': str(e)
                })
            }
    
    def authenticate_apic(apic_url, username, password):
        """
        Authenticate to APIC and return session token
        """
        login_url = f"{apic_url}/api/aaaLogin.json"
        login_data = {
            "aaaUser": {
                "attributes": {
                    "name": username,
                    "pwd": password
                }
            }
        }
    
        response = http.request(
            'POST',
            login_url,
            body=json.dumps(login_data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            timeout=30
        )
    
        if response.status != 200:
            raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}")
    
        response_data = json.loads(response.data.decode('utf-8'))
        token = response_data['imdata'][0]['aaaLogin']['attributes']['token']
        logger.info("Successfully authenticated to APIC")
        return token
    
    def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages):
        """
        Collect data from APIC REST API with pagination
        """
        all_data = []
        page = 0
    
        while page < max_pages:
            # Build API URL with pagination and time filters
            api_url = f"{apic_url}/api/class/{data_type}.json"
            params = [
                f'page-size={page_size}',
                f'page={page}',
                f'order-by={data_type}.created|asc'
            ]
    
            # Add time filter for all data types to prevent duplicates
            time_attr = 'created'
            if last_timestamp:
                params.append(f'query-target-filter=gt({data_type}.{time_attr},"{last_timestamp}")')
    
            full_url = f"{api_url}?{'&'.join(params)}"
    
            logger.info(f"Fetching {data_type} page {page} from APIC")
    
            # Make API request
            response = http.request('GET', full_url, headers=headers, timeout=60)
    
            if response.status != 200:
                logger.error(f"API request failed: {response.status} {response.data[:256]!r}")
                break
    
            data = json.loads(response.data.decode('utf-8'))
            records = data.get('imdata', [])
    
            if not records:
                logger.info(f"No more {data_type} records found")
                break
    
            # Extract the actual data from APIC format
            extracted_records = []
            for record in records:
                if data_type in record:
                    extracted_records.append(record[data_type])
    
            all_data.extend(extracted_records)
            page += 1
    
            # If we got less than page_size records, we've reached the end
            if len(records) < page_size:
                break
    
        return all_data
    
    def get_last_timestamp(bucket, state_key):
        """
        Get the last run timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            logger.info("No state file found, starting from 1 hour ago")
            return None
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
            return None
    
    def get_latest_timestamp_from_records(records):
        """
        Get the latest timestamp from collected records to prevent missing events
        """
        if not records:
            return None
    
        latest = None
        latest_time = None
    
        for record in records:
            try:
                # Handle both direct attributes and nested structure
                attrs = record.get('attributes', record)
                created = attrs.get('created')
                modTs = attrs.get('modTs')  # Fallback for some object types
    
                timestamp = created or modTs
                if timestamp:
                    if latest_time is None or timestamp > latest_time:
                        latest_time = timestamp
                        latest = record
            except Exception as e:
                logger.debug(f"Error parsing timestamp from record: {e}")
                continue
    
        return latest_time
    
    def update_state(bucket, state_key, timestamp):
        """
        Update the state file with the current timestamp
        """
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }
    
            s3_client.put_object(
                Bucket=bucket,
                Key=state_key,
                Body=json.dumps(state_data).encode('utf-8'),
                ContentType='application/json'
            )
    
            logger.info(f"Updated state file with timestamp: {timestamp}")
    
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
    
  5. 구성 > 환경 변수로 이동합니다.

  6. 수정 > 새 환경 변수 추가를 클릭합니다.

  7. 제공된 다음 환경 변수를 입력하고 값으로 바꿉니다.

    • S3_BUCKET: cisco-aci-logs
    • S3_PREFIX: cisco-aci-events/
    • STATE_KEY: cisco-aci-events/state.json
    • APIC_URL: https://apic.example.com
    • APIC_USERNAME: <your-apic-username>
    • APIC_PASSWORD: <your-apic-password>
    • PAGE_SIZE: 100 (선택사항, 페이지로 나누기 크기 제어)
    • MAX_PAGES: 10 (선택사항, 실행당 가져오는 총 페이지 수를 제한함)
  8. 함수가 생성된 후 해당 페이지에 머무르거나 Lambda > Functions > cisco-aci-events-collector를 엽니다.

  9. 구성 탭을 선택합니다.

  10. 일반 구성 패널에서 수정을 클릭합니다.

  11. 제한 시간5분 (300초)로 변경하고 저장을 클릭합니다.

EventBridge 일정 만들기

  1. Amazon EventBridge > 스케줄러 > 일정 만들기로 이동합니다.
  2. 다음 구성 세부정보를 제공합니다.
    • 반복 일정: 요금 (15 minutes)
    • 타겟: Lambda 함수 cisco-aci-events-collector
    • 이름: cisco-aci-events-collector-15m.
  3. 일정 만들기를 클릭합니다.

선택사항: Google SecOps용 읽기 전용 IAM 사용자 및 키 만들기

  1. AWS 콘솔 > IAM > 사용자 > 사용자 추가로 이동합니다.
  2. 사용자 추가를 클릭합니다.
  3. 다음 구성 세부정보를 제공합니다.
    • 사용자: secops-reader를 입력합니다.
    • 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
  4. 사용자 만들기를 클릭합니다.
  5. 최소 읽기 정책(맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
  6. JSON 편집기에 다음 정책을 입력합니다.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-aci-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-aci-logs"
        }
      ]
    }
    
  7. 이름을 secops-reader-policy로 설정합니다.

  8. 정책 만들기 > 검색/선택 > 다음 > 권한 추가로 이동합니다.

  9. 보안용 사용자 인증 정보> 액세스 키> 액세스 키 만들기로 이동합니다.

  10. CSV를 다운로드합니다(이러한 값은 피드에 입력됨).

Cisco ACI 로그를 수집하도록 Google SecOps에서 피드 구성

  1. SIEM 설정> 피드로 이동합니다.
  2. + 새 피드 추가를 클릭합니다.
  3. 피드 이름 필드에 피드 이름을 입력합니다(예: Cisco ACI JSON logs).
  4. 소스 유형으로 Amazon S3 V2를 선택합니다.
  5. 로그 유형으로 Cisco Application Centric Infrastructure를 선택합니다.
  6. 다음을 클릭합니다.
  7. 다음 입력 파라미터의 값을 지정합니다.
    • S3 URI: s3://cisco-aci-logs/cisco-aci-events/
    • 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
    • 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키
    • 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
    • 애셋 네임스페이스: 애셋 네임스페이스입니다.
    • 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
  8. 다음을 클릭합니다.
  9. 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.

UDM 매핑 테이블

로그 필드 UDM 매핑 논리
@timestamp read_only_udm.metadata.event_timestamp 값은 원시 로그 필드 '@timestamp'에서 가져와 타임스탬프로 파싱됩니다.
aci_tag read_only_udm.metadata.product_log_id 값은 원시 로그 필드 'aci_tag'에서 가져옵니다.
cisco_timestamp - 매핑되지 않음
DIP read_only_udm.target.ip 값은 원시 로그 필드 'DIP'에서 가져옵니다.
DPort read_only_udm.target.port 값은 원시 로그 필드 'DPort'에서 가져와 정수로 변환됩니다.
설명 read_only_udm.security_result.description 값은 원시 로그 필드 'description'에서 가져옵니다.
fault_cause read_only_udm.additional.fields.value.string_value 값은 원시 로그 필드 'fault_cause'에서 가져옵니다. 키가 'Fault Cause'로 설정됩니다.
호스트 이름 read_only_udm.principal.hostname 값은 원시 로그 필드 'hostname'에서 가져옵니다.
lifecycle_state read_only_udm.metadata.product_event_type 값은 원시 로그 필드 'lifecycle_state'에서 가져옵니다.
log.source.address - 매핑되지 않음
logstash.collect.host - 매핑되지 않음
logstash.collect.timestamp read_only_udm.metadata.collected_timestamp 값은 원시 로그 필드 'logstash.collect.timestamp'에서 가져와 타임스탬프로 파싱됩니다.
logstash.ingest.host read_only_udm.intermediary.hostname 값은 원시 로그 필드 'logstash.ingest.host'에서 가져옵니다.
logstash.irm_environment read_only_udm.additional.fields.value.string_value 값은 원시 로그 필드 'logstash.irm_environment'에서 가져옵니다. 키가 'IRM_Environment'로 설정됩니다.
logstash.irm_region read_only_udm.additional.fields.value.string_value 값은 원시 로그 필드 'logstash.irm_region'에서 가져옵니다. 키가 'IRM_Region'으로 설정됩니다.
logstash.irm_site read_only_udm.additional.fields.value.string_value 값은 원시 로그 필드 'logstash.irm_site'에서 가져옵니다. 키가 'IRM_Site'로 설정됩니다.
logstash.process.host read_only_udm.intermediary.hostname 값은 원시 로그 필드 'logstash.process.host'에서 가져옵니다.
메시지 - 매핑되지 않음
message_class - 매핑되지 않음
message_code - 매핑되지 않음
message_content - 매핑되지 않음
message_dn - 매핑되지 않음
message_type read_only_udm.metadata.product_event_type 값은 대괄호를 삭제한 후 원시 로그 필드 'message_type'에서 가져옵니다.
node_link read_only_udm.principal.process.file.full_path 값은 원시 로그 필드 'node_link'에서 가져옵니다.
PktLen read_only_udm.network.received_bytes 값은 원시 로그 필드 'PktLen'에서 가져와 부호 없는 정수로 변환됩니다.
프로그램 - 매핑되지 않음
Proto read_only_udm.network.ip_protocol 값은 원시 로그 필드 'Proto'에서 가져와 정수로 변환되고 해당 IP 프로토콜 이름 (예: 6 -> TCP).
SIP read_only_udm.principal.ip 값은 원시 로그 필드 'SIP'에서 가져옵니다.
SPort read_only_udm.principal.port 값은 원시 로그 필드 'SPort'에서 가져와 정수로 변환됩니다.
syslog_facility - 매핑되지 않음
syslog_facility_code - 매핑되지 않음
syslog_host read_only_udm.principal.ip, read_only_udm.observer.ip 값은 원시 로그 필드 'syslog_host'에서 가져옵니다.
syslog_prog - 매핑되지 않음
syslog_severity read_only_udm.security_result.severity_details 값은 원시 로그 필드 'syslog_severity'에서 가져옵니다.
syslog_severity_code read_only_udm.security_result.severity 값은 원시 로그 필드 'syslog_severity_code'에서 가져와 해당 심각도 수준에 매핑됩니다. 5, 6, 7 -> INFORMATIONAL; 3, 4 -> MEDIUM; 0, 1, 2 -> HIGH
syslog5424_pri - 매핑되지 않음
Vlan-Id read_only_udm.principal.resource.id 값은 원시 로그 필드 'Vlan-Id'에서 가져옵니다.
- read_only_udm.metadata.event_type 로직: 'SIP' 또는 'hostname'이 있고 'Proto'가 있으면 'NETWORK_CONNECTION'으로 설정합니다. 'SIP', 'hostname', 'syslog_host'가 있는 경우 'STATUS_UPDATE'로 설정합니다. 그렇지 않으면 'GENERIC_EVENT'로 설정합니다.
- read_only_udm.metadata.log_type 로직: 'CISCO_ACI'로 설정합니다.
- read_only_udm.metadata.vendor_name 로직: 'Cisco'로 설정됩니다.
- read_only_udm.metadata.product_name 로직: 'ACI'로 설정합니다.

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.