[新增功能](master): 更新代码

1. 实现了基于百万长链接架构的消息推送服务系统
2. 进行了相关的设计及优化配置服务
master
土豆兄弟 2 years ago
parent 333647d1cf
commit 389ff387ef

@ -3,19 +3,21 @@
## 0. 项目说明
对后端java代码进行一些标准化的过程说明,帮助整个技术团队提升自己的技术规范及水平
## 1. 项目内容说明
dev-protocol-test
- SpringBoot项目Test的编写规范及使用标准
- 参考: https://mp.weixin.qq.com/s/W5v8zOCHbc2_NvobMGaU8w
- SpringBoot项目Test的编写规范及使用标准
- 参考: https://mp.weixin.qq.com/s/W5v8zOCHbc2_NvobMGaU8w
dev-protocol-log
- 分布式日志系统的设计及实现,主要涉及kafka Springboot ELK
- 分布式日志系统的设计及实现,主要涉及kafka Springboot ELK
dev-protocol-gateway
- 智能网关设计
- 智能网关设计
dev-protocol-devops
- DevOps 相关的最佳实现
- DevOps 相关的最佳实现
dev-protocol-shardingtask
- 配置多数据源和分表分库实现
- 配置多数据源和分表分库实现
### 1.1 基本命令(dev-protocol-log)
- Kafka
@ -36,17 +38,19 @@ dev-protocol-shardingtask
kafka-console-consumer.sh --bootstrap-server 172.16.26.183:9092 --topic topic1 --from-beginning
kafka查看消费进度当我们需要查看一个消费者组的消费进度时则使用下面的命令(启动Consumer的时候才会真的生效)【这条命令对查询消费特别重要】
kafka-consumer-groups.sh --bootstrap-server 172.16.26.183:9092 --describe --group group1
### 1.2 注意事项(dev-protocol-log)
一致性确定
命令中的 --bootstrap-server 和配置文件中的要一致
# 允许外部端口连接
listeners=PLAINTEXT://0.0.0.0:9092
# 外部代理地址
advertised.listeners=PLAINTEXT://121.201.64.12:9092
https://blog.csdn.net/pbrlovejava/article/details/103451302
命令中的 --bootstrap-server 和配置文件中的要一致
# 允许外部端口连接
listeners=PLAINTEXT://0.0.0.0:9092
# 外部代理地址
advertised.listeners=PLAINTEXT://121.201.64.12:9092
https://blog.csdn.net/pbrlovejava/article/details/103451302
国内下载镜像加速
https://www.newbe.pro/
https://www.newbe.pro/
### 1.3 海量日志收集架构设计(dev-protocol-log)
- 海量日志收集架构示意图
@ -116,6 +120,7 @@ dev-protocol-jpaauditing3 (正式环境使用)
### 原理分析
1. 从 @EnableJpaAuditing 入手分析
1. package org.springframework.data.jpa.repository.config;
2. @Import(JpaAuditingRegistrar.class)
@ -126,9 +131,11 @@ dev-protocol-jpaauditing3 (正式环境使用)
---
步入 org.springframework.data.jpa.repository.config.JpaAuditingRegistrar.registerBeanDefinitions 方法
1. super.registerBeanDefinitions(annotationMetadata, registry);
2. registerInfrastructureBeanWithId(...);
步入 org.springframework.data.jpa.repository.config.JpaAuditingRegistrar.registerBeanDefinitions 方法
1. super.registerBeanDefinitions(annotationMetadata, registry);
2. registerInfrastructureBeanWithId(...);
---
2. 打开 AuditingEntityListener 的源码
@ -147,6 +154,7 @@ dev-protocol-jpaauditing3 (正式环境使用)
使用相对应的注解去注册进Spring容器
使用注册的回调函数进行注解化设置
## 正确使用 @Entity 里面的回调方法
### 概念掌握
@ -171,4 +179,5 @@ Java Persistence API 里面规定的回调方法有哪些?
在实体中定义一些通用逻辑, 然后在对应 Listener中进行调用时机指定 (参数的合理化和健壮性检测)
JPA Callbacks 的实现原理,事件机制
### JPA Callbacks 的最佳实践

