Desenvolva um agente personalizado

Os modelos de agentes no Vertex AI Agent Engine são definidos como classes Python. Os passos seguintes mostram como criar um modelo personalizado para instanciar agentes implementáveis no Vertex AI:

  1. Exemplo básico
  2. (Opcional) Respostas graduais
  3. (Opcional) Registe métodos personalizados
  4. (Opcional) Forneça anotações de tipo
  5. (Opcional) Enviar rastreios para o Cloud Trace
  6. (Opcional) Trabalhar com variáveis de ambiente
  7. (Opcional) Integrar com o Secret Manager
  8. (Opcional) Processar credenciais
  9. (Opcional) Lidar com erros

Exemplo básico

Para dar um exemplo básico, a seguinte classe Python é um modelo para instanciar agentes implementáveis no Vertex AI (pode atribuir à variável CLASS_NAME um valor como MyAgent):

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Considerações sobre a implementação

Ao escrever a sua classe Python, os três métodos seguintes são importantes:

  1. __init__():
    • Use este método apenas para parâmetros de configuração do agente. Por exemplo, pode usar este método para recolher os parâmetros do modelo e os atributos de segurança como argumentos de entrada dos seus utilizadores. Também pode usar este método para recolher parâmetros como o ID do projeto, a região, as credenciais da aplicação e as chaves da API.
    • O construtor devolve um objeto que tem de ser "pickle-able" para poder ser implementado no Vertex AI Agent Engine. Por conseguinte, deve inicializar os clientes de serviço e estabelecer ligações a bases de dados no método .set_up em vez do método __init__.
    • Este método é opcional. Se não for especificado, o Vertex AI usa o construtor Python predefinido para a classe.
  2. set_up():
    • Tem de usar este método para definir a lógica de inicialização do agente. Por exemplo, use este método para estabelecer ligações a bases de dados ou serviços dependentes, importar pacotes dependentes ou pré-calcular dados usados para pedidos de publicação.
    • Este método é opcional. Se não for especificado, o Vertex AI assume que o agente não precisa de chamar um método .set_up antes de publicar as consultas do utilizador.
  3. query() / stream_query():
    • Use query() para devolver a resposta completa como um único resultado.
    • Use stream_query() para devolver a resposta em partes à medida que fica disponível, o que permite uma experiência de streaming. O método stream_query tem de devolver um objeto iterável (por exemplo, um gerador) para permitir o streaming.
    • Pode implementar ambos os métodos se quiser suportar interações de resposta única e de streaming com o seu agente.
    • Deve dar a este método uma string de documentação clara que defina o que faz, documente os respetivos atributos e forneça anotações de tipo para as respetivas entradas. Evite argumentos variáveis no método query e stream_query.

Instancie o agente localmente

Pode criar uma instância local do seu agente através do seguinte código:

agent = CLASS_NAME(
    model=model,  # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Teste o método query

Pode testar o agente enviando consultas para a instância local:

response = agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

A resposta é um dicionário semelhante ao seguinte:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Consultar de forma assíncrona

Para responder a consultas de forma assíncrona, pode definir um método (como async_query) que devolve uma rotina colaborativa do Python. Por exemplo, o seguinte modelo expande o exemplo básico para responder de forma assíncrona e é implementável no Vertex AI:

class AsyncAgent(CLASS_NAME):

    async def async_query(self, **kwargs):
        from langchain.load.dump import dumpd

        for chunk in self.graph.ainvoke(**kwargs):
            yield dumpd(chunk)

agent = AsyncAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Teste o método async_query

Pode testar o agente localmente chamando o método async_query. Segue-se um exemplo:

response = await agent.async_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)

A resposta é um dicionário semelhante ao seguinte:

