先搞清楚一件事:为什么要有 ES 数据同步?
想象一下,你开了一家图书馆:
- MySQL 是图书管理员,负责把书一本本摆好、登记在册(增删改查、事务保证)
- Elasticsearch 是图书检索系统,用户输入"三体",0.01 秒就能告诉你第几排第几架(全文检索、聚合分析)
问题来了:管理员刚上架了一本新书,检索系统怎么立刻知道这本书的存在?这就是数据同步要解决的事。
方案一:同步双写——"一边记账一边贴标签"
场景
你开了个电商网站,用户下单后,订单既要存到 MySQL(方便财务对账),又要同步到 ES(方便客服搜索订单)。
代码
1@Service 2public class OrderService { 3 4 @Autowired 5 private OrderMapper orderMapper; 6 7 @Autowired 8 private RestHighLevelClient esClient; 9 10 @Transactional 11 public void createOrder(Order order) { 12 // 第一步:写入 MySQL 13 orderMapper.insert(order); 14 15 // 第二步:写入 ES 16 IndexRequest request = new IndexRequest("orders") 17 .id(order.getId().toString()) 18 .source(JSON.toJSONString(order), XContentType.JSON); 19 esClient.index(request, RequestOptions.DEFAULT); 20 } 21}
这就像你一边记账一边贴标签,看起来简单直接,但问题很大:
- 耦合严重:每个写 MySQL 的地方都要加一段 ES 代码,改一个接口要改三处
- 性能暴跌:本来写 MySQL 只要 10ms,现在还要等 ES 响应,接口响应时间翻倍
- 数据不一致:MySQL 写成功了,ES 突然宕机了,数据就丢了。你总不能让用户"订单创建失败,因为搜索引擎挂了"吧?
结论:小项目玩玩可以,生产环境慎用。和redis也会有这样那样的问题
方案二:异步双写(MQ)——"写完账扔纸条给同事"
场景
还是电商订单,但这次我们学聪明了:写完 MySQL 就返回成功,同步 ES 的事交给别人干。
架构
1用户下单 → 写入 MySQL → 发送 MQ 消息 → 返回成功给用户 2 ↓ 3 消费者拿到消息 → 写入 ES
代码
1@Service 2public class OrderService { 3 4 @Autowired 5 private OrderMapper orderMapper; 6 7 @Autowired 8 private KafkaTemplate<String, String> kafkaTemplate; 9 10 @Transactional 11 public void createOrder(Order order) { 12 // 第一步:写入 MySQL 13 orderMapper.insert(order); 14 15 // 第二步:发 MQ 消息(事务提交后自动发送) 16 kafkaTemplate.send("order-sync-topic", order.getId().toString()); 17 // 方法结束,事务提交,用户已经收到"下单成功" 18 } 19} 20 21// 消费者:异步同步到 ES 22@Component 23public class OrderEsSyncConsumer { 24 25 @Autowired 26 private RestHighLevelClient esClient; 27 28 @Autowired 29 private OrderMapper orderMapper; 30 31 @KafkaListener(topics = "order-sync-topic") 32 public void syncToEs(String orderId) { 33 try { 34 // 根据 ID 查询完整订单数据 35 Order order = orderMapper.selectById(orderId); 36 37 // 写入 ES 38 IndexRequest request = new IndexRequest("orders") 39 .id(orderId) 40 .source(JSON.toJSONString(order), XContentType.JSON); 41 esClient.index(request, RequestOptions.DEFAULT); 42 43 } catch (Exception e) { 44 // 写入失败,抛异常触发 MQ 重试 45 throw new RuntimeException("ES 同步失败", e); 46 } 47 } 48}
这就像你写完账扔了张纸条给同事,让他帮你贴标签。好处是:
- 解耦:ES 挂了不影响用户下单
- 性能好:用户不用等 ES 响应
- 可重试:同事贴标签失败了,纸条还在,可以重试
但要注意几个坑:
- 顺序问题:如果用户先创建订单,然后立刻修改地址,两条消息可能乱序到达。解决:用
orderId作为 Kafka 分区键,保证同一个订单的消息顺序消费 - 幂等问题:同一条消息可能被消费多次。解决:ES 写入时用
id作为文档 ID,重复写入会自动覆盖
方案三:Canal 监听 Binlog——"偷偷看管理员的记账本"
场景
这是企业级最主流的方案。你的系统已经很庞大了,有几十个微服务都在写 MySQL,你不可能让每个服务都去发 MQ 消息。怎么办?
答案是:直接监听 MySQL 的 binlog 日志,谁改了数据我都知道。
MySQL → 开启 binlog → Canal(伪装成 MySQL 从库)→ 解析 binlog → 发送到 MQ → 消费者写入 ES
配置
第一步:MySQL 开启 binlog
1-- my.cnf 配置 2[mysqld] 3log-bin=mysql-bin 4binlog-format=ROW 5server-id=1
第二步:Canal 配置
1# canal.properties 2canal.instance.master.address=127.0.0.1:3306 3canal.instance.dbUsername=canal 4canal.instance.dbPassword=canal 5canal.mq.topic=canal-mysql-sync
第三步:消费者代码
1@Component 2public class CanalEsSyncConsumer { 3 4 @Autowired 5 private RestHighLevelClient esClient; 6 7 @KafkaListener(topics = "canal-mysql-sync") 8 public void syncToEs(String canalMessage) { 9 // 解析 Canal 消息 10 CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(canalMessage); 11 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); 12 13 String tableName = entry.getHeader().getTableName(); 14 CanalEntry.EventType eventType = rowChange.getEventType(); 15 16 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { 17 if (eventType == CanalEntry.EventType.INSERT || 18 eventType == CanalEntry.EventType.UPDATE) { 19 // 新增或更新:写入 ES 20 String json = parseRowDataToJson(rowData.getAfterColumnsList()); 21 IndexRequest request = new IndexRequest(tableName) 22 .id(extractId(rowData.getAfterColumnsList())) 23 .source(json, XContentType.JSON); 24 esClient.index(request, RequestOptions.DEFAULT); 25 26 } else if (eventType == CanalEntry.EventType.DELETE) { 27 // 删除:从 ES 删除 28 DeleteRequest request = new DeleteRequest(tableName) 29 .id(extractId(rowData.getBeforeColumnsList())); 30 esClient.delete(request, RequestOptions.DEFAULT); 31 } 32 } 33 } 34}
这就像你偷偷在管理员的记账本旁边装了个摄像头,不管谁改了账本,你都能看到。好处是:
- 零侵入:业务代码完全不用改,新增一个微服务也不用管 ES 同步的事
- 高实时:binlog 是实时产生的,延迟在毫秒级
- 可靠:binlog 是 MySQL 原生的,不会丢数据
结论:中大型项目首选方案。
方案四:Logstash 定时拉取——"每隔几分钟去翻一遍账本"
场景
你对实时性要求不高,比如用户行为日志分析,T+1 出报表就行。
配置
1input { 2 jdbc { 3 jdbc_driver => "com.mysql.jdbc.Driver" 4 jdbc_url => "jdbc:mysql://localhost:3306/log_db" 5 jdbc_user => "root" 6 jdbc_password => "123456" 7 schedule => "*/5 * * * *" # 每5分钟执行一次 8 statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value" 9 use_column_value => true 10 tracking_column => "update_time" 11 } 12} 13 14output { 15 elasticsearch { 16 hosts => ["http://es-host:9200"] 17 index => "user_logs" 18 document_id => "%{id}" 19 } 20}
这就像你每隔 5 分钟去翻一遍账本,把新记录抄到检索卡片上。好处是零代码改造,坏处是:
- 延迟高:最多延迟 5 分钟
- 数据库压力大:每次都要全表扫描(虽然有
update_time索引优化)
结论:适合离线分析、报表类场景。
方案五:DataX 批量同步——"搬家式迁移"
场景
你要把历史数据从 MySQL 迁移到 ES,比如老系统升级,10 亿条订单数据要搬过去。
配置
1{ 2 "job": { 3 "content": [ 4 { 5 "reader": { 6 "name": "mysqlreader", 7 "parameter": { 8 "username": "root", 9 "password": "123456", 10 "connection": [ 11 { 12 "querySql": ["SELECT * FROM orders WHERE create_time >= '2024-01-01'"], 13 "jdbcUrl": ["jdbc:mysql://localhost:3306/order_db"] 14 } 15 ], 16 "splitPk": "id" 17 } 18 }, 19 "writer": { 20 "name": "elasticsearchwriter", 21 "parameter": { 22 "endpoint": "http://es-host:9200", 23 "index": "orders", 24 "type": "_doc", 25 "batchSize": 1000, 26 "splitter": "," 27 } 28 } 29 } 30 ], 31 "setting": { 32 "speed": { 33 "channel": 5 34 } 35 } 36 } 37}
这就像搬家公司,一次性把整个仓库的东西搬走。适合大数据量迁移,但不适合实时同步
| 方案 | 实时性 | 侵入性 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 同步双写 | 毫秒级 | 高 | 低 | 小项目、简单业务 |
| MQ 异步双写 | 秒级 | 中 | 中 | 中型分布式系统 |
| Canal 监听 Binlog | 毫秒级 | 低 | 中高 | 企业级首选 |
| Logstash 定时拉取 | 分钟级 | 无 | 低 | 离线分析、报表 |
| DataX 批量同步 | 一次性 | 无 | 低 | 历史数据迁移 |
生产环境必须遵守的 4 条铁律
铁律一:ES 写入必须幂等
1// ✅ 正确:指定文档 ID,重复写入自动覆盖 2IndexRequest request = new IndexRequest("orders") 3 .id(order.getId().toString()) // 指定 ID 4 .source(json, XContentType.JSON);
铁律二:必须处理顺序问题
1// 发送消息时,用业务 ID 作为分区键 2kafkaTemplate.send("order-sync-topic", orderId, orderJson); 3// 这样同一个 orderId 的消息永远进入同一个分区,保证顺序
铁律三:必须定期校对数据
1-- 每天凌晨跑一次校对脚本 2SELECT COUNT(*) FROM orders; -- MySQL 数据量 3GET /orders/_count; -- ES 数据量 4-- 如果不一致,触发全量修复
铁律四:ES 故障时要有降级方案
1@KafkaListener(topics = "order-sync-topic") 2public void syncToEs(String orderId) { 3 int retryCount = 0; 4 while (retryCount < 3) { 5 try { 6 // 写入 ES 7 esClient.index(request, RequestOptions.DEFAULT); 8 return; // 成功则退出 9 } catch (Exception e) { 10 retryCount++; 11 if (retryCount == 3) { 12 // 三次失败,写入失败队列,人工介入 13 log.error("ES 同步失败,写入死信队列: {}", orderId, e); 14 deadLetterQueue.add(orderId); 15 } 16 Thread.sleep(1000 * retryCount); // 指数退避 17 } 18 } 19}
一句话总结
- 小项目:同步双写,简单粗暴
- 中型项目:MQ 异步双写,解耦可靠
- 大型项目:Canal 监听 Binlog,零侵入、高实时
- 离线分析:Logstash 定时拉取,零代码改造
- 数据迁移:DataX 批量同步,搬家式处理