diff --git a/best-practice/css-webflux/reative-spring-css/README.md b/best-practice/css-webflux/reative-spring-css/README.md new file mode 100644 index 0000000..47b053c --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/README.md @@ -0,0 +1,12 @@ +# 基于 响应式的 客户管理系统 + + +## 设计 +- 该项目中的Web服务设计 +> generateCustomerTicket{ +> 创建 CustomerTicket 对象 +> 从远程 account-service 中获取 Account 对象 +> 从远程 order-service 中获取 Order 对象 +> 设置 CustomerTicket 对象属性 +> 保存 CustomerTicket 对象并返回 +> } \ No newline at end of file diff --git a/best-practice/css-webflux/reative-spring-css/account-service/pom.xml b/best-practice/css-webflux/reative-spring-css/account-service/pom.xml new file mode 100644 index 0000000..fea2ccd --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/account-service/pom.xml @@ -0,0 +1,93 @@ + + + + reative-spring-css + org.example + 1.0-SNAPSHOT + + 4.0.0 + account-service + 1.0.0-REALSE + jar + + Account Service + + + 8 + 8 + + + + + + + + + + + + + + + + + + + + + + + org.springframework.boot + spring-boot-starter-data-mongodb-reactive + + + org.springframework.boot + spring-boot-starter-webflux + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.cloud + spring-cloud-stream-reactive + 2.2.1.RELEASE + + + + org.springframework.cloud + spring-cloud-starter-stream-rabbit + + + + org.springframework.boot + spring-boot-starter-test + test + + + io.projectreactor + reactor-test + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + \ No newline at end of file diff --git a/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/AccountApplication.java b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/AccountApplication.java new file mode 100644 index 0000000..80d36a3 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/AccountApplication.java @@ -0,0 +1,14 @@ +package com.baiye; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; + +@SpringBootApplication +//@EnableBinding(Source.class) +public class AccountApplication { + + public static void main(String[] args) { + SpringApplication.run(AccountApplication.class, args); + } +} diff --git a/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/conroller/AccountController.java b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/conroller/AccountController.java new file mode 100644 index 0000000..abedaaf --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/conroller/AccountController.java @@ -0,0 +1,52 @@ +package com.baiye.conroller; + +import com.baiye.domain.Account; +import com.baiye.service.AccountService; +import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Mono; + +/** + * Account 对应的响应式端点暴露 + */ +@RestController +@RequestMapping(value = "accounts") +public class AccountController { + + + private final AccountService accountService; + + public AccountController(AccountService accountService) { + this.accountService = accountService; + } + + @GetMapping(value = "/{accountId}") + public Mono getAccountById(@PathVariable("accountId") String accountId) { +// +// Account account = new Account(); +// account.setId(1L); +// account.setAccountCode("DemoCode"); +// account.setAccountName("DemoName"); + + Mono account = accountService.getAccountById(accountId); + return account; + } + + @PostMapping(value = "/") + public Mono addAccount(@RequestBody Mono account) { + + return accountService.addAccount(account); + } + + @PutMapping(value = "/") + public Mono updateAccount(@RequestBody Mono account) { + + return accountService.updateAccount(account); + } + + @GetMapping(value = "accountname/{accountName}") + public Mono getAccountByAccountName(@PathVariable("accountName") String accountName) { + + Mono account = accountService.getAccountByAccountName(accountName); + return account; + } +} diff --git a/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/domain/Account.java b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/domain/Account.java new file mode 100644 index 0000000..9dd7939 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/domain/Account.java @@ -0,0 +1,38 @@ +package com.baiye.domain; + +public class Account { + + private String id; + private String accountCode; + private String accountName; + + public Account() { + super(); + } + + public Account(String id, String accountCode, String accountName) { + super(); + this.id = id; + this.accountCode = accountCode; + this.accountName = accountName; + } + + public String getId() { + return id; + } + public void setId(String id) { + this.id = id; + } + public String getAccountCode() { + return accountCode; + } + public void setAccountCode(String accountCode) { + this.accountCode = accountCode; + } + public String getAccountName() { + return accountName; + } + public void setAccountName(String accountName) { + this.accountName = accountName; + } +} diff --git a/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/service/AccountService.java b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/service/AccountService.java new file mode 100644 index 0000000..a1a5dae --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/service/AccountService.java @@ -0,0 +1,50 @@ +package com.baiye.service; + +import com.baiye.domain.Account; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface AccountService { + + /** + * 通过 Id 获取 Account + * @param accountId + * @return + */ + Mono getAccountById(String accountId); + + + /** + * Account 增 + * + * @param account + * @return + */ + Mono addAccount(Mono account); + + + /** + * Account 改 + * + * @param account + * @return + */ + Mono updateAccount(Mono account); + + + /** + * Mono 通过 Name 获取 Account + * @param accountName + * @return + */ + Mono getAccountByAccountName(String accountName); + + + /** + * Flux 通过 Name 获取 Account + * + * @param accountName + * @return + */ + Flux getAccountsByAccountName(String accountName); +} diff --git a/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/service/AccountServiceImpl.java b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/service/AccountServiceImpl.java new file mode 100644 index 0000000..2489c82 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/account-service/src/main/java/com/baiye/service/AccountServiceImpl.java @@ -0,0 +1,34 @@ +package com.baiye.service; + +import com.baiye.domain.Account; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Service +public class AccountServiceImpl implements AccountService{ + @Override + public Mono getAccountById(String accountId) { + return null; + } + + @Override + public Mono addAccount(Mono account) { + return null; + } + + @Override + public Mono updateAccount(Mono account) { + return null; + } + + @Override + public Mono getAccountByAccountName(String accountName) { + return null; + } + + @Override + public Flux getAccountsByAccountName(String accountName) { + return null; + } +} diff --git a/best-practice/css-webflux/reative-spring-css/customer-service/pom.xml b/best-practice/css-webflux/reative-spring-css/customer-service/pom.xml new file mode 100644 index 0000000..5328d77 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/customer-service/pom.xml @@ -0,0 +1,122 @@ + + + + reative-spring-css + org.example + 1.0-SNAPSHOT + + 4.0.0 + customer-service + 1.0.0-RELEASE + jar + + + 8 + 8 + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + spring-libs-snapshot + https://repo.spring.io/libs-snapshot + + + + + + + + + + + + + org.springframework.boot + spring-boot-starter-data-mongodb-reactive + + + org.springframework.boot + spring-boot-starter-webflux + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + org.springframework.cloud + spring-cloud-function-context + + + org.springframework.cloud + spring-cloud-stream-reactive + 2.2.1.RELEASE + + + org.springframework.cloud + spring-cloud-starter-stream-rabbit + + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.boot + spring-boot-starter-data-redis-reactive + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + org.springframework.data + spring-data-redis + + + redis.clients + jedis + + + + org.springframework.boot + spring-boot-starter-test + test + + + io.projectreactor + reactor-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/CustomerApplication.java b/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/CustomerApplication.java new file mode 100644 index 0000000..4c89b57 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/CustomerApplication.java @@ -0,0 +1,12 @@ +package com.baiye; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class CustomerApplication { + + public static void main(String[] args) { + SpringApplication.run(CustomerApplication.class, args); + } +} diff --git a/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/domain/CustomerTicket.java b/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/domain/CustomerTicket.java new file mode 100644 index 0000000..518588d --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/domain/CustomerTicket.java @@ -0,0 +1,84 @@ +package com.baiye.domain; + +import org.springframework.util.Assert; + +public class CustomerTicket { + + private String id; + private String accountId; + private String orderNumber; + private String description; + private Date createTime; + + + public CustomerTicket() { + super(); + } + + public CustomerTicket(String accountId, String orderNumber) { + super(); + + Assert.notNull(accountId, "Account Id must not be null"); + Assert.notNull(orderNumber, "Order Number must not be null"); + + this.accountId = accountId; + this.orderNumber = orderNumber; + } + + public CustomerTicket(String accountId, String orderNumber, String description, Date createTime) { + + this(accountId, orderNumber); + + this.description = description; + this.createTime = createTime; + } + + public CustomerTicket(String id, String accountId, String orderNumber, String description, Date createTime) { + + this(accountId, orderNumber); + + this.id = id; + this.description = description; + this.createTime = createTime; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getAccountId() { + return accountId; + } + + public void setAccountId(String accountId) { + this.accountId = accountId; + } + + public String getOrderNumber() { + return orderNumber; + } + + public void setOrderNumber(String orderNumber) { + this.orderNumber = orderNumber; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } +} diff --git a/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/service/CustomerTicketService.java b/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/service/CustomerTicketService.java new file mode 100644 index 0000000..679bf71 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/service/CustomerTicketService.java @@ -0,0 +1,10 @@ +package com.baiye.service; + +import com.baiye.domain.CustomerTicket; +import reactor.core.publisher.Mono; + +public interface CustomerTicketService { + + + Mono generateCustomerTicket(String accountId, String orderNumber); +} diff --git a/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/service/CustomerTicketServiceImpl.java b/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/service/CustomerTicketServiceImpl.java new file mode 100644 index 0000000..2e9047f --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/customer-service/src/main/java/com/baiye/service/CustomerTicketServiceImpl.java @@ -0,0 +1,37 @@ +package com.baiye.service; + +import com.baiye.domain.CustomerTicket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +import java.util.UUID; + +@Service +public class CustomerTicketServiceImpl implements CustomerTicketService{ + + private static final Logger logger = LoggerFactory.getLogger(CustomerTicketService.class); + + /** + * 典型使用案例: 从其他的两个服务拿到相应的对象进行聚合转化保存 + * + * @param accountId + * @param orderNumber + * @return + */ + @Override + public Mono generateCustomerTicket(String accountId, String orderNumber) { + logger.debug("Generate customer ticket record with account: {} and order: {}", accountId, orderNumber); + + // 设置 customerTicket - id + // fixme 正式环境不要使用 UUID + CustomerTicket customerTicket = new CustomerTicket(); + customerTicket.setId("C_" + UUID.randomUUID().toString()); + + // TODO: 2022/9/4 代码暂存 + + + return null; + } +} diff --git a/best-practice/css-webflux/reative-spring-css/message/pom.xml b/best-practice/css-webflux/reative-spring-css/message/pom.xml new file mode 100644 index 0000000..28e4da0 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/message/pom.xml @@ -0,0 +1,19 @@ + + + + reative-spring-css + org.example + 1.0-SNAPSHOT + + 4.0.0 + + message + + + 8 + 8 + + + \ No newline at end of file diff --git a/best-practice/css-webflux/reative-spring-css/order-service/pom.xml b/best-practice/css-webflux/reative-spring-css/order-service/pom.xml new file mode 100644 index 0000000..998ed9d --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/order-service/pom.xml @@ -0,0 +1,64 @@ + + + + reative-spring-css + org.example + 1.0-SNAPSHOT + + 4.0.0 + order-service + 1.0.1-RELEASE + jar + + Order Service + + + 8 + 8 + + + + + + + + + + + org.springframework.boot + spring-boot-starter-data-mongodb-reactive + + + org.springframework.boot + spring-boot-starter-webflux + + + + org.springframework.boot + spring-boot-starter-actuator + + + + org.springframework.boot + spring-boot-starter-test + test + + + io.projectreactor + reactor-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + \ No newline at end of file diff --git a/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/OrderApplication.java b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/OrderApplication.java new file mode 100644 index 0000000..7909d70 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/OrderApplication.java @@ -0,0 +1,11 @@ +package com.baiye; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class OrderApplication { + public static void main(String[] args) { + SpringApplication.run(OrderApplication.class, args); + } +} diff --git a/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/controller/OrderHandler.java b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/controller/OrderHandler.java new file mode 100644 index 0000000..9af52c0 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/controller/OrderHandler.java @@ -0,0 +1,30 @@ +package com.baiye.controller; + +import com.baiye.domain.Order; +import com.baiye.service.OrderService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +@Configuration +public class OrderHandler { + + private final OrderService orderService; + + public OrderHandler(OrderService orderService) { + this.orderService = orderService; + } + + public Mono getOrderByOrderNumber(ServerRequest request) { + // fixme 获取参数中的 orderName 设置 - 这种最好放在 common中定义 constant + String orderNumber = request.pathVariable("orderNumber"); + + // 返回包装后的 Reactor 调用结果 + return ServerResponse + .ok() + .body(this.orderService.getOrderByOrderNumber(orderNumber), Order.class); + } + +} diff --git a/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/controller/OrderRouter.java b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/controller/OrderRouter.java new file mode 100644 index 0000000..c5e733f --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/controller/OrderRouter.java @@ -0,0 +1,21 @@ +package com.baiye.controller; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.server.*; + +@Configuration +public class OrderRouter { + + @Bean + public RouterFunction routeOrder(OrderHandler orderHandler) { + return RouterFunctions + .route(RequestPredicates + .GET("/orders/{orderNumber}") + .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), + // 这里要进行非空判断 + orderHandler::getOrderByOrderNumber + ); + } +} diff --git a/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/domain/Order.java b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/domain/Order.java new file mode 100644 index 0000000..aa7d735 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/domain/Order.java @@ -0,0 +1,49 @@ +package com.baiye.domain; + +public class Order { + + private String id; + private String orderNumber; + private String deliveryAddress; + private String goods; + + public Order() { + super(); + } + + public Order(String id, String orderNumber, String deliveryAddress, String goods) { + super(); + this.id = id; + this.orderNumber = orderNumber; + this.deliveryAddress = deliveryAddress; + this.goods = goods; + } + + public String getId() { + return id; + } + public void setId(String id) { + this.id = id; + } + public String getOrderNumber() { + return orderNumber; + } + public void setOrderNumber(String orderNumber) { + this.orderNumber = orderNumber; + } + public String getDeliveryAddress() { + return deliveryAddress; + } + public void setDeliveryAddress(String deliveryAddress) { + this.deliveryAddress = deliveryAddress; + } + + public String getGoods() { + return goods; + } + + public void setGoods(String goods) { + this.goods = goods; + } + +} diff --git a/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/service/OrderService.java b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/service/OrderService.java new file mode 100644 index 0000000..6f82aa8 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/service/OrderService.java @@ -0,0 +1,9 @@ +package com.baiye.service; + +import com.baiye.domain.Order; +import reactor.core.publisher.Mono; + +public interface OrderService { + + Mono getOrderByOrderNumber(String orderNumber); +} diff --git a/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/service/OrderServiceImpl.java b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/service/OrderServiceImpl.java new file mode 100644 index 0000000..f15e071 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/order-service/src/main/java/com/baiye/service/OrderServiceImpl.java @@ -0,0 +1,13 @@ +package com.baiye.service; + +import com.baiye.domain.Order; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +public class OrderServiceImpl implements OrderService{ + @Override + public Mono getOrderByOrderNumber(String orderNumber) { + return null; + } +} diff --git a/best-practice/css-webflux/reative-spring-css/pom.xml b/best-practice/css-webflux/reative-spring-css/pom.xml new file mode 100644 index 0000000..82df350 --- /dev/null +++ b/best-practice/css-webflux/reative-spring-css/pom.xml @@ -0,0 +1,27 @@ + + + + dev-protocol + org.example + 1.0-SNAPSHOT + ../../../pom.xml + + 4.0.0 + reative-spring-css + pom + + + account-service + customer-service + order-service + message + + + + 8 + 8 + + + \ No newline at end of file diff --git a/bigdata/kafka/README.md b/bigdata/kafka/README.md index 4cb0a32..5d30f99 100644 --- a/bigdata/kafka/README.md +++ b/bigdata/kafka/README.md @@ -1,6 +1,18 @@ # Kafka +## 0. 目录 + +- Kafka Kafka基本概念及配置 + + Kafka 客户端实践及原理剖析 + + Kafka 内核及源码 + + Kafka 管理和监控 + + Kafka 流处理 + ## 1. Kafka基本概念及配置 @@ -54,6 +66,141 @@ log.dirs=你的kafka的日志目录 zookeeper.connect=[你的ip地址]:2181 ``` +### 1.4 Kafka 的线上方案 +- 操作系统 + - 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的 快速数据传输特性。 + - Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证, 千万不要应用于生产环境。 +- 磁盘 + - **普通的机械磁盘** 即可, Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘 最大的劣势,即随机读写操作慢。 + - 追求性价比的公司可以**不搭建 RAID**,使用**普通磁盘组成存储空间即可**。 [Kafka 1.1开始正式支持JBOD了。再说Kafka本身在软件层面也提供了冗余的机制来对 抗磁盘损坏。] +- 磁盘容量 + - 在规划磁盘容量时你需要考虑下面这几个元素: + - 新增消息数 + - 消息留存时间 + - 平均消息大小 + - 备份数 + - 是否启用压缩 + - 例子: + - 假设你所在公司有个业务每天需要向 Kafka 集群发送 **1 亿条消息**,每条消息**保存两份**以防止数据丢失,另外消息默认**保存两周** 时间。现在假设消息的**平均大小是 1KB**? + - 每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空 间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息 数据还有其他类型的数据, + - 比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空 间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB * 14, 大约 3TB 左右。Kafka 支持数据的压缩, + 假设压缩比是 0.75,那么最后你需要规划的存储 空间就是 0.75 * 3 = 2.25TB。 +- 带宽 + - 对于 Kafka 这种通过网络大量进行数据传输的框架而言,**带宽特别容易成为瓶颈**。事实 上,在我接触的真实案例当中,带宽资源不足导致 Kafka 出现性能问题的比例至少占 60% 以上。 + 如果你的环境中还涉及跨机房传输,那么情况可能就更糟了。 + - 带宽也主要有 两种:1Gbps 的千兆网络和 10Gbps 的万兆网络,特别是千兆网络应该是一般公司网络的标准配置了。 + - 与其说是带宽资源的规划,其实真正要规划的是所需的 Kafka 服务器的数量。 + - 假设你公司 的机房环境是千兆网络,即 1Gbps,现在你有个业务,其业务目标或 SLA 是在 1 小时内处 理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢? + - 让我们来计算一下,由于带宽是 1Gbps,即每秒处理 1Gb 的数据,假设每台 Kafka 服务 器都是安装在专属的机器上,也就是说每台 Kafka 机器上没有混布其他服务,毕竟真实环境中不建议这么做。 + 通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其 他应用或进程留一些资源。根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比 较合理的值, + 也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源, 故通常要再额外预留出 2/3 的资源, + 即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需 要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少 此值。 好了,有了 240Mbps, + 我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根 据这个目标,我们**每秒需要处理 2336Mb [1024*1024/3600*8] 的数据,除以 240,约等于 10 台服务器**。 + 如果 消**息还需要额外复制两份,那么总的服务器台数还要乘以 3,即 30 台**。 +- Kafka 性能测试脚本 + - kafka-producer-perf-test 脚本还不错,kafka-consumer-perf-test有点难用 +- 混部 + - 一个机器上 部署一个zookeeper和一个kafka, 如果配置好**ZooKeeper事务日志**(比如设置好autopurge.purgeInterval及定期删除 snapshot等),它对IO的需求不大,混布也是可以的。 + +--- +- 最最最重要的集群参数配置 + - Kafka 服务器端的配置,其中既有 Broker 端参数,也有主题(Topic)级别的参数、JVM 端参数和操作系统级别的参数 +- Broker 端参数 + - 首先 Broker 是需要配置存储信息的,即 Broker 使用哪些 磁盘。那么针对存储信息的重要参数有以下这么几个: + - **log.dirs**: 这是非常重要的参数,指定了 Broker 需要 使用的若干个文件目录路径。要知道这个参数是没有默认 值的,这说明什么?这说明它**必须由你亲自指定**。 + - log.dir: 注意这是 dir,结尾没有 s,说明它只能表示 单个路径,它是**补充上一个参数用的**。 + - 这两个参数应该怎么设置呢?很简单,你只要设置 log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置 多个路径, + 具体格式是一个 CSV 格式,也就是用逗号分隔 的多个路径,比 如/home/kafka1,/home/kafka2,/home/kafka3这 样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。 + - 这样做有两个好处: + - 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。 + - 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新 引入的强大功能。要知道在以前,只要 Kafka Broker 使 用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。 但是自 1.1 开始,这种情况被修正了, + 坏掉的磁盘上的数 据会自动地转移到其他正常的磁盘上,而且 Broker 还能 正常工作。这个改进正是我们舍弃 RAID 方案的基 础:没有这种 Failover 的话,我们只能依靠 RAID 来提 供保障。 + - 与 ZooKeeper 相关的设置 + - 首先 ZooKeeper 是 做什么的呢?它是一个分布式协调框架,负责协调管理并保 存 Kafka 集群的所有元数据信息,比如集群都有哪些Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少 分区以及这些分区的 Leader 副本都在哪些机器上等信息。 + - Kafka 与 ZooKeeper 相关的最重要的参数当属 **zookeeper.connect** + - 这也是一个 CSV 格式的参数,比 如我可以指定它的值为 **zk1:2181,zk2:2181,zk3:2181**。2181 是 ZooKeeper 的默认端口。 + - 如果我让**多个 Kafka 集群使用同一套 ZooKeeper 集群**,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概 念,类似于别名。 + - 如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以 这样指定:**zk1:2181,zk2:2181,zk3:2181/kafka1和 zk1:2181,zk2:2181,zk3:2181/kafka2**。 + - **切记 chroot 只需要写一次,而且是加到最后的**。我经常碰到有人这样指 定: zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。 + - 第三组参数是与 Broker 连接相关的,即客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参 数: + - **listeners**: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。 + - advertised.listeners: 和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的, 就是说**这组监听器是 Broker 用于对外发布的**。 + - [内网环境访问Kafka不需要配置这个参数,常见的玩法是:你的Kafka Broker机器上配置了双网卡,一 块网卡用于内网访问(即我们常说的内网IP);另一个块用 于外网访问。那么你可以配置listeners为内网IP, advertised.listeners为外网IP。] + - ~~host.name/port~~: 列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。 + - 监听器的概念,从构成上来说,它是若干个逗 号分隔的三元组,**每个三元组的格式为<协议名称,主机 名,端口号>**。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密 传输等; + 也可能是你自己定义的协议名字,比如 CONTROLLER: //localhost:9092。 + - 一旦你自己定义了协议名称,你必须还要指定 listener.security.protocol.map参数告诉这个协议,底层使用了哪种安全协议,比如指定 listener.security.protocol.map=CONTROLLER:P LAINTEXT表示CONTROLLER这个自定义协议底层使用明文 不加密传输数据。 + - 经常有人会问主机 名这个设置中我到底使用 IP 地址还是主机名。这里我给出 统一的建议:**最好全部使用主机名**,即 **Broker 端和 Client 端应用配置中全部填写主机名**。 Broker 源代码中也使用的 是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。 + - 第四组参数是关于 Topic 管理的。我来讲讲下面这三个参数: + - auto.create.topics.enable:是否允许自动创建 Topic。 + - [建议最好设置成 **false**,即不允许自动创建 Topic。每个部门被分配的 Topic 应该由运维严格把控,决不能允许自行创建任何 Topic。] + - unclean.leader.election.enable:是否允许 Unclean Leader 选举。 + - [每个分区都有多个副本来提供高可用。 在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。建议你还是显式地把它设置成 **false** 吧] + - auto.leader.rebalance.enable:是否允许定期进 行 Leader 选举。 + - [对生产环境影响非常大,设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满 足一定的条件才会发生。严格来说它与上一个参数中Leader 选举的最大不同在于, + 它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若 true,则有可能 一段时间后 Leader A 就要被强行卸任换成 Leader B。建议你在生产环境中把这 个参数设置成 **false**。] + - 最后一组参数是数据留存方面的,即: + - log.retention.{hour|minutes|ms}:这是个“三 兄弟”,都是控制一条消息数据被保存多长时间。从优先 级上来说 ms 设置最高、minutes 次之、hour 最低。 + - [虽然 ms 设置有最高的优先级,但是 通常情况下我们还是设置 hour 级别的多一些,比如 log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使 用,那么这个值就要相应地调大。] + - log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。 + - [这个值默认是 -1, 表明你想在这台 Broker 上保存多少数据都可以,至少在容 量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参 数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集 群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间] + - message.max.bytes:控制 Broker 能够接收的最大消息大小。 + - [不能使用默认值的参数,默认的 1000012 太少了,还不到 1MB。实际场景中突 破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一 个比较大的值还是比较保险的做法。毕竟它只是一个标尺而 已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置 大一点也不会耗费什么磁盘空间的。] + - 补充参数 + - gg.handler.kafkahandler.Mode = tx gg.handler.kafkahandler.Mode = op 这两个的差别。遇到时 dml 数据会丢失的情况。用的是 op 。 + - 当设置成op单个数据库表的变更(插入、更新、删除) 会被当成一条Kafka消息发送;当设置成tx时,数据库事务 所做的所有变更统一被封装进一条Kafka消息,并在事务提 交后被发送。后者有事务性的保障,至少有原子性方面的保证,不会丢失部分 CDC 数据。 + +- Topic 级别参数 + - Topic 级别参数会**覆盖全局 Broker 参数的值**,而每个 Topic 都能设置自己的参数值, 这就是所谓的 Topic 级别参数。 + - **retention.ms**: 规定了该 Topic 消息被保存的时长。 默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦 设置了这个值,它会覆盖掉 Broker 端的全局参数值。 + - **retention.bytes**:规定了要为该 Topic 预留多大的磁 盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以 无限使用磁盘空间。 + - Topic 级别 参数的设置就是这种情况,我们有两种方式可以设置: + - 创建 Topic 时进行设置 + - 修改 Topic 时设置 + - 如何在创建 Topic 时设置这些参数 + - 你的部门需要将交易数据发送到 Kafka 进行处理,需要保 存最近半年的交易数据,同时这些数据很大,通常都有几 MB,但一般不会超过 5MB。现在让我们用以下命令来创建 Topic: + - bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replication-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880 + - 请注意结尾处的--config设置,我们就是在 config 后面指定了想要设置的 Topic 级别参数。 + - 使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下: + - bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760 + - [你最好始终坚持使用第二种方式来设置,并且在未来,Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。] + +- JVM 参数 + - Kafka 服务器端代码是用 Scala 语言编写的,但终归还是编译成 Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。 + - 有条件的话至少使用 Java 8 吧, [Kafka 2.0已经不支持Java 7了,2.1版本开始初步支持Java 11,但不建议生产环境用11,所以还是使用Java 8吧。] + - 堆大小这个参数至关重要。无脑给出一个通用的建议:将你的 JVM 堆大小设置成 **6GB** 吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的 Heap Size 来跑 Kafka,说实话默认的 1GB 有点小, + 毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例,Heap Size 不能太小。 + - [没有通用的标准,只有一个最佳实践值:6GB。最好还是监控一下实时的堆大小,特别是**GC之后的live data大小,通常将heapsize设置成其1.5~2倍就足以了**] + - JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7,那么可以根据以下法则选择合适的垃圾回收器: + - 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。 + - 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。 + - 当然了,如果你已经在使用 Java 8 了[G1是jdk9中默认的,jdk8还是需要显式指定的]。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等,所以使用 G1 就好了。 + - Java8默认的新生代垃圾回收器是:UseParallelGC,可以用-XX:+PrintCommandLineFlags -version查看,如果显示指定 -XX:+UseCurrentMarkSweepGC 的话,会默认开启 -XX:+UseParallelGC + - 确定好了要设置的 JVM 参数,我们该如何为 Kafka 进行设置呢? + - 只需要设置下面这两个环境变量即可: + - KAFKA_HEAP_OPTS:指定堆大小。 + - KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。 + - 比如你可以这样启动 Kafka Broker,即在启动 Kafka Broker 之前,先设置上这两个环境变量: + - $> **export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g** + - $> **export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true** + - $> **bin/kafka-server-start.sh config/server.properties** + +- 操作系统参数 + - Kafka 并不需要设置太多的 OS 参数,但有些因素最好还是关注一下,比如下面这几个: + - 文件描述符限制 + - 文件系统类型 + - Swappiness + - 提交时间 + - 首先是 **ulimit -n** + - 觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如 **ulimit -n 1000000**。 + - 但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。 + - 其次是文件系统类型的选择。 + - 这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,**XFS** 的性能要强于 ext4,所以生产环境最好还是使用 XFS。对了,最近有个 Kafka 使用 **ZFS** 的数据报告,貌似性能更加强劲,有条件的话不妨一试。 + - 第三是 swap 的调优 + - 网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件, + 它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑, + 我**个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1**。 + ### 1.5 kafka的基本操作 ```shell @@ -104,9 +251,343 @@ kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --grou ``` +## 2. 客户端实践及原理剖析 + +### 2.1 生产者消息分区机制原理剖析 +- 为什么分区? + - Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息 + - 主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。 + - 为什么使用分区的概念而不是直接使用多个主题呢? + - 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了**实现系统的高伸缩性(Scalability)** + - 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能**独立地执行各自分区的读写请求处理**。 + - 并且,我们还可以通过**添加新的节点机器来增加整体系统的吞吐量**。 + - 实际上分区的概念以及分区数据库早在 1980 年就已经有大牛们在做了,比如那时候有个叫 Teradata 的数据库就引入了**分区的概念**。 + - Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode + - 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如**实现业务级别的消息顺序的问题** + +- 都有哪些分区策略? + - 所谓分区策略是决定生产者将消息发送到哪个分区的算法。 + - 如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class + - 在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。 + - 这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的 partition 方法。 + - 方法签名:int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); + - 这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等) + - Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。 + - 只要你自己的**实现类定义好了 partition 方法,同时设置partitioner.class参数为你自己实现类的 Full Qualified Name**,那么生产者程序就会按照你的代码逻辑对消息进行分区。 + - 比较常见的分区策略 + - **轮询策略** + - 也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。 + - ![轮询策略](pic/轮询策略.png) + - 这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API **默认提供的分区策略**。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。 + - [轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。] + - **随机策略** + - 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。 + - ![随机策略](pic/随机策略.png) + - 如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可: + - List partitions = cluster.partitionsForTopic(topic); + - return ThreadLocalRandom.current().nextInt(partitions.size()); + - 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。 + - 本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以**如果追求数据的均匀分布,还是使用轮询策略比较好**。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。 + - **按消息键保序策略** + - 也称 Key-ordering 策略。这个名词是我自己编的,Kafka 官网上并无这样的提法。 + - Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。 + 特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面, + **由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略**,如下图所示。 + - ![按消息键保序策略](pic/按消息键保序策略.png) + - 实现这个策略的 partition 方法同样简单,只需要下面两行代码即可: + - List partitions = cluster.partitionsForTopic(topic); + - return Math.abs(key.hashCode()) % partitions.size(); + - 前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:**如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。** + - 如何实现消息的顺序问题? + - 企业发送的 Kafka 的消息是有因果关系的,故处理因果关系也必须要保证有序性,否则先处理了“果”后处理“因”必然造成业务上的混乱。 + - 当时企业的做法是给 Kafka 主题设置单分区,也就是 1 个分区。这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。 + 这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。 + - 后来经过了解和调研,我发现这种具有因果关系的消息都有一定的特点,比如在**消息体中都封装了固定的标志位**,改进建议他们**对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区**, + 这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。 + - 这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把**标志位数据提取出来统一放到 Key 中,这样更加符合 Kafka 的设计思想**。 + - 其他分区策略 + - 其实还有一种比较常见的,即所谓的**基于地理位置的分区策略**。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。 + - 假设的所有服务都部署在北京的一个机房(这里我假设它是自建机房,不考虑公有云方案。其实即使是公有云,实现逻辑也差不多),现在考虑在南方找个城市(比如广州)再创建一个机房; + 另外从两个机房中选取一部分机器共同组成一个大的 Kafka 集群。显然,这个集群中必然有一部分机器在北京,另外一部分机器在广州。 + - 计划为每个新注册用户提供一份注册礼品,比如南方的用户可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。 + 如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。 + - 但问题是你需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。 + 换句话说,送甜豆腐脑的消费者程序只在广州机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向广州机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品! + - 此时我们就可以根据 Broker 所在的 IP 地址实现定制化的分区策略。比如下面这段代码: + - List partitions = cluster.partitionsForTopic(topic); + - return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get(); + - 我们可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。 +- 补充问题 + - 在消息重试的时候,分区策略会重新再计算一次吗?比如一开始选择到5号分区,但是5号分区有问题导致重试,重试的时候可以重试发送到别的分区上吗? + - 不会的。消息重试只是简单地将消息重新发送到之前的分区 + - 在看kafka-client生产者默认分区源码时,看到了cluster.partitionsForTopic和cluster.availablePartitionsForTopic,请问什么时候分区是available,什么时候是不unavailable的? + - 分区没有leader的时候就是unavailable了。某些操作会导致瞬间没有leader,比如分区reassign、换leader等 + - RocketMQ与Kafka的主要区别 + - Kafka吞吐量大,多是面向大数据场景。RocketMQ吞吐量也很强, 不过它号称是金融业务级的消息中间件,也就是说可以用于实际的业务系统; + - RocketMQ毕竟是阿里出品,在国内技术支持力度要比Kafka强; + - Kafka现在主要发力Streaming,RocketMQ在流处理这块表现如何我不太清楚,至少streaming不是它现阶段的主要卖点。 + - 其他方面这两者确实都差不多 + - 广州机房怎么消费广州partition的数据,consumer如何指定消费的partition。 + - 使用这个方法:consumer.assign()直接消息指定分区 + +### 2.2 生产者压缩算法面面观 +- 说起压缩(compression),我相信你一定不会感到陌生。它秉承了**用时间去换空间的经典 trade-off 思想**,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。在 Kafka 中,压缩也是用来做这件事的。 + +- 怎么压缩? + - Kafka 是如何压缩消息的呢?要弄清楚这个问题,就要从 Kafka 的消息格式说起了。目前 **Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本**。V2 版本是 Kafka 0.11.0.0 中正式引入的。 + - 不论是哪个版本,Kafka 的消息层次都分为两层:**消息集合(message set)以及消息(message)**。**一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方**。 + Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka **通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作**。 + - V2 版本主要是针对 V1 版本的一些弊端做了修正 + - 一个,就是把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。 + - 在 V2 版本中,消息的 CRC 校验工作就被移到了消息集合这一层。 + - V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。 + - 之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。 + - V2 版本都比 V1 版本节省磁盘空间。 + +- 何时压缩? + - 在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。 +- 生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。 +- 下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象: +```java + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("acks", "all"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + // 开启 GZIP 压缩 + props.put("compression.type", "gzip"); + + Producer producer = new KafkaProducer<>(props); +``` +- 这里比较关键的代码行是 props.put(“compression.type”, “gzip”),它表明该 Producer 的压缩算法使用的是 GZIP。这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。 +- 在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?其实大部分情况下 **Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改**,但这里的“大部分情况”也是要满足一定条件的。 +- 有两种例外情况就可能让 Broker 重新压缩消息。 + - 情况一:Broker 端指定了和 Producer 端不同的压缩算法。 + - Producer 说:“我要使用 GZIP 进行压缩。” + - Broker 说:“不好意思,我这边接收的消息必须使用 Snappy 算法进行压缩。” + - Kafka 官网,你会发现 Broker 端也有一个参数叫 compression.type,和上面那个例子中的同名。但是这个**参数的默认值是 producer**,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。 + 可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,**通常表现为 Broker 端 CPU 使用率飙升**。 + - 情况二:Broker 端发生了消息格式转换。 + - 在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外, + 它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。 + - 所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。如果有兴趣你可以深入地了解下 Zero Copy 的原理。 + +- 何时解压缩? + - 通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,**当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息**。 + - Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。**Kafka 会将启用了哪种压缩算法封装进消息集合中**,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。 + - 住这句话:**Producer 端压缩、Broker 端保持、Consumer 端解压缩。** + - 除了在 Consumer 端解压缩,Broker 端也会进行解压缩。注意了,这和前面提到消息格式转换时发生的解压缩是不同的场景。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,**目的就是为了对消息执行各种验证**。 + 我们必须承认**这种解压缩对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言**。 + - 国内京东的小伙伴们刚刚向社区提出了一个 bugfix,建议去掉因为做消息校验而引入的解压缩。据他们称,**去掉了解压缩之后,Broker 端的 CPU 使用率至少降低了 50%**。 + - 目前社区并未采纳这个建议,原因就是这种消息校验是非常重要的,不可盲目去之。毕竟先把事情做对是最重要的,**在做对的基础上,再考虑把事情做好做快**。 + - 你也可以思考一下,是否有一个两全其美的方案,既能避免消息解压缩也能对消息执行校验。 + +- 各种压缩算法对比 + - 在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:**GZIP、Snappy 和 LZ4**。从 2.1.0 开始,Kafka 正式支持 **Zstandard** 算法(简写为 zstd)。它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比(compression ratio)。 + - 对了,看一个压缩算法的优劣,有两个重要的指标: + - 一个指标是**压缩比**,原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5,显然压缩比越高越好; + - 另一个指标就是**压缩 / 解压缩吞吐量**,比如每秒能压缩或解压缩多少 MB 的数据。同样地,吞吐量也是越高越好。 + - Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果: + - ![Zstandard压缩算法对比](pic/Zstandard压缩算法对比.png) + - zstd 算法有着最高的压缩比,而在吞吐量上的表现只能说中规中矩。 + - 反观 LZ4 算法,它在吞吐量方面则是毫无疑问的执牛耳者。 + - 在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。 + - 但对于 Kafka 而言,它们的性能测试结果却出奇得一致, + - 即**在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP**; + - 而**在压缩比方面,zstd > LZ4 > GZIP > Snappy**。 + - 具体到**物理资源**,使用 **Snappy 算法占用的网络带宽最多,zstd 最少**,这是合理的,毕竟 zstd 就是要提供超高的压缩比; + - 在 **CPU 使用率**方面,各个算法表现得差不多,**只是在压缩时 Snappy 算法使用的 CPU 较多一些** + - 而**在解压缩时 GZIP 算法则可能使用更多的 CPU**。 + - 最佳实践 + - 首先来说压缩。何时启用压缩是比较合适的时机呢? + - 你现在已经知道 Producer 端完成的压缩,那么启用压缩的一个条件就是 **Producer 程序运行机器上的 CPU 资源要很充足**。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。 + - 除了 CPU 资源充足这一条件,如果你的环境中带宽资源有限,那么我也建议你开启压缩。 + - 事实上我见过的很多 Kafka 生产环境都遭遇过带宽被打满的情况。这年头,**带宽可是比 CPU 和内存还要珍贵的稀缺资源**,毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现。 + - **如果你的客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗**。 + - 其次说说解压缩。其实也没什么可说的。 + - 一旦启用压缩,解压缩是不可避免的事情。 + - 这里只想强调一点:我们对不可抗拒的解压缩无能为力,但至少能规避掉那些意料之外的解压缩。就像我前面说的,因为要兼容老版本而引入的解压缩操作就属于这类。有条件的话尽量保证不要出现消息格式转换的情况。 + +- 补充问题 + - 正常情况下broker端会原样保存起来,但是为了检验需要解压缩。该怎么去理解这个过程呢,broker端解压缩以后还会压缩还原吗? + - 它只是解压缩读取而已,不会将解压缩之后的数据回写到磁盘。另外就像我置顶的留言那样,目前社区已经接纳了京东小伙伴的修改,貌似可以绕过这部分解压缩了. + - 规避了broker端为执行校验而做的解压缩操作,代码也merge进了2.4版本。有兴趣的同学可以看一下: + - https://issues.apache.org/jira/browse/KAFKA-8106 + - 消息层次、消息集合、消息、日志项这些概念与它们之间的关系 + - 消息批次RecordBatch里面包含若干条消息(record)。 你可以认为消息批次和消息集合是等价的,消息和日志项是等价的。这样消息层次有两层:外层是消息批次(或消息集合);里层是消息(或日志项)。 + Producer以recordbatch为单位发送消息,对于V2版本一个batch中通常包含多条消息。在V2版本中,在batch层面计算CRC值;在V1版本中,每条消息都要计算CRC值。 + +### 2.3 无消息丢失配置怎么实现? +- 一句话概括,Kafka 只对**“已提交”的消息(committed message)做有限度的持久化保证**。 + - 第一个核心要素是 **“已提交的消息”**。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。 + - 那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。 + - 第二个核心要素就是 **“有限度的持久化保证”** + - 假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中**至少有 1 个存活**。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。 + +- “消息丢失”案例 + - 案例 1:生产者程序丢失数据 + - 目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。 + - 这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。它的意思是,执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget” + - 如果用这个方式,可能会有哪些因素导致消息没有发送成功呢? + - 其实原因有很多,例如**网络抖动,导致消息压根就没有发送到 Broker 端**;或者**消息本身不合格导致 Broker 拒绝接收**(比如消息太大了,超过了 Broker 的承受能力) + - 就像前面说过的,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。 + - 解决此问题的方法非常简单: + - Producer **永远要使用带有回调通知的发送 API**,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。 + - (回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。 + - 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。 + - 总之,**处理发送失败的责任在 Producer 端而非 Broker 端。** + - 案例 2:消费者程序丢失数据 + - Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。 + - ![Consumer端的位移数据](pic/Consumer端的位移数据.png) + - 比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。 + - 这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。 + - 正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。**不能颠倒** + - 要对抗这种消息丢失,办法很简单:**维持先消费消息(阅读),再更新位移(书签)的顺序即可**。这样就能最大限度地保证消息不丢失。 + - 当然,**这种处理方式可能带来的问题是消息的重复处理**,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。 + - 还存在一种比较隐蔽的消息丢失场景。 + - 对于 Kafka 而言, Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。 + 假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。 + - 这里的**关键在于 Consumer 自动提交位移** + - 这个问题的解决方案也很简单: + - 如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序**手动提交位移**。 + - 单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。 +- 最佳实践 + - 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,**一定要使用带有回调通知的 send 方法**。 + - **设置 acks = all**。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。 + - **设置 retries 为一个较大的值**。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。 + - **设置 unclean.leader.election.enable = false**。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。 + 故一般都要将该参数设置成 false,即不允许这种情况的发生。 + - **设置 replication.factor >= 3**。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 + - **设置 min.insync.replicas > 1**。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 + - 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。 + **推荐设置成 replication.factor = min.insync.replicas + 1**。 + - 确保消息消费完成再提交。Consumer 端有个参数 **enable.auto.commit,最好把它设置成 false**,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。 +- 补充 + - 设置 acks = all。表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。如果所有的Broker都要收到消息才能算作已提交,会不会对系统的吞吐量影响很大?另外这里的副本指的是不是仅仅是ISR? + - 碰到的实际场景,影响还是很大的。acks=all时,大部分的请求处理延时都花在了follower同步上。 是的,acks=all表明所有ISR中的副本都要同步。 + +### 2.4 客户端都有哪些不常见但是很高级的功能? +- 什么是拦截器? + - 基本思想就是允许应用程序在**不修改逻辑的情况下**,**动态地实现**一组可**插拔的事件处理逻辑链**。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。 + +- Kafka 拦截器 + - Kafka **拦截器分为生产者拦截器和消费者拦截器**。 + - 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。 + 值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,**Kafka 会按照添加顺序依次执行拦截器逻辑**。 + - Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 **interceptor.classes**,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。 + - 假设第一个拦截器的完整类路径是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor, + 那么你需要按照以下方法在 Producer 端指定拦截器: +```java +Properties props = new Properties(); +List interceptors = new ArrayList<>(); +interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1 +interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2 +props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); +``` +- 我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢? + - 这两个类以及你自己编写的所有 Producer 端拦截器实现类都**要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口**。该接口是 Kafka 提供的,里面有两个核心的方法。 + - onSend:该方法会在消息发送之前被调用。如果你想在**发送之前**对消息“美美容”,这个方法是你唯一的机会。 + - onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?**onAcknowledgement 的调用要早于 callback 的调用**。 + 值得注意的是,**这个方法和 onSend 不是在同一个线程中被调用的**,因此如果你在这两个方法中调用了某个共享可变对象,**一定要保证线程安全哦**。 + 还有一点很重要,这个方法处在 Producer 发送的主路径中,所以**最好别放一些太重的逻辑进去**,否则你会发现你的 Producer TPS 直线下降。 + - 同理,指定消费者拦截器也是同样的方法,只是具体的实现类要**实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口**,这里面也有两个核心方法。 + - onConsume:该方法在消息返回给 Consumer 程序**之前**调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。 + - onCommit:Consumer 在**提交位移之后调用**该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。 + - 一定要注意的是,指定拦截器类时要指定它们的**全限定名**,即 full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。 + +- 典型使用场景 + - 其实,跟很多拦截器的用法相同,**Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计**等多种功能在内的场景。 + - Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去**追踪集群间消息的流转路径**。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。 + - 通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够**从具体的消息层面上去收集这些数据**。这就是 Kafka 拦截器的一个非常典型的使用场景。 + - 我们再来看看消息审计(message audit)的场景。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要**涉及多租户以及消息审计的功能**。 + - 作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。 + 一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。 + +- 案例分享 + - 通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。 + - 某个业务只有一个 Producer 和一个 Consumer,他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少,但是目前 Kafka 并没有提供这种端到端的延时统计。 + - 既然是要计算总延时,**那么一定要有个公共的地方来保存它**,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 Redis 中。 + - 实现生产者拦截器 +```java +public class AvgLatencyProducerInterceptor implements ProducerInterceptor { + + private Jedis jedis; // 省略 Jedis 初始化 + + @Override + public ProducerRecord onSend(ProducerRecord record) { + jedis.incr("totalSentMessage"); + return record; + } + + + @Override + public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + } + + + @Override + public void close() { + } + + + @Override + public void configure(Map configs) { + } + +``` +- 上面的代码比较关键的是在发送消息前更新总的已发送消息数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑。 +- 消费者端的拦截器实现,代码如下: +```java +public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor { + + + private Jedis jedis; // 省略 Jedis 初始化 + + + @Override + public ConsumerRecords onConsume(ConsumerRecords records) { + long lantency = 0L; + for (ConsumerRecord record : records) { + lantency += (System.currentTimeMillis() - record.timestamp()); + } + jedis.incrBy("totalLatency", lantency); + long totalLatency = Long.parseLong(jedis.get("totalLatency")); + long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage")); + jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs)); + return records; + } + + + @Override + public void onCommit(Map offsets) { + } + + + @Override + public void close() { + } + + + @Override + public void configure(Map configs) { +``` +- 在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用**当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中**。 + 之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。 +- 创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。 + 这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。 + +### 2.5 + + + + + + + + + -## 2. Kafka客户端操作 @@ -127,6 +608,13 @@ kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --grou ## 7. Kafka集群部署与开发 + + + + + + + ## 8. Kafka集群监控、安全与最佳实践 diff --git a/bigdata/kafka/pic/Consumer端的位移数据.png b/bigdata/kafka/pic/Consumer端的位移数据.png new file mode 100644 index 0000000..38b8c1d Binary files /dev/null and b/bigdata/kafka/pic/Consumer端的位移数据.png differ diff --git a/bigdata/kafka/pic/Zstandard压缩算法对比.png b/bigdata/kafka/pic/Zstandard压缩算法对比.png new file mode 100644 index 0000000..4d2a729 Binary files /dev/null and b/bigdata/kafka/pic/Zstandard压缩算法对比.png differ diff --git a/bigdata/kafka/pic/按消息键保序策略.png b/bigdata/kafka/pic/按消息键保序策略.png new file mode 100644 index 0000000..ce4c456 Binary files /dev/null and b/bigdata/kafka/pic/按消息键保序策略.png differ diff --git a/bigdata/kafka/pic/轮询策略.png b/bigdata/kafka/pic/轮询策略.png new file mode 100644 index 0000000..745a04b Binary files /dev/null and b/bigdata/kafka/pic/轮询策略.png differ diff --git a/bigdata/kafka/pic/随机策略.png b/bigdata/kafka/pic/随机策略.png new file mode 100644 index 0000000..138d87d Binary files /dev/null and b/bigdata/kafka/pic/随机策略.png differ diff --git a/fuli/jav-addr/pom.xml b/fuli/jav-addr/pom.xml new file mode 100644 index 0000000..97ce4e5 --- /dev/null +++ b/fuli/jav-addr/pom.xml @@ -0,0 +1,28 @@ + + + + dev-protocol + org.example + 1.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + jav-addr + + + 8 + 8 + + + + + cn.hutool + hutool-all + 5.7.22 + + + + \ No newline at end of file diff --git a/fuli/jav-addr/src/main/java/com.baiye/Jav.java b/fuli/jav-addr/src/main/java/com.baiye/Jav.java new file mode 100644 index 0000000..27e5b86 --- /dev/null +++ b/fuli/jav-addr/src/main/java/com.baiye/Jav.java @@ -0,0 +1,39 @@ +package com.baiye; + +import cn.hutool.core.util.StrUtil; + +import java.util.List; + +/** + * 小网站生成器 + */ +public class Jav { + + public static void main(String[] args) { + String pre = "javbus|busjav|busfan|fanbus|buscdn|cdnbus|dmmsee|seedmm|busdmm|dmmbus|javsee|seejav|avsox"; + + String end = "jav|bus|dmm|see|cdn|fan"; + + + List split = StrUtil.split(pre, "|"); + + List split1 = StrUtil.split(end, "|"); + + String dian = "."; + + for (String preIndex : split) { + + String www = "https://www"; + for (String endIndex : split1) { + StringBuilder stringBuilder = new StringBuilder() + .append(www) + .append(dian) + .append(preIndex) + .append(dian) + .append(endIndex); + System.out.println(stringBuilder); + } + } + + } +} diff --git a/pom.xml b/pom.xml index c9d995f..a1f236f 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,9 @@ longpolling/demo/demo3/dev-protocol-netty-common code-language/java/java-demo bigdata/spark/best-spark + fuli/jav-addr + spring/spring-webflux/spring-webflux-demo + best-practice/css-webflux/reative-spring-css diff --git a/spring/spring-webflux/README.md b/spring/spring-webflux/README.md new file mode 100644 index 0000000..b982b02 --- /dev/null +++ b/spring/spring-webflux/README.md @@ -0,0 +1,113 @@ +

