收集 Elasticsearch 記錄

支援的國家/地區:

本文說明如何使用 Amazon S3,將 Elasticsearch 記錄擷取至 Google Security Operations。剖析器會將原始 JSON 格式的記錄轉換為統一資料模型 (UDM)。這項功能會從巢狀 JSON 結構中擷取欄位,將這些欄位對應至 UDM 欄位,並使用嚴重性等級和使用者角色等安全性相關內容,擴充資料。

事前準備

  • Google SecOps 執行個體
  • Elasticsearch 叢集管理員的特殊存取權
  • AWS (S3、IAM、EC2) 的特殊存取權
  • 執行 Logstash 的 EC2 執行個體或永久主機

取得 Elasticsearch 必要條件

  1. 以管理員身分登入 Elasticsearch 叢集
  2. 確認 Elasticsearch 訂閱方案包含安全性功能 (稽核記錄功能需要這項功能)。
  3. 記下 Elasticsearch 叢集名稱和版本,以供參考。
  4. 找出稽核記錄的寫入路徑 (預設為 $ES_HOME/logs/<clustername>_audit.json)。

啟用 Elasticsearch 稽核記錄

  1. 在每個 Elasticsearch 節點上,編輯 elasticsearch.yml 設定檔。
  2. 新增下列設定:

    xpack.security.audit.enabled: true
    
  3. 對叢集執行輪動式重新啟動,以套用變更:

    • 停用分片分配:PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    • 逐一停止並重新啟動每個節點。
    • 重新啟用分片分配:PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
  4. 確認稽核記錄是否在記錄目錄的 <clustername>_audit.json 中產生。

為 Google SecOps 設定 AWS S3 值區和 IAM

  1. 按照這份使用者指南建立 Amazon S3 bucket建立 bucket
  2. 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如 elastic-search-logs)。
  3. 請按照這份使用者指南建立使用者建立 IAM 使用者
  4. 選取建立的「使用者」
  5. 選取「安全憑證」分頁標籤。
  6. 在「Access Keys」部分中,按一下「Create Access Key」
  7. 選取「第三方服務」做為「用途」
  8. 點選「下一步」
  9. 選用:新增說明標記。
  10. 按一下「建立存取金鑰」
  11. 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後參考。
  12. 按一下 [完成]
  13. 選取「權限」分頁標籤。
  14. 在「權限政策」部分中,按一下「新增權限」
  15. 選取「新增權限」
  16. 選取「直接附加政策」
  17. 搜尋「AmazonS3FullAccess」AmazonS3FullAccess政策。
  18. 選取政策。
  19. 點選「下一步」
  20. 按一下「Add permissions」。

設定 Logstash,將稽核記錄傳送至 S3

  1. 在可存取 Elasticsearch 稽核記錄檔的 EC2 執行個體或持續性主機上,安裝 Logstash
  2. 如果沒有 S3 輸出外掛程式,請安裝:

    bin/logstash-plugin install logstash-output-s3
    
  3. 建立 Logstash 設定檔 (elastic-to-s3.conf):

    input {
      file {
        path => "/path/to/elasticsearch/logs/*_audit.json"
        start_position => "beginning"
        codec => "json"              # audit file: 1 JSON object per line
        sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
        exclude => ["*.gz"]
      }
    }
    
    filter {
      # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects.
      # If you must add metadata for ops, put it under [@metadata] so it won't be written.
      # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
    }
    
    output {
      s3 {
        access_key_id => "YOUR_AWS_ACCESS_KEY"
        secret_access_key => "YOUR_AWS_SECRET_KEY"
        region => "us-east-1"
        bucket => "elastic-search-logs"
        prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
    
        codec => "json_lines"        # NDJSON output (1 JSON object per line)
        encoding => "gzip"           # compress objects
    
        server_side_encryption => true
        # Optionally for KMS:
        # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
    
        size_file => 104857600       # 100MB rotation
        time_file => 300             # 5 min rotation
      }
    }
    
  4. 使用設定檔啟動 Logstash:

    bin/logstash -f elastic-to-s3.conf
    

