Recopila registros de WP Engine

Compatible con:

En este documento, se explica cómo transferir registros de WP Engine a Google Security Operations con Google Cloud Storage V2.

WP Engine es una plataforma de hosting administrado de WordPress que proporciona hosting de nivel empresarial con seguridad integrada, optimización del rendimiento y servicios de CDN. Genera registros de acceso, registros de errores y registros de eventos de CDN que se pueden recopilar a través de la API de WP Engine.

Antes de comenzar

Asegúrate de cumplir con los siguientes requisitos previos:

  • Una instancia de Google SecOps
  • Un proyecto de GCP con la API de Cloud Storage habilitada
  • Permisos para crear y administrar buckets de GCS
  • Permisos para administrar políticas de IAM en buckets de GCS
  • Permisos para crear servicios de Cloud Run, temas de Pub/Sub y trabajos de Cloud Scheduler
  • Acceso con privilegios al portal de usuarios de WP Engine con permisos de acceso a la API
  • Una cuenta de WP Engine con acceso a la API habilitado

Crea un bucket de Google Cloud Storage

  1. Ve a Google Cloud Console.
  2. Selecciona tu proyecto o crea uno nuevo.
  3. En el menú de navegación, ve a Cloud Storage > Buckets.
  4. Haz clic en Crear bucket.
  5. Proporciona los siguientes detalles de configuración:

    Parámetro de configuración Valor
    Asigna un nombre a tu bucket Ingresa un nombre global único (por ejemplo, wpengine-logs).
    Tipo de ubicación Elige según tus necesidades (región, birregional, multirregional)
    Ubicación Selecciona la ubicación (por ejemplo, us-central1).
    Clase de almacenamiento Estándar (recomendado para los registros a los que se accede con frecuencia)
    Control de acceso Uniforme (recomendado)
    Herramientas de protección Opcional: Habilita el control de versiones de objetos o la política de retención
  6. Haz clic en Crear.

Recopila las credenciales de la API de WP Engine

Genera credenciales de API

  1. Accede al Portal de usuarios de WP Engine.
  2. Haz clic en el nombre de tu perfil y, luego, ve a Perfil > Acceso a la API.
  3. Haz clic en Generar credenciales.
  4. Copia y guarda los siguientes detalles en una ubicación segura:

    • Nombre de usuario de la API: Es el nombre de usuario de la API generado.
    • Contraseña de la API: Es la contraseña de la API generada (se muestra solo una vez).

Obtener el nombre de la instalación

  1. Accede al Portal de usuarios de WP Engine.
  2. En el menú de navegación, ve a Sitios.
  3. Haz clic en el sitio del que deseas recopilar registros.
  4. Ten en cuenta el Nombre de instalación que se muestra en la página de descripción general del sitio. Cada entorno (producción, etapa de pruebas y desarrollo) tiene un nombre de instalación independiente.

Prueba el acceso a la API

  • Prueba tus credenciales antes de continuar con la integración:

    # Replace with your actual credentials
    WPE_USER="your-api-username"
    WPE_PASSWORD="your-api-password"
    
    # Test API access - list installs
    curl -v -u "${WPE_USER}:${WPE_PASSWORD}" "https://api.wpengineapi.com/v1/installs"
    

Crea una cuenta de servicio para la función de Cloud Run

La función de Cloud Run necesita una cuenta de servicio con permisos para escribir en el bucket de GCS y ser invocada por Pub/Sub.

Crear cuenta de servicio

  1. En Google Cloud Console, ve a IAM y administración > Cuentas de servicio.
  2. Haz clic en Crear cuenta de servicio.
  3. Proporciona los siguientes detalles de configuración:
    • Nombre de la cuenta de servicio: Ingresa wpengine-logs-collector-sa.
    • Descripción de la cuenta de servicio: Ingresa Service account for Cloud Run function to collect WP Engine logs.
  4. Haz clic en Crear y continuar.
  5. En la sección Otorga a esta cuenta de servicio acceso al proyecto, agrega los siguientes roles:
    1. Haz clic en Selecciona un rol.
    2. Busca y selecciona Administrador de objetos de Storage.
    3. Haz clic en + Agregar otro rol.
    4. Busca y selecciona Invocador de Cloud Run.
    5. Haz clic en + Agregar otro rol.
    6. Busca y selecciona Invocador de Cloud Functions.
  6. Haz clic en Continuar.
  7. Haz clic en Listo.

Estos roles son necesarios para las siguientes acciones:

  • Administrador de objetos de Storage: Escribe registros en el bucket de GCS y administra archivos de estado
  • Invocador de Cloud Run: Permite que Pub/Sub invoque la función
  • Invocador de Cloud Functions: Permite la invocación de funciones

