Recoger registros de Apache Hadoop
En este documento se explica cómo ingerir registros de Apache Hadoop en Google Security Operations mediante Bindplane. El analizador primero extrae los campos de los registros de Hadoop sin procesar mediante patrones Grok basados en formatos de registro de Hadoop comunes. A continuación, asigna los campos extraídos a los campos correspondientes del esquema del modelo de datos unificado (UDM), realiza conversiones de tipos de datos y enriquece los datos con contexto adicional.
Antes de empezar
Asegúrate de que cumples los siguientes requisitos previos:
- Una instancia de Google SecOps
- Un host Windows 2016 o posterior, o Linux con
systemd - Si se ejecuta a través de un proxy, asegúrese de que los puertos del cortafuegos estén abiertos según los requisitos del agente Bindplane.
- Acceso privilegiado a los archivos de configuración del clúster de Apache Hadoop
Obtener el archivo de autenticación de ingestión de Google SecOps
- Inicia sesión en la consola de Google SecOps.
- Ve a Configuración de SIEM > Agentes de recogida.
- Descarga el archivo de autenticación de ingestión. Guarda el archivo de forma segura en el sistema en el que se instalará Bindplane.
Obtener el ID de cliente de Google SecOps
- Inicia sesión en la consola de Google SecOps.
- Ve a Configuración de SIEM > Perfil.
- Copia y guarda el ID de cliente de la sección Detalles de la organización.
Instalar el agente de Bindplane
Instala el agente Bindplane en tu sistema operativo Windows o Linux siguiendo las instrucciones que se indican a continuación.
Instalación de ventanas
- Abre la petición de comando o PowerShell como administrador.
Ejecuta el siguiente comando:
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Instalación de Linux
- Abre un terminal con privilegios de root o sudo.
Ejecuta el siguiente comando:
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
Recursos de instalación adicionales
- Para ver otras opciones de instalación, consulta esta guía de instalación.
Configurar el agente de Bindplane para ingerir Syslog y enviarlo a Google SecOps
Accede al archivo de configuración:
- Busca el archivo
config.yaml. Normalmente, se encuentra en el directorio/etc/bindplane-agent/en Linux o en el directorio de instalación en Windows. - Abre el archivo con un editor de texto (por ejemplo,
nano,vio Bloc de notas).
- Busca el archivo
Edita el archivo
config.yamlde la siguiente manera:receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: <CUSTOMER_ID> endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'HADOOP' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels- Sustituye el puerto y la dirección IP según sea necesario en tu infraestructura.
- Sustituye
<CUSTOMER_ID>por el ID de cliente real. - Actualiza
/path/to/ingestion-authentication-file.jsona la ruta donde se guardó el archivo de autenticación en la sección Obtener el archivo de autenticación de ingestión de Google SecOps.
Reinicia el agente de Bindplane para aplicar los cambios
Para reiniciar el agente de Bindplane en Linux, ejecuta el siguiente comando:
sudo systemctl restart bindplane-agentPara reiniciar el agente de Bindplane en Windows, puedes usar la consola Servicios o introducir el siguiente comando:
net stop BindPlaneAgent && net start BindPlaneAgent
Configurar el reenvío de Syslog en Apache Hadoop
Apache Hadoop usa Log4j para el registro. Configura el appender Syslog adecuado en función de tu versión de Log4j para que los daemons de Hadoop (NameNode, DataNode, ResourceManager, NodeManager, etc.) reenvíen los registros directamente a tu receptor syslog (host de Bindplane). Log4j se configura mediante archivos (no con una interfaz web).
Opción 1: Configuración de Log4j 1.x
- Busca el archivo log4j.properties (normalmente en
$HADOOP_CONF_DIR/log4j.properties). Añade la siguiente configuración de SyslogAppender al archivo:
# Syslog appender (UDP example) log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender log4j.appender.SYSLOG.SyslogHost=<BINDPLANE_HOST_IP>:514 log4j.appender.SYSLOG.Facility=LOCAL0 log4j.appender.SYSLOG.FacilityPrinting=true log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout log4j.appender.SYSLOG.layout.ConversionPattern=%d{ISO8601} level=%p logger=%c thread=%t msg=%m%n # Example: send NameNode logs to syslog log4j.logger.org.apache.hadoop.hdfs.server.namenode=INFO,SYSLOG log4j.additivity.org.apache.hadoop.hdfs.server.namenode=false # Or attach to root logger to send all Hadoop logs # log4j.rootLogger=INFO, SYSLOGSustituye
<BINDPLANE_HOST_IP>por la dirección IP de tu host de Bindplane.Guarda el archivo.
Reinicia los daemons de Hadoop para aplicar los cambios en la configuración.
Opción 2: Configuración de Log4j 2.x
- Busca el archivo log4j2.xml (normalmente en
$HADOOP_CONF_DIR/log4j2.xml). Añade la siguiente configuración de Syslog appender al archivo:
<Configuration status="WARN"> <Appenders> <!-- UDP example; for TCP use protocol="TCP" --> <Syslog name="SYSLOG" format="RFC5424" host="<BINDPLANE_HOST_IP>" port="514" protocol="UDP" facility="LOCAL0" appName="hadoop" enterpriseNumber="18060" mdcId="mdc"> <PatternLayout pattern="%d{ISO8601} level=%p logger=%c thread=%t msg=%m %X%n"/> </Syslog> </Appenders> <Loggers> <!-- Send NameNode logs to syslog --> <Logger name="org.apache.hadoop.hdfs.server.namenode" level="info" additivity="false"> <AppenderRef ref="SYSLOG"/> </Logger> <!-- Or send all Hadoop logs --> <Root level="info"> <AppenderRef ref="SYSLOG"/> </Root> </Loggers> </Configuration>- Sustituye
<BINDPLANE_HOST_IP>por la dirección IP de tu host de Bindplane.
- Sustituye
Guarda el archivo.
Reinicia los daemons de Hadoop para aplicar los cambios en la configuración.
Tabla de asignación de UDM
| Campo de registro | Asignación de UDM | Lógica |
|---|---|---|
| permitido | security_result.action | Si es "false", la acción es "BLOCK". Si es "true", la acción es "ALLOW". |
| auth_type | additional.fields.key = "auth_type", additional.fields.value.string_value | Se ha extraído del campo "ugi" mediante el patrón grok "%{DATA:suser}@.*auth:%{WORD:auth_type}". Se quitan los paréntesis y "auth:". |
| llamada | additional.fields.key = "Call#", additional.fields.value.string_value | Asignación directa. |
| call_context | additional.fields.key = "callerContext", additional.fields.value.string_value | Asignación directa. |
| cliIP | principal.ip | Se asigna solo cuando existe el campo "json_data" y se analiza correctamente como JSON. |
| cmd | principal.process.command_line | Asignación directa. |
| cluster_name | target.hostname | Se usa como nombre de host de destino si está presente. |
| día | metadata.event_timestamp.seconds | Se usa con el mes, el año, las horas, los minutos y los segundos para crear event_timestamp. |
| description | metadata.description | Asignación directa. |
| controlador | additional.fields.key = "driver", additional.fields.value.string_value | Asignación directa. |
| dst | target.ip OR target.hostname OR target.file.full_path | Si se analiza correctamente como IP, se asigna a la IP de destino. Si el valor empieza por "/user", se asigna a la ruta del archivo de destino. De lo contrario, se asigna al nombre de host de destino. |
| dstport | target.port | Se asigna y se convierte directamente en un número entero. |
| Enforcer | security_result.rule_name | Asignación directa. |
| event_count | additional.fields.key = "event_count", additional.fields.value.string_value | Se asigna directamente y se convierte en una cadena. |
| fname | src.file.full_path | Asignación directa. |
| horas | metadata.event_timestamp.seconds | Se usa con el mes, el día, el año, los minutos y los segundos para crear event_timestamp. |
| id | additional.fields.key = "id", additional.fields.value.string_value | Asignación directa. |
| ip | principal.ip | Se ha asignado a la IP principal después de quitar el carácter "/" inicial. |
| json_data | Analizado como JSON. Los campos extraídos se asignan a los campos de UDM correspondientes. | |
| logType | additional.fields.key = "logType", additional.fields.value.string_value | Asignación directa. |
| mensaje | Se usa para extraer varios campos mediante patrones grok. | |
| método | network.http.method | Asignación directa. |
| minutos | metadata.event_timestamp.seconds | Se usa con el mes, el día, el año, las horas y los segundos para crear event_timestamp. |
| mes | metadata.event_timestamp.seconds | Se usa con el día, el año, las horas, los minutos y los segundos para crear event_timestamp. |
| observador | observer.hostname OR observer.ip | Si se analiza correctamente como IP, se asigna a la IP del observador. De lo contrario, se asigna al nombre de host del observador. |
| permanente | additional.fields.key = "perm", additional.fields.value.string_value | Asignación directa. |
| política | security_result.rule_id | Se asigna directamente y se convierte en una cadena. |
| producto | metadata.product_name | Asignación directa. |
| product_event | metadata.product_event_type | Asignación directa. Si es "rename", el campo "dst" se asigna a "target.file.full_path". |
| proto | network.application_protocol | Se asigna directamente y se convierte a mayúsculas si no es "webhdfs". |
| reason | security_result.summary | Asignación directa. |
| repo | additional.fields.key = "repo", additional.fields.value.string_value | Asignación directa. |
| resType | additional.fields.key = "resType", additional.fields.value.string_value | Asignación directa. |
| result | additional.fields.key = "result", additional.fields.value.string_value | Se asigna directamente y se convierte en una cadena. |
| Reintentar | additional.fields.key = "Retry#", additional.fields.value.string_value | Asignación directa. |
| segundos | metadata.event_timestamp.seconds | Se usa con el mes, el día, el año, las horas y los minutos para crear event_timestamp. |
| seq_num | additional.fields.key = "seq_num", additional.fields.value.string_value | Se asigna directamente y se convierte en una cadena. |
| gravedad | security_result.severity | Se asigna a diferentes niveles de gravedad en función del valor: "INFO", "Info", "info" -> "INFORMATIONAL"; "Low", "low", "LOW" -> "LOW"; "error", "Error", "WARN", "Warn" -> "MEDIUM"; "High", "high", "HIGH" -> "HIGH"; "Critical", "critical", "CRITICAL" -> "CRITICAL". |
| shost | principal.hostname | Se usa como nombre de host principal si es diferente de "src". |
| src | principal.ip OR principal.hostname OR observer.ip | Si se ha analizado correctamente como IP, se ha asignado a la IP principal y a la de observador. De lo contrario, se asigna al nombre de host principal. |
| srcport | principal.port | Se asigna y se convierte directamente en un número entero. |
| resumen | security_result.summary | Asignación directa. |
| suser | principal.user.userid | Asignación directa. |
| etiquetas | additional.fields.key = "tags", additional.fields.value.string_value | Asignación directa. |
| hilo | additional.fields.key = "thread", additional.fields.value.string_value | Asignación directa. |
| Consejo | target.ip | Asignación directa. |
| ugi | target.hostname | Se usa como nombre de host de destino si el campo "log_data" no contiene "·". |
| url | target.url | Asignación directa. |
| vendor | metadata.vendor_name | Asignación directa. |
| version | metadata.product_version | Asignación directa. |
| año | metadata.event_timestamp.seconds | Se usa con el mes, el día, las horas, los minutos y los segundos para crear event_timestamp. |
| N/A | metadata.event_type | El valor predeterminado es "NETWORK_CONNECTION". Cambia a "STATUS_UPDATE" si no se identifica ningún objetivo. |
| N/A | metadata.log_type | Selecciona "HADOOP". |
| N/A | security_result.alert_state | Se asigna el valor "ALERTING" si la gravedad es "HIGH" o "CRITICAL". |
| N/A | is_alert | Se asigna el valor "true" si la gravedad es "HIGH" o "CRITICAL". |
| N/A | is_significant | Se asigna el valor "true" si la gravedad es "HIGH" o "CRITICAL". |
¿Necesitas más ayuda? Recibe respuestas de los miembros de la comunidad y de los profesionales de Google SecOps.