[文档修改](master): 周末学习 Spring Webflux + JavBus

更新了周末学习的成果 2022年09月04日22:41:25
master
土豆兄弟 2 years ago
parent c6d8f16080
commit df1a3ecc0d

@ -0,0 +1,12 @@
# 基于 响应式的 客户管理系统
## 设计
- 该项目中的Web服务设计
> generateCustomerTicket{
> 创建 CustomerTicket 对象
> 从远程 account-service 中获取 Account 对象
> 从远程 order-service 中获取 Order 对象
> 设置 CustomerTicket 对象属性
> 保存 CustomerTicket 对象并返回
> }

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>reative-spring-css</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>account-service</artifactId>
<version>1.0.0-REALSE</version>
<packaging>jar</packaging>
<name>Account Service</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<!-- <dependencyManagement>-->
<!-- <dependencies>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.cloud</groupId>-->
<!-- <artifactId>spring-cloud-dependencies</artifactId>-->
<!-- <version>Greenwich.SR6</version>-->
<!-- <type>pom</type>-->
<!-- <scope>import</scope>-->
<!-- </dependency>-->
<!-- </dependencies>-->
<!-- </dependencyManagement>-->
<dependencies>
<!-- <dependency>-->
<!-- <groupId>com.springcss</groupId>-->
<!-- <artifactId>message</artifactId>-->
<!-- <version>0.0.1-SNAPSHOT</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- spring-cloud-stream -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,14 @@
package com.baiye;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication
//@EnableBinding(Source.class)
public class AccountApplication {
public static void main(String[] args) {
SpringApplication.run(AccountApplication.class, args);
}
}

@ -0,0 +1,52 @@
package com.baiye.conroller;
import com.baiye.domain.Account;
import com.baiye.service.AccountService;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
/**
* Account
*/
@RestController
@RequestMapping(value = "accounts")
public class AccountController {
private final AccountService accountService;
public AccountController(AccountService accountService) {
this.accountService = accountService;
}
@GetMapping(value = "/{accountId}")
public Mono<Account> getAccountById(@PathVariable("accountId") String accountId) {
//
// Account account = new Account();
// account.setId(1L);
// account.setAccountCode("DemoCode");
// account.setAccountName("DemoName");
Mono<Account> account = accountService.getAccountById(accountId);
return account;
}
@PostMapping(value = "/")
public Mono<Void> addAccount(@RequestBody Mono<Account> account) {
return accountService.addAccount(account);
}
@PutMapping(value = "/")
public Mono<Void> updateAccount(@RequestBody Mono<Account> account) {
return accountService.updateAccount(account);
}
@GetMapping(value = "accountname/{accountName}")
public Mono<Account> getAccountByAccountName(@PathVariable("accountName") String accountName) {
Mono<Account> account = accountService.getAccountByAccountName(accountName);
return account;
}
}

