Apache Kafka와 Google SecOps 통합
이 문서에서는 Apache Kafka를 Google Security Operations (Google SecOps)와 통합하는 방법을 설명합니다.
사용 사례
Apache Kafka 통합은 다음 사용 사례를 해결할 수 있습니다.
실시간 보안 로그 수집: Kafka 주제의 보안 이벤트를 Google SecOps로 자동 수집하고 처리합니다. 이를 통해 중앙 집중식 로그 관리와 실시간 분석을 통해 스트리밍 데이터를 기반으로 알림을 생성할 수 있습니다.
이벤트 기반 자동화: Kafka 주제에서 스트리밍되는 특정 보안 이벤트 또는 메시지를 기반으로 Google SecOps에서 자동화된 플레이북을 트리거합니다. 이를 통해 비정상적인 위치에서의 사용자 로그인과 같은 중요한 이벤트에 대한 응답이 빨라집니다.
위협 인텔리전스 보강: Kafka 주제에서 맞춤 위협 인텔리전스 피드를 가져와 기존 알림 및 케이스를 보강합니다. 이를 통해 분석가에게 침해 지표 (IOC)에 관한 최신 컨텍스트가 제공되고 위협 분석의 정확성이 향상됩니다.
시작하기 전에
Google SecOps에서 Apache Kafka 통합을 구성하기 전에 다음 필수 요건을 완료하세요.
- Apache Kafka 서버: 필요한 Kafka 브로커와 주제가 구성된 실행 중인 Apache Kafka 서버에 액세스할 수 있는지 확인합니다.
원격 에이전트 Docker 이미지: 원격 에이전트를 만들 때는 Debian 기반 이미지를 사용해야 합니다. 호환성을 확인하려면 다음 이미지를 사용하세요.
us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
통합 매개변수
Apache Kafka 통합에는 다음 매개변수가 필요합니다.
| 매개변수 | 설명 |
|---|---|
Kafka brokers |
필수 항목입니다. 연결할 Kafka 브로커의 쉼표로 구분된 목록입니다( |
Use TLS for connection |
선택사항입니다. 선택하면 통합에서 인증에 TLS 암호화를 사용합니다. 이 매개변수에는 인증 기관 (CA) 인증서가 필요합니다. 기본적으로 사용 설정되어 있지 않습니다. |
Use SASL PLAIN with TLS for connection |
선택사항입니다. 선택하면 통합에서 인증을 위해 SASL PLAIN 사용자 이름 및 비밀번호 메커니즘을 사용합니다. 이 옵션은 TLS 암호화에서만 지원되며 SASL 사용자 이름과 비밀번호, CA 인증서가 모두 필요합니다. 기본적으로 사용 설정되어 있지 않습니다. |
CA certificate of Kafka server |
선택사항입니다. Kafka 서버의 ID를 확인하는 데 사용되는 CA 인증서입니다. SASL이 사용 설정된 경우 이 매개변수는 필수입니다. |
Client certificate |
선택사항입니다. Kafka 서버와의 상호 TLS 인증을 위한 클라이언트의 인증서입니다. 상호 TLS (mTLS)가 사용 설정된 경우 이 파라미터는 필수입니다. |
Client certificate key |
선택사항입니다. 상호 TLS 인증에 사용되는 클라이언트 인증서에 해당하는 비공개 키입니다. 상호 TLS (mTLS)가 사용 설정된 경우 이 파라미터는 필수입니다. |
Client certificate key password |
선택사항입니다. 클라이언트 인증서의 비공개 키를 복호화하는 데 사용되는 비밀번호입니다. 상호 TLS (mTLS)가 사용 설정된 경우 이 파라미터는 필수입니다. |
SASL PLAIN Username |
선택사항입니다. Kafka 브로커와의 SASL PLAIN 인증을 위한 사용자 이름입니다. SASL이 사용 설정된 경우 이 매개변수는 필수입니다. |
SASL PLAIN Password |
선택사항입니다. Kafka 브로커와의 SASL PLAIN 인증 비밀번호입니다. SASL이 사용 설정된 경우 이 매개변수는 필수입니다. |
Google SecOps에서 통합을 구성하는 방법에 대한 안내는 통합 구성을 참고하세요.
필요한 경우 이후 단계에서 변경할 수 있습니다. 통합 인스턴스를 구성한 후 플레이북에서 사용할 수 있습니다. 여러 인스턴스를 구성하고 지원하는 방법에 관한 자세한 내용은 여러 인스턴스 지원을 참고하세요.
작업
작업에 대한 자세한 내용은 내 Workdesk에서 대기 중인 작업에 응답 및 수동 작업 실행을 참고하세요.
핑
Ping 작업을 사용하여 Apache Kafka와의 연결을 테스트합니다.
이 작업은 Google SecOps 항목에서 실행되지 않습니다.
작업 입력
없음
작업 출력
Ping 작업은 다음 출력을 제공합니다.
| 작업 출력 유형 | 가용성 |
|---|---|
| 케이스 월 연결 | 사용 불가 |
| 케이스 월 링크 | 사용 불가 |
| 케이스 월 테이블 | 사용 불가 |
| 보강 테이블 | 사용 불가 |
| JSON 결과 | 사용 불가 |
| 출력 메시지 | 사용 가능 |
| 스크립트 결과입니다. | 사용 가능 |
출력 메시지
Ping 작업은 다음 출력 메시지를 반환할 수 있습니다.
| 출력 메시지 | 메시지 설명 |
|---|---|
|
작업이 완료되었습니다. |
Failed to connect to the Apache Kafka server!
Error is ERROR_REASON |
작업이 실패했습니다. 서버, 입력 매개변수 또는 사용자 인증 정보에 대한 연결을 확인합니다. |
스크립트 결과
다음 표에는 Ping 작업을 사용할 때 스크립트 결과 출력 값이 나와 있습니다.
| 스크립트 결과 이름 | 값 |
|---|---|
is_success |
True 또는 False |
커넥터
Google SecOps에서 커넥터를 구성하는 방법을 자세히 알아보려면 데이터 수집 (커넥터)을 참고하세요.
Apache Kafka - Messages Connector
Apache Kafka - 메시지 커넥터를 사용하여 Apache Kafka에서 메시지를 가져옵니다.
커넥터는 지정된 Kafka 주제에서 메시지를 가져오며 메시지 형식에 따라 다양한 방식으로 처리할 수 있습니다. 메시지가 유효한 JSON 객체인 경우 커넥터는 알림 생성 및 매핑을 위해 특정 필드를 추출합니다. 메시지가 일반 문자열인 경우 원시 이벤트 데이터로 수집됩니다.
커넥터는 제공된 매개변수를 기반으로 심각도 매핑, 알림 이름 템플릿, 고유 ID 생성을 처리합니다.
JSON 심각도 매핑
알림 심각도를 매핑하려면 Apache Kafka - 메시지 커넥터가 Severity Mapping JSON 매개변수의 심각도 값을 가져오는 데 사용하는 필드를 지정해야 합니다. 커넥터 응답에는 integer, float, string과 같은 값 유형이 포함될 수 있습니다.
Apache Kafka - Messages Connector는 integer 및 float 값을 읽고 Google SecOps 설정에 따라 매핑합니다. 다음 표는 Google SecOps의 심각도에 대한 integer 값의 매핑을 보여줍니다.
| 정수 값 | 매핑된 심각도 |
|---|---|
100 |
Critical |
80부터 100까지 |
High |
60부터 80까지 |
Medium |
40부터 60까지 |
Low |
40 미만 |
Informational |
대답에 string 값이 포함된 경우 Pub/Sub – Messages Connector에 추가 구성이 필요합니다.
처음에는 기본값이 다음과 같이 표시됩니다.
{
"Default": 60
}
매핑에 필요한 값이 event_severity JSON 키에 있는 경우 값은 다음과 같을 수 있습니다.
"Malicious""Benign""Unknown"
event_severity JSON 키 값을 파싱하고 JSON 객체의 형식이 올바른지 확인하려면 다음과 같이 Severity Mapping JSON 매개변수를 구성하세요.
{
"event_severity": {
"Malicious": 100,
"Unknown": 60,
"Benign": -1
},
"Default": 50
}
"Default" 값은 필수 항목입니다.
동일한 JSON 객체에 여러 일치 항목이 있는 경우 Apache Kafka - Messages Connector는 첫 번째 JSON 객체 키에 우선순위를 부여합니다.
integer 또는 float 값이 포함된 필드를 사용하려면 Severity Mapping JSON 매개변수에서 키와 빈 문자열을 구성합니다.
{
"Default":"60",
"integer_field": "",
"float_field": ""
}
커넥터 입력
Apache Kafka - Messages Connector에는 다음 매개변수가 필요합니다.
| 매개변수 | 설명 |
|---|---|
Product Field Name |
필수 항목입니다. 제품 이름이 저장된 필드의 이름입니다. 제품 이름은 주로 매핑에 영향을 미칩니다. 커넥터의 매핑 프로세스를 간소화하고 개선하기 위해 기본값은 코드에서 참조되는 대체 값으로 확인됩니다. 이 매개변수의 잘못된 입력은 기본적으로 대체 값으로 처리됩니다. 기본값은 |
Event Field Name |
필수 항목입니다. 이벤트 이름 (하위 유형)을 결정하는 필드의 이름입니다. 기본값은 |
Environment Field Name |
선택사항입니다. 환경 이름이 저장된 필드의 이름입니다. 환경 필드가 누락된 경우 커넥터는 기본값을 사용합니다. 기본값은 |
Environment Regex Pattern |
선택사항입니다.
기본값 정규 표현식 패턴이 null이거나 비어 있거나 환경 값이 null인 경우 최종 환경 결과는 기본 환경입니다. |
Script Timeout (Seconds) |
필수 항목입니다. 현재 스크립트를 실행하는 Python 프로세스의 제한 시간 한도(초)입니다. 기본값은 |
Kafka brokers |
필수 항목입니다. 연결할 Kafka 브로커의 쉼표로 구분된 목록입니다( |
Use TLS for connection |
선택사항입니다. 선택하면 통합에서 인증에 TLS 암호화를 사용합니다. 이 매개변수에는 CA 인증서가 필요합니다. 기본적으로 사용 설정되어 있지 않습니다. |
Use SASL PLAIN with TLS for connection |
선택사항입니다. 선택하면 통합에서 인증을 위해 SASL PLAIN 사용자 이름 및 비밀번호 메커니즘을 사용합니다. 이 옵션을 사용하려면 SASL 사용자 이름과 비밀번호를 제공해야 합니다. CA 인증서가 필요한 TLS 암호화에서만 지원됩니다. 기본적으로 사용 설정되어 있지 않습니다. |
CA certificate of Kafka server |
선택사항입니다. Kafka 서버의 ID를 확인하는 데 사용되는 CA 인증서입니다. |
Client certificate |
선택사항입니다. Kafka 서버와의 상호 TLS 인증을 위한 클라이언트의 인증서입니다. |
Client certificate key |
선택사항입니다. 상호 TLS 인증에 사용되는 클라이언트 인증서에 해당하는 비공개 키입니다. |
Client certificate key password |
선택사항입니다. 클라이언트 인증서의 비공개 키를 복호화하는 데 사용되는 비밀번호입니다. |
SASL PLAIN Username |
선택사항입니다. Kafka 브로커와의 SASL PLAIN 인증을 위한 사용자 이름입니다. |
SASL PLAIN Password |
선택사항입니다. Kafka 브로커와의 SASL PLAIN 인증 비밀번호입니다. |
Topic |
필수 항목입니다. 인시던트가 검색되는 Kafka 주제입니다. |
Consumer Group ID |
선택사항입니다. 인시던트를 가져올 때 사용되는 소비자 그룹의 식별자입니다. 값이 제공되지 않으면 고유 ID가 생성됩니다. |
Partitions |
선택사항입니다. 메시지를 가져올 파티션의 CSV 목록입니다. |
Initial Offset |
선택사항입니다. 커넥터가 Kafka 파티션에서 메시지 가져오기를 시작하는 위치입니다. 특정 오프셋에서 시작하도록 양의 정수를 지정하거나 |
Poll Timeout |
선택사항입니다. Kafka에서 메시지를 소비하는 폴링 제한 시간(초)입니다. |
Case Name Template |
선택사항입니다. 맞춤 케이스 이름을 정의하는 템플릿입니다. 커넥터는 이벤트에 첫 번째 이벤트의 문자열 값에서 채워지는 FIELD_NAME 형식의 자리표시자를 사용할 수 있습니다. 예: |
Alert Name Template |
필수 항목입니다. 알림 이름을 정의하는 템플릿입니다. 첫 번째 이벤트의 문자열 값에서 채워지는 FIELD_NAME 형식의 자리표시자를 사용할 수 있습니다. 예: 값이 제공되지 않거나 템플릿이 잘못된 경우 커넥터는 기본 알림 이름을 사용합니다. |
Rule Generator Template |
필수 항목입니다. 규칙 생성기를 정의하는 템플릿입니다. 첫 번째 이벤트의 문자열 값에서 채워지는 FIELD_NAME 형식의 자리표시자를 사용할 수 있습니다. 예: 값이 제공되지 않거나 템플릿이 유효하지 않으면 커넥터는 기본 규칙 생성기 이름을 사용합니다. |
Timestamp Field |
필수 항목입니다. Google SecOps 알림 타임스탬프가 포함된 Kafka 메시지의 필드 이름입니다. 타임스탬프가 Unix epoch 형식이 아닌 경우 |
Timestamp Format |
선택사항입니다. Unix epoch이 아닌 타임스탬프에 필요한 메시지 타임스탬프의 형식입니다. 표준 Python 타임스탬프가 Unix 에포크 형식이고 이 매개변수가 구성되지 않은 경우 커넥터가 실패합니다. |
Severity Mapping JSON |
필수 항목입니다. 커넥터가 메시지에서 심각도 수준을 추출하여 Google SecOps 우선순위 등급에 매핑하는 데 사용하는 JSON 객체입니다. 기본값은 |
Unique ID Field |
선택사항입니다. 고유 메시지 식별자로 사용할 필드의 이름입니다. 값이 제공되지 않으면 커넥터는 메시지 콘텐츠의 SHA-256 해시를 메시지 식별자로 생성하고 사용합니다. |
Max Messages To Fetch |
필수 항목입니다. 커넥터가 각 반복에서 처리하는 최대 메시지 수입니다. 기본값은 |
Disable Overflow |
선택사항입니다. 선택하면 커넥터가 Google SecOps 오버플로 메커니즘을 무시합니다. 기본적으로 사용 설정됩니다. |
Verify SSL |
필수 항목입니다. 선택하면 통합에서 Apache Kafka 서버에 연결할 때 SSL 인증서를 검증합니다. 기본적으로 사용 설정됩니다. |
Proxy Server Address |
선택사항입니다. 사용할 프록시 서버의 주소입니다. |
Proxy Username |
선택사항입니다. 인증할 프록시 사용자 이름입니다. |
Proxy Password |
선택사항입니다. 인증할 프록시 비밀번호입니다. |
커넥터 규칙
커넥터가 프록시를 지원합니다.
커넥터 알림
다음 표에서는 Apache Kafka 메시지 필드와 Google SecOps 알림 필드의 매핑을 설명합니다.
| Siemplify 알림 필드 | Apache Kafka 메시지 필드 |
|---|---|
SourceSystemName |
프레임워크에 의해 채워집니다. |
TicketId |
고유 ID 필드의 값 또는 메시지의 SHA-256 해시입니다. |
DisplayId |
ApacheKafka_{unique id or hash}_{connector identifier} |
Name |
Alert Name Template에 의해 생성된 값입니다. |
Reason |
해당 사항 없음 |
Description |
해당 사항 없음 |
DeviceVendor |
하드코딩: Apache Kafka |
DeviceProduct |
대체 값: Message |
Priority |
Severity Mapping JSON 매개변수에서 매핑됩니다. |
RuleGenerator |
Rule Generator Template에 의해 생성된 값입니다. |
SourceGroupingIdentifier |
해당 사항 없음 |
StartTime |
Timestamp Field에서 변환되었습니다. |
EndTime |
Timestamp Field에서 변환되었습니다. |
Siemplify Alert - Extensions |
해당 사항 없음 |
Siemplify Alert - Attachments |
해당 사항 없음 |
커넥터 이벤트
커넥터 이벤트의 예는 다음과 같습니다.
{
"notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
"finding": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
"parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
"resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
"state": "ACTIVE",
"category": "OPEN_NETBIOS_PORT",
"externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"sourceProperties": {
"Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
"Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
"ScannerName": "FIREWALL_SCANNER",
"ResourcePath": [
"projects/PROJECT_ID/",
"folders/FOLDER_ID_1/",
"folders/FOLDER_ID_2/",
"organizations/ORGANIZATION_ID/"
],
"ExposedService": "NetBIOS",
"OpenPorts": {
"TCP": [
137.0,
138.0,
139.0
],
"UDP": [
137.0,
138.0,
139.0
]
},
"compliance_standards": {
"iso": [
{
"ids": [
"A.13.1.1"
]
}
],
"pci": [
{
"ids": [
"1.2.1"
]
}
],
"nist": [
{
"ids": [
"SC-7"
]
}
]
},
"ReactivationCount": 4.0
},
"securityMarks": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
"marks": {
"USER_ID": "SECURITY_MARK"
}
},
"eventTime": "2024-08-30T14:44:37.973090Z",
"createTime": "2024-06-24T07:08:54.777Z",
"propertyDataTypes": {
"ResourcePath": {
"listValues": {
"propertyDataTypes": [
{
"primitiveDataType": "STRING"
}
]
}
},
"ReactivationCount": {
"primitiveDataType": "NUMBER"
},
"Explanation": {
"primitiveDataType": "STRING"
},
"ExposedService": {
"primitiveDataType": "STRING"
},
"ScannerName": {
"primitiveDataType": "STRING"
}
}
}
}
도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가에게 문의하여 답변을 받으세요.