{"input": "What is the exchange rate from US dollars to Swedish currency?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Respostas de streaming

Para transmitir respostas a consultas, pode definir um método denominado stream_query que produz respostas. Por exemplo, o seguinte modelo expande o exemplo básico para transmitir respostas e é implementável na Vertex AI:

from typing import Iterable

class StreamingAgent(CLASS_NAME):

    def stream_query(self, **kwargs) -> Iterable:
        from langchain.load.dump import dumpd

        for chunk in self.graph.stream(**kwargs):
            yield dumpd(chunk)

agent = StreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Seguem-se alguns aspetos essenciais a ter em conta quando usar a API de streaming:

  • Tempo limite máximo: o tempo limite máximo para respostas de streaming é de 10 minutos. Se o seu agente precisar de tempos de processamento mais longos, considere dividir a tarefa em partes mais pequenas.
  • Modelos e cadeias de streaming: a interface Runnable do LangChain suporta streaming, pelo que pode fazer streaming de respostas não só de agentes, mas também de modelos e cadeias.
  • Compatibilidade com o LangChain: tenha em atenção que os métodos assíncronos, como o método astream_event do LangChain, não são suportados de momento.
  • Restrinja a geração de conteúdo: se tiver problemas de contrapressão (em que o produtor gera dados mais rapidamente do que o consumidor os consegue processar), deve restringir a taxa de geração de conteúdo. Isto pode ajudar a evitar o excesso de capacidade do buffer e garantir uma experiência de streaming sem problemas.

Teste o método stream_query

Pode testar a consulta de streaming localmente chamando o método stream_query e iterando os resultados. Segue-se um exemplo:

import pprint

for chunk in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Este código imprime cada parte da resposta à medida que é gerada. O resultado pode ter um aspeto semelhante a este:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

Neste exemplo, cada bloco contém informações diferentes sobre a resposta, como as ações realizadas pelo agente, as mensagens trocadas e o resultado final.

Fazer stream de respostas de forma assíncrona

Para transmitir respostas de forma assíncrona, pode definir um método (por exemplo, async_stream_query) que devolve um gerador assíncrono. Por exemplo, o seguinte modelo expande o exemplo básico para transmitir respostas de forma assíncrona e é implementável na Vertex AI:

class AsyncStreamingAgent(CLASS_NAME):

    async def async_stream_query(self, **kwargs):
        from langchain.load.dump import dumpd

        for chunk in self.graph.astream(**kwargs):
            yield dumpd(chunk)

agent = AsyncStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Teste o método async_stream_query

Semelhante ao código para testar consultas de streaming, pode testar o agente localmente chamando o método async_stream_query e iterando os resultados. Segue-se um exemplo:

import pprint

async for chunk in agent.async_stream_query(
    input="What is the exchange rate from US dollars to Swedish currency?"
):
    # Use pprint with depth=1 for a more concise, high-level view of the
    # streamed output.
    # To see the full content of the chunk, use:
    # print(chunk)
    pprint.pprint(chunk, depth=1)

Este código imprime cada parte da resposta à medida que é gerada. O resultado pode ter um aspeto semelhante a este:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
           '10.5751 SEK. \n'}

Registar métodos personalizados

Por predefinição, os métodos query e stream_query são registados como operações no agente implementado. Pode substituir o comportamento predefinido e definir o conjunto de operações a registar através do método register_operations. As operações podem ser registadas como modos de execução padrão (representados por uma string vazia "") ou de streaming ("stream").

Para registar várias operações, pode definir um método denominado register_operations que liste os métodos a disponibilizar aos utilizadores quando o agente for implementado. No código de exemplo seguinte, o método register_operations resulta no registo do agente implementado de query e get_state como operações executadas de forma síncrona e stream_query e get_state_history como operações que transmitem as respostas:

from typing import Iterable

class CustomAgent(StreamingAgent):

    def get_state(self) -> dict: # new synchronous method
        return self.graph.get_state(**kwargs)._asdict()

    def get_state_history(self) -> Iterable: # new streaming operation
        for state_snapshot in self.graph.get_state_history(**kwargs):
            yield state_snapshot._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
        }

Pode testar os métodos personalizados chamando-os diretamente na instância local do agente, de forma semelhante à forma como testaria os métodos query e stream_query.

Fornecer anotações de tipo

Pode usar anotações de tipo para especificar os tipos de entrada e saída esperados dos métodos do seu agente. Quando o agente é implementado, apenas os tipos serializáveis em JSON são suportados na entrada e na saída das operações suportadas pelo agente. Os esquemas das entradas e saídas podem ser anotados através de TypedDict ou modelos Pydantic.

No exemplo seguinte, anotamos a entrada como TypedDict e convertemos a saída não processada de .get_state (que é um NamedTuple) num dicionário serializável através do respetivo método ._asdict():

from typing import Any, Dict, TypedDict

# schemas.py
class RunnableConfig(TypedDict, total=False):
    metadata: Dict[str, Any]
    configurable: Dict[str, Any]

# agents.py
class AnnotatedAgent(CLASS_NAME):

    def get_state(self, config: RunnableConfig) -> dict:
        return self.graph.get_state(config=config)._asdict()

    def register_operations(self):
        return {"": ["query", "get_state"]}

Enviar rastreios para o Cloud Trace

Para enviar rastreios para o Cloud Trace com bibliotecas de instrumentação que suportam o OpenTelemetry, pode importá-los e inicializá-los no método .set_up. Para frameworks de agentes comuns, pode usar a integração do Open Telemetry Google Cloud em combinação com um framework de instrumentação, como o OpenInference ou o OpenLLMetry.

