[新增功能](master): 完善客户端代码和文档问题

1.完善客户端代码
2.对Netty高性能问题进行部分分析,并设计整合Disrupter
master
土豆兄弟 2 years ago
parent 4537f00517
commit 333647d1cf

@ -7,3 +7,28 @@
* Netty的高性能
* Disruptor核心池化封装
* 承接百万级连接的接入
## 2. 启动及结构说明
dev-protocol-disruptor-netty-client 客户端(后启动)
dev-protocol-disruptor-netty-common 公用的jar(对Server和Client端同时提供依赖)
dev-protocol-disruptor-netty-server 服务端(先启动)
## 3. Netty高性能问题分析
WorkGroup <---- NettyClient -------------------------> NettyServer (BossGroup + WorkGroup)
| |
| |
|/ |/
ClientHandler ServerHandler[如果业务很复杂的话,耗时很长的话,会影响性能]
---
* 在使用Netty进行接收处理数据的时候,我们尽量都不要在工作线程上全编写自己的代码逻辑
* 我们需要利用异步的机制,比如使用线程池异步处理,如果使用线程池就意味着使用阻塞队列,这里可以替换为Disruptor提高性能
---
* 解决办法
* ServerHandler的消息处理交给一个异步的线程池去处理即可(数据库持久化操作,IO读写)
* 采用Disruptor作为异步线程池的替代来进行处理

@ -12,9 +12,47 @@
<artifactId>dev-protocol-disruptor-netty-client</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.example</groupId>
<artifactId>dev-protocol-disruptor-netty-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,19 @@
package com.baiye;
import com.baiye.client.NettyClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Administrator
*/
@SpringBootApplication
public class NettyClientApplication {
public static void main(String[] args) {
SpringApplication.run(NettyClientApplication.class, args);
// FIXME: 2022/7/12 0012 这里直接进行调用进行模拟,整合的话,要把消息接口对外暴露,然后进行发送即可
// 建立链接,并发送消息
new NettyClient().sendMessage();
}
}

@ -0,0 +1,29 @@
package com.baiye.client;
import com.baiye.dto.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
*
*
* @author q
* @date 2022/07/12
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
TranslatorData response = (TranslatorData) msg;
System.out.println("Client端 :" + response);
}finally {
// 消息在buffer中,因此要进行释放 -> 方便进行GC
// 最后一步如果有写就不需要考虑手动释放, 因为写的底层是用递归写完之后进行释放的
ReferenceCountUtil.release(msg);
}
}
}

@ -0,0 +1,97 @@
package com.baiye.client;
import com.baiye.codec.MarshallingCodeCFactory;
import com.baiye.dto.TranslatorData;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.channel.socket.SocketChannel;
public class NettyClient {
/**
* Server Ip
*/
public static final String HOST = "127.0.0.1";
/**
* Server
*/
public static final int PORT = 8765;
private Channel channel;
private ChannelFuture cf;
// 1. 创建工作线程组,用于实际处理业务的线程组
EventLoopGroup workGroup = new NioEventLoopGroup();
public NettyClient() {
this.connect(HOST, PORT);
}
private void connect(String host, int port) {
// 创建辅助类 - 帮助构建Netty模型 - Server 端和 Client 端不一样
Bootstrap bootstrap;
try {
bootstrap = new Bootstrap()
.group(workGroup)
.channel(NioSocketChannel.class)
// fixme 表示缓存区动态调配(自适应) 数据包相差不大的时候比较合适
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
// 缓冲区 池化操作
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 记录日志
.handler(new LoggingHandler(LogLevel.INFO))
// 数据接收过来给哪个方法进行回调 -> 接收数据进行异步处理
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// fixme 在管道上加一些拦截器进行处理
// 进行设置编解码 -> 对Java对象转为的二进制数据进行编解码
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
// 信息处理
socketChannel.pipeline().addLast(new ClientHandler());
}
});
// 绑定端口, 同步请求链接
cf = bootstrap.connect(host, port).sync();
System.out.println(" Client connected ... ");
// 进行数据的发送 首先需要我们先获取通道chanel
// FIXME: 2022/7/12 0012 这里的channel 要做一个chanel池进行处理提高性能 ConcurrentHashMap<String, Channel>
this.channel = cf.channel();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sendMessage() {
// fixme 这里的消息内容可以从前端进行获取
for (int i = 0; i < 10; i++) {
TranslatorData request = new TranslatorData();
request.setId("" + i);
request.setName("请求消息名称" + i) ;
request.setMessage("请求消息内容" + i);
this.channel.writeAndFlush(request);
}
}
public void close() throws InterruptedException {
// 异步的进行关闭
cf.channel().closeFuture().sync();
// 优雅进行关闭
workGroup.shutdownGracefully();
}
}

@ -1,10 +1,22 @@
package com.baiye;
import com.baiye.server.NettyServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
*
* @author q
* @date 2022/07/11
*/
@SpringBootApplication
public class NettyServerApplication {
public static void main(String[] args) {
SpringApplication.run(NettyServerApplication.class, args);
// FIXME: 2022/7/12 0012 后续改成SpringBoot的方式进行初始化Server端
new NettyServer();
}
}

@ -3,10 +3,7 @@ package com.baiye.server;
import com.baiye.codec.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -21,6 +18,11 @@ import io.netty.handler.logging.LoggingHandler;
*/
public class NettyServer {
/**
* Server
*/
public static final int PORT = 8765;
/**
* Netty 线
@ -46,33 +48,47 @@ public class NettyServer {
EventLoopGroup workGroup = new NioEventLoopGroup();
// 创建辅助类 - 帮助构建Netty模型
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
// fixme 设置 backlog大小
.option(ChannelOption.SO_BACKLOG, 1024)
// fixme 表示缓存区动态调配(自适应) 数据包相差不大的时候比较合适
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
// 缓冲区 池化操作
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 记录日志
.handler(new LoggingHandler(LogLevel.INFO))
// 数据接收过来给哪个方法进行回调 -> 接收数据进行异步处理
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// fixme 在管道上加一些拦截器进行处理
// 进行设置编解码 -> 对Java对象转为的二进制数据进行编解码
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
// 信息处理
socketChannel.pipeline().addLast(new ServerHandler());
}
});
// 创建辅助类 - 帮助构建Netty模型
ServerBootstrap serverBootstrap;
try {
serverBootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
// fixme 设置 backlog大小
.option(ChannelOption.SO_BACKLOG, 1024)
// fixme 表示缓存区动态调配(自适应) 数据包相差不大的时候比较合适
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
// 缓冲区 池化操作
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 记录日志
.handler(new LoggingHandler(LogLevel.INFO))
// 数据接收过来给哪个方法进行回调 -> 接收数据进行异步处理
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// fixme 在管道上加一些拦截器进行处理
// 进行设置编解码 -> 对Java对象转为的二进制数据进行编解码
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
// 信息处理
socketChannel.pipeline().addLast(new ServerHandler());
}
});
// 绑定端口, 同步请求链接
ChannelFuture cf = serverBootstrap.bind(PORT).sync();
System.out.println(" Server Startup ... ");
// 异步的进行关闭
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放线程组资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
System.out.println(" Server ShutDown ... ");
}
}
}

Loading…
Cancel
Save