Merge remote-tracking branch 'origin/master'

# Conflicts:
#	dev-protocol-springcloud/SpringCloud项目介绍.md
#	dev-protocol-springcloud/dev-protocol-springcloud-hystrix/NetflixHystrix.md
#	dev-protocol-springcloud/dev-protocol-springcloud-hystrix/src/main/java/org/example/controller/HystrixController.java
master
q 1 week ago
commit 234568cfdc

@ -11,6 +11,7 @@
- [dev-protocol-springcloud-sleuth-zipkin](dev-protocol-springcloud-sleuth-zipkin)
- 微服务容错 - (SpringCloud Netflix Hystrix)
- [dev-protocol-springcloud-hystrix](dev-protocol-springcloud-hystrix)
- [dev-protocol-springcloud-hystrix-dashboard](dev-protocol-springcloud-hystrix-dashboard)
- 消息驱动微服务 - (SpringCloud Stream)
- todo
- 分布式事务 - (SpringCloud Alibaba Seata)

@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.example</groupId>
<artifactId>dev-protocol</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>dev-protocol-springcloud-hystrix-dashboard</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- 模块名及描述信息 -->
<name>dev-protocol-springcloud-hystrix-dashboard</name>
<description>Hystrix Dashboard</description>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- spring cloud alibaba nacos discovery 依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
</dependencies>
<!--
SpringBoot的Maven插件, 能够以Maven的方式为应用提供SpringBoot的支持可以将
SpringBoot应用打包为可执行的jar或war文件, 然后以通常的方式运行SpringBoot应用
-->
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,21 @@
package org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard;
/**
* <h1>hystrix dashboard </h1>
* http://127.0.0.1:9999/dev-protocol-springcloud-hystrix-dashboard/hystrix/
* stream : http://127.0.0.1:8111/dev-protocol-springcloud-hystrix/actuator/hystrix.stream
* */
@EnableDiscoveryClient
@SpringBootApplication
@EnableHystrixDashboard // 开启 Hystrix Dashboard
public class HystrixDashboardApplication {
public static void main(String[] args) {
SpringApplication.run(HystrixDashboardApplication.class, args);
}
}

@ -0,0 +1,33 @@
server:
port: 9999
servlet:
context-path: /dev-protocol-springcloud-hystrix-dashboard
spring:
application:
name: dev-protocol-springcloud-hystrix-dashboard
cloud:
nacos:
# 服务注册发现
discovery:
enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可
server-addr: 127.0.0.1:8848
# server-addr: 127.0.0.1:8848,127.0.0.1:8849,127.0.0.1:8850 # Nacos 服务器地址
namespace: 1ccc74ae-9398-4dbe-b9d7-4f9addf9f40c
metadata:
management:
context-path: ${server.servlet.context-path}/actuator
hystrix:
dashboard:
proxy-stream-allow-list: "127.0.0.1"
# 暴露端点
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always

