Apache Kafka を Google SecOps と統合する

統合バージョン: 1.0

このドキュメントでは、Apache Kafka を Google Security Operations(Google SecOps)と統合する方法について説明します。

ユースケース

Apache Kafka 統合は、次のユースケースに対応できます。

  • リアルタイムのセキュリティ ログの取り込み: Kafka トピックから Google SecOps にセキュリティ イベントを自動的に取り込んで処理します。これにより、ログの一元管理とリアルタイム分析が可能になり、ストリーミング データに基づいてアラートを生成できます。

  • イベント ドリブン自動化: Kafka トピックからストリーミングされた特定のセキュリティ イベントまたはメッセージに基づいて、Google SecOps で自動化されたプレイブックをトリガーします。これにより、通常とは異なる場所からのユーザー ログインなどの重大なイベントに対する対応が迅速になります。

  • 脅威インテリジェンスの拡充: Kafka トピックからカスタム脅威インテリジェンス フィードを pull して、既存のアラートとケースを拡充します。これにより、アナリストはセキュリティ侵害の証拠や痕跡(IOC)に関する最新のコンテキスト情報を入手し、脅威分析の精度を高めることができます。

始める前に

Google SecOps で Apache Kafka 統合を構成する前に、次の前提条件を満たしてください。

  • Apache Kafka サーバー: 必要な Kafka ブローカーとトピックが構成された実行中の Apache Kafka サーバーにアクセスできることを確認します。
  • リモート エージェントの Docker イメージ: リモート エージェントを作成する場合は、Debian ベースのイメージを使用する必要があります。互換性を確認するには、次の画像をご覧ください。

    us-docker.pkg.dev/siem-ar-public/images/agent-debian:latest
    

統合のパラメータ

Apache Kafka インテグレーションには、次のパラメータが必要です。

パラメータ 説明
Kafka brokers

必須。

接続する Kafka ブローカーのカンマ区切りのリスト(hostname:port 形式)。

Use TLS for connection

省略可。

選択すると、統合で認証に TLS 暗号化が使用されます。

このパラメータには、認証局(CA)証明書が必要です。

デフォルトでは有効になっていません。

Use SASL PLAIN with TLS for connection

省略可。

選択すると、統合で認証に SASL PLAIN のユーザー名とパスワードのメカニズムが使用されます。

このオプションは TLS 暗号化でのみサポートされており、SASL ユーザー名とパスワード、CA 証明書の両方が必要です。

デフォルトでは有効になっていません。

CA certificate of Kafka server

省略可。

Kafka サーバーの ID の検証に使用される CA 証明書。

SASL が有効になっている場合、このパラメータは必須です。

Client certificate

省略可。

Kafka サーバーとの相互 TLS 認証用のクライアント証明書。

相互 TLS(mTLS)が有効になっている場合、このパラメータは必須です。

Client certificate key

省略可。

相互 TLS 認証に使用される、クライアントの証明書に対応する秘密鍵。

相互 TLS(mTLS)が有効になっている場合、このパラメータは必須です。

Client certificate key password

省略可。

クライアント証明書の秘密鍵の復号に使用されるパスワード。

相互 TLS(mTLS)が有効になっている場合、このパラメータは必須です。

SASL PLAIN Username

省略可。

Kafka ブローカーとの SASL PLAIN 認証に使用するユーザー名。

SASL が有効になっている場合、このパラメータは必須です。

SASL PLAIN Password

省略可。

Kafka ブローカーとの SASL PLAIN 認証のパスワード。

SASL が有効になっている場合、このパラメータは必須です。

Google SecOps で統合を構成する手順については、統合を構成するをご覧ください。

必要に応じて、後の段階で変更できます。統合インスタンスを構成すると、ハンドブックで使用できるようになります。複数のインスタンスの構成とサポートの詳細については、複数のインスタンスのサポートをご覧ください。

操作

アクションの詳細については、 デスクから保留中のアクションに対応する手動アクションを実行するをご覧ください。

Ping

Ping アクションを使用して、Apache Kafka への接続をテストします。

このアクションは Google SecOps エンティティに対して実行されません。

アクション入力

なし

アクションの出力

Ping アクションは次の出力を提供します。

