数据同步的几种姿势

作者:小跟班_mya日期:2026/6/3

先搞清楚一件事:为什么要有 ES 数据同步?

想象一下,你开了一家图书馆

  • MySQL 是图书管理员,负责把书一本本摆好、登记在册(增删改查、事务保证)
  • Elasticsearch 是图书检索系统,用户输入"三体",0.01 秒就能告诉你第几排第几架(全文检索、聚合分析)

问题来了:管理员刚上架了一本新书,检索系统怎么立刻知道这本书的存在?这就是数据同步要解决的事。


方案一:同步双写——"一边记账一边贴标签"

场景

你开了个电商网站,用户下单后,订单既要存到 MySQL(方便财务对账),又要同步到 ES(方便客服搜索订单)。

代码
1@Service
2public class OrderService {
3    
4    @Autowired
5    private OrderMapper orderMapper;
6    
7    @Autowired
8    private RestHighLevelClient esClient;
9    
10    @Transactional
11    public void createOrder(Order order) {
12        // 第一步:写入 MySQL
13        orderMapper.insert(order);
14        
15        // 第二步:写入 ES
16        IndexRequest request = new IndexRequest("orders")
17            .id(order.getId().toString())
18            .source(JSON.toJSONString(order), XContentType.JSON);
19        esClient.index(request, RequestOptions.DEFAULT);
20    }
21}

这就像你一边记账一边贴标签,看起来简单直接,但问题很大:

  • 耦合严重:每个写 MySQL 的地方都要加一段 ES 代码,改一个接口要改三处
  • 性能暴跌:本来写 MySQL 只要 10ms,现在还要等 ES 响应,接口响应时间翻倍
  • 数据不一致:MySQL 写成功了,ES 突然宕机了,数据就丢了。你总不能让用户"订单创建失败,因为搜索引擎挂了"吧?

结论:小项目玩玩可以,生产环境慎用。和redis也会有这样那样的问题

方案二:异步双写(MQ)——"写完账扔纸条给同事"

场景

还是电商订单,但这次我们学聪明了:写完 MySQL 就返回成功,同步 ES 的事交给别人干。

架构
1用户下单  写入 MySQL  发送 MQ 消息  返回成功给用户
2                                    
3                            消费者拿到消息  写入 ES
代码
1@Service
2public class OrderService {
3    
4    @Autowired
5    private OrderMapper orderMapper;
6    
7    @Autowired
8    private KafkaTemplate<String, String> kafkaTemplate;
9    
10    @Transactional
11    public void createOrder(Order order) {
12        // 第一步:写入 MySQL
13        orderMapper.insert(order);
14        
15        // 第二步:发 MQ 消息(事务提交后自动发送)
16        kafkaTemplate.send("order-sync-topic", order.getId().toString());
17        // 方法结束,事务提交,用户已经收到"下单成功"
18    }
19}
20
21// 消费者:异步同步到 ES
22@Component
23public class OrderEsSyncConsumer {
24    
25    @Autowired
26    private RestHighLevelClient esClient;
27    
28    @Autowired
29    private OrderMapper orderMapper;
30    
31    @KafkaListener(topics = "order-sync-topic")
32    public void syncToEs(String orderId) {
33        try {
34            // 根据 ID 查询完整订单数据
35            Order order = orderMapper.selectById(orderId);
36            
37            // 写入 ES
38            IndexRequest request = new IndexRequest("orders")
39                .id(orderId)
40                .source(JSON.toJSONString(order), XContentType.JSON);
41            esClient.index(request, RequestOptions.DEFAULT);
42            
43        } catch (Exception e) {
44            // 写入失败,抛异常触发 MQ 重试
45            throw new RuntimeException("ES 同步失败", e);
46        }
47    }
48}

这就像你写完账扔了张纸条给同事,让他帮你贴标签。好处是:

  • 解耦:ES 挂了不影响用户下单
  • 性能好:用户不用等 ES 响应
  • 可重试:同事贴标签失败了,纸条还在,可以重试

但要注意几个坑:

  • 顺序问题:如果用户先创建订单,然后立刻修改地址,两条消息可能乱序到达。解决:用 orderId 作为 Kafka 分区键,保证同一个订单的消息顺序消费
  • 幂等问题:同一条消息可能被消费多次。解决:ES 写入时用 id 作为文档 ID,重复写入会自动覆盖

方案三:Canal 监听 Binlog——"偷偷看管理员的记账本"

场景