@ -0,0 +1,31 @@
### 获取 Token
POST http://127.0.0.1:9001/dev-protocol-springcloud-gateway/dev-protocol-springcloud-hystrix/communication/token-by-feign
Content-Type: application/json
e-commerce-user: eyJhbGciOiJSUzI1NiJ9.eyJlLWNvbW1lcmNlLXVzZXIiOiJ7XCJpZFwiOjEwLFwidXNlcm5hbWVcIjpcIlFpbnlpQGltb29jLmNvbVwifSIsImp0aSI6IjJlMTgwYzA2LWIwZGQtNDJjYy1iNzk5LTZiNGI4N2Q3NTM3NCIsImV4cCI6MTYyNTQxNDQwMH0.P8gKWcwiibEmq80idCl9QzUaqJnQ7FVQ5yxzlMLfRNcG3nWMYh0o1kBO2vKvDGG8BofQ4__xn210R2P7zjH-uEn7hic0jbBt97vLkzvATnFegxaDSjUp9At9BIGl9Ecu7RPCwSQJFdw6NBMBz02Uom8yvHpUJEfv8_jZRurBp94tzES6IAACKC6NFq-3_mQfT4o_zA51U4hBbIBw6qNXvMSqK-VtaqkTNFfJsP_8Y0nmQck0jxmgdtLfRVfZCCp3rhcLNDBX37THQI6U7lvNFETceAKyJ5NFaA8p68lw_3D2IiZER1jEU3M2QIMQGeU0pmQ6IPwGDdA0MP5IKKzj8A
token: imooc
{
"username": "Qinyi@imooc.com",
"password": "25d55ad283aa400af464c76d713c07ad"
}
### 根据提供的 serviceId 获取实例信息
GET http://127.0.0.1:9001/dev-protocol-springcloud-gateway/ecommerce-nacos-client/hystrix/hystrix-command-annotation?serviceId=e-commerce-nacos-client
Content-Type: application/json
e-commerce-user: eyJhbGciOiJSUzI1NiJ9.eyJlLWNvbW1lcmNlLXVzZXIiOiJ7XCJpZFwiOjEwLFwidXNlcm5hbWVcIjpcIlFpbnlpQGltb29jLmNvbVwifSIsImp0aSI6IjJlMTgwYzA2LWIwZGQtNDJjYy1iNzk5LTZiNGI4N2Q3NTM3NCIsImV4cCI6MTYyNTQxNDQwMH0.P8gKWcwiibEmq80idCl9QzUaqJnQ7FVQ5yxzlMLfRNcG3nWMYh0o1kBO2vKvDGG8BofQ4__xn210R2P7zjH-uEn7hic0jbBt97vLkzvATnFegxaDSjUp9At9BIGl9Ecu7RPCwSQJFdw6NBMBz02Uom8yvHpUJEfv8_jZRurBp94tzES6IAACKC6NFq-3_mQfT4o_zA51U4hBbIBw6qNXvMSqK-VtaqkTNFfJsP_8Y0nmQck0jxmgdtLfRVfZCCp3rhcLNDBX37THQI6U7lvNFETceAKyJ5NFaA8p68lw_3D2IiZER1jEU3M2QIMQGeU0pmQ6IPwGDdA0MP5IKKzj8A
token: imooc
### 根据提供的 serviceId 获取实例信息
GET http://127.0.0.1:9001/dev-protocol-springcloud-gateway/ecommerce-nacos-client/hystrix/simple-hystrix-command?serviceId=e-commerce-nacos-client
Content-Type: application/json
e-commerce-user: eyJhbGciOiJSUzI1NiJ9.eyJlLWNvbW1lcmNlLXVzZXIiOiJ7XCJpZFwiOjEwLFwidXNlcm5hbWVcIjpcIlFpbnlpQGltb29jLmNvbVwifSIsImp0aSI6IjJlMTgwYzA2LWIwZGQtNDJjYy1iNzk5LTZiNGI4N2Q3NTM3NCIsImV4cCI6MTYyNTQxNDQwMH0.P8gKWcwiibEmq80idCl9QzUaqJnQ7FVQ5yxzlMLfRNcG3nWMYh0o1kBO2vKvDGG8BofQ4__xn210R2P7zjH-uEn7hic0jbBt97vLkzvATnFegxaDSjUp9At9BIGl9Ecu7RPCwSQJFdw6NBMBz02Uom8yvHpUJEfv8_jZRurBp94tzES6IAACKC6NFq-3_mQfT4o_zA51U4hBbIBw6qNXvMSqK-VtaqkTNFfJsP_8Y0nmQck0jxmgdtLfRVfZCCp3rhcLNDBX37THQI6U7lvNFETceAKyJ5NFaA8p68lw_3D2IiZER1jEU3M2QIMQGeU0pmQ6IPwGDdA0MP5IKKzj8A
token: imooc
### 根据提供的 serviceId 获取实例信息
GET http://127.0.0.1:9001/dev-protocol-springcloud-gateway/ecommerce-nacos-client/hystrix/hystrix-observable-command?serviceId=e-commerce-nacos-client
Content-Type: application/json
e-commerce-user: eyJhbGciOiJSUzI1NiJ9.eyJlLWNvbW1lcmNlLXVzZXIiOiJ7XCJpZFwiOjEwLFwidXNlcm5hbWVcIjpcIlFpbnlpQGltb29jLmNvbVwifSIsImp0aSI6IjJlMTgwYzA2LWIwZGQtNDJjYy1iNzk5LTZiNGI4N2Q3NTM3NCIsImV4cCI6MTYyNTQxNDQwMH0.P8gKWcwiibEmq80idCl9QzUaqJnQ7FVQ5yxzlMLfRNcG3nWMYh0o1kBO2vKvDGG8BofQ4__xn210R2P7zjH-uEn7hic0jbBt97vLkzvATnFegxaDSjUp9At9BIGl9Ecu7RPCwSQJFdw6NBMBz02Uom8yvHpUJEfv8_jZRurBp94tzES6IAACKC6NFq-3_mQfT4o_zA51U4hBbIBw6qNXvMSqK-VtaqkTNFfJsP_8Y0nmQck0jxmgdtLfRVfZCCp3rhcLNDBX37THQI6U7lvNFETceAKyJ5NFaA8p68lw_3D2IiZER1jEU3M2QIMQGeU0pmQ6IPwGDdA0MP5IKKzj8A
token: imooc