アクションの出力タイプ 対象
ケースウォールのアタッチメント 利用不可
ケースウォールのリンク 利用不可
ケースウォール テーブル 利用不可
拡充テーブル 利用不可
JSON の結果 利用不可
出力メッセージ 利用可能
スクリプトの結果。 利用可能
出力メッセージ

Ping アクションは、次の出力メッセージを返すことができます。

出力メッセージ メッセージの説明

Successfully connected to the Apache Kafka server with the provided connection parameters!

アクションが成功しました。
Failed to connect to the Apache Kafka server! Error is ERROR_REASON

操作に失敗しました。

サーバーへの接続、入力パラメータ、または認証情報を確認してください。

スクリプトの結果

次の表に、Ping アクションを使用した場合のスクリプト結果出力の値を示します。

スクリプトの結果名
is_success True または False

コネクタ

Google SecOps でコネクタを構成する方法については、データを取り込む(コネクタ)をご覧ください。

Apache Kafka - Messages Connector

Apache Kafka - メッセージ コネクタを使用して、Apache Kafka からメッセージを取得します。

コネクタは、指定された Kafka トピックからメッセージを取得し、メッセージ形式に基づいてさまざまな方法で処理できます。メッセージが有効な JSON オブジェクトの場合、コネクタはアラートの作成とマッピングのために特定のフィールドを抽出します。メッセージがプレーン文字列の場合、未加工のイベントデータとして取り込まれます。

コネクタは、指定したパラメータに基づいて、重大度のマッピング、アラート名のテンプレート、一意の ID の生成を処理します。

JSON の重大度マッピング

アラートの重大度をマッピングするには、Apache Kafka - Messages ConnectorSeverity Mapping JSON パラメータの重大度の値を取得するために使用するフィールドを指定する必要があります。コネクタ レスポンスには、integerfloatstring などの値の型を含めることができます。

Apache Kafka - メッセージ コネクタは、integerfloat の値を読み取り、Google SecOps の設定に従ってマッピングします。次の表に、integer 値と Google SecOps の重大度のマッピングを示します。

整数値 マッピングされた重大度
100 Critical
80100 High
6080 Medium
4060 Low
40未満 Informational

レスポンスに string 値が含まれている場合、Pub/Sub - メッセージ コネクタには追加の構成が必要です。

最初は、デフォルト値は次のように表示されます。

{
    "Default": 60
}

マッピングに必要な値が event_severity JSON キーにある場合、値は次のようになります。

  • "Malicious"
  • "Benign"
  • "Unknown"

event_severity JSON キー値を解析し、JSON オブジェクトの形式が正しいことを確認するには、Severity Mapping JSON パラメータを次のように構成します。

{
    "event_severity": {
        "Malicious": 100,
        "Unknown": 60,
        "Benign": -1
    },
    "Default": 50
}

"Default" の値が必要です。

同じ JSON オブジェクトに複数の一致がある場合、Apache Kafka - Messages Connector は最初の JSON オブジェクト キーを優先します。

integer 値または float 値を含むフィールドを操作するには、Severity Mapping JSON パラメータでキーと空の文字列を構成します。

{
  "Default":"60",
  "integer_field": "",
  "float_field": ""
}

コネクタの入力

Apache Kafka - Messages Connector には、次のパラメータが必要です。

パラメータ 説明
Product Field Name

必須。

商品名が保存されるフィールドの名前。

商品名は主にマッピングに影響します。コネクタのマッピング プロセスを合理化して改善するため、デフォルト値はコードから参照されるフォールバック値に解決されます。このパラメータの無効な入力は、デフォルトでフォールバック値に解決されます。

デフォルト値は Product Name です。

Event Field Name

必須。

イベント名(サブタイプ)を特定するフィールドの名前。

デフォルト値は event_type です。

Environment Field Name

省略可。

環境名が保存されるフィールドの名前。

環境フィールドがない場合、コネクタはデフォルト値を使用します。

デフォルト値は "" です。

Environment Regex Pattern

省略可。

Environment Field Name フィールドで見つかった値に対して実行する正規表現パターン。このパラメータを使用すると、正規表現ロジックを使用して環境フィールドを操作できます。

デフォルト値 .* を使用して、必要な未加工の Environment Field Name 値を取得します。

正規表現パターンが null か空の場合、または環境値が null の場合、最終的な環境の結果はデフォルト環境になります。

Script Timeout (Seconds)

必須。

