【SpringBoot 3.x 第72节】都微服务了,你还在写一堆 if-else 串消息?那你集成到底图啥?
🏆本文收录于《滚雪球学SpringBoot 3》,专门攻坚指数提升,本年度国内最系统+最专业+最详细(永久更新)。 本专栏致力打造最硬核 SpringBoot3 从零基础到进阶系列学习内容,🚀均为全网独家首发,打造精品专栏,专栏持续更新中…欢迎大家订阅持续学习。 如果想快速定位学习,可以看这篇【SpringBoot3教程导航帖】,你想学习的都被收集在内,快速投入学习!!两不误。 若还想学习
🏆本文收录于《滚雪球学SpringBoot 3》,专门攻坚指数提升,本年度国内最系统+最专业+最详细(永久更新)。
本专栏致力打造最硬核 SpringBoot3 从零基础到进阶系列学习内容,🚀均为全网独家首发,打造精品专栏,专栏持续更新中…欢迎大家订阅持续学习。 如果想快速定位学习,可以看这篇【SpringBoot3教程导航帖】,你想学习的都被收集在内,快速投入学习!!两不误。
若还想学习更多,可直接前往《滚雪球学SpringBoot(全版本合集)》,涵盖SpringBoot所有版本教学文章。
演示环境说明:
- 开发工具:IDEA 2021.3
- JDK版本: JDK 17(推荐使用 JDK 17 或更高版本,因为 Spring Boot 3.x 系列要求 Java 17,Spring Boot 3.5.4 基于 Spring Framework 6.x 和 Jakarta EE 9,它们都要求至少 JDK 17。)
- Spring Boot版本:3.5.4(于25年7月24日发布)
- Maven版本:3.8.2 (或更高)
- Gradle:(如果使用 Gradle 构建工具的话):推荐使用 Gradle 7.5 或更高版本,确保与 JDK 17 兼容。
- 操作系统:Windows 11
全文目录:
前言:我承认,我以前也“消息传着传着就丢了”
说出来有点丢人:我早年做微服务集成时,最喜欢的“架构风格”叫——能跑就行。A 服务发个 HTTP 给 B,B 再回调 C,C 再去戳 MQ……链路像一盘面条,谁打的结都不知道。
直到某天线上出现一句让我头皮发麻的反馈:“订单状态偶尔卡在‘已支付’不动了。”
我当时第一反应是:不可能吧?日志明明写了“已发送消息”。
第二反应是:完了,日志写了不代表消息真走完。
后来我才真正开始认真用 Spring Boot 3.x + Spring Integration 6.x 来做企业集成:不是为了“用新技术装一下”,而是为了把消息流转这件事从“玄学”变成“工程”。而 Spring Integration 背后的那套 EIP(Enterprise Integration Patterns,企业集成模式),就是把“面条”拽回“管道”的那根绳。
1. EIP 在微服务里到底怎么用?别背概念,先讲人话
微服务里最常见的集成需求是什么?我粗暴总结三类(你看是不是很像你每天在写的那些活):
- 异步解耦:下单后发消息给库存、积分、短信、风控……别都挤在一个事务里。
- 协议适配:上游 HTTP,下游 MQTT/AMQP/Kafka,格式还各不相同。
- 路由与编排:不同业务走不同分支;失败重试;落库;补偿;聚合。
EIP 的价值就在于:它把这些需求拆成一组“积木”,每块积木都很明确——过滤(Filter)、转换(Transformer)、路由(Router)、拆分/聚合(Splitter/Aggregator)、网关(Gateway)……你不需要自己发明一套“消息管道哲学”,直接按模式搭就行。
Spring Integration 的 Java DSL 本质上就是把这些 EIP 积木,用更顺手的 Builder/Lambda 方式拼起来,让你把消息流“写成一条线”,而不是散在 20 个类里互相找对象。
2. “函数式端点定义”:别被词吓到,其实就是——少写类,多写流
你看到“函数式端点”这几个字,可能会联想到 Spring Cloud Function。可以,但不必只盯着它。
在 Spring Integration 6.x 里,“函数式”的核心体验其实是:端点逻辑更倾向用 lambda 内联表达,把“处理逻辑”贴在“消息路径”旁边——读起来就像在看一条流水线。
下面我用一个小而完整的例子演示:
- 输入:订单事件(JSON)
- 处理:校验 → 转换 → 路由(大额走风控,小额直达)
- 输出:发往 AMQP(RabbitMQ)不同队列
2.1 依赖(Spring Boot 3.x + Spring Integration 6.x + AMQP)
Spring Boot 对 AMQP(RabbitMQ)提供 starter 与自动配置支持。
Spring Integration 的 AMQP 适配器/网关构建在 Spring AMQP 之上。
Gradle(示例)
dependencies {
implementation "org.springframework.boot:spring-boot-starter-integration"
implementation "org.springframework.integration:spring-integration-amqp"
implementation "org.springframework.boot:spring-boot-starter-amqp"
implementation "com.fasterxml.jackson.core:jackson-databind"
}
2.2 订单消息流(IntegrationFlow):EIP 一条龙
package com.example.demo;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import java.math.BigDecimal;
import java.util.Map;
@Configuration
public class OrderIntegrationConfig {
record OrderEvent(String orderId, BigDecimal amount, String userId) {}
@Bean
ObjectMapper objectMapper() {
return new ObjectMapper();
}
/**
* 入站:从 AMQP 队列收订单事件
* 出站:根据金额路由到不同队列
*/
@Bean
IntegrationFlow orderFlow(ConnectionFactory connectionFactory, ObjectMapper om) {
return IntegrationFlow
// Inbound Channel Adapter:RabbitMQ 消息驱动方式接入
.from(Amqp.inboundAdapter(connectionFactory, "order.events"))
// 给自己留条命:先打个“轻量日志”,别一上来就疯狂 print 大对象
.wireTap(f -> f.handle(m -> System.out.println("[tap] got messageId=" + m.getHeaders().getId())))
// Transformer:JSON -> OrderEvent
.transform(String.class, payload -> {
try {
return om.readValue(payload, OrderEvent.class);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid order payload: " + payload, e);
}
})
// Filter:过滤掉明显不合法的订单(别让垃圾数据污染下游)
.filter(OrderEvent.class, o -> o.amount() != null && o.amount().compareTo(BigDecimal.ZERO) > 0,
spec -> spec.discardChannel("order.discard"))
// Router:按金额分流
.route(OrderEvent.class, o -> o.amount().compareTo(new BigDecimal("1000")) >= 0 ? "RISK" : "NORMAL",
r -> r
.channelMapping("RISK", "order.risk.channel")
.channelMapping("NORMAL", "order.normal.channel"))
.get();
}
@Bean
IntegrationFlow riskOutbound(ConnectionFactory cf) {
return IntegrationFlow.from("order.risk.channel")
// Outbound Channel Adapter:发往风控队列
.handle(Amqp.outboundAdapter(cf).routingKey("order.risk"))
.get();
}
@Bean
IntegrationFlow normalOutbound(ConnectionFactory cf) {
return IntegrationFlow.from("order.normal.channel")
.handle(Amqp.outboundAdapter(cf).routingKey("order.normal"))
.get();
}
/**
* 丢弃通道:你可以接告警、落库、或发 DLQ
*/
@Bean
IntegrationFlow discardFlow() {
return IntegrationFlow.from("order.discard")
.handle(m -> System.out.println("[discard] " + m.getPayload()))
.get();
}
}
这段代码“函数式”的味道在哪?
- 端点逻辑(transform/filter/route)都用 lambda 写在流里
- EIP 模式一眼可见:变换、过滤、路由、出站适配
- 读起来像管道,而不是“到处跳转找 handler”
而且 AMQP 适配器这套能力是 Spring Integration 官方支持的典型路径:入站/出站 Channel Adapter、网关、请求-响应等。
3. MQTT 与 AMQP 在 Spring Boot 3 的集成更新:别踩坑,我先替你踩
3.1 AMQP(RabbitMQ):Boot 3 的“便利”不是白给的
Spring Boot 3 对 RabbitMQ 的便利,最实在的就是:
- 引入
spring-boot-starter-amqp,自动给你配RabbitTemplate、CachingConnectionFactory、AmqpAdmin等(条件满足才创建)。
你要做的通常就两件:
- 配
spring.rabbitmq.* - 在 IntegrationFlow 里用
Amqp.inboundAdapter / outboundAdapter接上去
如果你在做“更像业务函数”的事件驱动,也可以走 Spring Cloud Stream 的 RabbitMQ binder,它默认复用 Boot 的 ConnectionFactory,并支持同样的 spring.rabbitmq 配置前缀。
我个人的经验(带点情绪):
“能用 IntegrationFlow 把业务流写清楚的时候,就别让注解监听把逻辑散一地。”
注解监听不坏,但“可读性”经常被低估;半年后你回来看,真的会想把当时的自己揪出来谈谈人生。
3.2 MQTT:6.5 之后一个容易忽略的变化——Paho v3 变 optional 了
如果你用 Spring Integration 的 MQTT 模块,它当前实现基于 Eclipse Paho MQTT Client。
重点来了:从 Spring Integration 6.5 开始,org.eclipse.paho:org.eclipse.paho.client.mqttv3 依赖变成 optional,也就是说——
你不显式引入它,项目可能能编译,但运行时就给你整“惊喜”。
所以 MQTT(v3)你要额外加依赖(Maven 示例):
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- 显式引入 Paho v3(Spring Integration 6.5+ 需要你自己带上) -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
3.3 MQTT 入站 + 出站:最小可用流(Boot 3 里照样很“丝滑”)
Spring Integration MQTT 提供入站和出站通道适配器,并推荐通过
DefaultMqttPahoClientFactory+MqttConnectOptions配置连接。
package com.example.demo;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.dsl.Mqtt;
@Configuration
public class MqttIntegrationConfig {
@Bean
MqttPahoClientFactory mqttClientFactory() {
var factory = new DefaultMqttPahoClientFactory();
var options = new MqttConnectOptions();
options.setServerURIs(new String[]{"tcp://localhost:1883"});
options.setAutomaticReconnect(true);
options.setCleanSession(true);
factory.setConnectionOptions(options);
return factory;
}
@Bean
IntegrationFlow mqttInbound(MqttPahoClientFactory factory) {
return IntegrationFlow
.from(Mqtt.inboundAdapter(factory, "boot3-si6x-in", "devices/+/telemetry")
.qos(1))
.handle(m -> {
// 这里你可以做 Transformer / Router / Persist 等
System.out.println("[mqtt-in] topic=" + m.getHeaders().get("mqtt_receivedTopic")
+ " payload=" + m.getPayload());
})
.get();
}
@Bean
DirectChannel mqttOut() {
return new DirectChannel();
}
@Bean
IntegrationFlow mqttOutbound(MqttPahoClientFactory factory) {
return IntegrationFlow.from(mqttOut())
.handle(Mqtt.outboundAdapter(factory, "boot3-si6x-out")
.async(true)
.defaultTopic("devices/command"))
.get();
}
}
这套写法的“人类友好点”在于:
- MQTT 进来后你马上就能接 EIP 积木继续加工
- 出站也是一个 handler,不用你自己手写 client publish
- 你想做“设备消息 → 风控 → 告警 → 落库 → 回指令”,写成一条流就很顺
4. 可观测性:消息流转里最该被“照亮”的,其实是这些地方
我见过太多系统“有监控”,但监控只盯 CPU、内存、QPS。
消息系统真正会让你熬夜的,往往是:
- 消息在哪个端点堆积?
- 哪个 handler 变慢了?
- 重试到底重试了几次?
- 某个用户的一次请求跨服务走了哪些消息路径?
这就落到可观测性三件套:日志、指标、链路追踪。Spring Boot 的可观测性基座是 Micrometer Observation,你可以通过注入 ObservationRegistry 创建/传播观测事件,最终落到 metrics 和 traces。
而 Spring Integration 从 6.0 开始也基于 Micrometer Observation 抽象来做度量与追踪支持。
4.1 Spring Integration 里的观测点:它真的在“帮你记账”
官方文档说得很直白:有 MeterRegistry 会注册计时器/计数器;而从 6.0 起进一步引入 Observation 来统一 metrics + tracing。
同时,你还可以通过管理配置把 ObservationRegistry 注入到 Integration 管理组件里(这是 API 层面明确支持的)。
4.2 一个“能落地”的做法:给关键 handler 加 Observation(别全链路都打,别把自己打死)
下面示例展示:在订单处理的关键环节包一层 observation——你能在 tracing 系统里看到更清晰的 span/事件边界(具体落地到 OTel、Zipkin、Wavefront 等看你环境)。
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
@Configuration
public class ObservedFlowConfig {
@Bean
IntegrationFlow observedStage(ObservationRegistry registry) {
return IntegrationFlow.from("order.normal.channel")
.handle(msg -> Observation
.createNotStarted("eip.order.enrich", registry)
.lowCardinalityKeyValue("stage", "enrich")
.observe(() -> {
// 你真正的业务逻辑:查库、补字段、生成下游事件……
// 注意:别在这里做超重 IO 又不设超时,否则观测只会告诉你“慢”,但救不了你
System.out.println("[enrich] " + msg.getPayload());
})
)
.get();
}
}
我强烈建议你把观测点打在这三类位置(收益最大、噪声最小):
- 入站适配器之后:确认“消息进来了”
- 关键路由点:确认“走了哪条分支”
- 出站之前:确认“准备发出去”
别啥都打。真的,啥都打的结果通常是:你看到一堆 span,但依然不知道问题在哪——因为你把“信号”稀释成“噪声”了。
5. 把四个点串起来:一条“可维护”的微服务消息管道长什么样?
你可以把它想象成这样一条线(不是画饼,是工程套路):
- AMQP / MQTT 入站:协议进来先落到消息通道
- EIP 加工:transform/filter/route/split/aggregate/retry
- 函数式端点:用 lambda 把端点逻辑贴着流写,读起来像流水线
- 可观测性覆盖关键节点:Micrometer Observation + Integration 的 Observation 支持
- AMQP / MQTT 出站:统一出口,必要时做 DLQ、补偿、幂等
这套做下来,你会发现一个很“反直觉”的变化:
你写的类可能更少了,但系统反而更稳了。
因为复杂度不再藏在“散落的代码角落”,而是被摆在“消息流图”上了。
结语:最后送你一句我真心吃过亏才悟到的话
微服务时代最贵的不是“写代码”,而是“把系统解释清楚”。
Spring Integration + EIP 的价值,就在于它逼着你用一套大家都懂的模式,把消息从哪来、怎么走、到哪去讲明白。
如果还要继续深究,那么还可以往如下两个方面继续专研:
-
幂等 + 重试 + DLQ 的组合拳:怎么避免“重试把下游打爆”、怎么让失败可回放。
-
Splitter/Aggregator 在微服务里的正确用法:拆分并行处理、聚合超时策略、部分失败怎么兜底。
🧧福利赠与你🧧
无论你是计算机专业的学生,还是对编程有兴趣的小伙伴,都建议直接毫无顾忌的学习此专栏「滚雪球学SpringBoot」,bug菌郑重承诺,凡是学习此专栏的同学,均能获取到所需的知识和技能,全网最快速入门SpringBoot,就像滚雪球一样,越滚越大, 无边无际,指数级提升。
最后,如果这篇文章对你有所帮助,帮忙给作者来个一键三连,关注、点赞、收藏,您的支持就是我坚持写作最大的动力。
同时欢迎大家关注公众号:「猿圈奇妙屋」 ,以便学习更多同类型的技术文章,免费白嫖最新BAT互联网公司面试题、4000G PDF编程电子书、简历模板、技术文章Markdown文档等海量资料。
ps:本文涉及所有源代码,均已上传至Gitee开源,供同学们一对一参考 Gitee传送门,同时,原创开源不易,欢迎给个star🌟,想体验下被🌟的感jio,非常感谢❗
🫵 Who am I?
我是 bug菌:
- 热活跃于 CSDN | 掘金 | InfoQ | 51CTO | 华为云 | 阿里云 | 腾讯云 等技术社区;
- CSDN 博客之星 Top30、华为云多年度十佳博主&卓越贡献奖、掘金多年度人气作者 Top40;
- 掘金、InfoQ、51CTO 等平台签约及优质作者;
- 全网粉丝累计 30w+。
更多高质量技术内容及成长资料,可查看这个合集入口 👉 点击查看 👈️
硬核技术公众号 「猿圈奇妙屋」 期待你的加入,一起进阶、一起打怪升级。
- End -
更多推荐

所有评论(0)