Coletar registros do Apache Hadoop

Compatível com:

Este documento explica como ingerir registros do Apache Hadoop no Google Security Operations usando o Bindplane. Primeiro, o analisador extrai campos dos registros brutos do Hadoop usando padrões Grok com base em formatos comuns de registros do Hadoop. Em seguida, ele mapeia os campos extraídos para os campos correspondentes no esquema do modelo de dados unificado (UDM), realiza conversões de tipo de dados e enriquece os dados com contexto adicional.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Uma instância do Google SecOps
  • Um host Windows 2016 ou mais recente ou Linux com systemd
  • Se você estiver executando por trás de um proxy, verifique se as portas do firewall estão abertas de acordo com os requisitos do agente Bindplane.
  • Acesso privilegiado aos arquivos de configuração do cluster do Apache Hadoop

Receber o arquivo de autenticação de ingestão do Google SecOps

  1. Faça login no console do Google SecOps.
  2. Acesse Configurações do SIEM > Agentes de coleta.
  3. Baixe o arquivo de autenticação de ingestão. Salve o arquivo de forma segura no sistema em que o Bindplane será instalado.

Receber o ID de cliente do Google SecOps

  1. Faça login no console do Google SecOps.
  2. Acesse Configurações do SIEM > Perfil.
  3. Copie e salve o ID do cliente na seção Detalhes da organização.

Instalar o agente do Bindplane

Instale o agente do Bindplane no seu sistema operacional Windows ou Linux de acordo com as instruções a seguir.

Instalação do Windows

  1. Abra o Prompt de Comando ou o PowerShell como administrador.
  2. Execute este comando:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Instalação do Linux

  1. Abra um terminal com privilégios de root ou sudo.
  2. Execute este comando:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Outros recursos de instalação

Configurar o agente do Bindplane para ingerir o Syslog e enviar ao Google SecOps

  1. Acesse o arquivo de configuração:

    1. Localize o arquivo config.yaml. Normalmente, ele fica no diretório /etc/bindplane-agent/ no Linux ou no diretório de instalação no Windows.
    2. Abra o arquivo usando um editor de texto (por exemplo, nano, vi ou Bloco de Notas).
  2. Edite o arquivo config.yaml da seguinte forma:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    
    • Substitua a porta e o endereço IP conforme necessário na sua infraestrutura.
    • Substitua <CUSTOMER_ID> pelo ID do cliente real.
    • Atualize /path/to/ingestion-authentication-file.json para o caminho em que o arquivo de autenticação foi salvo na seção Receber arquivo de autenticação de ingestão do Google SecOps.

Reinicie o agente do Bindplane para aplicar as mudanças

  • Para reiniciar o agente do Bindplane no Linux, execute o seguinte comando:

    sudo systemctl restart bindplane-agent
    
  • Para reiniciar o agente do Bindplane no Windows, use o console Serviços ou insira o seguinte comando:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configurar o encaminhamento de syslog no Apache Hadoop

O Apache Hadoop usa o Log4j para geração de registros. Configure o appender Syslog adequado com base na sua versão do Log4j para que os daemons do Hadoop (NameNode, DataNode, ResourceManager, NodeManager etc.) encaminhem os registros diretamente para o receptor syslog (host Bindplane). O Log4j é configurado por arquivos, não por UI da Web.

Opção 1: configuração do Log4j 1.x

  1. Localize o arquivo log4j.properties (normalmente em $HADOOP_CONF_DIR/log4j.properties).
  2. Adicione a seguinte configuração SyslogAppender ao arquivo:

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. Substitua <BINDPLANE_HOST_IP> pelo endereço IP do host do Bindplane.

  4. Salve o arquivo.

  5. Reinicie os daemons do Hadoop para aplicar as mudanças de configuração.

Opção 2: configuração do Log4j 2.x

  1. Localize o arquivo log4j2.xml (normalmente em $HADOOP_CONF_DIR/log4j2.xml).
  2. Adicione a seguinte configuração de appender Syslog ao arquivo:

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • Substitua <BINDPLANE_HOST_IP> pelo endereço IP do host do Bindplane.
  3. Salve o arquivo.

  4. Reinicie os daemons do Hadoop para aplicar as mudanças de configuração.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
