使用 LangGraph 代理

准备工作

本教程假定您已阅读并遵循以下说明:

获取代理的实例

如需查询 LanggraphAgent,您需要先创建新实例获取现有实例

如需获取与特定资源 ID 对应的 LanggraphAgent,请执行以下操作:

Vertex AI SDK for Python

运行以下代码:

import vertexai

client = vertexai.Client(  # For service interactions via client.agent_engines
    project="PROJECT_ID",
    location="LOCATION",
)

agent = client.agent_engines.get(name="projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID")

print(agent)

其中

Python 请求库

运行以下代码:

from google import auth as google_auth
from google.auth.transport import requests as google_requests
import requests

def get_identity_token():
    credentials, _ = google_auth.default()
    auth_request = google_requests.Request()
    credentials.refresh(auth_request)
    return credentials.token

response = requests.get(
f"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID",
    headers={
        "Content-Type": "application/json; charset=utf-8",
        "Authorization": f"Bearer {get_identity_token()}",
    },
)

REST API

curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/reasoningEngines/RESOURCE_ID

使用 Vertex AI SDK for Python 时,agent 对象对应于包含以下内容的 AgentEngine 类:

  • 包含已部署代理相关信息的 agent.api_resource。您还可以调用 agent.operation_schemas() 以返回代理支持的操作列表。如需了解详情,请参阅支持的操作
  • 一种允许同步服务交互的 agent.api_client
  • 一种允许异步服务交互的 agent.async_api_client

本部分的其余内容假设您有一个名为 agentAgentEngine 实例。

支持的操作

LanggraphAgent 支持以下操作:

对查询的响应进行流式传输

LangGraph 支持多种流式传输模式。以下是主要流式传输模式:

  • values:此模式在每次调用节点后,会将图的完整状态以流的形式传输。
  • updates:此模式在每次调用节点后,会将图状态的更新以流的形式传输。

如需回传 values(对应于图表的完整状态),请执行以下操作:

for state_values in agent.stream_query(
    input=inputs,
    stream_mode="values",
    config={"configurable": {"thread_id": "streaming-thread-values"}},
):
    print(state_values)

如需回流式传输 updates(对应于图表状态的更新),请执行以下操作:

for state_updates in agent.stream_query(
    input=inputs,
    stream_mode="updates",
    config={"configurable": {"thread_id": "streaming-thread-updates"}},
):
    print(state_updates)

人机协同 (human-in-the-loop)

在 LangGraph 中,人机协同的一个常见做法是添加断点以中断代理的操作序列,并允许用户稍后恢复流程。

回顾

在调用 .query.stream_query 时,您可以使用 interrupt_before=interrupt_after= 参数设置断点:

from langchain.load import load as langchain_load

response = agent.query(
    input=inputs,
    interrupt_before=["tools"], # after generating the function call, before invoking the function
    interrupt_after=["tools"], # after getting a function response, before moving on
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
)

langchain_load(response['messages'][-1]).pretty_print()

输出应类似如下所示:

================================== Ai Message ==================================
Tool Calls:
  get_exchange_rate (12610c50-4465-4296-b1f3-d751ec959fd5)
 Call ID: 12610c50-4465-4296-b1f3-d751ec959fd5
  Args:
    currency_from: USD
    currency_to: SEK

审批

如需批准生成的工具调用并继续执行其余代码,您可以将 None 传递给输入,并在 config 中指定线程或检查点:

from langchain.load import load as langchain_load

response = agent.query(
    input=None,  # Continue with the function call
    interrupt_before=["tools"], # after generating the function call, before invoking the function
    interrupt_after=["tools"], # after getting a function response, before moving on
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
)

langchain_load(response['messages'][-1]).pretty_print()

输出应类似如下所示:

================================= Tool Message =================================
Name: get_exchange_rate

{"amount": 1.0, "base": "USD", "date": "2024-11-14", "rates": {"SEK": 11.0159}}

历史

如需列出给定线程的所有检查点,请使用 .get_state_history 方法:

for state_snapshot in agent.get_state_history(
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
):
    if state_snapshot["metadata"]["step"] >= 0:
        print(f'step {state_snapshot["metadata"]["step"]}: {state_snapshot["config"]}')
        state_snapshot["values"]["messages"][-1].pretty_print()
        print("\n")

响应将类似于以下输出序列:

step 3: {'configurable': {'thread_id': 'human-in-the-loop-deepdive', 'checkpoint_ns': '', 'checkpoint_id': '1efa2e95-ded5-67e0-8003-2d34e04507f5'}}
================================== Ai Message ==================================

The exchange rate from US dollars to Swedish krona is 1 USD to 11.0159 SEK.
step 2: {'configurable': {'thread_id': 'human-in-the-loop-deepdive', 'checkpoint_ns': '', 'checkpoint_id': '1efa2e95-d189-6a77-8002-5dbe79e2ce58'}}
================================= Tool Message =================================
Name: get_exchange_rate

{"amount": 1.0, "base": "USD", "date": "2024-11-14", "rates": {"SEK": 11.0159}}
step 1: {'configurable': {'thread_id': 'human-in-the-loop-deepdive', 'checkpoint_ns': '', 'checkpoint_id': '1efa2e95-cc7f-6d68-8001-1f6b5e57c456'}}
================================== Ai Message ==================================
Tool Calls:
  get_exchange_rate (12610c50-4465-4296-b1f3-d751ec959fd5)
 Call ID: 12610c50-4465-4296-b1f3-d751ec959fd5
  Args:
    currency_from: USD
    currency_to: SEK