这是企业级最主流的方案。你的系统已经很庞大了,有几十个微服务都在写 MySQL,你不可能让每个服务都去发 MQ 消息。怎么办?

答案是:直接监听 MySQL 的 binlog 日志,谁改了数据我都知道。

MySQL → 开启 binlog → Canal(伪装成 MySQL 从库)→ 解析 binlog → 发送到 MQ → 消费者写入 ES

配置

第一步:MySQL 开启 binlog

1-- my.cnf 配置
2[mysqld]
3log-bin=mysql-bin
4binlog-format=ROW
5server-id=1

第二步:Canal 配置

1# canal.properties
2canal.instance.master.address=127.0.0.1:3306
3canal.instance.dbUsername=canal
4canal.instance.dbPassword=canal
5canal.mq.topic=canal-mysql-sync

第三步:消费者代码

1@Component
2public class CanalEsSyncConsumer {
3    
4    @Autowired
5    private RestHighLevelClient esClient;
6    
7    @KafkaListener(topics = "canal-mysql-sync")
8    public void syncToEs(String canalMessage) {
9        // 解析 Canal 消息
10        CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(canalMessage);
11        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
12        
13        String tableName = entry.getHeader().getTableName();
14        CanalEntry.EventType eventType = rowChange.getEventType();
15        
16        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
17            if (eventType == CanalEntry.EventType.INSERT || 
18                eventType == CanalEntry.EventType.UPDATE) {
19                // 新增或更新:写入 ES
20                String json = parseRowDataToJson(rowData.getAfterColumnsList());
21                IndexRequest request = new IndexRequest(tableName)
22                    .id(extractId(rowData.getAfterColumnsList()))
23                    .source(json, XContentType.JSON);
24                esClient.index(request, RequestOptions.DEFAULT);
25                
26            } else if (eventType == CanalEntry.EventType.DELETE) {
27                // 删除:从 ES 删除
28                DeleteRequest request = new DeleteRequest(tableName)
29                    .id(extractId(rowData.getBeforeColumnsList()));
30                esClient.delete(request, RequestOptions.DEFAULT);
31            }
32        }
33    }
34}

这就像你偷偷在管理员的记账本旁边装了个摄像头,不管谁改了账本,你都能看到。好处是:

  • 零侵入:业务代码完全不用改,新增一个微服务也不用管 ES 同步的事
  • 高实时:binlog 是实时产生的,延迟在毫秒级
  • 可靠:binlog 是 MySQL 原生的,不会丢数据

结论:中大型项目首选方案。

方案四:Logstash 定时拉取——"每隔几分钟去翻一遍账本"

场景

你对实时性要求不高,比如用户行为日志分析,T+1 出报表就行。

配置
1input {
2    jdbc {
3        jdbc_driver => "com.mysql.jdbc.Driver"
4        jdbc_url => "jdbc:mysql://localhost:3306/log_db"
5        jdbc_user => "root"
6        jdbc_password => "123456"
7        schedule => "*/5 * * * *"  # 每5分钟执行一次
8        statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value"
9        use_column_value => true
10        tracking_column => "update_time"
11    }
12}
13
14output {
15    elasticsearch {
16        hosts => ["http://es-host:9200"]
17        index => "user_logs"
18        document_id => "%{id}"
19    }
20}

这就像你每隔 5 分钟去翻一遍账本,把新记录抄到检索卡片上。好处是零代码改造,坏处是:

  • 延迟高:最多延迟 5 分钟
  • 数据库压力大:每次都要全表扫描(虽然有 update_time 索引优化)

结论:适合离线分析、报表类场景。

方案五:DataX 批量同步——"搬家式迁移"

场景

你要把历史数据从 MySQL 迁移到 ES,比如老系统升级,10 亿条订单数据要搬过去。

配置
1{
2  "job": {
3    "content": [
4      {
5        "reader": {
6          "name": "mysqlreader",
7          "parameter": {
8            "username": "root",
9            "password": "123456",
10            "connection": [
11              {
12                "querySql": ["SELECT * FROM orders WHERE create_time >= '2024-01-01'"],
13                "jdbcUrl": ["jdbc:mysql://localhost:3306/order_db"]
14              }
15            ],
16            "splitPk": "id"
17          }
18        },
19        "writer": {
20          "name": "elasticsearchwriter",
21          "parameter": {
22            "endpoint": "http://es-host:9200",
23            "index": "orders",
24            "type": "_doc",
25            "batchSize": 1000,
26            "splitter": ","
27          }
28        }
29      }
30    ],
31    "setting": {
32      "speed": {
33        "channel": 5
34      }
35    }
36  }
37}

