延迟队列实现

1.redission 延迟消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  // 放消息
RQueue<TipMessageDTO> queue = redissonClient.getQueue("延迟消息队列常量");
RDelayedQueue<TipMessageDTO> delayedQueue =redissonClient.getDelayedQueue(queue);
delayedQueue.offer(tipMessageDTO, delayMill, TimeUnit.MILLISECONDS);

// 取消息
RBlockingQueue<Object> blockingFairQueue =redissonClient.getBlockingQueue(HEART_LOOP_OPT_DELAYED_QUEUE);
// 应用启动时往队列里面放一个空值【如果不放数据,重启应用可能导致队列已有的数据消费不及时】
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(null, 1, TimeUnit.SECONDS);
while (true) {
try {
Object obj = blockingFairQueue.take();
if (Objects.nonNull(obj)) {
log.info("事件重试:{}", obj);
eventPublisher.publishEvent(obj);
}
} catch (Exception e) {
log.warn("处理延迟队列失败", e);
}
}