I passaggi riportati di seguito mostrano come creare un modello personalizzato per creare istanze di agenti di cui è possibile eseguire il deployment su Agent Platform:
- Esempio di base
- (Facoltativo) Risposte dinamiche
- (Facoltativo) Registra metodi personalizzati
- (Facoltativo) Fornisci annotazioni di tipo
- (Facoltativo) Invia tracce a Cloud Trace
- (Facoltativo) Utilizza le variabili di ambiente
- (Facoltativo) Integra con Secret Manager
- (Facoltativo) Gestisci le credenziali
- (Facoltativo) Gestisci gli errori
Esempio di base
Per fornire un esempio di base, la seguente classe Python è un modello per creare istanze di agenti di cui è possibile eseguire il deployment su Agent Platform (puoi assegnare alla variabile CLASS_NAME un valore come MyAgent):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Considerazioni sul deployment
Quando scrivi la classe Python, i seguenti tre metodi sono importanti:
__init__():- Utilizza questo metodo solo per i parametri di configurazione dell'agente. Ad esempio, puoi utilizzare questo metodo per raccogliere i parametri del modello e gli attributi di sicurezza come argomenti di input dagli utenti. Puoi anche utilizzare questo metodo per raccogliere parametri come l'ID progetto, la regione, le credenziali dell'applicazione e le chiavi API.
- Il costruttore restituisce un oggetto che deve essere "pickle-able" per poter essere sottoposto a deployment in Agent Runtime. Pertanto, devi inizializzare i client di servizio e stabilire le connessioni ai database nel metodo
.set_upanziché nel metodo__init__. - Questo metodo è facoltativo. Se non è specificato, Agent Runtime utilizza il costruttore Python predefinito per la classe.
set_up():- Devi utilizzare questo metodo per definire la logica di inizializzazione dell'agente. Ad esempio, utilizzi questo metodo per stabilire connessioni a database o servizi dipendenti, importare pacchetti dipendenti o precalcolare i dati utilizzati per l'erogazione delle query.
- Questo metodo è facoltativo. Se non è specificato, Agent Runtime presuppone che l'agente non debba chiamare un metodo
.set_upprima di pubblicare le query utente.
query()/stream_query():- Utilizza
query()per restituire la risposta completa come singolo risultato. - Utilizza
stream_query()per restituire la risposta in blocchi man mano che diventa disponibile, consentendo un'esperienza di streaming. Il metodostream_querydeve restituire un oggetto iterabile (ad esempio un generatore) per abilitare lo streaming. - Puoi implementare entrambi i metodi se vuoi supportare le interazioni a risposta singola e di streaming con il tuo agente.
- Devi fornire a questo metodo una stringa di documentazione chiara che ne definisca la funzione, ne documenti gli attributi e fornisca annotazioni di tipo per gli input.
Evita gli argomenti variabili nei metodi
queryestream_query.
- Utilizza
Crea un'istanza dell'agente in locale
Puoi creare un'istanza locale dell'agente utilizzando il seguente codice:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testa il metodo query
Puoi testare l'agente inviando query all'istanza locale:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
La risposta è un dizionario simile al seguente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Esecuzione di query in modo asincrono
Per rispondere alle query in modo asincrono, puoi definire un metodo (ad esempio async_query)
che restituisce una coroutine Python. Ad esempio, il seguente modello estende l'esempio di base per rispondere in modo asincrono ed è di cui è possibile eseguire il deployment su Agent Platform:
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testa il metodo async_query
Puoi testare l'agente in locale chiamando il metodo async_query. Ecco un esempio:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
La risposta è un dizionario simile al seguente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Risposte dinamiche
Per trasmettere in streaming le risposte alle query, puoi definire un metodo denominato stream_query che genera risposte. Ad esempio, il seguente modello estende l'esempio di base per trasmettere in streaming le risposte ed è di cui è possibile eseguire il deployment su Agent Platform:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Ecco alcuni aspetti chiave da tenere presenti quando utilizzi l'API di streaming:
- Timeout massimo: il timeout massimo per le risposte di streaming è di 10 minuti. Se l'agente richiede tempi di elaborazione più lunghi, valuta la possibilità di suddividere l'attività in blocchi più piccoli.
- Modelli e catene di streaming: l'interfaccia Runnable di LangChain supporta lo streaming, quindi puoi trasmettere in streaming le risposte non solo dagli agenti, ma anche da modelli e catene.
- Compatibilità con LangChain: tieni presente che al momento i metodi asincroni come il metodo
di LangChain
astream_eventnon sono supportati. - Limitazione della generazione di contenuti: se riscontri problemi di contropressione (dove il produttore genera dati più velocemente di quanto il consumatore possa elaborarli), devi limitare la velocità di generazione dei contenuti. In questo modo puoi evitare overflow del buffer e garantire un'esperienza di streaming fluida.
Testa il metodo stream_query
Puoi testare la query di streaming in locale chiamando il metodo stream_query e scorrendo i risultati. Ecco un esempio:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Questo codice stampa ogni blocco della risposta man mano che viene generato. L'output potrebbe essere simile al seguente:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
In questo esempio, ogni blocco contiene informazioni diverse sulla risposta, ad esempio le azioni intraprese dall'agente, i messaggi scambiati e l'output finale.
Risposte di streaming in modo asincrono
Per trasmettere in streaming le risposte in modo asincrono, puoi definire un metodo (ad esempio
async_stream_query) che restituisce un generatore
asincrono. Ad esempio, il seguente modello estende l'esempio di base per trasmettere in streaming le risposte in modo asincrono ed è di cui è possibile eseguire il deployment su Agent Platform:
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Testa il metodo async_stream_query
Analogamente al codice per testare le query di streaming, puoi
testare l'agente in locale chiamando il metodo async_stream_query e scorrendo
i risultati. Ecco un esempio:
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Questo codice stampa ogni blocco della risposta man mano che viene generato. L'output potrebbe essere simile al seguente:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Registrazione di metodi personalizzati
Per impostazione predefinita, i metodi query e stream_query vengono registrati come operazioni
nell'agente di cui è stato eseguito il deployment. Puoi ignorare il comportamento predefinito e definire l'insieme di operazioni da registrare utilizzando il metodo register_operations.
Le operazioni possono essere registrate come modalità di esecuzione standard (rappresentate da una stringa vuota
"") o di streaming ("stream") execution modes.
Per registrare più operazioni, puoi definire un metodo denominato register_operations che elenca i metodi da rendere disponibili agli utenti quando viene eseguito il deployment dell'agente. Nel seguente codice di esempio, il register_operations
metodo fa sì che l'agente di cui è stato eseguito il deployment registri query e get_state come
operazioni eseguite in modo sincrono e stream_query e get_state_history come
operazioni che trasmettono in streaming le risposte:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Puoi testare i metodi personalizzati chiamandoli direttamente sull'istanza locale
dell'agente, in modo simile a come testeresti i metodi query e
stream_query.
Fornire annotazioni di tipo
Puoi utilizzare le annotazioni di tipo per specificare i tipi di input e output previsti dei metodi dell'agente. Quando viene eseguito il deployment dell'agente, nell'input e nell'output delle operazioni supportate dall'agente sono supportati solo i tipi serializzabili in JSON. Gli schemi degli input e degli output possono essere annotati utilizzando i modelli TypedDict o Pydantic.
Nell'esempio seguente, annotiamo l'input come TypedDict e convertiamo l'output non elaborato da .get_state (che è un NamedTuple) in un dizionario serializzabile utilizzando il relativo metodo ._asdict():
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Invio di tracce a Cloud Trace
Per inviare tracce a Cloud Trace con le librerie di instrumentazione che supportano OpenTelemetry, puoi importarle e inizializzarle nel metodo .set_up. Per
i framework di agenti comuni, potresti essere in grado di utilizzare Open Telemetry Google Cloud
l'integrazione in combinazione con un framework di instrumentazione
come OpenInference o
OpenLLMetry.
Ad esempio, il seguente modello è una modifica del esempio di base per esportare le tracce in Cloud Trace:
OpenInference
Per prima cosa, installa il pacchetto richiesto
utilizzando pip eseguendo
pip install openinference-instrumentation-langchain==0.1.34Quindi, importa e inizializza l'instrumentatore:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
Per prima cosa, installa il pacchetto richiesto
utilizzando pip eseguendo
pip install opentelemetry-instrumentation-langchain==0.38.10Quindi, importa e inizializza l'instrumentatore:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Utilizzo delle variabili di ambiente
Per impostare le variabili di ambiente, assicurati che siano disponibili tramite
os.environ durante lo sviluppo e segui le istruzioni riportate in Definire
le variabili di ambiente
durante il deployment dell'agente.
Integrazione con Secret Manager
Per integrare con Secret Manager:
Installa la libreria client eseguendo
pip install google-cloud-secret-managerSegui le istruzioni riportate in Concedere ruoli per un agente di cui è stato eseguito il deployment per concedere al service account il ruolo "Secret Manager Secret Accessor" (
roles/secretmanager.secretAccessor) tramite la Google Cloud console.Importa e inizializza il client nel metodo
.set_upe recupera il secret corrispondente quando necessario. Ad esempio, il seguente modello è una modifica dell'esempio di base per utilizzare una chiave API perChatAnthropicche è stata archiviata in Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Gestione delle credenziali
Quando viene eseguito il deployment dell'agente, potrebbe essere necessario gestire diversi tipi di credenziali:
- Credenziali predefinite dell'applicazione (ADC) che in genere derivano da service account,
- OAuth che in genere derivano da account utente e
- Provider di identità per le credenziali di account esterni (federazione delle identità per i workload).
Credenziali predefinite dell'applicazione
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Può essere utilizzato nel codice nel seguente modo:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Per informazioni dettagliate, consulta Come funzionano le credenziali predefinite dell'applicazione works.
OAuth
Le credenziali utente vengono in genere ottenute utilizzando OAuth 2.0.
Se hai un token di accesso (ad esempio da
oauthlib), puoi creare un'istanza
google.oauth2.credentials.Credentials. Inoltre, se ottieni un token di aggiornamento, puoi anche specificare il token di aggiornamento e l'URI del token per consentire l'aggiornamento automatico delle credenziali:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Qui, TOKEN_URI, CLIENT_ID e
CLIENT_SECRET si basano su Creare una credenziale client OAuth.
Se non hai un token di accesso, puoi utilizzare google_auth_oauthlib.flow per
eseguire il flusso di concessione dell'autorizzazione OAuth 2.0 per
ottenere un'istanza google.oauth2.credentials.Credentials corrispondente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Per informazioni dettagliate, consulta la documentazione del google_auth_oauthlib.flow
modulo.
Provider di identità
Se vuoi autenticare gli utenti utilizzando email/password, numero di telefono, provider social come Google, Facebook o GitHub o un meccanismo di autenticazione personalizzato, puoi utilizzare Identity Platform o Firebase Authentication o qualsiasi provider di identità che supporti OpenID Connect (OIDC).
Per informazioni dettagliate, consulta Accesso alle risorse da un provider di identità OIDC.
Gestione degli errori
Per assicurarti che gli errori dell'API vengano restituiti in un formato JSON strutturato, ti consigliamo di implementare la gestione degli errori nel codice dell'agente utilizzando un blocco try...except, che può essere estratto in un decoratore.
Sebbene Agent Platform possa gestire internamente vari codici di stato, Python non dispone di un modo standardizzato per rappresentare gli errori con i codici di stato HTTP associati in tutti i tipi di eccezione. Tentare di mappare tutte le possibili eccezioni Python agli stati HTTP all'interno del servizio sottostante sarebbe complesso e difficile da gestire.
Un approccio più scalabile consiste nell'intercettare esplicitamente le eccezioni pertinenti all'interno dei metodi dell'agente o utilizzando un decoratore riutilizzabile come error_wrapper. Puoi
quindi associare i codici di stato appropriati (ad esempio aggiungendo code e
error attributi alle eccezioni personalizzate o gestendo in modo specifico le eccezioni standard
) e formattare l'errore come un dizionario JSON per il valore restituito.
Ciò richiede modifiche minime al codice all'interno dei metodi dell'agente, spesso solo l'aggiunta del decoratore.
Di seguito è riportato un esempio di come puoi implementare la gestione degli errori nell'agente:
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
Il codice precedente genera il seguente output:
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}
Passaggi successivi
Esegui il deployment degli agenti
Scopri i cinque modi per eseguire il deployment di un agente su Agent Platform Runtime in base alle tue esigenze di sviluppo.
Valuta i tuoi agenti
Crea ed esegui il deployment di un agente di base e utilizza il servizio di valutazione di AI generativa per valutare l'agente
Risolvi i problemi relativi alla creazione di agenti
Scopri come risolvere gli errori comuni durante la creazione di agenti personalizzati.