[代码重构](master): 消息队列异步的实战

举例完成消息队列相关的逻辑
master
土豆兄弟 2 years ago
parent 4e1e6f0c41
commit b1a8c1657d

@ -1583,15 +1583,137 @@ return orderDO;
### 3.25 设计 - 异步处理好用,但非常容易用错 ### 3.25 设计 - 异步处理好用,但非常容易用错
- 异步处理是互联网应用不可或缺的一种架构模式,大多数业务项目都是由同步处理、异步处理和定时任务处理三种模式相辅相成实现的。
- 区别于同步处理,异步处理无需同步等待流程处理完毕,因此适用场景主要包括:
- 服务于主流程的分支流程。
- 比如,在注册流程中,把数据写入数据库的操作是主流程但注册后给用户发优惠券或欢迎短信的操作是分支流程,时效性不那么强,可以进行异步处理。
- 用户不需要实时看到结果的流程。比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可
- 异步处理因为可以有 MQ 中间件的介入用于任务的缓冲的分发,所以相比于同步处理,在应对流量洪峰、实现模块解耦和消息广播方面有功能优势。
- 不过,异步处理虽然好用,但在实现的时候却有三个最容易犯的错
- 异步处理流程的可靠性问题、消息发送模式的区分问题,以及大量死信消息堵塞队列的问题
#### A. 异步处理需要消息补偿闭环
- 使用类似 RabbitMQ、RocketMQ 等 MQ 系统来做消息队列实现异步处理,虽然说消息可以落地到磁盘保存,即使 MQ 出现问题消息数据也不会丢失,但是异步流程在消息发送、
传输、处理等环节,都可能发生消息丢失。此外,任何 MQ 中间件都无法确保 100% 可用,需要考虑不可用时异步流程如何继续进行。
- **对于异步处理流程,必须考虑补偿或者说建立主备双活流程**
- 我们来看一个用户注册后异步发送欢迎消息的场景。用户注册落数据库的流程为同步流程,会员服务收到消息后发送欢迎消息的流程为异步流程。
- ![异步消息补偿闭环](pic/异步消息补偿闭环.png)
- 分析一下:
- 蓝色的线,使用 MQ 进行的异步处理,我们称作主线,可能存在消息丢失的情况(虚线代表异步调用);
- 绿色的线,使用补偿 Job 定期进行消息补偿,我们称作备线,用来补偿主线丢失的消息;
- 考虑到极端的 MQ 中间件失效的情况,我们要求备线的处理吞吐能力达到主线的能力水平
- 相关的代码实现
- **代码示例**:
- 首先,定义 UserController 用于注册 + 发送异步消息。对于注册方法,我们一次性注册 10 个用户,用户注册消息不能发送出去的概率为 50%。
- **参考代码**: com.baiye.demo.case25.compensation.UserController
- 然后,定义 MemberService 类用于模拟会员服务。会员服务监听用户注册成功的消息,并发送欢迎短信。我们使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂等,避免相同的用户进行补偿时重复发送短信:
- **参考代码**: com.baiye.demo.case25.compensation.MemberService
- 对于 MQ 消费程序,处理逻辑务必考虑去重(支持幂等),原因有几个:
- MQ 消息可能会因为中间件本身配置错误、稳定性等原因出现重复。
- 自动补偿重复,比如本例,同一条消息可能既走 MQ 也走补偿,肯定会出现重复,而且考虑到高内聚,补偿 Job 本身不会做去重处理。
- 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时间后处理程序又收到消息了,重复处理。我之前就遇到过一次由 MQ 故障引发的事故,
MQ 中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了就先通过后台进行了人工处理,结果 MQ 系统恢复后消息又被重复处理了一次,造成大量资金重复发放。
- 接下来,定义补偿 Job 也就是备线操作。
- 我们在 CompensationJob 中定义一个 @Scheduled 定时任务5 秒做一次补偿操作,因为 Job 并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑是:每 5 秒补偿一次,按顺序一次补偿 5 个用户,下一次补偿操作从上一次补偿的最后一个用户 ID 开
始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力。
- **参考代码**: com.baiye.demo.case25.compensation.CompensationJob
- 为了实现高内聚主线和备线处理消息最好使用同一个方法。比如本例中MemberService 监听到 MQ 消息和 CompensationJob 补偿,调用的都是 welcome 方法。
- 此外值得一说的是Demo 中的补偿逻辑比较简单,生产级的代码应该在以下几个方面进行加强:
- 考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足补偿的吞吐量。
- 考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前的用户再进行补偿,以方便和主线 MQ 实时流程错开,避免冲突。
- 诸如当前补偿到哪个用户的 offset 数据,需要落地数据库。
- 补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统。
- 执行注册方法注册 10 个用户,输出如下:
- 总共 10 个用户MQ 发送成功的用户有四个,分别是用户 1、5、7、8。
- 补偿任务第一次运行,补偿了用户 2、3、4第二次运行补偿了用户 6、9第三次运行补充了用户 10。
- 最后提一下,针对消息的补偿闭环处理的最高标准是,能够达到补偿全量数据的吞吐量。也就是说,如果补偿备线足够完善,即使直接把 MQ 停机,虽然会略微影响处理的及时性,但至少确保流程都能正常执行。
#### B. 注意消息模式是广播还是工作队列
- 异步处理的一个重要优势,是实现消息广播。
- 消息广播,和我们平时说的“广播”意思差不多,就是希望同一条消息,不同消费者都能分别消费;而队列模式,就是不同消费者共享消费同一个队列的数据,相同消息只能被某一个消费者消费一次。
- 比如,同一个用户的注册消息,会员服务需要监听以发送欢迎短信,营销服务同样需要监听以发送新用户小礼物。但是,会员服务、营销服务都可能有多个实例,我们期望的是同一个用户的消息,
可以同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比如会员服务 1 和会员服务 2不管哪个实例来处理处理一次即可工作队列模式
- ![消息模式广播](pic/消息模式广播.png)
- 在实现代码的时候,我们务必确认 MQ 系统的机制,确保消息的路由按照我们的期望。
- 对于类似 RocketMQ 这样的 MQ 来说,实现类似功能比较简单直白:如果消费者属于一个组,那么消息只会由同一个组的一个消费者来消费;如果消费者属于不同组,那么每个组都能消费一遍消息。
- 而对于 RabbitMQ 来说,消息路由的模式采用的是队列 + 交换器,队列是消息的载体,交换器决定了消息路由到队列的方式,配置比较复杂,容易出错。
- 演示使用 RabbitMQ 实现广播模式和工作队列模式的坑。
- **第一步,实现会员服务监听用户服务发出的新用户注册消息的那部分逻辑**。
- 我们启动两个会员服务,那么同一个用户的注册消息应该只能被其中一个实例消费
- 分别实现 RabbitMQ 队列、交换器、绑定三件套。其中,队列用的是匿名队列,交换器用的是直接交换器 DirectExchange交换器绑定到匿名队列的路由 Key 是空字符串。
在收到消息之后,我们会打印所在实例使用的端口:
- **参考代码**: com.baiye.demo.case25.fanoutvswork.WorkQueueWrong
- 使用 12345 和 45678 两个端口启动两个程序实例后,调用 sendMessage 接口发送一条消息,输出的日志,显示**同一个会员服务两个实例都收到了消息**
- **出现这个问题的原因是,我们没有理清楚 RabbitMQ 直接交换器和队列的绑定关系**
- 如下图所示RabbitMQ 的直接交换器根据 routingKey 对消息进行路由。由于我们的程序每次启动都会创建匿名(随机命名)的队列,所以相当于每一个会员服务实例都对应独立的队列,
以空 routingKey 绑定到直接交换器。用户服务发出消息的时候也设置了routingKey 为空,所以直接交换器收到消息之后,发现有两条队列匹配,于是都转发了消息:
- ![消息模式广播1](pic/消息模式广播1.png)
- 要修复这个问题其实很简单,对于会员服务不要使用匿名队列,而是使用同一个队列即可。
把上面代码中的匿名队列替换为一个普通队列:
- **参考代码**: com.baiye.demo.case25.fanoutvswork.WorkQueueRight
- 测试发现,对于同一条消息来说,两个实例中只有一个实例可以收到,不同的消息按照轮询分发给不同的实例。现在,交换器和队列的关系是这样的:
- ![消息模式广播2](pic/消息模式广播2.png)
- **第二步,进一步完整实现用户服务需要广播消息给会员服务和营销服务的逻辑。**
- 我们希望会员服务和营销服务都可以收到广播消息,但会员服务或营销服务中的每个实例只需要收到一次消息。
- 我们声明了一个队列和一个广播交换器 FanoutExchange然后模拟两个用户服务和两个营销服务
- **参考代码**: com.baiye.demo.case25.fanoutvswork.FanoutQueueWrong
- 我们请求四次 sendMessage 接口,注册四个用户。通过日志可以发现,**一条用户注册的消息,要么被会员服务收到,要么被营销服务收到,显然这不是广播**。那我们使用的FanoutExchange看名字就应该是实现广播的交换器为什么根本没有起作用呢
- 其实,广播交换器非常简单,它会忽略 routingKey广播消息到所有绑定的队列。在这个案例中两个会员服务和两个营销服务都绑定了同一个队列所以这四个服务只能收到一次消息
- ![消息模式广播3](pic/消息模式广播3.png)
- 修改方式很简单,我们把队列进行拆分,会员和营销两组服务分别使用一条独立队列绑定到广播交换器即可:
- **参考代码**: com.baiye.demo.case25.fanoutvswork.FanoutQueueRight
- 现在,交换器和队列的结构是这样的:
- ![消息模式广播4](pic/消息模式广播4.png)
- 从日志输出可以验证,对于每一条 MQ 消息,会员服务和营销服务分别都会收到一次,一条消息广播到两个服务的同时,在每一个服务的两个实例中通过轮询接收:
- 理解了 RabbitMQ **直接交换器、广播交换器的工作方式**之后,我们对消息的路由方式了解得很清晰了,实现代码就不会出错
- 对于异步流程来说,消息路由模式一旦配置出错,轻则可能导致消息的重复处理,重则可能导致重要的服务无法接收到消息,最终造成业务逻辑错误。
- 每个 MQ 中间件对消息的路由处理的配置各不相同,我们一定要先了解原理再着手编码。
#### C. 别让死信堵塞了消息队列
- 使用消息队列处理异步流程的时候,我们要注意消息队列的任务堆积问题。对于突发流量引起的消息队列堆积,问题并不大,适当调整消费者的消费能力应该就可以解决。**但在很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息**。
- 比如,用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但因为用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理失败。这种在 MQ 中像幽灵一样回荡的同一条消息,就是死信。
- 随着 MQ 被越来越多的死信填满,消费者需要花费大量时间反复处理死信,导致正常消息的消费受阻,**最终 MQ 可能因为数据量过大而崩溃**。
- 我们来测试一下这个场景。首先,定义一个队列、一个直接交换器,然后把队列绑定到交换
- **参考代码**: com.baiye.demo.case25.deadletter.RabbitConfiguration.declarables
- 然后,实现一个 sendMessage 方法来发送消息到 MQ访问一次提交一条消息使用自增标识作为消息内容
- **参考代码**: com.baiye.demo.case25.deadletter.DeadLetterController.sendMessage
- 收到消息后,直接抛出空指针异常,模拟处理出错的情况:
- **参考代码**: com.baiye.demo.case25.deadletter.MQListener.handler
- 调用 sendMessage 接口发送两条消息,然后来到 RabbitMQ 管理台,可以看到这两条消息始终在队列中,不断被重新投递,导致重新投递 QPS 达到了 1063。
- ![消息模式广播5](pic/消息模式广播5.png)
- 同时,在日志中可以看到大量异常信息
- 解决死信无限重复进入队列最简单的方式是在程序处理出错的时候直接抛出AmqpRejectAndDontRequeueException 异常,避免消息重新进入队列:
- throw new AmqpRejectAndDontRequeueException("error");
- 但,我们更希望的逻辑是,对于同一条消息,能够先进行几次重试,解决因为网络问题导致的偶发消息处理失败,如果还是不行的话,再把消息投递到专门的一个死信队列。对于来自死信队列的数据,我们可能只是记录日志发送报警,
即使出现异常也不会再重复投递。整个逻辑如下图所示:
- ![消息模式广播6](pic/消息模式广播6.png)
- 针对这个问题Spring AMQP 提供了非常方便的解决方案:
- 首先,定义死信交换器和死信队列。其实,这些都是普通的交换器和队列,只不过被我们专门用于处理死信消息。
- 然后,通过 RetryInterceptorBuilder 构建一个 RetryOperationsInterceptor用于处理失败时候的重试。这里的策略是最多尝试 5 次(重试 4 次);并且采取指数退避重试,首次重试延迟 1 秒,第二次 2 秒,以此类推,最大延迟是 10 秒
;如果第 4 次重试还是失败,则使用 RepublishMessageRecoverer 把消息重新投入一个“死信交换器”中。
- 最后,定义死信队列的处理程序。这个案例中,我们只是简单记录日志。
- **参考代码**: com.baiye.demo.case25.deadletter.RabbitConfiguration | com.baiye.demo.case25.deadletter.MQListener.deadHandler
- 执行程序,发送两条消息:
- msg1 的 4 次重试间隔分别是 1 秒、2 秒、4 秒、8 秒,再加上首次的失败,所以最大尝试次数是 5。
- 4 次重试后RepublishMessageRecoverer 把消息发往了死信交换器。
- 死信处理程序输出了 got dead message 日志。
- 这里需要尤其注意的一点是,虽然我们几乎同时发送了两条消息,但是 msg2 是在 msg1的四次重试全部结束后才开始处理。原因是默认情况下**SimpleMessageListenerContainer 只有一个消费线程**。可以通过增加消费线程来避免
性能问题,如下我们直接设置 concurrentConsumers 参数为 10来增加到 10 个工作线程:
- 当然,我们也可以设置 maxConcurrentConsumers 参数来让SimpleMessageListenerContainer 自己动态地调整消费者线程数。不过,我们需要特别注意它的动态开启新线程的策略。你可以通过官方文档,来了解这个策略
#### D. 总结
- 在使用异步处理这种架构模式的时候,我们一般都会使用 MQ 中间件配合实现异步流程,需要重点考虑四个方面的问题。
- 第一,要考虑异步流程丢消息或处理中断的情况,异步流程需要有备线进行补偿。比如,我们今天介绍的全量补偿方式,即便异步流程彻底失效,通过补偿也能让业务继续进行。
- 第二,异步处理的时候需要考虑消息重复的可能性,处理逻辑需要实现幂等,防止重复处理。
- 第三,微服务场景下不同服务多个实例监听消息的情况,一般不同服务需要同时收到相同的消息,而相同服务的多个实例只需要轮询接收消息。我们需要确认 MQ 的消息路由配置是否满足需求,以避免消息重复或漏发问题。
- 第四,要注意始终无法处理的死信消息,可能会引发堵塞 MQ 的问题。一般在遇到消息处理失败的时候,我们可以设置一定的重试策略。如果重试还是不行,那可以把这个消息扔到专有的死信队列特别处理,不要让死信影响到正常消息的处理
#### E. 补充
- 基于canal做mysql数据同步需要将解析好的数据发到kafka里
- 发现这么一个问题就是kafka多partition消费时不能保证消息的顺序消费进而导致mysql数据同步异常。
- 由于kafka可以保证在同一个partition内消息有序于是我自定义了一个分区器将数据的id取hashcode然后根据partition的数量取余作为分区号保证同一条数据的binlog能投递有序
- 在用户注册后发送消息到 MQ然后会员服务监听消息进行异步处理的场景下有些时候我们会发现虽然用户服务先保存数据再发送 MQ但会员服务收到消息后去查询数据库却发现数据库中还没有新用户的信息。
- 建立本地消息表来确保MQ消息可补偿把业务处理和保存MQ消息到本地消息表操作在相同事务内处理然后异步发送和补偿发送消息表中的消息到MQ
### 3.26 设计 - 数据存储NoSQL与RDBMS如何取长补短、相辅相成 ### 3.26 设计 - 数据存储NoSQL与RDBMS如何取长补短、相辅相成
- NoSQL 一般可以分为缓存数据库、时间序列数据库、全文搜索数据库、文档数据库、图数据库等。 - NoSQL 一般可以分为缓存数据库、时间序列数据库、全文搜索数据库、文档数据库、图数据库等。

