Usar um agente personalizado

Antes de começar

Este tutorial pressupõe que você leu e seguiu as instruções em:

Receber uma instância de um agente

Para consultar um agente, primeiro você precisa de uma instância dele. É possível criar uma instância nova ou acessar uma instância atual de um agente.

Para receber o agente que corresponde a um ID de recurso específico:

SDK da Agent Platform

Execute o seguinte código:

import vertexai

client = vertexai.Client(  # For service interactions via client.agent_engines
    project="PROJECT_ID",
    location="LOCATION",
)

agent = client.agent_engines.get(name="projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

print(agent)

em que

solicitações

Execute o seguinte código:

from google import auth as google_auth
from google.auth.transport import requests as google_requests
import requests

def get_identity_token():
    credentials, _ = google_auth.default()
    auth_request = google_requests.Request()
    credentials.refresh(auth_request)
    return credentials.token

response = requests.get(
f"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID",
    headers={
        "Content-Type": "application/json; charset=utf-8",
        "Authorization": f"Bearer {get_identity_token()}",
    },
)

REST

curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID

Ao usar o SDK da Agent Platform para Python, o agent objeto corresponde a uma AgentEngine classe que contém o seguinte:

O restante desta seção pressupõe que você tenha uma instância, nomeada como agent.

Listar operações aceitas

Ao desenvolver o agente localmente, você tem acesso e conhecimento das operações que ele aceita. Para usar um agente implantado, é possível enumerar as operações que ele aceita:

SDK da Agent Platform

Execute o seguinte código:

print(agent.operation_schemas())

solicitações

Execute o seguinte código:

import json

json.loads(response.content).get("spec").get("classMethods")

REST

Representado em spec.class_methods da resposta à solicitação do curl.

O esquema de cada operação é um dicionário que documenta as informações de um método para o agente que você pode chamar. O conjunto de operações aceitas depende do framework usado para desenvolver o agente:

Como exemplo, a seguir está o esquema da operação query de um LangchainAgent:

{'api_mode': '',
 'name': 'query',
 'description': """Queries the Agent with the given input and config.
    Args:
        input (Union[str, Mapping[str, Any]]):
            Required. The input to be passed to the Agent.
        config (langchain_core.runnables.RunnableConfig):
            Optional. The config (if any) to be used for invoking the Agent.
    Returns:
        The output of querying the Agent with the given input and config.
""",            '        ',
 'parameters': {'$defs': {'RunnableConfig': {'description': 'Configuration for a Runnable.',
                                             'properties': {'configurable': {...},
                                                            'run_id': {...},
                                                            'run_name': {...},
                                                            ...},
                                             'type': 'object'}},
                'properties': {'config': {'nullable': True},
                               'input': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}},
                'required': ['input'],
                'type': 'object'}}

em que

  • name é o nome da operação (ou seja, agent.query para uma operação chamada query).
  • api_mode é o modo de API da operação ("" para síncrono, "stream" para streaming).
  • description é uma descrição da operação com base na docstring do método.
  • parameters é o esquema dos argumentos de entrada no formato de esquema OpenAPI.

Consultar o agente usando operações aceitas

Para agentes personalizados, é possível usar qualquer uma das seguintes operações de consulta ou streaming definidas ao desenvolver o agente:

Alguns frameworks só aceitam operações de consulta ou streaming específicas:

Framework Operações de consulta aceitas
Kit de Desenvolvimento de Agente async_stream_query
LangChain query, stream_query
LangGraph query, stream_query
AG2 query
LlamaIndex query

Consultar o agente

Consulte o agente usando a operação query:

SDK da Agent Platform

agent.query(input="What is the exchange rate from US dollars to Swedish Krona today?")

solicitações

from google import auth as google_auth
from google.auth.transport import requests as google_requests
import requests

def get_identity_token():
    credentials, _ = google_auth.default()
    auth_request = google_requests.Request()
    credentials.refresh(auth_request)
    return credentials.token

requests.post(
    f"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:query",
    headers={
        "Content-Type": "application/json; charset=utf-8",
        "Authorization": f"Bearer {get_identity_token()}",
    },
    data=json.dumps({
        "class_method": "query",
        "input": {
            "input": "What is the exchange rate from US dollars to Swedish Krona today?"
        }
    })
)

REST

curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:query -d '{
  "class_method": "query",
  "input": {
    "input": "What is the exchange rate from US dollars to Swedish Krona today?"
  }
}'

