LangChain middleware is a pluggable hook system built around the agent execution flow. It lets developers intercept, modify, or validate the data flow at key points of execution (input processing, before and after model calls, output parsing, and so on) without touching the core logic. All middleware types derive from the AgentMiddleware base class.
1. AgentMiddleware
AgentMiddleware is a generic class whose two type parameters represent the state type and the static context type; the state type can be obtained from the state_schema field. Its name property returns the middleware's name, which defaults to the class name.
```python
class AgentMiddleware(Generic[StateT, ContextT]):
    state_schema: type[StateT] = cast("type[StateT]", _DefaultAgentState)
    tools: Sequence[BaseTool]

    @property
    def name(self) -> str:
        return self.__class__.__name__

    def before_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
        pass

    async def abefore_agent(
        self, state: StateT, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None:
        pass

    def before_model(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
        pass

    async def abefore_model(
        self, state: StateT, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None:
        pass

    def after_model(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
        pass

    async def aafter_model(
        self, state: StateT, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None:
        pass

    def after_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
        pass

    async def aafter_agent(
        self, state: StateT, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None:
        pass

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelCallResult:
        msg = (
            "Synchronous implementation of wrap_model_call is not available. "
            "You are likely encountering this error because you defined only the async version "
            "(awrap_model_call) and invoked your agent in a synchronous context "
            "(e.g., using `stream()` or `invoke()`). "
            "To resolve this, either: "
            "(1) subclass AgentMiddleware and implement the synchronous wrap_model_call method, "
            "(2) use the @wrap_model_call decorator on a standalone sync function, or "
            "(3) invoke your agent asynchronously using `astream()` or `ainvoke()`."
        )
        raise NotImplementedError(msg)

    async def awrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], Awaitable[ModelResponse]],
    ) -> ModelCallResult:
        msg = (
            "Asynchronous implementation of awrap_model_call is not available. "
            "You are likely encountering this error because you defined only the sync version "
            "(wrap_model_call) and invoked your agent in an asynchronous context "
            "(e.g., using `astream()` or `ainvoke()`). "
            "To resolve this, either: "
            "(1) subclass AgentMiddleware and implement the asynchronous awrap_model_call method, "
            "(2) use the @wrap_model_call decorator on a standalone async function, or "
            "(3) invoke your agent synchronously using `stream()` or `invoke()`."
        )
        raise NotImplementedError(msg)

    def wrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], ToolMessage | Command[Any]],
    ) -> ToolMessage | Command[Any]:
        msg = (
            "Synchronous implementation of wrap_tool_call is not available. "
            "You are likely encountering this error because you defined only the async version "
            "(awrap_tool_call) and invoked your agent in a synchronous context "
            "(e.g., using `stream()` or `invoke()`). "
            "To resolve this, either: "
            "(1) subclass AgentMiddleware and implement the synchronous wrap_tool_call method, "
            "(2) use the @wrap_tool_call decorator on a standalone sync function, or "
            "(3) invoke your agent asynchronously using `astream()` or `ainvoke()`."
        )
        raise NotImplementedError(msg)

    async def awrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command[Any]]],
    ) -> ToolMessage | Command[Any]:
        msg = (
            "Asynchronous implementation of awrap_tool_call is not available. "
            "You are likely encountering this error because you defined only the sync version "
            "(wrap_tool_call) and invoked your agent in an asynchronous context "
            "(e.g., using `astream()` or `ainvoke()`). "
            "To resolve this, either: "
            "(1) subclass AgentMiddleware and implement the asynchronous awrap_tool_call method, "
            "(2) use the @wrap_tool_call decorator on a standalone async function, or "
            "(3) invoke your agent synchronously using `stream()` or `invoke()`."
        )
        raise NotImplementedError(msg)
```
As introduced earlier, tools can be registered through the tools parameter when calling create_agent; they can also be packaged into a middleware via its tools field. When a middleware is registered, the tools it carries are registered along with it. In other words, create_agent collects the tools stored in the tools field of every registered middleware and processes them together with the tools registered directly through the tools parameter. Although AgentMiddleware defines many methods, they fall into two categories:
- Lifecycle interceptors: invoked before and after the agent and the model run, including
  - before_agent/before_model/after_agent/after_model
  - abefore_agent/abefore_model/aafter_agent/aafter_model
- Call wrappers: wrap the calls to the model and to tools, including
  - wrap_model_call/wrap_tool_call
  - awrap_model_call/awrap_tool_call
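The tool-merging behavior described above can be sketched in plain Python. This is a hypothetical simplification, not the actual create_agent implementation; the Middleware class, merge_tools function, and the string tool names are all illustrative stand-ins:

```python
# Hypothetical sketch: how directly registered tools could be merged with
# the tools carried by each middleware's `tools` field.
# `Middleware`, `merge_tools`, and the tool names are NOT LangChain APIs.
class Middleware:
    tools: list = []

class SearchMiddleware(Middleware):
    tools = ["search_tool"]           # stand-ins for BaseTool objects

class MathMiddleware(Middleware):
    tools = ["add_tool", "sub_tool"]

def merge_tools(direct_tools, middleware):
    merged = list(direct_tools)       # tools passed via the `tools` parameter
    for m in middleware:
        merged.extend(m.tools)        # middleware-packaged tools join the pool
    return merged

all_tools = merge_tools(["weather_tool"], [SearchMiddleware(), MathMiddleware()])
print(all_tools)  # ['weather_tool', 'search_tool', 'add_tool', 'sub_tool']
```

Both sources end up in one tool pool, which is why a tool only needs to be declared once, either directly or inside a middleware.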
2. Lifecycle Interceptors
An agent created by the create_agent function with no middleware registered is essentially a Pregel object composed of two core nodes, model and tools. The lifecycle interceptor methods of registered middleware, which intercept execution before and after the agent and the model run, are implemented by adding extra nodes and channels to this Pregel object. We can verify this with the following simple example:
```python
from langchain.agents import create_agent
from dotenv import load_dotenv
from langchain.agents.middleware.types import AgentState
from langchain_openai import ChatOpenAI
from PIL import Image as PILImage
from langchain.agents.middleware import AgentMiddleware
from typing import Any
from langgraph.runtime import Runtime
import io

class FooMiddleware(AgentMiddleware):
    def before_agent(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        return super().before_agent(state, runtime)

    def before_model(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        return super().before_model(state, runtime)

    def after_agent(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        return super().after_agent(state, runtime)

    def after_model(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        return super().after_model(state, runtime)

load_dotenv()

def test_tool():
    """A test tool"""

agent = create_agent(
    model=ChatOpenAI(model="gpt-5.2-chat"),
    tools=[test_tool],
    middleware=[FooMiddleware()]
)

payload = agent.get_graph(xray=True).draw_mermaid_png()
PILImage.open(io.BytesIO(payload)).show()

print("channels:")
for (name, chan) in agent.channels.items():
    print(f"\t[{chan.__class__.__name__}]{name}")
print("trigger_to_nodes")

for (name, nodes) in agent.trigger_to_nodes.items():
    print(f"\t{name}: {nodes}")
```
The demo program above defines a custom middleware type, FooMiddleware, and overrides its four methods before_agent, before_model, after_agent, and after_model. We then register this middleware when calling create_agent to create an agent, and render its topology as a PNG image (shown below).
As the figure shows, the registered middleware adds four nodes to the agent, one for each overridden method, and their positions reflect the execution order: FooMiddleware.before_agent -> FooMiddleware.before_model -> FooMiddleware.after_model -> FooMiddleware.after_agent. In addition, FooMiddleware.before_model can trigger a jump to the "tools" node, and after "tools" finishes, execution is again intercepted by FooMiddleware.before_model.
The demo program also prints the channel list and the mapping between nodes and the channels they subscribe to. As the output below shows, each of the four nodes has its own dedicated subscription channel.
```
channels:
	[BinaryOperatorAggregate]messages
	[EphemeralValue]jump_to
	[LastValue]structured_response
	[EphemeralValue]__start__
	[Topic]__pregel_tasks
	[EphemeralValue]branch:to:model
	[EphemeralValue]branch:to:tools
	[EphemeralValue]branch:to:FooMiddleware.before_agent
	[EphemeralValue]branch:to:FooMiddleware.before_model
	[EphemeralValue]branch:to:FooMiddleware.after_model
	[EphemeralValue]branch:to:FooMiddleware.after_agent
trigger_to_nodes
	__start__: ['__start__']
	branch:to:model: ['model']
	branch:to:tools: ['tools']
	branch:to:FooMiddleware.before_agent: ['FooMiddleware.before_agent']
	branch:to:FooMiddleware.before_model: ['FooMiddleware.before_model']
	branch:to:FooMiddleware.after_model: ['FooMiddleware.after_model']
	branch:to:FooMiddleware.after_agent: ['FooMiddleware.after_agent']
```
If we register a second middleware, BarMiddleware, as follows:
```python
class BarMiddleware(AgentMiddleware):
    def before_agent(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        return super().before_agent(state, runtime)

    def before_model(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        return super().before_model(state, runtime)

    def after_agent(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        return super().after_agent(state, runtime)

    def after_model(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        return super().after_model(state, runtime)

agent = create_agent(
    model=ChatOpenAI(model="gpt-5.2-chat"),
    tools=[test_tool],
    middleware=[FooMiddleware(), BarMiddleware()]
)
```
the agent's new topology gains four additional nodes for BarMiddleware.
3. Call Wrappers
AgentMiddleware provides four methods (wrap_model_call, awrap_model_call, wrap_tool_call, and awrap_tool_call) for wrapping the synchronous and asynchronous calls to the model and to tools. Since the agent is a Pregel object in which model and tool invocations are issued by the "model" and "tools" nodes, the wrapping performed by middleware also takes place inside these two nodes.
```python
class AgentMiddleware(Generic[StateT, ContextT]):
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelCallResult: ...

    async def awrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], Awaitable[ModelResponse]],
    ) -> ModelCallResult: ...

    def wrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], ToolMessage | Command[Any]],
    ) -> ToolMessage | Command[Any]: ...

    async def awrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command[Any]]],
    ) -> ToolMessage | Command[Any]: ...
```
3.1 Wrapping Model Calls
A regular model call returns an AIMessage. With structured output based on ToolStrategy, besides the formatted output itself, the ToolMessage produced by the formatting tool is also involved; both are packaged into a ModelResponse object. The ModelCallResult type representing the result of a model call is therefore the union of ModelResponse and AIMessage.
```python
@dataclass
class ModelResponse:
    result: list[BaseMessage]
    structured_response: Any = None

ModelCallResult: TypeAlias = ModelResponse | AIMessage
```
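Code that consumes this union consequently has to handle both members. A minimal sketch with local stand-in types (ToyAIMessage and ToyResponse merely mimic the shapes of AIMessage and ModelResponse; they are not the real classes):

```python
from dataclasses import dataclass
from typing import Any, Union

@dataclass
class ToyAIMessage:            # stand-in for AIMessage
    content: str

@dataclass
class ToyResponse:             # stand-in for ModelResponse
    result: list
    structured_response: Any = None

ToyCallResult = Union[ToyResponse, ToyAIMessage]

def final_text(result: ToyCallResult) -> str:
    # A plain AIMessage carries the text directly; a ModelResponse-like
    # object carries a message list (plus an optional structured payload).
    if isinstance(result, ToyAIMessage):
        return result.content
    return result.result[-1].content

print(final_text(ToyAIMessage("hello")))                        # hello
print(final_text(ToyResponse(result=[ToyAIMessage("world")])))  # world
```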
ModelRequest represents a model-call request. From it we can obtain the chat model component, the request message list, the system instruction, the registered tools, the tool-choice strategy, the structured-output schema, the state, the runtime, and the model settings. In the vast majority of cases, the purpose of wrapping a model call in custom middleware is to update one or more of these request elements, and ModelRequest's override method makes that simple.
```python
@dataclass(init=False)
class ModelRequest:
    model: BaseChatModel
    messages: list[AnyMessage]
    system_message: SystemMessage | None
    tool_choice: Any | None
    tools: list[BaseTool | dict[str, Any]]
    response_format: ResponseFormat[Any] | None
    state: AgentState[Any]
    runtime: Runtime[ContextT]
    model_settings: dict[str, Any] = field(default_factory=dict)

    @property
    def system_prompt(self) -> str | None: ...

    def override(self, **overrides: Unpack[_ModelRequestOverrides]) -> ModelRequest: ...

class _ModelRequestOverrides(TypedDict, total=False):
    model: BaseChatModel
    system_message: SystemMessage | None
    messages: list[AnyMessage]
    tool_choice: Any | None
    tools: list[BaseTool | dict[str, Any]]
    response_format: ResponseFormat[Any] | None
    model_settings: dict[str, Any]
    state: AgentState[Any]
```
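The copy-with-overrides semantics of override can be illustrated with the standard dataclasses.replace idiom. ToyRequest below is a hypothetical stand-in for ModelRequest, kept deliberately tiny:

```python
from dataclasses import dataclass, replace
from typing import Optional

# ToyRequest is a stand-in for ModelRequest; its `override` mimics the
# "return a new request with some fields replaced" behavior described above.
@dataclass(frozen=True)
class ToyRequest:
    model: str
    system_prompt: Optional[str]
    model_settings: dict

    def override(self, **overrides) -> "ToyRequest":
        # Produce a new request object; the original is left untouched.
        return replace(self, **overrides)

original = ToyRequest(model="gpt-4o", system_prompt=None, model_settings={})
updated = original.override(system_prompt="Answer in French.")

print(updated.system_prompt)   # Answer in French.
print(original.system_prompt)  # None (the original request is unchanged)
```

Because a new object is returned, a middleware can pass the updated request down to handler while later middleware, or the original caller, still sees its own unmodified copy.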
Since the input and output types of a model call are ModelRequest and ModelResponse, the wrapped synchronous and asynchronous model calls can be represented as Callable[[ModelRequest], ModelResponse] and Callable[[ModelRequest], Awaitable[ModelResponse]] objects respectively; these are exactly what the handler parameters of wrap_model_call and awrap_model_call receive.
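This "request in, response out, with the original call exposed as handler" contract composes like ordinary function wrapping. A plain-Python sketch of a retry wrapper under that assumption (dicts stand in for ModelRequest/ModelResponse; no LangChain types are involved):

```python
from typing import Callable

def flaky_handler_factory():
    # Simulates an inner model call that fails twice before succeeding.
    calls = {"n": 0}
    def handler(request: dict) -> dict:
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient model error")
        return {"result": f"ok after {calls['n']} attempts"}
    return handler

def wrap_with_retry(handler: Callable[[dict], dict], max_attempts: int = 3):
    # Mirrors the wrap_model_call shape: receive the inner handler,
    # return a callable with the same request-in/response-out contract.
    def wrapped(request: dict) -> dict:
        for attempt in range(1, max_attempts + 1):
            try:
                return handler(request)  # delegate to the inner call
            except RuntimeError:
                if attempt == max_attempts:
                    raise                # out of retries: propagate
    return wrapped

resilient = wrap_with_retry(flaky_handler_factory())
print(resilient({"messages": []}))  # {'result': 'ok after 3 attempts'}
```

The same shape supports caching, request rewriting (via override), or response post-processing: the wrapper decides if, when, and how often to invoke handler.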
3.2 Wrapping Tool Calls
The ToolCallRequest type, which represents a tool-call request, is defined below. A request carries the ToolCall object generated by the model to invoke the target tool, the BaseTool object representing the tool itself, the current state, and the tool runtime. ToolCallRequest likewise provides an override method for updating these request elements.
```python
@dataclass
class ToolCallRequest:
    tool_call: ToolCall
    tool: BaseTool | None
    state: Any
    runtime: ToolRuntime

    def override(
        self, **overrides: Unpack[_ToolCallRequestOverrides]
    ) -> ToolCallRequest: ...

class _ToolCallRequestOverrides(TypedDict, total=False):
    tool_call: ToolCall
    tool: BaseTool
    state: Any
```
The result of a tool execution can be packaged into a ToolMessage and fed back to the model, or returned as a Command object to update state and perform a jump. Accordingly, the handler parameter of wrap_tool_call/awrap_tool_call, which represents the original tool invocation, is a Callable[[ToolCallRequest], ToolMessage | Command[Any]] or a Callable[[ToolCallRequest], Awaitable[ToolMessage | Command[Any]]] object, respectively.
When introducing tool registration in create_agent, we mentioned that besides registering a tool as a callable or a BaseTool object, we can also supply a dictionary representing the tool's JSON Schema. A tool registered this way is not bound to a concrete callable, however, so by default it cannot be executed. Middleware offers a way to solve this.
```python
from langchain.agents import create_agent
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import ToolMessage
from langchain.agents.middleware import AgentMiddleware, ToolCallRequest
from langgraph.types import Command
from typing import Any, Callable

load_dotenv()

tool = {
    "name": "get_weather",
    "description": "Get weather information for given city",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        },
        "required": ["city"]
    }
}

class WeatherMiddleware(AgentMiddleware):
    def wrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], ToolMessage | Command[Any]],
    ) -> ToolMessage | Command[Any]:
        tool_call = request.tool_call
        if tool_call["name"] == "get_weather":
            city = tool_call["args"]["city"]
            return ToolMessage(
                content=f"It's sunny in {city}.",
                tool_call_id=tool_call["id"])
        else:
            return handler(request)

agent = create_agent(
    model=ChatOpenAI(model="gpt-5.2-chat"),
    tools=[tool],
    middleware=[WeatherMiddleware()],
)

result = agent.invoke(input={
    "messages": [{"role": "user", "content": "What is the weather like in Suzhou?"}]
})

for message in result["messages"]:
    message.pretty_print()
```
As the demo program shows, the registered tool is a dictionary representing the tool's JSON Schema, which provides the tool's name ("get_weather") and its parameter structure (a single required string field named "city"). The registered WeatherMiddleware intercepts tool calls by overriding wrap_tool_call: for calls to get_weather, it packages the weather information into the returned ToolMessage. Running the program prints the following message history:
```
================================ Human Message =================================

What is the weather like in Suzhou?
================================== Ai Message ==================================
Tool Calls:
  get_weather (call_LjastyaYNrovwMhSmvoJMcNz)
 Call ID: call_LjastyaYNrovwMhSmvoJMcNz
  Args:
    city: Suzhou
================================= Tool Message =================================

It's sunny in Suzhou.
================================== Ai Message ==================================

The weather in **Suzhou** is **sunny**. ☀️
```
This article is a reprint; the original is titled "[LangChain智能体本质论] 中间件是如何参与Agent、Model和Tool三者交互的?" ("[On the Nature of LangChain Agents] How does middleware participate in the interaction among Agent, Model, and Tool?").