Por exemplo, o modelo seguinte é uma modificação do exemplo básico para exportar rastreios para o Cloud Trace:

OpenInference

Primeiro, instale o pacote necessário com pip executando o seguinte comando:

pip install openinference-instrumentation-langchain==0.1.34

Em seguida, importe e inicialize o instrumentador:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from openinference.instrumentation.langchain import LangChainInstrumentor
        import google.cloud.trace_v2 as cloud_trace_v2
        import google.auth

        credentials, _ = google.auth.default()

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(
            project_id=self.project,
            client=cloud_trace_v2.TraceServiceClient(
                credentials=credentials.with_quota_project(self.project),
            ),
        )
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangChainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

OpenLLMetry

Primeiro, instale o pacote necessário com pip executando o seguinte comando:

pip install opentelemetry-instrumentation-langchain==0.38.10

Em seguida, importe e inicialize o instrumentador:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
        location: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.location = location

    def set_up(self):
        # The additional code required for tracing instrumentation.
        from opentelemetry import trace
        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from opentelemetry.instrumentation.langchain import LangchainInstrumentor
        import google.cloud.trace_v2 as cloud_trace_v2
        import google.auth

        credentials, _ = google.auth.default()

        trace.set_tracer_provider(TracerProvider())
        cloud_trace_exporter = CloudTraceSpanExporter(
            project_id=self.project,
            client=cloud_trace_v2.TraceServiceClient(
                credentials=credentials.with_quota_project(self.project),
            ),
        )
        trace.get_tracer_provider().add_span_processor(
            SimpleSpanProcessor(cloud_trace_exporter)
        )
        LangchainInstrumentor().instrument()
        # end of additional code required

        import vertexai
        from langchain_google_vertexai import ChatVertexAI
        from langgraph.prebuilt import create_react_agent

        vertexai.init(project=self.project, location=self.location)

        model = ChatVertexAI(model_name=self.model_name)
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Trabalhar com variáveis de ambiente

Para definir variáveis de ambiente, certifique-se de que estão disponíveis através de os.environ durante o desenvolvimento e siga as instruções em Defina variáveis de ambiente quando implementar o agente.

Integração com o Secret Manager

Para integrar com o Secret Manager:

  1. Instale a biblioteca cliente executando o seguinte comando

    pip install google-cloud-secret-manager
  2. Siga as instruções em Conceda funções a um agente implementado para conceder à conta de serviço a função "Aceder ao Secret Manager Secret" (roles/secretmanager.secretAccessor) através da Google Cloud consola.

  3. Importe e inicialize o cliente no método .set_up e obtenha o segredo correspondente quando necessário. Por exemplo, o modelo seguinte é uma modificação do exemplo básico para usar uma chave da API para ChatAnthropic que foi armazenada no Secret Manager:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str,
        tools: Sequence[Callable],
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.secret_id = secret_id # <- new

    def set_up(self):
        from google.cloud import secretmanager
        from langchain_anthropic import ChatAnthropic
        from langgraph.prebuilt import create_react_agent

        # Get the API Key from Secret Manager here.
        self.secret_manager_client = secretmanager.SecretManagerServiceClient()
        secret_version = self.secret_manager_client.access_secret_version(request={
            "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
        })
        # Use the API Key from Secret Manager here.
        model = ChatAnthropic(
            model_name=self.model_name,
            model_kwargs={"api_key": secret_version.payload.data.decode()},  # <- new
        )
        self.graph = create_react_agent(model, tools=self.tools)

    def query(self, **kwargs):
        return self.graph.invoke(**kwargs)

Processamento de credenciais

Quando o agente é implementado, pode ter de processar diferentes tipos de credenciais:

  1. Credenciais padrão da aplicação (ADC), que surgem frequentemente de contas de serviço,
  2. OAuth que surgem frequentemente nas contas de utilizador e
  3. Fornecedores de identidade para credenciais de contas externas (federação de identidades da carga de trabalho).

Credenciais padrão da aplicação

import google.auth

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

Pode ser usado no código da seguinte forma:

from typing import Callable, Sequence

class CLASS_NAME:
    def __init__(
        self,
        model: str = "meta/llama3-405b-instruct-maas",
        tools: Sequence[Callable],
        location: str,
        project: str,
    ):
        self.model_name = model
        self.tools = tools
        self.project = project
        self.endpoint = f"https://{location}-aiplatform.googleapis.com"
        self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'

    def query(self, **kwargs):
        import google.auth
        from langchain_openai import ChatOpenAI
        from langgraph.prebuilt import create_react_agent

        # Note: the credential lives for 1 hour by default.
        # After expiration, it must be refreshed.
        creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        creds.refresh(google.auth.transport.requests.Request())

        model = ChatOpenAI(
            model=self.model_name,
            base_url=self.base_url,
            api_key=creds.token,  # Use the token from the credentials here.
        )
        graph = create_react_agent(model, tools=self.tools)
        return graph.invoke(**kwargs)