@ -0,0 +1,5 @@
## 异步处理好用,但非常容易用错
- 异步处理需要消息补偿闭环compensation
- 注意消息模式是广播还是工作队列fanoutvswork
- 别让死信堵塞了消息队列deadletter
- 补充使用RMQ的DLX实现延迟重试rabbitmqdlx

@ -0,0 +1,14 @@
package com.baiye.demo.case25.compensation;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class CommonMistakesApplication {
public static void main(String[] args) {
SpringApplication.run(CommonMistakesApplication.class, args);
}
}

@ -0,0 +1,47 @@
package com.baiye.demo.case25.compensation;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class CompensationJob {
/**
* Job线
*/
private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(
10, 10,
1, TimeUnit.HOURS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get());
@Autowired
private UserService userService;
@Autowired
private MemberService memberService;
/**
* ID
*/
private long offset = 0;
/**
* 105
*/
@Scheduled(initialDelay = 10_000, fixedRate = 5_000)
public void compensationJob() {
log.info("开始从用户ID {} 补偿", offset);
// 获取从offset开始的用户
userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> {
compensationThreadPool.execute(() -> memberService.welcome(user));
offset = user.getId();
});
}
}

@ -0,0 +1,37 @@
package com.baiye.demo.case25.compensation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class MemberService {
/**
*
*/
private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
//监听用户注册成功的消息,发送欢迎消息
@RabbitListener(queues = RabbitConfiguration.QUEUE)
public void listen(User user) {
log.info("receive mq user {}", user.getId());
welcome(user);
}
// 发送欢迎消息
public void welcome(User user) {
// 去重操作
if (welcomeStatus.putIfAbsent(user.getId(), true) == null) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
log.info("memberService: welcome new user {}", user.getId());
}
}
}

