一个欲儿的博客

一个欲儿的博客

MySQL+Redis+RabbitMQ解决并发问题
2026-06-01

引言

由于业务不方便展示,此处特定构造了一个新的业务场景。

假设每个用户有一个uid字段和hot字段,游客可以为uid为1的用户点赞,点赞一次数值加1,一个游客还好,那万一有很多很多呢

未进行任何并发处理的源码已经分享

https://github.com/Anyuer9837/ConcurrencyStudy

其中数据库建议云的,家用的笔记本电脑或者台式电脑,一般来说性能都不错,很难压力测试,或者说效果不明显, 所以我留下了可以使用的MySQL服务器进行在src/main/resources/application.properties 里面

同时希望大佬可以手下留情,我这为爱发电,希望大家别搞破坏,和平交流学习!共同进步!

问题分析

其实问题的原因很简单,当有大规模用户同时对MySQL的某一行数据的每一个字段进行写入的时候,会出现异常问题,

解决思路

针对这种高并发单点读写的瓶颈,整体的应对策略是利用分布式锁控制并发、缓存承载高频读写、消息队列异步削峰落库以及异常反向补偿的组合方案。

程序开始先获取十五秒的分布式锁。这样做是为了让并发请求针对该用户串行排队,防止高并发在缓存失效时同时击穿到数据库,也避免了多线程交错计算导致的数据覆盖。

拿到锁后优先读取缓存,有数据则直接使用,无数据则去数据库查询基数并回填。优先读缓存利用了内存的高性能,缓存缺失时去数据库查询并在锁的保护下回填,确保了冷启动时只有一个线程访问数据库,绝对安全。随后程序在内存中将热度加一,并立刻写回缓存,让前端能毫秒级感知最新状态。

缓存更新后,程序将数据打包发送给消息队列,发送成功即对前端返回成功。这种设计不等待慢速的数据库事务提交,利用队列作为蓄水池平滑落库,切断了高并发流量对底层数据库的直接冲击,实现了流量削峰。

如果发送队列失败,程序会立刻捕获异常并执行缓存的原子递减,把刚才多加的数值原路减回去。这种反向补偿机制是为了在分布式系统遭遇局部网络故障时,及时撤销内存变更,保证缓存和数据库的数据最终一致性。

最后无论流程顺利还是抛出异常,程序都会在清理块中判断锁是否仍由当前线程持有,确认无误后再执行释放。这样可以防止当前线程因为自身卡顿导致锁超时后,去错误地释放正在保护其他线程的锁,完成了安全的生命周期闭环。

核心代码

代码已经贴到上方仓库的1.0分支,这里展示的是核心代码

查询逻辑+业务逻辑+发送消息到队列

    @Override
    public boolean updateHot(UserHot userHot) {
        String cacheKey = "user:hot:" + userHot.getUid();
        String lockKey = "lock:user-hot:" + userHot.getUid();
        RLock lock = redissonClient.getLock(lockKey);

        try {
            // 1. 尝试获取分布式锁,限时 5 秒
            if (lock.tryLock(5, TimeUnit.SECONDS)) {
                int newHot;
                // 2. 隔离并发下的 Redis 操作
                try {
                    String cachedHot = redisTemplate.opsForValue().get(cacheKey);
                    int currentHot;
                    if (cachedHot != null) {
                        currentHot = Integer.parseInt(cachedHot);
                    } else {
                        UserHot dbData = userHotMapper.selectByPrimaryKey(userHot.getUid());
                        if (dbData == null) return false;
                        currentHot = dbData.getHot();
                        redisTemplate.opsForValue().set(cacheKey, String.valueOf(currentHot));
                    }
                    // 内存计算并回写 Redis
                    newHot = currentHot + 1;
                    redisTemplate.opsForValue().set(cacheKey, String.valueOf(newHot));
                } catch (Exception e) {
                    System.err.println("【❌ Redis故障】操作Redis失败,原因: " + e.getMessage());
                    return false;
                }
                // 3. 隔离 MQ 消息发送
                try {
                    String message = userHot.getUid() + "," + newHot;
                    rabbitTemplate.convertAndSend(
                            RabbitConfig.HOT_EXCHANGE,
                            RabbitConfig.HOT_ROUTING_KEY,
                            message
                    );
                    return true;
                } catch (Exception e) {
                    System.err.println("【MQ故障】消息发送到RabbitMQ失败!原因: " + e.getMessage());

                    // 💡 开始执行回滚逻辑
                    System.out.println("【触发回滚】正在将 Redis 数据恢复原状...");
                    try {
                        // 使用 decrement 原子操作,把刚才加的 1 减回去
                        redisTemplate.opsForValue().decrement(cacheKey);
                        System.out.println("【回滚成功】Redis 数据已撤销,保证了数据一致性。");
                    } catch (Exception rollbackEx) {
                        // 🚨 极端边缘场景:刚才写 Redis 还好好的,回滚的时候 Redis 突然宕机或断网了!
                        System.err.println("【致命灾难】MQ发送失败,且 Redis 回滚也失败!出现严重数据不一致!");
                        System.err.println("急需人工介入!出问题的 UID: " + userHot.getUid() + ",报错: " + rollbackEx.getMessage());
                        // 真实生产环境中,这里通常会打印 Error 级别日志触发钉钉/邮件报警,
                        // 或者把这条失败记录写到一个本地磁盘日志文件里,等重启后人工修数据。
                    }
                    return false;
                }
            } else {
                System.out.println("【锁竞争激烈】获取分布式锁超时,其他线程正占用锁,本次更新放弃。");
                return false;
            }
        } catch (InterruptedException e) {
            System.out.println("【线程中断】等待锁的过程中线程被中断: " + e.getMessage());
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // 严格释放锁
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

消费者

    @RabbitListener(queues = RabbitConfig.HOT_QUEUE)
    public void receiveUserHotMessage(String message) {
        try {
            // 1. 解析消息(把 "1,11" 拆成 uid=1, hot=11)
            String[] parts = message.split(",");
            Integer uid = Integer.parseInt(parts[0]);
            Integer newHot = Integer.parseInt(parts[1]);
            UserHot updateData = new UserHot();
            updateData.setUid(uid);
            updateData.setHot(newHot);
            userHotMapper.updateByPrimaryKeySelective(updateData);

            // System.out.println("【MQ成功落盘】用户 " + uid + " 的热度已同步为 " + newHot);
        } catch (Exception e) {
            System.err.println("【MQ消费失败】" + e.getMessage());
            // 注意:这里如果抛出异常,RabbitMQ 会自动把这条消息重新放回队列头部,实现自动重试!
            throw e;
        }
    }


发表评论: