Redisson 实现消息队列、延迟队列 (生产可用)
前言
之前使用redis实现了消息队列,但是没有延迟消费的功能,现在编写一个可以实现延迟消费的功能,同时也能满足及时消费,只需要将延迟时间设置0就行了,用到了Redission,不需要基于stream进行一些复杂配置。
原理概述
Redisson 将任务按照预定的执行时间存储在 zset 中,任务的执行时间作为 score,任务本身序列化后的数据作为 member。同时,Redisson 会在后台持续监控这个 ZSET,一旦发现有符合执行条件(当前时间 >= score)的任务,就会自动将这些任务转移到另一个 RQueue(Redisson 的队列实现)中,应用程序只需要从 RQueue 中取出任务执行即可。
参考文章
【redis缓存】怎么使用 Redis 实现一个延时队列?_redis实现延时队列-CSDN博客
Redisson 的延迟队列真的能用吗?一文看透原理 + 坑点_redission延时队列原理-CSDN博客
Spring Boot 集成 Redisson 实现消息队列_springboot redis消息队列-CSDN博客
【使用redisson完成延迟队列的功能】使用redisson配合线程池完成异步执行功能,延迟队列和不需要延迟的队列_redisson延时阻塞队列-CSDN博客
引入相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.18</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.7.18</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.5</version>
</dependency>
配置文件
server:
port: ${SERVER_PORT:9211}
# Spring
spring:
application:
# 应用名称
name: ruoyi-redis-msg
redis:
host: localhost
port: 6379
password: 123456
mvc:
pathmatch:
matching-strategy: ant_path_matcher
# redission延迟队列相关配置
redission:
delayqueue:
enable: true # 是否启用
# handler线程池配置
# 当任务数≤corePoolSize时:所有任务由核心线程直接执行
# 当corePoolSize<任务数≤blockingQueueNum+corePoolSize时:
# 前corePoolSize个任务由核心线程执行 剩余任务会触发线程扩容(最多到maximumPoolSize个线程),后面的任务进入队列等待
# 当任务数>blockingQueueNum+corePoolSize时:将触发拒绝策略(默认是AbortPolicy)
handlerConfig:
corePoolSize: 5 # 核心线程数:线程池中保持活跃的最小线程数,即使它们处于空闲状态
maximumPoolSize: 10 # 最大线程数: 线程池允许创建的最大线程数
keepAliveTime: 30 # 空闲线程存活时间:当线程数超过核心线程数时,多余的空闲线程在终止前等待新任务的最长时间
blockingQueueNum: 1000 # 队列的最大容量为1000个任务当队列已满时,新提交的任务会被阻塞(取决于线程池的拒绝策略)
deadLine: dead_line # 死信队列名前缀
queueThreads: 1 # 监听队列的线程数,不宜设置过大,范围建议1-5
queueConfigs:
- queueName: goodsDelayQueue # 延迟队列名 Redis Key
beanId: goodsDelayConsumeHandler # 延迟队列具体业务实现的Bean,可通过Spring的上下文获取
desc: 商品消费的延迟队列 # 延迟队列名描述
- queueName: orderDelayQueue
beanId: orderDelayConsumeHandler
desc: 订单消费的延迟队列
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
@ConfigurationProperties(prefix = "redission.delayqueue")
@Slf4j
@Data
public class RedisDelayQueueConfigProperties {
private List<QueueConfig> queueConfigs;
private HandlerConfig handlerConfig;
private String deadLine;
private int queueThreads;
@Data
public static class QueueConfig {
private String queueName;
private String desc;
private String beanId;
}
@Data
public static class HandlerConfig {
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private int blockingQueueNum;
}
}
redission配置类
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
@Slf4j
public class RedissonConfig {
private final static String REDISSON_PREFIX = "redis://";
@Resource
RedisProperties redisProperties;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
String url = REDISSON_PREFIX + redisProperties.getHost() + ":" + redisProperties.getPort();
config.useSingleServer()
.setAddress(url)
.setPassword(redisProperties.getPassword())
.setDatabase(redisProperties.getDatabase())
.setPingConnectionInterval(2000); //设置2秒心跳间隔
config.setLockWatchdogTimeout(10000L); //看门狗超时缩短(分布式锁自动续期更灵敏)显式设置为10秒
config.setCodec(new JsonJacksonCodec()); //配置redisson序列化方式
try {
return Redisson.create(config);
} catch (Exception e) {
log.error("RedissonClient init redis url:{}Exception:{}", url, e.getMessage());
return null;
}
}
}
延迟队列配置类
下面的配置类,会为每个队列创建一个新的线程并且循环运行,阻塞监听队列(当队列有元素则take获取,如果没有则阻塞等待)
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.redismessage.handler.RedisDelayQueueHandler;
import com.ruoyi.redismessage.porpertise.RedisDelayQueueConfigProperties;
import com.ruoyi.redismessage.utils.RedisDelayQueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.*;
/**
* 延迟队列配置类
* 启动延迟队列监测扫描,文件处理的延迟队列线程池
*/
@Configuration
@ConditionalOnProperty(value = "redission.delayqueue.enable")
@Slf4j
public class RedisDelayQueueConfig {
@Resource
RedisDelayQueueConfigProperties configProperties;
@Resource
RedisDelayQueueUtil redisDelayQueueUtil;
@PostConstruct
public void validateConfig() {
if (configProperties.getHandlerConfig().getCorePoolSize() >
configProperties.getHandlerConfig().getMaximumPoolSize()) {
throw new IllegalArgumentException("corePoolSize不能大于maximumPoolSize");
}
if (configProperties.getHandlerConfig().getBlockingQueueNum()<=0) {
throw new IllegalArgumentException("blockingQueueNum不能小于0");
}
if (StringUtils.isBlank(configProperties.getDeadLine())){
throw new IllegalArgumentException("请配置deadLine");
}
}
/**
* @Description 线程池 ——> 用于handle处理消费
*/
@Bean("handlerExecutor")
public ExecutorService getHandleExecutor() {
return new ThreadPoolExecutor(
configProperties.getHandlerConfig().getCorePoolSize(),
configProperties.getHandlerConfig().getMaximumPoolSize(),
configProperties.getHandlerConfig().getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(configProperties.getHandlerConfig().getBlockingQueueNum()),
new ThreadFactoryBuilder().setNameFormat("handler-queue-%d").build()
);
}
@Bean
public List<RedisDelayQueueConfigProperties.QueueConfig> startRedisDelayQueue(ApplicationContext applicationContext,
@Qualifier("handlerExecutor") ExecutorService handlerExecutor) {
//根据配置的队列创建对应的线程,
List<RedisDelayQueueConfigProperties.QueueConfig> queueConfigs = configProperties.getQueueConfigs();
for (RedisDelayQueueConfigProperties.QueueConfig queueConfig : queueConfigs) {
if (configProperties.getQueueThreads()>1){
// 1个队列对应N个线程
startMultiThreadListen(applicationContext, handlerExecutor, queueConfig,configProperties.getQueueThreads());
}else {
// 1个队列对应1个线程
startSingleThread(applicationContext, handlerExecutor, queueConfig);
}
}
return queueConfigs;
}
private void startMultiThreadListen(ApplicationContext applicationContext,
ExecutorService handlerExecutor,
RedisDelayQueueConfigProperties.QueueConfig queueConfig,
int queueThreads) {
for (int i = 0; i < queueThreads; i++) {
log.info("启动监听队列线程 queue={}", queueConfig.getQueueName() + "-" + i);
Thread thread = new Thread(() -> listenQueue(applicationContext, handlerExecutor, queueConfig));
thread.setName(queueConfig.getQueueName() + "-" + i);
thread.start();
}
}
private void startSingleThread(ApplicationContext applicationContext,
ExecutorService handlerExecutor,
RedisDelayQueueConfigProperties.QueueConfig queueConfig) {
log.info("启动监听队列线程 queue={}", queueConfig.getQueueName());
// 由于此线程需要常驻,可以新建线程,不用交给线程池管理
Thread thread = new Thread(() -> {
listenQueue(applicationContext, handlerExecutor, queueConfig);
});
thread.setName(queueConfig.getQueueName());
thread.start();
}
private void listenQueue(ApplicationContext applicationContext,
ExecutorService handlerExecutor,
RedisDelayQueueConfigProperties.QueueConfig queueConfig) {
while (true) {
String content ="";
try {
// 从阻塞队列中获取被执行对象,为空时阻塞,作为参数传递给redisDelayQueueHandler
content = redisDelayQueueUtil.getDelayQueue(queueConfig.getQueueName());
// 获取到执行类
log.info("监听队列成功:queueName={},content={},submit handler={}", queueConfig.getQueueName(),content,queueConfig.getBeanId());
RedisDelayQueueHandler redisDelayQueueHandler = applicationContext.getBean(queueConfig.getBeanId(),RedisDelayQueueHandler.class);
String finalContent = content;
//线程池执行
handlerExecutor.submit(() -> redisDelayQueueHandler.exec(queueConfig.getQueueName(),finalContent));
} catch (Exception e) {
log.error("执行失败:", e);
// 将失败的消息放入死信队列
redisDelayQueueUtil.sendToDeadLetterQueue(queueConfig.getQueueName(), content);
}
}
}
}
工具类
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.text.CharSequenceUtil;
import com.ruoyi.common.core.exception.ServiceException;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.redismessage.porpertise.RedisDelayQueueConfigProperties;
import io.micrometer.core.instrument.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.*;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Description: 延迟队列增删工具类
*/
@Slf4j
@Component
public class RedisDelayQueueUtil {
@Resource
private RedissonClient redissonClient;
@Resource
private RedisDelayQueueConfigProperties properties;
/**
* 添加延时队列
* @param queueName
* @param content
* @param delay
* @return
*/
public boolean addDelayQueue(String queueName, String content,Long delay) {
return addDelayQueue(queueName,content,delay,TimeUnit.SECONDS);
}
/**
* 添加延时队列
* @param queueName
* @param content
* @param endTime
* @return
*/
public boolean addDelayQueue(String queueName, String content, Date endTime) {
long seconds = DateUtils.diffTime(DateUtils.getNowDate(), endTime);
if (seconds <= 0) {
log.error("不能小于当前时间");
throw new RuntimeException("不能小于当前时间");
}
return addDelayQueue(queueName,content, seconds, TimeUnit.SECONDS);
}
/**
* @Description 添加延迟队列
*/
private boolean addDelayQueue(String queueName,
String content,
Long delay,
TimeUnit timeUnit) {
validateParam(queueName, content);
try {
// 获取Redisson的阻塞队列实例
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queueName);
// 基于阻塞队列创建延迟队列
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
// 向延迟队列投递元素(content为内容,delay为延迟时间,timeUnit为时间单位)
delayedQueue.offer(content, delay, timeUnit);
log.info("添加延时队列成功:queueName={},content={},delay={}", queueName,content,timeUnit.toSeconds(delay) + "秒");
} catch (Exception e) {
log.error("添加延时队列失败:{}", e.getMessage());
throw new RuntimeException("添加延时队列失败:"+queueName);
}
return true;
}
/**
* 获取延迟队列
*
* @param queueName
*/
public String getDelayQueue(String queueName) throws InterruptedException {
if (StringUtils.isBlank(queueName)) {
throw new ServiceException("队列名不能为空");
}
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queueName);
return blockingQueue.take();
}
/**
* 删除指定队列中的消息
*
* @param content 指定删除的消息对象队列值(同队列需保证唯一性)
* @param queueName 指定队列键
*/
public boolean removeDelayedQueue(String queueName,String content) {
validateParam(queueName, content);
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
return delayedQueue.remove(content);
}
/**
* 将消息发送到死信队列
* @param queueName 原始队列名称
* @param content 消息内容
*/
public void sendToDeadLetterQueue(String queueName, String content) {
RList<String> deadLetterQueue = redissonClient.getList(properties.getDeadLine()+":"+queueName);
deadLetterQueue.add(content);
log.warn("消息已放入死信队列: queueName={}, content={}", queueName, content);
}
/**
* 校验参数
*
* @param queueName 消息主题
* @param content 消息体
*/
private void validateParam(String queueName, String content) {
List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueueConfigs();
if(CollUtil.isEmpty(queues)){
throw new ServiceException("请配置队列名");
}
if (CharSequenceUtil.isBlank(queueName)) {
throw new ServiceException("队列名不能为空");
}
boolean b = queues.stream().noneMatch(q -> q.getQueueName().equals(queueName));
if (b) {
throw new ServiceException("没有配置该队列");
}
if (CharSequenceUtil.isBlank(content)) {
throw new ServiceException("消息体不能为空");
}
}
}
消费者
这里我们创建一个接口,这样在刚才配置中根据配置文件,就能根据队列名选择对应的消费者
/**
* @Description: 延迟队列执行方法,需要具体实现
*/
public interface RedisDelayQueueHandler{
/**
* @Description 执行方法
*/
void exec(String queueName,String content);
}
import com.ruoyi.redismessage.handler.RedisDelayQueueHandler;
import com.ruoyi.redismessage.utils.RedisDelayQueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Description: 商品延时消费处理类
*/
@Component
@Slf4j
public class GoodsDelayConsumeHandler implements RedisDelayQueueHandler {
@Resource
private RedisDelayQueueUtil redisDelayQueueUtil;
@Override
public void exec(String queueName,String content) {
try {
log.info("开始消费:queueName={},content={}",queueName, content);
}catch (Exception e) {
log.error("消费错误:{}", e.getMessage());
redisDelayQueueUtil.sendToDeadLetterQueue(queueName, content);
}
}
}
import com.ruoyi.redismessage.handler.RedisDelayQueueHandler;
import com.ruoyi.redismessage.utils.RedisDelayQueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Description: 订单延时消费处理类
*/
@Component
@Slf4j
public class OrderDelayConsumeHandler implements RedisDelayQueueHandler {
@Resource
private RedisDelayQueueUtil redisDelayQueueUtil;
@Override
public void exec(String queueName,String content) {
try {
log.info("开始消费:queueName={},content={}",queueName, content);
}catch (Exception e) {
log.error("消费错误:{}", e.getMessage());
redisDelayQueueUtil.sendToDeadLetterQueue(queueName, content);
}
}
}
生产者
import lombok.Data;
import java.io.Serializable;
@Data
public class Msg implements Serializable {
String queueName;
String content;
long delay;
}
生产者只需要传入队列名、消息体、延迟时间
import com.ruoyi.common.core.domain.R;
import com.ruoyi.redismessage.model.Msg;
import com.ruoyi.redismessage.porpertise.RedisDelayQueueConfigProperties;
import com.ruoyi.redismessage.utils.RedisDelayQueueUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
@RestController
@RequestMapping("/msg")
@Api(tags = "MsgController")
@Slf4j
public class MsgController {
@Resource
RedisDelayQueueConfigProperties properties;
@Resource
private RedisDelayQueueUtil redisDelayQueueUtil;
@PostMapping("/addDelayQueue")
@ApiOperation(value = "添加延时消息")
public R<?> addDelayQueue(@RequestBody Msg msg) {
// 模拟业务中添加延迟任务
boolean b = redisDelayQueueUtil.addDelayQueue(
msg.getQueueName(),
msg.getContent(),
msg.getDelay()
);
return R.toR(b);
}
@PostMapping("/findQueues")
@ApiOperation(value = "查询可用队列")
public R<?> findQueues() {
List<RedisDelayQueueConfigProperties.QueueConfig> queues = properties.getQueueConfigs();
return R.okList(queues,queues.size());
}
}
测试流程
1、首先查一下可用的队列,只有配置了的才能使用
2、然后我们往goodsDelayQueue队列插入两条数据,延时时间分别为0s、60s
3、往redis插入数据
这里每个创建两个key
1、redisson_delay_queue:{队列名}
2、redisson_delay_queue_timeout:{队列名}
第1个key是由redisson_delay_queue固定前缀+队列名组成,里面的值是ist集合
第2个key是由redisson_delay_queue_timeout固定前缀+队列名组成,里面的值是Zset集合
Zset集合解释
redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),
任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,
直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。
当Zset到了执行时间,队列的线程就会从take中获取到数据,然后再从线程池获取一个线程交给消费者处理
当数据take后,redis里面的数据就删除了,而且多实例的消费者只会有一个take进行消费
为何不会出现两个消费者同时消费?
- 竞争机制:多个消费者监听同一队列时,
take()
方法基于 Redis 的原子命令(如BLPOP
)实现。当消息可用时,Redis 确保只有一个客户端连接能获取消息。 - 阻塞行为:消费者调用
take()
后会阻塞,直到消息到达。假设队列为空,两个消费者均阻塞;当一条消息到期,第一个成功获取的消费者解除阻塞,消息被移除队列,第二个消费者继续阻塞。 - 示例场景:如果一条消息到期,端口 9000 的服务可能先获取它,端口 9001 的服务则保持阻塞状态,直到下一条消息可用
注意事项
消息丢失
一旦消息被一个消费者通过take获取并移除,其他消费者无法再访问该消息,那么如果take后的消息处理失败,那么这个处理失败的消息就相当于丢失了,所以我们应该记录消费失败的消息,可作为记录查看。当前我们使用redisson将失败的消息存入新的list集合中,方便记录。
数据量太大
redis消息队列毕竟不是正统的mq,只能处理小规模的消息发送处理,如果数据量比较大,比如很多数据的到期时间都一致,那么达到到期时间后,每个队列只有一个线程去处理,这个时候可能压力很大,所以如果数据量太大,需要考虑MQ消息队列
线程配置
项目中有两处可以配置线程
监听消息的线程:考虑如果到期时间有大量任务,单线程可能处理过慢,所以可以配置大一点
消费消息线程:消费消息的线程可以设置大一点,异步处理消费任务
参考:监听消息的线程配置为1个,压测100个任务同时到期,可以顺利完成
更多推荐
所有评论(0)