Spring 响应式编程

+ + +# 0. 目录 + + + + +# 1. 基本概念 +- 数据流和响应式 + - 数据流就是说明全链路都是以事件的方式进行驱动的 + - 响应式编程核心特点: 不采用传统的同步调用方式处理数据, 而是由处于数据库上游的各层组件自动执行事件 + - 响应式编程: 基于事件的发布订阅机制, 使用推的方式 + - 优势: 生成事件和消费事件的过程是异步执行的,所以线程的生命周期很短,资源之间的竞争关系较少, 服务器的响应能力也就越高 +- 响应式宣言和响应式系统 + - 即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)构成了响应式宣言的主体内容。 + - ![响应式宣言](pic/响应式宣言.png) + - 具备上图中各个特性的系统,就可以称为响应式系统 + +# 2. 背压机制 +- 流概念 + - 由生产者生产并由多个消费者消费的元素序列(生产者消费者模型 | 发布者订阅者模型) +- 流的处理模式 + - "拉模式": 消费者主动从生产者拉取元素 + - "推模式": 生产者将元素推送给消费者[ 资源利用率更好] +- 流量控制 + - 生产者生产数据的速率小于消费者 + - 消费者数据没有压力, 也就不需要进行流量控制 + - 生产者生产数据大于消费者消费数据 - [常见] + - 消费者可能因为无法处理过多的数据而发生崩溃 + - 常见地解决方案是 **在生产者和消费者之间加队列的形式** +- 纯推模式下的数据流量会有很多不可控的因素, 需要在推模式和拉模式之间考虑一定的平衡性从而优雅的实现流量的控制 +- 背压机制: 下游能够向上游反馈流量请求的机制 + - 如果消费者消费数据的速度赶不上生产者生产数据的速度时, 他将会持续消耗系统的资源 +- 响应式流的核心接口 +```java +// Publisher + +public interface Publisher{ + public void subscribe(Subscriber s); +} + + +// Subscriber + +public void onSubscribe(Subscriber s) // 回调方法 + +public void onNext(T t) {} // 向订阅者发送数据 + +public void onError(Throwable e) {} // 触发异常时候 + +public void onCompleted() {} // 数据流发送结束 + + +// Subscription 对象是确保生产者和消费者针对数据处理速度达成的一种动态平衡的基础, 也是流量控制中实现背压机制的关键所在 + +public interface Subscription { + void request(long var1); + + void cancel(); +} + +``` +- 业界主流的响应式开发库包括: **RxJava, Akka, Vert.x 以及 Project Reactor** + +# 3. 响应式编程应用场景 +- 数据流处理是响应式编程的一大应用场景, 流式系统的主要特点是低延迟和高吞吐 +- 网关的使用 + - 网关的作用就是用来响应前端系统的流量并将其转发到后端服务 + - Netflix Hystrix SpringCLoud Gateway 以及 SpringWebFlux + + +# 4. 基于 Spring 框架学习响应式编程 +- 响应式编程并不是只针对系统中的某一个部分组件, 而是需要适用于调用链路上的所有组件 +- 只要有一个环节不是响应式的, 那么这个环节就会出现同步阻塞 +- Spring5 提供了 WebFlux + SpringData Reactive +- WebFlux不仅包含了对创建和访问响应式HTTP端点的支持, 还可以用来实现服务器推送事件以及 WebSocket +- Spring WebFlux 需要支持异步的运行环境 + - 比如 Netty,Undertow 以及 Servlet 3.1 版本以上的 Tomcat 和 Jetty +- 非常适合开发 I/O 密集型服务 +- 不要 WebFlux和 SpringMVC混合使用, 无法保证全栈式的响应式流 +- 案例: CSS: (Customer Service System)客户服务系统 + - ![ReactiveSpringCSS架构](pic/ReactiveSpringCSS架构.png) + - 开发重点 + - Web层: 构建 RESTFUL 端点, 并通过响应式请求的WebClient客户端组件来消费这些端点 + - Service层: 核心逻辑在于完成事件处理和消息通信相关的业务场景 account-service 消息的发布者, customer-service 则是消息消费者 + - Repository层: 引入 MongoDB和Redis两款支持响应式流的 NoSQL 数据库, MongoDB为各个服务存储业务数据, Redis主要用于在 customer-service 中 + - ![ReactiveSpringCSS技术组件图](pic/ReactiveSpringCSS技术组件图.png) + +# 5. 了解 Reactor +- RxJava诞生的更早, 但是 Reactor 更有前途 +- 使用 弹珠图来说明 响应式编程的策略(Marble Diagram) + +# 6. 使用 Flux 和 Mono 构建响应式数据流 +- Flux 创建 + - 基于各种工厂模式的静态创建方法 + - just() range() interval() 以及各种以 from- 为前缀的方法组等 + - 采用编程方式动态创建Flux + - +- 注解方式进行编码 + - 基于Java注解的方式进行编码, 编程模型和 Spring MVC 一致 + - 基于函数式编程模型 + + + + + + + + + + + diff --git a/spring/spring-webflux/pic/ReactiveSpringCSS技术组件图.png b/spring/spring-webflux/pic/ReactiveSpringCSS技术组件图.png new file mode 100644 index 0000000..483a0af Binary files /dev/null and b/spring/spring-webflux/pic/ReactiveSpringCSS技术组件图.png differ diff --git a/spring/spring-webflux/pic/ReactiveSpringCSS架构.png b/spring/spring-webflux/pic/ReactiveSpringCSS架构.png new file mode 100644 index 0000000..555f800 Binary files /dev/null and b/spring/spring-webflux/pic/ReactiveSpringCSS架构.png differ diff --git a/spring/spring-webflux/pic/响应式宣言.png b/spring/spring-webflux/pic/响应式宣言.png new file mode 100644 index 0000000..39a0eda Binary files /dev/null and b/spring/spring-webflux/pic/响应式宣言.png differ diff --git a/spring/spring-webflux/spring-webflux-demo/pom.xml b/spring/spring-webflux/spring-webflux-demo/pom.xml new file mode 100644 index 0000000..34a51f6 --- /dev/null +++ b/spring/spring-webflux/spring-webflux-demo/pom.xml @@ -0,0 +1,78 @@ + + + + dev-protocol + org.example + 1.0-SNAPSHOT + ../../../pom.xml + + 4.0.0 + + spring-webflux-demo + + + 8 + 8 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + org.springframework.boot + spring-boot-starter-webflux + + + + org.springframework.boot + spring-boot-starter-test + test + + + + io.projectreactor + reactor-test + test + + + + + + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/spring/spring-webflux/spring-webflux-demo/src/main/java/com/baiye/HelloWebFluxApplication.java b/spring/spring-webflux/spring-webflux-demo/src/main/java/com/baiye/HelloWebFluxApplication.java new file mode 100644 index 0000000..addbe1f --- /dev/null +++ b/spring/spring-webflux/spring-webflux-demo/src/main/java/com/baiye/HelloWebFluxApplication.java @@ -0,0 +1,16 @@ +package com.baiye; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * 启动入口 + * + * @author q + */ +@SpringBootApplication +public class HelloWebFluxApplication { + public static void main(String[] args) { + SpringApplication.run(HelloWebFluxApplication.class, args); + } +} diff --git a/spring/spring-webflux/spring-webflux-demo/src/main/java/com/baiye/controller/HelloWebFluxController.java b/spring/spring-webflux/spring-webflux-demo/src/main/java/com/baiye/controller/HelloWebFluxController.java new file mode 100644 index 0000000..914f8b8 --- /dev/null +++ b/spring/spring-webflux/spring-webflux-demo/src/main/java/com/baiye/controller/HelloWebFluxController.java @@ -0,0 +1,14 @@ +package com.baiye.controller; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +@RestController +public class HelloWebFluxController { + + @GetMapping("/") + public Mono hello() { + return Mono.just("Hello World for WebFlux !"); + } +}