@ -0,0 +1,31 @@
package com.baiye.demo.case25.compensation;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
public static final String QUEUE = "newuserQueueCompensation";
public static final String EXCHANGE = "newuserExchangeCompensation";
public static final String ROUTING_KEY = "newuserRoutingCompensation";
//队列
@Bean
public Queue queue() {
return new Queue(QUEUE);
}
//交换器
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
}
//绑定
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY).noargs();
}
}

@ -0,0 +1,16 @@
package com.baiye.demo.case25.compensation;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@Data
public class User implements Serializable {
private static AtomicLong atomicLonng = new AtomicLong();
private Long id = atomicLonng.incrementAndGet();
private String name = UUID.randomUUID().toString();
private LocalDateTime registerTime = LocalDateTime.now();
}

@ -0,0 +1,36 @@
package com.baiye.demo.case25.compensation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
@RestController
@Slf4j
@RequestMapping("user")
public class UserController {
@Autowired
private UserService userService;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("register")
public void register() {
// 模拟10个用户注册
IntStream.rangeClosed(1, 10).forEach(i -> {
//落库
User user = userService.register();
// 模拟50%的消息可能发送失败
if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) {
// 通过RabbitMQ发送消息
rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);
log.info("sent mq user {}", user.getId());
}
});
}
}

