🏆本文收录于《滚雪球学SpringBoot 3》,专门攻坚指数提升,本年度国内最系统+最专业+最详细(永久更新)。
  
本专栏致力打造最硬核 SpringBoot3 从零基础到进阶系列学习内容,🚀均为全网独家首发,打造精品专栏,专栏持续更新中…欢迎大家订阅持续学习。 如果想快速定位学习,可以看这篇【SpringBoot3教程导航帖】,你想学习的都被收集在内,快速投入学习!!两不误。
  
若还想学习更多,可直接前往《滚雪球学SpringBoot(全版本合集)》,涵盖SpringBoot所有版本教学文章。

演示环境说明:

  • 开发工具:IDEA 2021.3
  • JDK版本: JDK 17(推荐使用 JDK 17 或更高版本,因为 Spring Boot 3.x 系列要求 Java 17,Spring Boot 3.5.4 基于 Spring Framework 6.x 和 Jakarta EE 9,它们都要求至少 JDK 17。)
  • Spring Boot版本:3.5.4(于25年7月24日发布)
  • Maven版本:3.8.2 (或更高)
  • Gradle:(如果使用 Gradle 构建工具的话):推荐使用 Gradle 7.5 或更高版本,确保与 JDK 17 兼容。
  • 操作系统:Windows 11

前言:我承认,我以前也“消息传着传着就丢了”

说出来有点丢人:我早年做微服务集成时,最喜欢的“架构风格”叫——能跑就行。A 服务发个 HTTP 给 B,B 再回调 C,C 再去戳 MQ……链路像一盘面条,谁打的结都不知道。
  
直到某天线上出现一句让我头皮发麻的反馈:“订单状态偶尔卡在‘已支付’不动了。”

我当时第一反应是:不可能吧?日志明明写了“已发送消息”。
第二反应是:完了,日志写了不代表消息真走完

后来我才真正开始认真用 Spring Boot 3.x + Spring Integration 6.x 来做企业集成:不是为了“用新技术装一下”,而是为了把消息流转这件事从“玄学”变成“工程”。而 Spring Integration 背后的那套 EIP(Enterprise Integration Patterns,企业集成模式),就是把“面条”拽回“管道”的那根绳。

1. EIP 在微服务里到底怎么用?别背概念,先讲人话

微服务里最常见的集成需求是什么?我粗暴总结三类(你看是不是很像你每天在写的那些活):

  1. 异步解耦:下单后发消息给库存、积分、短信、风控……别都挤在一个事务里。
  2. 协议适配:上游 HTTP,下游 MQTT/AMQP/Kafka,格式还各不相同。
  3. 路由与编排:不同业务走不同分支;失败重试;落库;补偿;聚合。

EIP 的价值就在于:它把这些需求拆成一组“积木”,每块积木都很明确——过滤(Filter)、转换(Transformer)、路由(Router)、拆分/聚合(Splitter/Aggregator)、网关(Gateway)……你不需要自己发明一套“消息管道哲学”,直接按模式搭就行。

Spring Integration 的 Java DSL 本质上就是把这些 EIP 积木,用更顺手的 Builder/Lambda 方式拼起来,让你把消息流“写成一条线”,而不是散在 20 个类里互相找对象。

2. “函数式端点定义”:别被词吓到,其实就是——少写类,多写流

你看到“函数式端点”这几个字,可能会联想到 Spring Cloud Function。可以,但不必只盯着它。
在 Spring Integration 6.x 里,“函数式”的核心体验其实是:端点逻辑更倾向用 lambda 内联表达,把“处理逻辑”贴在“消息路径”旁边——读起来就像在看一条流水线。

下面我用一个小而完整的例子演示:

  • 输入:订单事件(JSON)
  • 处理:校验 → 转换 → 路由(大额走风控,小额直达)
  • 输出:发往 AMQP(RabbitMQ)不同队列

2.1 依赖(Spring Boot 3.x + Spring Integration 6.x + AMQP)

Spring Boot 对 AMQP(RabbitMQ)提供 starter 与自动配置支持。
Spring Integration 的 AMQP 适配器/网关构建在 Spring AMQP 之上。

Gradle(示例)

