Merge remote-tracking branch 'origin/master' into master

master
土豆兄弟 2 years ago
commit 90e206512a

@ -338,7 +338,7 @@ protoc --java_out=[生成文件的目录]
## 5. 实战及调优 ## 5. 实战及调优
### 5.1 调优参数: 调整 System 参数 ### 5.1 调优参数: 调整 System 参数及 Netty 核心参数
- Linux 系统参数 - Linux 系统参数
- ![调整System参数](pic/调整System参数.png) - ![调整System参数](pic/调整System参数.png)
- 1: 查看 云服务器 支持不支持这种帮助你进行一键调优的工具或者配置 - 1: 查看 云服务器 支持不支持这种帮助你进行一键调优的工具或者配置
@ -384,16 +384,18 @@ protoc --java_out=[生成文件的目录]
- ![Netty支持的调优参数表7](pic/Netty支持的调优参数表7.png) - ![Netty支持的调优参数表7](pic/Netty支持的调优参数表7.png)
- 服务端调优 com.baiye.case5.server.ServerV1 - 服务端调优 com.baiye.case5.server.ServerV1
- 客户端调优 com.baiye.case5.client.OrderClientV4 - 客户端调优 com.baiye.case5.client.OrderClientV4
---
- SO_REUSEADDR
- 一般不会开启这个参数
- 地址重用参数
- ![地址重用参数](pic/地址重用参数.png)
- SO_LINGER
- 一般不会开启这个参数
- ![SO_LINGER参数](pic/SO_LINGER参数.png)
- ALLOW_HALF_CLOSURE
- 半关参数
- 一般不会开启这个参数
- ![ALLOW_HALF_CLOSURE参数](pic/ALLOW_HALF_CLOSURE参数.png)
```shell ```shell
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
@ -401,12 +403,156 @@ serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
- SocketChannel -> .childOption - SocketChannel -> .childOption
- ServerSocketChannel -> .option - ServerSocketChannel -> .option
### 5.2 跟踪诊断 应用易诊断 | 可视化 | 内存不泄露
- **"易" 诊断**
- 完善 "线程名"
- 完善 "Handler" 名称
- 使用好 Netty 日志
- ![完善 "线程名"](pic/完善线程名.png)
- 一般实现 2-1 表示 boss group
- 一般实现 3-1 表示 worker group
- 以后可能改变
- ![完善Handler名称](pic/完善Handler名称.png)
- "$1" 表示一个匿名内部类
- "#0" 防止一个pipeline中加入多个 handler
- Netty 日志原理及使用
- Netty 日志框架原理
- Netty 会自动去依赖主流的日志框架的实现, 把 slf4j log4j 的 <option>ture 进行设置 所以可以直接使用
- 修改 JDK logger 级别
- 默认 JDK 的日志没有 "DEBUG" 的日志级别, 默认是使用 "FINE"
- 使用 slf4j + log4j 示例
- 加入相关的 日志依赖即可
- 衡量好 longing handler 的位置和级别
- 参考代码 com.baiye.case5.server.ServerV1
- **可视化**
- 演示如何做 Netty 可视化
- 实现一个: 统计并展示当前系统连接数
- Console 日志定时输出
- JMX实时展示
- ~~ELKK TIG etc~~ (耗费时间, 但是是可视化图表)
- 参考 : com.baiye.case5.server.handler.MetricsHandler
- Netty 值得可视化的数据
- ![Netty值得可视化的数据](pic/Netty值得可视化的数据.png)
- ![Netty值得可视化的数据](pic/Netty值得可视化的数据1.png)
- ![Netty值得可视化的数据](pic/Netty值得可视化的数2.png)
- **内存不泄露**
- Netty 内存泄露指的是什么?
- 原因: "忘记" release
- ByteBuf buffer = ctx.alloc().buffer();
- ~~buffer.release() / ReferenceCountUtil.release(buffer);~~
- 后果: 资源未释放 -> OOM
- 堆外: 未 free (PlatformDependent.freeDirectBuffer(buffer));
- 池化: 未归还 (recyclerHandle.recycle(this))
- Netty 内存泄露检测核心思路
- 引用计数 (buffer.refCnt()) + 弱引用 (Weak reference)
- 引用计数
- 判断历史人物到底功大于过, 还是过大于功?
- 功 + 1, 过 -1 , =0时: 资源就该释放了
- 那什么时候判断? "盖棺定论" 时 -> 对象被 GC 后
- ![强引用和弱引用](pic/强引用和弱引用.png)
- ![Netty内存检测的核心](pic/Netty内存检测的核心.png)
- Netty 内存泄露检测的源码解析
- io.netty.util.ResourceLeakDetector
- ![Netty内存检测的核心](pic/Netty内存检测的核心1.png)
- 示例: 用 Netty 内存泄露检测工具做检测
- ![Netty内存检测的核心](pic/Netty内存检测的核心2.png)
### 5.3 优化使用
#### A. 用好 Netty 自带注解
- @Sharable
- 标识 handler 提醒可共享, 不标记共享的不能重复加入 pipeline, 会造成程序的中断
- @Skip
- 跳过 handler 的执行
- 4.x 不让使用, 因为直接把代码可以进行删除, 没必要通过比偶记得方式进行
- 但是他自己还是私人调用
- @UnstableApi
- 提醒不稳定, 慎用, 一般对未进行正式测试的类进行标记,不要进行使用
- @SuppressJava6Requirement
- ![去除Java6的报警注解](pic/去除Java6的报警注解.png)
- ![去除Java6的报警注解1](pic/去除Java6的报警注解1.png)
- ![去除Java6的报警注解2](pic/去除Java6的报警注解2.png)
- @SuppressForbidden
- ![去除报警注解](pic/去除报警注解.png)
- ![去除报警注解1](pic/去除报警注解1.png)
- ![去除报警注解1](pic/去除报警注解2.png)
#### B. 整改线程模型加快响应
- 业务常见的场景
- CPU密集型: 运算型
- IO密集型: 等待型
- CPU密集型
- ![CPU密集型](pic/CPU密集型.png)
- IO密集型
- ![IO密集型](pic/IO密集型.png)
#### C. 增强写, 延迟和吞吐量的抉择
- 写的 "问题"
- ![写的问题](pic/写的问题.png)
- 增加一点点延迟, 减少 flush 次数
- 改进方式1 : channelReadComplete
- ![channelReadComplete改进写](pic/channelReadComplete改进写.png)
- ![channelReadComplete改进写问题](pic/channelReadComplete改进写问题.png)
- 改进方式2 : flushConsulidationHandler
- ![flushConsulidationHandler源码分析](pic/flushConsulidationHandler源码分析.png)
#### D. 流量整形, 让其运行平稳
- 流量整形的用途
- 网盘限速(有意)
- 景点限流(无奈)
- Netty 内置的三种流量整形
- Channel 级别 io.netty.handler.traffic.ChannelTrafficShapingHandler
- Global 级别 io.netty.handler.traffic.GlobalTrafficShapingHandler
- Netty 流量整形的源码分析和总结
- ![流量整形的源码总结](pic/流量整形的源码总结.png)
- 示例: 流量整形的使用
- ![流量整形的使用](pic/流量整形的使用.png)
- com.baiye.case5.server.ServerV4
#### E. 不同平台开启 native
- 如何开启 native
- ![如何开启native](pic/如何开启native.png)
- com.baiye.case5.server.ServerV5
- 源码分析 Native 库的加载逻辑
- ![native库的加载逻辑](pic/native库的加载逻辑.png)
- 常见问题
- ![native库的加载逻辑](pic/native库的加载逻辑1.png)
- ![native库的加载逻辑](pic/native库的加载逻辑2.png)
### 5.4 安全增强
#### A. 设置 "水位线" 保护自己安全
- Netty OOM 的根本原因
- ![OOM的根本原因](pic/OOM的根本原因.png)
- Netty OOM - ChannelOutboundBuffer
- ![ChannelOutboundBuffer-OOM](pic/ChannelOutboundBuffer-OOM.png)
- Netty OOM - TrafficShapingHandler
- ![TrafficShapingHandler-OOM](pic/TrafficShapingHandler-OOM.png)
- ![unwritable-OOM](pic/unwritable-OOM.png)
- Netty OOM 的对策
- ![OOM-对策](pic/OOM-对策.png)
- com.baiye.case5.server.handler.OrderServerProcessHandler
#### B. 启动空闲检测
- ![启动空闲检测](pic/启动空闲检测.png)
- Server : com.baiye.case5.server.handler.ServerIdleCheckHandler
- Cliet : com.baiye.case5.client.handler.ClientIdleCheckHandler [编码前]| com.baiye.case5.client.handler.KeepaliveHandler [编码后]
- PS: 记得把整个 Handler 放到 Client 中
#### C. 黑白名单
- Netty 中的 "cidrPrefix" 是什么?
- ![cidrPrefix](pic/cidrPrefix.png)
- Netty 地址过滤功能源码分析
- 同一个 Ip 只能有一个连接
- Ip 地址过滤: 黑名单, 白名单
- 示例: 使用黑名单增强安全
- com.baiye.case5.server.ServerV7
#### D. 自定义授权
- com.baiye.case5.server.handler.AuthHandler [编码后]
- 对应Client端必须把整个消息进行第一个发送出去
#### E. SSL 设置

@ -42,6 +42,18 @@
<artifactId>hutool-all</artifactId> <artifactId>hutool-all</artifactId>
<version>5.7.22</version> <version>5.7.22</version>
</dependency> </dependency>
<!-- 连接检测集成 -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.1.1</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

@ -59,6 +59,13 @@ public class OrderClientV0 {
OrderOperation orderOperation = new OrderOperation(1, "fish"); OrderOperation orderOperation = new OrderOperation(1, "fish");
// 生成Id // 生成Id
long id = IdUtil.nextId(); long id = IdUtil.nextId();
// fixme 模拟多发送点消息产生 GC
// for (int i = 0; i < 100000; i++) {
// channelFuture.channel().writeAndFlush(new RequestMessage(id, orderOperation));
// }
// 发送消息 // 发送消息
channelFuture.channel().writeAndFlush(new RequestMessage(id, orderOperation)); channelFuture.channel().writeAndFlush(new RequestMessage(id, orderOperation));

@ -0,0 +1,11 @@
package com.baiye.case5.client.handler;
import io.netty.handler.timeout.IdleStateHandler;
public class ClientIdleCheckHandler extends IdleStateHandler {
public ClientIdleCheckHandler() {
super(0, 5, 0);
}
}

@ -0,0 +1,26 @@
package com.baiye.case5.client.handler;
import com.baiye.case5.common.RequestMessage;
import com.baiye.case5.common.keepalive.KeepaliveOperation;
import com.baiye.case5.util.IdUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
log.info("write idle happen. so need to send keepalive to keep connection not closed by server");
KeepaliveOperation keepaliveOperation = new KeepaliveOperation();
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), keepaliveOperation);
ctx.writeAndFlush(requestMessage);
}
super.userEventTriggered(ctx, evt);
}
}