@ -21,25 +21,73 @@
- 请求依赖项失败后,可以选择出错或者是兜底回退
### 使用注解方式实现服务的容错、降级
---
- SpringCloud Netflix Hystrix 的三种模式
- Hystrix 的断路器模式和后备策略模式
- 断路器模式:设置超时或者失败等熔断策略
- 后备策略模式:断路器模式触发之后,如果存在后备策略,则执行后备
- Hystrix 的舱壁模式Hystrix 舱壁模式的启发:货船为了进行防止漏水和火灾的扩散,会将货仓分隔为多个当发生灾害时,将所在货仓进行隔离就可以降低整艘船的风险
- 舱壁模式功能就是:我们可以通过为各个服务分别指定线程池
- 每一个远程资源调用都被放置在自己的线程池中,每一个线程池只有一定数量的线程来处理远程调用
- 一个性能低下的服务,只会影响到同一个线程池中的其他服务调用。所以降低了会产生的负面影响
### 使用编程方式实现服务的容错、降级
- [NacosClientHystrixCommand.java] - [NacosClientHystrixObservableCommand.java]
### 编程方式开启 Hystrix 请求缓存
- 先编写过滤器 [HystrixRequestContextServletFilter.java]
- Hystrix 的结果缓存指的是在一次 Hystrix 的请求上下文中
- Hystrix 请求缓存的两种实现方式
- 编程(继承 HystrixCommand)方式,重写 getCacheKey 方法即可
- 注解方式
- @CacheResult:该注解用来标记请求命令返回的结果应该被缓存,它必须与@HystrixCommand注解结合使用, 属性: cacheKeyMethod
- @CacheRemove: 该注解用来让请求命令的缓存失效失效的缓存根据commandKey进行查找。属性: commandKey,cacheKeyMethod
- @CacheKey: 该注解用来在请求命令的参数上标记使其作为cacheKey如果没有使用此注解则会使用所有参数列表中的参数作为cacheKey, 属性: value
### 注解方式开启 Hystrix 请求缓存
-
### 编程方式应用 Hystrix 请求合并
- 默认情况下,每一个请求都会占用一个线程和一次网络请求,高并发场景下效率不高\
- 使用 Hystrix 的请求合并,将多个请求 merge 为一个,提高服务的并发能力
---
- 请求合并的适用场景与注意事项
- 适用场景:单个对象的查询并发数很高,服务提供方负载较高,就可以考虑使用请求合并
- 注意事项
- 请求在代码中人为的设置了延迟时间,会降低请求的响应速度
- 可能会提高服务提供方的负载,因为返回 List 结果数据量偏大
- 实现请求合并比较复杂
### 注解方式应用 Hystrix 请求合并
- 使用注解的方式比较多
### OpenFeign 集成 Hystrix 开启后备模式
- OpenFeign 集成 Hystrix 的步骤
- 在配置文件中开启 Hystrix 的熔断功能:feign.hystrix.enabled: true
- @FeignClient 注解的 fallback 和 fallbackFactory 属性
- fallback 和 fallbackFactory 都是用于配置响应回退,但是不可以同时使用
- fallbackFactory 能够获取到 OpenFeign 调用抛出的异常
### 使用 Hystrix 监控面板监测客户端容错
- Hystrix Dashboard 是一个单独的应用,用来实时的对 Hystrix 的『使用情况』进行实时的监控
### SpringCloud Netflix Hystrix 容错组件总结
- ![Hystrix工作流程图.png](pic/Hystrix工作流程图.png)
---
- Hystrix 完整的工作流程
- 构造-个 HystrixCommand 或者 HystrixObservableCommand 对象
- 表示对一个对依赖项请求的包装
- 执行命令获取响应
- execute():阻塞,直到收到响应或者抛出异常, 同步执行的, 调用的是 queue().get()
- queue():返回一个 Future, 异步执行的, 调用的是 observable.toBlocking().toFuture()
- observe():订阅代表响应的 Observable
- toObservable():返回一个 Observable,当你订阅它以后将会执行Hystrix 命令并且推送它的响应
- 总结: 最终每一个 HystrixCommand, 都是 Observable 的实现
- 响应是否已经被缓存
- 如果请求在缓存中的结果是存在的, 该请求在缓存中对应的结果是可用的, 缓存就会立即以一个 Observable 的形式被返回
- 断路器是否打开
- 缓存不存在的时候, 当你执行命令的时候, Hystrix 检查断路器是否打开, 如果打开, 不会执行任何命令, 执行回退逻辑, 如果是关闭, 检查容量是否可用
- 线程池、信号量是否已经使用率 100%
- 判断, 线程池和信号量是否已经满了, 如果满了, 不会执行命令, 立马执行回退逻辑
- HystrixObservableCommand.construct() or HystrixCommand.run()
- 对依赖项的调用请求, 真正的逻辑编写在这里, run 会返回一个响应, 或者抛出一个异常, construct 会返回一个 Observable 对象, 会推送一个响应或者一个 ERROR 通知, 会抛出一个 Timeout 异常
- 计算电路健康
- Hystrix 报告成功, 失败, 拒绝, 超时给断路器, 断路器会维护一组计算统计的计数器, 断路器会用这些统计数据来决定, 什么时候要跳闸, 后续的所有请求都会被短路, 直到一个恢复周期耗尽以后, 在第一次检查健康检查之后, 才会重新打开电路
- 回退或返回成功响应
- 一, 当你失败以后, HystrixCommand 会回退一个单一的回退值, HystrixObservableCommand 会返回一个或者多个回退值, 通过 fallback, 当 fallback 有异常也会被进一步抛出, 给调用者发送 ERROR 通知, 命令调用者进行处理
- 二, 成功会直接抛出响应给调用者, 返回一个 Observable

Binary file not shown.

After

Width:  |  Height:  |  Size: 133 KiB

@ -49,6 +49,27 @@
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<!-- openfeign -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-micrometer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- feign 替换 JDK 默认的 URLConnection 为 okhttp -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
</dependency>
<!-- 使用原生的 Feign Api 做的自定义配置, encoder 和 decoder -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-gson</artifactId>
<version>12.1</version>
</dependency>
</dependencies>
<!--

