本頁說明如何刻意觸發偵測器並檢查結果,確認 Agent Platform Threat Detection 正常運作。Agent Platform Threat Detection 是 Security Command Center 的內建服務。
事前準備
如要偵測 Agent Runtime 代理程式的潛在威脅,請確保在 Security Command Center 中啟用 Agent Platform Threat Detection 服務。
設定環境
如要測試 Agent Platform Threat Detection,請設定模擬惡意活動的示範代理程式。
建立專案並啟用 Shell
選取或建立 Google Cloud 專案,用於測試。
如要測試偵測器,可以使用 Google Cloud 控制台和 Cloud Shell。
選取要用於測試的專案。
按一下「啟用 Cloud Shell」
。
您也可以透過本機殼層執行測試指令。
設定 Agent Runtime
如果您先前未在此專案中使用過 Agent Runtime,請先設定 Agent Runtime 環境,再開始使用。記下您建立的 Cloud Storage 暫存 bucket 名稱。我們也建議您按照代理執行階段快速入門指南操作,瞭解如何使用 Vertex AI SDK 開發及部署代理。
建立測試腳本
您將建立多個檔案,用於部署新代理程式以進行測試。請使用文字編輯器 (例如 nano) 執行這項操作。
建立名為
requirements.txt的新檔案,並在其中加入下列內容:google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydantic建立名為
installation_scripts/install.sh的新空白檔案。部分測試需要將內容新增至這個檔案。建立名為
main.py的新檔案,並加入以下內容。請替換PROJECT_ID、LOCATION和STAGING_BUCKET變數。暫存 bucket 名稱必須包含gs://前置字串。import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
設定虛擬環境
建立並啟用 Python 虛擬環境。
python3 -m venv env source env/bin/activate在虛擬環境中安裝必要的依附元件。
pip install -r requirements.txt
測試指令碼
在虛擬環境中執行指令碼,並使用 python3 main.py。這個指令需要幾分鐘才能建構、部署及執行測試代理程式。
指令碼會輸出已部署代理程式的資源名稱,以及幾個 JSON 物件,包括 LLM 回應和其他中繼資料。如果在這個階段遇到權限錯誤或部署錯誤,請參閱疑難排解,瞭解後續步驟。
測試偵測工具
如要測試 Agent Platform 威脅偵測偵測器,請將 threat_detection_test 函式中的程式碼替換為模擬攻擊的程式碼。部署指令碼及查詢代理程式可能需要很長時間。如要加快測試速度,可以合併多個函式的內容。
執行:已執行新增的惡意二進位檔
取代測試腳本中的 threat_detection_test 函式,然後執行測試腳本。
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
執行:已載入新增的惡意資料庫
取代測試腳本中的 threat_detection_test 函式,然後執行測試腳本。您可能會收到 Vertex AI SDK 傳回的錯誤訊息,指出無法剖析回覆,但系統仍會生成發現項目。
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
執行:容器跳脫
取代測試腳本中的 threat_detection_test 函式,然後執行測試腳本。
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
執行:Kubernetes 攻擊工具執行作業
取代測試腳本中的 threat_detection_test 函式,然後執行測試腳本。
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
執行作業:本機偵查工具執行作業
取代測試腳本中的 threat_detection_test 函式,然後執行測試腳本。
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
執行:已執行修改過的惡意二進位檔
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
執行:已載入修改過的惡意資料庫
取代測試腳本中的 threat_detection_test 函式,然後執行測試腳本。
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
偵測到惡意網址
將 installation_scripts/install.sh 檔案更新為下列內容。
#!/bin/bash
apt-get install -y curl --no-install-recommends
取代測試腳本中的 threat_detection_test 函式,然後執行測試腳本。
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
反向殼層
取代測試腳本中的 threat_detection_test 函式,然後執行測試腳本。
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output