Recoger registros de Apache Hadoop

Disponible en:

En este documento se explica cómo ingerir registros de Apache Hadoop en Google Security Operations mediante Bindplane. El analizador primero extrae los campos de los registros de Hadoop sin procesar mediante patrones Grok basados en formatos de registro de Hadoop comunes. A continuación, asigna los campos extraídos a los campos correspondientes del esquema del modelo de datos unificado (UDM), realiza conversiones de tipos de datos y enriquece los datos con contexto adicional.

Antes de empezar

Asegúrate de que cumples los siguientes requisitos previos:

  • Una instancia de Google SecOps
  • Un host Windows 2016 o posterior, o Linux con systemd
  • Si se ejecuta a través de un proxy, asegúrese de que los puertos del cortafuegos estén abiertos según los requisitos del agente Bindplane.
  • Acceso privilegiado a los archivos de configuración del clúster de Apache Hadoop

Obtener el archivo de autenticación de ingestión de Google SecOps

  1. Inicia sesión en la consola de Google SecOps.
  2. Ve a Configuración de SIEM > Agentes de recogida.
  3. Descarga el archivo de autenticación de ingestión. Guarda el archivo de forma segura en el sistema en el que se instalará Bindplane.

Obtener el ID de cliente de Google SecOps

  1. Inicia sesión en la consola de Google SecOps.
  2. Ve a Configuración de SIEM > Perfil.
  3. Copia y guarda el ID de cliente de la sección Detalles de la organización.

Instalar el agente de Bindplane

Instala el agente Bindplane en tu sistema operativo Windows o Linux siguiendo las instrucciones que se indican a continuación.

Instalación de ventanas

  1. Abre la petición de comando o PowerShell como administrador.
  2. Ejecuta el siguiente comando:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
    

Instalación de Linux

  1. Abre un terminal con privilegios de root o sudo.
  2. Ejecuta el siguiente comando:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
    

Recursos de instalación adicionales

Configurar el agente de Bindplane para ingerir Syslog y enviarlo a Google SecOps

  1. Accede al archivo de configuración:

    1. Busca el archivo config.yaml. Normalmente, se encuentra en el directorio /etc/bindplane-agent/ en Linux o en el directorio de instalación en Windows.
    2. Abre el archivo con un editor de texto (por ejemplo, nano, vi o Bloc de notas).
  2. Edita el archivo config.yaml de la siguiente manera:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"
    
    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'HADOOP'
        raw_log_field: body
        ingestion_labels:
    
    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    

Reinicia el agente de Bindplane para aplicar los cambios

  • Para reiniciar el agente de Bindplane en Linux, ejecuta el siguiente comando:

    sudo systemctl restart bindplane-agent
    
  • Para reiniciar el agente de Bindplane en Windows, puedes usar la consola Servicios o introducir el siguiente comando:

    net stop BindPlaneAgent && net start BindPlaneAgent
    

Configurar el reenvío de Syslog en Apache Hadoop

Apache Hadoop usa Log4j para el registro. Configura el appender Syslog adecuado en función de tu versión de Log4j para que los daemons de Hadoop (NameNode, DataNode, ResourceManager, NodeManager, etc.) reenvíen los registros directamente a tu receptor syslog (host de Bindplane). Log4j se configura mediante archivos (no con una interfaz web).

Opción 1: Configuración de Log4j 1.x

  1. Busca el archivo log4j.properties (normalmente en $HADOOP_CONF_DIR/log4j.properties).
  2. Añade la siguiente configuración de SyslogAppender al archivo:

    # Syslog appender (UDP example)
    log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
    log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514
    log4j.appender.SYSLOG.Facility=LOCAL0
    log4j.appender.SYSLOG.FacilityPrinting=true
    log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
    log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n
    
    # Example: send NameNode logs to syslog
    log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG
    log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false
    
    # Or attach to root logger to send all Hadoop logs
    # log4j.rootLogger=INFO, SYSLOG
    
  3. Sustituye <BINDPLANE_HOST_IP> por la dirección IP de tu host de Bindplane.

  4. Guarda el archivo.

  5. Reinicia los daemons de Hadoop para aplicar los cambios en la configuración.

Opción 2: Configuración de Log4j 2.x

  1. Busca el archivo log4j2.xml (normalmente en $HADOOP_CONF_DIR/log4j2.xml).
  2. Añade la siguiente configuración de Syslog appender al archivo:

    <Configuration status="WARN">
      <Appenders>
        <!-- UDP example; for TCP use protocol="TCP" -->
        <Syslog name="SYSLOG" format="RFC5424"
                host="<BINDPLANE_HOST_IP>"
                port="514"
                protocol="UDP"
                facility="LOCAL0"
                appName="hadoop"
                enterpriseNumber="18060"
                mdcId="mdc">
          <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/>
        </Syslog>
      </Appenders>
    
      <Loggers>
        <!-- Send NameNode logs to syslog -->
        <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false">
          <AppenderRef ref="SYSLOG"/>
        </Logger>
    
        <!-- Or send all Hadoop logs -->
        <Root level="info">
          <AppenderRef ref="SYSLOG"/>
        </Root>
      </Loggers>
    </Configuration>
    
    • Sustituye <BINDPLANE_HOST_IP> por la dirección IP de tu host de Bindplane.
  3. Guarda el archivo.

  4. Reinicia los daemons de Hadoop para aplicar los cambios en la configuración.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