@ -2,10 +2,10 @@ package org.example.controller;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.example.hystrix.CacheHystrixCommand;
import org.example.hystrix.NacosClientHystrixCommand;
import org.example.hystrix.NacosClientHystrixObservableCommand;
import org.example.hystrix.UseHystrixCommandAnnotation;
import org.example.hystrix.request_merge.NacosClientCollapseCommand;
import org.example.service.NacosClientService4HystrixDemo;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.web.bind.annotation.GetMapping;
@ -164,4 +164,156 @@ public class HystrixController {
log.info("result03, result04: [{}], [{}]",
JSON.toJSONString(result03), JSON.toJSONString(result04));
}
/*@GetMapping("/cache-hystrix-command")
public void cacheHystrixCommand(@RequestParam String serviceId) {
// 使用缓存 Command, 发起两次请求
CacheHystrixCommand command1 = new CacheHystrixCommand(
nacosClientService, serviceId
);
CacheHystrixCommand command2 = new CacheHystrixCommand(
nacosClientService, serviceId
);
List<ServiceInstance> result01 = command1.execute();
List<ServiceInstance> result02 = command2.execute();
log.info("result01, result02: [{}], [{}]",
JSON.toJSONString(result01), JSON.toJSONString(result02));
// 清除缓存
CacheHystrixCommand.flushRequestCache(serviceId);
// 使用缓存 Command, 发起两次请求
CacheHystrixCommand command3 = new CacheHystrixCommand(
nacosClientService, serviceId
);
CacheHystrixCommand command4 = new CacheHystrixCommand(
nacosClientService, serviceId
);
List<ServiceInstance> result03 = command3.execute();
List<ServiceInstance> result04 = command4.execute();
log.info("result03, result04: [{}], [{}]",
JSON.toJSONString(result03), JSON.toJSONString(result04));
}
@GetMapping("/cache-annotation-01")
public List<ServiceInstance> useCacheByAnnotation01(@RequestParam String serviceId) {
log.info("use cache by annotation01(controller) to get nacos client info: [{}]",
serviceId);
List<ServiceInstance> result01 =
cacheHystrixCommandAnnotation.useCacheByAnnotation01(serviceId);
List<ServiceInstance> result02 =
cacheHystrixCommandAnnotation.useCacheByAnnotation01(serviceId);
// 清除掉缓存
cacheHystrixCommandAnnotation.flushCacheByAnnotation01(serviceId);
List<ServiceInstance> result03 =
cacheHystrixCommandAnnotation.useCacheByAnnotation01(serviceId);
// 这里有第四次调用
return cacheHystrixCommandAnnotation.useCacheByAnnotation01(serviceId);
}
@GetMapping("/cache-annotation-02")
public List<ServiceInstance> useCacheByAnnotation02(@RequestParam String serviceId) {
log.info("use cache by annotation02(controller) to get nacos client info: [{}]",
serviceId);
List<ServiceInstance> result01 =
cacheHystrixCommandAnnotation.useCacheByAnnotation02(serviceId);
List<ServiceInstance> result02 =
cacheHystrixCommandAnnotation.useCacheByAnnotation02(serviceId);
// 清除掉缓存
cacheHystrixCommandAnnotation.flushCacheByAnnotation02(serviceId);
List<ServiceInstance> result03 =
cacheHystrixCommandAnnotation.useCacheByAnnotation02(serviceId);
// 这里有第四次调用
return cacheHystrixCommandAnnotation.useCacheByAnnotation02(serviceId);
}
@GetMapping("/cache-annotation-03")
public List<ServiceInstance> useCacheByAnnotation03(@RequestParam String serviceId) {
log.info("use cache by annotation03(controller) to get nacos client info: [{}]",
serviceId);
List<ServiceInstance> result01 =
cacheHystrixCommandAnnotation.useCacheByAnnotation03(serviceId);
List<ServiceInstance> result02 =
cacheHystrixCommandAnnotation.useCacheByAnnotation03(serviceId);
// 清除掉缓存
cacheHystrixCommandAnnotation.flushCacheByAnnotation03(serviceId);
List<ServiceInstance> result03 =
cacheHystrixCommandAnnotation.useCacheByAnnotation03(serviceId);
// 这里有第四次调用
return cacheHystrixCommandAnnotation.useCacheByAnnotation03(serviceId);
}*/
/**
* <h2></h2>
* */
@GetMapping("/request-merge")
public void requestMerge() throws Exception {
// 前三个请求会被合并
NacosClientCollapseCommand collapseCommand01 = new NacosClientCollapseCommand(
nacosClientService4HystrixDemo, "dev-protocol-springcloud-hystrix1");
NacosClientCollapseCommand collapseCommand02 = new NacosClientCollapseCommand(
nacosClientService4HystrixDemo, "dev-protocol-springcloud-hystrix2");
NacosClientCollapseCommand collapseCommand03 = new NacosClientCollapseCommand(
nacosClientService4HystrixDemo, "dev-protocol-springcloud-hystrix3");
Future<List<ServiceInstance>> future01 = collapseCommand01.queue();
Future<List<ServiceInstance>> future02 = collapseCommand02.queue();
Future<List<ServiceInstance>> future03 = collapseCommand03.queue();
future01.get();
future02.get();
future03.get();
Thread.sleep(2000);
// 过了合并的时间窗口, 第四个请求单独发起
NacosClientCollapseCommand collapseCommand04 = new NacosClientCollapseCommand(
nacosClientService4HystrixDemo, "dev-protocol-springcloud-hystrix4");
Future<List<ServiceInstance>> future04 = collapseCommand04.queue();
future04.get();
}
/**
* <h2></h2>
* */
@GetMapping("/request-merge-annotation")
public void requestMergeAnnotation() throws Exception {
Future<List<ServiceInstance>> future01 = nacosClientService4HystrixDemo.findNacosClientInfo(
"dev-protocol-springcloud-hystrix1"
);
Future<List<ServiceInstance>> future02 = nacosClientService4HystrixDemo.findNacosClientInfo(
"dev-protocol-springcloud-hystrix2"
);
Future<List<ServiceInstance>> future03 = nacosClientService4HystrixDemo.findNacosClientInfo(
"dev-protocol-springcloud-hystrix3"
);
future01.get();
future02.get();
future03.get();
Thread.sleep(2000);
Future<List<ServiceInstance>> future04 = nacosClientService4HystrixDemo.findNacosClientInfo(
"dev-protocol-springcloud-hystrix4"
);
future04.get();
}
}

@ -0,0 +1,50 @@
package org.example.hystrix.request_merge;
import com.alibaba.fastjson.JSON;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import lombok.extern.slf4j.Slf4j;
import org.example.service.NacosClientService4HystrixDemo;
import org.springframework.cloud.client.ServiceInstance;
import java.util.Collections;
import java.util.List;
/**
* <h1> Hystrix Command</h1>
* */
@Slf4j
public class NacosClientBatchCommand extends HystrixCommand<List<List<ServiceInstance>>> {
private final NacosClientService4HystrixDemo nacosClientService;
private final List<String> serviceIds;
protected NacosClientBatchCommand(
NacosClientService4HystrixDemo nacosClientService, List<String> serviceIds
) {
super(
HystrixCommand.Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey("NacosClientBatchCommand")
)
);
this.nacosClientService = nacosClientService;
this.serviceIds = serviceIds;
}
@Override
protected List<List<ServiceInstance>> run() throws Exception {
log.info("use nacos client batch command to get result: [{}]",
JSON.toJSONString(serviceIds));
return nacosClientService.getNacosClientInfos(serviceIds);
}
@Override
protected List<List<ServiceInstance>> getFallback() {
log.warn("nacos client batch command failure, use fallback");
return Collections.emptyList();
}
}

