Apache Kafka を Google SecOps と統合する
統合バージョン: 1.0
このドキュメントでは、Apache Kafka を Google Security Operations(Google SecOps)と統合する方法について説明します。
ユースケース
Apache Kafka 統合は、次のユースケースに対応できます。
リアルタイムのセキュリティ ログの取り込み: Kafka トピックから Google SecOps にセキュリティ イベントを自動的に取り込んで処理します。これにより、ログの一元管理とリアルタイム分析が可能になり、ストリーミング データに基づいてアラートを生成できます。
イベント ドリブン自動化: Kafka トピックからストリーミングされた特定のセキュリティ イベントまたはメッセージに基づいて、Google SecOps で自動化されたプレイブックをトリガーします。これにより、通常とは異なる場所からのユーザー ログインなどの重大なイベントに対する対応が迅速になります。
脅威インテリジェンスの拡充: Kafka トピックからカスタム脅威インテリジェンス フィードを pull して、既存のアラートとケースを拡充します。これにより、アナリストはセキュリティ侵害の証拠や痕跡(IOC)に関する最新のコンテキスト情報を入手し、脅威分析の精度を高めることができます。
始める前に
Google SecOps で Apache Kafka 統合を構成する前に、次の前提条件を満たしてください。
- Apache Kafka サーバー: 必要な Kafka ブローカーとトピックが構成された実行中の Apache Kafka サーバーにアクセスできることを確認します。
リモート エージェントの Docker イメージ: リモート エージェントを作成する場合は、Debian ベースのイメージを使用する必要があります。互換性を確認するには、次の画像をご覧ください。
us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
統合のパラメータ
Apache Kafka インテグレーションには、次のパラメータが必要です。
| パラメータ | 説明 |
|---|---|
Kafka brokers |
必須。 接続する Kafka ブローカーのカンマ区切りのリスト( |
Use TLS for connection |
省略可。 選択すると、統合で認証に TLS 暗号化が使用されます。 このパラメータには、認証局(CA)証明書が必要です。 デフォルトでは有効になっていません。 |
Use SASL PLAIN with TLS for connection |
省略可。 選択すると、統合で認証に SASL PLAIN のユーザー名とパスワードのメカニズムが使用されます。 このオプションは TLS 暗号化でのみサポートされており、SASL ユーザー名とパスワード、CA 証明書の両方が必要です。 デフォルトでは有効になっていません。 |
CA certificate of Kafka server |
省略可。 Kafka サーバーの ID の検証に使用される CA 証明書。 SASL が有効になっている場合、このパラメータは必須です。 |
Client certificate |
省略可。 Kafka サーバーとの相互 TLS 認証用のクライアント証明書。 相互 TLS(mTLS)が有効になっている場合、このパラメータは必須です。 |
Client certificate key |
省略可。 相互 TLS 認証に使用される、クライアントの証明書に対応する秘密鍵。 相互 TLS(mTLS)が有効になっている場合、このパラメータは必須です。 |
Client certificate key password |
省略可。 クライアント証明書の秘密鍵の復号に使用されるパスワード。 相互 TLS(mTLS)が有効になっている場合、このパラメータは必須です。 |
SASL PLAIN Username |
省略可。 Kafka ブローカーとの SASL PLAIN 認証に使用するユーザー名。 SASL が有効になっている場合、このパラメータは必須です。 |
SASL PLAIN Password |
省略可。 Kafka ブローカーとの SASL PLAIN 認証のパスワード。 SASL が有効になっている場合、このパラメータは必須です。 |
Google SecOps で統合を構成する手順については、統合を構成するをご覧ください。
必要に応じて、後の段階で変更できます。統合インスタンスを構成すると、ハンドブックで使用できるようになります。複数のインスタンスの構成とサポートの詳細については、複数のインスタンスのサポートをご覧ください。
操作
アクションの詳細については、 デスクから保留中のアクションに対応すると手動アクションを実行するをご覧ください。
Ping
Ping アクションを使用して、Apache Kafka への接続をテストします。
このアクションは Google SecOps エンティティに対して実行されません。
アクション入力
なし
アクションの出力
Ping アクションは次の出力を提供します。
| アクションの出力タイプ | 対象 |
|---|---|
| ケースウォールのアタッチメント | 利用不可 |
| ケースウォールのリンク | 利用不可 |
| ケースウォール テーブル | 利用不可 |
| 拡充テーブル | 利用不可 |
| JSON の結果 | 利用不可 |
| 出力メッセージ | 利用可能 |
| スクリプトの結果。 | 利用可能 |
出力メッセージ
Ping アクションは、次の出力メッセージを返すことができます。
| 出力メッセージ | メッセージの説明 |
|---|---|
|
アクションが成功しました。 |
Failed to connect to the Apache Kafka server!
Error is ERROR_REASON |
操作に失敗しました。 サーバーへの接続、入力パラメータ、または認証情報を確認してください。 |
スクリプトの結果
次の表に、Ping アクションを使用した場合のスクリプト結果出力の値を示します。
| スクリプトの結果名 | 値 |
|---|---|
is_success |
True または False |
コネクタ
Google SecOps でコネクタを構成する方法については、データを取り込む(コネクタ)をご覧ください。
Apache Kafka - Messages Connector
Apache Kafka - メッセージ コネクタを使用して、Apache Kafka からメッセージを取得します。
コネクタは、指定された Kafka トピックからメッセージを取得し、メッセージ形式に基づいてさまざまな方法で処理できます。メッセージが有効な JSON オブジェクトの場合、コネクタはアラートの作成とマッピングのために特定のフィールドを抽出します。メッセージがプレーン文字列の場合、未加工のイベントデータとして取り込まれます。
コネクタは、指定したパラメータに基づいて、重大度のマッピング、アラート名のテンプレート、一意の ID の生成を処理します。
JSON の重大度マッピング
アラートの重大度をマッピングするには、Apache Kafka - Messages Connector が Severity Mapping JSON パラメータの重大度の値を取得するために使用するフィールドを指定する必要があります。コネクタ レスポンスには、integer、float、string などの値の型を含めることができます。
Apache Kafka - メッセージ コネクタは、integer と float の値を読み取り、Google SecOps の設定に従ってマッピングします。次の表に、integer 値と Google SecOps の重大度のマッピングを示します。
| 整数値 | マッピングされた重大度 |
|---|---|
100 |
Critical |
80~100 |
High |
60~80 |
Medium |
40~60 |
Low |
40未満 |
Informational |
レスポンスに string 値が含まれている場合、Pub/Sub - メッセージ コネクタには追加の構成が必要です。
最初は、デフォルト値は次のように表示されます。
{
"Default": 60
}
マッピングに必要な値が event_severity JSON キーにある場合、値は次のようになります。
"Malicious""Benign""Unknown"
event_severity JSON キー値を解析し、JSON オブジェクトの形式が正しいことを確認するには、Severity Mapping JSON パラメータを次のように構成します。
{
"event_severity": {
"Malicious": 100,
"Unknown": 60,
"Benign": -1
},
"Default": 50
}
"Default" の値が必要です。
同じ JSON オブジェクトに複数の一致がある場合、Apache Kafka - Messages Connector は最初の JSON オブジェクト キーを優先します。
integer 値または float 値を含むフィールドを操作するには、Severity Mapping JSON パラメータでキーと空の文字列を構成します。
{
"Default":"60",
"integer_field": "",
"float_field": ""
}
コネクタの入力
Apache Kafka - Messages Connector には、次のパラメータが必要です。
| パラメータ | 説明 |
|---|---|
Product Field Name |
必須。 商品名が保存されるフィールドの名前。 商品名は主にマッピングに影響します。コネクタのマッピング プロセスを合理化して改善するため、デフォルト値はコードから参照されるフォールバック値に解決されます。このパラメータの無効な入力は、デフォルトでフォールバック値に解決されます。 デフォルト値は |
Event Field Name |
必須。 イベント名(サブタイプ)を特定するフィールドの名前。 デフォルト値は |
Environment Field Name |
省略可。 環境名が保存されるフィールドの名前。 環境フィールドがない場合、コネクタはデフォルト値を使用します。 デフォルト値は |
Environment Regex Pattern |
省略可。
デフォルト値 正規表現パターンが null か空の場合、または環境値が null の場合、最終的な環境の結果はデフォルト環境になります。 |
Script Timeout (Seconds) |
必須。 現在のスクリプトを実行する Python プロセスのタイムアウト上限(秒単位)。 デフォルト値は |
Kafka brokers |
必須。 接続する Kafka ブローカーのカンマ区切りのリスト( |
Use TLS for connection |
省略可。 選択すると、統合で認証に TLS 暗号化が使用されます。 このパラメータには CA 証明書が必要です。 デフォルトでは有効になっていません。 |
Use SASL PLAIN with TLS for connection |
省略可。 選択すると、統合で認証に SASL PLAIN のユーザー名とパスワードのメカニズムが使用されます。 このオプションでは、SASL ユーザー名とパスワードを指定する必要があります。これは TLS 暗号化でのみサポートされており、CA 証明書が必要です。 デフォルトでは有効になっていません。 |
CA certificate of Kafka server |
省略可。 Kafka サーバーの ID の検証に使用される CA 証明書。 |
Client certificate |
省略可。 Kafka サーバーとの相互 TLS 認証用のクライアント証明書。 |
Client certificate key |
省略可。 相互 TLS 認証に使用される、クライアントの証明書に対応する秘密鍵。 |
Client certificate key password |
省略可。 クライアント証明書の秘密鍵の復号に使用されるパスワード。 |
SASL PLAIN Username |
省略可。 Kafka ブローカーとの SASL PLAIN 認証に使用するユーザー名。 |
SASL PLAIN Password |
省略可。 Kafka ブローカーとの SASL PLAIN 認証のパスワード。 |
Topic |
必須。 インシデントを取得する Kafka トピック。 |
Consumer Group ID |
省略可。 インシデントの取得に使用されるコンシューマー グループの識別子。 値が指定されていない場合は、一意の ID が生成されます。 |
Partitions |
省略可。 メッセージを取得するパーティションの CSV リスト。 |
Initial Offset |
省略可。 コネクタが Kafka パーティションからメッセージの取得を開始する場所。 特定のオフセットから開始する正の整数を指定するか、 |
Poll Timeout |
省略可。 Kafka からメッセージを消費するポーリング タイムアウト(秒単位)。 |
Case Name Template |
省略可。 カスタムケース名を定義するテンプレート。コネクタは、イベントに FIELD_NAME 形式のプレースホルダを使用できます。このプレースホルダには、最初のイベントの文字列値が入力されます。 例: |
Alert Name Template |
必須。 アラート名を定義するテンプレート。 FIELD_NAME 形式のプレースホルダを使用できます。このプレースホルダには、最初のイベントの文字列値が入力されます。 例: 値が指定されていない場合や、テンプレートが無効な場合、コネクタはデフォルトのアラート名を使用します。 |
Rule Generator Template |
必須。 ルール ジェネレータを定義するテンプレート。 FIELD_NAME 形式のプレースホルダを使用できます。このプレースホルダには、最初のイベントの文字列値が入力されます。 例: 値が指定されていない場合や、テンプレートが無効な場合、コネクタはデフォルトのルール ジェネレータ名を使用します。 |
Timestamp Field |
必須。 Google SecOps アラートのタイムスタンプを含む Kafka メッセージのフィールド名。 タイムスタンプが Unix エポック形式でない場合は、その形式を |
Timestamp Format |
省略可。 メッセージのタイムスタンプの形式。Unix エポック以外のタイムスタンプに必要です。標準の Python タイムスタンプが Unix エポック形式ではなく、このパラメータが構成されていない場合、コネクタは失敗します。 |
Severity Mapping JSON |
必須。 コネクタがメッセージから重大度レベルを抽出して Google SecOps の優先度スケールにマッピングするために使用する JSON オブジェクト。 デフォルト値は |
Unique ID Field |
省略可。 一意のメッセージ ID として使用するフィールドの名前。 値が指定されていない場合、コネクタはメッセージ コンテンツの SHA-256 ハッシュを生成して、メッセージ ID として使用します。 |
Max Messages To Fetch |
必須。 コネクタが各イテレーションで処理するメッセージの最大数。 デフォルト値は |
Disable Overflow |
省略可。 選択すると、コネクタは Google SecOps のオーバーフロー メカニズムを無視します。 デフォルトで有効になっています。 |
Verify SSL |
必須。 選択すると、統合は Apache Kafka サーバーに接続するときに SSL 証明書を検証します。 デフォルトで有効になっています。 |
Proxy Server Address |
省略可。 使用するプロキシ サーバーのアドレス。 |
Proxy Username |
省略可。 認証に使用するプロキシのユーザー名。 |
Proxy Password |
省略可。 認証に使用するプロキシ パスワード。 |
コネクタルール
コネクタはプロキシをサポートしています。
コネクタのアラート
次の表に、Apache Kafka メッセージ フィールドと Google SecOps アラート フィールドのマッピングを示します。
| Siemplify アラート フィールド | Apache Kafka メッセージ フィールド |
|---|---|
SourceSystemName |
フレームワークによって入力されます。 |
TicketId |
一意の ID フィールドの値またはメッセージの SHA-256 ハッシュ。 |
DisplayId |
ApacheKafka_{unique id or hash}_{connector identifier} |
Name |
Alert Name Template によって生成された値。 |
Reason |
なし |
Description |
なし |
DeviceVendor |
ハードコード: Apache Kafka |
DeviceProduct |
フォールバック値: Message |
Priority |
Severity Mapping JSON パラメータからマッピングされます。 |
RuleGenerator |
Rule Generator Template によって生成された値。 |
SourceGroupingIdentifier |
なし |
StartTime |
Timestamp Field から変換されました。 |
EndTime |
Timestamp Field から変換されました。 |
Siemplify Alert - Extensions |
なし |
Siemplify Alert - Attachments |
なし |
コネクタ イベント
コネクタ イベントの例を次に示します。
{
"notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
"finding": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
"parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
"resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
"state": "ACTIVE",
"category": "OPEN_NETBIOS_PORT",
"externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"sourceProperties": {
"Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
"ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
"Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
"ScannerName": "FIREWALL_SCANNER",
"ResourcePath": [
"projects/PROJECT_ID/",
"folders/FOLDER_ID_1/",
"folders/FOLDER_ID_2/",
"organizations/ORGANIZATION_ID/"
],
"ExposedService": "NetBIOS",
"OpenPorts": {
"TCP": [
137.0,
138.0,
139.0
],
"UDP": [
137.0,
138.0,
139.0
]
},
"compliance_standards": {
"iso": [
{
"ids": [
"A.13.1.1"
]
}
],
"pci": [
{
"ids": [
"1.2.1"
]
}
],
"nist": [
{
"ids": [
"SC-7"
]
}
]
},
"ReactivationCount": 4.0
},
"securityMarks": {
"name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
"marks": {
"USER_ID": "SECURITY_MARK"
}
},
"eventTime": "2024-08-30T14:44:37.973090Z",
"createTime": "2024-06-24T07:08:54.777Z",
"propertyDataTypes": {
"ResourcePath": {
"listValues": {
"propertyDataTypes": [
{
"primitiveDataType": "STRING"
}
]
}
},
"ReactivationCount": {
"primitiveDataType": "NUMBER"
},
"Explanation": {
"primitiveDataType": "STRING"
},
"ExposedService": {
"primitiveDataType": "STRING"
},
"ScannerName": {
"primitiveDataType": "STRING"
}
}
}
}
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。