@ -0,0 +1,25 @@
package com.baiye.demo.case25.compensation;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class UserService {
private List<User> users = new ArrayList<>();
public User register() {
User user = new User();
users.add(user);
return user;
}
public List<User> getUsersAfterIdWithLimit(long id, int limit) {
return users.stream()
.filter(user -> user.getId() >= id)
.limit(limit)
.collect(Collectors.toList());
}
}

@ -0,0 +1,12 @@
package com.baiye.demo.case25.deadletter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CommonMistakesApplication {
public static void main(String[] args) {
SpringApplication.run(CommonMistakesApplication.class, args);
}
}

@ -0,0 +1,13 @@
package com.baiye.demo.case25.deadletter;
public class Consts {
public static final String QUEUE = "test";
public static final String EXCHANGE = "test";
public static final String ROUTING_KEY = "test";
public static final String DEAD_EXCHANGE = "deadtest";
public static final String DEAD_QUEUE = "deadtest";
public static final String DEAD_ROUTING_KEY = "deadtest";
}

@ -0,0 +1,31 @@
package com.baiye.demo.case25.deadletter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.atomic.AtomicLong;
@RequestMapping("deadletter")
@Slf4j
@RestController
public class DeadLetterController {
/**
*
*/
AtomicLong atomicLong = new AtomicLong();
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMessage")
public void sendMessage() {
String msg = "msg" + atomicLong.incrementAndGet();
log.info("send message {}", msg);
// 发送消息
rabbitTemplate.convertAndSend(Consts.EXCHANGE, msg);
}
}