@ -0,0 +1,85 @@
package org.example.hystrix.request_merge;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;
import lombok.extern.slf4j.Slf4j;
import org.example.service.NacosClientService4HystrixDemo;
import org.springframework.cloud.client.ServiceInstance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* <h1></h1>
* */
@Slf4j
public class NacosClientCollapseCommand
extends HystrixCollapser<List<List<ServiceInstance>>, List<ServiceInstance>, String> {
// 批量返回类型, 单个请求对象的返回类型, 请求参数的类型
private final NacosClientService4HystrixDemo nacosClientService;
private final String serviceId;
public NacosClientCollapseCommand(NacosClientService4HystrixDemo nacosClientService, String serviceId) {
super(
HystrixCollapser.Setter.withCollapserKey(
HystrixCollapserKey.Factory.asKey("NacosClientCollapseCommand")
).andCollapserPropertiesDefaults(
HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(300) // 等待 300ms 合并请求
)
);
this.nacosClientService = nacosClientService;
this.serviceId = serviceId;
}
/**
* <h2></h2>
* */
@Override
public String getRequestArgument() {
return this.serviceId;
}
/**
* <h2> Hystrix Command</h2>
* */
@Override
protected HystrixCommand<List<List<ServiceInstance>>> createCommand(
Collection<CollapsedRequest<List<ServiceInstance>, String>> collapsedRequests) {
List<String> serviceIds = new ArrayList<>(collapsedRequests.size()); // 合并请求的大小就是参数们的大小
serviceIds.addAll(
collapsedRequests.stream()
.map(CollapsedRequest::getArgument) // 获取每个请求的参数
.collect(Collectors.toList())
);
return new NacosClientBatchCommand(nacosClientService, serviceIds);
}
/**
* <h2></h2>
* */
@Override
protected void mapResponseToRequests(List<List<ServiceInstance>> batchResponse,
Collection<CollapsedRequest<List<ServiceInstance>,
String>> collapsedRequests) {
int count = 0;
for (CollapsedRequest<List<ServiceInstance>, String> collapsedRequest : collapsedRequests) {
// 从批量响应集合中按顺序取出结果
List<ServiceInstance> instances = batchResponse.get(count++);
// 将结果返回原 Response 中
collapsedRequest.setResponse(instances);
}
}
}

@ -0,0 +1,31 @@
package org.example.service;
import org.example.service.hystrix.bak.AuthorityFeignClientFallback;
import org.example.service.hystrix.bak.AuthorityFeignClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
/**
* <h1> Authority Feign Client </h1>
* */
@FeignClient(
// contextId 是对 FeignClient 的声明, 每一个进行的通信都要进行定义, value 表示需要进行通信的服务id是什么
contextId = "AuthorityFeignClient", value = "e-commerce-authority-center",
// fallback = AuthorityFeignClientFallback.class, // fixme fallback 和 fallbackFactory 只能使用一个
fallbackFactory = AuthorityFeignClientFallbackFactory.class
)
public interface AuthorityFeignClientHystrixDemo {
/**
* <h2> OpenFeign 访 Authority Token</h2>
*
* value , ip + port
* consumes, produces: , , 使 Api ,
* */
@RequestMapping(value = "/ecommerce-authority-center/authority/token", // fixme :这里的调用代码是不对的, 仅仅作为 Hystrix 举例使用
method = RequestMethod.POST,
consumes = "application/json", produces = "application/json")
String getTokenByFeign(@RequestBody String usernameAndPassword);
}

@ -1,11 +1,17 @@
package org.example.service;
import com.alibaba.fastjson.JSON;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
@Slf4j
@Service
@ -35,4 +41,45 @@ public class NacosClientService4HystrixDemo {
log.info("request nacos client to get service instance info: [{}]", serviceId);
return discoveryClient.getInstances(serviceId);
}
/**
* <h2> Hystrix </h2>
* */
public List<List<ServiceInstance>> getNacosClientInfos(List<String> serviceIds) {
log.info("request nacos client to get service instance infos: [{}]",
JSON.toJSONString(serviceIds));
List<List<ServiceInstance>> result = new ArrayList<>(serviceIds.size());
serviceIds.forEach(s -> result.add(discoveryClient.getInstances(s)));
return result;
}
/**
* <h2>使 Hystrix </h2>
*/
@HystrixCollapser(
batchMethod = "findNacosClientInfos",
scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {
// 时间窗口的填充
@HystrixProperty(name = "timerDelayInMilliseconds", value = "300")
}
)
public Future<List<ServiceInstance>> findNacosClientInfo(String serviceId) {
// 系统运行正常, 不会走这个方法
throw new RuntimeException("This method body should not be executed!");
}
/**
*
*/
@HystrixCommand // 使用默认的 GroupKey CommandKey
public List<List<ServiceInstance>> findNacosClientInfos(List<String> serviceIds) {
log.info("coming in find nacos client infos: [{}]", JSON.toJSONString(serviceIds));
return getNacosClientInfos(serviceIds);
}
}

@ -0,0 +1,21 @@
package org.example.service.hystrix.bak;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.example.service.AuthorityFeignClientHystrixDemo;
import org.springframework.stereotype.Component;
/**
* <h1>AuthorityFeignClient fallback</h1>
* */
@Slf4j
@Component
public class AuthorityFeignClientFallback implements AuthorityFeignClientHystrixDemo {
@Override
public String getTokenByFeign(String usernameAndPassword) {
log.info("authority feign client get token by feign request error " +
"(Hystrix Fallback): [{}]", JSON.toJSONString(usernameAndPassword));
return "qqqqq";
}
}

