线上部分接口频繁报错,经排查是因为消息发送超时导致的,我们用的是阿里云的 RocketMQ,去询问客服发现是因为云厂商升级故障使得消息发送频繁超时。
当时没有办法兜底方案处理消息队列故障的场景。
涉及的业务简单概括:需要在一个service 方法里执行一些业务,修改了一些数据落库,然后再发送一条 MQ 消息,触发下一个流程
因此需要保证
- 当前 service 修改的数据事务提交成功,消息一定被发出去
- 事务提交失败,消息不能发出去。
将同步发送消息的锣辑写在事务内部,就能保证发送失败,事务不会提交。但如果消息发送成功了,最后事务提交失败了呢?那发出去的消息还能撤回吗?
因此要解决的第一个问题其实是:
当前 service 事务提交后,才能发送消息,不然就可能导致消息发出去了,实际事务是没执行成功的。那如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?
万一事务提交了应用就挂了呢?消息不就没了,后续的流程也就中断了。
这归根结底是分布式事务问题,是数据库操作跟 MQ 消息的爱恨情仇,关于这个 RocketMQ 提供了解决方案即事务消息,但是它的侵入性比较大,需要修改接口适配事务消息的实现。而本地消息表则非常简单,接下来我们开始操作!
首先我们需要建立一张本地消息表(当前这个设计主要是为了MO消息的事务场量):
CREATE TABLE `message` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`status_delete` tinyint NOT NULL DEFAULT '0' COMMENT '删除标记 0正常 1删除',
`topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'topic',
`tag` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'tag',
`msg_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '消息id',
`msg_key` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '消息key',
`data` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息json串',
`try_num` int NOT NULL DEFAULT '0' COMMENT '重试次数',
`status` tinyint NOT NULL DEFAULT '0' COMMENT '发送状态 0-未发送 1-已发送',
`next_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下次驱动开始时间'
PRIMARY KEY (`id`),
KEY `idx_key` (`msg_key`),
KEY `idx_nexttime_status` (`next_time`,`status`),
KEY `idx_msgid` (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='本地消息记录表';
然后再写个 MessageService 来包装下消息的发送流程,把本地消息记录保存封装在里面。
(大家需要详细看下代码逻辑)
@Service
public class MessageService implements IMessageService{
@Resource
private Producer producer;
@Resource
private MessageMapper messageMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void send(String topic, String tag, String key, Object obj) {
sendDelay(topic, tag, key, obj, 0L);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void sendDelay(String topic, String tag, String key, Object obj, Long period) {
//计算时间,防止定时任务扫描将还在正常流程中的消息进行重试
int time = (period == 0L ? 10 : period.intValue() / 1000);
Date nextTime = DateUtil.getAfterNewDateSecond(new Date(), time);
String data = JSON.toJSONString(obj);
Message message = new Message()
.setStatusDelete(0)
.setTopic(topic)
.setTag(tag)
.setMsgId("")
.setMsgKey(key)
.setData(data)
.setTryNum(0)
.setStatus(0)
.setNextTime(nextTime);
// 保存本地消息记录
messageMapper.save(message);
// 当前事务提交后,再执行发送消息和更改本地消息记录状态
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
String messageId;
try {
if (period == 0L) {
messageId = producer.send(topic, tag, key, data);
} else {
messageId = producer.sendDelay(topic, tag, key, data, period);
}
Message update = new Message()
.setId(message.getId())
.setMsgId(messageId)
.setStatus(1);
messageMapper.updateById(update);
} catch (Exception e) {
log.error("..");
}
}
}
);
}
}
定时任务的逻辑就很简单了,就是扫描 nextTime 到期且未发送的消息,重新发送即可,这里不多整述.
最终的使用就非常简单了:
@Transactional(rollbackFor = Exception.class)
public void doSth(xx) {
saveA();
saveB();
messageService.send(xxx);
}
我们来分析一下:
- 假设数据库事务提交失败,那么无事发生,消息也没发出去,此时业务正常
- 假设数据库操作成动、但是数据库事务提交后,服务宕机了、那么消息没发出去。此时 saveA和 saveB 都保存成功、那么 message 首定也插入了(它们在同一个事务中),message的 status是0,那么我们有个定时任务。根间nextTime 和 status 来扫描得到未成功发送的消息,进行重试即可,后续消息可正常发送.
- 假设数据库操作成功,但是数据库事务提交了,MQ有问题,使得消息发不出去,同理第二条,后续定时任务扫描重试即可。
假设依赖的消息队列中间件又产生频繁超时的故障,那么也不会影响业务的正常运行,数据都会正常落库,事务正常提交,部分发送超时的消息,由后续补偿任务自动补偿重试.。可以想象,如果没有这个机制可能会发送两种情况:
- 如果消息在事务内发送,由于消息发送出错,那么事务提交失败,业务会直接受到影响,线上频繁报错(还解决不了,因为这是阿里云MO底层升级导致的问题),妥妥PO故障
- 如果消息在事务提交后发送,又没落库记录,那么消息发送超时,后续流程中断,后续需要手动补数据,能累死个人。
简单补充
一般 service 事务相关方法都用 @Transactional 修饰,messageService.send也被 @Transactional, 默认事务传播级别是 PROPAGAION_REQUIRED,继承外部事务,因此它们处于同一个事务
然后 TransactionSynchronizationManager 可以管理当前线程的事务,内部的 TransactionSynchronizationAdapter 是一个抽象类
它能让我们在事务提交前、后、暂停等各阶段实现一些自己的逻辑。
总结一下,这其实就是分布式事务本地消息表的实现方案
场景:修改完数据库后,要发送一条 MQ 消息触发下一个流程。
可是消息队列宕机了,消息发送失败报异常。
- 如果“发送消息操作”在事务内,会导致事务一直回滚,一直报错,直接 P0 事故(反复地“裤子脱一半然后拉上去”)
- 如果在事务外,则只有数据库改成功,消息发送失败,后面的流程执行失败,并且消息丢失了,后面如果要补数据的话人麻了(裤子才脱一半就结束了)
原因:消息队列宕机
改造需求:
- MQ 消息发送失败,不影响流程的正常进行,事务不回滚,已经完成的操作保留,系统不会频繁报错
- MQ 消息发送失败后能够进行重试,而不是直接丢失掉。
改造步骤:
- 新增一个“本地消息表”,主要目的是为了记录发送失败的消息,消息重试所需的数据都要保存。关键字段
- 消息的 id
- topic
- key
- 消息数据
- tag
- 消息状态:未发送、已发送
- 下次发送重试时间 next tme
- 新增一个 messageService 来处理消息发送,Transactional(Rollbackfor=Exception.class),默认事务传播级别为PROPAGAION_REQUIRED,会线承外部事务,会与外部事务处于同一事务
- Messageservice 的发送 send()方法:
- 先保存消息到数据库中,设置消息状态为 未发送,并且将下次发送重试时间设置为10秒后(发个消息不用这么久吧,10秒都没发说明出问题了,不能设置太短,否则可能会对正常发送的消息进行重试)
- 利用 TransactionSynchronizationManager 的 aftercommit方法,我们控制在“事务提交后”再执行发送消息,如果消息发送失败了,则不会回滚,因为事务已经提交了,不会影响已完成的流程,并且我们可以不做任何操作,等待消息发送的重试就行。如果消息发送成功了就修改数据库中的消息状态为 已发送
- 事务执行成功后,则能够保证消息已经存储到数据库中了,流程还在正常进行中!
- 设置定时任务,对本地消息表中,状态为 未发送 的,并且已经到了"下次发送重试时间”的消息进行重新发送,
这里要注意的是,消费端也要做好幂等,方式重复消息导致的问题