@ -2,8 +2,11 @@ package com.baiye.case5.common.order;
import com.baiye.case5.common.Operation; import com.baiye.case5.common.Operation;
import com.google.common.util.concurrent.Uninterruptibles;
import lombok.Data; import lombok.Data;
import java.util.concurrent.TimeUnit;
@Data @Data
public class OrderOperation extends Operation { public class OrderOperation extends Operation {
@ -18,7 +21,8 @@ public class OrderOperation extends Operation {
@Override @Override
public OrderOperationResult execute() { public OrderOperationResult execute() {
System.out.println("order's executing startup with orderRequest: " + toString()); System.out.println("order's executing startup with orderRequest: " + toString());
//execute order logic //execute order logic - 模拟业务执行花费3秒钟 - 测试异步线程使用
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
System.out.println("order's executing complete"); System.out.println("order's executing complete");
OrderOperationResult orderResponse = new OrderOperationResult(tableId, dish, true); OrderOperationResult orderResponse = new OrderOperationResult(tableId, dish, true);
return orderResponse; return orderResponse;

@ -4,6 +4,7 @@ import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder; import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder; import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder; import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.MetricsHandler;
import com.baiye.case5.server.handler.OrderServerProcessHandler; import com.baiye.case5.server.handler.OrderServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -15,6 +16,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -32,10 +34,14 @@ public class ServerV1 {
serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.channel(NioServerSocketChannel.class);
// 设置 日志级别 // 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式 // 设置 Reactor 方式 - 规范需要指定线程名称
NioEventLoopGroup group = new NioEventLoopGroup(); NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
try{ NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
serverBootstrap.group(group);
// 共享的 Handler
MetricsHandler metricsHandler = new MetricsHandler();
try {
serverBootstrap.group(boss, worker);
// 参数调优1 - 必调 // 参数调优1 - 必调
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true); serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
// 参数调优2 - 必调 // 参数调优2 - 必调
@ -45,16 +51,20 @@ public class ServerV1 {
@Override @Override
protected void initChannel(NioSocketChannel ch) throws Exception { protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
// 调试的时候使用的
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
// Handler - 规范一定要指定线程名称
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
pipeline.addLast(new OrderFrameDecoder()); pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
pipeline.addLast(new OrderFrameEncoder()); pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new OrderProtocolDecoder());
// 设置 日志级别 // 设置 日志级别
pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new OrderServerProcessHandler()); pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
// 连接数监测 Handler
pipeline.addLast("MetricsHandler", metricsHandler);
} }
}); });
// 绑定端口启动 // 绑定端口启动
@ -62,7 +72,8 @@ public class ServerV1 {
channelFuture.channel().closeFuture().sync(); channelFuture.channel().closeFuture().sync();
} finally { } finally {
group.shutdownGracefully(); boss.shutdownGracefully();
worker.shutdownGracefully();
} }

@ -0,0 +1,92 @@
package com.baiye.case5.server;
import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.MetricsHandler;
import com.baiye.case5.server.handler.OrderServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import java.util.concurrent.ExecutionException;
/**
* - - v2 handler
*
* @author q
*/
public class ServerV2 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置 I/O 模式
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式 - 规范需要指定线程名称
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
// 整个程序共享的业务线程池 - 切记不能使用 NioEventLoopGroup, 这样只能使用其中的一个线程
UnorderedThreadPoolEventExecutor orderServerProcessThreadPool = new UnorderedThreadPoolEventExecutor(
10,
new DefaultThreadFactory("OrderServerProcessThreadPool")
);
// 共享的 Handler
MetricsHandler metricsHandler = new MetricsHandler();
try {
serverBootstrap.group(boss, worker);
// 参数调优1 - 必调
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
// 参数调优2 - 必调
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 调试的时候使用的
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
// Handler - 规范一定要指定线程名称
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
// 设置 日志级别
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
// 业务处理 Handler
// pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
// FIXME: 使用独立的线程池进行异步执行逻辑
pipeline.addLast(orderServerProcessThreadPool, new OrderServerProcessHandler());
// 连接数监测 Handler
pipeline.addLast("MetricsHandler", metricsHandler);
}
});
// 绑定端口启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

@ -0,0 +1,88 @@
package com.baiye.case5.server;
import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.MetricsHandler;
import com.baiye.case5.server.handler.OrderServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ExecutionException;
/**
* - - flush
*
* @author q
*/
public class ServerV3 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置 I/O 模式
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式 - 规范需要指定线程名称
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
// 共享的 Handler
MetricsHandler metricsHandler = new MetricsHandler();
try {
serverBootstrap.group(boss, worker);
// 参数调优1 - 必调
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
// 参数调优2 - 必调
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 调试的时候使用的
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
// Handler - 规范一定要指定线程名称
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
// 设置 日志级别
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
// fixme flush 优化版本 - 非共享 handler
pipeline.addLast("FlushOrderServerProcessHandler", new FlushConsolidationHandler(
// 5次 write 才 flush一次
5,
// 异步增强打开
true
));
// 连接数监测 Handler
pipeline.addLast("MetricsHandler", metricsHandler);
}
});
// 绑定端口启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

@ -0,0 +1,91 @@
package com.baiye.case5.server;
import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.MetricsHandler;
import com.baiye.case5.server.handler.OrderServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ExecutionException;
/**
* - -
*
* @author q
*/
public class ServerV4 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置 I/O 模式
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式 - 规范需要指定线程名称
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
// 共享的 Handler
MetricsHandler metricsHandler = new MetricsHandler();
// FIXME: 流量整形 Handler 简易的线程池 + 100MB + 100MB
GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(
new NioEventLoopGroup(),
100 * 1024 * 1024,
100 * 1024 * 1024
);
try {
serverBootstrap.group(boss, worker);
// 参数调优1 - 必调
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
// 参数调优2 - 必调
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 调试的时候使用的
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
// fixme 全局流控
pipeline.addLast("GlobalTrafficShapingHandler", globalTrafficShapingHandler);
// Handler - 规范一定要指定线程名称
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
// 设置 日志级别
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
// 连接数监测 Handler
pipeline.addLast("MetricsHandler", metricsHandler);
}
});
// 绑定端口启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

@ -0,0 +1,86 @@
package com.baiye.case5.server;
import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.MetricsHandler;
import com.baiye.case5.server.handler.OrderServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ExecutionException;
/**
* - - native
*
* "Nio" "Epoll"
*
* Linux
*
* @author q
*/
public class ServerV5 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置 I/O 模式
serverBootstrap.channel(EpollServerSocketChannel.class);
// 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式 - 规范需要指定线程名称
EpollEventLoopGroup boss = new EpollEventLoopGroup(0, new DefaultThreadFactory("boss"));
EpollEventLoopGroup worker = new EpollEventLoopGroup(0, new DefaultThreadFactory("worker"));
// 共享的 Handler
MetricsHandler metricsHandler = new MetricsHandler();
try {
serverBootstrap.group(boss, worker);
// 参数调优1 - 必调
serverBootstrap.childOption(EpollChannelOption.TCP_NODELAY, true);
// 参数调优2 - 必调
serverBootstrap.option(EpollChannelOption.SO_BACKLOG, 1024);
serverBootstrap.childHandler(new ChannelInitializer<EpollSocketChannel>() {
@Override
protected void initChannel(EpollSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 调试的时候使用的
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
// Handler - 规范一定要指定线程名称
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
// 设置 日志级别
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
// 连接数监测 Handler
pipeline.addLast("MetricsHandler", metricsHandler);
}
});
// 绑定端口启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

@ -0,0 +1,85 @@
package com.baiye.case5.server;
import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.MetricsHandler;
import com.baiye.case5.server.handler.OrderServerProcessHandler;
import com.baiye.case5.server.handler.ServerIdleCheckHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ExecutionException;
/**
* - -
*
* @author q
*/
public class ServerV6 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置 I/O 模式
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式 - 规范需要指定线程名称
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
// 共享的 Handler
MetricsHandler metricsHandler = new MetricsHandler();
try {
serverBootstrap.group(boss, worker);
// 参数调优1 - 必调
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
// 参数调优2 - 必调
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 调试的时候使用的
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
// fixme 加入空闲检测
pipeline.addLast("ServerIdleCheckHandler", new ServerIdleCheckHandler());
// Handler - 规范一定要指定线程名称
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
// 设置 日志级别
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
// 连接数监测 Handler
pipeline.addLast("MetricsHandler", metricsHandler);
}
});
// 绑定端口启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

@ -0,0 +1,90 @@
package com.baiye.case5.server;
import com.baiye.case5.server.codec.OrderFrameDecoder;
import com.baiye.case5.server.codec.OrderFrameEncoder;
import com.baiye.case5.server.codec.OrderProtocolDecoder;
import com.baiye.case5.server.codec.OrderProtocolEncoder;
import com.baiye.case5.server.handler.MetricsHandler;
import com.baiye.case5.server.handler.OrderServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ipfilter.IpFilterRuleType;
import io.netty.handler.ipfilter.IpSubnetFilterRule;
import io.netty.handler.ipfilter.RuleBasedIpFilter;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ExecutionException;
/**
* - -
*
* @author q
*/
public class ServerV7 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置 I/O 模式
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置 日志级别
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 设置 Reactor 方式 - 规范需要指定线程名称
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
// 共享的 Handler
MetricsHandler metricsHandler = new MetricsHandler();
// fixme 加入黑白名单 REJECT 标识为黑名单 | 切换 IpFilterRuleType 可以变成白名单
IpSubnetFilterRule ipSubnetFilterRule = new IpSubnetFilterRule("127.0.0.1", 8, IpFilterRuleType.ACCEPT);
RuleBasedIpFilter ruleBasedIpFilter = new RuleBasedIpFilter(ipSubnetFilterRule);
try {
serverBootstrap.group(boss, worker);
// 参数调优1 - 必调
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
// 参数调优2 - 必调
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 调试的时候使用的
pipeline.addLast("LoggingHandler-DEBUG", new LoggingHandler(LogLevel.DEBUG));
pipeline.addLast("IpFileter", ruleBasedIpFilter);
// Handler - 规范一定要指定线程名称
pipeline.addLast("OrderFrameDecoder", new OrderFrameDecoder());
pipeline.addLast("OrderFrameEncoder", new OrderFrameEncoder());
pipeline.addLast("OrderProtocolEncoder", new OrderProtocolEncoder());
pipeline.addLast("OrderProtocolDecoder", new OrderProtocolDecoder());
// 设置 日志级别
pipeline.addLast("LoggingHandler-INFO", new LoggingHandler(LogLevel.INFO));
pipeline.addLast("OrderServerProcessHandler", new OrderServerProcessHandler());
// 连接数监测 Handler
pipeline.addLast("MetricsHandler", metricsHandler);
}
});
// 绑定端口启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

@ -0,0 +1,44 @@
package com.baiye.case5.server.handler;
import com.baiye.case5.common.Operation;
import com.baiye.case5.common.RequestMessage;
import com.baiye.case5.common.auth.AuthOperation;
import com.baiye.case5.common.auth.AuthOperationResult;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ChannelHandler.Sharable
public class AuthHandler extends SimpleChannelInboundHandler<RequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage msg) throws Exception {
try {
Operation operation = msg.getMessageBody();
if (operation instanceof AuthOperation) {
AuthOperation authOperation = (AuthOperation) operation;
AuthOperationResult authOperationResult = authOperation.execute();
if (authOperationResult.isPassAuth()) {
log.info("pass auth");
} else {
log.error("fail to auth");
ctx.close();
}
} else {
// 第一个请求必须是做授权的
log.error("expect first msg is auth");
ctx.close();
}
} catch (Exception e) {
// 异常发生, 直接关闭好了
log.error("exception happen for: " + e.getMessage(), e);
ctx.close();
} finally {
// 授权完成后从 pipeline 进行移除
ctx.pipeline().remove(this);
}
}
}

@ -0,0 +1,62 @@
package com.baiye.case5.server.handler;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* ChannelDuplexHandler Handler
*
* @ChannelHandler.Sharable handler
*/
@ChannelHandler.Sharable
public class MetricsHandler extends ChannelDuplexHandler {
/**
*
*/
private AtomicLong totalConnectionNumber = new AtomicLong();
{
// jar - 入口
MetricRegistry metricRegistry = new MetricRegistry();
// 把 totalConnectionNumber register 进去
metricRegistry.register("totalConnectionNumber", (Gauge<Long>) () -> totalConnectionNumber.longValue());
// Console 的方式进行展示数据
ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
// 以 10秒 的时间周期进行打印
consoleReporter.start(10, TimeUnit.SECONDS);
// JMX 的方式进行展示
JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
jmxReporter.start();
}
/**
* + 1
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.incrementAndGet();
super.channelActive(ctx);
}
/**
* - 1
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.decrementAndGet();
super.channelInactive(ctx);
}
}

@ -4,15 +4,23 @@ import com.baiye.case5.common.Operation;
import com.baiye.case5.common.OperationResult; import com.baiye.case5.common.OperationResult;
import com.baiye.case5.common.RequestMessage; import com.baiye.case5.common.RequestMessage;
import com.baiye.case5.common.ResponseMessage; import com.baiye.case5.common.ResponseMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
/** /**
* 3. Handler * 3. Handler
*/ */
@Slf4j
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> { public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
@Override @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestMessage requestMessage) throws Exception { protected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestMessage requestMessage) throws Exception {
// 伪造内存泄露问题
// ByteBuf buffer = channelHandlerContext.alloc().buffer();
// 启动修改参数 netty.NettyProperties.LeakDetection.PARANOID
Operation operation = requestMessage.getMessageBody(); Operation operation = requestMessage.getMessageBody();
OperationResult operationResult = operation.execute(); OperationResult operationResult = operation.execute();
@ -22,7 +30,14 @@ public class OrderServerProcessHandler extends SimpleChannelInboundHandler<Reque
responseMessage.setMessageHeader(requestMessage.getMessageHeader()); responseMessage.setMessageHeader(requestMessage.getMessageHeader());
responseMessage.setMessageBody(operationResult); responseMessage.setMessageBody(operationResult);
// fixme 加一个写保护判断
if (channelHandlerContext.channel().isActive() && channelHandlerContext.channel().isWritable()) {
channelHandlerContext.writeAndFlush(responseMessage);
}else {
// 避免了 OOM 但是这里的数据一定要做补偿
log.error("message dropped");
}
// 信息写出去 // 信息写出去
channelHandlerContext.writeAndFlush(responseMessage); // channelHandlerContext.writeAndFlush(responseMessage);
} }
} }

@ -0,0 +1,38 @@
package com.baiye.case5.server.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ServerIdleCheckHandler extends IdleStateHandler {
/**
* 10s
*/
public ServerIdleCheckHandler() {
super(10, 0, 0, TimeUnit.SECONDS);
}
/**
*
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
// 断了连接
if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
log.info("idle check happen, so close the connection");
ctx.close();
return;
}
super.channelIdle(ctx, evt);
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 253 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 104 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 481 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 187 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 576 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 186 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 306 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 340 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 211 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 151 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 194 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 233 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 566 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 331 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 223 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 320 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 426 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 411 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 523 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 260 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 508 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 750 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 588 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 244 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 692 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 754 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 284 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 196 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 242 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 134 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 444 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 446 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 397 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 379 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 412 KiB

Loading…
Cancel
Save