dependencies {
  implementation "org.springframework.boot:spring-boot-starter-integration"
  implementation "org.springframework.integration:spring-integration-amqp"
  implementation "org.springframework.boot:spring-boot-starter-amqp"
  implementation "com.fasterxml.jackson.core:jackson-databind"
}

2.2 订单消息流(IntegrationFlow):EIP 一条龙

package com.example.demo;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;

import java.math.BigDecimal;
import java.util.Map;

@Configuration
public class OrderIntegrationConfig {

  record OrderEvent(String orderId, BigDecimal amount, String userId) {}

  @Bean
  ObjectMapper objectMapper() {
    return new ObjectMapper();
  }

  /**
   * 入站:从 AMQP 队列收订单事件
   * 出站:根据金额路由到不同队列
   */
  @Bean
  IntegrationFlow orderFlow(ConnectionFactory connectionFactory, ObjectMapper om) {
    return IntegrationFlow
      // Inbound Channel Adapter:RabbitMQ 消息驱动方式接入
      .from(Amqp.inboundAdapter(connectionFactory, "order.events"))
      // 给自己留条命:先打个“轻量日志”,别一上来就疯狂 print 大对象
      .wireTap(f -> f.handle(m -> System.out.println("[tap] got messageId=" + m.getHeaders().getId())))
      // Transformer:JSON -> OrderEvent
      .transform(String.class, payload -> {
        try {
          return om.readValue(payload, OrderEvent.class);
        } catch (Exception e) {
          throw new IllegalArgumentException("Invalid order payload: " + payload, e);
        }
      })
      // Filter:过滤掉明显不合法的订单(别让垃圾数据污染下游)
      .filter(OrderEvent.class, o -> o.amount() != null && o.amount().compareTo(BigDecimal.ZERO) > 0,
        spec -> spec.discardChannel("order.discard"))
      // Router:按金额分流
      .route(OrderEvent.class, o -> o.amount().compareTo(new BigDecimal("1000")) >= 0 ? "RISK" : "NORMAL",
        r -> r
          .channelMapping("RISK", "order.risk.channel")
          .channelMapping("NORMAL", "order.normal.channel"))
      .get();
  }

  @Bean
  IntegrationFlow riskOutbound(ConnectionFactory cf) {
    return IntegrationFlow.from("order.risk.channel")
      // Outbound Channel Adapter:发往风控队列
      .handle(Amqp.outboundAdapter(cf).routingKey("order.risk"))
      .get();
  }

  @Bean
  IntegrationFlow normalOutbound(ConnectionFactory cf) {
    return IntegrationFlow.from("order.normal.channel")
      .handle(Amqp.outboundAdapter(cf).routingKey("order.normal"))
      .get();
  }

  /**
   * 丢弃通道:你可以接告警、落库、或发 DLQ
   */
  @Bean
  IntegrationFlow discardFlow() {
    return IntegrationFlow.from("order.discard")
      .handle(m -> System.out.println("[discard] " + m.getPayload()))
      .get();
  }
}

这段代码“函数式”的味道在哪?

  • 端点逻辑(transform/filter/route)都用 lambda 写在流里
  • EIP 模式一眼可见:变换、过滤、路由、出站适配
  • 读起来像管道,而不是“到处跳转找 handler”

而且 AMQP 适配器这套能力是 Spring Integration 官方支持的典型路径:入站/出站 Channel Adapter、网关、请求-响应等。

3. MQTT 与 AMQP 在 Spring Boot 3 的集成更新:别踩坑,我先替你踩

3.1 AMQP(RabbitMQ):Boot 3 的“便利”不是白给的

Spring Boot 3 对 RabbitMQ 的便利,最实在的就是:

  • 引入 spring-boot-starter-amqp,自动给你配 RabbitTemplateCachingConnectionFactoryAmqpAdmin 等(条件满足才创建)。

你要做的通常就两件:

  1. spring.rabbitmq.*
  2. 在 IntegrationFlow 里用 Amqp.inboundAdapter / outboundAdapter 接上去

如果你在做“更像业务函数”的事件驱动,也可以走 Spring Cloud Stream 的 RabbitMQ binder,它默认复用 Boot 的 ConnectionFactory,并支持同样的 spring.rabbitmq 配置前缀。