这就像搬家公司,一次性把整个仓库的东西搬走。适合大数据量迁移,但不适合实时同步

方案实时性侵入性复杂度适用场景
同步双写毫秒级小项目、简单业务
MQ 异步双写秒级中型分布式系统
Canal 监听 Binlog毫秒级中高企业级首选
Logstash 定时拉取分钟级离线分析、报表
DataX 批量同步一次性历史数据迁移

生产环境必须遵守的 4 条铁律

铁律一:ES 写入必须幂等
1//  正确:指定文档 ID,重复写入自动覆盖
2IndexRequest request = new IndexRequest("orders")
3    .id(order.getId().toString())  // 指定 ID
4    .source(json, XContentType.JSON);
铁律二:必须处理顺序问题
1// 发送消息时,用业务 ID 作为分区键
2kafkaTemplate.send("order-sync-topic", orderId, orderJson);
3// 这样同一个 orderId 的消息永远进入同一个分区,保证顺序
铁律三:必须定期校对数据
1-- 每天凌晨跑一次校对脚本
2SELECT COUNT(*) FROM orders;  -- MySQL 数据量
3GET /orders/_count;           -- ES 数据量
4-- 如果不一致,触发全量修复
铁律四:ES 故障时要有降级方案
1@KafkaListener(topics = "order-sync-topic")
2public void syncToEs(String orderId) {
3    int retryCount = 0;
4    while (retryCount < 3) {
5        try {
6            // 写入 ES
7            esClient.index(request, RequestOptions.DEFAULT);
8            return; // 成功则退出
9        } catch (Exception e) {
10            retryCount++;
11            if (retryCount == 3) {
12                // 三次失败,写入失败队列,人工介入
13                log.error("ES 同步失败,写入死信队列: {}", orderId, e);
14                deadLetterQueue.add(orderId);
15            }
16            Thread.sleep(1000 * retryCount); // 指数退避
17        }
18    }
19}

一句话总结

  • 小项目:同步双写,简单粗暴
  • 中型项目:MQ 异步双写,解耦可靠
  • 大型项目:Canal 监听 Binlog,零侵入、高实时
  • 离线分析:Logstash 定时拉取,零代码改造
  • 数据迁移:DataX 批量同步,搬家式处理

数据同步的几种姿势》 是转载文章,点击查看原文


相关推荐


Vibe Coding 全栈实战:章鱼哥解题 07|功能跑通后的架构收敛
小小小小小鹿2026/5/28

Vibe Coding 全栈实战:章鱼哥解题 07|功能跑通后的架构收敛 上一期做完对话持久化以后,章鱼哥已经不只是一个“能回答问题”的接口了。它有了登录态,有了当前对话,有了 LangGraph thread,也能在刷新页面后恢复最近的消息。 但功能跑通以后,我回头看了一下后端模块依赖,发现了两个不太舒服的地方。 一个是 agent 依赖了 chat: agent.nodes → chat.question_classifier 另一个是 infra.llm 依赖了 rag.context_


开发了一个管理本地开发环境的软件
神奇的程序员2026/5/5

前言 前阵子换了新电脑,我在整理本地开发环境时,看到一堆需要重新装的,顿时感觉好麻烦。想着都过去这么久了,应该有工具可以做到统一管理,实现快速安装、更新、切换版本吧。 经过一番查找后,找到了mise这个东西,只需要简单的一句命令就能安装java、node、redis、go等工具,而且还支持对这些工具做统一管理(更新、删除),支持三大主流平台(macOS/Windows/Linux) 命令行始终不方便,于是我萌生了一个做GUI的想法,花了亿点时间用Flutter把它开发出来了,欢迎各位有需要的开发


Hello 算法:“走一步看一步”的智慧
灵感__idea2026/4/26

每个系列一本前端好书,帮你轻松学重点。 本系列来自上海交通大学硕士,华为高级算法工程师 靳宇栋 的 《Hello,算法》 “走一步看一步”,是我们面对不断变化的世界所采取的应对策略。 多数时候,我们无法对未来做出准确预测,只能根据上一件事的结果对下一件事做决策。介绍“分治”的时候,我们已经接触过这种策略。本篇主角依然如此,但又有所不同。 先看个例子。 爬楼梯 给一个 n 阶楼梯,每步可以上 1 阶或者 2 阶,问有多少种方案可以爬到楼顶? 假设 n 是3,那么方案共 3 种。如下图所示。 这


