前言

之前使用redis实现了消息队列,但是没有延迟消费的功能,现在编写一个可以实现延迟消费的功能,同时也能满足及时消费,只需要将延迟时间设置0就行了,用到了Redission,不需要基于stream进行一些复杂配置。

原理概述

Redisson 将任务按照预定的执行时间存储在 zset 中,任务的执行时间作为 score,任务本身序列化后的数据作为 member。同时,Redisson 会在后台持续监控这个 ZSET,一旦发现有符合执行条件(当前时间 >= score)的任务,就会自动将这些任务转移到另一个 RQueue(Redisson 的队列实现)中,应用程序只需要从 RQueue 中取出任务执行即可。

参考文章

【redis缓存】怎么使用 Redis 实现一个延时队列?_redis实现延时队列-CSDN博客

Redisson 的延迟队列真的能用吗?一文看透原理 + 坑点_redission延时队列原理-CSDN博客

Spring Boot 集成 Redisson 实现消息队列_springboot redis消息队列-CSDN博客

【使用redisson完成延迟队列的功能】使用redisson配合线程池完成异步执行功能,延迟队列和不需要延迟的队列_redisson延时阻塞队列-CSDN博客


引入相关依赖

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.18</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.7.18</version>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.23.3</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.13.5</version>
        </dependency>

配置文件

server:
  port: ${SERVER_PORT:9211}

# Spring
spring:
  application:
    # 应用名称
    name: ruoyi-redis-msg
  redis:
    host: localhost
    port: 6379
    password: 123456
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher

# redission延迟队列相关配置
redission:
  delayqueue:
    enable: true  # 是否启用
    # handler线程池配置
    # 当任务数≤corePoolSize时:所有任务由核心线程直接执行
    # 当corePoolSize<任务数≤blockingQueueNum+corePoolSize时:
    #   前corePoolSize个任务由核心线程执行 剩余任务会触发线程扩容(最多到maximumPoolSize个线程),后面的任务进入队列等待
    # 当任务数>blockingQueueNum+corePoolSize时:将触发拒绝策略(默认是AbortPolicy)
    handlerConfig:
      corePoolSize: 5              # 核心线程数:线程池中保持活跃的最小线程数,即使它们处于空闲状态
      maximumPoolSize: 10          # 最大线程数: 线程池允许创建的最大线程数
      keepAliveTime: 30            # 空闲线程存活时间:当线程数超过核心线程数时,多余的空闲线程在终止前等待新任务的最长时间
      blockingQueueNum: 1000       # 队列的最大容量为1000个任务当队列已满时,新提交的任务会被阻塞(取决于线程池的拒绝策略)
    deadLine: dead_line            # 死信队列名前缀
    queueThreads: 1                # 监听队列的线程数,不宜设置过大,范围建议1-5
    queueConfigs:
      - queueName: goodsDelayQueue        # 延迟队列名 Redis Key
        beanId: goodsDelayConsumeHandler  # 延迟队列具体业务实现的Bean,可通过Spring的上下文获取
        desc: 商品消费的延迟队列            # 延迟队列名描述
      - queueName: orderDelayQueue
        beanId: orderDelayConsumeHandler
        desc: 订单消费的延迟队列
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;

@Configuration
@ConfigurationProperties(prefix = "redission.delayqueue")
@Slf4j
@Data
public class RedisDelayQueueConfigProperties {
    private List<QueueConfig> queueConfigs;
    private HandlerConfig handlerConfig;
    private String deadLine;
    private int queueThreads;

    @Data
    public static class QueueConfig {
        private String queueName;
        private String desc;
        private String beanId;
    }

    @Data
    public static class HandlerConfig {
        private int corePoolSize;
        private int maximumPoolSize;
        private long keepAliveTime;
        private int blockingQueueNum;
    }
}

redission配置类

    import lombok.extern.slf4j.Slf4j;
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.codec.JsonJacksonCodec;
    import org.redisson.config.Config;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.annotation.Resource;
    
    @Configuration
    @Slf4j
    public class RedissonConfig {
    
        private final static String REDISSON_PREFIX = "redis://";
        @Resource
        RedisProperties redisProperties;
    
        @Bean
        public RedissonClient redissonClient() {
            Config config = new Config();
            String url = REDISSON_PREFIX + redisProperties.getHost() + ":" + redisProperties.getPort();
            config.useSingleServer()
                    .setAddress(url)
                    .setPassword(redisProperties.getPassword())
                    .setDatabase(redisProperties.getDatabase())
                    .setPingConnectionInterval(2000);  //设置2秒心跳间隔
            config.setLockWatchdogTimeout(10000L);     //看门狗超时缩短(分布式锁自动续期更灵敏)显式设置为10秒
            config.setCodec(new JsonJacksonCodec());   //配置redisson序列化方式
            try {
                return Redisson.create(config);
            } catch (Exception e) {
                log.error("RedissonClient init redis url:{}Exception:{}", url, e.getMessage());
                return null;
            }
        }
    }

    延迟队列配置类

      下面的配置类,会为每个队列创建一个新的线程并且循环运行,阻塞监听队列(当队列有元素则take获取,如果没有则阻塞等待)

      import com.google.common.util.concurrent.ThreadFactoryBuilder;
      import com.ruoyi.common.core.utils.StringUtils;
      import com.ruoyi.redismessage.handler.RedisDelayQueueHandler;
      import com.ruoyi.redismessage.porpertise.RedisDelayQueueConfigProperties;
      import com.ruoyi.redismessage.utils.RedisDelayQueueUtil;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
      import org.springframework.context.ApplicationContext;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import javax.annotation.PostConstruct;
      import javax.annotation.Resource;
      import java.util.List;
      import java.util.concurrent.*;
      
      /**
       * 延迟队列配置类
       * 启动延迟队列监测扫描,文件处理的延迟队列线程池
       */
      @Configuration
      @ConditionalOnProperty(value = "redission.delayqueue.enable")
      @Slf4j
      public class RedisDelayQueueConfig {
      
          @Resource
          RedisDelayQueueConfigProperties configProperties;
          @Resource
          RedisDelayQueueUtil redisDelayQueueUtil;
      
          @PostConstruct
          public void validateConfig() {
              if (configProperties.getHandlerConfig().getCorePoolSize() >
                      configProperties.getHandlerConfig().getMaximumPoolSize()) {
                  throw new IllegalArgumentException("corePoolSize不能大于maximumPoolSize");
              }
              if (configProperties.getHandlerConfig().getBlockingQueueNum()<=0) {
                  throw new IllegalArgumentException("blockingQueueNum不能小于0");
              }
              if (StringUtils.isBlank(configProperties.getDeadLine())){
                  throw new IllegalArgumentException("请配置deadLine");
              }
          }
      
      
          /**
           * @Description 线程池 ——> 用于handle处理消费
           */
          @Bean("handlerExecutor")
          public ExecutorService getHandleExecutor() {
              return new ThreadPoolExecutor(
                      configProperties.getHandlerConfig().getCorePoolSize(),
                      configProperties.getHandlerConfig().getMaximumPoolSize(),
                      configProperties.getHandlerConfig().getKeepAliveTime(),
                      TimeUnit.SECONDS,
                      new LinkedBlockingQueue<>(configProperties.getHandlerConfig().getBlockingQueueNum()),
                      new ThreadFactoryBuilder().setNameFormat("handler-queue-%d").build()
              );
      
          }
      
          @Bean
          public List<RedisDelayQueueConfigProperties.QueueConfig> startRedisDelayQueue(ApplicationContext applicationContext,
                                                                                        @Qualifier("handlerExecutor") ExecutorService handlerExecutor) {
              //根据配置的队列创建对应的线程,
              List<RedisDelayQueueConfigProperties.QueueConfig> queueConfigs = configProperties.getQueueConfigs();
              for (RedisDelayQueueConfigProperties.QueueConfig queueConfig : queueConfigs) {
                  if (configProperties.getQueueThreads()>1){
                      // 1个队列对应N个线程
                      startMultiThreadListen(applicationContext, handlerExecutor, queueConfig,configProperties.getQueueThreads());
                  }else {
                      // 1个队列对应1个线程
                      startSingleThread(applicationContext, handlerExecutor, queueConfig);
                  }
              }
              return queueConfigs;
          }
      
          private void startMultiThreadListen(ApplicationContext applicationContext,
                                              ExecutorService handlerExecutor,
                                              RedisDelayQueueConfigProperties.QueueConfig queueConfig,
                                              int queueThreads) {
              for (int i = 0; i < queueThreads; i++) {
                  log.info("启动监听队列线程 queue={}", queueConfig.getQueueName() + "-" + i);
                  Thread thread = new Thread(() -> listenQueue(applicationContext, handlerExecutor, queueConfig));
                  thread.setName(queueConfig.getQueueName() + "-" + i);
                  thread.start();
              }
          }
      
          private void startSingleThread(ApplicationContext applicationContext,
                                   ExecutorService handlerExecutor,
                                   RedisDelayQueueConfigProperties.QueueConfig queueConfig) {
              log.info("启动监听队列线程 queue={}", queueConfig.getQueueName());
              // 由于此线程需要常驻,可以新建线程,不用交给线程池管理
              Thread thread = new Thread(() -> {
                  listenQueue(applicationContext, handlerExecutor, queueConfig);
              });
              thread.setName(queueConfig.getQueueName());
              thread.start();
          }
      
          private void listenQueue(ApplicationContext applicationContext,
                                   ExecutorService handlerExecutor,
                                   RedisDelayQueueConfigProperties.QueueConfig queueConfig) {
              while (true) {
                  String content ="";
                  try {
                      // 从阻塞队列中获取被执行对象,为空时阻塞,作为参数传递给redisDelayQueueHandler
                      content = redisDelayQueueUtil.getDelayQueue(queueConfig.getQueueName());
                      // 获取到执行类
                      log.info("监听队列成功:queueName={},content={},submit handler={}", queueConfig.getQueueName(),content,queueConfig.getBeanId());
                      RedisDelayQueueHandler redisDelayQueueHandler = applicationContext.getBean(queueConfig.getBeanId(),RedisDelayQueueHandler.class);
                      String finalContent = content;
                      //线程池执行
                      handlerExecutor.submit(() -> redisDelayQueueHandler.exec(queueConfig.getQueueName(),finalContent));
                  } catch (Exception e) {
                      log.error("执行失败:", e);
                      // 将失败的消息放入死信队列
                      redisDelayQueueUtil.sendToDeadLetterQueue(queueConfig.getQueueName(), content);
                  }
              }
          }
      }

      工具类

      import cn.hutool.core.collection.CollUtil;
      import cn.hutool.core.text.CharSequenceUtil;
      import com.ruoyi.common.core.exception.ServiceException;
      import com.ruoyi.common.core.utils.DateUtils;
      import com.ruoyi.redismessage.porpertise.RedisDelayQueueConfigProperties;
      import io.micrometer.core.instrument.util.StringUtils;
      import lombok.extern.slf4j.Slf4j;
      import org.redisson.api.*;
      import org.springframework.stereotype.Component;
      import javax.annotation.Resource;
      import java.util.Date;
      import java.util.List;
      import java.util.concurrent.TimeUnit;
      
      /**
       * @Description: 延迟队列增删工具类
       */
      @Slf4j
      @Component
      public class RedisDelayQueueUtil {
      
          @Resource
          private RedissonClient redissonClient;
          @Resource
          private RedisDelayQueueConfigProperties properties;
      
      
          /**
           * 添加延时队列
           * @param queueName
           * @param content
           * @param delay
           * @return
           */
          public boolean addDelayQueue(String queueName, String content,Long delay) {
              return addDelayQueue(queueName,content,delay,TimeUnit.SECONDS);
          }
      
          /**
           * 添加延时队列
           * @param queueName
           * @param content
           * @param endTime
           * @return
           */
          public  boolean addDelayQueue(String queueName, String content, Date endTime) {
              long seconds = DateUtils.diffTime(DateUtils.getNowDate(), endTime);
              if (seconds <= 0) {
                  log.error("不能小于当前时间");
                  throw new RuntimeException("不能小于当前时间");
              }
              return addDelayQueue(queueName,content, seconds, TimeUnit.SECONDS);
          }
      
          /**
           * @Description 添加延迟队列
           */
          private boolean addDelayQueue(String queueName,
                                       String content,
                                       Long delay,
                                       TimeUnit timeUnit) {
              validateParam(queueName, content);
              try {
                  // 获取Redisson的阻塞队列实例
                  RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queueName);
                  // 基于阻塞队列创建延迟队列
                  RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
                  // 向延迟队列投递元素(content为内容,delay为延迟时间,timeUnit为时间单位)
                  delayedQueue.offer(content, delay, timeUnit);
                  log.info("添加延时队列成功:queueName={},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");
              } catch (Exception e) {
                  log.error("添加延时队列失败:{}", e.getMessage());
                  throw new RuntimeException("添加延时队列失败:"+queueName);
              }
              return true;
          }
      
          /**
           * 获取延迟队列
           *
           * @param queueName
           */
          public String getDelayQueue(String queueName) throws InterruptedException {
              if (StringUtils.isBlank(queueName)) {
                  throw new ServiceException("队列名不能为空");
              }
              RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queueName);
              return blockingQueue.take();
          }
      
          /**
           * 删除指定队列中的消息
           *
           * @param content 指定删除的消息对象队列值(同队列需保证唯一性)
           * @param queueName 指定队列键
           */
          public boolean removeDelayedQueue(String queueName,String content) {
              validateParam(queueName, content);
              RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queueName);
              RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
              return delayedQueue.remove(content);
          }
      
          /**
           * 将消息发送到死信队列
           * @param queueName 原始队列名称
           * @param content   消息内容
           */
          public void sendToDeadLetterQueue(String queueName, String content) {
              RList<String> deadLetterQueue = redissonClient.getList(properties.getDeadLine()+":"+queueName);
              deadLetterQueue.add(content);
              log.warn("消息已放入死信队列: queueName={}, content={}", queueName, content);
          }
      
          /**
           * 校验参数
           *
           * @param queueName 消息主题
           * @param content   消息体
           */
          private void validateParam(String queueName, String content) {
              List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueueConfigs();
              if(CollUtil.isEmpty(queues)){
                  throw new ServiceException("请配置队列名");
              }
              if (CharSequenceUtil.isBlank(queueName)) {
                  throw new ServiceException("队列名不能为空");
              }
              boolean b = queues.stream().noneMatch(q -> q.getQueueName().equals(queueName));
              if (b) {
                  throw new ServiceException("没有配置该队列");
              }
              if (CharSequenceUtil.isBlank(content)) {
                  throw new ServiceException("消息体不能为空");
              }
          }
      }

      消费者

      这里我们创建一个接口,这样在刚才配置中根据配置文件,就能根据队列名选择对应的消费者

      /**
       * @Description: 延迟队列执行方法,需要具体实现
       */
      public interface RedisDelayQueueHandler{
      
          /**
           * @Description 执行方法
           */
          void exec(String queueName,String content);
      }
      import com.ruoyi.redismessage.handler.RedisDelayQueueHandler;
      import com.ruoyi.redismessage.utils.RedisDelayQueueUtil;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Component;
      import javax.annotation.Resource;
      
      /**
       * @Description: 商品延时消费处理类
       */
      @Component
      @Slf4j
      public class GoodsDelayConsumeHandler implements RedisDelayQueueHandler {
      
          @Resource
          private RedisDelayQueueUtil redisDelayQueueUtil;
      
          @Override
          public void exec(String queueName,String content) {
              try {
                  log.info("开始消费:queueName={},content={}",queueName, content);
              }catch (Exception e) {
                  log.error("消费错误:{}", e.getMessage());
                  redisDelayQueueUtil.sendToDeadLetterQueue(queueName, content);
              }
          }
      }
      import com.ruoyi.redismessage.handler.RedisDelayQueueHandler;
      import com.ruoyi.redismessage.utils.RedisDelayQueueUtil;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Component;
      import javax.annotation.Resource;
      
      /**
       * @Description: 订单延时消费处理类
       */
      @Component
      @Slf4j
      public class OrderDelayConsumeHandler implements RedisDelayQueueHandler {
      
          @Resource
          private RedisDelayQueueUtil redisDelayQueueUtil;
      
          @Override
          public void exec(String queueName,String content) {
              try {
                  log.info("开始消费:queueName={},content={}",queueName, content);
              }catch (Exception e) {
                  log.error("消费错误:{}", e.getMessage());
                  redisDelayQueueUtil.sendToDeadLetterQueue(queueName, content);
              }
          }
      }

      生产者

      import lombok.Data;
      import java.io.Serializable;
      
      @Data
      public class Msg implements Serializable {
          String queueName;
          String content;
          long delay;
      }

      生产者只需要传入队列名、消息体、延迟时间

      import com.ruoyi.common.core.domain.R;
      import com.ruoyi.redismessage.model.Msg;
      import com.ruoyi.redismessage.porpertise.RedisDelayQueueConfigProperties;
      import com.ruoyi.redismessage.utils.RedisDelayQueueUtil;
      import io.swagger.annotations.Api;
      import io.swagger.annotations.ApiOperation;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.web.bind.annotation.PostMapping;
      import org.springframework.web.bind.annotation.RequestBody;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      import javax.annotation.Resource;
      import java.util.List;
      
      @RestController
      @RequestMapping("/msg")
      @Api(tags = "MsgController")
      @Slf4j
      public class MsgController {
      
          @Resource
          RedisDelayQueueConfigProperties properties;
          @Resource
          private RedisDelayQueueUtil redisDelayQueueUtil;
      
          @PostMapping("/addDelayQueue")
          @ApiOperation(value = "添加延时消息")
          public R<?> addDelayQueue(@RequestBody Msg msg) {
              // 模拟业务中添加延迟任务
              boolean b = redisDelayQueueUtil.addDelayQueue(
                      msg.getQueueName(),
                      msg.getContent(),
                      msg.getDelay()
                      );
              return R.toR(b);
          }
      
          @PostMapping("/findQueues")
          @ApiOperation(value = "查询可用队列")
          public R<?> findQueues() {
              List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueueConfigs();
              return R.okList(queues,queues.size());
          }
      
      }

      测试流程

      1、首先查一下可用的队列,只有配置了的才能使用

      2、然后我们往goodsDelayQueue队列插入两条数据,延时时间分别为0s、60s

      3、往redis插入数据

      这里每个创建两个key

      1、redisson_delay_queue:{队列名}

      2、redisson_delay_queue_timeout:{队列名}

      第1个key是由redisson_delay_queue固定前缀+队列名组成,里面的值是ist集合

      第2个key是由redisson_delay_queue_timeout固定前缀+队列名组成,里面的值是Zset集合

      Zset集合解释

      redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),
      任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,
      直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。

      当Zset到了执行时间,队列的线程就会从take中获取到数据,然后再从线程池获取一个线程交给消费者处理

      当数据take后,redis里面的数据就删除了,而且多实例的消费者只会有一个take进行消费

      为何不会出现两个消费者同时消费?

      • 竞争机制‌:多个消费者监听同一队列时,take() 方法基于 Redis 的原子命令(如 BLPOP)实现。当消息可用时,Redis 确保只有一个客户端连接能获取消息‌。
      • 阻塞行为‌:消费者调用 take() 后会阻塞,直到消息到达。假设队列为空,两个消费者均阻塞;当一条消息到期,第一个成功获取的消费者解除阻塞,消息被移除队列,第二个消费者继续阻塞‌。
      • 示例场景‌:如果一条消息到期,端口 9000 的服务可能先获取它,端口 9001 的服务则保持阻塞状态,直到下一条消息可用‌

      注意事项

      消息丢失

      一旦消息被一个消费者通过take获取并移除,其他消费者无法再访问该消息‌,那么如果take后的消息处理失败,那么这个处理失败的消息就相当于丢失了,所以我们应该记录消费失败的消息,可作为记录查看。当前我们使用redisson将失败的消息存入新的list集合中,方便记录。

      数据量太大

      redis消息队列毕竟不是正统的mq,只能处理小规模的消息发送处理,如果数据量比较大,比如很多数据的到期时间都一致,那么达到到期时间后,每个队列只有一个线程去处理,这个时候可能压力很大,所以如果数据量太大,需要考虑MQ消息队列

      线程配置

      项目中有两处可以配置线程

      监听消息的线程:考虑如果到期时间有大量任务,单线程可能处理过慢,所以可以配置大一点

      消费消息线程:消费消息的线程可以设置大一点,异步处理消费任务

      参考:监听消息的线程配置为1个,压测100个任务同时到期,可以顺利完成

      Logo

      助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

      更多推荐