消息队列 场景题
第三方上游接口接入做异步处理,是否可以使用 MQ?主要考虑什么?如果不用MQ,还有什么方式?
可以使用 MQ。
MQ 的核心作用是把调用方到第三方接口的直连模式,变成调用方→MQ→消费者→第三方接的异步模式,这样可以解决调用方因等待上游接口响应而超时、阻塞的问题
如果要用 MQ,我主要会考虑以下两点:
- 消息可靠性:第三方接口可能不稳定(超时、失败),因此需确保消息不丢失,要考虑MQ的持久化、消息确认机制(生产者确认、消费者 ACK)、失败重试策略((如重试N次后转死信队列)。
- 消息顺序性:如果业务要求先发送的消息先处理(比如订单状态更新),那么需确保 MQ的分区/队列内消息有序,比如涌过业务ID 哈希到固定队列等。
如果不用 MQ,也可以通过以下几个方式来实现异步调用:
- 本地内存队列:用 LinedBlockingQueue 或 Disruptor 等内存以列暂存请求,后台线程异步消费调用上游。因为数据存内存,服务重启或宕机时消息丢失,所以适合轻量级、数据允许少量丢失(如日志上报)的场景
- 定时任务轮询:将请求记录到DB或本地文件,通过定时任务(如XXL-J0B)批量拉取并调用上游。适合对实时性要求低(比如每小时同步一次数据)的场景。这个方式不会丢请求。
线上消息队列故障,兜底改造方案
线上部分接口频繁报错,经排查是因为消息发送超时导致的,我们用的是阿里云的 RocketMQ,去询问客服发现是因为云厂商升级故障使得消息发送频繁超时。
当时没有办法兜底方案处理消息队列故障的场景。
涉及的业务简单概括:需要在一个service 方法里执行一些业务,修改了一些数据落库,然后再发送一条 MQ 消息,触发下一个流程
因此需要保证
- 当前 service 修改的数据事务提交成功,消息一定被发出去
- 事务提交失败,消息不能发出去。
将同步发送消息的锣辑写在事务内部,就能保证发送失败,事务不会提交。但如果消息发送成功了,最后事务提交失败了呢?那发出去的消息还能撤回吗?
因此要解决的第一个问题其实是:
当前 service 事务提交后,才能发送消息,不然就可能导致消息发出去了,实际事务是没执行成功的。那如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?
万一事务提交了应用就挂了呢?消息不就没了,后续的流程也就中断了。
这归根结底是分布式事务问题,是数据库操作跟 MQ 消息的爱恨情仇,关于这个 RocketMQ 提供了解决方案即事务消息,但是它的侵入性比较大,需要修改接口适配事务消息的实现。而本地消息表则非常简单,接下来我们开始操作!
首先我们需要建立一张本地消息表(当前这个设计主要是为了MO消息的事务场量):
CREATE TABLE `message` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`status_delete` tinyint NOT NULL DEFAULT '0' COMMENT '删除标记 0正常 1删除',
`topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'topic',
`tag` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'tag',
`msg_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '消息id',
`msg_key` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '消息key',
`data` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息json串',
`try_num` int NOT NULL DEFAULT '0' COMMENT '重试次数',
`status` tinyint NOT NULL DEFAULT '0' COMMENT '发送状态 0-未发送 1-已发送',
`next_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下次驱动开始时间'
PRIMARY KEY (`id`),
KEY `idx_key` (`msg_key`),
KEY `idx_nexttime_status` (`next_time`,`status`),
KEY `idx_msgid` (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='本地消息记录表';然后再写个 MessageService 来包装下消息的发送流程,把本地消息记录保存封装在里面。
(大家需要详细看下代码逻辑)
@Service
public class MessageService implements IMessageService{
@Resource
private Producer producer;
@Resource
private MessageMapper messageMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void send(String topic, String tag, String key, Object obj) {
sendDelay(topic, tag, key, obj, 0L);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void sendDelay(String topic, String tag, String key, Object obj, Long period) {
//计算时间,防止定时任务扫描将还在正常流程中的消息进行重试
int time = (period == 0L ? 10 : period.intValue() / 1000);
Date nextTime = DateUtil.getAfterNewDateSecond(new Date(), time);
String data = JSON.toJSONString(obj);
Message message = new Message()
.setStatusDelete(0)
.setTopic(topic)
.setTag(tag)
.setMsgId("")
.setMsgKey(key)
.setData(data)
.setTryNum(0)
.setStatus(0)
.setNextTime(nextTime);
// 保存本地消息记录
messageMapper.save(message);
// 当前事务提交后,再执行发送消息和更改本地消息记录状态
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
String messageId;
try {
if (period == 0L) {
messageId = producer.send(topic, tag, key, data);
} else {
messageId = producer.sendDelay(topic, tag, key, data, period);
}
Message update = new Message()
.setId(message.getId())
.setMsgId(messageId)
.setStatus(1);
messageMapper.updateById(update);
} catch (Exception e) {
log.error("..");
}
}
}
);
}
}定时任务的逻辑就很简单了,就是扫描 nextTime 到期且未发送的消息,重新发送即可,这里不多整述.
最终的使用就非常简单了:
@Transactional(rollbackFor = Exception.class)
public void doSth(xx) {
saveA();
saveB();
messageService.send(xxx);
}我们来分析一下:
- 假设数据库事务提交失败,那么无事发生,消息也没发出去,此时业务正常
- 假设数据库操作成动、但是数据库事务提交后,服务宕机了、那么消息没发出去。此时 saveA和 saveB 都保存成功、那么 message 首定也插入了(它们在同一个事务中),message的 status是0,那么我们有个定时任务。根间nextTime 和 status 来扫描得到未成功发送的消息,进行重试即可,后续消息可正常发送.
- 假设数据库操作成功,但是数据库事务提交了,MQ有问题,使得消息发不出去,同理第二条,后续定时任务扫描重试即可。
假设依赖的消息队列中间件又产生频繁超时的故障,那么也不会影响业务的正常运行,数据都会正常落库,事务正常提交,部分发送超时的消息,由后续补偿任务自动补偿重试.。可以想象,如果没有这个机制可能会发送两种情况:
- 如果消息在事务内发送,由于消息发送出错,那么事务提交失败,业务会直接受到影响,线上频繁报错(还解决不了,因为这是阿里云MO底层升级导致的问题),妥妥PO故障
- 如果消息在事务提交后发送,又没落库记录,那么消息发送超时,后续流程中断,后续需要手动补数据,能累死个人。
简单补充
一般 service 事务相关方法都用 @Transactional 修饰,messageService.send也被 @Transactional, 默认事务传播级别是 PROPAGAION_REQUIRED,继承外部事务,因此它们处于同一个事务
然后 TransactionSynchronizationManager 可以管理当前线程的事务,内部的 TransactionSynchronizationAdapter 是一个抽象类
它能让我们在事务提交前、后、暂停等各阶段实现一些自己的逻辑。
总结一下,这其实就是分布式事务本地消息表的实现方案
场景:修改完数据库后,要发送一条 MQ 消息触发下一个流程。
可是消息队列宕机了,消息发送失败报异常。
- 如果“发送消息操作”在事务内,会导致事务一直回滚,一直报错,直接 P0 事故(反复地“裤子脱一半然后拉上去”)
- 如果在事务外,则只有数据库改成功,消息发送失败,后面的流程执行失败,并且消息丢失了,后面如果要补数据的话人麻了(裤子才脱一半就结束了)
原因:消息队列宕机
改造需求:
- MQ 消息发送失败,不影响流程的正常进行,事务不回滚,已经完成的操作保留,系统不会频繁报错
- MQ 消息发送失败后能够进行重试,而不是直接丢失掉。
改造步骤:
- 新增一个“本地消息表”,主要目的是为了记录发送失败的消息,消息重试所需的数据都要保存。关键字段
- 消息的 id
- topic
- key
- 消息数据
- tag
- 消息状态:未发送、已发送
- 下次发送重试时间 next tme
- 新增一个 messageService 来处理消息发送,Transactional(Rollbackfor=Exception.class),默认事务传播级别为PROPAGAION_REQUIRED,会线承外部事务,会与外部事务处于同一事务
- Messageservice 的发送 send()方法:
- 先保存消息到数据库中,设置消息状态为 未发送,并且将下次发送重试时间设置为10秒后(发个消息不用这么久吧,10秒都没发说明出问题了,不能设置太短,否则可能会对正常发送的消息进行重试)
- 利用 TransactionSynchronizationManager 的 aftercommit方法,我们控制在“事务提交后”再执行发送消息,如果消息发送失败了,则不会回滚,因为事务已经提交了,不会影响已完成的流程,并且我们可以不做任何操作,等待消息发送的重试就行。如果消息发送成功了就修改数据库中的消息状态为 已发送
- 事务执行成功后,则能够保证消息已经存储到数据库中了,流程还在正常进行中!
- 设置定时任务,对本地消息表中,状态为 未发送 的,并且已经到了"下次发送重试时间”的消息进行重新发送,
这里要注意的是,消费端也要做好幂等,方式重复消息导致的问题
每秒 200 笔订单请求到支付服务,支付服务需调用第三方支付,限流 100 笔每秒。请你设计支付服务,要求分布式的,并且是 FIFO,不超过第三方限流量,但是要用满第三方100 笔每秒的量,请问如何设计,需要考虑哪些方面?
这个问题的核心在于生产者的速率(200)大于消费者(100),这种场景就非常适合用消息队列来实现。
订单服务将每秒 200 笔下单请求发到消息队列,保证 FIFO顺序。然后支付服务就是消费者,可以部署多个节点,都订阅这个 Topic,消费逻辑就是消费订单的消息调用第三方支付。
而调用第三方支付需要共用一个限流器来保证调用第三方的速率为 100 笔每秒。比如可以用 Redisson 的 Ratelimiter 来实现限流.
这样,系统能平稳缓冲中积压请求(消息队列可以削峰填谷),并充分利用第三方 100 笔每秒的容量。并且也是分布式的,FIFO 的。
这里要注意 FIF0 是无法全局有序的,如果要全局有序,那么只能保证只有一个消费者,这样效率就很低了。企业级实的 FIFO 基本上是局部有序,比如使用订单ID作为健(key)分区,同一个分区内的消费是有序的。
面试官可能会继续追问:
- 为什么要用消息队列,消息队列有什么用?
- 限流具体是用的什么限流算法?分别有什么区别?
- 你刚说的保证消息的 FIFO 具体是如何实现的?
- 用这个消息队列如何保证订单支付消息不丢失?
- 如果调用第三方失败会如何? 重试机制
- 如果调用第三方支付一直失败怎么办? 死信消息
使用消息队列实现数据同步时,发现订单状态更新的顺序错乱了,如何解决?
出现订单状态更新顺序错乱,是因为同一订单的多条消息被不同消费者并发处理,导致顺序丢失。
解决核心是:让同一订单的消息按顺序处理。
这样就能转换成另一个问题:消息队列如何保证消息的有序性(顺序性)主要处理方式为:
- 按订单 ID 分区(Hash 分区):将同一订单的消息发送到同一个分区(Kafka/RocketMQ 都支持)。每个分区内部是天然有序的,从而保证同一订单的消息按顺序被消费。
- 单线程顺序消费:消费端单线程消费或使用队列串行执行,避免并发执行更新逻辑
- 幂等控制 + 最终一致性:可以加一个兜底策略。即便乱序,也能根据消息中的 update time 或 version 字段判断新旧,只应用最新状态,确保最终数据正确
数据同步任务执行到一半失败了,如何保证数据的一致性?
数据同步任务执行到一半失败,要保证一致性,核心是:不让中间状态数据留在系统里
关键点是:回滚已同步的部分、补偿未同步的部分、校验最终一致。
具体步骤如下:
先定位失败点同步任务通常有进度标记(比如同步到第N条数据、某个时间点的Binlog位置),先找到失败时已处理的数据范围(比如已同步1000条,第1001条失败)。区分“已同步”和“未同步”,明确哪些数据已写入目标库(可能部分脏数据),哪些还在源库未处理
让数据回到一致状态
- 强一致性场景:需要回读已同步的脏数据,如果同步的是关键交易数据(I订单支付状态),必须回滚,比如用反向操作,删除已同步的订单状态更新,或调用目标库的逆向接口,比如订单系统提供cancel_sync万法,撤销之前的更新。注意:这要求同步操作本身支持逆向,否则得靠“补偿事务”。
- 最终一致性场景:则可以忽略脏数据,直接重试未同步的数据。
如果是“非关键数据”(如用户地址变更),可以标记失败位置,跳过已失败的部分,从下一个未同步的位置重试,比如Kafa消费者记录 offset,失败后从 offset=1000 重新拉取消息。
- 补全与校验,确保最终一致性
- 补偿机制:对未同步的数据,启动补同步任务(比如凌晨跑批量任务,重新同步失败的那批数据)
- 数据校验:同步完成后,对比源库和目标库的记录数、关键字段(如订单金额总和),用MD5或CRC校验一致性
- 人工兜底:如果校验发现仍有缺失,手动补录或触发紧急同步。
消息队列设计成推消息还是拉消息?推拉模式的优缺点?
推模式(Push):消息队列将消息主动推送给消费者,适合实时性要求高、消费者能够及时处理消息的场景。
- 优点:实时性好,消息可立即送达消费者。
- 缺点:难以控制消费速度,容易导致消费者过载,尤其是在高并发时。
拉模式(Pull):消费者主动从消息队列中拉取消息,适合消费能力有限、需要根据自身处理能力调控速率的场景。
- 优点:消费者可以根据自身负载决定拉取频率,避免过载;更适合批量处理
- 缺点:可能会导致消息延迟,实时性不如推模式,尤其是拉取频率较低时。
推拉模式
首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互
默认的认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息。
想象一下,如果需要 Broker 去拉取消息,那么 produrcer 就必须在本地通过日志的形式保存消息来等待 Broker的拉取,如果有很多生产者的话,那么消息的可靠性不仅仅靠 Broker 自身,还需要靠成百上千的 producer
Broker 还能靠多副本等机制来保证消息的存储可靠,而成百上千的 Producer 可靠性就有点难办了,所以默认的 Producer 都是推消息给 Broker,所以说有些情况分布式好,而有些时候还是集中管理好。
推模式
推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送
推模式有什么好处:
- 消息实时性高,Broker 接受完消息之后可以立马推送给Consumer。
- 对于消费者使用来说更简单,简单啊就等着,反正有消息来了就会推过来
推模式有什么缺点?
推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者住 Broker发送消息的读率大于消费者消费消息的读率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊,当推送速率过快就像 DDos 攻击一样消费者就傻了。
滨区给不同的消费者的消费速录还不一样,身为Broker 很难平衡偶每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 推Broker,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的张态,进行推送速率的变更。这其实就增加了 Broker 自身的复杂度。
所以说推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下。
拉模式
拉模式指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给Consumer
我们来想一下拉模式有什么好处?
拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。
拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就准送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。
拉模式有什么缺点?
消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker了。因此需要降低清求的频率,比如隔个2 秒清求一次,你看着消息就很有可能延迟2秒了。
消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。
到底是推还是拉?可以看到推模式和拉模式各有优缺点,到底该如何选择呢?
RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。
我个人觉得拉模式更加的合适,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。
而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你保存好消息了,你要就来拿好了。
虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务消耗比较慢,但是 Broker 毕竟是一个中心点,能轻量就尽量轻量。
那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么? 怕,所以它们操作了一波,减轻了拉模式的缺点。
即
长轮询
RocketMQ 和Kafka 都是利用“长轮询”来实现拉模式。所置的“长轮询”具体的做法都是通过消费者去 Broker 拉取消息时,当有消息的情况下 Broker 会直接返回消息,如果没有消息都会采取延识处理的策略,即保持连接,暂时 hold 住请求,然后在对应队列或者分区有新消息到来的时候都会提醒消息来了,通过之前 hold 住的请求及时返回消息,保证消息的及时性。
一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住请求,避免了多次频繁的拉取动作,当消息一到就返回消息。
- 扩展阅读:RocketMQ 中的长轮询
- 扩展阅读:[]
两百万个生产者发送消息,仅一个消费者,如何高效设计锁?
- 使用无锁数据结构:可以使用无锁的并发队列,如lava的 ConcurrentLinkedQueue,它采用了非阻塞算法,避免了加锁的开销。这样,多个生产者可以并发地将消息添加进队列中,而消费者可以从队列中安全地读取消息。
- 批量操作:生产者可以批量发送消息,减少锁竞争的频率。例如,一个生产者可以先在本地缓存一批消息,然后一次性将这些消息推送到队列中。这减少了多次加锁解锁的开销。
- 分发锁:可以使用细粒度的锁,对以列的不同部分进行分段锁定,避免整个队列被独占锁定。例如,使用分级锁,通过将数据分成多个段(例如让消息有明确的分区属性,可以对消息进行分区处理),生产者可以同时操作不同段,从面减少锁竞争。
ConcurrentLinkedQueue 原理
ConcurentlinkedQueue 是Java 中的一种无锁并发队列,基于 Michael-Scott队列算法(MS Queue),专为高并发环境设计,能实现线程安全的无阻塞操作
它通过 CAS(Compare-And-Swap)操作实现线程安全,而不是依赖传统的锁机制,避免了锁的开销,降低了线程争用带来的性能损耗。
入队操作:
- 创建新节点:首先为要插入的元素创建一个新的节点。
- 定位尾节点:通过尾指针找到当前的尾节点。
- CAS操作更新尾节点:通过CAS操作将新节点链接到当前尾节点的next引|用上。然后更新尾指针,使其指向新的尾节点。
如果多个线程同时尝试更新尾节点,只有一个线程的CAS操作会成功,其他线程会重试直到成功。
出队操作:
- 定位头节点:通过头指针找到当前的头节点。
- 获取下一节点:检查头节点的next引用,找到下一个节点。
- CAS操作更新头节点:通过CAS操作将头指针更新为下一个节点,旧的头节点被垃圾回收机制回收。
如果队列为空,即头指针和尾指针相同且头节点的next引用为空,返回null表示队列为空。