A resposta da consulta é uma string semelhante à saída de um teste de aplicativo local:

{"input": "What is the exchange rate from US dollars to Swedish Krona today?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Transmitir respostas do agente

Transmita uma resposta do agente usando a operação stream_query:

SDK da Agent Platform

agent = agent_engines.get("projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

for response in agent.stream_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
):
    print(response)

solicitações

from google import auth as google_auth
from google.auth.transport import requests as google_requests
import requests

def get_identity_token():
    credentials, _ = google_auth.default()
    auth_request = google_requests.Request()
    credentials.refresh(auth_request)
    return credentials.token

requests.post(
    f"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:streamQuery",
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_identity_token()}",
    },
    data=json.dumps({
        "class_method": "stream_query",
        "input": {
            "input": "What is the exchange rate from US dollars to Swedish Krona today?"
        },
    }),
    stream=True,
)

REST

curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:streamQuery?alt=sse -d '{
  "class_method": "stream_query",
  "input": {
    "input": "What is the exchange rate from US dollars to Swedish Krona today?"
  }
}'

O Agent Platform transmite respostas como uma sequência de objetos gerados de forma iterativa. Por exemplo, um conjunto de três respostas pode ser semelhante ao seguinte:

{'actions': [{'tool': 'get_exchange_rate', ...}]}  # first response
{'steps': [{'action': {'tool': 'get_exchange_rate', ...}}]}  # second response
{'output': 'The exchange rate is 11.0117 SEK per USD as of 2024-12-03.'}  # final response

Consultar o agente de forma assíncrona

Se você definiu uma operação async_query ao criar o agente, há suporte para consultas assíncronas do lado do cliente do agente no SDK do Agent Platform para Python:

SDK da Agent Platform para Python

agent = agent_engines.get("projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

response = await agent.async_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)

A resposta da consulta é um dicionário igual à saída de um teste local:

{"input": "What is the exchange rate from US dollars to Swedish Krona today?",
 # ...
 "output": "For 1 US dollar you will get 10.7345 Swedish Krona."}

Transmitir respostas do agente de forma assíncrona

Se você definiu uma operação async_stream_query ao criar o agente, é possível transmitir uma resposta do agente de forma assíncrona usando uma das operações dele (por exemplo, async_stream_query):

SDK da Agent Platform para Python

agent = agent_engines.get("projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

async for response in agent.async_stream_query(
    input="What is the exchange rate from US dollars to Swedish Krona today?"
):
    print(response)

A operação async_stream_query chama o mesmo streamQuery endpoint nos bastidores e transmite respostas de forma assíncrona como uma sequência de objetos gerados de forma iterativa. Por exemplo, um conjunto de três respostas pode ser semelhante ao seguinte:

{'actions': [{'tool': 'get_exchange_rate', ...}]}  # first response
{'steps': [{'action': {'tool': 'get_exchange_rate', ...}}]}  # second response
{'output': 'The exchange rate is 11.0117 SEK per USD as of 2024-12-03.'}  # final response

As respostas precisam ser as mesmas geradas durante o teste local.

Jobs de consulta de longa duração

Para consultas que podem levar muito tempo para serem concluídas (até sete dias), é possível executá-las como jobs de longa duração. Para mais informações, consulte Usar um agente do ADK.

Iniciar um job de consulta de longa duração

Para iniciar um job de consulta de longa duração:

SDK da Agent Platform para Python

import vertexai

client = vertexai.Client(
    project="PROJECT_ID",
    location="LOCATION",
)

response = client.agent_engines.run_query_job(
    name="projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID",
    config={
        "query": '{ "input": {"product": "Google"} }',
        "output_gcs_uri": "gs://GCS_BUCKET_NAME/OUTPUT_FILE",
    },
)
print(response)

REST

curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://LOCATION-aiplatform.googleapis.com/v1beta1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID:asyncQuery -d \
'{
  "input_gcs_uri": "gs://GCS_BUCKET_NAME/INPUT_FILE",
  "output_gcs_uri": "gs://GCS_BUCKET_NAME/OUTPUT_FILE"
}'

Verificar o status de um job de consulta de longa duração

Para verificar o status e recuperar os resultados de um job de consulta de longa duração:

SDK da Agent Platform

response = client.agent_engines.check_query_job(
    name="JOB_NAME",
    config={
        "retrieve_result": True,
    },
)
print(response)

A seguir