permitido security_result.action Si es "false", la acción es "BLOCK". Si es "true", la acción es "ALLOW".
auth_type additional.fields.key = "auth_type", additional.fields.value.string_value Se ha extraído del campo "ugi" mediante el patrón grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Se quitan los paréntesis y "auth:".
llamada additional.fields.key = "Call#", additional.fields.value.string_value Asignación directa.
call_context additional.fields.key = "callerContext", additional.fields.value.string_value Asignación directa.
cliIP principal.ip Se asigna solo cuando existe el campo "json_data" y se analiza correctamente como JSON.
cmd principal.process.command_line Asignación directa.
cluster_name target.hostname Se usa como nombre de host de destino si está presente.
día metadata.event_timestamp.seconds Se usa con el mes, el año, las horas, los minutos y los segundos para crear event_timestamp.
description metadata.description Asignación directa.
controlador additional.fields.key = "driver", additional.fields.value.string_value Asignación directa.
dst target.ip OR target.hostname OR target.file.full_path Si se analiza correctamente como IP, se asigna a la IP de destino. Si el valor empieza por "/user", se asigna a la ruta del archivo de destino. De lo contrario, se asigna al nombre de host de destino.
dstport target.port Se asigna y se convierte directamente en un número entero.
Enforcer security_result.rule_name Asignación directa.
event_count additional.fields.key = "event_count", additional.fields.value.string_value Se asigna directamente y se convierte en una cadena.
fname src.file.full_path Asignación directa.
horas metadata.event_timestamp.seconds Se usa con el mes, el día, el año, los minutos y los segundos para crear event_timestamp.
id additional.fields.key = "id", additional.fields.value.string_value Asignación directa.
ip principal.ip Se ha asignado a la IP principal después de quitar el carácter "/" inicial.
json_data Analizado como JSON. Los campos extraídos se asignan a los campos de UDM correspondientes.
logType additional.fields.key = "logType", additional.fields.value.string_value Asignación directa.
mensaje Se usa para extraer varios campos mediante patrones grok.
método network.http.method Asignación directa.
minutos metadata.event_timestamp.seconds Se usa con el mes, el día, el año, las horas y los segundos para crear event_timestamp.
mes metadata.event_timestamp.seconds Se usa con el día, el año, las horas, los minutos y los segundos para crear event_timestamp.
observador observer.hostname OR observer.ip Si se analiza correctamente como IP, se asigna a la IP del observador. De lo contrario, se asigna al nombre de host del observador.
permanente additional.fields.key = "perm", additional.fields.value.string_value Asignación directa.
política security_result.rule_id Se asigna directamente y se convierte en una cadena.
producto metadata.product_name Asignación directa.
product_event metadata.product_event_type Asignación directa. Si es "rename", el campo "dst" se asigna a "target.file.full_path".
proto network.application_protocol Se asigna directamente y se convierte a mayúsculas si no es "webhdfs".
reason security_result.summary Asignación directa.
repo additional.fields.key = "repo", additional.fields.value.string_value Asignación directa.
resType additional.fields.key = "resType", additional.fields.value.string_value Asignación directa.
result additional.fields.key = "result", additional.fields.value.string_value Se asigna directamente y se convierte en una cadena.
Reintentar additional.fields.key = "Retry#", additional.fields.value.string_value Asignación directa.
segundos metadata.event_timestamp.seconds Se usa con el mes, el día, el año, las horas y los minutos para crear event_timestamp.
seq_num additional.fields.key = "seq_num", additional.fields.value.string_value Se asigna directamente y se convierte en una cadena.
gravedad security_result.severity Se asigna a diferentes niveles de gravedad en función del valor: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL".
shost principal.hostname Se usa como nombre de host principal si es diferente de "src".
src principal.ip OR principal.hostname OR observer.ip Si se ha analizado correctamente como IP, se ha asignado a la IP principal y a la de observador. De lo contrario, se asigna al nombre de host principal.
srcport principal.port Se asigna y se convierte directamente en un número entero.
resumen security_result.summary Asignación directa.
suser principal.user.userid Asignación directa.
etiquetas additional.fields.key = "tags", additional.fields.value.string_value Asignación directa.
hilo additional.fields.key = "thread", additional.fields.value.string_value Asignación directa.
Consejo target.ip Asignación directa.
ugi target.hostname Se usa como nombre de host de destino si el campo "log_data" no contiene "·".
url target.url Asignación directa.
vendor metadata.vendor_name Asignación directa.
version metadata.product_version Asignación directa.
año metadata.event_timestamp.seconds Se usa con el mes, el día, las horas, los minutos y los segundos para crear event_timestamp.
N/A metadata.event_type El valor predeterminado es "NETWORK_CONNECTION". Cambia a "STATUS_UPDATE" si no se identifica ningún objetivo.
N/A metadata.log_type Selecciona "HADOOP".
N/A security_result.alert_state Se asigna el valor "ALERTING" si la gravedad es "HIGH" o "CRITICAL".
N/A is_alert Se asigna el valor "true" si la gravedad es "HIGH" o "CRITICAL".
N/A is_significant Se asigna el valor "true" si la gravedad es "HIGH" o "CRITICAL".

¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.