Recolha registos do Apache Hadoop
Este documento explica como carregar registos do Apache Hadoop para o Google Security Operations através do Bindplane. O analisador extrai primeiro os campos dos registos Hadoop não processados através de padrões Grok baseados em formatos de registos Hadoop comuns. Em seguida, mapeia os campos extraídos para os campos correspondentes no esquema do modelo de dados unificado (UDM), faz conversões de tipos de dados e enriquece os dados com contexto adicional.
Antes de começar
Certifique-se de que cumpre os seguintes pré-requisitos:
- Uma instância do Google SecOps
- Um anfitrião Windows 2016 ou posterior, ou Linux com
systemd - Se estiver a ser executado através de um proxy, certifique-se de que as portas da firewall estão abertas de acordo com os requisitos do agente Bindplane
- Acesso privilegiado aos ficheiros de configuração do cluster Apache Hadoop
Obtenha o ficheiro de autenticação de carregamento do Google SecOps
- Inicie sessão na consola Google SecOps.
- Aceda a Definições do SIEM > Agentes de recolha.
- Transfira o ficheiro de autenticação de carregamento. Guarde o ficheiro de forma segura no sistema onde o Bindplane vai ser instalado.
Obtenha o ID de cliente do Google SecOps
- Inicie sessão na consola Google SecOps.
- Aceda a Definições do SIEM > Perfil.
- Copie e guarde o ID do cliente da secção Detalhes da organização.
Instale o agente do Bindplane
Instale o agente do Bindplane no seu sistema operativo Windows ou Linux de acordo com as seguintes instruções.
Instalação de janelas
- Abra a Linha de comandos ou o PowerShell como administrador.
Execute o seguinte comando:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Instalação do Linux
- Abra um terminal com privilégios de raiz ou sudo.
Execute o seguinte comando:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
Recursos de instalação adicionais
- Para ver opções de instalação adicionais, consulte este guia de instalação.
Configure o agente Bindplane para carregar o Syslog e enviá-lo para o Google SecOps
Aceda ao ficheiro de configuração:
- Localize o ficheiro
config.yaml. Normalmente, encontra-se no diretório/etc/bindplane-agent/no Linux ou no diretório de instalação no Windows. - Abra o ficheiro com um editor de texto (por exemplo,
nano,viou Bloco de notas).
- Localize o ficheiro
Edite o ficheiro
config.yamlda seguinte forma:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- Substitua a porta e o endereço IP conforme necessário na sua infraestrutura.
- Substitua
<CUSTOMER_ID>pelo ID de cliente real. - Atualize
/path/to/ingestion-authentication-file.jsonpara o caminho onde o ficheiro de autenticação foi guardado na secção Obtenha o ficheiro de autenticação de carregamento do Google SecOps.
Reinicie o agente do Bindplane para aplicar as alterações
Para reiniciar o agente do Bindplane no Linux, execute o seguinte comando:
sudo systemctl restart bindplane-agentPara reiniciar o agente Bindplane no Windows, pode usar a consola Serviços ou introduzir o seguinte comando:
net stop BindPlaneAgent && net start BindPlaneAgent
Configure o encaminhamento de Syslog no Apache Hadoop
O Apache Hadoop usa o Log4j para registo. Configure o Syslog appender adequado com base na sua versão do Log4j para que os daemons do Hadoop (NameNode, DataNode, ResourceManager, NodeManager, etc.) encaminhem os registos diretamente para o seu recetor syslog (anfitrião do Bindplane). O Log4j é configurado através de ficheiros (sem IU Web).
Opção 1: configuração do Log4j 1.x
- Localize o ficheiro log4j.properties (normalmente em
$HADOOP_CONF_DIR/log4j.properties). Adicione a seguinte configuração SyslogAppender ao ficheiro:
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOGSubstitua
<BINDPLANE_HOST_IP>pelo endereço IP do seu anfitrião do Bindplane.Guarde o ficheiro.
Reinicie os daemons do Hadoop para aplicar as alterações de configuração.
Opção 2: configuração do Log4j 2.x
- Localize o ficheiro log4j2.xml (normalmente em
$HADOOP_CONF_DIR/log4j2.xml). Adicione a seguinte configuração do Syslog appender ao ficheiro:
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- Substitua
<BINDPLANE_HOST_IP>pelo endereço IP do seu anfitrião do Bindplane.
- Substitua
Guarde o ficheiro.
Reinicie os daemons do Hadoop para aplicar as alterações de configuração.
Tabela de mapeamento da UDM
| Campo de registo | Mapeamento da UDM | Lógica |
|---|---|---|
| permitido | security_result.action | Se for "false", a ação é "BLOCK". Se for "verdadeiro", a ação é "PERMITIR". |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | Extraído do campo "ugi" através do padrão grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Os parênteses e "auth:" são removidos. |
| chamada | additional.fields.key = "Call#", additional.fields.value.string_value | Mapeado diretamente. |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | Mapeado diretamente. |
| cliIP | principal.ip | Mapeado apenas quando o campo "json_data" existe e é analisado com êxito como JSON. |
| cmd | principal.process.command_line | Mapeado diretamente. |
| cluster_name | target.hostname | Usado como nome de anfitrião de destino, se estiver presente. |
| dia | metadata.event_timestamp.seconds | Usado com o mês, o ano, as horas, os minutos e os segundos para criar event_timestamp. |
| descrição | metadata.description | Mapeado diretamente. |
| condutor | additional.fields.key = "driver", additional.fields.value.string_value | Mapeado diretamente. |
| dst | target.ip OR target.hostname OR target.file.full_path | Se for analisado com êxito como IP, é mapeado para o IP de destino. Se o valor começar por "/user", é mapeado para o caminho do ficheiro de destino. Caso contrário, é mapeado para o nome do anfitrião de destino. |
| dstport | target.port | Mapeado diretamente e convertido em número inteiro. |
| agente da autoridade | security_result.rule_name | Mapeado diretamente. |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | Mapeado diretamente e convertido em string. |
| fname | src.file.full_path | Mapeado diretamente. |
| horas | metadata.event_timestamp.seconds | Usado com o mês, o dia, o ano, os minutos e os segundos para criar event_timestamp. |
| id | additional.fields.key = "id", additional.fields.value.string_value | Mapeado diretamente. |
| ip | principal.ip | Mapeado para o IP principal após a remoção de qualquer caráter "/" inicial. |
| json_data | Analisado como JSON. Os campos extraídos são mapeados para os campos da UDM correspondentes. | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | Mapeado diretamente. |
| mensagem | Usado para extrair vários campos através de padrões grok. | |
| método | network.http.method | Mapeado diretamente. |
| minutos | metadata.event_timestamp.seconds | Usado com o mês, o dia, o ano, as horas e os segundos para criar event_timestamp. |
| mês | metadata.event_timestamp.seconds | Usado com dia, ano, horas, minutos e segundos para criar event_timestamp. |
| observador | observer.hostname OU observer.ip | Se for analisado com êxito como IP, é mapeado para o IP do observador. Caso contrário, é mapeado para o nome do anfitrião do observador. |
| perm | additional.fields.key = "perm", additional.fields.value.string_value | Mapeado diretamente. |
| política | security_result.rule_id | Mapeado diretamente e convertido em string. |
| produto | metadata.product_name | Mapeado diretamente. |
| product_event | metadata.product_event_type | Mapeado diretamente. Se for "rename", o campo "dst" é mapeado para "target.file.full_path". |
| proto | network.application_protocol | Mapeado diretamente e convertido em letras maiúsculas se não for "webhdfs". |
| motivo | security_result.summary | Mapeado diretamente. |
| repo | additional.fields.key = "repo", additional.fields.value.string_value | Mapeado diretamente. |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | Mapeado diretamente. |
| result | additional.fields.key = "result", additional.fields.value.string_value | Mapeado diretamente e convertido em string. |
| Tentar novamente | additional.fields.key = "Retry#", additional.fields.value.string_value | Mapeado diretamente. |
| segundos | metadata.event_timestamp.seconds | Usado com o mês, o dia, o ano, as horas e os minutos para criar event_timestamp. |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | Mapeado diretamente e convertido em string. |
| gravidade | security_result.severity | Mapeado para diferentes níveis de gravidade com base no valor: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL". |
| shost | principal.hostname | Usado como nome do anfitrião principal se for diferente de "src". |
| src | principal.ip OR principal.hostname OR observer.ip | Se for analisado com êxito como IP, é mapeado para o IP principal e o IP do observador. Caso contrário, é mapeado para o nome do anfitrião principal. |
| srcport | principal.port | Mapeado diretamente e convertido em número inteiro. |
| resumo | security_result.summary | Mapeado diretamente. |
| suser | principal.user.userid | Mapeado diretamente. |
| etiquetas | additional.fields.key = "tags", additional.fields.value.string_value | Mapeado diretamente. |
| thread | additional.fields.key = "thread", additional.fields.value.string_value | Mapeado diretamente. |
| dica | target.ip | Mapeado diretamente. |
| ugi | target.hostname | Usado como nome do anfitrião de destino se o campo "log_data" não contiver "·". |
| url | target.url | Mapeado diretamente. |
| fornecedor | metadata.vendor_name | Mapeado diretamente. |
| versão | metadata.product_version | Mapeado diretamente. |
| ano | metadata.event_timestamp.seconds | Usado com o mês, o dia, as horas, os minutos e os segundos para criar event_timestamp. |
| N/A | metadata.event_type | A predefinição é "NETWORK_CONNECTION". Alterado para "STATUS_UPDATE" se não for identificado nenhum destino. |
| N/A | metadata.log_type | Definido como "HADOOP". |
| N/A | security_result.alert_state | Definido como "ALERTING" se a gravidade for "HIGH" ou "CRITICAL". |
| N/A | is_alert | Definido como "true" se a gravidade for "HIGH" ou "CRITICAL". |
| N/A | is_significant | Definido como "true" se a gravidade for "HIGH" ou "CRITICAL". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.