Cisco Application Centric Infrastructure (ACI) 로그 수집
이 문서에서는 Cisco Application Centric Infrastructure (ACI) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 먼저 Grok 패턴을 사용하여 수신되는 Cisco ACI 로그를 syslog 메시지로 처리하려고 시도합니다. syslog 파싱이 실패하면 메시지가 JSON 형식이라고 가정하고 그에 따라 파싱합니다. 마지막으로 추출된 필드를 통합 데이터 모델 (UDM)에 매핑합니다.
이 통합은 다음 두 가지 방법을 지원합니다.
- 옵션 1: Bindplane 에이전트를 사용하는 Syslog 형식
- 옵션 2: APIC REST API를 사용하여 Google Cloud Storage를 사용하는 JSON 형식
각 옵션은 독립적이며 인프라 요구사항과 로그 형식 환경설정에 따라 독립적으로 구현할 수 있습니다.
옵션 1: Bindplane 에이전트를 사용하는 Syslog
이 옵션은 Cisco ACI 패브릭이 syslog 메시지를 Bindplane 에이전트로 전송하도록 구성합니다. Bindplane 에이전트는 메시지를 Google Security Operations로 전달하여 분석합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
systemd가 설치된 Windows 2016 이상 또는 Linux 호스트- 프록시 뒤에서 실행하는 경우 Bindplane 에이전트 요구사항에 따라 방화벽 포트가 열려 있는지 확인합니다.
- Cisco APIC 콘솔에 대한 권한 있는 액세스
Google SecOps 수집 인증 파일 가져오기
- SIEM 설정 > 수집 에이전트로 이동합니다.
- 수집 인증 파일을 다운로드합니다.
- Bindplane이 설치될 시스템에 파일을 안전하게 저장합니다.
Google SecOps 고객 ID 가져오기
- SIEM 설정 > 프로필로 이동합니다.
- 조직 세부정보 섹션에서 고객 ID를 복사하여 저장합니다.
Bindplane 에이전트 설치
다음 안내에 따라 Windows 또는 Linux 운영체제에 Bindplane 에이전트를 설치합니다.
Windows 설치
- 관리자 권한으로 명령 프롬프트 또는 PowerShell을 엽니다.
다음 명령어를 실행합니다.
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Linux 설치
- 루트 또는 sudo 권한으로 터미널을 엽니다.
다음 명령어를 실행합니다.
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
추가 설치 옵션은 Bindplane 에이전트 설치 가이드를 참고하세요.
Syslog를 수집하여 Google SecOps로 전송하도록 Bindplane 에이전트 구성
구성 파일 액세스
config.yaml파일을 찾습니다. 일반적으로 Linux에서는/etc/bindplane-agent/디렉터리에 있고 Windows에서는 설치 디렉터리에 있습니다.- 텍스트 편집기 (예:
nano,vi, 메모장)를 사용하여 파일을 엽니다. config.yaml 파일을 수정합니다.
receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'CISCO_ACI' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- 다음을 바꿉니다.
- 인프라에 필요한 대로 포트와 IP 주소를 바꿉니다.
<CUSTOMER_ID>를 실제 고객 ID로 바꿉니다./path/to/ingestion-authentication-file.json를 인증 파일이 저장된 경로로 업데이트합니다.
- 다음을 바꿉니다.
Bindplane 에이전트를 다시 시작하여 변경사항 적용
Linux에서 Bindplane 에이전트를 다시 시작하려면 다음 명령어를 실행합니다.
sudo systemctl restart bindplane-agentWindows에서 Bindplane 에이전트를 다시 시작하려면 서비스 콘솔을 사용하거나 다음 명령어를 입력하면 됩니다.
net stop BindPlaneAgent && net start BindPlaneAgent
Cisco ACI에서 Syslog 전달 구성
대역 외 관리 계약 구성
- Cisco APIC 콘솔에 로그인합니다.
- 테넌트 > 관리 > 계약 > 필터로 이동합니다.
- 필터 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름 —
syslog-udp-514를 입력합니다. - 항목 이름:
syslog를 입력합니다. - EtherType: IP를 선택합니다.
- IP 프로토콜: UDP를 선택합니다.
- 대상 포트 범위 시작:
514을 입력합니다. - 대상 포트 범위:
514를 입력합니다.
- 이름 —
- 제출을 클릭합니다.
관리 계약 만들기
- 테넌트 > 관리 > 계약 > 표준으로 이동합니다.
- 계약 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름 —
mgmt-syslog-contract를 입력합니다. - 범위: 컨텍스트를 선택합니다.
- 이름 —
- 제출을 클릭합니다.
- 계약을 펼치고 과목을 클릭합니다.
- 계약 주제 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름 —
syslog-subject를 입력합니다. - 양방향 적용: 이 옵션을 선택합니다.
- 이름 —
- 제출을 클릭합니다.
- 주제를 펼치고 필터를 클릭합니다.
- 필터 바인딩 만들기를 클릭합니다.
syslog-udp-514필터를 선택합니다.- 제출을 클릭합니다.
Syslog 대상 그룹 구성
- 관리자 > 외부 데이터 수집기 > 모니터링 대상 > Syslog로 이동합니다.
- Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Monitoring Destination Group(Syslog 모니터링 대상 그룹 만들기)을 선택합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름 —
Chronicle-Syslog-Group를 입력합니다. - 관리자 상태: 사용을 선택합니다.
- 형식: aci를 선택합니다.
- 이름 —
- 다음을 클릭합니다.
- 시스템로그 모니터링 대상 만들기 대화상자에서 다음을 수행합니다.
- 이름 —
Chronicle-BindPlane를 입력합니다. - 호스트: Bindplane 에이전트 서버의 IP 주소를 입력합니다.
- 포트:
514를 입력합니다. - 관리자 상태: 사용을 선택합니다.
- 심각도: 정보를 선택하여 자세한 로그를 캡처합니다.
- 이름 —
- 제출을 클릭합니다.
모니터링 정책 구성
패브릭 모니터링 정책
- 패브릭 > 패브릭 정책 > 정책 > 모니터링 > 공통 정책으로 이동합니다.
- Callhome/Smart Callhome/SNMP/Syslog/TACACS를 펼칩니다.
- Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Source를 선택합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름 —
Chronicle-Fabric-Syslog를 입력합니다. - 감사 로그: 감사 이벤트를 포함하는지 확인합니다.
- 이벤트: 시스템 이벤트를 포함하려면 선택합니다.
- 오류: 오류 이벤트를 포함할지 선택합니다.
- 세션 로그: 세션 로그를 포함하려면 선택합니다.
- 대상 그룹:
Chronicle-Syslog-Group를 선택합니다.
- 이름 —
- 제출을 클릭합니다.
액세스 모니터링 정책
- 패브릭 > 액세스 정책 > 정책 > 모니터링 > 기본 정책으로 이동합니다.
- Callhome/Smart Callhome/SNMP/Syslog를 펼칩니다.
- Syslog를 마우스 오른쪽 버튼으로 클릭하고 Create Syslog Source를 선택합니다.
- 다음 구성 세부정보를 제공합니다.
- 이름 —
Chronicle-Access-Syslog를 입력합니다. - 감사 로그: 감사 이벤트를 포함하는지 확인합니다.
- 이벤트: 시스템 이벤트를 포함하려면 선택합니다.
- 오류: 오류 이벤트를 포함할지 선택합니다.
- 세션 로그: 세션 로그를 포함하려면 선택합니다.
- 대상 그룹:
Chronicle-Syslog-Group를 선택합니다.
- 이름 —
- 제출을 클릭합니다.
시스템 Syslog 메시지 정책 구성
- 패브릭 > 패브릭 정책 > 정책 > 모니터링 > 공통 정책으로 이동합니다.
- Syslog Messages Policies를 펼칩니다.
- 기본값을 클릭합니다.
- 시설 필터 섹션에서 다음을 수행합니다.
- 시설: 기본값을 선택합니다.
- 최소 심각도: 정보로 변경합니다.
- 제출을 클릭합니다.
옵션 2: Google Cloud Storage를 사용하는 JSON
이 옵션은 APIC REST API를 사용하여 Cisco ACI 패브릭에서 JSON 형식 이벤트, 결함, 감사 로그를 수집하고 Google SecOps 수집을 위해 Google Cloud Storage에 저장합니다.
시작하기 전에
다음 기본 요건이 충족되었는지 확인합니다.
- Google SecOps 인스턴스
- Cloud Storage API가 사용 설정된 GCP 프로젝트
- GCS 버킷을 만들고 관리할 수 있는 권한
- GCS 버킷의 IAM 정책을 관리할 수 있는 권한
- Cloud Run 서비스, Pub/Sub 주제, Cloud Scheduler 작업을 만들 수 있는 권한
- Cisco APIC 콘솔에 대한 권한 있는 액세스
Cisco ACI APIC 기본 요건 수집
APIC 사용자 인증 정보 가져오기
- HTTPS를 사용하여 Cisco APIC 콘솔에 로그인합니다.
관리 > AAA (APIC 6.0 이상) 또는 관리 > 인증 > AAA (이전 버전)로 이동합니다.
적절한 권한이 있는 로컬 사용자를 만들거나 기존 사용자를 사용합니다.
다음 세부정보를 복사하여 안전한 위치에 저장합니다.
- APIC 사용자 이름: 모니터링 데이터에 대한 읽기 액세스 권한이 있는 로컬 사용자
- APIC 비밀번호: 사용자 비밀번호
- APIC URL: APIC의 HTTPS URL (예:
https://apic.example.com)
Google Cloud Storage 버킷 만들기
- Google Cloud 콘솔로 이동합니다.
- 프로젝트를 선택하거나 새 프로젝트를 만듭니다.
- 탐색 메뉴에서 Cloud Storage> 버킷으로 이동합니다.
- 버킷 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 버킷 이름 지정 전역적으로 고유한 이름 (예: cisco-aci-logs)을 입력합니다.위치 유형 필요에 따라 선택 (리전, 이중 리전, 멀티 리전) 위치 위치를 선택합니다 (예: us-central1).스토리지 클래스 Standard (자주 액세스하는 로그에 권장) 액세스 제어 균일 (권장) 보호 도구 선택사항: 객체 버전 관리 또는 보관 정책 사용 설정 만들기를 클릭합니다.
Cloud Run 함수의 서비스 계정 만들기
Cloud Run 함수에는 GCS 버킷에 쓸 수 있고 Pub/Sub에서 호출할 수 있는 권한이 있는 서비스 계정이 필요합니다.
서비스 계정 만들기
- GCP 콘솔에서 IAM 및 관리자 > 서비스 계정으로 이동합니다.
- 서비스 계정 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 서비스 계정 이름:
cisco-aci-collector-sa을 입력합니다. - 서비스 계정 설명:
Service account for Cloud Run function to collect Cisco ACI logs을 입력합니다.
- 서비스 계정 이름:
- 만들고 계속하기를 클릭합니다.
- 이 서비스 계정에 프로젝트에 대한 액세스 권한 부여 섹션에서 다음 역할을 추가합니다.
- 역할 선택을 클릭합니다.
- 스토리지 객체 관리자를 검색하여 선택합니다.
- + 다른 역할 추가를 클릭합니다.
- Cloud Run 호출자를 검색하여 선택합니다.
- + 다른 역할 추가를 클릭합니다.
- Cloud Functions 호출자를 검색하여 선택합니다.
- 계속을 클릭합니다.
- 완료를 클릭합니다.
이러한 역할은 다음 작업에 필요합니다.
- 스토리지 객체 관리자: GCS 버킷에 로그를 쓰고 상태 파일을 관리합니다.
- Cloud Run 호출자: Pub/Sub가 함수를 호출하도록 허용
- Cloud Functions 호출자: 함수 호출 허용
GCS 버킷에 대한 IAM 권한 부여
서비스 계정 (cisco-aci-collector-sa)에 GCS 버킷에 대한 쓰기 권한을 부여합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름 (예:
cisco-aci-logs)을 클릭합니다. - 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: 서비스 계정 이메일 (예:
cisco-aci-collector-sa@PROJECT_ID.iam.gserviceaccount.com)을 입력합니다. - 역할 할당: 스토리지 객체 관리자를 선택합니다.
- 주 구성원 추가: 서비스 계정 이메일 (예:
- 저장을 클릭합니다.
게시/구독 주제 만들기
Cloud Scheduler가 게시하고 Cloud Run 함수가 구독할 Pub/Sub 주제를 만듭니다.
- GCP Console에서 Pub/Sub > 주제로 이동합니다.
- 주제 만들기를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주제 ID:
cisco-aci-trigger를 입력합니다. - 다른 설정은 기본값으로 둡니다.
- 주제 ID:
- 만들기를 클릭합니다.
로그를 수집하는 Cloud Run 함수 만들기
Cloud Run 함수는 Cloud Scheduler의 Pub/Sub 메시지에 의해 트리거되어 Cisco APIC REST API에서 로그를 가져와 GCS에 씁니다.
- GCP 콘솔에서 Cloud Run으로 이동합니다.
- 서비스 만들기를 클릭합니다.
- 함수를 선택합니다 (인라인 편집기를 사용하여 함수 만들기).
구성 섹션에서 다음 구성 세부정보를 제공합니다.
설정 값 서비스 이름 cisco-aci-collector리전 GCS 버킷과 일치하는 리전을 선택합니다 (예: us-central1).런타임 Python 3.12 이상 선택 트리거 (선택사항) 섹션에서 다음을 수행합니다.
- + 트리거 추가를 클릭합니다.
- Cloud Pub/Sub를 선택합니다.
- Cloud Pub/Sub 주제 선택에서 Pub/Sub 주제 (
cisco-aci-trigger)를 선택합니다. - 저장을 클릭합니다.
인증 섹션에서 다음을 구성합니다.
- 인증 필요를 선택합니다.
- Identity and Access Management (IAM)를 선택합니다.
- 컨테이너, 네트워킹, 보안으로 스크롤하여 펼칩니다.
- 보안 탭으로 이동합니다.
- 서비스 계정: 서비스 계정 (
cisco-aci-collector-sa)을 선택합니다.
- 서비스 계정: 서비스 계정 (
컨테이너 탭으로 이동합니다.
- 변수 및 보안 비밀을 클릭합니다.
각 환경 변수에 대해 + 변수 추가를 클릭합니다.
변수 이름 예시 값 설명 GCS_BUCKETcisco-aci-logsGCS 버킷 이름 GCS_PREFIXcisco-aci-events로그 파일의 접두사 STATE_KEYcisco-aci-events/state.json상태 파일 경로 APIC_URLhttps://apic.example.comAPIC HTTPS URL APIC_USERNAMEyour-apic-usernameAPIC 사용자 이름 APIC_PASSWORDyour-apic-passwordAPIC 비밀번호 PAGE_SIZE100페이지당 레코드 수 MAX_PAGES10실행당 최대 페이지 수
변수 및 보안 비밀 섹션에서 요청으로 스크롤합니다.
- 요청 제한 시간:
300초 (5분)를 입력합니다.
- 요청 제한 시간:
설정 탭으로 이동합니다.
- 리소스 섹션에서 다음을 수행합니다.
- 메모리: 512MiB 이상을 선택합니다.
- CPU: 1을 선택합니다.
- 리소스 섹션에서 다음을 수행합니다.
버전 확장 섹션에서 다음을 수행합니다.
- 최소 인스턴스 수:
0를 입력합니다. - 최대 인스턴스 수:
100을 입력합니다 (또는 예상 부하에 따라 조정).
- 최소 인스턴스 수:
만들기를 클릭합니다.
서비스가 생성될 때까지 기다립니다 (1~2분).
서비스가 생성되면 인라인 코드 편집기가 자동으로 열립니다.
함수 코드 추가
- 진입점 필드에 main을 입력합니다.
인라인 코드 편집기에서 다음 두 파일을 만듭니다.
- 첫 번째 파일: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import logging # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=60.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'cisco-aci-events') STATE_KEY = os.environ.get('STATE_KEY', 'cisco-aci-events/state.json') APIC_URL = os.environ.get('APIC_URL') APIC_USERNAME = os.environ.get('APIC_USERNAME') APIC_PASSWORD = os.environ.get('APIC_PASSWORD') PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) MAX_PAGES = int(os.environ.get('MAX_PAGES', '10')) @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch Cisco ACI logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, APIC_URL, APIC_USERNAME, APIC_PASSWORD]): logger.error('Error: Missing required environment variables') return try: # Get GCS bucket bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window last_timestamp = state.get('last_timestamp') if not last_timestamp: last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z' logger.info(f"Starting Cisco ACI data collection for bucket: {GCS_BUCKET}") # Authenticate to APIC session_token = authenticate_apic(APIC_URL, APIC_USERNAME, APIC_PASSWORD) headers = { 'Cookie': f'APIC-cookie={session_token}', 'Accept': 'application/json', 'Content-Type': 'application/json' } # Data types to collect data_types = ['faultInst', 'eventRecord', 'aaaModLR'] all_collected_data = [] for data_type in data_types: logger.info(f"Collecting {data_type} data") collected_data = collect_aci_data( APIC_URL, headers, data_type, last_timestamp, PAGE_SIZE, MAX_PAGES ) # Tag each record with its type for record in collected_data: record['_data_type'] = data_type all_collected_data.extend(collected_data) logger.info(f"Collected {len(collected_data)} {data_type} records") logger.info(f"Total records collected: {len(all_collected_data)}") # Store data in GCS if any were collected if all_collected_data: timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S') s3_key = f"{GCS_PREFIX}/cisco_aci_events_{timestamp_str}.ndjson" # Convert to NDJSON format (one JSON object per line) ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data) # Upload to GCS blob = bucket.blob(s3_key) blob.upload_from_string( ndjson_content, content_type='application/x-ndjson' ) logger.info(f"Uploaded {len(all_collected_data)} records to gs://{GCS_BUCKET}/{s3_key}") # Update state file with latest timestamp from collected data latest_timestamp = get_latest_timestamp_from_records(all_collected_data) if not latest_timestamp: latest_timestamp = datetime.utcnow().isoformat() + 'Z' update_state(bucket, STATE_KEY, latest_timestamp) else: logger.info("No new log records found.") logger.info(f"Successfully processed {len(all_collected_data)} records") except Exception as e: logger.error(f'Error processing logs: {str(e)}') raise def authenticate_apic(apic_url, username, password): """Authenticate to APIC and return session token""" login_url = f"{apic_url}/api/aaaLogin.json" login_data = { "aaaUser": { "attributes": { "name": username, "pwd": password } } } response = http.request( 'POST', login_url, body=json.dumps(login_data).encode('utf-8'), headers={'Content-Type': 'application/json'}, timeout=30 ) if response.status != 200: raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) token = response_data['imdata'][0]['aaaLogin']['attributes']['token'] logger.info("Successfully authenticated to APIC") return token def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages): """Collect data from APIC REST API with pagination""" all_data = [] page = 0 while page < max_pages: # Build API URL with pagination and time filters api_url = f"{apic_url}/api/class/{data_type}.json" params = [ f'page-size={page_size}', f'page={page}', f'order-by={data_type}.created|asc' ] # Add time filter to prevent duplicates if last_timestamp: params.append(f'query-target-filter=gt({data_type}.created,"{last_timestamp}")') full_url = f"{api_url}?{'&'.join(params)}" logger.info(f"Fetching {data_type} page {page} from APIC") # Make API request response = http.request('GET', full_url, headers=headers, timeout=60) if response.status != 200: logger.error(f"API request failed: {response.status} {response.data[:256]!r}") break data = json.loads(response.data.decode('utf-8')) records = data.get('imdata', []) if not records: logger.info(f"No more {data_type} records found") break # Extract the actual data from APIC format extracted_records = [] for record in records: if data_type in record: extracted_records.append(record[data_type]) all_data.extend(extracted_records) page += 1 # If we got less than page_size records, we've reached the end if len(records) < page_size: break return all_data def get_last_timestamp(bucket, state_key): """Get the last run timestamp from GCS state file""" try: blob = bucket.blob(state_key) if blob.exists(): state_data = blob.download_as_text() state = json.loads(state_data) return state.get('last_timestamp') except Exception as e: logger.warning(f"Error reading state file: {str(e)}") return None def get_latest_timestamp_from_records(records): """Get the latest timestamp from collected records to prevent missing events""" if not records: return None latest = None latest_time = None for record in records: try: # Handle both direct attributes and nested structure attrs = record.get('attributes', record) created = attrs.get('created') modTs = attrs.get('modTs') # Use created or modTs as fallback timestamp = created or modTs if timestamp: if latest_time is None or timestamp > latest_time: latest_time = timestamp latest = record except Exception as e: logger.debug(f"Error parsing timestamp from record: {e}") continue return latest_time def update_state(bucket, state_key, timestamp): """Update the state file with the current timestamp""" try: state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } blob = bucket.blob(state_key) blob.upload_from_string( json.dumps(state_data), content_type='application/json' ) logger.info(f"Updated state file with timestamp: {timestamp}") except Exception as e: logger.error(f"Error updating state file: {str(e)}") def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: logger.warning(f"Could not load state: {e}") return {}- 두 번째 파일: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0배포를 클릭하여 함수를 저장하고 배포합니다.
배포가 완료될 때까지 기다립니다 (2~3분).
Cloud Scheduler 작업 만들기
Cloud Scheduler는 일정 간격으로 Pub/Sub 주제에 메시지를 게시하여 Cloud Run 함수를 트리거합니다.
- GCP Console에서 Cloud Scheduler로 이동합니다.
- 작업 만들기를 클릭합니다.
다음 구성 세부정보를 제공합니다.
설정 값 이름 cisco-aci-collector-15m리전 Cloud Run 함수와 동일한 리전 선택 주파수 */15 * * * *(15분마다)시간대 시간대 선택 (UTC 권장) 타겟 유형 Pub/Sub 주제 Pub/Sub 주제 ( cisco-aci-trigger)를 선택합니다.메일 본문 {}(빈 JSON 객체)만들기를 클릭합니다.
일정 빈도 옵션
로그 볼륨 및 지연 시간 요구사항에 따라 빈도를 선택합니다.
빈도 크론 표현식 사용 사례 5분마다 */5 * * * *대용량, 저지연 15분마다 */15 * * * *검색량 보통 (권장) 1시간마다 0 * * * *표준 6시간마다 0 */6 * * *양이 적은 일괄 처리 매일 0 0 * * *이전 데이터 수집
통합 테스트
- Cloud Scheduler 콘솔에서 작업을 찾습니다 (
cisco-aci-collector-15m). - 강제 실행을 클릭하여 작업을 수동으로 트리거합니다.
- 몇 초 동안 기다립니다.
- Cloud Run > 서비스로 이동합니다.
- 함수 이름 (
cisco-aci-collector)을 클릭합니다. - 로그 탭을 클릭합니다.
함수가 성공적으로 실행되었는지 확인합니다. 다음을 확인하세요.
Starting Cisco ACI data collection for bucket: cisco-aci-logs Successfully authenticated to APIC Collecting faultInst data Collected X faultInst records Collecting eventRecord data Collected X eventRecord records Collecting aaaModLR data Collected X aaaModLR records Total records collected: X Uploaded X records to gs://cisco-aci-logs/cisco-aci-events/cisco_aci_events_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsCloud Storage> 버킷으로 이동합니다.
버킷 이름 (
cisco-aci-logs)을 클릭합니다.접두사 폴더 (
cisco-aci-events/)로 이동합니다.현재 타임스탬프를 사용하여 새
.ndjson파일이 생성되었는지 확인합니다.
로그에 오류가 표시되면 다음 단계를 따르세요.
- HTTP 401: 환경 변수에서 APIC 사용자 인증 정보 확인
- HTTP 403: APIC 계정에
faultInst,eventRecord,aaaModLR클래스에 대한 읽기 권한이 있는지 확인 - 연결 오류: Cloud Run 함수가 TCP/443에서 APIC URL에 연결할 수 있는지 확인
- 환경 변수 누락: 필수 변수가 모두 설정되었는지 확인
Google SecOps 서비스 계정 가져오기
Google SecOps는 고유한 서비스 계정을 사용하여 GCS 버킷에서 데이터를 읽습니다. 이 서비스 계정에 버킷에 대한 액세스 권한을 부여해야 합니다.
서비스 계정 이메일 가져오기
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Cisco ACI JSON logs). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 Cisco Application Centric Infrastructure를 선택합니다.
서비스 계정 가져오기를 클릭합니다. 고유한 서비스 계정 이메일이 표시됩니다. 예를 들면 다음과 같습니다.
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com이 이메일 주소를 복사합니다. 다음 단계에서 사용합니다.
Google SecOps 서비스 계정에 IAM 권한 부여
Google SecOps 서비스 계정에는 GCS 버킷에 대한 스토리지 객체 뷰어 역할이 필요합니다.
- Cloud Storage> 버킷으로 이동합니다.
- 버킷 이름 (
cisco-aci-logs)을 클릭합니다. - 권한 탭으로 이동합니다.
- 액세스 권한 부여를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 주 구성원 추가: Google SecOps 서비스 계정 이메일을 붙여넣습니다.
- 역할 할당: 스토리지 객체 뷰어를 선택합니다.
- 저장을 클릭합니다.
Cisco ACI 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- 새 피드 추가를 클릭합니다.
- 단일 피드 구성을 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다(예:
Cisco ACI JSON logs). - 소스 유형으로 Google Cloud Storage V2를 선택합니다.
- 로그 유형으로 Cisco Application Centric Infrastructure를 선택합니다.
- 다음을 클릭합니다.
다음 입력 매개변수의 값을 지정합니다.
스토리지 버킷 URL: 다음 접두사 경로를 사용하여 GCS 버킷 URI를 입력합니다.
gs://cisco-aci-logs/cisco-aci-events/- 다음과 같이 바꿉니다.
cisco-aci-logs: GCS 버킷 이름입니다.cisco-aci-events: 로그가 저장되는 접두사/폴더 경로입니다.
- 다음과 같이 바꿉니다.
소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 삭제 안함: 전송 후 파일을 삭제하지 않습니다 (테스트에 권장).
- 전송된 파일 삭제: 전송이 완료되면 파일을 삭제합니다.
- 전송된 파일 및 빈 디렉터리 삭제: 전송이 완료되면 파일과 빈 디렉터리를 삭제합니다.
최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
애셋 네임스페이스: 애셋 네임스페이스입니다.
수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
다음을 클릭합니다.
확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
UDM 매핑 테이블
| 로그 필드 | UDM 매핑 | 로직 |
|---|---|---|
| @timestamp | read_only_udm.metadata.event_timestamp | 값은 원시 로그 필드 '@timestamp'에서 가져와 타임스탬프로 파싱됩니다. |
| aci_tag | read_only_udm.metadata.product_log_id | 값은 원시 로그 필드 'aci_tag'에서 가져옵니다. |
| cisco_timestamp | - | 매핑되지 않음 |
| DIP | read_only_udm.target.ip | 값은 원시 로그 필드 'DIP'에서 가져옵니다. |
| DPort | read_only_udm.target.port | 값은 원시 로그 필드 'DPort'에서 가져와 정수로 변환됩니다. |
| 설명 | read_only_udm.security_result.description | 값은 원시 로그 필드 'description'에서 가져옵니다. |
| fault_cause | read_only_udm.additional.fields.value.string_value | 값은 원시 로그 필드 'fault_cause'에서 가져옵니다. 키가 'Fault Cause'로 설정됩니다. |
| hostname | read_only_udm.principal.hostname | 값은 원시 로그 필드 'hostname'에서 가져옵니다. |
| lifecycle_state | read_only_udm.metadata.product_event_type | 값은 원시 로그 필드 'lifecycle_state'에서 가져옵니다. |
| log.source.address | - | 매핑되지 않음 |
| logstash.collect.host | - | 매핑되지 않음 |
| logstash.collect.timestamp | read_only_udm.metadata.collected_timestamp | 값은 원시 로그 필드 'logstash.collect.timestamp'에서 가져와 타임스탬프로 파싱됩니다. |
| logstash.ingest.host | read_only_udm.intermediary.hostname | 값은 원시 로그 필드 'logstash.ingest.host'에서 가져옵니다. |
| logstash.irm_environment | read_only_udm.additional.fields.value.string_value | 값은 원시 로그 필드 'logstash.irm_environment'에서 가져옵니다. 키가 'IRM_Environment'로 설정됩니다. |
| logstash.irm_region | read_only_udm.additional.fields.value.string_value | 값은 원시 로그 필드 'logstash.irm_region'에서 가져옵니다. 키가 'IRM_Region'으로 설정됩니다. |
| logstash.irm_site | read_only_udm.additional.fields.value.string_value | 값은 원시 로그 필드 'logstash.irm_site'에서 가져옵니다. 키가 'IRM_Site'로 설정됩니다. |
| logstash.process.host | read_only_udm.intermediary.hostname | 값은 원시 로그 필드 'logstash.process.host'에서 가져옵니다. |
| 메시지 | - | 매핑되지 않음 |
| message_class | - | 매핑되지 않음 |
| message_code | - | 매핑되지 않음 |
| message_content | - | 매핑되지 않음 |
| message_dn | - | 매핑되지 않음 |
| message_type | read_only_udm.metadata.product_event_type | 값은 대괄호를 삭제한 후 원시 로그 필드 'message_type'에서 가져옵니다. |
| node_link | read_only_udm.principal.process.file.full_path | 값은 원시 로그 필드 'node_link'에서 가져옵니다. |
| PktLen | read_only_udm.network.received_bytes | 값은 원시 로그 필드 'PktLen'에서 가져와 부호 없는 정수로 변환됩니다. |
| 프로그램 | - | 매핑되지 않음 |
| Proto | read_only_udm.network.ip_protocol | 값은 원시 로그 필드 'Proto'에서 가져와 정수로 변환되고 해당 IP 프로토콜 이름 (예: 6 -> TCP). |
| SIP | read_only_udm.principal.ip | 값은 원시 로그 필드 'SIP'에서 가져옵니다. |
| SPort | read_only_udm.principal.port | 값은 원시 로그 필드 'SPort'에서 가져와 정수로 변환됩니다. |
| syslog_facility | - | 매핑되지 않음 |
| syslog_facility_code | - | 매핑되지 않음 |
| syslog_host | read_only_udm.principal.ip, read_only_udm.observer.ip | 값은 원시 로그 필드 'syslog_host'에서 가져옵니다. |
| syslog_prog | - | 매핑되지 않음 |
| syslog_severity | read_only_udm.security_result.severity_details | 값은 원시 로그 필드 'syslog_severity'에서 가져옵니다. |
| syslog_severity_code | read_only_udm.security_result.severity | 값은 원시 로그 필드 'syslog_severity_code'에서 가져와 해당 심각도 수준에 매핑됩니다. 5, 6, 7 -> INFORMATIONAL; 3, 4 -> MEDIUM; 0, 1, 2 -> HIGH |
| syslog5424_pri | - | 매핑되지 않음 |
| Vlan-Id | read_only_udm.principal.resource.id | 값은 원시 로그 필드 'Vlan-Id'에서 가져옵니다. |
| - | read_only_udm.metadata.event_type | 로직: 'SIP' 또는 'hostname'이 있고 'Proto'가 있으면 'NETWORK_CONNECTION'으로 설정합니다. 'SIP', 'hostname', 'syslog_host'가 있는 경우 'STATUS_UPDATE'로 설정합니다. 그 외의 경우에는 'GENERIC_EVENT'로 설정합니다. |
| - | read_only_udm.metadata.log_type | 로직: 'CISCO_ACI'로 설정합니다. |
| - | read_only_udm.metadata.vendor_name | 로직: 'Cisco'로 설정됩니다. |
| - | read_only_udm.metadata.product_name | 논리: 'ACI'로 설정합니다. |
도움이 더 필요한가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.