Os modelos de agentes no Vertex AI Agent Engine são definidos como classes Python. Os passos seguintes mostram como criar um modelo personalizado para instanciar agentes implementáveis no Vertex AI:
- Exemplo básico
- (Opcional) Respostas graduais
- (Opcional) Registe métodos personalizados
- (Opcional) Forneça anotações de tipo
- (Opcional) Enviar rastreios para o Cloud Trace
- (Opcional) Trabalhar com variáveis de ambiente
- (Opcional) Integrar com o Secret Manager
- (Opcional) Processar credenciais
- (Opcional) Lidar com erros
Exemplo básico
Para dar um exemplo básico, a seguinte classe Python é um modelo para instanciar agentes implementáveis no Vertex AI (pode atribuir à variável CLASS_NAME
um valor como MyAgent
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Considerações sobre a implementação
Ao escrever a sua classe Python, os três métodos seguintes são importantes:
__init__()
:- Use este método apenas para parâmetros de configuração do agente. Por exemplo, pode usar este método para recolher os parâmetros do modelo e os atributos de segurança como argumentos de entrada dos seus utilizadores. Também pode usar este método para recolher parâmetros como o ID do projeto, a região, as credenciais da aplicação e as chaves da API.
- O construtor devolve um objeto que tem de ser "pickle-able" para poder ser implementado no Vertex AI Agent Engine. Por conseguinte, deve inicializar os clientes de serviço e estabelecer ligações a bases de dados no método
.set_up
em vez do método__init__
. - Este método é opcional. Se não for especificado, o Vertex AI usa o construtor Python predefinido para a classe.
set_up()
:- Tem de usar este método para definir a lógica de inicialização do agente. Por exemplo, use este método para estabelecer ligações a bases de dados ou serviços dependentes, importar pacotes dependentes ou pré-calcular dados usados para pedidos de publicação.
- Este método é opcional. Se não for especificado, o Vertex AI assume que o agente não precisa de chamar um método
.set_up
antes de publicar as consultas do utilizador.
query()
/stream_query()
:- Use
query()
para devolver a resposta completa como um único resultado. - Use
stream_query()
para devolver a resposta em partes à medida que fica disponível, o que permite uma experiência de streaming. O métodostream_query
tem de devolver um objeto iterável (por exemplo, um gerador) para permitir o streaming. - Pode implementar ambos os métodos se quiser suportar interações de resposta única e de streaming com o seu agente.
- Deve dar a este método uma string de documentação clara que defina o que faz,
documente os respetivos atributos e forneça anotações de tipo para as respetivas entradas.
Evite argumentos variáveis no método
query
estream_query
.
- Use
Instancie o agente localmente
Pode criar uma instância local do seu agente através do seguinte código:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Teste o método query
Pode testar o agente enviando consultas para a instância local:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
A resposta é um dicionário semelhante ao seguinte:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Consultar de forma assíncrona
Para responder a consultas de forma assíncrona, pode definir um método (como async_query
) que devolve uma rotina colaborativa do Python. Por exemplo, o seguinte modelo expande o exemplo básico para responder de forma assíncrona e é implementável no Vertex AI:
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Teste o método async_query
Pode testar o agente localmente chamando o método async_query
. Segue-se um exemplo:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
A resposta é um dicionário semelhante ao seguinte:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Respostas de streaming
Para transmitir respostas a consultas, pode definir um método denominado stream_query
que produz respostas. Por exemplo, o seguinte modelo expande o exemplo básico para transmitir respostas e é implementável na Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Seguem-se alguns aspetos essenciais a ter em conta quando usar a API de streaming:
- Tempo limite máximo: o tempo limite máximo para respostas de streaming é de 10 minutos. Se o seu agente precisar de tempos de processamento mais longos, considere dividir a tarefa em partes mais pequenas.
- Modelos e cadeias de streaming: a interface Runnable do LangChain suporta streaming, pelo que pode fazer streaming de respostas não só de agentes, mas também de modelos e cadeias.
- Compatibilidade com o LangChain: tenha em atenção que os métodos assíncronos, como o método
astream_event
do LangChain, não são suportados de momento. - Restrinja a geração de conteúdo: se tiver problemas de contrapressão (em que o produtor gera dados mais rapidamente do que o consumidor os consegue processar), deve restringir a taxa de geração de conteúdo. Isto pode ajudar a evitar o excesso de capacidade do buffer e garantir uma experiência de streaming sem problemas.
Teste o método stream_query
Pode testar a consulta de streaming localmente chamando o método stream_query
e iterando os resultados. Segue-se um exemplo:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Este código imprime cada parte da resposta à medida que é gerada. O resultado pode ter um aspeto semelhante a este:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Neste exemplo, cada bloco contém informações diferentes sobre a resposta, como as ações realizadas pelo agente, as mensagens trocadas e o resultado final.
Fazer stream de respostas de forma assíncrona
Para transmitir respostas de forma assíncrona, pode definir um método (por exemplo, async_stream_query
) que devolve um gerador assíncrono. Por exemplo, o seguinte modelo expande o exemplo básico para transmitir respostas de forma assíncrona e é implementável na Vertex AI:
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Teste o método async_stream_query
Semelhante ao código para testar consultas de streaming, pode testar o agente localmente chamando o método async_stream_query
e iterando os resultados. Segue-se um exemplo:
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Este código imprime cada parte da resposta à medida que é gerada. O resultado pode ter um aspeto semelhante a este:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Registar métodos personalizados
Por predefinição, os métodos query
e stream_query
são registados como operações no agente implementado.
Pode substituir o comportamento predefinido e definir o conjunto de operações a registar através do método register_operations
.
As operações podem ser registadas como modos de execução padrão (representados por uma string vazia
""
) ou de streaming ("stream"
).
Para registar várias operações, pode definir um método denominado register_operations
que liste os métodos a disponibilizar aos utilizadores quando o agente for implementado. No código de exemplo seguinte, o método register_operations
resulta no registo do agente implementado de query
e get_state
como
operações executadas de forma síncrona e stream_query
e get_state_history
como
operações que transmitem as respostas:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Pode testar os métodos personalizados chamando-os diretamente na instância local do agente, de forma semelhante à forma como testaria os métodos query
e stream_query
.
Fornecer anotações de tipo
Pode usar anotações de tipo para especificar os tipos de entrada e saída esperados dos métodos do seu agente. Quando o agente é implementado, apenas os tipos serializáveis em JSON são suportados na entrada e na saída das operações suportadas pelo agente. Os esquemas das entradas e saídas podem ser anotados através de TypedDict
ou modelos Pydantic.
No exemplo seguinte, anotamos a entrada como TypedDict
e convertemos
a saída não processada de .get_state
(que é um NamedTuple
) num dicionário
serializável através do respetivo método ._asdict()
:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Enviar rastreios para o Cloud Trace
Para enviar rastreios para o Cloud Trace com bibliotecas de instrumentação que suportam o OpenTelemetry, pode importá-los e inicializá-los no método .set_up
. Para frameworks de agentes comuns, pode usar a integração do Open Telemetry Google Cloud em combinação com um framework de instrumentação, como o OpenInference ou o OpenLLMetry.
Por exemplo, o modelo seguinte é uma modificação do exemplo básico para exportar rastreios para o Cloud Trace:
OpenInference
Primeiro, instale o pacote necessário
com pip
executando o seguinte comando:
pip install openinference-instrumentation-langchain==0.1.34
Em seguida, importe e inicialize o instrumentador:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
Primeiro, instale o pacote necessário
com pip
executando o seguinte comando:
pip install opentelemetry-instrumentation-langchain==0.38.10
Em seguida, importe e inicialize o instrumentador:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Trabalhar com variáveis de ambiente
Para definir variáveis de ambiente, certifique-se de que estão disponíveis através de os.environ
durante o desenvolvimento e siga as instruções em Defina variáveis de ambiente
quando implementar o agente.
Integração com o Secret Manager
Para integrar com o Secret Manager:
Instale a biblioteca cliente executando o seguinte comando
pip install google-cloud-secret-manager
Siga as instruções em Conceda funções a um agente implementado para conceder à conta de serviço a função "Aceder ao Secret Manager Secret" (
roles/secretmanager.secretAccessor
) através da Google Cloud consola.Importe e inicialize o cliente no método
.set_up
e obtenha o segredo correspondente quando necessário. Por exemplo, o modelo seguinte é uma modificação do exemplo básico para usar uma chave da API paraChatAnthropic
que foi armazenada no Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Processamento de credenciais
Quando o agente é implementado, pode ter de processar diferentes tipos de credenciais:
- Credenciais padrão da aplicação (ADC), que surgem frequentemente de contas de serviço,
- OAuth que surgem frequentemente nas contas de utilizador e
- Fornecedores de identidade para credenciais de contas externas (federação de identidades da carga de trabalho).
Credenciais padrão da aplicação
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Pode ser usado no código da seguinte forma:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Para ver detalhes, consulte o artigo Como funcionam as Credenciais padrão da aplicação.
OAuth
Normalmente, as credenciais do utilizador são obtidas através do OAuth 2.0.
Se tiver um token de acesso (por exemplo, de oauthlib
),
pode criar uma instância google.oauth2.credentials.Credentials
. Além disso, se obtiver uma chave de atualização, também pode especificar a chave de atualização e o URI da chave para permitir que as credenciais sejam atualizadas automaticamente:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Aqui, TOKEN_URI
, CLIENT_ID
e CLIENT_SECRET
baseiam-se em Criar uma credencial do cliente OAuth.
Se não tiver um token de acesso, pode usar google_auth_oauthlib.flow
para
executar o fluxo de concessão de autorização do OAuth 2.0
para obter uma instância google.oauth2.credentials.Credentials
correspondente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Para ver detalhes, consulte a documentação do módulo google_auth_oauthlib.flow
.
Fornecedor de identidade
Se quiser autenticar utilizadores através de email/palavra-passe, número de telefone, fornecedores sociais como o Google, o Facebook ou o GitHub, ou um mecanismo de autenticação personalizado, pode usar a Identity Platform ou o Firebase Authentication, ou qualquer fornecedor de identidade que suporte o OpenID Connect (OIDC).
Para ver detalhes, consulte o artigo Aceder a recursos a partir de um fornecedor de identidade OIDC.
Processamento de erros
Para garantir que os erros da API são devolvidos num formato JSON estruturado, recomendamos que implemente o processamento de erros no código do agente através de um bloco try...except
, que pode ser abstraído num decorador.
Embora o Vertex AI Agent Engine possa processar vários códigos de estado internamente, o Python não tem uma forma padronizada de representar erros com códigos de estado HTTP associados em todos os tipos de exceções. Tentar mapear todas as exceções possíveis do Python para estados HTTP no serviço subjacente seria complexo e difícil de manter.
Uma abordagem mais escalável consiste em captar explicitamente as exceções relevantes nos métodos do agente ou usar um decorador reutilizável, como error_wrapper
. Em seguida, pode associar códigos de estado adequados (por exemplo, adicionando atributos code
e error
a exceções personalizadas ou processando exceções padrão especificamente) e formatar o erro como um dicionário JSON para o valor de retorno. Isto requer uma alteração mínima ao código nos próprios métodos do agente, muitas vezes apenas exigindo que adicione o decorador.
Segue-se um exemplo de como pode implementar o processamento de erros no seu agente:
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
O código anterior resulta no seguinte resultado:
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}
O que se segue?
- Use um agente personalizado.
- Avalie um agente.
- Implemente um agente.
- Resolva problemas de desenvolvimento de um agente.
- Receba apoio técnico.