我个人的经验(带点情绪):
“能用 IntegrationFlow 把业务流写清楚的时候,就别让注解监听把逻辑散一地。”
注解监听不坏,但“可读性”经常被低估;半年后你回来看,真的会想把当时的自己揪出来谈谈人生。

3.2 MQTT:6.5 之后一个容易忽略的变化——Paho v3 变 optional 了

如果你用 Spring Integration 的 MQTT 模块,它当前实现基于 Eclipse Paho MQTT Client。

重点来了:从 Spring Integration 6.5 开始,org.eclipse.paho:org.eclipse.paho.client.mqttv3 依赖变成 optional,也就是说——

你不显式引入它,项目可能能编译,但运行时就给你整“惊喜”。

所以 MQTT(v3)你要额外加依赖(Maven 示例):

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

<!-- 显式引入 Paho v3(Spring Integration 6.5+ 需要你自己带上) -->
<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>

3.3 MQTT 入站 + 出站:最小可用流(Boot 3 里照样很“丝滑”)

Spring Integration MQTT 提供入站和出站通道适配器,并推荐通过 DefaultMqttPahoClientFactory + MqttConnectOptions 配置连接。

package com.example.demo;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.dsl.Mqtt;

@Configuration
public class MqttIntegrationConfig {

  @Bean
  MqttPahoClientFactory mqttClientFactory() {
    var factory = new DefaultMqttPahoClientFactory();
    var options = new MqttConnectOptions();
    options.setServerURIs(new String[]{"tcp://localhost:1883"});
    options.setAutomaticReconnect(true);
    options.setCleanSession(true);
    factory.setConnectionOptions(options);
    return factory;
  }

  @Bean
  IntegrationFlow mqttInbound(MqttPahoClientFactory factory) {
    return IntegrationFlow
      .from(Mqtt.inboundAdapter(factory, "boot3-si6x-in", "devices/+/telemetry")
        .qos(1))
      .handle(m -> {
        // 这里你可以做 Transformer / Router / Persist 等
        System.out.println("[mqtt-in] topic=" + m.getHeaders().get("mqtt_receivedTopic")
          + " payload=" + m.getPayload());
      })
      .get();
  }

  @Bean
  DirectChannel mqttOut() {
    return new DirectChannel();
  }

  @Bean
  IntegrationFlow mqttOutbound(MqttPahoClientFactory factory) {
    return IntegrationFlow.from(mqttOut())
      .handle(Mqtt.outboundAdapter(factory, "boot3-si6x-out")
        .async(true)
        .defaultTopic("devices/command"))
      .get();
  }
}

这套写法的“人类友好点”在于:

  • MQTT 进来后你马上就能接 EIP 积木继续加工
  • 出站也是一个 handler,不用你自己手写 client publish
  • 你想做“设备消息 → 风控 → 告警 → 落库 → 回指令”,写成一条流就很顺

4. 可观测性:消息流转里最该被“照亮”的,其实是这些地方

我见过太多系统“有监控”,但监控只盯 CPU、内存、QPS。

消息系统真正会让你熬夜的,往往是:

  • 消息在哪个端点堆积?
  • 哪个 handler 变慢了?
  • 重试到底重试了几次?
  • 某个用户的一次请求跨服务走了哪些消息路径?

这就落到可观测性三件套:日志、指标、链路追踪。Spring Boot 的可观测性基座是 Micrometer Observation,你可以通过注入 ObservationRegistry 创建/传播观测事件,最终落到 metrics 和 traces。

而 Spring Integration 从 6.0 开始也基于 Micrometer Observation 抽象来做度量与追踪支持。

4.1 Spring Integration 里的观测点:它真的在“帮你记账”

官方文档说得很直白:有 MeterRegistry 会注册计时器/计数器;而从 6.0 起进一步引入 Observation 来统一 metrics + tracing。

同时,你还可以通过管理配置把 ObservationRegistry 注入到 Integration 管理组件里(这是 API 层面明确支持的)。

4.2 一个“能落地”的做法:给关键 handler 加 Observation(别全链路都打,别把自己打死)