@ -0,0 +1,27 @@
package com.baiye.demo.case25.deadletter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MQListener {
@RabbitListener(queues = Consts.QUEUE)
public void handler(String data) {
//http://localhost:15672/#/
log.info("got message {}", data);
throw new NullPointerException("error");
//throw new AmqpRejectAndDontRequeueException("error");
}
/**
*
*/
@RabbitListener(queues = Consts.DEAD_QUEUE)
public void deadHandler(String data) {
log.error("got dead message {}", data);
}
}

@ -0,0 +1,73 @@
package com.baiye.demo.case25.deadletter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
@Configuration
@Slf4j
public class RabbitConfiguration {
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public Declarables declarables() {
// 队列
Queue queue = new Queue(Consts.QUEUE);
// 交换器
DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE);
// 快速声明一组对象,包含队列、交换器,以及队列到交换器的绑定
return new Declarables(queue, directExchange,
BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY));
}
/**
*
*/
@Bean
public Declarables declarablesForDead() {
Queue queue = new Queue(Consts.DEAD_QUEUE);
DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE);
return new Declarables(queue, directExchange,
BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY));
}
/**
*
*/
@Bean
public RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
// 最多尝试不是重试5次
.maxAttempts(5)
// 指数退避重试
.backOffOptions(1000, 2.0, 10000)
.recoverer(new RepublishMessageRecoverer(rabbitTemplate, Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY))
.build();
}
/**
* SimpleRabbitListenerContainerFactoryadviceChainRetryOperationsInterceptor
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(interceptor());
// fixme 增加消费线程来避免性能问题
factory.setConcurrentConsumers(10);
return factory;
}
}

@ -0,0 +1,12 @@
package com.baiye.demo.case25.fanoutvswork;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CommonMistakesApplication {
public static void main(String[] args) {
SpringApplication.run(CommonMistakesApplication.class, args);
}
}

@ -0,0 +1,67 @@
package com.baiye.demo.case25.fanoutvswork;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@Slf4j
@Configuration
@RestController
@RequestMapping("fanoutright")
public class FanoutQueueRight {
private static final String MEMBER_QUEUE = "newusermember";
private static final String PROMOTION_QUEUE = "newuserpromotion";
private static final String EXCHANGE = "newuser";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
}
@Bean
public Declarables declarables() {
Queue memberQueue = new Queue(MEMBER_QUEUE);
Queue promotionQueue = new Queue(PROMOTION_QUEUE);
FanoutExchange exchange = new FanoutExchange(EXCHANGE);
return new Declarables(memberQueue, promotionQueue, exchange,
BindingBuilder.bind(memberQueue).to(exchange),
BindingBuilder.bind(promotionQueue).to(exchange));
}
@RabbitListener(queues = MEMBER_QUEUE)
public void memberService1(String userName) {
log.info("memberService1: welcome message sent to new user {}", userName);
}
@RabbitListener(queues = MEMBER_QUEUE)
public void memberService2(String userName) {
log.info("memberService2: welcome message sent to new user {}", userName);
}
@RabbitListener(queues = PROMOTION_QUEUE)
public void promotionService1(String userName) {
log.info("promotionService1: gift sent to new user {}", userName);
}
@RabbitListener(queues = PROMOTION_QUEUE)
public void promotionService2(String userName) {
log.info("promotionService2: gift sent to new user {}", userName);
}
}

@ -0,0 +1,76 @@
package com.baiye.demo.case25.fanoutvswork;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import java.util.UUID;
@Slf4j
//@Configuration
//@RestController
@RequestMapping("fanoutwrong")
public class FanoutQueueWrong {
private static final String QUEUE = "newuser";
private static final String EXCHANGE = "newuser";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
}
/**
* FanoutExchangeFanoutExchangeroutingKey
*/
@Bean
public Declarables declarables() {
Queue queue = new Queue(QUEUE);
FanoutExchange exchange = new FanoutExchange(EXCHANGE);
return new Declarables(queue, exchange,
BindingBuilder.bind(queue).to(exchange));
}
/**
* 1
*/
@RabbitListener(queues = QUEUE)
public void memberService1(String userName) {
log.info("memberService1: welcome message sent to new user {}", userName);
}
/**
* 2
*/
@RabbitListener(queues = QUEUE)
public void memberService2(String userName) {
log.info("memberService2: welcome message sent to new user {}", userName);
}
/**
* 1
*/
@RabbitListener(queues = QUEUE)
public void promotionService1(String userName) {
log.info("promotionService1: gift sent to new user {}", userName);
}
/**
* 2
*/
@RabbitListener(queues = QUEUE)
public void promotionService2(String userName) {
log.info("promotionService2: gift sent to new user {}", userName);
}
}

