一、回顾消息队列

消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流,并基于数据通信来进行分布式系统的集成。

通过提供 消息传递 和 消息排队 模型,它可以在 分布式环境 下提供 应用解耦弹性伸缩冗余存储流量削峰异步通信数据同步 等等功能,其作为 分布式系统架构 中的一个重要组件,有着举足轻重的地位。

mq_overview

现在回顾下,我们使用的消息队列,一般都有什么样的特点:

  • 三个角色:生产者、消费者、消息处理中心
  • 异步处理模式:生产者 将消息发送到一条 虚拟的通道(消息队列)上,而无须等待响应。消费者 则 订阅 或是 监听 该通道,取出消息。两者互不干扰,甚至都不需要同时在线,也就是我们说的 松耦合
  • 可靠性:消息要可以保证不丢失、不重复消费、有时可能还需要顺序性的保证

撇开我们常用的消息中间件不说,你觉得 Redis 的哪些数据类型可以满足 MQ 的常规需求~~

二、Redis 实现消息队列

思来想去,只有 List 和 Streams 两种数据类型,可以实现消息队列的这些需求,当然,Redis 还提供了发布、订阅(pub/sub) 模式。

我们逐一看下这 3 种方式的使用和场景。

2.1 List 实现消息队列

Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。

所以常用来做异步队列使用。将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理。

Redis 提供了好几对 List 指令,先大概看下这些命令,混个眼熟

List 常用命令

挑几个弹入、弹出的命令就可以组合出很多姿势

  • LPUSH、RPOP 左进右出
  • RPUSH、LPOP 右进左出

127.0.0.1:6379> lpush mylist a a b c d e
(integer) 6
127.0.0.1:6379> rpop mylist
“a”
127.0.0.1:6379> rpop mylist
“a”
127.0.0.1:6379> rpop mylist
“b”
127.0.0.1:6379>

redis-RPOP

即时消费问题

通过 LPUSH,RPOP 这样的方式,会存在一个性能风险点,就是消费者如果想要及时的处理数据,就要在程序中写个类似 while(true) 这样的逻辑,不停地去调用 RPOP 或 LPOP 命令,这就会给消费者程序带来些不必要的性能损失。

所以,Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令(带 B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式就节省了不必要的 CPU 开销。

  • LPUSH、BRPOP 左进右阻塞出
  • RPUSH、BLPOP 右进左阻塞出

127.0.0.1:6379> lpush yourlist a b c d
(integer) 4
127.0.0.1:6379> blpop yourlist 10
1) “yourlist”
2) “d”
127.0.0.1:6379> blpop yourlist 10
1) “yourlist”
2) “c”
127.0.0.1:6379> blpop yourlist 10
1) “yourlist”
2) “b”
127.0.0.1:6379> blpop yourlist 10
1) “yourlist”
2) “a”
127.0.0.1:6379> blpop yourlist 10
(nil)
(10.02s)

如果将超时时间设置为 0 时,即可无限等待,直到弹出消息

因为 Redis 单线程的特点,所以在消费数据时,同一个消息会不会同时被多个 consumer 消费掉,但是需要我们考虑消费不成功的情况。

可靠队列模式 | ack 机制

以上方式中, List 队列中的消息一经发送出去,便从队列里删除。如果由于网络原因消费者没有收到消息,或者消费者在处理这条消息的过程中崩溃了,就再也无法还原出这条消息。究其原因,就是缺少消息确认机制。

为了保证消息的可靠性,消息队列都会有完善的消息确认机制(Acknowledge),即消费者向队列报告消息已收到或已处理的机制。

Redis List 怎么搞一搞呢?

再看上边的表格中,有两个命令, RPOPLPUSH、BRPOPLPUSH (阻塞)从一个 list 中获取消息的同时把这条消息复制到另一个 list 里(可以当做备份),而且这个过程是原子的。

这样我们就可以在业务流程安全结束后,再删除队列元素,实现消息确认机制。

127.0.0.1:6379> rpush myqueue one
(integer) 1
127.0.0.1:6379> rpush myqueue two
(integer) 2
127.0.0.1:6379> rpush myqueue three
(integer) 3
127.0.0.1:6379> rpoplpush myqueue queuebak
“three”
127.0.0.1:6379> lrange myqueue 0 -1
1) “one”
2) “two”
127.0.0.1:6379> lrange queuebak 0 -1
1) “three”

redis-rpoplpush

之前做过的项目中就有用到这样的方式去处理数据,数据标识从一个 List 取出后放入另一个 List,业务操作安全执行完成后,再去删除 List 中的数据,如果有问题的话,很好回滚。

当然,还有更特殊的场景,可以通过 zset 来实现延时消息队列,原理就是将消息加到 zset 结构后,将要被消费的时间戳设置为对应的 score 即可,只要业务数据不会是重复数据就 OK。

2.2 订阅与发布实现消息队列

我们都知道消息模型有两种

  • 点对点:Point-to-Point(P2P)
  • 发布订阅:Publish/Subscribe(Pub/Sub)

