Raccogliere i log di Elasticsearch
Questo documento spiega come importare i log di Elasticsearch in Google Security Operations utilizzando Amazon S3. Il parser trasforma i log non elaborati in formato JSON in un modello unificato dei dati (UDM). Estrae i campi dalle strutture JSON nidificate, li mappa ai campi UDM e arricchisce i dati con il contesto pertinente per la sicurezza, come i livelli di gravità e i ruoli utente.
Prima di iniziare
- Un'istanza Google SecOps
- Accesso con privilegi all'amministrazione del cluster Elasticsearch
- Accesso privilegiato ad AWS (S3, IAM, EC2)
- Istanza EC2 o host permanente per eseguire Logstash
Recuperare i prerequisiti di Elasticsearch
- Accedi al tuo cluster Elasticsearch come amministratore.
- Verifica che il tuo abbonamento a Elasticsearch includa le funzionalità di sicurezza (necessarie per la registrazione degli audit).
- Prendi nota del nome e della versione del cluster Elasticsearch per riferimento.
- Identifica il percorso in cui verranno scritti i log di controllo (impostazione predefinita:
$ES_HOME/logs/<clustername>_audit.json
).
Abilita l'audit logging di Elasticsearch
- Su ogni nodo Elasticsearch, modifica il file di configurazione elasticsearch.yml.
Aggiungi la seguente impostazione:
xpack.security.audit.enabled: true
Esegui un riavvio in sequenza del cluster per applicare le modifiche:
- Disattiva l'allocazione degli shard:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
- Arresta e riavvia ogni nodo uno alla volta.
- Riattiva l'allocazione degli shard:
PUT _cluster/settings {"persistent": {"cluster.routing.allocation.enable": null}}
- Disattiva l'allocazione degli shard:
Verifica che gli audit log vengano generati in
<clustername>_audit.json
nella directory dei log.
Configura il bucket AWS S3 e IAM per Google SecOps
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per riferimento futuro (ad esempio,
elastic-search-logs
). - Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca i criteri AmazonS3FullAccess.
- Seleziona la policy.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configura Logstash per inviare gli audit log a S3
- Installa Logstash su un'istanza EC2 o su un host permanente che possa accedere ai file di audit log di Elasticsearch.
Installa il plug-in di output S3 se non è già presente:
bin/logstash-plugin install logstash-output-s3
Crea un file di configurazione Logstash (
elastic-to-s3.conf
):input { file { path => "/path/to/elasticsearch/logs/*_audit.json" start_position => "beginning" codec => "json" # audit file: 1 JSON object per line sincedb_path => "/var/lib/logstash/sincedb_elastic_search" exclude => ["*.gz"] } } filter { # Intentionally minimal: do NOT reshape audit JSON the ELASTIC_SEARCH parser expects. # If you must add metadata for ops, put it under [@metadata] so it won't be written. # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" } } output { s3 { access_key_id => "YOUR_AWS_ACCESS_KEY" secret_access_key => "YOUR_AWS_SECRET_KEY" region => "us-east-1" bucket => "elastic-search-logs" prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/" codec => "json_lines" # NDJSON output (1 JSON object per line) encoding => "gzip" # compress objects server_side_encryption => true # Optionally for KMS: # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID" size_file => 104857600 # 100MB rotation time_file => 300 # 5 min rotation } }
Avvia Logstash con la configurazione:
bin/logstash -f elastic-to-s3.conf
(Facoltativo) Crea un utente IAM con autorizzazione di sola lettura per Google SecOps
- Vai alla console AWS > IAM > Utenti > Aggiungi utenti.
- Fai clic su Add users (Aggiungi utenti).
- Fornisci i seguenti dettagli di configurazione:
- Utente: inserisci
secops-reader
. - Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
- Utente: inserisci
- Fai clic su Crea utente.
- Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
Nell'editor JSON, inserisci la seguente policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::elastic-search-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::elastic-search-logs" } ] }
Imposta il nome su
secops-reader-policy
.Vai a Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.
Vai a Credenziali di sicurezza > Chiavi di accesso > Crea chiave di accesso.
Scarica il file CSV (questi valori vengono inseriti nel feed).
Configurare un feed in Google SecOps per importare i log Elasticsearch
- Vai a Impostazioni SIEM > Feed.
- Fai clic su + Aggiungi nuovo feed.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio,
Elasticsearch Logs
). - Seleziona Amazon S3 V2 come Tipo di origine.
- Seleziona Elastic Search come Tipo di log.
- Fai clic su Avanti.
- Specifica i valori per i seguenti parametri di input:
- URI S3:
s3://elastic-search-logs/logs/
- Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
- Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
- ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
- Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
- Spazio dei nomi dell'asset: lo spazio dei nomi dell'asset.
- Etichette di importazione: l'etichetta applicata agli eventi di questo feed.
- URI S3:
- Fai clic su Avanti.
- Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
Livello | security_result.severity | La logica controlla il valore del campo "Level" e lo mappa al livello di gravità UDM corrispondente: - "INFO", "ALL", "OFF", "TRACE", "DEBUG" vengono mappati a "INFORMATIONAL". - "WARN" è mappato su "LOW". - "ERROR" è mappato su "ERROR". - "FATAL" è mappato su "CRITICAL". |
message.@timestamp | timestamp | Il timestamp viene analizzato dal campo "@timestamp" all'interno del campo "message" del log non elaborato, utilizzando il formato "yyyy-MM-ddTHH:mm:ss.SSS". |
message.action | security_result.action_details | Il valore viene estratto dal campo "action" all'interno del campo "message" del log non elaborato. |
message.event.action | security_result.summary | Il valore viene estratto dal campo "event.action" all'interno del campo "message" del log non elaborato. |
message.event.type | metadata.product_event_type | Il valore viene estratto dal campo "event.type" all'interno del campo "message" del log non elaborato. |
message.host.ip | target.ip | Il valore viene estratto dal campo "host.ip" all'interno del campo "message" del log non elaborato. |
message.host.name | target.hostname | Il valore viene estratto dal campo "host.name" all'interno del campo "message" del log non elaborato. |
message.indices | target.labels.value | Il valore viene estratto dal campo "indices" all'interno del campo "message" del log non elaborato. |
message.mrId | target.hostname | Il valore viene estratto dal campo "mrId" all'interno del campo "message" del log non elaborato. |
message.node.id | principal.asset.product_object_id | Il valore viene estratto dal campo "node.id" all'interno del campo "message" del log non elaborato. |
message.node.name | target.asset.hostname | Il valore viene estratto dal campo "node.name" all'interno del campo "message" del log non elaborato. |
message.origin.address | principal.ip | L'indirizzo IP viene estratto dal campo "origin.address" all'interno del campo "message" del log non elaborato rimuovendo il numero di porta. |
message.origin.type | principal.resource.resource_subtype | Il valore viene estratto dal campo "origin.type" all'interno del campo "message" del log non elaborato. |
message.properties.host_group | principal.hostname | Il valore viene estratto dal campo "properties.host_group" all'interno del campo "message" del log non elaborato. |
message.properties.host_group | target.group.group_display_name | Il valore viene estratto dal campo "properties.host_group" all'interno del campo "message" del log non elaborato. |
message.request.id | target.resource.product_object_id | Il valore viene estratto dal campo "request.id" all'interno del campo "message" del log non elaborato. |
message.request.name | target.resource.name | Il valore viene estratto dal campo "request.name" all'interno del campo "message" del log non elaborato. |
message.user.name | principal.user.userid | Il valore viene estratto dal campo "user.name" all'interno del campo "message" del log non elaborato. |
message.user.realm | principal.user.attribute.permissions.name | Il valore viene estratto dal campo "user.realm" all'interno del campo "message" del log non elaborato. |
message.user.roles | about.user.attribute.roles.name | Il valore viene estratto dal campo "user.roles" all'interno del campo "message" del log non elaborato. |
metadata.event_type | Valore hardcoded: "USER_RESOURCE_ACCESS" | |
metadata.log_type | Valore hardcoded: "ELASTIC_SEARCH" | |
metadata.product_name | Valore hardcoded: "ELASTICSEARCH" | |
metadata.vendor_name | Valore hardcoded: "ELASTIC" | |
principal.port | Il numero di porta viene estratto dal campo "origin.address" all'interno del campo "message" del log non elaborato. | |
target.labels.key | Valore hardcoded: "Indice" |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.