Coletar registros do Apache Hadoop
Este documento explica como ingerir registros do Apache Hadoop no Google Security Operations usando o Bindplane. Primeiro, o analisador extrai campos dos registros brutos do Hadoop usando padrões Grok com base em formatos comuns de registros do Hadoop. Em seguida, ele mapeia os campos extraídos para os campos correspondentes no esquema do modelo de dados unificado (UDM), realiza conversões de tipo de dados e enriquece os dados com contexto adicional.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Uma instância do Google SecOps
- Um host Windows 2016 ou mais recente ou Linux com
systemd - Se você estiver executando por trás de um proxy, verifique se as portas do firewall estão abertas de acordo com os requisitos do agente Bindplane.
- Acesso privilegiado aos arquivos de configuração do cluster do Apache Hadoop
Receber o arquivo de autenticação de ingestão do Google SecOps
- Faça login no console do Google SecOps.
- Acesse Configurações do SIEM > Agentes de coleta.
- Baixe o arquivo de autenticação de ingestão. Salve o arquivo de forma segura no sistema em que o Bindplane será instalado.
Receber o ID de cliente do Google SecOps
- Faça login no console do Google SecOps.
- Acesse Configurações do SIEM > Perfil.
- Copie e salve o ID do cliente na seção Detalhes da organização.
Instalar o agente do Bindplane
Instale o agente do Bindplane no seu sistema operacional Windows ou Linux de acordo com as instruções a seguir.
Instalação do Windows
- Abra o Prompt de Comando ou o PowerShell como administrador.
Execute este comando:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Instalação do Linux
- Abra um terminal com privilégios de root ou sudo.
Execute este comando:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
Outros recursos de instalação
- Para mais opções de instalação, consulte este guia de instalação.
Configurar o agente do Bindplane para ingerir o Syslog e enviar ao Google SecOps
Acesse o arquivo de configuração:
- Localize o arquivo
config.yaml. Normalmente, ele fica no diretório/etc/bindplane-agent/no Linux ou no diretório de instalação no Windows. - Abra o arquivo usando um editor de texto (por exemplo,
nano,viou Bloco de Notas).
- Localize o arquivo
Edite o arquivo
config.yamlda seguinte forma:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- Substitua a porta e o endereço IP conforme necessário na sua infraestrutura.
- Substitua
<CUSTOMER_ID>pelo ID do cliente real. - Atualize
/path/to/ingestion-authentication-file.jsonpara o caminho em que o arquivo de autenticação foi salvo na seção Receber arquivo de autenticação de ingestão do Google SecOps.
Reinicie o agente do Bindplane para aplicar as mudanças
Para reiniciar o agente do Bindplane no Linux, execute o seguinte comando:
sudo systemctl restart bindplane-agentPara reiniciar o agente do Bindplane no Windows, use o console Serviços ou insira o seguinte comando:
net stop BindPlaneAgent && net start BindPlaneAgent
Configurar o encaminhamento de syslog no Apache Hadoop
O Apache Hadoop usa o Log4j para geração de registros. Configure o appender Syslog adequado com base na sua versão do Log4j para que os daemons do Hadoop (NameNode, DataNode, ResourceManager, NodeManager etc.) encaminhem os registros diretamente para o receptor syslog (host Bindplane). O Log4j é configurado por arquivos, não por UI da Web.
Opção 1: configuração do Log4j 1.x
- Localize o arquivo log4j.properties (normalmente em
$HADOOP_CONF_DIR/log4j.properties). Adicione a seguinte configuração SyslogAppender ao arquivo:
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOGSubstitua
<BINDPLANE_HOST_IP>pelo endereço IP do host do Bindplane.Salve o arquivo.
Reinicie os daemons do Hadoop para aplicar as mudanças de configuração.
Opção 2: configuração do Log4j 2.x
- Localize o arquivo log4j2.xml (normalmente em
$HADOOP_CONF_DIR/log4j2.xml). Adicione a seguinte configuração de appender Syslog ao arquivo:
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- Substitua
<BINDPLANE_HOST_IP>pelo endereço IP do host do Bindplane.
- Substitua
Salve o arquivo.
Reinicie os daemons do Hadoop para aplicar as mudanças de configuração.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
| permitido | security_result.action | Se for "false", a ação será "BLOCK". Se for "true", a ação será "ALLOW". |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | Extraído do campo "ugi" usando o padrão grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Os parênteses e "auth:" são removidos. |
| call | additional.fields.key = "Call#", additional.fields.value.string_value | Mapeado diretamente. |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | Mapeado diretamente. |
| cliIP | principal.ip | Mapeado somente quando o campo "json_data" existe e é analisado como JSON. |
| cmd | principal.process.command_line | Mapeado diretamente. |
| cluster_name | target.hostname | Usado como nome do host de destino, se presente. |
| dia | metadata.event_timestamp.seconds | Usado com mês, ano, horas, minutos e segundos para criar event_timestamp. |
| descrição | metadata.description | Mapeado diretamente. |
| motorista | additional.fields.key = "driver", additional.fields.value.string_value | Mapeado diretamente. |
| dst | target.ip OR target.hostname OR target.file.full_path | Se for analisado como IP, será mapeado para o IP de destino. Se o valor começar com "/user", será mapeado para o caminho do arquivo de destino. Caso contrário, será mapeado para o nome do host de destino. |
| dstport | target.port | Mapeado e convertido diretamente para número inteiro. |
| aplicador | security_result.rule_name | Mapeado diretamente. |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | Mapeado diretamente e convertido em string. |
| fname | src.file.full_path | Mapeado diretamente. |
| horas | metadata.event_timestamp.seconds | Usado com mês, dia, ano, minutos e segundos para criar event_timestamp. |
| ID | additional.fields.key = "id", additional.fields.value.string_value | Mapeado diretamente. |
| ip | principal.ip | Mapeado para o IP principal após a remoção de qualquer caractere "/" inicial. |
| json_data | Analisado como JSON. Os campos extraídos são mapeados para os campos correspondentes do UDM. | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | Mapeado diretamente. |
| mensagem | Usado para extrair vários campos usando padrões grok. | |
| método | network.http.method | Mapeado diretamente. |
| minutos | metadata.event_timestamp.seconds | Usado com mês, dia, ano, horas e segundos para criar event_timestamp. |
| mês | metadata.event_timestamp.seconds | Usado com dia, ano, horas, minutos e segundos para criar "event_timestamp". |
| observador | observer.hostname OR observer.ip | Se for analisado como IP, será mapeado para o IP do observador. Caso contrário, será mapeado para o nome do host do observador. |
| perm | additional.fields.key = "perm", additional.fields.value.string_value | Mapeado diretamente. |
| política | security_result.rule_id | Mapeado diretamente e convertido em string. |
| produto | metadata.product_name | Mapeado diretamente. |
| product_event | metadata.product_event_type | Mapeado diretamente. Se for "rename", o campo "dst" será mapeado para "target.file.full_path". |
| proto | network.application_protocol | Mapeado e convertido diretamente para maiúsculas se não for "webhdfs". |
| reason | security_result.summary | Mapeado diretamente. |
| repo | additional.fields.key = "repo", additional.fields.value.string_value | Mapeado diretamente. |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | Mapeado diretamente. |
| resultado | additional.fields.key = "result", additional.fields.value.string_value | Mapeado diretamente e convertido em string. |
| Tentar novamente | additional.fields.key = "Retry#", additional.fields.value.string_value | Mapeado diretamente. |
| segundos | metadata.event_timestamp.seconds | Usado com mês, dia, ano, horas e minutos para criar event_timestamp. |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | Mapeado diretamente e convertido em string. |
| gravidade, | security_result.severity | Mapeado para diferentes níveis de gravidade com base no valor: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL". |
| shost | principal.hostname | Usado como nome de host principal se for diferente de "src". |
| src | principal.ip OR principal.hostname OR observer.ip | Se for analisado como IP, será mapeado para o principal e o IP do observador. Caso contrário, será mapeado para o nome do host principal. |
| srcport | principal.port | Mapeado e convertido diretamente para número inteiro. |
| resumo | security_result.summary | Mapeado diretamente. |
| suser | principal.user.userid | Mapeado diretamente. |
| tags | additional.fields.key = "tags", additional.fields.value.string_value | Mapeado diretamente. |
| conversa | additional.fields.key = "thread", additional.fields.value.string_value | Mapeado diretamente. |
| tip | target.ip | Mapeado diretamente. |
| ugi | target.hostname | Usado como nome do host de destino se o campo "log_data" não contiver "·". |
| url | target.url | Mapeado diretamente. |
| fornecedor | metadata.vendor_name | Mapeado diretamente. |
| version | metadata.product_version | Mapeado diretamente. |
| ano | metadata.event_timestamp.seconds | Usado com mês, dia, horas, minutos e segundos para criar event_timestamp. |
| N/A | metadata.event_type | Definido como "NETWORK_CONNECTION" por padrão. Mudou para "STATUS_UPDATE" se nenhum destino for identificado. |
| N/A | metadata.log_type | Defina como "HADOOP". |
| N/A | security_result.alert_state | Definido como "ALERTING" se a gravidade for "HIGH" ou "CRITICAL". |
| N/A | is_alert | Definido como "true" se a gravidade for "HIGH" ou "CRITICAL". |
| N/A | is_significant | Definido como "true" se a gravidade for "HIGH" ou "CRITICAL". |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.