@ -0,0 +1,27 @@
package org.example.service.hystrix.bak;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.example.service.AuthorityFeignClientHystrixDemo;
import org.springframework.stereotype.Component;
/**
* <h1>OpenFeign Hystrix </h1>
* */
@Slf4j
@Component
public class AuthorityFeignClientFallbackFactory implements FallbackFactory<AuthorityFeignClientHystrixDemo> {
@Override
public AuthorityFeignClientHystrixDemo create(Throwable throwable) {
log.warn("authority feign client get token by feign request error " +
"(Hystrix FallbackFactory): [{}]", throwable.getMessage(), throwable);
return new AuthorityFeignClientHystrixDemo() {
@Override
public String getTokenByFeign(String usernameAndPassword) {
return "q-factory";
}
};
}
}

@ -0,0 +1,40 @@
## 基于 SpringCloud Stream 构建消息驱动微服务
### SpringBoot 集成 Kafka 构建消息驱动微服务
- 下载安装 kafka
- 下载 Kafka :https://kafka.apache.org/quickstart
- 解压、启动 ZK 和 Kafka Server 即可(使用默认配置)
---
- 基本架构
- Producer -> Message[Topic] -> Kafka Broker -> Partition <- Consumer[Topic]
### SpringBoot 集成 RocketMQ 构建消息驱动微服务
- 下载、安装 RocketMQ
- 下载 RocketMQ: http://rocketmg.apache.org/docs/quick-start
- 下载以 bin-release 结尾的 zip 包解压即完成安装
---
- MQ 的启动, 关注2个
- mqnamesrv
- sh mqnamesrv
- mqbroker
- sh mqbroker -n localhost:9876
### SpringCloud Stream 消息驱动组件概览
- 为什么会出现 SpringCloud Stream
- 如果没有 SpringCloud Stream,我们会怎么应用消息驱动?
- Producer -> Message -> Kafka/RocketMQ <- Consumer
- 有了Stream
- Producer -> Message -> [Kafka/RocketMQ][Stream] <- Consumer
### 基于 SpringCloud Stream 消息驱动的简单应用
- [dev-protocol-springcloud-stream](..%2Fdev-protocol-springcloud-stream)
### 自定义 Stream 消息通信信道实现定制分发
- [dev-protocol-springcloud-stream](..%2Fdev-protocol-springcloud-stream)
### SpringCloud Stream 消息分组和消费分区的配置与说明
- [dev-protocol-springcloud-stream](..%2Fdev-protocol-springcloud-stream)
### SpringCloud Stream 消息驱动组件总结

@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.example</groupId>
<artifactId>dev-protocol</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>dev-protocol-springcloud-message-study</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>dev-protocol-springcloud-message-study</name>
<description>MQ 学习实践</description>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- SpringBoot 监控端点 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 让 SpringBoot 能够识别 bootstrap.yml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<!-- RocketMQ 这个版本必须和你的 RocketMQ 版本匹配 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version> <!-- 匹配 4.8.0-->
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<!-- MySQL 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<!-- Spring Data Jpa -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,14 @@
package org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* <h1></h1>
* */
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

@ -0,0 +1,41 @@
package org.example.controller;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.example.kafka.KafkaProducer;
import org.example.vo.QMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* <h1>SpringBoot kafka </h1>
* */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final ObjectMapper mapper;
private final KafkaProducer kafkaProducer;
public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) {
this.mapper = mapper;
this.kafkaProducer = kafkaProducer;
}
/**
* <h2> kafka </h2>
* */
@GetMapping("/send-message")
public void sendMessage(@RequestParam(required = false) String key,
@RequestParam String topic) throws Exception {
QMessage message = new QMessage(
1,
"q-Study-Message"
);
kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic);
}
}

@ -0,0 +1,51 @@
package org.example.controller;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.example.rocket.RocketMQProducer;
import org.example.vo.QMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* <h1>SpringBoot RocketMQ</h1>
* */
@Slf4j
@RestController
@RequestMapping("/rocket-mq")
public class RocketMQController {
private static final QMessage RocketMQMessage = new QMessage(
1,
"Q-Study-RocketMQ-In-SpringBoot"
);
private final RocketMQProducer rocketMQProducer;
public RocketMQController(RocketMQProducer rocketMQProducer) {
this.rocketMQProducer = rocketMQProducer;
}
@GetMapping("/message-with-value")
public void sendMessageWithValue() {
rocketMQProducer.sendMessageWithValue(JSON.toJSONString(RocketMQMessage));
}
@GetMapping("/message-with-key")
public void sendMessageWithKey() {
rocketMQProducer.sendMessageWithKey("qy", JSON.toJSONString(RocketMQMessage));
}
@GetMapping("/message-with-tag")
public void sendMessageWithTag() {
rocketMQProducer.sendMessageWithTag("qy",
JSON.toJSONString(RocketMQMessage));
}
@GetMapping("/message-with-all")
public void sendMessageWithAll() {
rocketMQProducer.sendMessageWithAll("q", "q",
JSON.toJSONString(RocketMQMessage));
}
}

@ -0,0 +1,81 @@
package org.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
/**
* <h1> Kafka </h1>
*
*
* ,
* */
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* <h2>Kafka Producer </h2>
* */
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
/**
* <h2>Kafka Producer </h2>
* */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* <h2>Kafka Consumer </h2>
* */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 因为配置为拉取模式, 最多拉取 50条记录
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* <h2>Kafka Consumer </h2>
* */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// 并发数就是一个消费者实例起几个线程
factory.setConcurrency(3);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

