一、分布式架构下的延时业务

电商业务中,有下单后xmin时间无操作提醒,ymin后关单等。
违规业务中,有罚单xmin时间后无申诉判决,yday无人处理关单等。

总之:所有的业务中,抽离一个线上核心单元体后,该单元体承载多个业务操作下的不同状态变更,就有可能涉及到使用延时消息。
任意时刻的延时队列,如何接入!

二、猜想+业界方案

分布式架构下,消息中间件一般都会选择kafka 和 rocketmq,(为什么不选rabbit mq ,都说看不懂源码的缘故,掌控感不强),

1、kafka

不直接支持延时消息。那如何实现变种的延时消息?

未尝试,但是问题应该不少,比如方案二的额外服务的稳定性,消息重新投递失败的情况等,只是理论可行
基本思路:消息保存到指定的位置,等到了时间才投入到正式的队列中。

方案 实现方式 优点 缺点
方案一 发送消息中,带消费者实际消费的时间,超过该时间,才进行消费,否则消费到后继续投递到该队列中 不需要引进额外的存储和队列 生产端和消费端都需处理延时逻辑
方案二 kafka中指定延时队列,生产者直接投递到延时队列中,消息体中包含实际队列 见名之意,传入到延时队列中,消费者不需要任何其他逻辑 需要引进额外的队列,还需要引入专门消费延时队列的消费者

流程图

2、ROCKETMQ

延时的等级,分为messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
缺点:只能支持固定时间的延迟。原因:底层实现是基于每个时间段的topic

详细原理参考:(https://blog.csdn.net/Weixiaohuai/article/details/123658301)

3、QMQ

之前有幸使用过去哪儿公司的QMQ消息中间件,(https://github.com/xxxlxy2008/qmq/blob/master/docs/cn/arch.md)

延时消息存储结构
在延时/定时消息里也存在三种log:

message log 和实时消息里的message log类似,收到消息后append到该log就返回给producer
schedule log 按照投递时间组织,每个小时一个。该log是回放message log后根据延时时间放置对应的log上,这是上面描述的两层hash wheel的第一层,位于磁盘上。schedule log里是包含完整的消息内容的,因为消息内容从message log同步到了schedule log,所以历史message log都可以删除。另外,schedule log是按照延时时间组织的,所以延时时间已过的schedule log文件也可以删除
dispatch log 延时/定时消息投递后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log里写入的是消息的offset,不包含消息内容

qmq中消息的存储方式:

存储信息结构
上图中方框上方的数字,表示该方框在自己log中的偏移,而方框内的数字是该项的内容。
比如message log方框上方的数字:3,6,9几表示这几条消息在message log中的偏移。
而consume log中方框内的数字3,6,9,20正对应着message log的偏移,表示这几个位置上的消息都是subject1的消息,consume log方框上方的1,2,3,4表示这几个方框在consume log中的逻辑偏移。
pull log方框内的内容对应着consume log的逻辑偏移,而pull log方框外的数字表示pull log的逻辑偏移。

在实时Server存储模型中有三种重要的log:
message log 所有subject的消息进入该log,消息的主存储
consume log consume log存储的是message log的索引信息
pull log 每个consumer拉取消息的时候会产生pull log,pull log记录的是拉取的消息在consume log中的sequence

4、redission

使用redisson的DelayedQueue 可以实现延时队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class DelayQueueService {
private static final Logger log = LoggerFactory.getLogger(DelayQueueService.class);

private RedissonClient redissonClient;

@Value("${spring.application.name}")
private String appId;

public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}

/**
* 发送延时消息到队列
*
* @param t 消息载体
* @param delay 延迟时间
* @param timeUnit 时间单位
* @param callBack 消息处理回调类
* @param <T> 泛型
*/
public <T> void send(T t, long delay, TimeUnit timeUnit, Class<? extends AbstractDelayMessageListener<T>> callBack) {
Objects.requireNonNull(callBack, "the callback object must not be null");
String listenerName = callBack.getName();
send(t, delay, timeUnit, listenerName);
}

public <T> void send(T t, long delay, TimeUnit timeUnit, String callBackName) {
Objects.requireNonNull(timeUnit, "the time unit must not be null");
Objects.requireNonNull(callBackName, "the callback name must not be null");
String queueName = appId + Constant.COLON_SEPARATOR + callBackName;
log.info("send message to [{}],delay=[{}],timeUnit=[{}]", queueName, delay, timeUnit);
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class ListenerService implements ApplicationListener<ApplicationStartedEvent> {
private static final Logger log = LoggerFactory.getLogger(ListenerService.class);
private static final Map<String, ExecutorService> EXECUTOR_SERVICE_MAP = new HashMap<>();

/**
* 线程池中线程个数
*/
private static final int THREADS_CNT = 3;

private RedissonClient redissonClient;

@Value("${spring.application.name}")
private String appId;

public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}

/**
* 启动线程监听队列
*
* @param queueName 队列名(同监听线程名)
* @param listener 任务回调监听
* @param <T> 泛型
*/
private <T> void initialize(String queueName, AbstractDelayMessageListener listener) {
log.info("initialize listener thread start...");
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
ExecutorService executorService = EXECUTOR_SERVICE_MAP.get(queueName);
for (int i = 0; i < THREADS_CNT; i++) {
executorService.submit(() -> {
boolean isShutdown = false;
while (!isShutdown) {
try {
T t = blockingFairQueue.take();
listener.invoke(t);
} catch (RedissonShutdownException e) {
isShutdown = true;
String name = Thread.currentThread().getName();
log.info("redisson listener thread [{}] is closing", name);
} catch (Throwable e) {
log.error("listener thread occurs error", e);
}
}
});
}
log.info("initialize listener thread end...");
}

private static ExecutorService createExecutorService(String simpleListenerName) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("pool-" + simpleListenerName + "-%d").build();
return Executors.newFixedThreadPool(THREADS_CNT, threadFactory);
}

@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
ApplicationContext context = event.getApplicationContext();
Map<String, AbstractDelayMessageListener> map = context.getBeansOfType(AbstractDelayMessageListener.class);
for (Map.Entry<String, AbstractDelayMessageListener> entry : map.entrySet()) {
AbstractDelayMessageListener listener = entry.getValue();

String listenerName = listener.getClass().getName();
//考虑到listenerName过长,不适合用作线程名
String simpleListenerName = listener.getClass().getSimpleName();
//队列名字不能重复,此处必须使用listenerName(全路径),不能使用simpleListenerName
String queueName = appId + Constant.COLON_SEPARATOR + listenerName;
//存储创建的线程池,key为队列名,value为对应的线程池
EXECUTOR_SERVICE_MAP.put(queueName, createExecutorService(simpleListenerName));
initialize(queueName, listener);
Runtime.getRuntime().addShutdownHook(new Thread(
() -> EXECUTOR_SERVICE_MAP.forEach((k, v) -> {
log.info("shutting down ExecutorService [{}]", k);
v.shutdown();
})
));
}
}
}

使用姿势

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class ListenerA extends AbstractDelayMessageListener<DelayMessage> {

@Override
protected void doInvoke(DelayMessage s) {
log.info("ListenerA receive message [{}]", s);
long id = s.getId();
int delay = s.getDelay();
//do your logic
}
}