step 0: {'configurable': {'thread_id': 'human-in-the-loop-deepdive', 'checkpoint_ns': '', 'checkpoint_id': '1efa2e95-c2e4-6f3c-8000-477fd654cb53'}}
================================ Human Message =================================

What is the exchange rate from US dollars to Swedish currency?

获取步骤配置

如需获取较早的检查点,请指定 checkpoint_id(和 checkpoint_ns)。首先,回退到生成工具调用的第 1 步:

snapshot_config = {}
for state_snapshot in agent.get_state_history(
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
):
    if state_snapshot["metadata"]["step"] == 1:
        snapshot_config = state_snapshot["config"]
        break

print(snapshot_config)

输出应类似如下所示:

{'configurable': {'thread_id': 'human-in-the-loop-deepdive',
  'checkpoint_ns': '',
  'checkpoint_id': '1efa2e95-cc7f-6d68-8001-1f6b5e57c456'}}

时间旅行

如需获取检查点,可以使用 .get_state 方法:

# By default, it gets the latest state [unless (checkpoint_ns, checkpoint_id) is specified]
state = agent.get_state(config={"configurable": {
    "thread_id": "human-in-the-loop-deepdive",
}})

print(f'step {state["metadata"]["step"]}: {state["config"]}')
state["values"]["messages"][-1].pretty_print()

默认情况下,它会获取最新的检查点(按时间戳)。输出应类似如下所示:

step 3: {'configurable': {'thread_id': 'human-in-the-loop-deepdive', 'checkpoint_ns': '', 'checkpoint_id': '1efa2e95-ded5-67e0-8003-2d34e04507f5'}}
================================== Ai Message ==================================

The exchange rate from US dollars to Swedish krona is 1 USD to 11.0159 SEK.

获取配置的检查点

对于给定的配置(例如,步骤配置中的 snapshot_config),您可以获取相应的检查点:

state = agent.get_state(config=snapshot_config)
print(f'step {state["metadata"]["step"]}: {state["config"]}')
state["values"]["messages"][-1].pretty_print()

输出应类似如下所示:

step 1: {'configurable': {'thread_id': 'human-in-the-loop-deepdive', 'checkpoint_ns': '', 'checkpoint_id': '1efa2e95-cc7f-6d68-8001-1f6b5e57c456'}}
================================== Ai Message ==================================
Tool Calls:
  get_exchange_rate (12610c50-4465-4296-b1f3-d751ec959fd5)
 Call ID: 12610c50-4465-4296-b1f3-d751ec959fd5
  Args:
    currency_from: USD
    currency_to: SEK

重放

如需从给定状态重放,请将状态配置(即 state["config"])传递给代理。状态配置是一个字典,如下所示:

{'configurable': {'thread_id': 'human-in-the-loop-deepdive',
  'checkpoint_ns': '',
  'checkpoint_id': '1efa2e95-cc7f-6d68-8001-1f6b5e57c456'}}

如需从 state["config"](生成工具调用的代码)开始重放,请在输入中指定 None

from langchain.load import load as langchain_load

for state_values in agent.stream_query(
    input=None, # resume
    stream_mode="values",
    config=state["config"],
):
    langchain_load(state_values["messages"][-1]).pretty_print()

这将生成类似于以下输出序列的结果:

================================== Ai Message ==================================
Tool Calls:
  get_exchange_rate (12610c50-4465-4296-b1f3-d751ec959fd5)
 Call ID: 12610c50-4465-4296-b1f3-d751ec959fd5
  Args:
    currency_from: USD
    currency_to: SEK
================================= Tool Message =================================
Name: get_exchange_rate

{"amount": 1.0, "base": "USD", "date": "2024-11-14", "rates": {"SEK": 11.0159}}
================================== Ai Message ==================================

The exchange rate from US dollars to Swedish krona is 1 USD to 11.0159 SEK.

分支

您可以使用 .update_state 方法从先前的检查点分支出来,以尝试其他场景:

branch_config = agent.update_state(
    config=state["config"],
    values={"messages": [last_message]}, # the update we want to make
)

print(branch_config)

输出应类似如下所示:

{'configurable': {'thread_id': 'human-in-the-loop-deepdive',
  'checkpoint_ns': '',
  'checkpoint_id': '1efa2e96-0560-62ce-8002-d1bb48a337bc'}}

我们可以使用 branch_config 查询代理,以便从检查点恢复更新后的状态:

from langchain.load import load as langchain_load

for state_values in agent.stream_query(
    input=None, # resume
    stream_mode="values",
    config=branch_config,
):
    langchain_load(state_values["messages"][-1]).pretty_print()

这将生成类似于以下输出序列的结果:

================================== Ai Message ==================================
Tool Calls:
  get_exchange_rate (12610c50-4465-4296-b1f3-d751ec959fd5)
 Call ID: 12610c50-4465-4296-b1f3-d751ec959fd5
  Args:
    currency_date: 2024-09-01
    currency_from: USD
    currency_to: SEK
================================= Tool Message =================================
Name: get_exchange_rate

{"amount": 1.0, "base": "USD", "date": "2024-08-30", "rates": {"SEK": 10.2241}}
================================== Ai Message ==================================

The exchange rate from US dollars to Swedish krona on 2024-08-30 was 1 USD to 10.2241 SEK.

后续步骤