カスタム エージェントを使用する

始める前に

このチュートリアルは、次の手順を読んで理解していることを前提としています。

エージェントのインスタンスを取得する

エージェントにクエリを実行するには、最初にエージェントのインスタンスが必要です。エージェントの新しいインスタンスを作成するか、既存のインスタンスを取得します。

特定のリソース ID に対応するエージェントを取得するには:

Vertex AI SDK for Python

次のコードを実行します。

import vertexai

client = vertexai.Client(  # For service interactions via client.agent_engines
    project="PROJECT_ID",
    location="LOCATION",
)

agent = client.agent_engines.get(name="projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

print(agent)

ここで

リクエスト

次のコードを実行します。

from google import auth as google_auth
from google.auth.transport import requests as google_requests
import requests

def get_identity_token():
    credentials, _ = google_auth.default()
    auth_request = google_requests.Request()
    credentials.refresh(auth_request)
    return credentials.token

response = requests.get(
f"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID",
    headers={
        "Content-Type": "application/json; charset=utf-8",
        "Authorization": f"Bearer {get_identity_token()}",
    },
)

REST

curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID

Vertex AI SDK for Python を使用する場合、agent オブジェクトは、次のものを含む AgentEngine クラスに対応します。

  • デプロイされたエージェントに関する情報を含む agent.api_resourceagent.operation_schemas() を呼び出して、エージェントがサポートするオペレーションのリストを返すこともできます。詳しくは、サポートされているオペレーションをご覧ください。
  • 同期サービス インタラクションを可能にする agent.api_client
  • 非同期サービス インタラクションを可能にする agent.async_api_client

このセクションの残りの部分では、インスタンスが agent という名前であると仮定しています。

サポートされているオペレーションの一覧を取得する

エージェントをローカルで開発する場合は、エージェントがサポートするオペレーションにアクセスして知識を得ることができます。デプロイされたエージェントを使用するには、エージェントがサポートするオペレーションを列挙します。

Vertex AI SDK for Python

次のコードを実行します。

print(agent.operation_schemas())

リクエスト

次のコードを実行します。

import json

json.loads(response.content).get("spec").get("classMethods")

REST

curl リクエストに対するレスポンスから spec.class_methods で表されます。

各オペレーションのスキーマは、呼び出せるエージェントのメソッドに関する情報をドキュメント化したディクショナリです。サポートされているオペレーションのセットは、エージェントの開発に使用したフレームワークによって異なります。

たとえば、LangchainAgentquery オペレーションのスキーマは次のとおりです。

{'api_mode': '',
 'name': 'query',
 'description': """Queries the Agent with the given input and config.
    Args:
        input (Union[str, Mapping[str, Any]]):
            Required. The input to be passed to the Agent.
        config (langchain_core.runnables.RunnableConfig):
            Optional. The config (if any) to be used for invoking the Agent.
    Returns:
        The output of querying the Agent with the given input and config.
""",            '        ',
 'parameters': {'$defs': {'RunnableConfig': {'description': 'Configuration for a Runnable.',
                                             'properties': {'configurable': {...},
                                                            'run_id': {...},
                                                            'run_name': {...},
                                                            ...},
                                             'type': 'object'}},
                'properties': {'config': {'nullable': True},
                               'input': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}},
                'required': ['input'],
                'type': 'object'}}

ここで

  • name は、オペレーションの名前です(query という名前のオペレーションの場合は agent.query)。
  • api_mode は、オペレーションの API モードです(同期の場合は ""、ストリーミングの場合は "stream")。
  • description は、メソッドの docstring に基づくオペレーションの説明です。
  • parameters は、OpenAPI スキーマ形式の入力引数のスキーマです。

サポートされているオペレーションを使用してエージェントにクエリを実行する

カスタム エージェントの場合、エージェントの開発時に定義した次のクエリ オペレーションまたはストリーミング オペレーションを使用できます。

特定のフレームワークは、特定のクエリ オペレーションまたはストリーミング オペレーションのみをサポートします。