Para ver detalhes, consulte o artigo Como funcionam as Credenciais padrão da aplicação.

OAuth

Normalmente, as credenciais do utilizador são obtidas através do OAuth 2.0.

Se tiver um token de acesso (por exemplo, de oauthlib), pode criar uma instância google.oauth2.credentials.Credentials. Além disso, se obtiver uma chave de atualização, também pode especificar a chave de atualização e o URI da chave para permitir que as credenciais sejam atualizadas automaticamente:

credentials = google.oauth2.credentials.Credentials(
    token="ACCESS_TOKEN",
    refresh_token="REFRESH_TOKEN",  # Optional
    token_uri="TOKEN_URI",          # E.g. "https://oauth2.googleapis.com/token"
    client_id="CLIENT_ID",          # Optional
    client_secret="CLIENT_SECRET"   # Optional
)

Aqui, TOKEN_URI, CLIENT_ID e CLIENT_SECRET baseiam-se em Criar uma credencial do cliente OAuth.

Se não tiver um token de acesso, pode usar google_auth_oauthlib.flow para executar o fluxo de concessão de autorização do OAuth 2.0 para obter uma instância google.oauth2.credentials.Credentials correspondente:

from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json

# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
    "name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())

# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
    client_config,
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
    state="OAUTH_FLOW_STATE"  # from flow.authorization_url(...)
)

# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials

# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai

vertexai.init(
    project="PROJECT_ID",
    location="LOCATION",
    credentials=credentials, # specify the credentials here
)

Para ver detalhes, consulte a documentação do módulo google_auth_oauthlib.flow.

Fornecedor de identidade

Se quiser autenticar utilizadores através de email/palavra-passe, número de telefone, fornecedores sociais como o Google, o Facebook ou o GitHub, ou um mecanismo de autenticação personalizado, pode usar a Identity Platform ou o Firebase Authentication, ou qualquer fornecedor de identidade que suporte o OpenID Connect (OIDC).

Para ver detalhes, consulte o artigo Aceder a recursos a partir de um fornecedor de identidade OIDC.

Processamento de erros

Para garantir que os erros da API são devolvidos num formato JSON estruturado, recomendamos que implemente o processamento de erros no código do agente através de um bloco try...except, que pode ser abstraído num decorador.

Embora o Vertex AI Agent Engine possa processar vários códigos de estado internamente, o Python não tem uma forma padronizada de representar erros com códigos de estado HTTP associados em todos os tipos de exceções. Tentar mapear todas as exceções possíveis do Python para estados HTTP no serviço subjacente seria complexo e difícil de manter.

Uma abordagem mais escalável consiste em captar explicitamente as exceções relevantes nos métodos do agente ou usar um decorador reutilizável, como error_wrapper. Em seguida, pode associar códigos de estado adequados (por exemplo, adicionando atributos code e error a exceções personalizadas ou processando exceções padrão especificamente) e formatar o erro como um dicionário JSON para o valor de retorno. Isto requer uma alteração mínima ao código nos próprios métodos do agente, muitas vezes apenas exigindo que adicione o decorador.

Segue-se um exemplo de como pode implementar o processamento de erros no seu agente:

from functools import wraps
import json

def error_wrapper(func):
    @wraps(func)  # Preserve original function metadata
    def wrapper(*args, **kwargs):
        try:
            # Execute the original function with its arguments
            return func(*args, **kwargs)
        except Exception as err:
            error_code = getattr(err, 'code')
            error_message = getattr(err, 'error')

            # Construct the error response dictionary
            error_response = {
                "error": {
                    "code": error_code,
                    "message": f"'{func.__name__}': {error_message}"
                }
            }
            # Return the Python dictionary directly.
            return error_response

    return wrapper

# Example exception
class SessionNotFoundError(Exception):
    def __init__(self, session_id, message="Session not found"):
        self.code = 404
        self.error = f"{message}: {session_id}"
        super().__init__(self.error)

# Example Agent Class
class MyAgent:
    @error_wrapper
    def get_session(self, session_id: str):
        # Simulate the condition where the session isn't found
        raise SessionNotFoundError(session_id=session_id)


# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))

O código anterior resulta no seguinte resultado: json { "error": { "code": 404, "message": "Invocation error in 'get_session': Session not found: nonexistent_session_123" } }

O que se segue?