Recolha registos do Apache Hadoop

Suportado em:

Este documento explica como carregar registos do Apache Hadoop para o Google Security Operations através do Bindplane. O analisador extrai primeiro os campos dos registos Hadoop não processados através de padrões Grok baseados em formatos de registos Hadoop comuns. Em seguida, mapeia os campos extraídos para os campos correspondentes no esquema do modelo de dados unificado (UDM), faz conversões de tipos de dados e enriquece os dados com contexto adicional.

Antes de começar

Certifique-se de que cumpre os seguintes pré-requisitos:

  • Uma instância do Google SecOps
  • Um anfitrião Windows 2016 ou posterior, ou Linux com systemd
  • Se estiver a ser executado através de um proxy, certifique-se de que as portas da firewall estão abertas de acordo com os requisitos do agente Bindplane
  • Acesso privilegiado aos ficheiros de configuração do cluster Apache Hadoop

Obtenha o ficheiro de autenticação de carregamento do Google SecOps

  1. Inicie sessão na consola Google SecOps.
  2. Aceda a Definições do SIEM > Agentes de recolha.
  3. Transfira o ficheiro de autenticação de carregamento. Guarde o ficheiro de forma segura no sistema onde o Bindplane vai ser instalado.

Obtenha o ID de cliente do Google SecOps

  1. Inicie sessão na consola Google SecOps.
  2. Aceda a Definições do SIEM > Perfil.
  3. Copie e guarde o ID do cliente da secção Detalhes da organização.

Instale o agente do Bindplane

Instale o agente do Bindplane no seu sistema operativo Windows ou Linux de acordo com as seguintes instruções.

Instalação de janelas

  1. Abra a Linha de comandos ou o PowerShell como administrador.
  2. Execute o seguinte comando:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Instalação do Linux

  1. Abra um terminal com privilégios de raiz ou sudo.
  2. Execute o seguinte comando:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Recursos de instalação adicionais

Configure o agente Bindplane para carregar o Syslog e enviá-lo para o Google SecOps

  1. Aceda ao ficheiro de configuração:

    1. Localize o ficheiro config.yaml. Normalmente, encontra-se no diretório /etc/bindplane-agent/ no Linux ou no diretório de instalação no Windows.
    2. Abra o ficheiro com um editor de texto (por exemplo, nano, vi ou Bloco de notas).
  2. Edite o ficheiro config.yaml da seguinte forma:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    

Reinicie o agente do Bindplane para aplicar as alterações

  • Para reiniciar o agente do Bindplane no Linux, execute o seguinte comando:

    sudo systemctl restart bindplane-agent
    
  • Para reiniciar o agente Bindplane no Windows, pode usar a consola Serviços ou introduzir o seguinte comando:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configure o encaminhamento de Syslog no Apache Hadoop

O Apache Hadoop usa o Log4j para registo. Configure o Syslog appender adequado com base na sua versão do Log4j para que os daemons do Hadoop (NameNode, DataNode, ResourceManager, NodeManager, etc.) encaminhem os registos diretamente para o seu recetor syslog (anfitrião do Bindplane). O Log4j é configurado através de ficheiros (sem IU Web).

Opção 1: configuração do Log4j 1.x

  1. Localize o ficheiro log4j.properties (normalmente em $HADOOP_CONF_DIR/log4j.properties).
  2. Adicione a seguinte configuração SyslogAppender ao ficheiro:

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. Substitua <BINDPLANE_HOST_IP> pelo endereço IP do seu anfitrião do Bindplane.

  4. Guarde o ficheiro.

  5. Reinicie os daemons do Hadoop para aplicar as alterações de configuração.

Opção 2: configuração do Log4j 2.x

  1. Localize o ficheiro log4j2.xml (normalmente em $HADOOP_CONF_DIR/log4j2.xml).
  2. Adicione a seguinte configuração do Syslog appender ao ficheiro:

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • Substitua <BINDPLANE_HOST_IP> pelo endereço IP do seu anfitrião do Bindplane.
  3. Guarde o ficheiro.

  4. Reinicie os daemons do Hadoop para aplicar as alterações de configuração.

Tabela de mapeamento da UDM

