Java实战篇18-消息推送系统(Spring Boot+WebSocket)
本文介绍了基于Spring Boot和WebSocket构建实时消息推送系统的完整方案。文章首先分析了传统HTTP协议的局限性,对比了WebSocket在双向通信、低延迟等方面的优势。通过序列图详细解析了WebSocket的握手过程,并提供了请求响应示例。随后讲解了Spring Boot集成WebSocket的具体实现步骤,包括配置类定义、端点实现以及核心注解(@ServerEndpoint、@O
👋 大家好,欢迎来到我的技术博客!
💻 作为一名热爱 Java 与软件开发的程序员,我始终相信:清晰的逻辑 + 持续的积累 = 稳健的成长。
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕一个常见的开发话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!
文章目录
📢 消息推送系统(Spring Boot + WebSocket):让信息实时“飞”起来 ✈️
在当今的互联网应用中,实时性已成为用户体验的核心指标之一。无论是电商平台的订单状态更新、社交应用的聊天消息、在线教育的直播互动,还是企业后台的系统通知,用户都期望信息能够“秒到”,而不是手动刷新页面或等待邮件通知。
传统的 HTTP 请求-响应模式,由于其单向通信的特性,无法满足这种需求。开发者曾尝试使用轮询(Polling)、长轮询(Long Polling)甚至 Server-Sent Events(SSE)来模拟实时通信,但这些方案要么效率低下,要么功能受限。
而 WebSocket 的出现,彻底改变了这一局面。它提供了一个在单个 TCP 连接上进行全双工、双向通信的通道,使得服务器可以主动向客户端推送消息,真正实现了“实时”通信。
本文将带你从零开始,使用 Spring Boot 和 WebSocket 构建一个功能完整、性能稳定的消息推送系统。我们将涵盖理论基础、架构设计、代码实现、安全验证、集群部署等核心内容,并提供可运行的代码示例和流程图。
🔗 WebSocket 官方规范:https://tools.ietf.org/html/rfc6455
🔗 Spring 官方文档 - WebSocket:https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket
🌐 为什么选择 WebSocket?
✅ HTTP 的局限性
HTTP 协议是无状态、单向的。客户端发起请求,服务器返回响应,连接随即关闭。这导致了以下问题:
- ❌ 服务器无法主动推送:必须由客户端发起请求,服务器才能响应。
- ❌ 频繁连接开销大:轮询方式会不断建立和关闭连接,消耗大量资源。
- ❌ 头部信息冗余:每次请求都携带大量 HTTP 头,对小数据包传输不友好。
✅ WebSocket 的优势
WebSocket 协议通过一次 HTTP 握手,将连接“升级”为持久化的双向通道:
- ✅ 全双工通信:客户端和服务器可以同时发送和接收消息。
- ✅ 低延迟:消息直接在长连接上传输,无需重复建立连接。
- ✅ 轻量级:数据帧头部开销小,适合高频、小数据量通信。
- ✅ 跨平台:浏览器、移动端、服务端均可支持。
🧩 WebSocket 的连接过程
✅ 握手流程图
✅ 握手请求示例
GET /ws/chat HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://localhost:3000
✅ 握手响应示例
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
一旦收到 101
响应,连接即升级为 WebSocket,后续通信将使用 WebSocket 数据帧。
🛠️ 环境准备
✅ Spring Boot 项目搭建
使用 Spring Initializr 创建项目,添加以下依赖:
spring-boot-starter-web
spring-boot-starter-websocket
✅ Maven 依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 前端模板(可选) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
</dependencies>
🔗 Thymeleaf 官网:https://www.thymeleaf.org/
🔌 Spring Boot 集成 WebSocket
✅ 配置类
Spring Boot 需要一个 ServerEndpointExporter
Bean 来扫描和注册 WebSocket 端点。
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
📌 注意:如果你使用的是外部 Tomcat 容器,此配置可省略。
✅ WebSocket 端点实现
使用 @ServerEndpoint
注解定义 WebSocket 端点。
@Component
@ServerEndpoint("/ws/chat/{userId}")
public class ChatWebSocket {
// 存储所有在线会话
private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
// 在线人数统计
private static volatile int onlineCount = 0;
/**
* 连接建立时触发
*/
@OnOpen
public void onOpen(@PathParam("userId") String userId, Session session) {
sessions.put(userId, session);
addOnlineCount();
System.out.println("用户 " + userId + " 已连接!当前在线人数:" + getOnlineCount());
// 发送欢迎消息
try {
sendMessage(session, "欢迎加入聊天室!");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 收到客户端消息时触发
*/
@OnMessage
public void onMessage(String message, Session session, @PathParam("userId") String userId) {
System.out.println("来自用户 " + userId + " 的消息:" + message);
// 广播给所有在线用户
broadcastMessage(userId + ": " + message);
}
/**
* 连接关闭时触发
*/
@OnClose
public void onClose(@PathParam("userId") String userId) {
sessions.remove(userId);
subOnlineCount();
System.out.println("用户 " + userId + " 已断开!当前在线人数:" + getOnlineCount());
}
/**
* 发生错误时触发
*/
@OnError
public void onError(Session session, Throwable error) {
System.err.println("WebSocket 发生错误");
error.printStackTrace();
}
/**
* 发送消息给指定会话
*/
private void sendMessage(Session session, String message) throws IOException {
session.getBasicRemote().sendText(message);
}
/**
* 广播消息给所有在线用户
*/
private void broadcastMessage(String message) {
sessions.values().forEach(session -> {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
});
}
// 在线人数相关方法
public static synchronized int getOnlineCount() {
return onlineCount;
}
private static synchronized void addOnlineCount() {
WebSocketChat.onlineCount++;
}
private static synchronized void subOnlineCount() {
WebSocketChat.onlineCount--;
}
}
📌 核心注解说明:
@OnOpen
:连接建立时执行@OnMessage
:收到消息时执行@OnClose
:连接关闭时执行@OnError
:发生异常时执行
🖥️ 前端页面实现
✅ HTML 页面
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>WebSocket 聊天室</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
#messages { height: 300px; border: 1px solid #ddd; padding: 10px; overflow-y: scroll; }
#input { width: 80%; padding: 10px; }
button { padding: 10px 20px; }
</style>
</head>
<body>
<h1>WebSocket 聊天室 💬</h1>
<div>
<label>用户ID: <input type="text" id="userId" value="user1"></label>
<button onclick="connect()">连接</button>
<button onclick="disconnect()">断开</button>
</div>
<div id="messages"></div>
<div>
<input type="text" id="input" placeholder="输入消息..." onkeydown="if(event.key==='Enter')send()">
<button onclick="send()">发送</button>
</div>
<script>
let socket = null;
function connect() {
const userId = document.getElementById('userId').value;
const url = `ws://localhost:8080/ws/chat/${userId}`;
socket = new WebSocket(url);
socket.onopen = function(event) {
addMessage("系统:连接成功!");
};
socket.onmessage = function(event) {
addMessage("收到:" + event.data);
};
socket.onclose = function(event) {
addMessage("系统:连接已断开");
};
socket.onerror = function(event) {
addMessage("系统:连接发生错误");
};
}
function disconnect() {
if (socket) {
socket.close();
}
}
function send() {
const input = document.getElementById('input');
const message = input.value;
if (message && socket && socket.readyState === WebSocket.OPEN) {
socket.send(message);
addMessage("我:" + message);
input.value = '';
}
}
function addMessage(text) {
const messages = document.getElementById('messages');
const p = document.createElement('p');
p.textContent = text;
messages.appendChild(p);
messages.scrollTop = messages.scrollHeight;
}
</script>
</body>
</html>
🔐 用户身份验证与会话管理
在实际项目中,不能仅靠 URL 参数传递 userId
,存在安全风险。我们需要在握手阶段进行身份验证。
✅ 自定义握手配置
@Configuration
public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator {
@Override
public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
// 从请求头或查询参数中获取 token
List<String> tokens = request.getHeaders().get("Authorization");
if (tokens != null && !tokens.isEmpty()) {
String token = tokens.get(0);
// 验证 token,解析出 userId
String userId = validateToken(token);
if (userId != null) {
// 将用户信息存储到 config 的 userProperties 中
config.getUserProperties().put("userId", userId);
}
}
super.modifyHandshake(config, request, response);
}
private String validateToken(String token) {
// 模拟 token 验证逻辑
if ("valid-token-123".equals(token)) {
return "user123";
}
return null;
}
}
✅ 在端点中使用验证后的用户信息
@ServerEndpoint(value = "/ws/chat", configurator = GetHttpSessionConfig.class)
public class SecureChatWebSocket {
private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
String userId = (String) config.getUserProperties().get("userId");
if (userId == null) {
try {
session.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "未授权"));
return;
} catch (IOException e) {
e.printStackTrace();
}
}
sessions.put(userId, session);
System.out.println("用户 " + userId + " 已安全连接");
}
// 其他方法保持不变...
}
前端连接时需在请求头中携带 token:
const headers = new Headers();
headers.append('Authorization', 'valid-token-123');
const socket = new WebSocket('ws://localhost:8080/ws/chat', [], headers); // 注意:标准 WebSocket 不支持 headers
⚠️ 注意:原生 WebSocket API 不支持自定义请求头。解决方案:
- 使用查询参数传递 token:
ws://localhost:8080/ws/chat?token=xxx
- 使用 STOMP over WebSocket(推荐)
📡 单播、广播与群组消息
✅ 消息类型对比
类型 | 说明 | 示例 |
---|---|---|
单播 | 点对点消息 | A 发送给 B |
广播 | 发送给所有在线用户 | 系统公告 |
群组 | 发送给特定群组成员 | 聊天室、直播间 |
✅ 代码实现
// 单播:发送给指定用户
public void sendToUser(String userId, String message) {
Session session = sessions.get(userId);
if (session != null && session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 广播:发送给所有人
public void broadcast(String message) {
sessions.values().forEach(session -> {
if (session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
// 群组:假设有 groupMap 存储群组成员
private static final Map<String, Set<String>> groupMap = new HashMap<>();
public void sendToGroup(String groupId, String message) {
Set<String> members = groupMap.get(groupId);
if (members != null) {
members.forEach(userId -> sendToUser(userId, message));
}
}
🔄 使用 STOMP 提升开发体验
虽然原生 WebSocket API 功能强大,但开发复杂应用时略显繁琐。STOMP(Simple Text Oriented Messaging Protocol)是一个基于帧的轻量级消息协议,常与 WebSocket 结合使用,提供更高级的抽象,如主题(Topic)、队列(Queue)、消息确认等。
✅ 添加 STOMP 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>webjars-locator</artifactId>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>sockjs-client</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>stomp-websocket</artifactId>
<version>2.3.3</version>
</dependency>
🔗 SockJS 官网:https://github.com/sockjs/sockjs-client
✅ 配置 STOMP Endpoints
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp") // STOMP 端点
.setAllowedOriginPatterns("*")
.withSockJS(); // 启用 SockJS 降级
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic", "/queue"); // 订阅目的地前缀
registry.setApplicationDestinationPrefixes("/app"); // 应用目的地前缀
}
}
✅ STOMP 消息处理
@Controller
public class ChatController {
@MessageMapping("/chat.send") // 对应前端 send("/app/chat.send")
@SendTo("/topic/public") // 广播到 /topic/public
public ChatMessage send(ChatMessage message) {
return message;
}
@MessageMapping("/chat.addUser")
@SendTo("/topic/public")
public ChatMessage addUser(@Payload ChatMessage message,
SimpMessageHeaderAccessor headerAccessor) {
headerAccessor.getSessionAttributes().put("username", message.getSender());
return message;
}
}
// 消息实体
public class ChatMessage {
private String sender;
private String content;
private String timestamp;
// getters and setters
}
✅ 前端 STOMP 客户端
<script src="/webjars/sockjs-client/sockjs.min.js"></script>
<script src="/webjars/stomp-websocket/stomp.min.js"></script>
<script>
let stompClient = null;
function connect() {
const socket = new SockJS('/ws-stomp');
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/public', onMessageReceived);
});
}
function onMessageReceived(payload) {
const message = JSON.parse(payload.body);
addMessage(message.sender + ": " + message.content);
}
function sendMessage() {
const message = {
sender: "Alice",
content: document.getElementById('input').value,
timestamp: new Date()
};
stompClient.send("/app/chat.send", {}, JSON.stringify(message));
}
</script>
📊 消息推送系统架构图
🚀 性能优化与集群部署
✅ 单机性能瓶颈
单个 WebSocket 服务实例能承载的连接数有限(通常几千到几万),且存在单点故障风险。
✅ 解决方案:集群 + 消息中间件
使用 Redis 作为消息代理,实现多实例间的通信。
✅ 集成 Redis
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost")
.setRelayPort(61613)
.setClientLogin("guest")
.setClientPasscode("guest");
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp").withSockJS();
}
}
使用 RabbitMQ 或 ActiveMQ 作为 STOMP 消息代理,Redis 也可通过插件支持。
🔗 RabbitMQ STOMP 插件:https://www.rabbitmq.com/stomp.html
🛡️ 安全性最佳实践
- HTTPS/WSS:生产环境务必使用
wss://
加密连接。 - 身份验证:握手阶段验证用户身份(JWT、Session 等)。
- 消息校验:防止 XSS、SQL 注入等攻击。
- 连接限制:防止单用户建立过多连接。
- 日志监控:记录异常连接和消息。
📈 监控与运维
- 使用 Spring Boot Actuator 监控 WebSocket 会话数。
- 集成 Prometheus + Grafana 进行可视化监控。
- 设置告警规则,如连接数突增、消息积压等。
🎯 应用场景
- 💬 在线客服系统
- 📊 实时数据看板(股票、监控)
- 🎮 多人在线游戏
- 📢 系统通知与公告
- 📈 直播弹幕
🏁 结语
通过 Spring Boot 与 WebSocket 的结合,我们可以轻松构建出高性能、高可用的实时消息推送系统。无论是简单的聊天应用,还是复杂的金融交易系统,这套技术栈都能提供坚实的底层支持。
WebSocket 不仅仅是一个技术,它代表了一种实时、互动、连接的互联网新范式。掌握它,你就能让你的应用“活”起来,为用户带来前所未有的体验。
🔗 推荐阅读:
💬 让信息流动,让连接无界。
🚀 用 WebSocket,开启实时应用的新篇章!
🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨
更多推荐
所有评论(0)