一、分布式架构下的延时业务 电商业务中,有下单后xmin时间无操作提醒,ymin后关单等。 违规业务中,有罚单xmin时间后无申诉判决,yday无人处理关单等。
总之:所有的业务中,抽离一个线上核心单元体后,该单元体承载多个业务操作下的不同状态变更,就有可能涉及到使用延时消息。任意时刻的延时队列,如何接入!
二、猜想+业界方案 分布式架构下,消息中间件一般都会选择kafka 和 rocketmq,(为什么不选rabbit mq ,都说看不懂源码的缘故,掌控感不强),
1、kafka 不直接支持延时消息。那如何实现变种的延时消息?
未尝试,但是问题应该不少,比如方案二的额外服务的稳定性,消息重新投递失败的情况等,只是理论可行 基本思路:消息保存到指定的位置,等到了时间才投入到正式的队列中。
方案
实现方式
优点
缺点
方案一
发送消息中,带消费者实际消费的时间,超过该时间,才进行消费,否则消费到后继续投递到该队列中
不需要引进额外的存储和队列
生产端和消费端都需处理延时逻辑
方案二
kafka中指定延时队列,生产者直接投递到延时队列中,消息体中包含实际队列
见名之意,传入到延时队列中,消费者不需要任何其他逻辑
需要引进额外的队列,还需要引入专门消费延时队列的消费者
2、ROCKETMQ 延时的等级,分为messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 缺点:只能支持固定时间的延迟。原因:底层实现是基于每个时间段的topic
详细原理参考:(https://blog.csdn.net/Weixiaohuai/article/details/123658301 )
3、QMQ 之前有幸使用过去哪儿公司的QMQ消息中间件,(https://github.com/xxxlxy2008/qmq/blob/master/docs/cn/arch.md )
在延时/定时消息里也存在三种log:
message log 和实时消息里的message log类似,收到消息后append到该log就返回给producer schedule log 按照投递时间组织,每个小时一个。该log是回放message log后根据延时时间放置对应的log上,这是上面描述的两层hash wheel的第一层,位于磁盘上。schedule log里是包含完整的消息内容的,因为消息内容从message log同步到了schedule log,所以历史message log都可以删除。另外,schedule log是按照延时时间组织的,所以延时时间已过的schedule log文件也可以删除 dispatch log 延时/定时消息投递后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log里写入的是消息的offset,不包含消息内容
qmq中消息的存储方式:
上图中方框上方的数字,表示该方框在自己log中的偏移,而方框内的数字是该项的内容。 比如message log方框上方的数字:3,6,9几表示这几条消息在message log中的偏移。 而consume log中方框内的数字3,6,9,20正对应着message log的偏移,表示这几个位置上的消息都是subject1的消息,consume log方框上方的1,2,3,4表示这几个方框在consume log中的逻辑偏移。 pull log方框内的内容对应着consume log的逻辑偏移,而pull log方框外的数字表示pull log的逻辑偏移。
在实时Server存储模型中有三种重要的log: message log 所有subject的消息进入该log,消息的主存储 consume log consume log存储的是message log的索引信息 pull log 每个consumer拉取消息的时候会产生pull log,pull log记录的是拉取的消息在consume log中的sequence
4、redission 使用redisson的DelayedQueue 可以实现延时队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class DelayQueueService { private static final Logger log = LoggerFactory.getLogger(DelayQueueService.class); private RedissonClient redissonClient; @Value("${spring.application.name}") private String appId; public void setRedissonClient(RedissonClient redissonClient) { this.redissonClient = redissonClient; } /** * 发送延时消息到队列 * * @param t 消息载体 * @param delay 延迟时间 * @param timeUnit 时间单位 * @param callBack 消息处理回调类 * @param <T> 泛型 */ public <T> void send(T t, long delay, TimeUnit timeUnit, Class<? extends AbstractDelayMessageListener<T>> callBack) { Objects.requireNonNull(callBack, "the callback object must not be null"); String listenerName = callBack.getName(); send(t, delay, timeUnit, listenerName); } public <T> void send(T t, long delay, TimeUnit timeUnit, String callBackName) { Objects.requireNonNull(timeUnit, "the time unit must not be null"); Objects.requireNonNull(callBackName, "the callback name must not be null"); String queueName = appId + Constant.COLON_SEPARATOR + callBackName; log.info("send message to [{}],delay=[{}],timeUnit=[{}]", queueName, delay, timeUnit); RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(t, delay, timeUnit); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public class ListenerService implements ApplicationListener<ApplicationStartedEvent> { private static final Logger log = LoggerFactory.getLogger(ListenerService.class); private static final Map<String, ExecutorService> EXECUTOR_SERVICE_MAP = new HashMap<>(); /** * 线程池中线程个数 */ private static final int THREADS_CNT = 3; private RedissonClient redissonClient; @Value("${spring.application.name}") private String appId; public void setRedissonClient(RedissonClient redissonClient) { this.redissonClient = redissonClient; } /** * 启动线程监听队列 * * @param queueName 队列名(同监听线程名) * @param listener 任务回调监听 * @param <T> 泛型 */ private <T> void initialize(String queueName, AbstractDelayMessageListener listener) { log.info("initialize listener thread start..."); RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); ExecutorService executorService = EXECUTOR_SERVICE_MAP.get(queueName); for (int i = 0; i < THREADS_CNT; i++) { executorService.submit(() -> { boolean isShutdown = false; while (!isShutdown) { try { T t = blockingFairQueue.take(); listener.invoke(t); } catch (RedissonShutdownException e) { isShutdown = true; String name = Thread.currentThread().getName(); log.info("redisson listener thread [{}] is closing", name); } catch (Throwable e) { log.error("listener thread occurs error", e); } } }); } log.info("initialize listener thread end..."); } private static ExecutorService createExecutorService(String simpleListenerName) { ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("pool-" + simpleListenerName + "-%d").build(); return Executors.newFixedThreadPool(THREADS_CNT, threadFactory); } @Override public void onApplicationEvent(ApplicationStartedEvent event) { ApplicationContext context = event.getApplicationContext(); Map<String, AbstractDelayMessageListener> map = context.getBeansOfType(AbstractDelayMessageListener.class); for (Map.Entry<String, AbstractDelayMessageListener> entry : map.entrySet()) { AbstractDelayMessageListener listener = entry.getValue(); String listenerName = listener.getClass().getName(); //考虑到listenerName过长,不适合用作线程名 String simpleListenerName = listener.getClass().getSimpleName(); //队列名字不能重复,此处必须使用listenerName(全路径),不能使用simpleListenerName String queueName = appId + Constant.COLON_SEPARATOR + listenerName; //存储创建的线程池,key为队列名,value为对应的线程池 EXECUTOR_SERVICE_MAP.put(queueName, createExecutorService(simpleListenerName)); initialize(queueName, listener); Runtime.getRuntime().addShutdownHook(new Thread( () -> EXECUTOR_SERVICE_MAP.forEach((k, v) -> { log.info("shutting down ExecutorService [{}]", k); v.shutdown(); }) )); } } }
使用姿势
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public class ListenerA extends AbstractDelayMessageListener<DelayMessage> { @Override protected void doInvoke(DelayMessage s) { log.info("ListenerA receive message [{}]", s); long id = s.getId(); int delay = s.getDelay(); //do your logic } }