@ -0,0 +1,56 @@
package org.example.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.example.vo.QMessage;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* <h1>Kafka </h1>
* */
@Slf4j
@Component
public class KafkaConsumer {
private final ObjectMapper mapper;
public KafkaConsumer(ObjectMapper mapper) {
this.mapper = mapper;
}
/**
* <h2> Kafka </h2>
* */
@KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka")
public void listener01(ConsumerRecord<String, String> record) throws Exception {
String key = record.key();
String value = record.value();
QMessage kafkaMessage = mapper.readValue(value, QMessage.class);
log.info("in listener01 consume kafka message: [{}], [{}]",
key, mapper.writeValueAsString(kafkaMessage));
}
/**
* <h2> Kafka </h2>
*
* */
@KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka-1")
public void listener02(ConsumerRecord<?, ?> record) throws Exception {
Optional<?> _kafkaMessage = Optional.ofNullable(record.value());
if (_kafkaMessage.isPresent()) {
Object message = _kafkaMessage.get();
// 如果不能确定类型的时候, 下面的代码要进行包装
QMessage kafkaMessage = mapper.readValue(message.toString(),
QMessage.class);
log.info("in listener02 consume kafka message: [{}]",
mapper.writeValueAsString(kafkaMessage));
}
}
}

@ -0,0 +1,77 @@
package org.example.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;
/**
* <h1>kafka </h1>
* */
@Slf4j
@Component
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* <h2> kafka </h2>
* */
public void sendMessage(String key, String value, String topic) {
if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("value or topic is null or empty");
}
ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?
kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);
// 异步回调的方式获取通知
future.addCallback(
success -> {
// 元数据信息不为空
assert null != success && null != success.getRecordMetadata();
// 发送到 kafka 的 topic
String _topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的 offset
long offset = success.getRecordMetadata().offset();
log.info("send kafka message success: [{}], [{}], [{}]",
_topic, partition, offset);
}, failure -> {
log.error("send kafka message failure: [{}], [{}], [{}]",
key, value, topic);
}
);
// future 支持多次获取消息, 不需要重新发送消息
// 同步等待的方式获取通知
try {
// SendResult<String, String> sendResult = future.get();
SendResult<String, String> sendResult = future.get(5, TimeUnit.SECONDS);
// 发送到 kafka 的 topic
String _topic = sendResult.getRecordMetadata().topic();
// 消息发送到的分区
int partition = sendResult.getRecordMetadata().partition();
// 消息在分区内的 offset
long offset = sendResult.getRecordMetadata().offset();
log.info("send kafka message success: [{}], [{}], [{}]",
_topic, partition, offset);
} catch (Exception ex) {
log.error("send kafka message failure: [{}], [{}], [{}]",
key, value, topic);
}
}
}

@ -0,0 +1,29 @@
package org.example.rocket;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* <h1> RocketMQ , </h1>
* */
@Slf4j
@Component
@RocketMQMessageListener(
topic = "q-study-rocketmq",
consumerGroup = "q-springboot-rocketmq-message-ext"
)
public class RocketMQConsumerMessageExt implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String value = new String(message.getBody());
log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]",
message.getKeys(), value); // 能拿到消息中的 key
log.info("MessageExt: [{}]", JSON.toJSONString(message)); // 会慢一些
}
}

@ -0,0 +1,27 @@
package org.example.rocket;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.example.vo.QMessage;
import org.springframework.stereotype.Component;
/**
* <h1>, RocketMQ , tag , Java Pojo</h1>
* */
@Slf4j
@Component
@RocketMQMessageListener(
topic = "imooc-study-rocketmq",
consumerGroup = "q-springboot-rocketmq-tag-object",
selectorExpression = "q" // 根据 tag 做过滤
)
public class RocketMQConsumerObject implements RocketMQListener<QMessage> {
@Override
public void onMessage(QMessage message) {
log.info("consume message in RocketMQConsumerObject: [{}]",
JSON.toJSONString(message));
// so something
}
}

@ -0,0 +1,28 @@
package org.example.rocket;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.example.vo.QMessage;
import org.springframework.stereotype.Component;
/**
* <h1> RocketMQ </h1>
* */
@Slf4j
@Component
@RocketMQMessageListener(
topic = "q-study-rocketmq",
consumerGroup = "q-springboot-rocketmq-string"
)
public class RocketMQConsumerString implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
QMessage rocketMessage = JSON.parseObject(message, QMessage.class);
log.info("consume message in RocketMQConsumerString: [{}]",
JSON.toJSONString(rocketMessage));
}
}

@ -0,0 +1,28 @@
package org.example.rocket;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.example.vo.QMessage;
import org.springframework.stereotype.Component;
/**
* <h1> RocketMQ , tag </h1>
* */
@Slf4j
@Component
@RocketMQMessageListener(
topic = "q-study-rocketmq",
consumerGroup = "q-springboot-rocketmq-tag-string",
selectorExpression = "qy" // 根据 tag 过滤, tag 中要带有 qy
)
public class RocketMQConsumerTagString implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
QMessage rocketMessage = JSON.parseObject(message, QMessage.class);
log.info("consume message in RocketMQConsumerTagString: [{}]",
JSON.toJSONString(rocketMessage));
}
}