フレームワーク サポートされているクエリ オペレーション
Agent Development Kit async_stream_query
LangChain querystream_query
LangGraph querystream_query
AG2 query
LlamaIndex query

エージェントにクエリを実行する

query オペレーションを使用してエージェントにクエリを実行します。

Vertex AI SDK for Python

agent.query(input="What is the exchange rate from US dollars to Swedish Krona today?")

リクエスト

from google import auth as google_auth
from google.auth.transport import requests as google_requests
import requests

def get_identity_token():
    credentials, _ = google_auth.default()
    auth_request = google_requests.Request()
    credentials.refresh(auth_request)
    return credentials.token

requests.post(
    f"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:query",
    headers={
        "Content-Type": "application/json; charset=utf-8",
        "Authorization": f"Bearer {get_identity_token()}",
    },
    data=json.dumps({
        "class_method": "query",
        "input": {
            "input": "What is the exchange rate from US dollars to Swedish Krona today?"
        }
    })
)

REST

curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:query -d '{
  "class_method": "query",
  "input": {
    "input": "What is the exchange rate from US dollars to Swedish Krona today?"
  }
}'

クエリのレスポンスは、ローカル アプリケーション テストの出力に似た文字列になります。

{"input": "What is the exchange rate from US dollars to Swedish Krona today?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

エージェントからのレスポンスをストリーミングする

stream_query オペレーションを使用して、エージェントからのレスポンスをストリーミングします。

Vertex AI SDK for Python

agent = agent_engines.get("projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

for response in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
):
    print(response)

リクエスト

from google import auth as google_auth
from google.auth.transport import requests as google_requests
import requests

def get_identity_token():
    credentials, _ = google_auth.default()
    auth_request = google_requests.Request()
    credentials.refresh(auth_request)
    return credentials.token

requests.post(
    f"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:streamQuery",
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_identity_token()}",
    },
    data=json.dumps({
        "class_method": "stream_query",
        "input": {
            "input": "What is the exchange rate from US dollars to Swedish Krona today?"
        },
    }),
    stream=True,
)

REST

curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:streamQuery?alt=sse -d '{
  "class_method": "stream_query",
  "input": {
    "input": "What is the exchange rate from US dollars to Swedish Krona today?"
  }
}'

Vertex AI Agent Engine は、反復的に生成されたオブジェクトのシーケンスとしてレスポンスをストリーミングします。たとえば、3 つのレスポンスのセットは次のようになります。

{'actions': [{'tool': 'get_exchange_rate', ...}]}  # first response
{'steps': [{'action': {'tool': 'get_exchange_rate', ...}}]}  # second response
{'output': 'The exchange rate is 11.0117 SEK per USD as of 2024-12-03.'}  # final response

エージェントに非同期でクエリを実行する

エージェントの開発時に async_query オペレーションを定義した場合は、Vertex AI SDK for Python でエージェントのクライアントサイド非同期クエリがサポートされます。

Vertex AI SDK for Python

agent = agent_engines.get("projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

response = await agent.async_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)

クエリのレスポンスは、ローカルテストの出力と同じディクショナリになります。

{"input": "What is the exchange rate from US dollars to Swedish Krona today?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

エージェントからのレスポンスを非同期でストリーミングする

エージェントの開発時に async_stream_query オペレーションを定義した場合は、いずれかのオペレーション(async_stream_query など)を使用して、エージェントからのレスポンスを非同期でストリーミングできます。

Vertex AI SDK for Python

agent = agent_engines.get("projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

async for response in agent.async_stream_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
):
    print(response)

async_stream_query オペレーションは、内部で同じ streamQuery エンドポイントを呼び出し、反復的に生成されたオブジェクトのシーケンスとしてレスポンスを非同期でストリーミングします。たとえば、3 つのレスポンスのセットは次のようになります。

{'actions': [{'tool': 'get_exchange_rate', ...}]}  # first response
{'steps': [{'action': {'tool': 'get_exchange_rate', ...}}]}  # second response
{'output': 'The exchange rate is 11.0117 SEK per USD as of 2024-12-03.'}  # final response

ローカルテストで生成されたレスポンスと同じになるはずです。

次のステップ