LangGraph 入门到精通0x02:基础 API (二)

作者:chaors日期:2026/4/14

前言

今天继续学习一些 LangGraph 常用到的基础 API。废话少说上干货。

.点语法

Graph 的创建可以使用看着更简洁的 .语法。

1graph = (
2    StateGraph(State)
3    .add_node(a)
4    .add_node(b)
5
6    .add_edge(START, "a")
7    .add_edge("a", "b")
8    .add_edge("b", END)
9
10    .compile()
11)
12

步骤序列

1StateGraph(State).add_sequence([step_1, step_2, step_3])
2

并行运行

1graph = (
2    StateGraph(State)
3    .add_node(a)
4    .add_node(b)
5    .add_node(c)
6    .add_node(d)
7
8    .add_edge(START, "a")
9    .add_edge("a", "b")
10    .add_edge("a", "c")
11    .add_edge("b", "d")
12    .add_edge("c", "d")
13    .add_edge("d", END)
14
15    .compile()
16)
17

上面的 Graph就是支持并行的,b, c 两个节点就是并行运行的两个节点。这个时候图的结构为:

image.png

MapReduce

MapReduce 模式,是一套用于构建动态、可扩展的智能体(Agent)或数据处理流水线的核心架构模式。

  • 本质:将经典的“分而治之”并行计算
  • Map:“分”与“散”
    • 输入:一个包含多个待处理项(如文档列表、查询列表、任务对象)的总体状态。
    • 过程:通过一个条件边(Conditional Edge ​ 中的路由函数,利用 Send API 将输入列表“映射”为多个独立的子任务。每个 Send对象指定一个目标处理节点和一份独立的、仅包含该子任务数据的私有状态。
    • 关键:这些 Send出的子任务会并行执行,实现高效扇出(Fan-out)。
  • Reduce:“合”与“聚”
    • 输入:所有并行 Map 节点处理完成后产生的多个中间结果状态。
    • 过程:通过预定义的 Reducer(如 operator.add用于列表合并)自动将所有子任务的结果状态“归约”合并,更新回主工作流状态。
    • 关键:自动声明式,无需编写结果收集与合并代码,实现扇入(Fan-in)

图状态:

1class OverallState(TypedDict):
2    """全局状态:贯穿整个工作流"""
3    topic: str                  # 输入的大主题(如 animals)
4    subjects: list              # 生成的子主题列表
5    jokes: Annotated[list, operator.add]  # 自动合并多条并行生成的笑话
6    best_selected_joke: str     # 最终选中的最佳笑话
7
8class JokeState(TypedDict):
9    """子任务状态:仅给 generate_joke 使用"""
10    subject: str
11

核心:

1def continue_to_jokes(state: OverallState):
2    """
3    核心技术:Map-Reduce 并行执行
4    给每个子主题分发一个独立的 generate_joke 任务
5    """
6    return [Send("generate_joke", {"subject": sub}) for sub in state["subjects"]]
7

recursion_limit

  • recursion_limit:LangGraph 强制安全上限,防止死循环,我们可以控制的最大步数
    • 步数:每个 Node 的执行为一步

以下代码表示:程序最多执行 100步

1result = graph.invoke({"aggregate": []}, {"recursion_limit": 100})
2

RemainingSteps

  • RemainingSteps:LangGraph 的一个内置托管值,表示程序还剩多少步
    • ReadOnly:完全由框架自动维护
    • RemainingSteps = recursion_limit - 已走步数(框架自动算)

RetryPolicy

RetryPolicy 是一个声明式的重试策略配置对象。你通过参数告诉 LangGraph:“当这个节点执行失败时,请按我定的规矩再试几次,而不是立刻报错”。核心参数:

  • max_attempts: 最大重试次数(含首次)。
  • initial_interval: 首次重试前的等待时间(s)。
  • backoff_factor: 退避因子。每次重试间隔会乘以这个因子,用于实现“**指数退避
  • jitter: 是否在重试间隔中加入随机抖动。用于“重试风暴”。
  • retry_on: 一个异常类型或函数,用于判断何种失败才需要重试。

赶脚是不是和 http 请求的重试差不多,我们也模拟一个 http 节点 的重试case。这里实现重试策略有两种方式:

  1. 内置的 RetryPolicy 方法
1# 2. 定义针对此节点的“重试作战计划”
2api_retry_policy = RetryPolicy(
3    max_attempts=3,           # 最多尝试3次(首次+2次重试)
4    initial_interval=1.0,     # 第一次重试等1秒
5    backoff_factor=2.0,       # 指数退避:第二次等2秒,第三次等4秒
6    jitter=True,              # 增加随机抖动,避免重试风暴
7    retry_on=(requests.exceptions.Timeout,
8              requests.exceptions.ConnectionError,
9              requests.exceptions.HTTPError)  # 只针对网络和5xx错误重试
10)
11
  1. 自定义的 retry 方法
1# 方法2:自定义函数匹配 - 更精细地控制,例如只重试特定HTTP状态码
2def retry_on_policy(exception: Exception) -> bool:
3    """
4    自定义重试判断函数:
5    1. 对所有网络连接错误(ConnectionError)和超时(Timeout)进行重试。
6    2. 仅对特定HTTP状态码(429, 503)进行重试。
7    """
8    # 规则1: 如果是连接错误或超时,立即决定重试
9    if isinstance(exception, (requests.exceptions.ConnectionError,
10                              requests.exceptions.Timeout)):
11        return True
12
13    # 规则2: 如果是HTTP错误,则检查状态码
14    if isinstance(exception, requests.exceptions.HTTPError):
15        # 注意:exception.response 可能为 None(例如在请求未发出时)
16        if exception.response is not None:
17            if exception.response.status_code in (429, 503):
18                return True
19    # 其他所有情况,不重试
20    return False
21

重试方法和节点绑定:

1builder.add_node("call_api", call_external_api, retry=api_retry_policy)
2

长时记忆(向量检索)

前面我们知道 checkpointer 用于 LangGraph 中工作流的快照,但是他只能保存基于上下文的短期记忆。那么如果我们要做长时记忆该怎么做呢?

InMemoryStore

InMemoryStore 是 LangGraph 中基于 Python 字典实现的内存键值存储库。其核心主要在:

  • 存储 (put)
    • namespace:命名空间,是一个元组,用于逻辑隔离,如 ("users", "123", "preferences")
    • 键 (key):字典 key
    • 值 (value):字典 value
  • 读取 (get) :通过命名空间和键精确获取数据。
  • 检索 (search) :可检索某个命名空间下的所有条目,如果配置了嵌入模型,还能支持基于语义相似度的查询。

和用于 checkpointerInMemorySaver 不同,InMemorySaver用于保存单个对话线程(Thread)的完整状态,数据与 thread_id 强绑定。而 InMemoryStore 更像一个共享的白板或缓存,用结构化的方式(通过命名空间)存储和检索任意数据,这些数据可以被多个不同的线程或会话访问和复用。一个管“对话流程状态”,一个管“共享知识记忆”。

InMemoryStore 构造代码:

  • embed:文本嵌入模型,用于文本向量化
  • dims:嵌入模型维度。各个模型可能不同!!!
1in_memory_store = InMemoryStore(
2    index={
3        "embed": get_ali_embeddings(),  # 文本向量化
4        "dims": 1024,  # 嵌入模型固定输出维度。各个模型可能不同!!!
5    }
6)
7

咿?存储就存储呗,怎么参数还需要文本嵌入,这是为什么呢?我们上面也看到了 InMemoryStore检索功能。长期记忆若只支持精确键值检索,则效用有限。向量LLM 的引入,将其升级为一个支持模糊联想、概念匹配的智能检索系统。这也正是轻量化的 RAG 在记忆系统中的实践。

记忆 + 对话

记忆和对话是长时记忆的核心与灵魂。

1def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
2    # 获取用户ID  每个用户独立记忆空间
3    user_id = config["configurable"]["user_id"]
4    namespace = ("memories", user_id)
5
6    # 向量检索:从长时记忆中匹配当前对话相关信息
7    user_query = state["messages"][-1].content
8    memories = store.search(namespace, query=user_query)
9    user_info = "\n".join([data.value["data"] for data in memories])
10
11    # 构建系统提示:注入用户记忆
12    system_prompt = f"""
13    你是乐于助人的助手。
14    用户信息(请记住并使用):{user_info}
15    """
16
17    # 判断:用户提到 remember  自动存储记忆
18    last_msg = user_query.lower()
19    if "请记住" in last_msg:
20        # 提取用户真实输入的内容
21        new_memory = user_query.strip()
22        # 存储:命名空间 + 唯一ID + 记忆内容
23        store.put(namespace, str(uuid.uuid4()), {"data": new_memory})
24
25    # 调用AI生成回复(系统提示 + 对话历史)
26    response = model.invoke(
27        [{"role": "system", "content": system_prompt}] + state["messages"]
28    )
29    return {"messages": response}
30

Graph 构造

1workflow = (
2    StateGraph(MessagesState)
3    .add_node("dialogue_with_memory", call_model)
4    .add_edge(START, "dialogue_with_memory")
5)
6

工作流编译

双记忆配置的 Graph:

  • checkpointer:短时对话记忆(聊天上下文)
  • store:长时永久记忆(用户信息)
1agent = workflow.compile(
2    checkpointer=MemorySaver(),
3    store=in_memory_store
4)
5

示例代码

1config1 = {"configurable": {"thread_id": "1", "user_id": "1"}}
2logger.info("【第一轮对话:存储记忆】")
3for chunk in agent.stream(
4        {"messages": [{"role": "user", "content": "你好! 请记住: 我就叫 Monkey Bro"}]},
5        config1,
6        stream_mode="values"
7):
8    chunk["messages"][-1].pretty_print()
9
10# 查看存储的记忆
11logger.info("\n【已存储的长时记忆】")
12for memory in in_memory_store.search(("memories", "1")):
13    logger.info(memory.value)
14
15# --------------------------
16# 测试2:跨线程读取记忆
17# 换thread_id,依然能读取同一user_id的记忆
18# --------------------------
19config2 = {"configurable": {"thread_id": "3", "user_id": "1"}}
20logger.info("\n【第二轮对话:读取记忆】")
21for chunk in agent.stream(
22    {"messages": [{"role": "user", "content": "我的名字叫啥?"}]},
23    config2, stream_mode="values"
24):
25    chunk["messages"][-1].pretty_print()
26

Running

image.png

源码

github


LangGraph 入门到精通0x02:基础 API (二)》 是转载文章,点击查看原文


相关推荐


你的 Android App 可能白白损失了 35% 的性能——R8 全模式配置详解
陆业聪2026/4/6

字节跳动的工程师优化启动速度时,可能花了数周分析 trace、改代码;Monzo 的团队却只改了一行配置,性能指标全线提升了 35%。这不是段子,是 Google 官方 blog 2026 年 3 月底发出来的案例。 问题来了:你的项目,是不是也开着 R8,但根本没用对? R8 到底做了什么——大多数人理解是错的 很多人对 R8 的理解停留在「代码混淆 + 压缩」。打开 minifyEnabled true,觉得任务完成了。 但 R8 实际上分两种工作模式: • 兼容模式(Compatibili


核心概念层——深入理解 Agent 是什么
想打游戏的程序猿2026/3/28

1 Agent vs ChatBot:从根本上理解区别 1.1 一个直观的例子 假设你对 AI 说:"帮我分析一下我们公司上周的销售数据,找出表现最好的产品,并给团队发一封总结邮件"。 ChatBot 的反应: ChatBot: "要分析销售数据,你可以按以下步骤操作: 1. 打开数据库,执行 SQL 查询获取上周的销售记录 2. 使用 Excel 或 Python 进行数据汇总 3. 找出销售额最高的产品 4. 撰写邮件总结发送给团队 你需要我帮你写 SQL 查询语句吗?" → ChatB


配置钉钉龙虾OpenClaw机器人调用OpenMetadata
光于前裕于后2026/3/20

目录 一、前言1️⃣钉钉(DingTalk)2️⃣OpenClaw3️⃣OpenMetadata4️⃣MCP(Model Context Protocol) 二、安装OpenClaw三、配置OpenClaw钉钉机器人四、调用OpenMetadata MCP 一、前言 先介绍下这四个工具/协议的定位与核心能力,本文将从零开始配置。 1️⃣钉钉(DingTalk) 阿里巴巴旗下的企业协作平台,2014年上线,是中国市场份额最大的企业即时通讯与办公套件之一。 核心能


10分钟搭建 Windows + WSL + Codex环境
Lei_official2026/3/12

并不是 AI 替代人,而是会用 AI 的人替代不会用 AI 的人。 我的大模型使用历程 从2023年秋季,我开始使用对话型的大模型,提升工作和学习的效率,以及回答一些生活上的常识问题。最开始是 ChatGPT 的免费版本,随着使用频率提高,慢慢会遇到问答超过上限的情况。随后便开通了Plus订阅直至今日。期间也曾使用过 Deepseek、Gemini、Minimax 等等,不过最主要的仍然是 ChatGPT,个人感觉它在回答的质量、速度、上下文方面体验最好。 在这段历程里,网页对话型 的 AI


MySQL中 SHOW FULL PROCESSLIST` 输出中 `State` 列的所有可能值
左Python右Java2026/3/4

SHOW FULL PROCESSLIST输出中State` 列的所有可能值,以及这些值代表的含义,这能帮你精准判断数据库连接的状态(包括锁相关、执行状态等)。 一、State 列核心分类及含义 State 列描述了当前线程正在执行的操作状态,不同状态对应不同的数据库行为,以下是最常见且实用的分类(按场景划分): 1. 锁相关状态(排查锁表核心) 这是你最关心的锁表相关状态,直接反映锁等待 / 阻塞: 状态值含义Waiting for t


326. Java Stream API - 实现自定义的 toList() 与 toSet() 收集器
yaoxin5211232026/2/23

文章目录 326. Java Stream API - 实现自定义的 `toList()` 与 `toSet()` 收集器📦 实现一个自定义 `toList()` 收集器🚀 使用我们的 `ToList` 收集器🔄 将其改造成 `toSet()` 收集器✅ 修改 1:使用 `HashSet` 作为容器✅ 修改 2:声明该收集器是无序的 🧪 `ToSet` 收集器完整实现示例🎯 总结一下关键点🧠 小贴士 326. Java Stream API - 实现自定义的 toL


Kafka 生产者与消费者配置详解
倚肆2026/2/15

Kafka 生产者与消费者配置详解 一、DefaultKafkaProducerFactory 生产者配置详解 配置项示例值作用说明调优建议ProducerConfig.BOOTSTRAP_SERVERS_CONFIG"localhost:9092"Kafka 集群地址列表,生产者通过此地址发现集群。配置多个地址(用逗号分隔)以提高可用性。ProducerConfig.KEY_SERIALIZER_CLASS_CONFIGStringSerializer.class消息键的序列化器。键用于分区


Spring IOC&DI(上)
阿武不想上早八2026/2/6

Spring IOC&DI(上) 1. Spring IOC&DI Spring 是包含了众多工具方法的 IOC 容器 1.1 容器 概念:容器时用来容纳物品的装置。 例子:List/Map -> 数据存储容器;Tomcat -> Web 容器 1.2 IOC 概念:全称:Inversion of Control(控制反转),是 Spring 的核心思想,把对象交给 Spring 管理,就是 IOC 思想。 总的来说,Spring 就是一个”控制反转“的容器。 2. I


【学习笔记】C++(1)
贺一航【Niki】2026/1/28

C++学习笔记 一、基础 1、类型表示范围 2、cout 3、char 4、string 5、逻辑运算符 6、枚举 7、随机数 8、数组 9、其他 一、基础 1、类型表示范围 类型 字节数 位宽 十进制范围(大约) 具体值范围 char 1


【AI大模型开发】-基于FAISS的语义搜索系统(实战)
Java后端的Ai之路2026/1/19

向量数据库实战:基于FAISS的语义搜索系统 一、项目概述 1.1 什么是向量数据库? 向量数据库是一种专门用于存储、索引和检索高维向量数据的数据库系统。在AI领域,向量通常是指通过预训练模型(如Transformer)将文本、图像等非结构化数据转换而成的数值表示(Embedding)。 1.2 项目背景 本项目展示了如何使用阿里云百炼Embedding API生成文本向量,并结合FAISS(Facebook AI Similarity Search)构建一个简单但功能完整的语义搜索系统。 1.

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2026 XYZ博客