Elasticsearch のログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Elasticsearch ログを Google Security Operations に取り込む方法について説明します。パーサーは、未加工の JSON 形式のログを統合データモデル(UDM)に変換します。ネストされた JSON 構造からフィールドを抽出し、UDM フィールドにマッピングして、重大度レベルやユーザーロールなどのセキュリティ関連のコンテキストでデータを拡充します。

始める前に

  • Google SecOps インスタンス
  • Elasticsearch クラスタ管理への特権アクセス
  • AWS(S3、IAM、EC2)への特権アクセス
  • Logstash を実行する EC2 インスタンスまたは永続ホスト

Elasticsearch の前提条件を取得する

  1. 管理者として Elasticsearch クラスタにログインします。
  2. Elasticsearch サブスクリプションにセキュリティ機能(監査ロギングに必要)が含まれていることを確認します。
  3. 参照用に Elasticsearch クラスタの名前とバージョンをメモします。
  4. 監査ログが書き込まれるパスを特定します(デフォルト: $ES_HOME/logs/<clustername>_audit.json)。

Elasticsearch 監査ロギングを有効にする

  1. 各 Elasticsearch ノードで、elasticsearch.yml 構成ファイルを編集します。
  2. 次の設定を追加します。

    xpack.security.audit.enabled: true
    
  3. クラスタのローリング再起動を実行して、変更を適用します。

    • シャード割り当てを無効にする: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • 各ノードを 1 つずつ停止して再起動します。
    • シャード割り当てを再度有効にする: PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. 監査ログがログ ディレクトリの <clustername>_audit.json に生成されていることを確認します。

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: elastic-search-logs)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションの [権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索します。
  18. ポリシーを選択します。
  19. [次へ] をクリックします。
  20. [権限を追加] をクリックします。

監査ログを S3 に送信するように Logstash を構成する

  1. Elasticsearch 監査ログファイルにアクセスできる EC2 インスタンスまたは永続ホストに Logstash をインストールします。
  2. S3 出力プラグインがまだ存在しない場合は、インストールします。

    bin/logstash-plugin install logstash-output-s3
    
  3. Logstash 構成ファイル(elastic-to-s3.conf)を作成します。

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. 構成を使用して Logstash を起動します。

    bin/logstash -f elastic-to-s3.conf
    

省略可: Google SecOps 用の読み取り専用 IAM ユーザーを作成する

  1. AWS コンソール > IAM > ユーザー > ユーザーを追加 に移動します。
  2. [ユーザーを追加] をクリックします。
  3. 次の構成の詳細を入力します。
    • ユーザー: 「secops-reader」と入力します。
    • アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
  4. [ユーザーを作成] をクリックします。
  5. 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]
  6. JSON エディタで、次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. 名前を secops-reader-policy に設定します。

  8. [ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。

  9. [セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。

  10. CSV をダウンロードします(これらの値はフィードに入力されます)。

Elasticsearch のログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Elasticsearch Logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Elastic Search] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://elastic-search-logs/logs/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
レベル security_result.severity このロジックは「Level」フィールドの値を確認し、対応する UDM の重大度レベルにマッピングします。
- 「INFO」、「ALL」、「OFF」、「TRACE」、「DEBUG」は「INFORMATIONAL」にマッピングされます。
- 「WARN」は「LOW」にマッピングされます。
- 「ERROR」は「ERROR」にマッピングされます。
- 「FATAL」は「CRITICAL」にマッピングされます。
message.@timestamp timestamp タイムスタンプは、未加工ログの「message」フィールド内の「@timestamp」フィールドから、「yyyy-MM-ddTHH:mm:ss.SSS」形式で解析されます。
message.action security_result.action_details 値は、未加工ログの「message」フィールド内の「action」フィールドから取得されます。
message.event.action security_result.summary 値は、未加工ログの「message」フィールド内の「event.action」フィールドから取得されます。
message.event.type metadata.product_event_type 値は、未加工ログの「message」フィールド内の「event.type」フィールドから取得されます。
message.host.ip target.ip 値は、未加工ログの「message」フィールド内の「host.ip」フィールドから取得されます。
message.host.name target.hostname 値は、未加工ログの「message」フィールド内の「host.name」フィールドから取得されます。
message.indices target.labels.value 値は、未加工ログの「message」フィールド内の「indices」フィールドから取得されます。
message.mrId target.hostname 値は、未加工ログの「message」フィールド内の「mrId」フィールドから取得されます。
message.node.id principal.asset.product_object_id 値は、未加工ログの「message」フィールド内の「node.id」フィールドから取得されます。
message.node.name target.asset.hostname 値は、未加工ログの「message」フィールド内の「node.name」フィールドから取得されます。
message.origin.address principal.ip IP アドレスは、未加工ログの「message」フィールド内の「origin.address」フィールドから、ポート番号を削除して抽出されます。
message.origin.type principal.resource.resource_subtype 値は、未加工ログの「message」フィールド内の「origin.type」フィールドから取得されます。
message.properties.host_group principal.hostname 値は、未加工ログの「message」フィールド内の「properties.host_group」フィールドから取得されます。
message.properties.host_group target.group.group_display_name 値は、未加工ログの「message」フィールド内の「properties.host_group」フィールドから取得されます。
message.request.id target.resource.product_object_id 値は、未加工ログの「message」フィールド内の「request.id」フィールドから取得されます。
message.request.name target.resource.name 値は、未加工ログの「message」フィールド内の「request.name」フィールドから取得されます。
message.user.name principal.user.userid 値は、未加工ログの「message」フィールド内の「user.name」フィールドから取得されます。
message.user.realm principal.user.attribute.permissions.name 値は、未加工ログの「message」フィールド内の「user.realm」フィールドから取得されます。
message.user.roles about.user.attribute.roles.name 値は、未加工ログの「message」フィールド内の「user.roles」フィールドから取得されます。
metadata.event_type ハードコードされた値: 「USER_RESOURCE_ACCESS」
metadata.log_type ハードコードされた値: 「ELASTIC_SEARCH」
metadata.product_name ハードコードされた値: 「ELASTICSEARCH」
metadata.vendor_name ハードコードされた値: 「ELASTIC」
principal.port ポート番号は、未加工ログの「message」フィールド内の「origin.address」フィールドから抽出されます。
target.labels.key ハードコードされた値: 「Indice」

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。