現在のスクリプトを実行する Python プロセスのタイムアウト上限(秒単位)。

デフォルト値は 180 です。

Kafka brokers

必須。

接続する Kafka ブローカーのカンマ区切りのリスト(hostname:port 形式)。

Use TLS for connection

省略可。

選択すると、統合で認証に TLS 暗号化が使用されます。

このパラメータには CA 証明書が必要です。

デフォルトでは有効になっていません。

Use SASL PLAIN with TLS for connection

省略可。

選択すると、統合で認証に SASL PLAIN のユーザー名とパスワードのメカニズムが使用されます。

このオプションでは、SASL ユーザー名とパスワードを指定する必要があります。これは TLS 暗号化でのみサポートされており、CA 証明書が必要です。

デフォルトでは有効になっていません。

CA certificate of Kafka server

省略可。

Kafka サーバーの ID の検証に使用される CA 証明書。

Client certificate

省略可。

Kafka サーバーとの相互 TLS 認証用のクライアント証明書。

Client certificate key

省略可。

相互 TLS 認証に使用される、クライアントの証明書に対応する秘密鍵。

Client certificate key password

省略可。

クライアント証明書の秘密鍵の復号に使用されるパスワード。

SASL PLAIN Username

省略可。

Kafka ブローカーとの SASL PLAIN 認証に使用するユーザー名。

SASL PLAIN Password

省略可。

Kafka ブローカーとの SASL PLAIN 認証のパスワード。

Topic

必須。

インシデントを取得する Kafka トピック。

Consumer Group ID

省略可。

インシデントの取得に使用されるコンシューマー グループの識別子。

値が指定されていない場合は、一意の ID が生成されます。

Partitions

省略可。

メッセージを取得するパーティションの CSV リスト。

Initial Offset

省略可。

コネクタが Kafka パーティションからメッセージの取得を開始する場所。

特定のオフセットから開始する正の整数を指定するか、earliest または latest の値を使用して、パーティションの先頭または末尾から取得を開始できます。

Poll Timeout

省略可。

Kafka からメッセージを消費するポーリング タイムアウト(秒単位)。

Case Name Template

省略可。

カスタムケース名を定義するテンプレート。コネクタは、イベントに custom_case_name キーを追加します。

FIELD_NAME 形式のプレースホルダを使用できます。このプレースホルダには、最初のイベントの文字列値が入力されます。

例: Phishing - EVENT_MAILBOX

Alert Name Template

必須。

アラート名を定義するテンプレート。

FIELD_NAME 形式のプレースホルダを使用できます。このプレースホルダには、最初のイベントの文字列値が入力されます。

例: Phishing - EVENT_MAILBOX

値が指定されていない場合や、テンプレートが無効な場合、コネクタはデフォルトのアラート名を使用します。

Rule Generator Template

必須。

ルール ジェネレータを定義するテンプレート。

FIELD_NAME 形式のプレースホルダを使用できます。このプレースホルダには、最初のイベントの文字列値が入力されます。

例: Phishing - EVENT_MAILBOX

値が指定されていない場合や、テンプレートが無効な場合、コネクタはデフォルトのルール ジェネレータ名を使用します。

Timestamp Field

必須。

Google SecOps アラートのタイムスタンプを含む Kafka メッセージのフィールド名。

タイムスタンプが Unix エポック形式でない場合は、その形式を Timestamp Format パラメータで定義する必要があります。

Timestamp Format

省略可。

メッセージのタイムスタンプの形式。Unix エポック以外のタイムスタンプに必要です。標準の Python strftime 形式コードを使用します。

タイムスタンプが Unix エポック形式ではなく、このパラメータが構成されていない場合、コネクタは失敗します。

Severity Mapping JSON

必須。

コネクタがメッセージから重大度レベルを抽出して Google SecOps の優先度スケールにマッピングするために使用する JSON オブジェクト。

デフォルト値は {"Default": "60"} です。

Unique ID Field

省略可。

一意のメッセージ ID として使用するフィールドの名前。

値が指定されていない場合、コネクタはメッセージ コンテンツの SHA-256 ハッシュを生成して、メッセージ ID として使用します。

Max Messages To Fetch

必須。

コネクタが各イテレーションで処理するメッセージの最大数。

デフォルト値は 100 です。

Disable Overflow

省略可。