Campo de registo Mapeamento da UDM Lógica
permitido security_result.action Se for "false", a ação é "BLOCK". Se for "verdadeiro", a ação é "PERMITIR".
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value Extraído do campo "ugi" através do padrão grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Os parênteses e "auth:" são removidos.
chamada additional.fields.key = "Call#", additional.fields.value.string_value Mapeado diretamente.
call_context additional.fields.key = "callerContext", additional.fields.value.string_value Mapeado diretamente.
cliIP principal.ip Mapeado apenas quando o campo "json_data" existe e é analisado com êxito como JSON.
cmd principal.process.command_line Mapeado diretamente.
cluster_name target.hostname Usado como nome de anfitrião de destino, se estiver presente.
dia metadata.event_timestamp.seconds Usado com o mês, o ano, as horas, os minutos e os segundos para criar event_timestamp.
descrição metadata.description Mapeado diretamente.
condutor additional.fields.key = "driver", additional.fields.value.string_value Mapeado diretamente.
dst target.ip OR target.hostname OR target.file.full_path Se for analisado com êxito como IP, é mapeado para o IP de destino. Se o valor começar por "/user", é mapeado para o caminho do ficheiro de destino. Caso contrário, é mapeado para o nome do anfitrião de destino.
dstport target.port Mapeado diretamente e convertido em número inteiro.
agente da autoridade security_result.rule_name Mapeado diretamente.
event_count additional.fields.key = "event_count", additional.fields.value.string_value Mapeado diretamente e convertido em string.
fname src.file.full_path Mapeado diretamente.
horas metadata.event_timestamp.seconds Usado com o mês, o dia, o ano, os minutos e os segundos para criar event_timestamp.
id additional.fields.key = "id", additional.fields.value.string_value Mapeado diretamente.
ip principal.ip Mapeado para o IP principal após a remoção de qualquer caráter "/" inicial.
json_data Analisado como JSON. Os campos extraídos são mapeados para os campos da UDM correspondentes.
logType additional.fields.key = "logType", additional.fields.value.string_value Mapeado diretamente.
mensagem Usado para extrair vários campos através de padrões grok.
método network.http.method Mapeado diretamente.
minutos metadata.event_timestamp.seconds Usado com o mês, o dia, o ano, as horas e os segundos para criar event_timestamp.
mês metadata.event_timestamp.seconds Usado com dia, ano, horas, minutos e segundos para criar event_timestamp.
observador observer.hostname OU observer.ip Se for analisado com êxito como IP, é mapeado para o IP do observador. Caso contrário, é mapeado para o nome do anfitrião do observador.
perm additional.fields.key = "perm", additional.fields.value.string_value Mapeado diretamente.
política security_result.rule_id Mapeado diretamente e convertido em string.
produto metadata.product_name Mapeado diretamente.
product_event metadata.product_event_type Mapeado diretamente. Se for "rename", o campo "dst" é mapeado para "target.file.full_path".
proto network.application_protocol Mapeado diretamente e convertido em letras maiúsculas se não for "webhdfs".
motivo security_result.summary Mapeado diretamente.
repo additional.fields.key = "repo", additional.fields.value.string_value Mapeado diretamente.
resType additional.fields.key = "resType", additional.fields.value.string_value Mapeado diretamente.
result additional.fields.key = "result", additional.fields.value.string_value Mapeado diretamente e convertido em string.
Tentar novamente additional.fields.key = "Retry#", additional.fields.value.string_value Mapeado diretamente.
segundos metadata.event_timestamp.seconds Usado com o mês, o dia, o ano, as horas e os minutos para criar event_timestamp.
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value Mapeado diretamente e convertido em string.
gravidade security_result.severity Mapeado para diferentes níveis de gravidade com base no valor: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL".
shost principal.hostname Usado como nome do anfitrião principal se for diferente de "src".
src principal.ip OR principal.hostname OR observer.ip Se for analisado com êxito como IP, é mapeado para o IP principal e o IP do observador. Caso contrário, é mapeado para o nome do anfitrião principal.
srcport principal.port Mapeado diretamente e convertido em número inteiro.
resumo security_result.summary Mapeado diretamente.
suser principal.user.userid Mapeado diretamente.
etiquetas additional.fields.key = "tags", additional.fields.value.string_value Mapeado diretamente.
thread additional.fields.key = "thread", additional.fields.value.string_value Mapeado diretamente.
dica target.ip Mapeado diretamente.
ugi target.hostname Usado como nome do anfitrião de destino se o campo "log_data" não contiver "·".
url target.url Mapeado diretamente.
fornecedor metadata.vendor_name Mapeado diretamente.
versão metadata.product_version Mapeado diretamente.
ano metadata.event_timestamp.seconds Usado com o mês, o dia, as horas, os minutos e os segundos para criar event_timestamp.
N/A metadata.event_type A predefinição é "NETWORK_CONNECTION". Alterado para "STATUS_UPDATE" se não for identificado nenhum destino.
N/A metadata.log_type Definido como "HADOOP".
N/A security_result.alert_state Definido como "ALERTING" se a gravidade for "HIGH" ou "CRITICAL".
N/A is_alert Definido como "true" se a gravidade for "HIGH" ou "CRITICAL".
N/A is_significant Definido como "true" se a gravidade for "HIGH" ou "CRITICAL".

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.