1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| RQueue<TipMessageDTO> queue = redissonClient.getQueue("延迟消息队列常量"); RDelayedQueue<TipMessageDTO> delayedQueue =redissonClient.getDelayedQueue(queue); delayedQueue.offer(tipMessageDTO, delayMill, TimeUnit.MILLISECONDS);
RBlockingQueue<Object> blockingFairQueue =redissonClient.getBlockingQueue(HEART_LOOP_OPT_DELAYED_QUEUE); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(null, 1, TimeUnit.SECONDS); while (true) { try { Object obj = blockingFairQueue.take(); if (Objects.nonNull(obj)) { log.info("事件重试:{}", obj); eventPublisher.publishEvent(obj); } } catch (Exception e) { log.warn("处理延迟队列失败", e); } }
|