## SpringCloud Stream ### 1. 为什么会出现 SpringCloud Stream - 如果我们需要更换队列组件, 我们的代码就需要重写 - 解耦业务代码和中间件代码 ### 2. SpringCloud Stream 中的核心概念 -Spring Messaging 模块 - SpringMessaging 是 Spring Framework 中的一个模块, 其作用就是统一消息的编程模型 - Messaging 对应的模型只包括 Payload 和 Header ```java Producer --(send[Message])--> Message Channel --(receive)--> Consumer ``` - 负责与中间件交互的抽象绑定器: Binder - 通过 Binder 组件实现与外部消息系统通信, 屏蔽了底层中间件的使用细节 - 消息分类映射为通信信道, 可以为不同类的消息自定义通信信道 - 发送消息与接收消息的应用通信信道: Input, Output ```java Spring Cloud Stream Application Application Core | |\ | | (inputs) (outputs) | | |/ | Binder | Middleware ``` ### 3. SpringCloud Stream 应用模型 - 经典的 SpringCloud Stream 发布-订阅模型 - 作者参考 Kafka 的模型设计的 ```java Message | Topic(Exchange) | | 输入通道 输入通道 | | 订阅者-1 订阅者-2 ``` - Topic 可以认为就是 Kafka 中的 Topic 概念 - Producer 通过 Input 信道发布消息到 Topic 上 - Consumer 通过 Output 信道消费 Topic 上的消息 ### 4. SpringCloud Stream 消息分组和消费分区的配置与说明 ```java Message | Topic(Exchange) —————————|————————— | | 输入通道 输入通道 | | Service-A Service-B Service-A Service-B (GroupA) (GroupB) ``` - SpringCloud Stream 消费分组 - 消费分组: 应用多实例部署, 提供服务吞吐量, 且不重复消费消息 - 不指定默认是每一个实例都会有个生成的消费组, 所以消息会被消费很多次 - 我们尽量指定消费者组, 让其消息只消费一次 - 忘记指定消费者组的情况, 会发生消息不一致的问题 - SpringCloud Stream 消费者组模型 - 应用的不同实例放在一个消费者组中, 每一条消息只会被一个实例消费 - 消费者组的思想是通过多实例扩展服务吞吐量, 且不会造成消息的重复消费 - 默认会对不指定消费者组, 都会进行发送消费, 除非你有这样的场景, 要不然建议是指定的 --- - 消费分区 - 消费分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理 - 我们要确保指定的消息会被指定的消费者组的分区进行消费, 这样可以保证数据的先后和一致性 - 通过自定义提取Key, 和转发策略来进行实现 - 这样做可以为不支持消费分区的消息中间件也实现了这种功能的扩展 ````java HTTP --- | --> Partition1 --> Average Processor HTTP --- | --> Partition2 --> Average Processor HTTP --- ```` ### 5. TODO-list - 研究 Bus 消息总线的集合使用-实现消息的统一发送 - 补充 stream 消息持久化 - 补充 stream 监控 - 补充 stream 异常处理 - Spring Cloud Stream + RocketMQ 实现分布式事务 - RabbitMQ 的整合