@ -0,0 +1,51 @@
package com.baiye.demo.case25.fanoutvswork;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import java.util.UUID;
@Slf4j
//@Configuration
//@RestController
@RequestMapping("workqueueright")
public class WorkQueueRight {
private static final String EXCHANGE = "newuserExchange";
private static final String QUEUE = "newuserQueue";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, "test", UUID.randomUUID().toString());
}
@Bean
public Queue queue() {
return new Queue(QUEUE);
}
@Bean
public Declarables declarables() {
DirectExchange exchange = new DirectExchange(EXCHANGE);
return new Declarables(queue(), exchange,
BindingBuilder.bind(queue()).to(exchange).with("test"));
}
@RabbitListener(queues = "#{queue.name}")
public void memberService(String userName) {
log.info("memberService: welcome message sent to new user {}", userName);
}
}

@ -0,0 +1,55 @@
package com.baiye.demo.case25.fanoutvswork;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import java.util.UUID;
@Slf4j
//@Configuration
//@RestController
@RequestMapping("workqueuewrong")
public class WorkQueueWrong {
private static final String EXCHANGE = "newuserExchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
}
/**
* 使
*/
@Bean
public Queue queue() {
return new AnonymousQueue();
}
/**
* DirectExchange
*/
@Bean
public Declarables declarables() {
DirectExchange exchange = new DirectExchange(EXCHANGE);
return new Declarables(queue(), exchange,
BindingBuilder.bind(queue()).to(exchange).with(""));
}
/**
* SpELBean
*/
@RabbitListener(queues = "#{queue.name}")
public void memberService(String userName) {
log.info("memberService: welcome message sent to new user {} from {}", userName, System.getProperty("server.port"));
}
}