Otorga permisos de IAM en el bucket de GCS

Otorga permisos de escritura a la cuenta de servicio en el bucket de GCS:

  1. Ve a Cloud Storage > Buckets.
  2. Haz clic en el nombre de tu bucket (por ejemplo, wpengine-logs).
  3. Ve a la pestaña Permisos.
  4. Haz clic en Otorgar acceso.
  5. Proporciona los siguientes detalles de configuración:
    • Agregar entidades: Ingresa el correo electrónico de la cuenta de servicio (por ejemplo, wpengine-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Asignar roles: Selecciona Administrador de objetos de almacenamiento.
  6. Haz clic en Guardar.

Crea un tema de Pub/Sub

Crea un tema de Pub/Sub en el que Cloud Scheduler publicará y al que se suscribirá la función de Cloud Run.

  1. En Google Cloud Console, ve a Pub/Sub > Temas.
  2. Haz clic en Crear tema.
  3. Proporciona los siguientes detalles de configuración:
    • ID del tema: Ingresa wpengine-logs-trigger.
    • Deja el resto de la configuración con sus valores predeterminados.
  4. Haz clic en Crear.

Crea una función de Cloud Run para recopilar registros

La función de Cloud Run se activará con mensajes de Pub/Sub de Cloud Scheduler para recuperar registros de la API de WP Engine y escribirlos en GCS.

  1. En Google Cloud Console, ve a Cloud Run.
  2. Haz clic en Crear servicio.
  3. Selecciona Función (usa un editor intercalado para crear una función).
  4. En la sección Configurar, proporciona los siguientes detalles de configuración:

    Parámetro de configuración Valor
    Nombre del servicio wpengine-logs-collector
    Región Selecciona la región que coincida con tu bucket de GCS (por ejemplo, us-central1).
    Tiempo de ejecución Selecciona Python 3.12 o una versión posterior.
  5. En la sección Activador (opcional), haz lo siguiente:

    1. Haz clic en + Agregar activador.
    2. Selecciona Cloud Pub/Sub.
    3. En Selecciona un tema de Cloud Pub/Sub, elige el tema wpengine-logs-trigger.
    4. Haz clic en Guardar.
  6. En la sección Autenticación, haz lo siguiente:

    1. Selecciona Necesita autenticación.
    2. Verifica Identity and Access Management (IAM).
  7. Desplázate hacia abajo y expande Contenedores, redes y seguridad.

  8. Ve a la pestaña Seguridad:

    • Cuenta de servicio: Selecciona la cuenta de servicio wpengine-logs-collector-sa.
  9. Ve a la pestaña Contenedores:

    1. Haz clic en Variables y secretos.
    2. Haz clic en + Agregar variable para cada variable de entorno:
    Nombre de la variable Valor de ejemplo Descripción
    GCS_BUCKET wpengine-logs Nombre del bucket de GCS
    GCS_PREFIX wpengine Prefijo para los archivos de registro
    STATE_KEY wpengine/state.json Ruta de acceso al archivo de estado
    WPE_API_USER your-api-username Nombre de usuario de la API de WP Engine
    WPE_API_PASSWORD your-api-password Contraseña de la API de WP Engine
    WPE_INSTALL_ID myinstall Nombre de instalación de WP Engine
    MAX_RECORDS 5000 Cantidad máxima de registros por ejecución
    PAGE_SIZE 100 Registros por página
    LOOKBACK_HOURS 24 Período de visualización inicial
  10. En la sección Variables y Secrets, desplázate hacia abajo hasta Solicitudes:

    • Tiempo de espera de la solicitud: Ingresa 600 segundos (10 minutos).
  11. Ve a la pestaña Configuración:

    • En la sección Recursos, haz lo siguiente:
      • Memoria: Selecciona 512 MiB o más.
      • CPU: Selecciona 1.
  12. En la sección Escalamiento de revisión, haz lo siguiente:

    • Cantidad mínima de instancias: Ingresa 0.
    • Cantidad máxima de instancias: Ingresa 100 (o ajusta según la carga esperada).
  13. Haz clic en Crear.

  14. Espera a que se cree el servicio (de 1 a 2 minutos).

  15. Después de crear el servicio, se abrirá automáticamente el editor de código intercalado.

Agregar el código de función

  1. Ingresa main en el campo Punto de entrada.
  2. En el editor de código intercalado, crea dos archivos:

    • Primer archivo: main.py:
    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    from datetime import datetime, timezone, timedelta
    import time
    import base64
    
    # Initialize HTTP client with timeouts
    http = urllib3.PoolManager(
      timeout=urllib3.Timeout(connect=5.0, read=30.0),
      retries=False,
    )
    
    # Initialize Storage client
    storage_client = storage.Client()
    
    # Environment variables
    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'wpengine')
    STATE_KEY = os.environ.get('STATE_KEY', 'wpengine/state.json')
    WPE_API_USER = os.environ.get('WPE_API_USER')
    WPE_API_PASSWORD = os.environ.get('WPE_API_PASSWORD')
    WPE_INSTALL_ID = os.environ.get('WPE_INSTALL_ID')
    MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '5000'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    
    # WP Engine API base URL
    API_BASE = 'https://api.wpengineapi.com/v1'
    
    # Log types to fetch
    LOG_TYPES = ['access', 'error']
    
    def get_auth_header():
      """Generate HTTP Basic auth header for WP Engine API."""
      credentials = f"{WPE_API_USER}:{WPE_API_PASSWORD}"
      encoded = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
      return f"Basic {encoded}"
    
    @functions_framework.cloud_event
    def main(cloud_event):
      """
      Cloud Run function triggered by Pub/Sub to fetch WP Engine
      logs and write to GCS.
    
      Args:
        cloud_event: CloudEvent object containing Pub/Sub message
      """
    
      if not all([GCS_BUCKET, WPE_API_USER, WPE_API_PASSWORD, WPE_INSTALL_ID]):
        print('Error: Missing required environment variables')
        return
    
      try:
        bucket = storage_client.bucket(GCS_BUCKET)
    
        # Load state
        state = load_state(bucket, STATE_KEY)
    
        # Determine time window
        now = datetime.now(timezone.utc)
        last_offsets = {}
    
        if isinstance(state, dict) and state.get("last_offsets"):
          last_offsets = state["last_offsets"]
    
        print(f"Fetching logs for install: {WPE_INSTALL_ID}")
    
        auth_header = get_auth_header()
        all_records = []
    
        # Fetch both access and error log types
        for log_type in LOG_TYPES:
          last_offset = last_offsets.get(log_type, 0)
    
          records = fetch_logs(
            auth_header=auth_header,
            install_id=WPE_INSTALL_ID,
            log_type=log_type,
            start_offset=last_offset,
            page_size=PAGE_SIZE,
            max_records=MAX_RECORDS,
          )
    
          # Tag records with log type
          for record in records:
            record['_wpe_log_type'] = log_type
    
          all_records.extend(records)
    
          # Update offset for this log type
          if records:
            last_offsets[log_type] = last_offset + len(records)
    
          print(f"Fetched {len(records)} {log_type} log records")
    
        if not all_records:
          print("No new log records found.")
          save_state(bucket, STATE_KEY, last_offsets)
          return
    
        # Write to GCS as NDJSON
        timestamp = now.strftime('%Y%m%d_%H%M%S')
        object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
        blob = bucket.blob(object_key)
    
        ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in all_records]) + '\n'
        blob.upload_from_string(ndjson, content_type='application/x-ndjson')
    
        print(f"Wrote {len(all_records)} records to gs://{GCS_BUCKET}/{object_key}")
    
        # Update state
        save_state(bucket, STATE_KEY, last_offsets)
    
        print(f"Successfully processed {len(all_records)} records")
    
      except Exception as e:
        print(f'Error processing logs: {str(e)}')
        raise
    
    def load_state(bucket, key):
      """Load state from GCS."""
      try:
        blob = bucket.blob(key)
        if blob.exists():
          state_data = blob.download_as_text()
          return json.loads(state_data)
      except Exception as e:
        print(f"Warning: Could not load state: {e}")
    
      return {}
    
    def save_state(bucket, key, last_offsets: dict):
      """Save the last offsets to GCS state file."""
      try:
        state = {'last_offsets': last_offsets}
        blob = bucket.blob(key)
        blob.upload_from_string(
          json.dumps(state, indent=2),
          content_type='application/json'
        )
        print(f"Saved state: last_offsets={last_offsets}")
      except Exception as e:
        print(f"Warning: Could not save state: {e}")
    
    def fetch_logs(auth_header: str, install_id: str, log_type: str, start_offset: int, page_size: int, max_records: int):
      """
      Fetch logs from WP Engine API with offset-based pagination
      and rate limiting.
    
      Args:
        auth_header: HTTP Basic auth header
        install_id: WP Engine install name
        log_type: Log type to fetch (access or error)
        start_offset: Starting offset for pagination
        page_size: Number of records per page
        max_records: Maximum total records to fetch
    
      Returns:
        List of log records
      """
      headers = {
        'Authorization': auth_header,
        'Accept': 'application/json',
        'User-Agent': 'GoogleSecOps-WPEngineCollector/1.0'
      }
    
      records = []
      offset = start_offset
      page_num = 0
      backoff = 1.0
    
      while True:
        page_num += 1
    
        if len(records) >= max_records:
          print(f"Reached max_records limit ({max_records}) for {log_type}")
          break
    
        limit = min(page_size, max_records - len(records))
        url = f"{API_BASE}/installs/{install_id}/logs?type={log_type}&limit={limit}&offset={offset}"
    
        try:
          response = http.request('GET', url, headers=headers)
    
          # Handle rate limiting with exponential backoff
          if response.status == 429:
            retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
            print(f"Rate limited (429). Retrying after {retry_after}s...")
            time.sleep(retry_after)
            backoff = min(backoff * 2, 30.0)
            continue
    
          backoff = 1.0
    
          if response.status != 200:
            print(f"HTTP Error: {response.status}")
            response_text = response.data.decode('utf-8')
            print(f"Response body: {response_text}")
            return []
    
          data = json.loads(response.data.decode('utf-8'))
    
          page_results = data.get('results', data.get('data', []))
    
          if not page_results:
            print(f"No more results (empty page) for {log_type}")
            break
    
          print(f"Page {page_num}: Retrieved {len(page_results)} {log_type} events")
          records.extend(page_results)
    
          offset += len(page_results)
    
          # If we got fewer results than requested, no more pages
          if len(page_results) < limit:
            print(f"Last page reached for {log_type}")
            break
    
        except Exception as e:
          print(f"Error fetching {log_type} logs: {e}")
          return []
    
      print(f"Retrieved {len(records)} total {log_type} records from {page_num} pages")
      return records
    
    • Segundo archivo: requirements.txt:
    functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0
    
  3. Haz clic en Implementar para guardar y, luego, implementar la función.

  4. Espera a que se complete la implementación (de 2 a 3 minutos).