選択すると、コネクタは Google SecOps のオーバーフロー メカニズムを無視します。

デフォルトで有効になっています。

Verify SSL

必須。

選択すると、統合は Apache Kafka サーバーに接続するときに SSL 証明書を検証します。

デフォルトで有効になっています。

Proxy Server Address

省略可。

使用するプロキシ サーバーのアドレス。

Proxy Username

省略可。

認証に使用するプロキシのユーザー名。

Proxy Password

省略可。

認証に使用するプロキシ パスワード。

コネクタルール

コネクタはプロキシをサポートしています。

コネクタのアラート

次の表に、Apache Kafka メッセージ フィールドと Google SecOps アラート フィールドのマッピングを示します。

Siemplify アラート フィールド Apache Kafka メッセージ フィールド
SourceSystemName フレームワークによって入力されます。
TicketId 一意の ID フィールドの値またはメッセージの SHA-256 ハッシュ。
DisplayId ApacheKafka_{unique id or hash}_{connector identifier}
Name Alert Name Template によって生成された値。
Reason なし
Description なし
DeviceVendor ハードコード: Apache Kafka
DeviceProduct フォールバック値: Message
Priority Severity Mapping JSON パラメータからマッピングされます。
RuleGenerator Rule Generator Template によって生成された値。
SourceGroupingIdentifier なし
StartTime Timestamp Field から変換されました。
EndTime Timestamp Field から変換されました。
Siemplify Alert - Extensions なし
Siemplify Alert - Attachments なし

コネクタ イベント

コネクタ イベントの例を次に示します。

{
  "notificationConfigName": "organizations/ORGANIZATION_ID/notificationConfigs/soar_connector_CONNECTOR_ID_toxic_notifications_config",
  "finding": {
    "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID",
    "parent": "organizations/ORGANIZATION_ID/sources/SOURCE_ID",
    "resourceName": "//compute.googleapis.com/projects/PROJECT_ID/global/firewalls/FIREWALL_ID",
    "state": "ACTIVE",
    "category": "OPEN_NETBIOS_PORT",
    "externalUri": "https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
    "sourceProperties": {
      "Recommendation": "Restrict the firewall rules at: https://console.cloud.google.com/networking/firewalls/details/default-allow-rdp?project=PROJECT_ID",
      "ExceptionInstructions": "Add the security mark \"allow_open_netbios_port\" to the asset with a value of \"true\" to prevent this finding from being activated again.",
      "Explanation": "Firewall rules that allow connections from all IP addresses on TCP ports 137-139 or UDP ports 137-139 may expose NetBIOS services to attackers.",
      "ScannerName": "FIREWALL_SCANNER",
      "ResourcePath": [
        "projects/PROJECT_ID/",
        "folders/FOLDER_ID_1/",
        "folders/FOLDER_ID_2/",
        "organizations/ORGANIZATION_ID/"
      ],
      "ExposedService": "NetBIOS",
      "OpenPorts": {
        "TCP": [
          137.0,
          138.0,
          139.0
        ],
        "UDP": [
          137.0,
          138.0,
          139.0
        ]
      },
      "compliance_standards": {
        "iso": [
          {
            "ids": [
              "A.13.1.1"
            ]
          }
        ],
        "pci": [
          {
            "ids": [
              "1.2.1"
            ]
          }
        ],
        "nist": [
          {
            "ids": [
              "SC-7"
            ]
          }
        ]
      },
      "ReactivationCount": 4.0
    },
    "securityMarks": {
      "name": "organizations/ORGANIZATION_ID/sources/SOURCE_ID/findings/FINDING_ID/securityMarks",
      "marks": {
        "USER_ID": "SECURITY_MARK"
      }
    },
    "eventTime": "2024-08-30T14:44:37.973090Z",
    "createTime": "2024-06-24T07:08:54.777Z",
    "propertyDataTypes": {
      "ResourcePath": {
        "listValues": {
          "propertyDataTypes": [
            {
              "primitiveDataType": "STRING"
            }
          ]
        }
      },
      "ReactivationCount": {
        "primitiveDataType": "NUMBER"
      },
      "Explanation": {
        "primitiveDataType": "STRING"
      },
      "ExposedService": {
        "primitiveDataType": "STRING"
      },
      "ScannerName": {
        "primitiveDataType": "STRING"
      }
    }
  }
}

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。