@ -20,7 +20,19 @@ WorkGroup <---- NettyClient -------------------------> NettyServer (Boss
| |
| |
|/ |/
ClientHandler ServerHandler[如果业务很复杂的话,耗时很长的话,会影响性能]
ClientHandler ServerHandler[如果业务很复杂的话,耗时很长的话,会影响性能
| |/
|/ DATA1
DATA2 |
| |
|/ producer 生产者
producer 生产者 |
| |/
|/ disruptor组件
disruptor组件 | |
| | |/ |/
|/ |/ consumer1 consumer2
consumer1 consumer2
---

@ -1,6 +1,12 @@
package com.baiye;
import com.baiye.client.MessageConsumerImpl4Client;
import com.baiye.client.NettyClient;
import com.baiye.disruptor.MessageConsumer;
import com.baiye.disruptor.RingBufferWorkerPoolFactory;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -14,6 +20,25 @@ public class NettyClientApplication {
// FIXME: 2022/7/12 0012 这里直接进行调用进行模拟,整合的话,要把消息接口对外暴露,然后进行发送即可
// 建立链接,并发送消息
// 消费者构建为4个,测试使用
MessageConsumer[] consumers = new MessageConsumer[4];
for (int i = 0; i < consumers.length; i++) {
MessageConsumerImpl4Client messageConsumer = new MessageConsumerImpl4Client("code:ClientId:" + i);
consumers[i] = messageConsumer;
}
RingBufferWorkerPoolFactory.getInstance().initAndStart(
ProducerType.MULTI,
1024 * 1024,
// FIXME: 2022/7/13 0013 new YieldingWaitStrategy() 的性能是最高的,多线程的自旋操作,可能会把CPU搞炸 new BlockingWaitStrategy() 对CPU是最友好的
// TODO: 2022/7/13 0013 思考怎么进行CPU性能折中的优化方案
new BlockingWaitStrategy(),
consumers
);
new NettyClient().sendMessage();
}
}

@ -1,5 +1,7 @@
package com.baiye.client;
import com.baiye.disruptor.MessageProducer;
import com.baiye.disruptor.RingBufferWorkerPoolFactory;
import com.baiye.dto.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -15,6 +17,9 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 测试代码
/*
try{
TranslatorData response = (TranslatorData) msg;
System.out.println("Client端 :" + response);
@ -23,6 +28,12 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
// 最后一步如果有写就不需要考虑手动释放, 因为写的底层是用递归写完之后进行释放的
ReferenceCountUtil.release(msg);
}
*/
TranslatorData response = (TranslatorData) msg;
// fixme 这个Id 注意不要和producer端的生产者Id重复了,最好进行统一生成
String producerId = "code:SesionId:002";
MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
messageProducer.onData(response, ctx);
}

@ -0,0 +1,30 @@
package com.baiye.client;
import com.baiye.disruptor.MessageConsumer;
import com.baiye.dto.TranslatorData;
import com.baiye.dto.TranslatorDataWapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class MessageConsumerImpl4Client extends MessageConsumer {
public MessageConsumerImpl4Client(String consumerId) {
super(consumerId);
}
@Override
public void onEvent(TranslatorDataWapper wapper) throws Exception {
TranslatorData response = wapper.getTranslatorData();
ChannelHandlerContext ctx = wapper.getCtx();
try{
// TODO: 2022/7/13 0013 这里替代为实际的业务处理
System.out.println("Client端 :" + response);
}finally {
// 消息在buffer中,因此要进行释放 -> 方便进行GC
// 最后一步如果有写就不需要考虑手动释放, 因为写的底层是用递归写完之后进行释放的
ReferenceCountUtil.release(response);
}
}
}

@ -0,0 +1,22 @@
package com.baiye.disruptor;
import com.baiye.dto.TranslatorDataWapper;
import com.lmax.disruptor.WorkHandler;
import lombok.Data;
/**
* onEvent Netty
* @author q
*/
@Data
public abstract class MessageConsumer implements WorkHandler<TranslatorDataWapper> {
/**
* Id
*/
protected String consumerId;
public MessageConsumer(String consumerId) {
this.consumerId = consumerId;
}
}

@ -0,0 +1,32 @@
package com.baiye.disruptor;
import com.baiye.dto.TranslatorData;
import com.baiye.dto.TranslatorDataWapper;
import com.lmax.disruptor.RingBuffer;
import io.netty.channel.ChannelHandlerContext;
public class MessageProducer {
private String producerId;
private RingBuffer<TranslatorDataWapper> ringBuffer;
public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) {
this.producerId = producerId;
this.ringBuffer = ringBuffer;
}
public void onData(TranslatorData data, ChannelHandlerContext ctx) {
long sequence = ringBuffer.next();
try{
TranslatorDataWapper wapper = ringBuffer.get(sequence);
wapper.setTranslatorData(data);
wapper.setCtx(ctx);
}finally {
ringBuffer.publish(sequence);
}
}
}