Crea un trabajo de Cloud Scheduler

Cloud Scheduler publicará mensajes en el tema de Pub/Sub a intervalos regulares, lo que activará la función de Cloud Run.

  1. En Google Cloud Console, ve a Cloud Scheduler.
  2. Haz clic en Crear trabajo.
  3. Proporciona los siguientes detalles de configuración:

    Parámetro de configuración Valor
    Nombre wpengine-logs-collector-hourly
    Región Selecciona la misma región que la función de Cloud Run
    Frecuencia 0 * * * * (cada hora, en punto)
    Zona horaria Selecciona la zona horaria (se recomienda UTC)
    Tipo de objetivo Pub/Sub
    Tema Selecciona el tema wpengine-logs-trigger.
    Cuerpo del mensaje {} (objeto JSON vacío)
  4. Haz clic en Crear.

Opciones de frecuencia de programación

Elige la frecuencia según los requisitos de latencia y volumen de registros:

Frecuencia Expresión cron Caso de uso
Cada 5 minutos */5 * * * * Alto volumen y baja latencia
Cada 15 minutos */15 * * * * Volumen medio
Cada 1 hora 0 * * * * Estándar (recomendado)
Cada 6 horas 0 */6 * * * Procesamiento por lotes y volumen bajo
Diario 0 0 * * * Recopilación de datos históricos