permitido security_result.action Se for "false", a ação será "BLOCK". Se for "true", a ação será "ALLOW".
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value Extraído do campo "ugi" usando o padrão grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Os parênteses e "auth:" são removidos.
call additional.fields.key = "Call#", additional.fields.value.string_value Mapeado diretamente.
call_context additional.fields.key = "callerContext", additional.fields.value.string_value Mapeado diretamente.
cliIP principal.ip Mapeado somente quando o campo "json_data" existe e é analisado como JSON.
cmd principal.process.command_line Mapeado diretamente.
cluster_name target.hostname Usado como nome do host de destino, se presente.
dia metadata.event_timestamp.seconds Usado com mês, ano, horas, minutos e segundos para criar event_timestamp.
descrição metadata.description Mapeado diretamente.
motorista additional.fields.key = "driver", additional.fields.value.string_value Mapeado diretamente.
dst target.ip OR target.hostname OR target.file.full_path Se for analisado como IP, será mapeado para o IP de destino. Se o valor começar com "/user", será mapeado para o caminho do arquivo de destino. Caso contrário, será mapeado para o nome do host de destino.
dstport target.port Mapeado e convertido diretamente para número inteiro.
aplicador security_result.rule_name Mapeado diretamente.
event_count additional.fields.key = "event_count", additional.fields.value.string_value Mapeado diretamente e convertido em string.
fname src.file.full_path Mapeado diretamente.
horas metadata.event_timestamp.seconds Usado com mês, dia, ano, minutos e segundos para criar event_timestamp.
ID additional.fields.key = "id", additional.fields.value.string_value Mapeado diretamente.
ip principal.ip Mapeado para o IP principal após a remoção de qualquer caractere "/" inicial.
json_data Analisado como JSON. Os campos extraídos são mapeados para os campos correspondentes do UDM.
logType additional.fields.key = "logType", additional.fields.value.string_value Mapeado diretamente.
mensagem Usado para extrair vários campos usando padrões grok.
método network.http.method Mapeado diretamente.
minutos metadata.event_timestamp.seconds Usado com mês, dia, ano, horas e segundos para criar event_timestamp.
mês metadata.event_timestamp.seconds Usado com dia, ano, horas, minutos e segundos para criar "event_timestamp".
observador observer.hostname OR observer.ip Se for analisado como IP, será mapeado para o IP do observador. Caso contrário, será mapeado para o nome do host do observador.
perm additional.fields.key = "perm", additional.fields.value.string_value Mapeado diretamente.
política security_result.rule_id Mapeado diretamente e convertido em string.
produto metadata.product_name Mapeado diretamente.
product_event metadata.product_event_type Mapeado diretamente. Se for "rename", o campo "dst" será mapeado para "target.file.full_path".
proto network.application_protocol Mapeado e convertido diretamente para maiúsculas se não for "webhdfs".
reason security_result.summary Mapeado diretamente.
repo additional.fields.key = "repo", additional.fields.value.string_value Mapeado diretamente.
resType additional.fields.key = "resType", additional.fields.value.string_value Mapeado diretamente.
resultado additional.fields.key = "result", additional.fields.value.string_value Mapeado diretamente e convertido em string.
Tentar novamente additional.fields.key = "Retry#", additional.fields.value.string_value Mapeado diretamente.
segundos metadata.event_timestamp.seconds Usado com mês, dia, ano, horas e minutos para criar event_timestamp.
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value Mapeado diretamente e convertido em string.
gravidade, security_result.severity Mapeado para diferentes níveis de gravidade com base no valor: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL".
shost principal.hostname Usado como nome de host principal se for diferente de "src".
src principal.ip OR principal.hostname OR observer.ip Se for analisado como IP, será mapeado para o principal e o IP do observador. Caso contrário, será mapeado para o nome do host principal.
srcport principal.port Mapeado e convertido diretamente para número inteiro.
resumo security_result.summary Mapeado diretamente.
suser principal.user.userid Mapeado diretamente.
tags additional.fields.key = "tags", additional.fields.value.string_value Mapeado diretamente.
conversa additional.fields.key = "thread", additional.fields.value.string_value Mapeado diretamente.
tip target.ip Mapeado diretamente.
ugi target.hostname Usado como nome do host de destino se o campo "log_data" não contiver "·".
url target.url Mapeado diretamente.
fornecedor metadata.vendor_name Mapeado diretamente.
version metadata.product_version Mapeado diretamente.
ano metadata.event_timestamp.seconds Usado com mês, dia, horas, minutos e segundos para criar event_timestamp.
N/A metadata.event_type Definido como "NETWORK_CONNECTION" por padrão. Mudou para "STATUS_UPDATE" se nenhum destino for identificado.
N/A metadata.log_type Defina como "HADOOP".
N/A security_result.alert_state Definido como "ALERTING" se a gravidade for "HIGH" ou "CRITICAL".
N/A is_alert Definido como "true" se a gravidade for "HIGH" ou "CRITICAL".
N/A is_significant Definido como "true" se a gravidade for "HIGH" ou "CRITICAL".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.