Recopila registros de WP Engine
En este documento, se explica cómo transferir registros de WP Engine a Google Security Operations con Google Cloud Storage V2.
WP Engine es una plataforma de hosting administrado de WordPress que proporciona hosting de nivel empresarial con seguridad integrada, optimización del rendimiento y servicios de CDN. Genera registros de acceso, registros de errores y registros de eventos de CDN que se pueden recopilar a través de la API de WP Engine.
Antes de comenzar
Asegúrate de cumplir con los siguientes requisitos previos:
- Una instancia de Google SecOps
- Un proyecto de GCP con la API de Cloud Storage habilitada
- Permisos para crear y administrar buckets de GCS
- Permisos para administrar políticas de IAM en buckets de GCS
- Permisos para crear servicios de Cloud Run, temas de Pub/Sub y trabajos de Cloud Scheduler
- Acceso con privilegios al portal de usuarios de WP Engine con permisos de acceso a la API
- Una cuenta de WP Engine con acceso a la API habilitado
Crea un bucket de Google Cloud Storage
- Ve a Google Cloud Console.
- Selecciona tu proyecto o crea uno nuevo.
- En el menú de navegación, ve a Cloud Storage > Buckets.
- Haz clic en Crear bucket.
Proporciona los siguientes detalles de configuración:
Parámetro de configuración Valor Asigna un nombre a tu bucket Ingresa un nombre global único (por ejemplo, wpengine-logs).Tipo de ubicación Elige según tus necesidades (región, birregional, multirregional) Ubicación Selecciona la ubicación (por ejemplo, us-central1).Clase de almacenamiento Estándar (recomendado para los registros a los que se accede con frecuencia) Control de acceso Uniforme (recomendado) Herramientas de protección Opcional: Habilita el control de versiones de objetos o la política de retención Haz clic en Crear.
Recopila las credenciales de la API de WP Engine
Genera credenciales de API
- Accede al Portal de usuarios de WP Engine.
- Haz clic en el nombre de tu perfil y, luego, ve a Perfil > Acceso a la API.
- Haz clic en Generar credenciales.
Copia y guarda los siguientes detalles en una ubicación segura:
- Nombre de usuario de la API: Es el nombre de usuario de la API generado.
- Contraseña de la API: Es la contraseña de la API generada (se muestra solo una vez).
Obtener el nombre de la instalación
- Accede al Portal de usuarios de WP Engine.
- En el menú de navegación, ve a Sitios.
- Haz clic en el sitio del que deseas recopilar registros.
- Ten en cuenta el Nombre de instalación que se muestra en la página de descripción general del sitio. Cada entorno (producción, etapa de pruebas y desarrollo) tiene un nombre de instalación independiente.
Prueba el acceso a la API
Prueba tus credenciales antes de continuar con la integración:
# Replace with your actual credentials WPE_USER="your-api-username" WPE_PASSWORD="your-api-password" # Test API access - list installs curl -v -u "${WPE_USER}:${WPE_PASSWORD}" "https://api.wpengineapi.com/v1/installs"
Crea una cuenta de servicio para la función de Cloud Run
La función de Cloud Run necesita una cuenta de servicio con permisos para escribir en el bucket de GCS y ser invocada por Pub/Sub.
Crear cuenta de servicio
- En Google Cloud Console, ve a IAM y administración > Cuentas de servicio.
- Haz clic en Crear cuenta de servicio.
- Proporciona los siguientes detalles de configuración:
- Nombre de la cuenta de servicio: Ingresa
wpengine-logs-collector-sa. - Descripción de la cuenta de servicio: Ingresa
Service account for Cloud Run function to collect WP Engine logs.
- Nombre de la cuenta de servicio: Ingresa
- Haz clic en Crear y continuar.
- En la sección Otorga a esta cuenta de servicio acceso al proyecto, agrega los siguientes roles:
- Haz clic en Selecciona un rol.
- Busca y selecciona Administrador de objetos de Storage.
- Haz clic en + Agregar otro rol.
- Busca y selecciona Invocador de Cloud Run.
- Haz clic en + Agregar otro rol.
- Busca y selecciona Invocador de Cloud Functions.
- Haz clic en Continuar.
- Haz clic en Listo.
Estos roles son necesarios para las siguientes acciones:
- Administrador de objetos de Storage: Escribe registros en el bucket de GCS y administra archivos de estado
- Invocador de Cloud Run: Permite que Pub/Sub invoque la función
- Invocador de Cloud Functions: Permite la invocación de funciones
Otorga permisos de IAM en el bucket de GCS
Otorga permisos de escritura a la cuenta de servicio en el bucket de GCS:
- Ve a Cloud Storage > Buckets.
- Haz clic en el nombre de tu bucket (por ejemplo,
wpengine-logs). - Ve a la pestaña Permisos.
- Haz clic en Otorgar acceso.
- Proporciona los siguientes detalles de configuración:
- Agregar entidades: Ingresa el correo electrónico de la cuenta de servicio (por ejemplo,
wpengine-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Asignar roles: Selecciona Administrador de objetos de almacenamiento.
- Agregar entidades: Ingresa el correo electrónico de la cuenta de servicio (por ejemplo,
- Haz clic en Guardar.
Crea un tema de Pub/Sub
Crea un tema de Pub/Sub en el que Cloud Scheduler publicará y al que se suscribirá la función de Cloud Run.
- En Google Cloud Console, ve a Pub/Sub > Temas.
- Haz clic en Crear tema.
- Proporciona los siguientes detalles de configuración:
- ID del tema: Ingresa
wpengine-logs-trigger. - Deja el resto de la configuración con sus valores predeterminados.
- ID del tema: Ingresa
- Haz clic en Crear.
Crea una función de Cloud Run para recopilar registros
La función de Cloud Run se activará con mensajes de Pub/Sub de Cloud Scheduler para recuperar registros de la API de WP Engine y escribirlos en GCS.
- En Google Cloud Console, ve a Cloud Run.
- Haz clic en Crear servicio.
- Selecciona Función (usa un editor intercalado para crear una función).
En la sección Configurar, proporciona los siguientes detalles de configuración:
Parámetro de configuración Valor Nombre del servicio wpengine-logs-collectorRegión Selecciona la región que coincida con tu bucket de GCS (por ejemplo, us-central1).Tiempo de ejecución Selecciona Python 3.12 o una versión posterior. En la sección Activador (opcional), haz lo siguiente:
- Haz clic en + Agregar activador.
- Selecciona Cloud Pub/Sub.
- En Selecciona un tema de Cloud Pub/Sub, elige el tema
wpengine-logs-trigger. - Haz clic en Guardar.
En la sección Autenticación, haz lo siguiente:
- Selecciona Necesita autenticación.
- Verifica Identity and Access Management (IAM).
Desplázate hacia abajo y expande Contenedores, redes y seguridad.
Ve a la pestaña Seguridad:
- Cuenta de servicio: Selecciona la cuenta de servicio
wpengine-logs-collector-sa.
- Cuenta de servicio: Selecciona la cuenta de servicio
Ve a la pestaña Contenedores:
- Haz clic en Variables y secretos.
- Haz clic en + Agregar variable para cada variable de entorno:
Nombre de la variable Valor de ejemplo Descripción GCS_BUCKETwpengine-logsNombre del bucket de GCS GCS_PREFIXwpenginePrefijo para los archivos de registro STATE_KEYwpengine/state.jsonRuta de acceso al archivo de estado WPE_API_USERyour-api-usernameNombre de usuario de la API de WP Engine WPE_API_PASSWORDyour-api-passwordContraseña de la API de WP Engine WPE_INSTALL_IDmyinstallNombre de instalación de WP Engine MAX_RECORDS5000Cantidad máxima de registros por ejecución PAGE_SIZE100Registros por página LOOKBACK_HOURS24Período de visualización inicial En la sección Variables y Secrets, desplázate hacia abajo hasta Solicitudes:
- Tiempo de espera de la solicitud: Ingresa
600segundos (10 minutos).
- Tiempo de espera de la solicitud: Ingresa
Ve a la pestaña Configuración:
- En la sección Recursos, haz lo siguiente:
- Memoria: Selecciona 512 MiB o más.
- CPU: Selecciona 1.
- En la sección Recursos, haz lo siguiente:
En la sección Escalamiento de revisión, haz lo siguiente:
- Cantidad mínima de instancias: Ingresa
0. - Cantidad máxima de instancias: Ingresa
100(o ajusta según la carga esperada).
- Cantidad mínima de instancias: Ingresa
Haz clic en Crear.
Espera a que se cree el servicio (de 1 a 2 minutos).
Después de crear el servicio, se abrirá automáticamente el editor de código intercalado.
Agregar el código de función
- Ingresa main en el campo Punto de entrada.
En el editor de código intercalado, crea dos archivos:
- Primer archivo: main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 from datetime import datetime, timezone, timedelta import time import base64 # Initialize HTTP client with timeouts http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=5.0, read=30.0), retries=False, ) # Initialize Storage client storage_client = storage.Client() # Environment variables GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'wpengine') STATE_KEY = os.environ.get('STATE_KEY', 'wpengine/state.json') WPE_API_USER = os.environ.get('WPE_API_USER') WPE_API_PASSWORD = os.environ.get('WPE_API_PASSWORD') WPE_INSTALL_ID = os.environ.get('WPE_INSTALL_ID') MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '5000')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) # WP Engine API base URL API_BASE = 'https://api.wpengineapi.com/v1' # Log types to fetch LOG_TYPES = ['access', 'error'] def get_auth_header(): """Generate HTTP Basic auth header for WP Engine API.""" credentials = f"{WPE_API_USER}:{WPE_API_PASSWORD}" encoded = base64.b64encode(credentials.encode('utf-8')).decode('utf-8') return f"Basic {encoded}" @functions_framework.cloud_event def main(cloud_event): """ Cloud Run function triggered by Pub/Sub to fetch WP Engine logs and write to GCS. Args: cloud_event: CloudEvent object containing Pub/Sub message """ if not all([GCS_BUCKET, WPE_API_USER, WPE_API_PASSWORD, WPE_INSTALL_ID]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) # Load state state = load_state(bucket, STATE_KEY) # Determine time window now = datetime.now(timezone.utc) last_offsets = {} if isinstance(state, dict) and state.get("last_offsets"): last_offsets = state["last_offsets"] print(f"Fetching logs for install: {WPE_INSTALL_ID}") auth_header = get_auth_header() all_records = [] # Fetch both access and error log types for log_type in LOG_TYPES: last_offset = last_offsets.get(log_type, 0) records = fetch_logs( auth_header=auth_header, install_id=WPE_INSTALL_ID, log_type=log_type, start_offset=last_offset, page_size=PAGE_SIZE, max_records=MAX_RECORDS, ) # Tag records with log type for record in records: record['_wpe_log_type'] = log_type all_records.extend(records) # Update offset for this log type if records: last_offsets[log_type] = last_offset + len(records) print(f"Fetched {len(records)} {log_type} log records") if not all_records: print("No new log records found.") save_state(bucket, STATE_KEY, last_offsets) return # Write to GCS as NDJSON timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in all_records]) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(all_records)} records to gs://{GCS_BUCKET}/{object_key}") # Update state save_state(bucket, STATE_KEY, last_offsets) print(f"Successfully processed {len(all_records)} records") except Exception as e: print(f'Error processing logs: {str(e)}') raise def load_state(bucket, key): """Load state from GCS.""" try: blob = bucket.blob(key) if blob.exists(): state_data = blob.download_as_text() return json.loads(state_data) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, key, last_offsets: dict): """Save the last offsets to GCS state file.""" try: state = {'last_offsets': last_offsets} blob = bucket.blob(key) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_offsets={last_offsets}") except Exception as e: print(f"Warning: Could not save state: {e}") def fetch_logs(auth_header: str, install_id: str, log_type: str, start_offset: int, page_size: int, max_records: int): """ Fetch logs from WP Engine API with offset-based pagination and rate limiting. Args: auth_header: HTTP Basic auth header install_id: WP Engine install name log_type: Log type to fetch (access or error) start_offset: Starting offset for pagination page_size: Number of records per page max_records: Maximum total records to fetch Returns: List of log records """ headers = { 'Authorization': auth_header, 'Accept': 'application/json', 'User-Agent': 'GoogleSecOps-WPEngineCollector/1.0' } records = [] offset = start_offset page_num = 0 backoff = 1.0 while True: page_num += 1 if len(records) >= max_records: print(f"Reached max_records limit ({max_records}) for {log_type}") break limit = min(page_size, max_records - len(records)) url = f"{API_BASE}/installs/{install_id}/logs?type={log_type}&limit={limit}&offset={offset}" try: response = http.request('GET', url, headers=headers) # Handle rate limiting with exponential backoff if response.status == 429: retry_after = int(response.headers.get('Retry-After', str(int(backoff)))) print(f"Rate limited (429). Retrying after {retry_after}s...") time.sleep(retry_after) backoff = min(backoff * 2, 30.0) continue backoff = 1.0 if response.status != 200: print(f"HTTP Error: {response.status}") response_text = response.data.decode('utf-8') print(f"Response body: {response_text}") return [] data = json.loads(response.data.decode('utf-8')) page_results = data.get('results', data.get('data', [])) if not page_results: print(f"No more results (empty page) for {log_type}") break print(f"Page {page_num}: Retrieved {len(page_results)} {log_type} events") records.extend(page_results) offset += len(page_results) # If we got fewer results than requested, no more pages if len(page_results) < limit: print(f"Last page reached for {log_type}") break except Exception as e: print(f"Error fetching {log_type} logs: {e}") return [] print(f"Retrieved {len(records)} total {log_type} records from {page_num} pages") return records- Segundo archivo: requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0Haz clic en Implementar para guardar y, luego, implementar la función.
Espera a que se complete la implementación (de 2 a 3 minutos).
Crea un trabajo de Cloud Scheduler
Cloud Scheduler publicará mensajes en el tema de Pub/Sub a intervalos regulares, lo que activará la función de Cloud Run.
- En Google Cloud Console, ve a Cloud Scheduler.
- Haz clic en Crear trabajo.
Proporciona los siguientes detalles de configuración:
Parámetro de configuración Valor Nombre wpengine-logs-collector-hourlyRegión Selecciona la misma región que la función de Cloud Run Frecuencia 0 * * * *(cada hora, en punto)Zona horaria Selecciona la zona horaria (se recomienda UTC) Tipo de objetivo Pub/Sub Tema Selecciona el tema wpengine-logs-trigger.Cuerpo del mensaje {}(objeto JSON vacío)Haz clic en Crear.
Opciones de frecuencia de programación
Elige la frecuencia según los requisitos de latencia y volumen de registros:
| Frecuencia | Expresión cron | Caso de uso |
|---|---|---|
| Cada 5 minutos | */5 * * * * |
Alto volumen y baja latencia |
| Cada 15 minutos | */15 * * * * |
Volumen medio |
| Cada 1 hora | 0 * * * * |
Estándar (recomendado) |
| Cada 6 horas | 0 */6 * * * |
Procesamiento por lotes y volumen bajo |
| Diario | 0 0 * * * |
Recopilación de datos históricos |
Prueba la integración
- En la consola de Cloud Scheduler, busca tu trabajo.
- Haz clic en Ejecutar forzosamente para activar el trabajo de forma manual.
- Espera unos segundos.
- Ve a Cloud Run > Servicios.
- Haz clic en
wpengine-logs-collector: - Haz clic en la pestaña Registros.
Verifica que la función se haya ejecutado correctamente. Busca lo siguiente:
Fetching logs for install: myinstall Page 1: Retrieved X access events Fetched X access log records Page 1: Retrieved X error events Fetched X error log records Wrote X records to gs://wpengine-logs/wpengine/logs_YYYYMMDD_HHMMSS.ndjson Successfully processed X recordsVe a Cloud Storage > Buckets.
Haz clic en el nombre de tu bucket (
wpengine-logs).Navega a la carpeta
wpengine/:Verifica que se haya creado un archivo
.ndjsonnuevo con la marca de tiempo actual.
Si ves errores en los registros, haz lo siguiente:
- HTTP 401: Verifica las credenciales de la API en las variables de entorno
- HTTP 403: Verifica que el acceso a la API esté habilitado en el Portal de usuarios de WP Engine
- HTTP 429: Limitación de frecuencia. La función volverá a intentarlo automáticamente con una espera exponencial.
- Faltan variables de entorno: Verifica que estén configuradas todas las variables requeridas.
Configura un feed en Google SecOps para transferir registros de WP Engine
- Ve a Configuración de SIEM > Feeds.
- Haz clic en Agregar feed nuevo.
- Haz clic en Configura un feed único.
- En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo,
WP Engine Logs). - Selecciona Google Cloud Storage V2 como el Tipo de fuente.
- Selecciona WPEngine como el Tipo de registro.
Haz clic en Obtener cuenta de servicio. Se mostrará un correo electrónico único de la cuenta de servicio, por ejemplo:
chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.comCopia esta dirección de correo electrónico.
Haz clic en Siguiente.
Especifica valores para los siguientes parámetros de entrada:
URL del bucket de almacenamiento: Ingresa el URI del bucket de GCS con la ruta de acceso del prefijo:
gs://wpengine-logs/wpengine/- Reemplaza:
wpengine-logs: Es el nombre de tu bucket de GCS.wpengine: Es el prefijo o la ruta de carpeta opcionales en los que se almacenan los registros (déjalo vacío para la raíz).
- Reemplaza:
Opción de eliminación del código fuente: Selecciona la opción de eliminación según tu preferencia:
- Nunca: Nunca borra ningún archivo después de las transferencias (se recomienda para las pruebas).
- Borrar archivos transferidos: Borra los archivos después de que se transfirieron correctamente.
Borrar los archivos transferidos y los directorios vacíos: Borra los archivos y los directorios vacíos después de que se transfirieron correctamente.
Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días más reciente (el valor predeterminado es 180 días).
Espacio de nombres del recurso: Es el espacio de nombres del recurso.
Etiquetas de transferencia: Es la etiqueta que se aplicará a los eventos de este feed.
Haz clic en Siguiente.
Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.
Otorga permisos de IAM a la cuenta de servicio de Google SecOps
La cuenta de servicio de Google SecOps necesita el rol de visualizador de objetos de Storage en tu bucket de GCS.
- Ve a Cloud Storage > Buckets.
- Haz clic en el nombre del bucket.
- Ve a la pestaña Permisos.
- Haz clic en Otorgar acceso.
- Proporciona los siguientes detalles de configuración:
- Agregar entidades: Pega el correo electrónico de la cuenta de servicio de Google SecOps.
- Asignar roles: Selecciona Visualizador de objetos de Storage.
- Haz clic en Guardar.
Tabla de asignación de UDM
| Campo de registro | Asignación de UDM | Lógica |
|---|---|---|
| request, sig, blog_id, kind, name, slug, ver | additional.fields | Se fusiona con las etiquetas de la solicitud (como request_label), la firma (como sig_label), el ID del blog (como blog_id_label), el tipo (como kind_label), el nombre (como name_label), el slug (como slug_label) y la versión (como ver_label) si cada uno no está vacío. |
| msg | metadata.description | Valor copiado directamente |
| metadata.event_type | Se establece en "STATUS_UPDATE" si has_principal es verdadero; de lo contrario, se establece en "GENERIC_EVENT". | |
| protocolo | network.application_protocol | Valor copiado directamente |
| versión | network.application_protocol_version | Se convirtió en cadena |
| método | network.http.method | Valor copiado directamente |
| user_agent | network.http.parsed_user_agent | Se convirtió en parseduseragent |
| secure_url | network.http.referral_url | Valor copiado directamente |
| response_code | network.http.response_code | Se convierte en una cadena y, luego, en un número entero. |
| user_agent | network.http.user_agent | Valor copiado directamente |
| received_bytes | network.received_bytes | Se convirtió en una cadena y, luego, en un número entero sin signo. |
| Nombre de host | principal.asset.hostname | Valor copiado directamente |
| client_ip | principal.asset.ip | Valor copiado directamente |
| Nombre de host | principal.hostname | Valor copiado directamente |
| client_ip | principal.ip | Valor copiado directamente |
| puerto | principal.port | Se convierte en una cadena y, luego, en un número entero. |
| pid | principal.process.pid | Se convirtió en cadena |
| scan_type, scan_value | security_result.description | Valor de scan_value si no está vacío; de lo contrario, valor de scan_type si no está vacío |
¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.