Linux 驱动开发入门:从最简单的 hello 驱动到硬件交互
4. 嵌入式铲屎官2026/4/17

Linux 驱动开发入门:从最简单的 hello 驱动到硬件交互 🎉 写给未来的自己和领导:本文是 Linux 驱动开发的 入门级保姆教程,从零开始搭建驱动框架,逐行解释代码,记录每一个踩过的坑。无论你是刚接触内核编程,还是想快速上手 GPIO 中断,都能在这里找到清晰的思路和可复现的步骤。 📚 目录 引言:驱动是什么?驱动的基本框架 —— 一切皆文件实战:第一个 hello 驱动 3.1 完整的驱动源码(带详细注释)3.2 编译驱动 —— Makefile 解析3.3 上机测试 ——


深入剖析 Redis 经典面试题
Thomas.Sir2026/4/9

1、什么是Redis?它主要用来什么的? Redis,英文全称是Remote Dictionary Server(远程字典服务),是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。 与MySQL数据库不同的是,Redis的数据是存在内存中的。它的读写速度非常快,每秒可以处理超过10万次读写操作。因此redis被广泛应用于缓存,另外,Redis也经常用来做分布式锁。除此之外,Redis支持事务、持久化、


260331-OpenWebUI统计所有Chat的对话字符个数
GuokLiu2026/4/1

1 OWUI启动脚本 # Open-WebUI Settings export DATA_DIR='data0331' export ENABLE_SIGNUP=True export DEFAULT_USER_ROLE='admin' export DEFAULT_GROUP_ID='xai' export OFFLINE_MODE=false export HF_HUB_OFFLINE=1 # OpenAI API 配置 export ENABLE_OLLAMA_API=false ex


[LangChain智能体本质论]中间件是如何参与Agent、Model和Tool三者交互的?
JaydenAI2026/3/23

LangChain的中间件(Middleware)是围绕Agent执行流程构建的“可插拔钩子系统”。它允许开发者在不修改核心逻辑的情况下,在执行的关键节点(如输入处理、模型调用前后、输出解析等)对数据流进行拦截、修改或验证。中间件类型以AgentMiddleware为基类。 1. AgentMiddleware AgentMiddleware是一个泛型类型,两个泛型参数分别代表状态和静态上下文的类型,我们可以利用state_schema字段得到状态类型。它的name属性返回中间件的名称,默认返回


haproxy案例项目(haproxy+dns+nginx+nfs+keepalived)
爱莉希雅&&&2026/3/15

HAProxy+Nginx+NFS+DNS 部署笔记 一、环境规划 主机名IP 地址安装软件角色说明haproxy192.168.72.100/24haproxy负载均衡器nginx1192.168.72.10/24nginx、nfs-utilsWeb 节点 1(挂载 NFS 共享)nginx2192.168.72.20/24nginx、nfs-utilsWeb 节点 2(挂载 NFS 共享)nfs192.168.72.30/24nfs-utilsNFS 文件共享服务器dns192.168.


Spring Cloud+AI :实现分布式智能推荐系统
我不是呆头2026/3/7

欢迎文末添加好友交流,共同进步! “ 俺はモンキー・D・ルフィ。海贼王になる男だ!” 引言 在当今数字化时代,推荐系统已成为电商平台、内容分发平台、社交网络等互联网产品的核心竞争力之一。从淘宝的"猜你喜欢"、抖音的精准内容推送,到 Netflix 的影视推荐,优秀的推荐系统不仅能显著提升用户留存率和转化率,更能为企业带来可观的商业价值。据统计,亚马逊约 35% 的销售额来自推荐系统,Netflix 则通过推荐算法为用户节省了每年约 10 亿美元的搜索成本。 然而,随着业


一文搞懂激活函数!
aicoting2026/2/27

推荐直接网站在线阅读:aicoting.cn 在深度学习中,激活函数(Activation Function)是神经网络的灵魂。它不仅赋予网络非线性能力,还决定了训练的稳定性和模型性能。那么,激活函数到底是什么?为什么我们非用不可?有哪些经典函数?又该如何选择? 所有相关源码示例、流程图、模型配置与知识库构建技巧,我也将持续更新在Github:AIHub,欢迎关注收藏! 1. 什么是激活函数,为什么需要激活函数 激活函数的核心作用就是为神经网络引入非线性。 为什么需要非线性? 想象一下,如果

首页编辑器站点地图

本站内容在 CC BY-SA 4.0 协议下发布

Copyright © 2026 聚合阅读