@ -0,0 +1,38 @@
package com.baiye.domain;
public class Account {
private String id;
private String accountCode;
private String accountName;
public Account() {
super();
}
public Account(String id, String accountCode, String accountName) {
super();
this.id = id;
this.accountCode = accountCode;
this.accountName = accountName;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAccountCode() {
return accountCode;
}
public void setAccountCode(String accountCode) {
this.accountCode = accountCode;
}
public String getAccountName() {
return accountName;
}
public void setAccountName(String accountName) {
this.accountName = accountName;
}
}

@ -0,0 +1,50 @@
package com.baiye.service;
import com.baiye.domain.Account;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface AccountService {
/**
* Id Account
* @param accountId
* @return
*/
Mono<Account> getAccountById(String accountId);
/**
* Account
*
* @param account
* @return
*/
Mono<Void> addAccount(Mono<Account> account);
/**
* Account
*
* @param account
* @return
*/
Mono<Void> updateAccount(Mono<Account> account);
/**
* Mono Name Account
* @param accountName
* @return
*/
Mono<Account> getAccountByAccountName(String accountName);
/**
* Flux Name Account
*
* @param accountName
* @return
*/
Flux<Account> getAccountsByAccountName(String accountName);
}

@ -0,0 +1,34 @@
package com.baiye.service;
import com.baiye.domain.Account;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class AccountServiceImpl implements AccountService{
@Override
public Mono<Account> getAccountById(String accountId) {
return null;
}
@Override
public Mono<Void> addAccount(Mono<Account> account) {
return null;
}
@Override
public Mono<Void> updateAccount(Mono<Account> account) {
return null;
}
@Override
public Mono<Account> getAccountByAccountName(String accountName) {
return null;
}
@Override
public Flux<Account> getAccountsByAccountName(String accountName) {
return null;
}
}

@ -0,0 +1,122 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>reative-spring-css</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>customer-service</artifactId>
<version>1.0.0-RELEASE</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-libs-snapshot</id>
<url>https://repo.spring.io/libs-snapshot</url>
</repository>
</repositories>
<dependencies>
<!-- <dependency>-->
<!-- <groupId>com.springcss</groupId>-->
<!-- <artifactId>message</artifactId>-->
<!-- <version>0.0.1-SNAPSHOT</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- spring-cloud-stream -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,12 @@
package com.baiye;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CustomerApplication {
public static void main(String[] args) {
SpringApplication.run(CustomerApplication.class, args);
}
}

@ -0,0 +1,84 @@
package com.baiye.domain;
import org.springframework.util.Assert;
public class CustomerTicket {
private String id;
private String accountId;
private String orderNumber;
private String description;
private Date createTime;
public CustomerTicket() {
super();
}
public CustomerTicket(String accountId, String orderNumber) {
super();
Assert.notNull(accountId, "Account Id must not be null");
Assert.notNull(orderNumber, "Order Number must not be null");
this.accountId = accountId;
this.orderNumber = orderNumber;
}
public CustomerTicket(String accountId, String orderNumber, String description, Date createTime) {
this(accountId, orderNumber);
this.description = description;
this.createTime = createTime;
}
public CustomerTicket(String id, String accountId, String orderNumber, String description, Date createTime) {
this(accountId, orderNumber);
this.id = id;
this.description = description;
this.createTime = createTime;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAccountId() {
return accountId;
}
public void setAccountId(String accountId) {
this.accountId = accountId;
}
public String getOrderNumber() {
return orderNumber;
}
public void setOrderNumber(String orderNumber) {
this.orderNumber = orderNumber;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
}

@ -0,0 +1,10 @@
package com.baiye.service;
import com.baiye.domain.CustomerTicket;
import reactor.core.publisher.Mono;
public interface CustomerTicketService {
Mono<CustomerTicket> generateCustomerTicket(String accountId, String orderNumber);
}

@ -0,0 +1,37 @@
package com.baiye.service;
import com.baiye.domain.CustomerTicket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.UUID;
@Service
public class CustomerTicketServiceImpl implements CustomerTicketService{
private static final Logger logger = LoggerFactory.getLogger(CustomerTicketService.class);
/**
* 使:
*
* @param accountId
* @param orderNumber
* @return
*/
@Override
public Mono<CustomerTicket> generateCustomerTicket(String accountId, String orderNumber) {
logger.debug("Generate customer ticket record with account: {} and order: {}", accountId, orderNumber);
// 设置 customerTicket - id
// fixme 正式环境不要使用 UUID
CustomerTicket customerTicket = new CustomerTicket();
customerTicket.setId("C_" + UUID.randomUUID().toString());
// TODO: 2022/9/4 代码暂存
return null;
}
}

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>reative-spring-css</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>message</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>

@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>reative-spring-css</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>order-service</artifactId>
<version>1.0.1-RELEASE</version>
<packaging>jar</packaging>
<name>Order Service</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- <dependency>-->
<!-- <groupId>com.springcss</groupId>-->
<!-- <artifactId>message</artifactId>-->
<!-- <version>0.0.1-SNAPSHOT</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,11 @@
package com.baiye;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}

@ -0,0 +1,30 @@
package com.baiye.controller;
import com.baiye.domain.Order;
import com.baiye.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Configuration
public class OrderHandler {
private final OrderService orderService;
public OrderHandler(OrderService orderService) {
this.orderService = orderService;
}
public Mono<ServerResponse> getOrderByOrderNumber(ServerRequest request) {
// fixme 获取参数中的 orderName 设置 - 这种最好放在 common中定义 constant
String orderNumber = request.pathVariable("orderNumber");
// 返回包装后的 Reactor 调用结果
return ServerResponse
.ok()
.body(this.orderService.getOrderByOrderNumber(orderNumber), Order.class);
}
}

@ -0,0 +1,21 @@
package com.baiye.controller;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.*;
@Configuration
public class OrderRouter {
@Bean
public RouterFunction<ServerResponse> routeOrder(OrderHandler orderHandler) {
return RouterFunctions
.route(RequestPredicates
.GET("/orders/{orderNumber}")
.and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
// 这里要进行非空判断
orderHandler::getOrderByOrderNumber
);
}
}

@ -0,0 +1,49 @@
package com.baiye.domain;
public class Order {
private String id;
private String orderNumber;
private String deliveryAddress;
private String goods;
public Order() {
super();
}
public Order(String id, String orderNumber, String deliveryAddress, String goods) {
super();
this.id = id;
this.orderNumber = orderNumber;
this.deliveryAddress = deliveryAddress;
this.goods = goods;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getOrderNumber() {
return orderNumber;
}
public void setOrderNumber(String orderNumber) {
this.orderNumber = orderNumber;
}
public String getDeliveryAddress() {
return deliveryAddress;
}
public void setDeliveryAddress(String deliveryAddress) {
this.deliveryAddress = deliveryAddress;
}
public String getGoods() {
return goods;
}
public void setGoods(String goods) {
this.goods = goods;
}
}

@ -0,0 +1,9 @@
package com.baiye.service;
import com.baiye.domain.Order;
import reactor.core.publisher.Mono;
public interface OrderService {
Mono<Order> getOrderByOrderNumber(String orderNumber);
}

@ -0,0 +1,13 @@
package com.baiye.service;
import com.baiye.domain.Order;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
public class OrderServiceImpl implements OrderService{
@Override
public Mono<Order> getOrderByOrderNumber(String orderNumber) {
return null;
}
}

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dev-protocol</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>reative-spring-css</artifactId>
<packaging>pom</packaging>
<modules>
<module>account-service</module>
<module>customer-service</module>
<module>order-service</module>
<module>message</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dev-protocol</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jav-addr</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.22</version>
</dependency>
</dependencies>
</project>

@ -0,0 +1,39 @@
package com.baiye;
import cn.hutool.core.util.StrUtil;
import java.util.List;
/**
*
*/
public class Jav {
public static void main(String[] args) {
String pre = "javbus|busjav|busfan|fanbus|buscdn|cdnbus|dmmsee|seedmm|busdmm|dmmbus|javsee|seejav|avsox";
String end = "jav|bus|dmm|see|cdn|fan";
List<String> split = StrUtil.split(pre, "|");
List<String> split1 = StrUtil.split(end, "|");
String dian = ".";
for (String preIndex : split) {
String www = "https://www";
for (String endIndex : split1) {
StringBuilder stringBuilder = new StringBuilder()
.append(www)
.append(dian)
.append(preIndex)
.append(dian)
.append(endIndex);
System.out.println(stringBuilder);
}
}
}
}

@ -45,6 +45,9 @@
<module>longpolling/demo/demo3/dev-protocol-netty-common</module> <module>longpolling/demo/demo3/dev-protocol-netty-common</module>
<module>code-language/java/java-demo</module> <module>code-language/java/java-demo</module>
<module>bigdata/spark/best-spark</module> <module>bigdata/spark/best-spark</module>
<module>fuli/jav-addr</module>
<module>spring/spring-webflux/spring-webflux-demo</module>
<module>best-practice/css-webflux/reative-spring-css</module>
</modules> </modules>
<properties> <properties>

@ -0,0 +1,113 @@
<h1><div style="text-align: center; color: #fa4861">Spring 响应式编程</div></h1>
# 0. 目录
# 1. 基本概念
- 数据流和响应式
- 数据流就是说明全链路都是以事件的方式进行驱动的
- 响应式编程核心特点: 不采用传统的同步调用方式处理数据, 而是由处于数据库上游的各层组件自动执行事件
- 响应式编程: 基于事件的发布订阅机制, 使用推的方式
- 优势: 生成事件和消费事件的过程是异步执行的,所以线程的生命周期很短,资源之间的竞争关系较少, 服务器的响应能力也就越高
- 响应式宣言和响应式系统
- 即时响应性Responsive、回弹性Resilient、弹性Elastic以及消息驱动Message Driven构成了响应式宣言的主体内容。
- ![响应式宣言](pic/响应式宣言.png)
- 具备上图中各个特性的系统,就可以称为响应式系统
# 2. 背压机制
- 流概念
- 由生产者生产并由多个消费者消费的元素序列(生产者消费者模型 | 发布者订阅者模型)
- 流的处理模式
- "拉模式": 消费者主动从生产者拉取元素
- "推模式": 生产者将元素推送给消费者[ 资源利用率更好]
- 流量控制
- 生产者生产数据的速率小于消费者
- 消费者数据没有压力, 也就不需要进行流量控制
- 生产者生产数据大于消费者消费数据 - [常见]
- 消费者可能因为无法处理过多的数据而发生崩溃
- 常见地解决方案是 **在生产者和消费者之间加队列的形式**
- 纯推模式下的数据流量会有很多不可控的因素, 需要在推模式和拉模式之间考虑一定的平衡性从而优雅的实现流量的控制
- 背压机制: 下游能够向上游反馈流量请求的机制
- 如果消费者消费数据的速度赶不上生产者生产数据的速度时, 他将会持续消耗系统的资源
- 响应式流的核心接口
```java
// Publisher<T>
public interface Publisher<T>{
public void subscribe(Subscriber<? super T> s);
}
// Subscriber<T>
public void onSubscribe(Subscriber s) // 回调方法
public void onNext(T t) {} // 向订阅者发送数据
public void onError(Throwable e) {} // 触发异常时候
public void onCompleted() {} // 数据流发送结束
// Subscription 对象是确保生产者和消费者针对数据处理速度达成的一种动态平衡的基础, 也是流量控制中实现背压机制的关键所在
public interface Subscription {
void request(long var1);
void cancel();
}
```
- 业界主流的响应式开发库包括: **RxJava, Akka, Vert.x 以及 Project Reactor**
# 3. 响应式编程应用场景
- 数据流处理是响应式编程的一大应用场景, 流式系统的主要特点是低延迟和高吞吐
- 网关的使用
- 网关的作用就是用来响应前端系统的流量并将其转发到后端服务
- Netflix Hystrix SpringCLoud Gateway 以及 SpringWebFlux
# 4. 基于 Spring 框架学习响应式编程
- 响应式编程并不是只针对系统中的某一个部分组件, 而是需要适用于调用链路上的所有组件
- 只要有一个环节不是响应式的, 那么这个环节就会出现同步阻塞
- Spring5 提供了 WebFlux + SpringData Reactive
- WebFlux不仅包含了对创建和访问响应式HTTP端点的支持, 还可以用来实现服务器推送事件以及 WebSocket
- Spring WebFlux 需要支持异步的运行环境
- 比如 NettyUndertow 以及 Servlet 3.1 版本以上的 Tomcat 和 Jetty
- 非常适合开发 I/O 密集型服务
- 不要 WebFlux和 SpringMVC混合使用, 无法保证全栈式的响应式流
- 案例: CSS: (Customer Service System)客户服务系统
- ![ReactiveSpringCSS架构](pic/ReactiveSpringCSS架构.png)
- 开发重点
- Web层: 构建 RESTFUL 端点, 并通过响应式请求的WebClient客户端组件来消费这些端点
- Service层: 核心逻辑在于完成事件处理和消息通信相关的业务场景 account-service 消息的发布者, customer-service 则是消息消费者
- Repository层: 引入 MongoDB和Redis两款支持响应式流的 NoSQL 数据库, MongoDB为各个服务存储业务数据, Redis主要用于在 customer-service 中
- ![ReactiveSpringCSS技术组件图](pic/ReactiveSpringCSS技术组件图.png)
# 5. 了解 Reactor
- RxJava诞生的更早, 但是 Reactor 更有前途
- 使用 弹珠图来说明 响应式编程的策略(Marble Diagram)
# 6. 使用 Flux 和 Mono 构建响应式数据流
- Flux 创建
- 基于各种工厂模式的静态创建方法
- just() range() interval() 以及各种以 from- 为前缀的方法组等
- 采用编程方式动态创建Flux
-
- 注解方式进行编码
- 基于Java注解的方式进行编码, 编程模型和 Spring MVC 一致
- 基于函数式编程模型

Binary file not shown.

After

Width:  |  Height:  |  Size: 324 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 229 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dev-protocol</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-webflux-demo</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- &lt;!&ndash; spring data r2dbc &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-data-r2dbc</artifactId>-->
<!-- </dependency>-->
<!-- &lt;!&ndash; r2dbc 连接池 &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>io.r2dbc</groupId>-->
<!-- <artifactId>r2dbc-pool</artifactId>-->
<!-- </dependency>-->
<!-- &lt;!&ndash;r2dbc mysql库 &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>dev.miku</groupId>-->
<!-- <artifactId>r2dbc-mysql</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-data-jdbc</artifactId>-->
<!-- </dependency>-->
<!-- WebFlux 核心组件-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter</artifactId>-->
<!-- </dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,16 @@
package com.baiye;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
*
* @author q
*/
@SpringBootApplication
public class HelloWebFluxApplication {
public static void main(String[] args) {
SpringApplication.run(HelloWebFluxApplication.class, args);
}
}

@ -0,0 +1,14 @@
package com.baiye.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class HelloWebFluxController {
@GetMapping("/")
public Mono<String> hello() {
return Mono.just("Hello World for WebFlux !");
}
}
Loading…
Cancel
Save