次の手順では、Agent Platform にデプロイ可能なエージェントをインスタンス化するためのカスタム テンプレートを作成する方法について説明します。
- 基本的な例
- 省略可: レスポンスをストリーミングする
- 省略可: カスタム メソッドを登録する
- 省略可: 型アノテーションを指定する
- 省略可: トレースを Cloud Trace に送信する
- 省略可: 環境変数を使用する
- 省略可: Secret Manager と統合する
- 省略可: 認証情報を処理する
- 省略可: エラーを処理する
基本的な例
基本的な例として、次の Python クラスは、Agent Platform にデプロイ可能なエージェントをインスタンス化するためのテンプレートです(CLASS_NAME 変数に MyAgent などの値を指定できます)。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
デプロイに関する考慮事項
Python クラスを作成する場合、次の 3 つのメソッドが重要になります。
__init__():- このメソッドは、エージェント構成パラメータにのみ使用します。 たとえば、このメソッドを使用して、モデル パラメータと 安全性属性 をユーザーの入力引数として収集できます。このメソッドを使用して、プロジェクト ID、リージョン、アプリケーション認証情報、API キーなどのパラメータを収集することもできます。
- コンストラクタは、Agent Runtime にデプロイできるように「pickle 対応」である必要があるオブジェクトを返します。そのため、サービス クライアントを初期化し、データベースへの接続を確立する際は、
__init__メソッドではなく.set_upメソッドを使用する必要があります。 - このメソッドは省略可能です。指定しない場合、Agent Runtime はクラスのデフォルトの Python コンストラクタを使用します。
set_up():- このメソッドを使用して、エージェントの初期化ロジックを定義する必要があります。たとえば、このメソッドを使用して、データベースまたは依存サービスへの接続を確立したり、依存パッケージをインポートしたり、クエリの処理に使用するデータの事前計算を行うことができます。
- このメソッドは省略可能です。指定しない場合、Agent Runtime は、ユーザークエリを処理する前にエージェントで
.set_upメソッドを呼び出す必要がないと見なします。
query()/stream_query():query()を使用して、完了した回答を単一の結果として返します。stream_query()を使用すると、レスポンスが利用可能になったときにチャンク単位で返されるため、ストリーミング エクスペリエンスが可能になります。ストリーミングを有効にするには、stream_queryメソッドが反復可能なオブジェクト(ジェネレータなど)を返す必要があります。- エージェントとの単一レスポンスとストリーミングの両方のインタラクションをサポートする場合は、両方のメソッドを実装できます。
- このメソッドには、処理の内容を定義して属性を文書化し、入力に型アノテーションを提供する明確な docstring を指定する必要があります。
queryメソッドとstream_queryメソッドで変数引数を使用しないでください。
エージェントをローカルでインスタンス化する
次のコードを使用して、エージェントのローカル インスタンスを作成できます。
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
query メソッドをテストする
ローカル インスタンスにクエリを送信して、エージェントをテストできます。
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
レスポンスは、次のようなディクショナリ形式になります。
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
非同期でクエリを実行する
クエリに非同期で応答するには、async_query)
を返すPython コルーチンなどのメソッドを定義します。たとえば、次のテンプレートは、非同期で応答するように基本的な例を拡張したもので、Agent Platform にデプロイできます。
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
async_query メソッドをテストする
async_query メソッドを呼び出して、エージェントをローカルでテストできます。次に例を示します。
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
レスポンスは、次のようなディクショナリ形式になります。
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
ストリーミング レスポンス
クエリへのレスポンスをストリーミングするには、レスポンスを生成する stream_query という名前のメソッドを定義します。たとえば、次のテンプレートは、レスポンスをストリーミングするように基本的な例を拡張したもので、Agent Platform にデプロイできます。
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
ストリーミング API を使用する際は、次の点にご留意ください。
- 最大タイムアウト: ストリーミング レスポンスの最大タイムアウトは 10 分です。エージェントで処理時間が長くなる場合は、タスクを小さなチャンクに分割することを検討してください。
- ストリーミング モデルとチェーン: LangChain の Runnable インターフェースはストリーミングを サポートしているため、エージェントだけでなく、モデルや チェーンからのレスポンスもストリーミングできます。
- LangChain の互換性: LangChain の
astream_eventメソッドなどの非同期メソッドは、現時点ではサポートされていません。 - コンテンツ生成をスロットリングする: バックプレッシャーの問題( 消費者が処理できるよりも速い速度でプロデューサーがデータを生成する)が発生した場合は、 コンテンツ生成率をスロットリングする必要があります。これにより、バッファ オーバーフローを防ぎ、スムーズなストリーミングを実現できます。
stream_query メソッドをテストする
stream_query メソッドを呼び出して結果を反復処理することで、ストリーミング クエリをローカルでテストできます。次に例を示します。
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
このコードは、レスポンスの各チャンクが生成されるたびに出力します。出力は次のようになります。
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
この例では、各チャンクに、エージェントが実行したアクション、交換されたメッセージ、最終的な出力など、レスポンスに関するさまざまな情報が含まれています。
レスポンスを非同期でストリーミングする
レスポンスを非同期でストリーミングするには、非同期ジェネレータを返すメソッド(
async_stream_queryなど)を定義します。たとえば、次のテンプレートは、レスポンスを非同期でストリーミングするように基本的な例を拡張したもので、Agent Platform にデプロイできます。
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
async_stream_query メソッドをテストする
ストリーミング クエリのテストコードと同様に、ストリーミング クエリをテストするためのコードと同様に、
エージェントをローカルでテストするには、async_stream_query メソッドを呼び出して結果を反復処理します。次に例を示します。
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
このコードは、レスポンスの各チャンクが生成されるたびに出力します。出力は次のようになります。
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
カスタム メソッドを登録する
デフォルトでは、メソッド query と stream_query は、デプロイされたエージェントのオペレーション
として登録されます。デフォルトの動作をオーバーライドし、register_operations
メソッドを使用して登録するオペレーションのセットを定義できます。
オペレーションは、標準(空の文字列
""で表される)またはストリーミング("stream")のいずれかの実行モードとして登録できます。
複数のオペレーションを登録するには、エージェントのデプロイ時にユーザーが使用できるメソッドを一覧表示する register_operations
という名前のメソッドを定義します。次のコード例では、register_operations
メソッドにより、デプロイされたエージェントは、同期的に実行されるオペレーションとして query と get_state を登録し、レスポンスをストリーミングするオペレーションとして
stream_query と get_state_history を登録します。
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
カスタム メソッドをテストするには、エージェントのローカル インスタンス
で直接呼び出します。これは、query と
stream_query メソッドをテストする場合と同様です。
型アノテーションを指定する
型アノテーションを使用して、エージェント メソッドの想定される入力型と出力型を指定できます。エージェントがデプロイされると、エージェントでサポートされているオペレーションの入力と出力では、JSON
シリアル化可能な型のみがサポートされます。入力と出力のスキーマは、TypedDict モデルまたは Pydantic
モデルを使用してアノテーションを付けることができます。
次の例では、入力を TypedDict としてアノテーションを付け、.get_state からの未加工の出力(NamedTuple)を
._asdict() メソッドを使用してシリアル化可能なディクショナリに変換します。
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
トレースを Cloud Trace に送信する
OpenTelemetry をサポートする計測ライブラリを使用してトレースを Cloud Trace に送信するには、.set_up
メソッドでインポートして初期化します。一般的なエージェント フレームワークでは、Open Telemetry Google Cloud
インテグレーションと、OpenInferenceや
OpenLLMetryなどの計測化
フレームワークを組み合わせて使用できます。
たとえば、次のテンプレートは、トレースを Cloud Trace にエクスポートするように 基本的な例を変更したものです。
OpenInference
まず、必要なパッケージ
を pip を使用してインストールします。
pip install openinference-instrumentation-langchain==0.1.34次に、インストルメンタをインポートして初期化します。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
まず、必要なパッケージ
を pip を使用してインストールします。
pip install opentelemetry-instrumentation-langchain==0.38.10次に、インストルメンタをインポートして初期化します。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
環境変数を使用する
環境変数を設定するには、開発時に
os.environを介して環境変数を使用できるようにし、エージェントのデプロイ時に
環境変数を
定義するの手順に沿って操作します。
Secret Manager と統合する
Secret Manager と統合するには:
次のコマンドを実行して、クライアント ライブラリをインストールします。
pip install google-cloud-secret-managerデプロイされたエージェントのロールを付与する の手順に沿って、コンソールからサービス アカウントに「Secret Manager Secret Accessor」ロール(
roles/secretmanager.secretAccessor)を付与します。 Google Cloud.set_upメソッドでクライアントをインポートして初期化し、必要に応じて対応するシークレットを取得します。たとえば、次のテンプレートは、Secret Manager に保存されている 基本的な例 を API キーを使用するように 変更したものですChatAnthropic。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
認証情報を扱う
エージェントがデプロイされると、さまざまなタイプの認証情報を処理する必要が生じることがあります。
- サービス アカウントから発生する アプリケーションのデフォルト認証情報(ADC)、
- OAuth、
- 外部アカウント(Workload Identity 連携)の認証情報の ID プロバイダ。
アプリケーションのデフォルト認証情報
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
コードでは次のように使用できます。
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
詳細については、アプリケーションのデフォルト認証情報 の仕組みをご覧ください。
OAuth
通常、ユーザー認証情報は OAuth 2.0 を使用して取得します。
アクセス トークン(
oauthlib など)がある場合は、
google.oauth2.credentials.Credentials インスタンスを作成できます。また、更新トークンを取得した場合は、更新トークンとトークン URI
を指定して、認証情報を自動的に更新できるようにすることもできます。
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
ここで、TOKEN_URI、CLIENT_ID、
CLIENT_SECRET は OAuth クライアント
認証情報を作成するに基づいています。
アクセス トークンがない場合は、google_auth_oauthlib.flow を使用して
OAuth 2.0 認証コード
フローを実行し、対応する google.oauth2.credentials.Credentials インスタンスを取得できます。
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
詳細については、google_auth_oauthlib.flow
モジュールのドキュメントをご覧ください。
ID プロバイダ
メールとパスワード、電話番号、Google、Facebook、GitHub などのソーシャル プロバイダ、カスタム認証メカニズムを使用してユーザーの認証を行う場合は、Identity Platform または Firebase Authentication、または OpenID Connect(OIDC)をサポートする任意の ID プロバイダを使用できます。
詳細については、OIDC ID プロバイダからリソースにアクセスするをご覧ください。
エラー処理
API エラーが構造化された JSON 形式で返されるようにするには、try...except
ブロックを使用してエージェントのコード内でエラー処理を実装することをおすすめします。このブロックはデコレータに抽象化できます。
Agent Platform はさまざまなステータス コードを内部で処理できますが、Python には、すべての例外タイプで関連する HTTP ステータス コードを使用してエラーを表す標準的な方法がありません。基盤となるサービス内のすべての可能な Python 例外を HTTP ステータスにマッピングしようとすると、複雑になり、維持が困難になります。
よりスケーラブルなアプローチは、エージェント メソッド内で関連する例外を明示的にキャッチするか、error_wrapper
などの再利用可能なデコレータを使用することです。次に、適切なステータスコードを関連付け(たとえば、codeと
error属性をカスタム例外に追加するか、標準例外を具体的に処理する)、エラーを JSON ディクショナリとして返り値の形式にします。これにより、エージェント
メソッド自体のコード変更を最小限に抑えることができます。多くの場合、デコレータを追加するだけで済みます。
次に、エージェントでエラー処理を実装する方法の例を示します。
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
上記のコードを実行すると、次の出力が生成されます。
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}