深入理解Stream消费者组工作模式
目录
前言
这周在对终端网络聊天室项目进行实现异步消息队列功能的时候, 要利用到redis来实现消息持久化。针对实现消息对列功能redis提供的类型有列表List, 发布订阅PubSub以及stream流, 其中List结构也可以作为一个消息队列来实现数据持久化, 但是存在消息丢失, 只支持单消费者模式等缺陷; 而PubSub虽然采用发布订阅模式, 支持多生产多消费模型, 却不支持数据持久化, 同时也无法避免消息丢失和队列溢出的缺点。所以redis在5.0版本设计了一款解决上面所有问题, 功能完善的也就是本文探讨的基于Stream的消息队列。
单消费者模式
1. 发送数据XADD, 这条命令是通用的, 无论是单消费者还是消费者组。
2. 读取数据单消费者模式就是跟List工作模式类似, 工作方式是直接使用 XREAD 命令从 Stream 尾部(或指定 ID)读取消息, 但是其中不同消费者可以重复读取。
消费者1:
消费者2:
可以看出不同的消费者都可以重复读取同一条数据。
当然也可以加入阻塞block时间来获取新消息避免返回nil值
此时消费者2阻塞读取数据:
消费者1发送数据:
此刻消费者2接收到数据:
应用场景特点:
- 消息处理逻辑简单。
- 吞吐量低,单个消费者足以应对。
- 不需要高可用或负载均衡。
消费者组模式
不同于单消费者模式, 消费者组引入了许多机制, 是Stream最强大的特性, 旨在解决高并发、高可靠、可扩展的消息处理需求:
1. 概念:
2. 核心特点如下:
-
组内竞争消费: 设计了消费者组的模式, 同一个消费者组内的多个消费者共同消费同一条Stream队列中的消息, 实现了负载均衡, 每条消息在被确认之前( ACK )只会投递给一个消费者, 确保消息不会被重复处理
-
独立消费进度: 每个消费者都有维护自己独立的标示也被称为投递id, 用来记录改组当前消费的位置, 也就是说不同的消费者组可以有不同的执行速度或顺序来处理消息
-
待处理消息列表( pending-list ): 记录已被消费者读取但尚未被ACK确认的消息, 用来实现服务出现异常故障恢复和监控的关键部分
其记录了: 1.消息id 2.负责处理的消费者名称 3.上次投递经过的时间 4.消息被重新投递的次数
-
手动确认机制( ACK ): 消费者必须显式地使用XACK命令来确认一条消息已经被成功处理, 只有这样消息才会被从pending-list中删除, 投递id才能向前推进
-
可通过 PEL进行失败重投递( Pending Entries List): 通过
XPENDING命令可以查询 PEL找出长时间未被确认或被多次尝试处理的消息, 监控程序可以使用XCLAIM命令, 将属于已宕机消费者的待处理消息, 强制转移给另一个健康的消费者进行处理
简单的理解就是设计了 消费者组, PEL队列, ACK确认机制, 重投递机制
创建消费者组:
消费者读取操作:
在消费者组模式中正确消费方式:只用 XREADGROUP
- 必须使用 XREADGROUP 来从消费者组读取消息。
- 切勿使用
XREAD它会绕过 PEL 和 ACK 机制,导致重复消费或状态混乱。 - 消费者名称由应用在首次调用时指定,Redis 自动创建;可通过
XINFO CONSUMERS查看所有活跃消费者
eg:
创建消费者组g1并read读取( 即消费者c1, 从Stream队列s1监听的消费者组g1中, 读取下一个未消费的消息, 等待时间为2000ms ):
消费者c1
消费者c2
可以看出不同消费者获取下一个未消费的数据时, 是不会重复读取的, 也就是对应第一个功能组内竞争消费, 当数据被消费也就是被read之后需要被ACK手动确认, 数据才会从PEL中被移除, 若不ACK, 消息将一直保留在 PEL 中, 可供重试。

只有消息被手动确认之后才会从PEL中删除, 保证每一个消息被完整处理, 从而解决消息丢失问题。当消息被删除之后, 投递id才会自增继续处理下一个消息。当消息被消费但长时间未确认时, 可以通过 XPENDING 命令( 发现问题 )可以查询 PEL找出长时间未被确认或被多次尝试处理的消息, 监控程序可以使用 XCLAIM 命令( 手动干预恢复 ), 将属于已宕机消费者的待处理消息, 强制转移给另一个健康的消费者进行处理。


总结
Redis中的Stream是一种高效的消息队列机制, 其参考了kafka的设计模式, 允许生产者向流中添加消息, 同时支持消费者组以可靠的方式竞争和处理这些消息。通过内置的待处理列表(PEL)确保每条消息仅被一个消费者处理, 并通过确认机制(XACK)来标记消息为已成功处理, 是一个功能相对完善的的消息队列类型。最后附上一张个人总结的流程图和项目中集成的相关函数:

// AddToStream 向Stream中发送消息
func (msg *ChatMessage) AddToStream(streamName string) error {
msgData, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("json序列化失败: %w", err)
}
err = RDB.XAdd(context.Background(), &redis.XAddArgs{
Stream: streamName,
MaxLen: 10000, // 限制10000条消息记录
Approx: true, // 近似删除
Values: map[string]interface{}{
"message": msgData,
},
}).Err()
if err != nil {
return fmt.Errorf("消息写入stream失败: %w", err)
}
return nil
}
// BroadcastFromStreamLoop 从Stream中读取消息并发送给客户端
func BroadcastFromStreamLoop() {
defer func() {
if err := recover(); err != nil {
log.Printf("stream群聊消息出现panic... %v\n%s", err, debug.Stack())
}
}()
for {
result, err := RDB.XReadGroup(context.Background(), &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName1,
Streams: []string{streamNameChat, ">"},
Count: 10,
Block: 0,
}).Result()
if err != nil {
if err == redis.Nil { // 查询到不存在的key或超时未收到消息
continue
}
log.Printf("读取stream消息失败: %v", err)
return
}
for _, stream := range result {
for _, message := range stream.Messages {
var chatMsg ChatMessage
err := json.Unmarshal([]byte(message.Values["message"].(string)), &chatMsg)
if err != nil {
log.Printf("消息反序列化失败: %v", err)
continue
}
switch chatMsg.Type {
case MessageTypePublic:
broadcastToAll(fmt.Sprintf("[群聊] %s: %s", chatMsg.Username, chatMsg.Message))
}
RDB.XAck(context.Background(), streamNameChat, groupName, message.ID)
}
}
}
}
更多推荐



所有评论(0)