@ -0,0 +1,12 @@
package com.baiye.demo.case25.rabbitmqdlx;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CommonMistakesApplication {
public static void main(String[] args) {
SpringApplication.run(CommonMistakesApplication.class, args);
}
}

@ -0,0 +1,19 @@
package com.baiye.demo.case25.rabbitmqdlx;
public class Consts {
public static final Integer RETRY_INTERNAL = 3000;
public static final Integer RETRY_COUNT = 2;
public static final String EXCHANGE = "worker";
public static final String QUEUE = "worker";
public static final String ROUTING_KEY = "worker";
public static final String BUFFER_QUEUE = "buffer";
public static final String BUFFER_EXCHANGE = "buffer";
public static final String BUFFER_ROUTING_KEY = "buffer";
public static final String DEAD_EXCHANGE = "dead";
public static final String DEAD_QUEUE = "dead";
public static final String DEAD_ROUTING_KEY = "dead";
}

@ -0,0 +1,27 @@
package com.baiye.demo.case25.rabbitmqdlx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.atomic.AtomicLong;
@RequestMapping("deadletter")
@Slf4j
@RestController
public class DeadLetterController {
AtomicLong atomicLong = new AtomicLong();
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("send")
public void send() {
String message = "msg" + atomicLong.incrementAndGet();
log.info("Client 发送消息 {}", message);
rabbitTemplate.convertAndSend(Consts.EXCHANGE, Consts.QUEUE, message);
}
}