@ -0,0 +1,100 @@
package org.example.rocket;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.example.vo.QMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* <h1> RocketMQ </h1>
* Spring Messaging
* */
@Slf4j
@Component
public class RocketMQProducer {
/** 类似 Kafka 中的 topic, 默认的读写队列都是4个, 默认自动创建topic */
private static final String TOPIC = "q-study-rocketmq";
/** RocketMQ 客户端 */
private final RocketMQTemplate rocketMQTemplate;
public RocketMQProducer(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
/**
* <h2>使, key tag</h2>
* */
public void sendMessageWithValue(String value) {
// 随机选择一个 Topic 的 Message Queue 发送消息
SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value);
log.info("sendMessageWithValue result: [{}]", JSON.toJSONString(sendResult));
SendResult sendResultOrderly = rocketMQTemplate.syncSendOrderly(
TOPIC, value, "QQQ"
);
log.info("sendMessageWithValue orderly result: [{}]",
JSON.toJSONString(sendResultOrderly));
}
/**
* <h2>使, key</h2>
* */
public void sendMessageWithKey(String key, String value) {
Message<String> message = MessageBuilder.withPayload(value)
// 这个Key 不是分区的效果, 只是方便进行查询, 在设置的时候, 可以使用空格进行分开, 例如: aaaa bbb
.setHeader(RocketMQHeaders.KEYS, key).build();
// 异步发送消息, 并设定回调
rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("sendMessageWithKey success result: [{}]",
JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e);
}
});
}
/**
* <h2>使, tag, Java Pojo</h2>
*
* */
public void sendMessageWithTag(String tag, String value) {
QMessage qMessage = JSON.parseObject(value, QMessage.class);
SendResult sendResult = rocketMQTemplate.syncSend(
String.format("%s:%s", TOPIC, tag), // 不同的消费者组使用不同的 tag
qMessage
);
log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult));
}
/**
* <h2>使, key tag</h2>
* */
public void sendMessageWithAll(String key, String tag, String value) {
Message<String> message = MessageBuilder.withPayload(value)
.setHeader(RocketMQHeaders.KEYS, key).build();
SendResult sendResult = rocketMQTemplate.syncSend(
String.format("%s:%s", TOPIC, tag),
message
);
log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult));
}
}

@ -0,0 +1,27 @@
package org.example.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* <h1> Kafka </h1>
* */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class QMessage {
/**
*
*/
private Integer id;
/**
*
*/
private String projectName;
// todo 自己进行扩展
}

@ -0,0 +1,49 @@
server:
port: 8001
servlet:
context-path: /dev-protocol-springcloud-message-study
spring:
# SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
kafka:
bootstrap-servers: 127.0.0.1:9092
jpa:
show-sql: true
hibernate:
ddl-auto: none
properties:
hibernate.show_sql: true
hibernate.format_sql: true
open-in-view: false
datasource:
# 数据源
url: jdbc:mysql://127.0.0.1:3306/dev_protocol_springcloud_project?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
username: root
password: root
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
# 连接池
hikari:
maximum-pool-size: 8
minimum-idle: 4
idle-timeout: 30000
connection-timeout: 30000
max-lifetime: 45000
auto-commit: true
pool-name: devProtocolSpringcloudHikariCP
# consumer:
# 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
# group-id: imooc-study-ecommerce
# auto-offset-reset: latest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
# RocketMQ 的配置, 这是最低配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
# 发送同一类消息的设置为同一个 group, 保证唯一
group: dev-protocol-springcloud-message-study

@ -0,0 +1,16 @@
spring:
profiles:
# prod, dev
active: dev
application:
name: dev-protocol-springcloud-message-study
# 暴露端点
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always

@ -0,0 +1,7 @@
### kafka-send-message
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?key=qqq&topic=q-springboot
Content-Type: application/json
### kafka-send-message - (测试kafka 支持无key消息, key 是用来分区的)
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?topic=q-springboot
Content-Type: application/json

@ -0,0 +1,15 @@
### message-with-value
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-value
Content-Type: application/json
### message-with-key
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-key
Content-Type: application/json
### message-with-tag
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-tag
Content-Type: application/json
### message-with-all
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/rocket-mq/message-with-all
Content-Type: application/json

@ -0,0 +1,7 @@
### wrong-rollback-for
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-rollback-for
Content-Type: application/json
### wrong-inner-call
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-inner-call
Content-Type: application/json

@ -12,7 +12,7 @@ spring:
discovery:
enabled: true # 如果不想使用 Nacos 进行服务注册和发现, 设置为 false 即可
server-addr: 127.0.0.1:8848
namespace: 1bc13fd5-843b-4ac0-aa55-695c25bc0ac6
namespace: 1ccc74ae-9398-4dbe-b9d7-4f9addf9f40c
metadata:
management:
context-path: ${server.servlet.context-path}/actuator
@ -27,15 +27,15 @@ spring:
# rocketmq:
# binder:
# name-server: 127.0.0.1:9876
# fixme 开启 stream 分区支持
# 开启 stream 分区支持
instanceCount: 1 # 消费者的总数
instanceIndex: 0 # 当前消费者的索引
instanceIndex: 0 # 当前消费者的索引, 多个的时候, 另外一个要配置成 1, 依次类推
bindings:
# 默认发送方
output: # 这里用 Stream 给我们提供的默认 output 信道
destination: dev-protocol-springcloud-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic
content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要
# fixme 消息分区
# 消息分区 - 只对默认接收方和发送方进行配置, 下面自定义的原理相同
producer:
# fixme partitionKeyExpression 这种方式需要我们需要对 author 字段进行各种处理, 这里不使用这个方式, 使用下面自定义的配置策略
# partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性, 不存在 author 可能会报错
@ -46,9 +46,9 @@ spring:
# 默认接收方
input: # 这里用 Stream 给我们提供的默认 input 信道
destination: dev-protocol-springcloud-stream-client-default
# fixme 这里指定了group 方便其进行 分组消费
# 这里指定了group 方便其进行 分组消费
group: dev-protocol-springcloud-stream-client-group-default
# fixme 消费者开启 分区支持
# 消费者开启 分区支持
consumer:
partitioned: true
# q 发送方 destination 对应

@ -62,6 +62,8 @@
<module>dev-protocol-springcloud/dev-protocol-springcloud-project-service-sdk</module>
<module>dev-protocol-springcloud/dev-protocol-springcloud-project-goods-service</module>
<module>dev-protocol-springcloud/dev-protocol-springcloud-hystrix</module>
<module>dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard</module>
<module>dev-protocol-springcloud/dev-protocol-springcloud-message-study</module>
</modules>
<properties>

Loading…
Cancel
Save