下面示例展示:在订单处理的关键环节包一层 observation——你能在 tracing 系统里看到更清晰的 span/事件边界(具体落地到 OTel、Zipkin、Wavefront 等看你环境)。

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class ObservedFlowConfig {

  @Bean
  IntegrationFlow observedStage(ObservationRegistry registry) {
    return IntegrationFlow.from("order.normal.channel")
      .handle(msg -> Observation
        .createNotStarted("eip.order.enrich", registry)
        .lowCardinalityKeyValue("stage", "enrich")
        .observe(() -> {
          // 你真正的业务逻辑:查库、补字段、生成下游事件……
          // 注意:别在这里做超重 IO 又不设超时,否则观测只会告诉你“慢”,但救不了你
          System.out.println("[enrich] " + msg.getPayload());
        })
      )
      .get();
  }
}

我强烈建议你把观测点打在这三类位置(收益最大、噪声最小):

  1. 入站适配器之后:确认“消息进来了”
  2. 关键路由点:确认“走了哪条分支”
  3. 出站之前:确认“准备发出去”

别啥都打。真的,啥都打的结果通常是:你看到一堆 span,但依然不知道问题在哪——因为你把“信号”稀释成“噪声”了。

5. 把四个点串起来:一条“可维护”的微服务消息管道长什么样?

你可以把它想象成这样一条线(不是画饼,是工程套路):

  1. AMQP / MQTT 入站:协议进来先落到消息通道
  2. EIP 加工:transform/filter/route/split/aggregate/retry
  3. 函数式端点:用 lambda 把端点逻辑贴着流写,读起来像流水线
  4. 可观测性覆盖关键节点:Micrometer Observation + Integration 的 Observation 支持
  5. AMQP / MQTT 出站:统一出口,必要时做 DLQ、补偿、幂等

这套做下来,你会发现一个很“反直觉”的变化:

你写的类可能更少了,但系统反而更稳了。
因为复杂度不再藏在“散落的代码角落”,而是被摆在“消息流图”上了。

结语:最后送你一句我真心吃过亏才悟到的话

微服务时代最贵的不是“写代码”,而是“把系统解释清楚”。

Spring Integration + EIP 的价值,就在于它逼着你用一套大家都懂的模式,把消息从哪来、怎么走、到哪去讲明白。

如果还要继续深究,那么还可以往如下两个方面继续专研:

  • 幂等 + 重试 + DLQ 的组合拳:怎么避免“重试把下游打爆”、怎么让失败可回放。

  • Splitter/Aggregator 在微服务里的正确用法:拆分并行处理、聚合超时策略、部分失败怎么兜底。

🧧福利赠与你🧧

  无论你是计算机专业的学生,还是对编程有兴趣的小伙伴,都建议直接毫无顾忌的学习此专栏「滚雪球学SpringBoot」,bug菌郑重承诺,凡是学习此专栏的同学,均能获取到所需的知识和技能,全网最快速入门SpringBoot,就像滚雪球一样,越滚越大, 无边无际,指数级提升。

最后,如果这篇文章对你有所帮助,帮忙给作者来个一键三连,关注、点赞、收藏,您的支持就是我坚持写作最大的动力。

同时欢迎大家关注公众号:「猿圈奇妙屋」 ,以便学习更多同类型的技术文章,免费白嫖最新BAT互联网公司面试题、4000G PDF编程电子书、简历模板、技术文章Markdown文档等海量资料。

ps:本文涉及所有源代码,均已上传至Gitee开源,供同学们一对一参考 Gitee传送门,同时,原创开源不易,欢迎给个star🌟,想体验下被🌟的感jio,非常感谢❗

🫵 Who am I?

我是 bug菌:

  • 热活跃于 CSDN | 掘金 | InfoQ | 51CTO | 华为云 | 阿里云 | 腾讯云 等技术社区;
  • CSDN 博客之星 Top30、华为云多年度十佳博主&卓越贡献奖、掘金多年度人气作者 Top40;
  • 掘金、InfoQ、51CTO 等平台签约及优质作者;
  • 全网粉丝累计 30w+

更多高质量技术内容及成长资料,可查看这个合集入口 👉 点击查看 👈️
硬核技术公众号 「猿圈奇妙屋」 期待你的加入,一起进阶、一起打怪升级。

- End -

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