@ -0,0 +1,64 @@
package com.baiye.demo.case25.rabbitmqdlx;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@Component
@Slf4j
public class MQListener {
@Autowired
private MessagePropertiesConverter messagePropertiesConverter;
@RabbitListener(queues = Consts.QUEUE)
public void handler(@Payload Message message, Channel channel) throws IOException {
String m = new String(message.getBody());
try {
log.info("Handler 收到消息:{}", m);
throw new RuntimeException("处理消息失败");
} catch (Exception e) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
Long retryCount = getRetryCount(headers);
if (retryCount < Consts.RETRY_COUNT) {
log.info("Handler 消费消息:{} 异常,准备重试第{}次", m, ++retryCount);
AMQP.BasicProperties rabbitMQProperties =
messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), "UTF-8");
rabbitMQProperties.builder().headers(headers);
channel.basicPublish(Consts.BUFFER_EXCHANGE, Consts.BUFFER_ROUTING_KEY, rabbitMQProperties, message.getBody());
} else {
log.info("Handler 消费消息:{} 异常,已重试 {} 次,发送到死信队列处理!", m, Consts.RETRY_COUNT);
channel.basicPublish(Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY, null, message.getBody());
}
}
}
private long getRetryCount(Map<String, Object> headers) {
long retryCount = 0;
if (null != headers) {
if (headers.containsKey("x-death")) {
List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
if (!deathList.isEmpty()) {
Map<String, Object> deathEntry = deathList.get(0);
retryCount = (Long) deathEntry.get("count");
}
}
}
return retryCount;
}
@RabbitListener(queues = Consts.DEAD_QUEUE)
public void deadHandler(@Payload Message message) {
log.error("DeadHandler 收到死信消息: {}", new String(message.getBody()));
}
}

@ -0,0 +1,68 @@
package com.baiye.demo.case25.rabbitmqdlx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
@Configuration
@Slf4j
public class RabbitConfiguration implements RabbitListenerConfigurer {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
@Bean
public MessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
return messageHandlerMethodFactory;
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public MessagePropertiesConverter messagePropertiesConverter() {
return new DefaultMessagePropertiesConverter();
}
@Bean
public Declarables declarablesForWorker() {
Queue queue = new Queue(Consts.QUEUE);
DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE);
return new Declarables(queue, directExchange,
BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY));
}
@Bean
public Declarables declarablesForBuffer() {
Queue queue = QueueBuilder.durable(Consts.BUFFER_QUEUE)
.withArgument("x-dead-letter-exchange", Consts.EXCHANGE)
.withArgument("x-dead-letter-routing-key", Consts.ROUTING_KEY)
.withArgument("x-message-ttl", Consts.RETRY_INTERNAL)
.build();
DirectExchange directExchange = new DirectExchange(Consts.BUFFER_EXCHANGE);
return new Declarables(queue, directExchange,
BindingBuilder.bind(queue).to(directExchange).with(Consts.BUFFER_ROUTING_KEY));
}
@Bean
public Declarables declarablesForDead() {
Queue queue = new Queue(Consts.DEAD_QUEUE);
DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE);
return new Declarables(queue, directExchange,
BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY));
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 87 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

Loading…
Cancel
Save