diff --git a/README.md b/README.md index 9f44b7f..5bb5645 100644 --- a/README.md +++ b/README.md @@ -3,50 +3,54 @@ ## 0. 项目说明 对后端java代码进行一些标准化的过程说明,帮助整个技术团队提升自己的技术规范及水平 + ## 1. 项目内容说明 -dev-protocol-test - - SpringBoot项目Test的编写规范及使用标准 - - 参考: https://mp.weixin.qq.com/s/W5v8zOCHbc2_NvobMGaU8w +dev-protocol-test +- SpringBoot项目Test的编写规范及使用标准 +- 参考: https://mp.weixin.qq.com/s/W5v8zOCHbc2_NvobMGaU8w dev-protocol-log - - 分布式日志系统的设计及实现,主要涉及kafka Springboot ELK -dev-protocol-gateway - - 智能网关设计 +- 分布式日志系统的设计及实现,主要涉及kafka Springboot ELK +dev-protocol-gateway +- 智能网关设计 dev-protocol-devops - - DevOps 相关的最佳实现 +- DevOps 相关的最佳实现 dev-protocol-shardingtask - - 配置多数据源和分表分库实现 +- 配置多数据源和分表分库实现 + ### 1.1 基本命令(dev-protocol-log) - Kafka - 查看topic列表命令(连接其中一个就好了): - 【旧版】kafka-topics.sh --zookeeper 172.16.26.183:2181 --list - 【新版】kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --list - (– zookeeper is not a recognized option主要原因是 Kafka 版本过高,命令不存在) - 创建topic主题 - kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --create --topic topic1 --partitions 1 --replication-factor 3 - --create 命令后 --topic 为创建topic 并指定 topic name - --partitions 为指定分区数量 - --replication-factor 为指定副本集数量 - 向kafka集群发送数据 - 【无key型消息】kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 - 【有key型消息】 kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --property parse.key=true - (默认消息键与消息值间使用“Tab键”进行分隔,切勿使用转义字符(\t)) - kafka命令接受数据 - kafka-console-consumer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --from-beginning - kafka查看消费进度:(当我们需要查看一个消费者组的消费进度时,则使用下面的命令)(启动Consumer的时候才会真的生效)【这条命令对查询消费特别重要】 - kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --group group1 + 查看topic列表命令(连接其中一个就好了): + 【旧版】kafka-topics.sh --zookeeper 172.16.26.183:2181 --list + 【新版】kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --list + (– zookeeper is not a recognized option主要原因是 Kafka 版本过高,命令不存在) + 创建topic主题 + kafka-topics.sh --bootstrap-server 172.16.26.183:9092 --create --topic topic1 --partitions 1 --replication-factor 3 + --create 命令后 --topic 为创建topic 并指定 topic name + --partitions 为指定分区数量 + --replication-factor 为指定副本集数量 + 向kafka集群发送数据 + 【无key型消息】kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 + 【有key型消息】 kafka-console-producer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --property parse.key=true + (默认消息键与消息值间使用“Tab键”进行分隔,切勿使用转义字符(\t)) + kafka命令接受数据 + kafka-console-consumer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --from-beginning + kafka查看消费进度:(当我们需要查看一个消费者组的消费进度时,则使用下面的命令)(启动Consumer的时候才会真的生效)【这条命令对查询消费特别重要】 + kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --group group1 + ### 1.2 注意事项(dev-protocol-log) 一致性确定 - 命令中的 --bootstrap-server 和配置文件中的要一致 - # 允许外部端口连接 - listeners=PLAINTEXT://0.0.0.0:9092 - # 外部代理地址 - advertised.listeners=PLAINTEXT://121.201.64.12:9092 - https://blog.csdn.net/pbrlovejava/article/details/103451302 +命令中的 --bootstrap-server 和配置文件中的要一致 +# 允许外部端口连接 +listeners=PLAINTEXT://0.0.0.0:9092 +# 外部代理地址 +advertised.listeners=PLAINTEXT://121.201.64.12:9092 +https://blog.csdn.net/pbrlovejava/article/details/103451302 国内下载镜像加速 - https://www.newbe.pro/ +https://www.newbe.pro/ + ### 1.3 海量日志收集架构设计(dev-protocol-log) - 海量日志收集架构示意图 @@ -116,6 +120,7 @@ dev-protocol-jpaauditing3 (正式环境使用) ### 原理分析 1. 从 @EnableJpaAuditing 入手分析 + 1. package org.springframework.data.jpa.repository.config; 2. @Import(JpaAuditingRegistrar.class) @@ -126,9 +131,11 @@ dev-protocol-jpaauditing3 (正式环境使用) --- - 步入 org.springframework.data.jpa.repository.config.JpaAuditingRegistrar.registerBeanDefinitions 方法 - 1. super.registerBeanDefinitions(annotationMetadata, registry); - 2. registerInfrastructureBeanWithId(...); +步入 org.springframework.data.jpa.repository.config.JpaAuditingRegistrar.registerBeanDefinitions 方法 + +1. super.registerBeanDefinitions(annotationMetadata, registry); +2. registerInfrastructureBeanWithId(...); + --- 2. 打开 AuditingEntityListener 的源码 @@ -147,6 +154,7 @@ dev-protocol-jpaauditing3 (正式环境使用) 使用相对应的注解去注册进Spring容器 使用注册的回调函数进行注解化设置 + ## 正确使用 @Entity 里面的回调方法 ### 概念掌握 @@ -171,4 +179,5 @@ Java Persistence API 里面规定的回调方法有哪些? 在实体中定义一些通用逻辑, 然后在对应 Listener中进行调用时机指定 (参数的合理化和健壮性检测) JPA Callbacks 的实现原理,事件机制 + ### JPA Callbacks 的最佳实践 diff --git a/longpolling/demo/demo1/README.md b/longpolling/demo/demo1/README.md index d9a4495..39e7f5b 100644 --- a/longpolling/demo/demo1/README.md +++ b/longpolling/demo/demo1/README.md @@ -20,7 +20,19 @@ WorkGroup <---- NettyClient -------------------------> NettyServer (Boss | | | | |/ |/ -ClientHandler ServerHandler[如果业务很复杂的话,耗时很长的话,会影响性能] +ClientHandler ServerHandler[如果业务很复杂的话,耗时很长的话,会影响性能 +| |/ +|/ DATA1 +DATA2 | +| | +|/ producer 生产者 +producer 生产者 | +| |/ +|/ disruptor组件 +disruptor组件 | | +| | |/ |/ +|/ |/ consumer1 consumer2 +consumer1 consumer2 --- diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/NettyClientApplication.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/NettyClientApplication.java index 16272b7..6dc9208 100644 --- a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/NettyClientApplication.java +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/NettyClientApplication.java @@ -1,6 +1,12 @@ package com.baiye; +import com.baiye.client.MessageConsumerImpl4Client; import com.baiye.client.NettyClient; +import com.baiye.disruptor.MessageConsumer; +import com.baiye.disruptor.RingBufferWorkerPoolFactory; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.YieldingWaitStrategy; +import com.lmax.disruptor.dsl.ProducerType; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -14,6 +20,25 @@ public class NettyClientApplication { // FIXME: 2022/7/12 0012 这里直接进行调用进行模拟,整合的话,要把消息接口对外暴露,然后进行发送即可 // 建立链接,并发送消息 + + // 消费者构建为4个,测试使用 + MessageConsumer[] consumers = new MessageConsumer[4]; + for (int i = 0; i < consumers.length; i++) { + MessageConsumerImpl4Client messageConsumer = new MessageConsumerImpl4Client("code:ClientId:" + i); + consumers[i] = messageConsumer; + } + + RingBufferWorkerPoolFactory.getInstance().initAndStart( + ProducerType.MULTI, + 1024 * 1024, + // FIXME: 2022/7/13 0013 new YieldingWaitStrategy() 的性能是最高的,多线程的自旋操作,可能会把CPU搞炸 new BlockingWaitStrategy() 对CPU是最友好的 + // TODO: 2022/7/13 0013 思考怎么进行CPU性能折中的优化方案 + new BlockingWaitStrategy(), + consumers + ); + + + new NettyClient().sendMessage(); } } diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/ClientHandler.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/ClientHandler.java index a730566..f91a8fb 100644 --- a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/ClientHandler.java +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/ClientHandler.java @@ -1,5 +1,7 @@ package com.baiye.client; +import com.baiye.disruptor.MessageProducer; +import com.baiye.disruptor.RingBufferWorkerPoolFactory; import com.baiye.dto.TranslatorData; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -15,6 +17,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + + // 测试代码 +/* try{ TranslatorData response = (TranslatorData) msg; System.out.println("Client端 :" + response); @@ -23,6 +28,12 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { // 最后一步如果有写就不需要考虑手动释放, 因为写的底层是用递归写完之后进行释放的 ReferenceCountUtil.release(msg); } +*/ + TranslatorData response = (TranslatorData) msg; + // fixme 这个Id 注意不要和producer端的生产者Id重复了,最好进行统一生成 + String producerId = "code:SesionId:002"; + MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId); + messageProducer.onData(response, ctx); } diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/MessageConsumerImpl4Client.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/MessageConsumerImpl4Client.java new file mode 100644 index 0000000..c932408 --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-client/src/main/java/com/baiye/client/MessageConsumerImpl4Client.java @@ -0,0 +1,30 @@ +package com.baiye.client; + +import com.baiye.disruptor.MessageConsumer; +import com.baiye.dto.TranslatorData; +import com.baiye.dto.TranslatorDataWapper; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.ReferenceCountUtil; + +public class MessageConsumerImpl4Client extends MessageConsumer { + + + public MessageConsumerImpl4Client(String consumerId) { + super(consumerId); + } + + @Override + public void onEvent(TranslatorDataWapper wapper) throws Exception { + TranslatorData response = wapper.getTranslatorData(); + ChannelHandlerContext ctx = wapper.getCtx(); + + try{ + // TODO: 2022/7/13 0013 这里替代为实际的业务处理 + System.out.println("Client端 :" + response); + }finally { + // 消息在buffer中,因此要进行释放 -> 方便进行GC + // 最后一步如果有写就不需要考虑手动释放, 因为写的底层是用递归写完之后进行释放的 + ReferenceCountUtil.release(response); + } + } +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/MessageConsumer.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/MessageConsumer.java new file mode 100644 index 0000000..ee8cf51 --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/MessageConsumer.java @@ -0,0 +1,22 @@ +package com.baiye.disruptor; + +import com.baiye.dto.TranslatorDataWapper; +import com.lmax.disruptor.WorkHandler; +import lombok.Data; + +/** + * onEvent 方法不在这里进行实现 具体的消费逻辑给Netty去进行实现 + * @author q + */ +@Data +public abstract class MessageConsumer implements WorkHandler { + + /** + * 消费者Id + */ + protected String consumerId; + + public MessageConsumer(String consumerId) { + this.consumerId = consumerId; + } +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/MessageProducer.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/MessageProducer.java new file mode 100644 index 0000000..c911794 --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/MessageProducer.java @@ -0,0 +1,32 @@ +package com.baiye.disruptor; + +import com.baiye.dto.TranslatorData; +import com.baiye.dto.TranslatorDataWapper; +import com.lmax.disruptor.RingBuffer; +import io.netty.channel.ChannelHandlerContext; + +public class MessageProducer { + + private String producerId; + + private RingBuffer ringBuffer; + + public MessageProducer(String producerId, RingBuffer ringBuffer) { + this.producerId = producerId; + this.ringBuffer = ringBuffer; + } + + public void onData(TranslatorData data, ChannelHandlerContext ctx) { + long sequence = ringBuffer.next(); + + try{ + TranslatorDataWapper wapper = ringBuffer.get(sequence); + wapper.setTranslatorData(data); + wapper.setCtx(ctx); + + }finally { + ringBuffer.publish(sequence); + } + + } +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/RingBufferWorkerPoolFactory.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/RingBufferWorkerPoolFactory.java new file mode 100644 index 0000000..6912c43 --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/disruptor/RingBufferWorkerPoolFactory.java @@ -0,0 +1,144 @@ +package com.baiye.disruptor; + +import com.baiye.dto.TranslatorDataWapper; +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.ProducerType; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; + +/** + * 环形缓冲池工厂类 - 单例模式 + * + * @author q + * @date 2022/07/13 + */ +public class RingBufferWorkerPoolFactory { + + + /** + * 静态工厂 + */ + public static class SingletonHolder{ + static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory(); + } + + private RingBufferWorkerPoolFactory(){ + + } + + /** + * 对外调用 + * @return 返回工厂实例 + */ + public static RingBufferWorkerPoolFactory getInstance() { + return SingletonHolder.instance; + } + + /** + * 生产者池 - 管理生产者 + */ + private static Map producers = new ConcurrentHashMap<>(); + + /** + * 消费者池 - 管理消费者 + */ + private static Map consumers = new ConcurrentHashMap<>(); + + /* + 定义 Disruptor 需要的元素 + */ + private RingBuffer ringBuffer; + + private SequenceBarrier sequenceBarrier; + + private WorkerPool workerPool; + + + /** + * 初始化并启动 + * + * @param producerType 生产者类型 + * @param bufferSize 缓冲区大小 + * @param waitStrategy 等待策略 + * @param messageConsumers 消息的消费者组 + */ + public void initAndStart(ProducerType producerType, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) { + + // 1. 构建RingBuffer对象 + this.ringBuffer = RingBuffer.create( + producerType, + TranslatorDataWapper::new, + bufferSize, + waitStrategy); + + // 2. 设置序号栅栏 + this.sequenceBarrier = this.ringBuffer.newBarrier(); + + // 3. 设置工作池 + this.workerPool = new WorkerPool<>( + this.ringBuffer, + this.sequenceBarrier, + new EventExceptionHandler(), + messageConsumers + ); + + // 4. 把所构建的消费者放入池中 + for (MessageConsumer messageConsumer : messageConsumers) { + consumers.put(messageConsumer.getConsumerId(), messageConsumer); + } + + // 5. 添加我们的 sequence 集合 + this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences()); + + // 6. 启动我们的工作池 -> 这边一定要用自定义线程池 + // FIXME: 2022/7/13 0013 进行自定义线程池 + this.workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2)); + + } + + + /** + * 获取 MessageProducer + * + * @param producerId MessageProducerId + * @return {@link MessageProducer} + */ + public MessageProducer getMessageProducer(String producerId) { + MessageProducer messageProducer = producers.get(producerId); + if (null == messageProducer){ + messageProducer = new MessageProducer(producerId, this.ringBuffer); + producers.put(producerId, messageProducer); + } + return messageProducer; + } + + + /** + * 事件异常处理程序 + * + * + * + * @author q + * @date 2022/07/13 + */ + static class EventExceptionHandler implements ExceptionHandler{ + + @Override + public void handleEventException(Throwable throwable, long l, TranslatorDataWapper translatorDataWapper) { + // TODO: 2022/7/13 0013 todo... + } + + @Override + public void handleOnStartException(Throwable throwable) { + // TODO: 2022/7/13 0013 todo... + } + + @Override + public void handleOnShutdownException(Throwable throwable) { + // TODO: 2022/7/13 0013 todo... + } + } + +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/dto/TranslatorDataWapper.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/dto/TranslatorDataWapper.java new file mode 100644 index 0000000..41d28e9 --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-common/src/main/java/com/baiye/dto/TranslatorDataWapper.java @@ -0,0 +1,22 @@ +package com.baiye.dto; + +import io.netty.channel.ChannelHandlerContext; +import lombok.Data; + +/** + * 包装 Netty 使用的数据类型给 Disruptor 进行使用 + * + * @author q + * @date 2022/07/13 + */ +@Data +public class TranslatorDataWapper { + + private TranslatorData translatorData; + + private ChannelHandlerContext ctx; + + + + +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/NettyServerApplication.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/NettyServerApplication.java index 538aa6d..b8d7af7 100644 --- a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/NettyServerApplication.java +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/NettyServerApplication.java @@ -1,6 +1,11 @@ package com.baiye; +import com.baiye.disruptor.MessageConsumer; +import com.baiye.disruptor.RingBufferWorkerPoolFactory; +import com.baiye.server.MessageConsumerImpl4Server; import com.baiye.server.NettyServer; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.dsl.ProducerType; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -17,6 +22,21 @@ public class NettyServerApplication { SpringApplication.run(NettyServerApplication.class, args); // FIXME: 2022/7/12 0012 后续改成SpringBoot的方式进行初始化Server端 + + // 消费者构建为4个,测试使用 + MessageConsumer[] consumers = new MessageConsumer[4]; + for (int i = 0; i < consumers.length; i++) { + MessageConsumerImpl4Server messageConsumer = new MessageConsumerImpl4Server("code:sessionId:" + i); + consumers[i] = messageConsumer; + } + + RingBufferWorkerPoolFactory.getInstance().initAndStart( + ProducerType.MULTI, + 1024 * 1024, + new BlockingWaitStrategy(), + consumers + ); + new NettyServer(); } } diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/MessageConsumerImpl4Server.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/MessageConsumerImpl4Server.java new file mode 100644 index 0000000..77ad8ec --- /dev/null +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/MessageConsumerImpl4Server.java @@ -0,0 +1,42 @@ +package com.baiye.server; + +import com.baiye.disruptor.MessageConsumer; +import com.baiye.dto.TranslatorData; +import com.baiye.dto.TranslatorDataWapper; +import io.netty.channel.ChannelHandlerContext; + +/** + * 服务端的 Disruptor 消费者 + * + * @author q + * @date 2022/07/13 + */ +public class MessageConsumerImpl4Server extends MessageConsumer { + + /** + * 抽象方法中已经进行实现 + * @param consumerId 消费者Id + */ + public MessageConsumerImpl4Server(String consumerId) { + super(consumerId); + } + + @Override + public void onEvent(TranslatorDataWapper wapper) throws Exception { + + TranslatorData request = wapper.getTranslatorData(); + ChannelHandlerContext ctx = wapper.getCtx(); + + // 1. 业务处理逻辑 + System.out.println("Server 端数据 : " + request.toString()); + + // 2. 设计响应回送数据 + TranslatorData response = new TranslatorData(); + response.setId("resp: " + request.getId()); + response.setName("resp: " + request.getName()); + response.setMessage("resp: " + request.getMessage()); + + // 3. 写数据并刷新 + ctx.writeAndFlush(response); + } +} diff --git a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/ServerHandler.java b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/ServerHandler.java index e91e8a1..f7ee10f 100644 --- a/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/ServerHandler.java +++ b/longpolling/demo/demo1/dev-protocol-disruptor-netty-server/src/main/java/com/baiye/server/ServerHandler.java @@ -1,5 +1,7 @@ package com.baiye.server; +import com.baiye.disruptor.MessageProducer; +import com.baiye.disruptor.RingBufferWorkerPoolFactory; import com.baiye.dto.TranslatorData; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -15,6 +17,9 @@ public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // 测试代码 + +/* // 接收到数据 数据已经进行了编解码, 所以直接进行转换 TranslatorData request = (TranslatorData) msg; System.out.println("Server 端数据 : " + request.toString()); @@ -27,6 +32,14 @@ public class ServerHandler extends ChannelInboundHandlerAdapter { // 写数据并刷新 ctx.writeAndFlush(response); +*/ + TranslatorData request = (TranslatorData) msg; + // FIXME: 2022/7/13 0013 producerId 自己的应用服务应该有一个ID生成规则,这里先进行写死 + String producerId = "code:sessionId:001"; + // 拿到生产者 + MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId); + // 传送数据 + messageProducer.onData(request, ctx); } }