Prueba la integración

  1. En la consola de Cloud Scheduler, busca tu trabajo.
  2. Haz clic en Ejecutar forzosamente para activar el trabajo de forma manual.
  3. Espera unos segundos.
  4. Ve a Cloud Run > Servicios.
  5. Haz clic en wpengine-logs-collector:
  6. Haz clic en la pestaña Registros.
  7. Verifica que la función se haya ejecutado correctamente. Busca lo siguiente:

    Fetching logs for install: myinstall
    Page 1: Retrieved X access events
    Fetched X access log records
    Page 1: Retrieved X error events
    Fetched X error log records
    Wrote X records to gs://wpengine-logs/wpengine/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Ve a Cloud Storage > Buckets.

  9. Haz clic en el nombre de tu bucket (wpengine-logs).

  10. Navega a la carpeta wpengine/:

  11. Verifica que se haya creado un archivo .ndjson nuevo con la marca de tiempo actual.

Si ves errores en los registros, haz lo siguiente:

  • HTTP 401: Verifica las credenciales de la API en las variables de entorno
  • HTTP 403: Verifica que el acceso a la API esté habilitado en el Portal de usuarios de WP Engine
  • HTTP 429: Limitación de frecuencia. La función volverá a intentarlo automáticamente con una espera exponencial.
  • Faltan variables de entorno: Verifica que estén configuradas todas las variables requeridas.