選用:為 Google SecOps 建立唯讀 IAM 使用者

  1. 依序前往 AWS 管理中心 > IAM >「Users」(使用者) >「Add users」(新增使用者)
  2. 點選 [Add users] (新增使用者)。
  3. 提供下列設定詳細資料:
    • 使用者:輸入 secops-reader
    • 存取類型:選取「存取金鑰 - 程式輔助存取」
  4. 按一下「建立使用者」
  5. 附加最低讀取權限政策 (自訂):依序選取「Users」(使用者) >「secops-reader」>「Permissions」(權限) >「Add permissions」(新增權限) >「Attach policies directly」(直接附加政策) >「Create policy」(建立政策)
  6. 在 JSON 編輯器中輸入下列政策:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::elastic-search-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::elastic-search-logs"
        }
      ]
    }
    
  7. 將名稱設為 secops-reader-policy

  8. 依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」

  9. 依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」

  10. 下載 CSV (這些值會輸入至動態饋給)。

在 Google SecOps 中設定資訊提供,擷取 Elasticsearch 記錄

  1. 依序前往「SIEM 設定」>「動態饋給」
  2. 按一下「+ 新增動態消息」
  3. 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如 Elasticsearch Logs)。
  4. 選取「Amazon S3 V2」做為「來源類型」
  5. 選取「Elastic Search」做為「記錄類型」
  6. 點選「下一步」
  7. 指定下列輸入參數的值:
    • S3 URIs3://elastic-search-logs/logs/
    • 來源刪除選項:根據偏好設定選取刪除選項。
    • 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
    • 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
    • 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
    • 資產命名空間資產命名空間
    • 擷取標籤:套用至這個動態饋給事件的標籤。
  8. 點選「下一步」
  9. 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」

UDM 對應表

記錄欄位 UDM 對應 邏輯
等級 security_result.severity 這項邏輯會檢查「層級」欄位的值,並對應至相應的 UDM 嚴重程度層級:
-「INFO」、「ALL」、「OFF」、「TRACE」、「DEBUG」會對應至「INFORMATIONAL」。
-「WARN」已對應至「LOW」。
-「ERROR」對應至「ERROR」。
-「FATAL」會對應至「CRITICAL」。
message.@timestamp 時間戳記 系統會使用「yyyy-MM-ddTHH:mm:ss.SSS」格式,從原始記錄的「message」欄位中剖析「@timestamp」欄位。
message.action security_result.action_details 值取自原始記錄「message」欄位中的「action」欄位。
message.event.action security_result.summary 值取自原始記錄的「message」欄位中的「event.action」欄位。
message.event.type metadata.product_event_type 值取自原始記錄的「message」欄位中的「event.type」欄位。
message.host.ip target.ip 值取自原始記錄「message」欄位中的「host.ip」欄位。
message.host.name target.hostname 值取自原始記錄「message」欄位中的「host.name」欄位。
message.indices target.labels.value 值取自原始記錄「message」欄位中的「indices」欄位。
message.mrId target.hostname 值取自原始記錄「message」欄位中的「mrId」欄位。
message.node.id principal.asset.product_object_id 值取自原始記錄「message」欄位中的「node.id」欄位。
message.node.name target.asset.hostname 值取自原始記錄的「message」欄位中的「node.name」欄位。
message.origin.address principal.ip 系統會從原始記錄的「message」欄位中,移除通訊埠編號,並從「origin.address」欄位擷取 IP 位址。
message.origin.type principal.resource.resource_subtype 值取自原始記錄「message」欄位中的「origin.type」欄位。
message.properties.host_group principal.hostname 值取自原始記錄「message」欄位中的「properties.host_group」欄位。
message.properties.host_group target.group.group_display_name 值取自原始記錄「message」欄位中的「properties.host_group」欄位。
message.request.id target.resource.product_object_id 值取自原始記錄「message」欄位中的「request.id」欄位。
message.request.name target.resource.name 值取自原始記錄「message」欄位中的「request.name」欄位。
message.user.name principal.user.userid 值取自原始記錄「message」欄位中的「user.name」欄位。
message.user.realm principal.user.attribute.permissions.name 值取自原始記錄「message」欄位中的「user.realm」欄位。
message.user.roles about.user.attribute.roles.name 值取自原始記錄「message」欄位中的「user.roles」欄位。
metadata.event_type 硬式編碼值:「USER_RESOURCE_ACCESS」
metadata.log_type 硬式編碼值:「ELASTIC_SEARCH」
metadata.product_name 硬式編碼值:「ELASTICSEARCH」
metadata.vendor_name 硬式編碼值:「ELASTIC」
principal.port 通訊埠號碼是從原始記錄的「message」欄位中「origin.address」欄位擷取。
target.labels.key 硬式編碼值:「Indice」

還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。