Elasticsearch 로그 수집
이 문서에서는 Amazon S3를 사용하여 Elasticsearch 로그를 Google Security Operations에 수집하는 방법을 설명합니다. 파서는 원시 JSON 형식 로그를 통합 데이터 모델 (UDM)로 변환합니다. 중첩된 JSON 구조에서 필드를 추출하고, UDM 필드에 매핑하고, 심각도 수준 및 사용자 역할과 같은 보안 관련 컨텍스트로 데이터를 보강합니다.
시작하기 전에
- Google SecOps 인스턴스
- Elasticsearch 클러스터 관리에 대한 권한 있는 액세스
- AWS (S3, IAM, EC2)에 대한 권한 있는 액세스
- Logstash를 실행할 EC2 인스턴스 또는 영구 호스트
Elasticsearch 기본 요건 가져오기
- 관리자로 Elasticsearch 클러스터에 로그인합니다.
- Elasticsearch 구독에 보안 기능이 포함되어 있는지 확인합니다 (감사 로깅에 필요).
- 참고할 수 있도록 Elasticsearch 클러스터 이름과 버전을 기록해 둡니다.
- 감사 로그가 기록될 경로를 식별합니다 (기본값:
$ES_HOME/logs/<clustername>_audit.json
).
Elasticsearch 감사 로깅 사용 설정
- 각 Elasticsearch 노드에서 elasticsearch.yml 구성 파일을 수정합니다.
다음 설정을 추가합니다.
xpack.security.audit.enabled: true
클러스터의 지속적 재시작을 실행하여 변경사항을 적용합니다.
- 샤드 할당 사용 중지:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- 각 노드를 한 번에 하나씩 중지했다가 다시 시작합니다.
- 샤드 할당을 다시 사용 설정합니다.
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- 샤드 할당 사용 중지:
감사 로그가 로그 디렉터리의
<clustername>_audit.json
에 생성되는지 확인합니다.
Google SecOps용 AWS S3 버킷 및 IAM 구성
- 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
- 나중에 참조할 수 있도록 버킷 이름과 리전을 저장합니다 (예:
elastic-search-logs
). - 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
- 생성된 사용자를 선택합니다.
- 보안 사용자 인증 정보 탭을 선택합니다.
- 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
- 사용 사례로 서드 파티 서비스를 선택합니다.
- 다음을 클릭합니다.
- 선택사항: 설명 태그를 추가합니다.
- 액세스 키 만들기를 클릭합니다.
- CSV 파일 다운로드를 클릭하여 향후 참조할 수 있도록 액세스 키와 비밀 액세스 키를 저장합니다.
- 완료를 클릭합니다.
- 권한 탭을 선택합니다.
- 권한 정책 섹션에서 권한 추가를 클릭합니다.
- 권한 추가를 선택합니다.
- 정책 직접 연결을 선택합니다.
- AmazonS3FullAccess 정책을 검색합니다.
- 정책을 선택합니다.
- 다음을 클릭합니다.
- 권한 추가를 클릭합니다.
감사 로그를 S3로 전송하도록 Logstash 구성
- Elasticsearch 감사 로그 파일에 액세스할 수 있는 EC2 인스턴스 또는 영구 호스트에 Logstash를 설치합니다.
S3 출력 플러그인이 아직 없는 경우 설치합니다.
bin/logstash-plugin install logstash-output-s3
Logstash 구성 파일 (
elastic-to-s3.conf
)을 만듭니다.input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
다음 구성으로 Logstash를 시작합니다.
bin/logstash -f elastic-to-s3.conf
선택사항: Google SecOps용 읽기 전용 IAM 사용자 만들기
- AWS 콘솔 > IAM > 사용자 > 사용자 추가로 이동합니다.
- Add users를 클릭합니다.
- 다음 구성 세부정보를 제공합니다.
- 사용자:
secops-reader
를 입력합니다. - 액세스 유형: 액세스 키 – 프로그래매틱 액세스를 선택합니다.
- 사용자:
- 사용자 만들기를 클릭합니다.
- 최소 읽기 정책 (맞춤) 연결: 사용자 > secops-reader > 권한 > 권한 추가 > 정책 직접 연결 > 정책 만들기
JSON 편집기에서 다음 정책을 입력합니다.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
이름을
secops-reader-policy
로 설정합니다.정책 만들기 > 검색/선택 > 다음 > 권한 추가로 이동합니다.
보안 사용자 인증 정보> 액세스 키> 액세스 키 만들기로 이동합니다.
CSV를 다운로드합니다 (이러한 값은 피드에 입력됨).
Elasticsearch 로그를 수집하도록 Google SecOps에서 피드 구성
- SIEM 설정> 피드로 이동합니다.
- + 새 피드 추가를 클릭합니다.
- 피드 이름 필드에 피드 이름을 입력합니다 (예:
Elasticsearch Logs
). - 소스 유형으로 Amazon S3 V2를 선택합니다.
- 로그 유형으로 Elastic Search를 선택합니다.
- 다음을 클릭합니다.
- 다음 입력 파라미터의 값을 지정합니다.
- S3 URI:
s3://elastic-search-logs/logs/
- 소스 삭제 옵션: 환경설정에 따라 삭제 옵션을 선택합니다.
- 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.
- 액세스 키 ID: S3 버킷에 대한 액세스 권한이 있는 사용자 액세스 키입니다.
- 보안 비밀 액세스 키: S3 버킷에 액세스할 수 있는 사용자 보안 비밀 키입니다.
- 애셋 네임스페이스: 애셋 네임스페이스입니다.
- 수집 라벨: 이 피드의 이벤트에 적용된 라벨입니다.
- S3 URI:
- 다음을 클릭합니다.
- 확정 화면에서 새 피드 구성을 검토한 다음 제출을 클릭합니다.
UDM 매핑 테이블
로그 필드 | UDM 매핑 | 논리 |
---|---|---|
수준 | security_result.severity | 로직은 'Level' 필드의 값을 확인하고 해당하는 UDM 심각도 수준에 매핑합니다. - 'INFO', 'ALL', 'OFF', 'TRACE', 'DEBUG'는 'INFORMATIONAL'에 매핑됩니다. - 'WARN'이 'LOW'에 매핑됩니다. - 'ERROR'가 'ERROR'에 매핑됩니다. - 'FATAL'이 'CRITICAL'에 매핑됩니다. |
message.@timestamp | 타임스탬프 | 타임스탬프는 'yyyy-MM-ddTHH:mm:ss.SSS' 형식을 사용하여 원시 로그의 'message' 필드 내에서 '@timestamp' 필드로부터 파싱됩니다. |
message.action | security_result.action_details | 값은 원시 로그의 'message' 필드 내 'action' 필드에서 가져옵니다. |
message.event.action | security_result.summary | 값은 원시 로그의 'message' 필드 내 'event.action' 필드에서 가져옵니다. |
message.event.type | metadata.product_event_type | 값은 원시 로그의 'message' 필드 내 'event.type' 필드에서 가져옵니다. |
message.host.ip | target.ip | 값은 원시 로그의 'message' 필드 내 'host.ip' 필드에서 가져옵니다. |
message.host.name | target.hostname | 값은 원시 로그의 'message' 필드 내 'host.name' 필드에서 가져옵니다. |
message.indices | target.labels.value | 값은 원시 로그의 'message' 필드 내 'indices' 필드에서 가져옵니다. |
message.mrId | target.hostname | 값은 원시 로그의 'message' 필드 내 'mrId' 필드에서 가져옵니다. |
message.node.id | principal.asset.product_object_id | 값은 원시 로그의 'message' 필드 내에 있는 'node.id' 필드에서 가져옵니다. |
message.node.name | target.asset.hostname | 값은 원시 로그의 'message' 필드 내 'node.name' 필드에서 가져옵니다. |
message.origin.address | principal.ip | IP 주소는 포트 번호를 삭제하여 원시 로그의 'message' 필드 내 'origin.address' 필드에서 추출됩니다. |
message.origin.type | principal.resource.resource_subtype | 값은 원시 로그의 'message' 필드 내 'origin.type' 필드에서 가져옵니다. |
message.properties.host_group | principal.hostname | 값은 원시 로그의 'message' 필드 내에 있는 'properties.host_group' 필드에서 가져옵니다. |
message.properties.host_group | target.group.group_display_name | 값은 원시 로그의 'message' 필드 내에 있는 'properties.host_group' 필드에서 가져옵니다. |
message.request.id | target.resource.product_object_id | 값은 원시 로그의 'message' 필드 내 'request.id' 필드에서 가져옵니다. |
message.request.name | target.resource.name | 값은 원시 로그의 'message' 필드 내 'request.name' 필드에서 가져옵니다. |
message.user.name | principal.user.userid | 값은 원시 로그의 'message' 필드 내 'user.name' 필드에서 가져옵니다. |
message.user.realm | principal.user.attribute.permissions.name | 값은 원시 로그의 'message' 필드 내 'user.realm' 필드에서 가져옵니다. |
message.user.roles | about.user.attribute.roles.name | 값은 원시 로그의 'message' 필드 내 'user.roles' 필드에서 가져옵니다. |
metadata.event_type | 하드코딩된 값: 'USER_RESOURCE_ACCESS' | |
metadata.log_type | 하드코딩된 값: 'ELASTIC_SEARCH' | |
metadata.product_name | 하드코딩된 값: 'ELASTICSEARCH' | |
metadata.vendor_name | 하드코딩된 값: 'ELASTIC' | |
principal.port | 포트 번호는 원시 로그의 'message' 필드 내에 있는 'origin.address' 필드에서 추출됩니다. | |
target.labels.key | 하드코딩된 값: 'Indice' |
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.