五张图带你理解 RocketMQ 延时消息机制

五张图带你理解 RocketMQ 延时消息机制

今天来聊一聊 RocketMQ 的延时消息是怎么实现的。

延时消息是指发送到 RocketMQ 后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。

延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息。

1.生产者

首先看一个生产者发送延时消息的官方示例代码

public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer(“ExampleProducerGroup”); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); } // Shutdown producer after use. producer.shutdown();}

从上面的代码可以看到,跟普通消息不一样的是,消息设置 setDelayTimeLevel 属性值,这里设置为 3,这里最终将 3 这个延时级别复制给了 DELAY 属性。

关于延时级别,可以看下面这个定义:

//MessageStoreConfig类private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

这里延时级别有 18 个,上面的示例代码中延迟级别是 3,消息会延迟 10s 后消费者才能拉取。

2.Broker 处理

2.1 写入消息

Broker 收到消息后,会将消息写入 CommitLog。在写入时,会判断消息 DELAY 属性是否大于 0。代码如下:

//CommitLog 类if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId);}

从上面的代码可以看到,CommitLog 写入时并没有直接写入,而是把 Topic 改为 SCHEDULE_TOPIC_XXXX,把 queueId 改为延时级别减 1。因为延时级别有 18 个,所以这里有 18 个队列。如下图:

2.2 调度消息

延时消息写入后,会有一个调度任务不停地拉取这些延时消息,这个逻辑在类 ScheduleMessageService。这个类的初始化代码如下:

public void start() { if (started.compareAndSet(false, true)) { this.load(); this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl(“ScheduleMessageTimerThread_”)); //省略部分逻辑 for (Map.Entry entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //省略部分逻辑 this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } //省略持久化的逻辑 }}

上面的 load() 方法会加载一个 delayLevelTable(ConcurrentHashMap类型),key 保存延时级别(从 1 开始),value 保存延时时间(单位是 ms)。

load() 方法结束后,创建了一个有 18 个核心线程的定时线程池,然后遍历 delayLevelTable,创建 18 个任务(DeliverDelayedMessageTimerTask)进行每个延时级别的任务调度。任务调度的代码逻辑如下:

public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); if (cq == null) { this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); return; } SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ == null) { //省略部分逻辑 this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE); return; } long nextOffset = this.offset; try { int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i 0) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt == null) { continue; } MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); //事务消息判断省略 boolean deliverSuc; //只保留同步 deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy); if (!deliverSuc) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } } nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); } catch (Exception e) { log.error(“ScheduleMessageService, messageTimeup execute error, offset = {}”, nextOffset, e); } finally { bufferCQ.release(); } this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);}

这段代码可以参考下面的流程图来进行理解:

上面有一个修正投递时间的函数,这个函数的意义是如果已经过了投递时间,那么立即投递。代码如下:

private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { long result = deliverTimestamp; long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); if (deliverTimestamp > maxTimestamp) { result = now; } return result;}

注意:消息从 CommitLog 转发到 ConsumeQueue 时,会判断是否是延时消息(Topic = SCHEDULE_TOPIC_XXXX 并且延时级别大于 0),如果是延时消息,就会修改 tagsCode 值为消息投递的时间戳,而 tagsCode 原值是 tag 的 HashCode。代码如下:

//CommitLog类checkMessageAndReturnSize方法if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp);}

如下图:

而 ScheduleMessageService 调度线程将消息从 ConsumeQueue 重新投递到原始队列中时,会把 tagsCode 再次修改为 tag 的 HashCode,代码如下:

//类MessageExtBrokerInner,这个方法被 messageTimeup 方法调用。public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) { return 0; } return tags.hashCode();}

如下图:

2.3 一个问题

如果有一个业务场景,要求延时消息 3 小时才能消费,而 RocketMQ 的延时消息最大延时级别只支持延时 2 小时,怎么处理?

这里提供两个思路供大家参考:

在 Broker 上修改 messageDelayLevel 的默认配置;

客户端缓存 msgId,先设置延时级别是 18(2h),当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,这次延时级别是 17(1h),如果没有缓存则进行消费。

3 总结

经过上面的讲解,延时消息的处理流程如下:

最后,延时消息的延时时间并不精确,这个时间是 Broker 调度线程把消息重新投递到原始的 MessageQueue 的时间,如果发生消息积压或者 RocketMQ 客户端发生流量管控,客户端拉取到消息后进行处理的时间可能会超出预设的延时时间

郑重声明:本文内容及图片均整理自互联网,不代表本站立场,版权归原作者所有,如有侵权请联系管理员(admin#wlmqw.com)删除。
(0)
用户投稿
上一篇 2022年6月14日
下一篇 2022年6月15日

相关推荐

  • 炎炎夏日,专家版“科学防晒要点”快记好

    来源:【华声在线】 华声在线7月6日讯(全媒体记者 李琪 实习生 廖莹 吴雨霜 孙昕 杨祎莎 通讯员 彭文彬)炎炎夏日,阳光诱惑着人们外出游玩嬉戏,紫外线也如影随形地伤害着皮肤。 …

    2022年7月9日
  • 结果正式宣布,中芯国际赢了

    马云退休后,关于他的消息就不太多了。不过,前段时间有个消息从美传来,马云被起诉了,再次引起大家注意,就是美投资人把马云增列为起诉阿里的诉讼对象。 这件事被认为是打压中企的一系列措施…

    2022年6月21日
  • 普通学校校服都是裤装,贵族学校女生却能穿裙子,背后原因有三

    中国学生最重要的一个标志就是校服,从上小学开始学生就必须每天都要穿校服,而且还会分夏季款、春秋款和冬季款,除了周末几乎没有机会能穿上自己的衣服。 中国的校服一直被学生吐槽,大多数学…

    2022年6月5日
  • 畅游湖北 乐享十一 – 国庆景区福利来了!

    喜迎二十大,欢度国庆节!2022年最后一个小长假——国庆节来临啦!大家的出游计划都做好了么?别急!我省各大景区针对国庆假期,推出了丰富多彩的特色活动和优惠政策,湖北文旅之声整理了第…

    2022年9月24日
  • 真正有本事的人,往往具备的共同特征

    1.专注自己,不操心别人的事。 2.很早就看透人性和关系本质,不对别人抱幻想。 3.自学能力强,不需要别人鼓励和鞭策。 4.不做浪费时间和无意义的事,凡事衡量利弊。 5、永远不要让…

    2022年7月5日
  • 曝苹果下调 iPhone 14、iPhone 14 Max 零部件备货

    目前已经出现了不少关于今年新iPhone系列的相关消息。 按照这些爆料中提到的说法,iPhone 14系列可能会扩大产品间的差异,并为不同机型搭载不同的芯片。现在最新的消息也再次提…

    2022年6月14日
  • 让自己自律上瘾,其实很简单

    今天中午和同事聊天,突然聊到关于自律这个话题。 于是我跟她说 我最近每天5:00在体育场晨跑,都看到一个大概5岁左右的小女孩在跑步。 同事说 这么小的一个小女孩就这么自律,长大了一…

    2022年8月18日
  • 售价10万的“妻子机器人”,除了生孩子什么都能做?别被骗了

    现在各个技术都在日益更新,并且在精益求精,比如最初的工业机器人慢慢地发展到现如今的帅哥美女机器人。 (此处已添加小程序,请到今日头条客户端查看) 不仅从最初的破铜烂铁的堆积,发展到…

    2022年8月17日
  • 比特币等虚拟货币持续暴跌-货币现象到底是什么?

    全球虚拟币抛售潮持续,比特币7天内下跌33%,市场有人担心“币圈的雷曼危机”。因为我自己不做虚拟币这一块投资,所以一般来讲关注度不高,平日里也就和大家一样充当吃瓜群众,比如今天又涨…

    2022年6月20日
  • 紫薯蒸多久能熟透(蒸紫薯冷水下锅好还是热水)

    紫薯富含丰富的花青素,可谓主食中的养颜一霸,平时一年四季都可以吃到,但是要说最为香甜的季节还得是现在的秋季。紫薯大多数都在秋季收获,此时稍微放地上晾上一周,之后拿来蒸着吃味道香甜可…

    2022年4月29日

联系我们

联系邮箱:admin#wlmqw.com
工作时间:周一至周五,10:30-18:30,节假日休息