Configura un feed en Google SecOps para transferir registros de WP Engine

  1. Ve a Configuración de SIEM > Feeds.
  2. Haz clic en Agregar feed nuevo.
  3. Haz clic en Configura un feed único.
  4. En el campo Nombre del feed, ingresa un nombre para el feed (por ejemplo, WP Engine Logs).
  5. Selecciona Google Cloud Storage V2 como el Tipo de fuente.
  6. Selecciona WPEngine como el Tipo de registro.
  7. Haz clic en Obtener cuenta de servicio. Se mostrará un correo electrónico único de la cuenta de servicio, por ejemplo:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  8. Copia esta dirección de correo electrónico.

  9. Haz clic en Siguiente.

  10. Especifica valores para los siguientes parámetros de entrada:

    • URL del bucket de almacenamiento: Ingresa el URI del bucket de GCS con la ruta de acceso del prefijo:

      gs://wpengine-logs/wpengine/
      
      • Reemplaza:
        • wpengine-logs: Es el nombre de tu bucket de GCS.
        • wpengine: Es el prefijo o la ruta de carpeta opcionales en los que se almacenan los registros (déjalo vacío para la raíz).
    • Opción de eliminación del código fuente: Selecciona la opción de eliminación según tu preferencia:

      • Nunca: Nunca borra ningún archivo después de las transferencias (se recomienda para las pruebas).
      • Borrar archivos transferidos: Borra los archivos después de que se transfirieron correctamente.
      • Borrar los archivos transferidos y los directorios vacíos: Borra los archivos y los directorios vacíos después de que se transfirieron correctamente.

    • Antigüedad máxima del archivo: Incluye los archivos modificados en la cantidad de días más reciente (el valor predeterminado es 180 días).

    • Espacio de nombres del recurso: Es el espacio de nombres del recurso.

    • Etiquetas de transferencia: Es la etiqueta que se aplicará a los eventos de este feed.

  11. Haz clic en Siguiente.

  12. Revisa la nueva configuración del feed en la pantalla Finalizar y, luego, haz clic en Enviar.

Otorga permisos de IAM a la cuenta de servicio de Google SecOps

La cuenta de servicio de Google SecOps necesita el rol de visualizador de objetos de Storage en tu bucket de GCS.

  1. Ve a Cloud Storage > Buckets.
  2. Haz clic en el nombre del bucket.
  3. Ve a la pestaña Permisos.
  4. Haz clic en Otorgar acceso.
  5. Proporciona los siguientes detalles de configuración:
    • Agregar entidades: Pega el correo electrónico de la cuenta de servicio de Google SecOps.
    • Asignar roles: Selecciona Visualizador de objetos de Storage.
  6. Haz clic en Guardar.

Tabla de asignación de UDM

Campo de registro Asignación de UDM Lógica
request, sig, blog_id, kind, name, slug, ver additional.fields Se fusiona con las etiquetas de la solicitud (como request_label), la firma (como sig_label), el ID del blog (como blog_id_label), el tipo (como kind_label), el nombre (como name_label), el slug (como slug_label) y la versión (como ver_label) si cada uno no está vacío.
msg metadata.description Valor copiado directamente
metadata.event_type Se establece en "STATUS_UPDATE" si has_principal es verdadero; de lo contrario, se establece en "GENERIC_EVENT".
protocolo network.application_protocol Valor copiado directamente
versión network.application_protocol_version Se convirtió en cadena
método network.http.method Valor copiado directamente
user_agent network.http.parsed_user_agent Se convirtió en parseduseragent
secure_url network.http.referral_url Valor copiado directamente
response_code network.http.response_code Se convierte en una cadena y, luego, en un número entero.
user_agent network.http.user_agent Valor copiado directamente
received_bytes network.received_bytes Se convirtió en una cadena y, luego, en un número entero sin signo.
Nombre de host principal.asset.hostname Valor copiado directamente
client_ip principal.asset.ip Valor copiado directamente
Nombre de host principal.hostname Valor copiado directamente
client_ip principal.ip Valor copiado directamente
puerto principal.port Se convierte en una cadena y, luego, en un número entero.
pid principal.process.pid Se convirtió en cadena
scan_type, scan_value security_result.description Valor de scan_value si no está vacío; de lo contrario, valor de scan_type si no está vacío

¿Necesitas más ayuda? Obtén respuestas de miembros de la comunidad y profesionales de Google SecOps.