MySQL+Redis+RabbitMQ解决并发问题
2026-06-01
引言
由于业务不方便展示,此处特定构造了一个新的业务场景。
假设每个用户有一个uid字段和hot字段,游客可以为uid为1的用户点赞,点赞一次数值加1,一个游客还好,那万一有很多很多呢
未进行任何并发处理的源码已经分享
https://github.com/Anyuer9837/ConcurrencyStudy
其中数据库建议云的,家用的笔记本电脑或者台式电脑,一般来说性能都不错,很难压力测试,或者说效果不明显, 所以我留下了可以使用的MySQL服务器进行在src/main/resources/application.properties 里面
同时希望大佬可以手下留情,我这为爱发电,希望大家别搞破坏,和平交流学习!共同进步!
问题分析
其实问题的原因很简单,当有大规模用户同时对MySQL的某一行数据的每一个字段进行写入的时候,会出现异常问题,
解决思路
针对这种高并发单点读写的瓶颈,整体的应对策略是利用分布式锁控制并发、缓存承载高频读写、消息队列异步削峰落库以及异常反向补偿的组合方案。
程序开始先获取十五秒的分布式锁。这样做是为了让并发请求针对该用户串行排队,防止高并发在缓存失效时同时击穿到数据库,也避免了多线程交错计算导致的数据覆盖。
拿到锁后优先读取缓存,有数据则直接使用,无数据则去数据库查询基数并回填。优先读缓存利用了内存的高性能,缓存缺失时去数据库查询并在锁的保护下回填,确保了冷启动时只有一个线程访问数据库,绝对安全。随后程序在内存中将热度加一,并立刻写回缓存,让前端能毫秒级感知最新状态。
缓存更新后,程序将数据打包发送给消息队列,发送成功即对前端返回成功。这种设计不等待慢速的数据库事务提交,利用队列作为蓄水池平滑落库,切断了高并发流量对底层数据库的直接冲击,实现了流量削峰。
如果发送队列失败,程序会立刻捕获异常并执行缓存的原子递减,把刚才多加的数值原路减回去。这种反向补偿机制是为了在分布式系统遭遇局部网络故障时,及时撤销内存变更,保证缓存和数据库的数据最终一致性。
最后无论流程顺利还是抛出异常,程序都会在清理块中判断锁是否仍由当前线程持有,确认无误后再执行释放。这样可以防止当前线程因为自身卡顿导致锁超时后,去错误地释放正在保护其他线程的锁,完成了安全的生命周期闭环。
核心代码
代码已经贴到上方仓库的1.0分支,这里展示的是核心代码
查询逻辑+业务逻辑+发送消息到队列
@Override
public boolean updateHot(UserHot userHot) {
String cacheKey = "user:hot:" + userHot.getUid();
String lockKey = "lock:user-hot:" + userHot.getUid();
RLock lock = redissonClient.getLock(lockKey);
try {
// 1. 尝试获取分布式锁,限时 5 秒
if (lock.tryLock(5, TimeUnit.SECONDS)) {
int newHot;
// 2. 隔离并发下的 Redis 操作
try {
String cachedHot = redisTemplate.opsForValue().get(cacheKey);
int currentHot;
if (cachedHot != null) {
currentHot = Integer.parseInt(cachedHot);
} else {
UserHot dbData = userHotMapper.selectByPrimaryKey(userHot.getUid());
if (dbData == null) return false;
currentHot = dbData.getHot();
redisTemplate.opsForValue().set(cacheKey, String.valueOf(currentHot));
}
// 内存计算并回写 Redis
newHot = currentHot + 1;
redisTemplate.opsForValue().set(cacheKey, String.valueOf(newHot));
} catch (Exception e) {
System.err.println("【❌ Redis故障】操作Redis失败,原因: " + e.getMessage());
return false;
}
// 3. 隔离 MQ 消息发送
try {
String message = userHot.getUid() + "," + newHot;
rabbitTemplate.convertAndSend(
RabbitConfig.HOT_EXCHANGE,
RabbitConfig.HOT_ROUTING_KEY,
message
);
return true;
} catch (Exception e) {
System.err.println("【MQ故障】消息发送到RabbitMQ失败!原因: " + e.getMessage());
// 💡 开始执行回滚逻辑
System.out.println("【触发回滚】正在将 Redis 数据恢复原状...");
try {
// 使用 decrement 原子操作,把刚才加的 1 减回去
redisTemplate.opsForValue().decrement(cacheKey);
System.out.println("【回滚成功】Redis 数据已撤销,保证了数据一致性。");
} catch (Exception rollbackEx) {
// 🚨 极端边缘场景:刚才写 Redis 还好好的,回滚的时候 Redis 突然宕机或断网了!
System.err.println("【致命灾难】MQ发送失败,且 Redis 回滚也失败!出现严重数据不一致!");
System.err.println("急需人工介入!出问题的 UID: " + userHot.getUid() + ",报错: " + rollbackEx.getMessage());
// 真实生产环境中,这里通常会打印 Error 级别日志触发钉钉/邮件报警,
// 或者把这条失败记录写到一个本地磁盘日志文件里,等重启后人工修数据。
}
return false;
}
} else {
System.out.println("【锁竞争激烈】获取分布式锁超时,其他线程正占用锁,本次更新放弃。");
return false;
}
} catch (InterruptedException e) {
System.out.println("【线程中断】等待锁的过程中线程被中断: " + e.getMessage());
Thread.currentThread().interrupt();
return false;
} finally {
// 严格释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}消费者
@RabbitListener(queues = RabbitConfig.HOT_QUEUE)
public void receiveUserHotMessage(String message) {
try {
// 1. 解析消息(把 "1,11" 拆成 uid=1, hot=11)
String[] parts = message.split(",");
Integer uid = Integer.parseInt(parts[0]);
Integer newHot = Integer.parseInt(parts[1]);
UserHot updateData = new UserHot();
updateData.setUid(uid);
updateData.setHot(newHot);
userHotMapper.updateByPrimaryKeySelective(updateData);
// System.out.println("【MQ成功落盘】用户 " + uid + " 的热度已同步为 " + newHot);
} catch (Exception e) {
System.err.println("【MQ消费失败】" + e.getMessage());
// 注意:这里如果抛出异常,RabbitMQ 会自动把这条消息重新放回队列头部,实现自动重试!
throw e;
}
}
发表评论: