diff --git a/longpolling/demo/demo1/README.md b/longpolling/demo/demo1/README.md index 6d278e5..d9a4495 100644 --- a/longpolling/demo/demo1/README.md +++ b/longpolling/demo/demo1/README.md @@ -7,3 +7,28 @@ * Netty的高性能 * Disruptor核心池化封装 * 承接百万级连接的接入 + +## 2. 启动及结构说明 + +dev-protocol-disruptor-netty-client 客户端(后启动) +dev-protocol-disruptor-netty-common 公用的jar(对Server和Client端同时提供依赖) +dev-protocol-disruptor-netty-server 服务端(先启动) + +## 3. Netty高性能问题分析 + +WorkGroup <---- NettyClient -------------------------> NettyServer (BossGroup + WorkGroup) +| | +| | +|/ |/ +ClientHandler ServerHandler[如果业务很复杂的话,耗时很长的话,会影响性能] + +--- + +* 在使用Netty进行接收处理数据的时候,我们尽量都不要在工作线程上全编写自己的代码逻辑 +* 我们需要利用异步的机制,比如使用线程池异步处理,如果使用线程池就意味着使用阻塞队列,这里可以替换为Disruptor提高性能 + +--- + +* 解决办法 + * ServerHandler的消息处理交给一个异步的线程池去处理即可(数据库持久化操作,IO读写) + * 采用Disruptor作为异步线程池的替代来进行处理 diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/pom.xml b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/pom.xml index 26bb31e..0659b25 100644 --- a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/pom.xml +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/pom.xml @@ -12,9 +12,47 @@ dev-protocol-disruptor-netty-client + - 8 - 8 + UTF-8 + UTF-8 + 1.8 + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.example + dev-protocol-disruptor-netty-common + 1.0-SNAPSHOT + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + \ No newline at end of file diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/NettyClientApplication.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/NettyClientApplication.java new file mode 100644 index 0000000..16272b7 --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/NettyClientApplication.java @@ -0,0 +1,19 @@ +package com.baiye; + +import com.baiye.client.NettyClient; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author Administrator + */ +@SpringBootApplication +public class NettyClientApplication { + public static void main(String[] args) { + SpringApplication.run(NettyClientApplication.class, args); + + // FIXME: 2022/7/12 0012 这里直接进行调用进行模拟,整合的话,要把消息接口对外暴露,然后进行发送即可 + // 建立链接,并发送消息 + new NettyClient().sendMessage(); + } +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/ClientHandler.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/ClientHandler.java new file mode 100644 index 0000000..a730566 --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/ClientHandler.java @@ -0,0 +1,29 @@ +package com.baiye.client; + +import com.baiye.dto.TranslatorData; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; + +/** + * 客户端 消息处理 + * + * @author q + * @date 2022/07/12 + */ +public class ClientHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try{ + TranslatorData response = (TranslatorData) msg; + System.out.println("Client端 :" + response); + }finally { + // 消息在buffer中,因此要进行释放 -> 方便进行GC + // 最后一步如果有写就不需要考虑手动释放, 因为写的底层是用递归写完之后进行释放的 + ReferenceCountUtil.release(msg); + } + + } + +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/NettyClient.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/NettyClient.java new file mode 100644 index 0000000..f4d9011 --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/NettyClient.java @@ -0,0 +1,97 @@ +package com.baiye.client; + +import com.baiye.codec.MarshallingCodeCFactory; +import com.baiye.dto.TranslatorData; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.channel.socket.SocketChannel; + + +public class NettyClient { + + /** + * Server 端Ip地址 + */ + public static final String HOST = "127.0.0.1"; + + /** + * Server 端端口号 + */ + public static final int PORT = 8765; + + private Channel channel; + + private ChannelFuture cf; + + // 1. 创建工作线程组,用于实际处理业务的线程组 + EventLoopGroup workGroup = new NioEventLoopGroup(); + + + public NettyClient() { + this.connect(HOST, PORT); + } + + private void connect(String host, int port) { + + + // 创建辅助类 - 帮助构建Netty模型 - Server 端和 Client 端不一样 + Bootstrap bootstrap; + try { + bootstrap = new Bootstrap() + .group(workGroup) + .channel(NioSocketChannel.class) + // fixme 表示缓存区动态调配(自适应) 数据包相差不大的时候比较合适 + .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) + // 缓冲区 池化操作 + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + // 记录日志 + .handler(new LoggingHandler(LogLevel.INFO)) + // 数据接收过来给哪个方法进行回调 -> 接收数据进行异步处理 + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + // fixme 在管道上加一些拦截器进行处理 + // 进行设置编解码 -> 对Java对象转为的二进制数据进行编解码 + socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); + socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); + // 信息处理 + socketChannel.pipeline().addLast(new ClientHandler()); + } + }); + // 绑定端口, 同步请求链接 + cf = bootstrap.connect(host, port).sync(); + System.out.println(" Client connected ... "); + // 进行数据的发送 首先需要我们先获取通道chanel + // FIXME: 2022/7/12 0012 这里的channel 要做一个chanel池进行处理,提高性能 ConcurrentHashMap + this.channel = cf.channel(); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void sendMessage() { + // fixme 这里的消息内容可以从前端进行获取 + for (int i = 0; i < 10; i++) { + TranslatorData request = new TranslatorData(); + request.setId("" + i); + request.setName("请求消息名称" + i) ; + request.setMessage("请求消息内容" + i); + + this.channel.writeAndFlush(request); + } + + } + + public void close() throws InterruptedException { + // 异步的进行关闭 + cf.channel().closeFuture().sync(); + // 优雅进行关闭 + workGroup.shutdownGracefully(); + } +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/NettyServerApplication.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/NettyServerApplication.java index f530611..538aa6d 100644 --- a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/NettyServerApplication.java +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/NettyServerApplication.java @@ -1,10 +1,22 @@ package com.baiye; +import com.baiye.server.NettyServer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + /** * 启动入口 * * @author q * @date 2022/07/11 */ +@SpringBootApplication public class NettyServerApplication { + + public static void main(String[] args) { + SpringApplication.run(NettyServerApplication.class, args); + + // FIXME: 2022/7/12 0012 后续改成SpringBoot的方式进行初始化Server端 + new NettyServer(); + } } diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/NettyServer.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/NettyServer.java index 46114a0..53452ab 100644 --- a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/NettyServer.java +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/NettyServer.java @@ -3,10 +3,7 @@ package com.baiye.server; import com.baiye.codec.MarshallingCodeCFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.AdaptiveRecvByteBufAllocator; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -21,6 +18,11 @@ import io.netty.handler.logging.LoggingHandler; */ public class NettyServer { + /** + * Server 端端口号 + */ + public static final int PORT = 8765; + /** * Netty 两个线程租的底层原理 @@ -46,33 +48,47 @@ public class NettyServer { EventLoopGroup workGroup = new NioEventLoopGroup(); - // 创建辅助类 - 帮助构建Netty模型 - ServerBootstrap serverBootstrap = new ServerBootstrap() - .group(bossGroup, workGroup) - .channel(NioServerSocketChannel.class) - // fixme 设置 backlog大小 - .option(ChannelOption.SO_BACKLOG, 1024) - // fixme 表示缓存区动态调配(自适应) 数据包相差不大的时候比较合适 - .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) - // 缓冲区 池化操作 - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - // 记录日志 - .handler(new LoggingHandler(LogLevel.INFO)) - // 数据接收过来给哪个方法进行回调 -> 接收数据进行异步处理 - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - // fixme 在管道上加一些拦截器进行处理 - - // 进行设置编解码 -> 对Java对象转为的二进制数据进行编解码 - socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); - socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); - // 信息处理 - socketChannel.pipeline().addLast(new ServerHandler()); - } - }); + // 创建辅助类 - 帮助构建Netty模型 + ServerBootstrap serverBootstrap; + try { + serverBootstrap = new ServerBootstrap() + .group(bossGroup, workGroup) + .channel(NioServerSocketChannel.class) + // fixme 设置 backlog大小 + .option(ChannelOption.SO_BACKLOG, 1024) + // fixme 表示缓存区动态调配(自适应) 数据包相差不大的时候比较合适 + .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) + // 缓冲区 池化操作 + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + // 记录日志 + .handler(new LoggingHandler(LogLevel.INFO)) + // 数据接收过来给哪个方法进行回调 -> 接收数据进行异步处理 + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + // fixme 在管道上加一些拦截器进行处理 + // 进行设置编解码 -> 对Java对象转为的二进制数据进行编解码 + socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); + socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); + // 信息处理 + socketChannel.pipeline().addLast(new ServerHandler()); + } + }); + // 绑定端口, 同步请求链接 + ChannelFuture cf = serverBootstrap.bind(PORT).sync(); + System.out.println(" Server Startup ... "); + // 异步的进行关闭 + cf.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + // 释放线程组资源 + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + System.out.println(" Server ShutDown ... "); + } } }