List 实现方式其实就是点对点的模式,下边我们再看下 Redis 的发布订阅模式(消息多播),这才是“根正苗红”的 Redis MQ

redis-pub_sub

"发布/订阅"模式同样可以实现进程间的消息传递,其原理如下:

"发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。

Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式。

这个 频道 和 模式 有什么区别呢?

频道我们可以先理解为是个 Redis 的 key 值,而模式,可以理解为是一个类似正则匹配的 Key,只是个可以匹配给定模式的频道。这样就不需要显式地去订阅多个名称了,可以通过模式订阅这种方式,一次性关注多个频道。

我们启动三个 Redis 客户端看下效果:

redis-subscribe

先启动两个客户端订阅(subscribe) 名字叫 framework 的频道,然后第三个客户端往 framework 发消息,可以看到前两个客户端都会接收到对应的消息:

redis-publish

我们可以看到订阅的客户端每次可以收到一个 3 个参数的消息,分别为:

  • 消息的种类
  • 始发频道的名称
  • 实际的消息

再来看下订阅符合给定模式的频道,这回订阅的命令是 PSUBSCRIBE

redis-psubscribe

我们往 java.framework 这个频道发送了一条消息,不止订阅了该频道的 Consumer1 和 Consumer2 可以接收到消息,订阅了模式 java.* 的 Consumer3 和 Consumer4 也可以接收到消息。

redis-psubscribe1

Pub/Sub 常用命令:

2.3 Streams 实现消息队列

Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

后来 Redis 的父亲 Antirez,又单独开启了一个叫 Disque 的项目来完善这些问题,但是没有做起来,github 的更新也定格在了 5 年前,所以我们就不讨论了。

Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

它就像是个仅追加内容的消息链表,把所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。而且消息是持久化的。

redis-stream

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

Streams 是 Redis 专门为消息队列设计的数据类型,所以提供了丰富的消息队列操作命令。

Stream 常用命令

CRUD 工程师上线

增删改查来一波

# * 号表示服务器自动生成 ID,后面顺序跟着一堆 key/value
127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3
“1609404470049-0”  ## 生成的消息 ID,有两部分组成,毫秒时间戳-该毫秒内产生的第1条消息

# 消息ID 必须要比上个 ID 大
127.0.0.1:6379> xadd mystream 123 f4 v4  
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

# 自定义ID
127.0.0.1:6379> xadd mystream 1609404470049-1 f4 v4
“1609404470049-1”

# -表示最小值 , + 表示最大值,也可以指定最大消息ID,或最小消息ID,配合 -、+ 使用
127.0.0.1:6379> xrange mystream - +
1) 1) “1609404470049-0”
2) 1) “f1”
2) “v1”
3) “f2”
4) “v2”
5) “f3”
6) “v3”
2) 1) “1609404470049-1”
2) 1) “f4”
2) “v4”

127.0.0.1:6379> xdel mystream 1609404470049-1
(integer) 1
127.0.0.1:6379> xlen mystream
(integer) 1
# 删除整个 stream
127.0.0.1:6379> del mystream
(integer) 1

独立消费

xread 以阻塞或非阻塞方式获取消息列表,指定 BLOCK 选项即表示阻塞,超时时间 0 毫秒(意味着永不超时)

# 从ID是0-0的开始读前2条
127.0.0.1:6379> xread count 2 streams mystream 0
1) 1) “mystream”
2) 1) 1) “1609405178536-0”
2) 1) “f5”
2) “v5”
2) 1) “1609405198676-0”
2) 1) “f1”
2) “v1”
3) “f2”
4) “v2”

# 阻塞的从尾部读取流,开启新的客户端xadd后发现这里就读到了,block 0 表示永久阻塞
127.0.0.1:6379> xread block 0 streams mystream $
1) 1) “mystream”
2) 1) 1) “1609408791503-0”
2) 1) “f6”
2) “v6”
(42.37s)

可以看到,我并没有给流 mystream 传入一个常规的 ID,而是传入了一个特殊的 ID $这个特殊的 ID 意思是 XREAD 应该使用流 mystream 已经存储的最大 ID 作为最后一个 ID。以便我们仅接收从我们开始监听时间以后的新消息。这在某种程度上相似于 Unix 命令tail -f。

当然,也可以指定任意有效的 ID。

而且, XREAD 的阻塞形式还可以同时监听多个 Strema,只需要指定多个键名即可。

127.0.0.1:6379> xread block 0 streams mystream yourstream       

最后

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长,自己不成体系的自学效果低效漫长且无助。

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,不论你是刚入门Android开发的新手,还是希望在技术上不断提升的资深开发者,这些资料都将为你打开新的学习之门!

如果你觉得这些内容对你有帮助,需要这份全套学习资料的朋友可以戳我获取!!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
77)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,不论你是刚入门Android开发的新手,还是希望在技术上不断提升的资深开发者,这些资料都将为你打开新的学习之门!

如果你觉得这些内容对你有帮助,需要这份全套学习资料的朋友可以戳我获取!!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