@ -0,0 +1,144 @@
package com.baiye.disruptor;
import com.baiye.dto.TranslatorDataWapper;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
/**
* -
*
* @author q
* @date 2022/07/13
*/
public class RingBufferWorkerPoolFactory {
/**
*
*/
public static class SingletonHolder{
static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
}
private RingBufferWorkerPoolFactory(){
}
/**
*
* @return
*/
public static RingBufferWorkerPoolFactory getInstance() {
return SingletonHolder.instance;
}
/**
* -
*/
private static Map<String, MessageProducer> producers = new ConcurrentHashMap<>();
/**
* -
*/
private static Map<String, MessageConsumer> consumers = new ConcurrentHashMap<>();
/*
Disruptor
*/
private RingBuffer<TranslatorDataWapper> ringBuffer;
private SequenceBarrier sequenceBarrier;
private WorkerPool<TranslatorDataWapper> workerPool;
/**
*
*
* @param producerType
* @param bufferSize
* @param waitStrategy
* @param messageConsumers
*/
public void initAndStart(ProducerType producerType, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
// 1. 构建RingBuffer对象
this.ringBuffer = RingBuffer.create(
producerType,
TranslatorDataWapper::new,
bufferSize,
waitStrategy);
// 2. 设置序号栅栏
this.sequenceBarrier = this.ringBuffer.newBarrier();
// 3. 设置工作池
this.workerPool = new WorkerPool<>(
this.ringBuffer,
this.sequenceBarrier,
new EventExceptionHandler(),
messageConsumers
);
// 4. 把所构建的消费者放入池中
for (MessageConsumer messageConsumer : messageConsumers) {
consumers.put(messageConsumer.getConsumerId(), messageConsumer);
}
// 5. 添加我们的 sequence 集合
this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
// 6. 启动我们的工作池 -> 这边一定要用自定义线程池
// FIXME: 2022/7/13 0013 进行自定义线程池
this.workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2));
}
/**
* MessageProducer
*
* @param producerId MessageProducerId
* @return {@link MessageProducer}
*/
public MessageProducer getMessageProducer(String producerId) {
MessageProducer messageProducer = producers.get(producerId);
if (null == messageProducer){
messageProducer = new MessageProducer(producerId, this.ringBuffer);
producers.put(producerId, messageProducer);
}
return messageProducer;
}
/**
*
*
*
*
* @author q
* @date 2022/07/13
*/
static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper>{
@Override
public void handleEventException(Throwable throwable, long l, TranslatorDataWapper translatorDataWapper) {
// TODO: 2022/7/13 0013 todo...
}
@Override
public void handleOnStartException(Throwable throwable) {
// TODO: 2022/7/13 0013 todo...
}
@Override
public void handleOnShutdownException(Throwable throwable) {
// TODO: 2022/7/13 0013 todo...
}
}
}

@ -0,0 +1,22 @@
package com.baiye.dto;
import io.netty.channel.ChannelHandlerContext;
import lombok.Data;
/**
* Netty 使 Disruptor 使
*
* @author q
* @date 2022/07/13
*/
@Data
public class TranslatorDataWapper {
private TranslatorData translatorData;
private ChannelHandlerContext ctx;
}

@ -1,6 +1,11 @@
package com.baiye;
import com.baiye.disruptor.MessageConsumer;
import com.baiye.disruptor.RingBufferWorkerPoolFactory;
import com.baiye.server.MessageConsumerImpl4Server;
import com.baiye.server.NettyServer;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -17,6 +22,21 @@ public class NettyServerApplication {
SpringApplication.run(NettyServerApplication.class, args);
// FIXME: 2022/7/12 0012 后续改成SpringBoot的方式进行初始化Server端
// 消费者构建为4个,测试使用
MessageConsumer[] consumers = new MessageConsumer[4];
for (int i = 0; i < consumers.length; i++) {
MessageConsumerImpl4Server messageConsumer = new MessageConsumerImpl4Server("code:sessionId:" + i);
consumers[i] = messageConsumer;
}
RingBufferWorkerPoolFactory.getInstance().initAndStart(
ProducerType.MULTI,
1024 * 1024,
new BlockingWaitStrategy(),
consumers
);
new NettyServer();
}
}

@ -0,0 +1,42 @@
package com.baiye.server;
import com.baiye.disruptor.MessageConsumer;
import com.baiye.dto.TranslatorData;
import com.baiye.dto.TranslatorDataWapper;
import io.netty.channel.ChannelHandlerContext;
/**
* Disruptor
*
* @author q
* @date 2022/07/13
*/
public class MessageConsumerImpl4Server extends MessageConsumer {
/**
*
* @param consumerId Id
*/
public MessageConsumerImpl4Server(String consumerId) {
super(consumerId);
}
@Override
public void onEvent(TranslatorDataWapper wapper) throws Exception {
TranslatorData request = wapper.getTranslatorData();
ChannelHandlerContext ctx = wapper.getCtx();
// 1. 业务处理逻辑
System.out.println("Server 端数据 " + request.toString());
// 2. 设计响应回送数据
TranslatorData response = new TranslatorData();
response.setId("resp: " + request.getId());
response.setName("resp: " + request.getName());
response.setMessage("resp: " + request.getMessage());
// 3. 写数据并刷新
ctx.writeAndFlush(response);
}
}

@ -1,5 +1,7 @@
package com.baiye.server;
import com.baiye.disruptor.MessageProducer;
import com.baiye.disruptor.RingBufferWorkerPoolFactory;
import com.baiye.dto.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -15,6 +17,9 @@ public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 测试代码
/*
// 接收到数据 数据已经进行了编解码, 所以直接进行转换
TranslatorData request = (TranslatorData) msg;
System.out.println("Server 端数据 " + request.toString());
@ -27,6 +32,14 @@ public class ServerHandler extends ChannelInboundHandlerAdapter {
// 写数据并刷新
ctx.writeAndFlush(response);
*/
TranslatorData request = (TranslatorData) msg;
// FIXME: 2022/7/13 0013 producerId 自己的应用服务应该有一个ID生成规则,这里先进行写死
String producerId = "code:sessionId:001";
// 拿到生产者
MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
// 传送数据
messageProducer.onData(request, ctx);
}
}

Loading…
Cancel
Save