commit 2aff309853de146e0a3c72be77a7a181cf6fb020
Author: bynt <13586541001@163.com>
Date: Fri Jul 5 10:44:08 2024 +0800
初始化项目结构
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..39dbaab
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,373 @@
+
+
+ 4.0.0
+ com.eco
+ customer-service-websocket
+ 0.0.1
+ customer-service-websocket
+ customer-service-websocket
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.0.4.RELEASE
+
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ Finchley.RELEASE
+
+
+
+
+
+
+ com.googlecode.protobuf-java-format
+ protobuf-java-format
+ 1.2
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-nacos-discovery
+
+
+
+
+ com.alibaba.nacos
+ nacos-client
+ 2.1.0
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-nacos-config
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-undertow
+
+
+ io.undertow
+ undertow-websockets-jsr
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-freemarker
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
+
+
+
+ io.netty
+ netty-all
+ 4.1.25.Final
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.16
+ provided
+
+
+
+ commons-codec
+ commons-codec
+
+
+
+ commons-fileupload
+ commons-fileupload
+ 1.3.1
+
+
+
+
+ com.github.binarywang
+ java-emoji-converter
+ 0.1.1
+
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+
+ cn.hutool
+ hutool-all
+ 5.8.5
+
+
+
+
+ jaxen
+ jaxen
+ 1.2.0
+
+
+
+ net.sf.json-lib
+ json-lib
+ 2.2.2
+ jdk15
+
+
+
+ xom
+ xom
+ 1.2.5
+
+
+
+ xom
+ xom
+ 1.2.5
+ sources
+
+
+
+ ws.schild
+ jave-all-deps
+ 3.1.1
+
+
+
+ com.jcraft
+ jsch
+ 0.1.54
+
+
+
+ com.eco.common
+ fission-base
+ 0.0.1
+
+
+ org.apache.dubbo
+ dubbo-registry-nacos
+
+
+ org.apache.dubbo
+ dubbo-spring-boot-starter
+
+
+ hutool-all
+ cn.hutool
+
+
+ mybatis-plus-boot-starter
+ com.baomidou
+
+
+
+
+
+ org.apache.dubbo
+ dubbo-registry-nacos
+ 2.7.4.1
+
+
+
+ org.apache.dubbo
+ dubbo-spring-boot-starter
+ 2.7.3
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+ 2.5.5
+
+
+
+ com.eco.common
+ common-job
+ 0.0.1
+
+
+ hutool-all
+ cn.hutool
+
+
+
+
+
+ org.springframework.retry
+ spring-retry
+
+
+
+ io.minio
+ minio
+ 8.5.2
+
+
+
+ com.squareup.okhttp3
+ okhttp
+ 4.2.2
+
+
+
+ com.eco
+ common
+ 0.0.1
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-dependencies
+ 2.0.0.RELEASE
+ pom
+ import
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+ 1.8
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ true
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ true
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+
+ UTF-8
+
+ @
+ ${*}
+
+ false
+
+
+
+
+
+
+ src/main/resources
+ true
+
+
+
+
+
+
+ local
+
+ local
+ 172.18.0.228:8848
+ d5123679-020d-46d6-8ac8-f17bdd08e8e2
+
+
+
+ true
+
+
+
+
+ inner
+
+ inner
+ 172.18.0.228:8848
+ d5123679-020d-46d6-8ac8-f17bdd08e8e2
+
+
+
+ true
+
+
+
+
+ prod
+
+ prod
+ 192.168.1.2:8848
+ 8c3bab0f-c9b4-4f4d-a833-487f7c1977db
+ classpath:logback-client.xml
+
+
+
+
+ prod_wss
+
+ prod_wss
+ 192.168.1.2:8848
+ 8c3bab0f-c9b4-4f4d-a833-487f7c1977db
+ classpath:logback-client.xml
+
+
+
+
+
+
+
+
diff --git a/src/main/java/com/eco/websocket/CustomerServiceWebsocketApplication.java b/src/main/java/com/eco/websocket/CustomerServiceWebsocketApplication.java
new file mode 100644
index 0000000..c654d8b
--- /dev/null
+++ b/src/main/java/com/eco/websocket/CustomerServiceWebsocketApplication.java
@@ -0,0 +1,15 @@
+package com.eco.websocket;
+
+import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@EnableDubbo
+@SpringBootApplication
+public class CustomerServiceWebsocketApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(CustomerServiceWebsocketApplication.class, args);
+ }
+
+}
diff --git a/src/main/java/com/eco/websocket/netty/NettyConnectionUtil.java b/src/main/java/com/eco/websocket/netty/NettyConnectionUtil.java
new file mode 100644
index 0000000..a937de1
--- /dev/null
+++ b/src/main/java/com/eco/websocket/netty/NettyConnectionUtil.java
@@ -0,0 +1,183 @@
+package com.eco.websocket.netty;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.eco.common.rpc.CommunityCustomerInfoService;
+import com.eco.common.rpc.NettyConnectionService;
+import com.eco.websocket.netty.utils.MessageUtil;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dubbo.config.annotation.Reference;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
+
+@Slf4j
+@Service
+@org.apache.dubbo.config.annotation.Service(protocol = DUBBO_PROTOCOL, timeout = 10000)
+public class NettyConnectionUtil implements NettyConnectionService {
+
+ /**
+ * key 为deviceid
+ * value 为channel 用于存贮通道
+ */
+ public static Map deviceid_nettyChannel = new ConcurrentHashMap<>();
+
+ /**
+ * key userId 微信唯一id 或者 pc端账号名
+ * value ChannelHandlerContext
+ */
+ public static Map userId_nettyChannel = new ConcurrentHashMap<>();
+
+ @Reference(check = false)
+ private CommunityCustomerInfoService communityCustomerInfoService;
+
+ /**
+ * 存储通道
+ */
+ public synchronized void saveDeviceChannel(ChannelHandlerContext cx, String deviceid) {
+ ChannelHandlerContext cc = getNettyChannelByDeviceId(deviceid);
+ if (null != cc) {
+ deviceid_nettyChannel.remove(deviceid);
+ }
+ deviceid_nettyChannel.put(deviceid, cx);
+ }
+
+ /**
+ * 注册真实用户,客户端以imei为准、pc端以账号用户名为准
+ */
+ public synchronized void registerUserid(String account, ChannelHandlerContext cx) {
+ log.info("register:当前netty通道连接数:{},有效通道连接数:{}", deviceid_nettyChannel.size(), userId_nettyChannel.size());
+ ChannelHandlerContext chc = userId_nettyChannel.get(account);
+ if (null != chc) {
+ userId_nettyChannel.remove(account);
+ }
+ userId_nettyChannel.put(account, cx);
+
+
+ // 更新客服上线
+ communityCustomerInfoService.updateOnlineStatus(account, true);
+ }
+
+
+ /**
+ * 通过deviceid获取通道
+ *
+ * @return
+ */
+ public static synchronized ChannelHandlerContext getNettyChannelByDeviceId(String deviceid) {
+ return deviceid_nettyChannel.get(deviceid);
+ }
+
+ /**
+ * 删除通道
+ */
+ public synchronized void removeChannel(ChannelHandlerContext cx) {
+ // 清除设备通道
+ try {
+ String nettyId = getNettyId(cx);
+ if (!StringUtils.isEmpty(nettyId)) {
+ String deviceid = getDeviceIdByNettyId(nettyId);
+ if (null != deviceid && !"".equals(deviceid)) {
+ deviceid_nettyChannel.remove(deviceid);
+ }
+
+ // 清除微信通道
+ String userId = getUserIdByNettyId(nettyId);
+ if (!StringUtils.isEmpty(userId)) {
+ userId_nettyChannel.remove(userId);
+ }
+
+ // 更新客服下线
+ communityCustomerInfoService.updateOnlineStatus(userId, false);
+
+ }
+ } catch (Exception e) {
+ log.error("error --> ", e);;
+ }
+ }
+
+ /**
+ * 客户端退出时调用,清理数据
+ */
+ public synchronized void exit(ChannelHandlerContext ctx) {
+ try {
+ log.info("exit:当前netty通道连接数:{},有效通道连接数:{}", deviceid_nettyChannel.size(), userId_nettyChannel.size());
+ removeChannel(ctx);
+ ctx.close();
+ } catch (Exception e) {
+ log.error("error --> ", e);;
+ }
+
+ }
+
+ /**
+ * 通过userId获取nettyId
+ */
+ public synchronized String getUserIdByNettyId(String nettyId) {
+ return getKey(userId_nettyChannel, nettyId);
+ }
+
+ /**
+ * 根据通道获取用户id
+ *
+ * @param cx 通道
+ * @return
+ */
+ public synchronized String getUserIdByNettyId(ChannelHandlerContext cx) {
+ return getUserIdByNettyId(cx.channel().id().asShortText());
+ }
+
+ /**
+ * 通过nettyid获取deviceid
+ */
+ public synchronized String getDeviceIdByNettyId(String nettyId) {
+ return getKey(deviceid_nettyChannel, nettyId);
+ }
+
+ public synchronized ChannelHandlerContext getClientChannelHandlerContextByUserId(String userId) {
+ if (userId == null) {
+ return null;
+ }
+ return userId_nettyChannel.get(userId);
+ }
+
+ @Override
+ public boolean containsKey(String userId) {
+
+ return userId_nettyChannel.containsKey(userId);
+ }
+
+ public synchronized String getNettyId(ChannelHandlerContext cx) {
+ return cx.channel().id().asShortText();
+ }
+
+ /**
+ * 根据map的value获取map的key
+ */
+ private synchronized String getKey(Map map, String value) {
+ for (Map.Entry entry : map.entrySet()) {
+ if (StrUtil.equals(value, getNettyId(entry.getValue()))) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void accountPush(String account, String enumMsgType, String refMsgId, JSONObject object) {
+ ChannelHandlerContext chxOther = getClientChannelHandlerContextByUserId(account);
+ if (null != chxOther) {
+ if (StringUtils.isNotEmpty(refMsgId)) {
+ MessageUtil.sendJsonStringMsg(chxOther, enumMsgType, getNettyId(chxOther), refMsgId, object);
+ } else {
+ MessageUtil.sendJsonStringMsg(chxOther, enumMsgType, getNettyId(chxOther), null, object);
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/eco/websocket/netty/WebSocketServer.java b/src/main/java/com/eco/websocket/netty/WebSocketServer.java
new file mode 100644
index 0000000..f4c1593
--- /dev/null
+++ b/src/main/java/com/eco/websocket/netty/WebSocketServer.java
@@ -0,0 +1,83 @@
+package com.eco.websocket.netty;
+
+import com.eco.websocket.netty.handler.WebSocketJsonHandler;
+import com.eco.websocket.utils.PropertyUtils;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+@Service
+@Slf4j
+public class WebSocketServer {
+
+ @Resource
+ private Environment env;
+
+ @Autowired
+ private WebSocketJsonHandler webSocketFrameHandler;
+
+ // 程序初始方法入口注解,提示spring这个程序先执行这里
+ @PostConstruct
+ public void nettyMain() {
+ new Thread(() -> {
+ // 1 创建线两个程组
+ // 一个是用于处理服务器端接收客户端连接的
+ // 一个是进行网络通信的(网络读写的)
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+ try {
+ // 2 创建辅助工具类,用于服务器通道的一系列配置
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup);
+ b.channel(NioServerSocketChannel.class);// 指定NIO的模式
+// b.option(ChannelOption.SO_KEEPALIVE, true); // 保持连接
+ b.childHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
+ pipeline.addLast(new HttpServerCodec());
+ //以块的方式来写的处理器
+ pipeline.addLast(new ChunkedWriteHandler());
+ //netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
+ pipeline.addLast(new HttpObjectAggregator(65536));
+ pipeline.addLast(new WebSocketServerProtocolHandler("/"));
+ pipeline.addLast(webSocketFrameHandler);
+ }
+ });
+
+
+ // 3、绑定端口 同步等待成功
+ Integer port = PropertyUtils.getNettyWebsocketPort(env);
+ ChannelFuture f = b.bind(port).sync();
+ log.info("netty启动成功。。。" + "websocket占用端口" + port);
+
+ // 4、等待服务端监听端口关闭
+ f.channel().closeFuture().sync();
+
+ } catch (Exception e) {
+ log.info("netty启动失败 -- > ", e);
+ } finally {
+ workerGroup.shutdownGracefully();
+ bossGroup.shutdownGracefully();
+ }
+ }).start();
+ }
+
+}
diff --git a/src/main/java/com/eco/websocket/netty/async/AsyncTaskService.java b/src/main/java/com/eco/websocket/netty/async/AsyncTaskService.java
new file mode 100644
index 0000000..0d771e8
--- /dev/null
+++ b/src/main/java/com/eco/websocket/netty/async/AsyncTaskService.java
@@ -0,0 +1,57 @@
+package com.eco.websocket.netty.async;
+
+import Jubo.JuLiao.IM.Wx.Proto.TransportMessageOuterClass;
+import com.google.protobuf.Message;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+
+@Service
+@EnableAsync
+@Slf4j
+public class AsyncTaskService {
+
+
+ /**
+ * 转发消息给手机客户端
+ */
+ @Async
+ public void msgSend2Phone(ChannelHandlerContext ctx, String wechatId, TransportMessageOuterClass.EnumMsgType type, TransportMessageOuterClass.TransportMessage vo, Message req) throws Exception {
+
+ try {
+ log.info(LocalDateTime.now() + " msgSend2Phone 对应的线程名: " + Thread.currentThread().getName());
+
+ if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.TalkToFriendTask.getNumber()) {
+ // 客服发送消息给客户
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.TriggerConversationPushTask.getNumber()) {
+ // 会话列表
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.TriggerHistoryMsgPushTask.getNumber()) {
+ // 历史聊天记录
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.RequestTalkDetailTask.getNumber()) {
+ // 同步视频、图片、音频资源
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.ModifyFriendMemoTask.getNumber()) {
+ // 备注、电话
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.ContactLabelTask.getNumber()) {
+ // 打标签
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.TriggerFriendPushTask.getNumber()) {
+ // 通讯录
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.TriggerChatroomPushTask.getNumber()) {
+ // 群聊通讯录
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.TriggerMessageReadTask.getNumber()) {
+ // 消息已读
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.RequestChatRoomInfoTask.getNumber()) {
+ // 请求具体群聊的详细信息
+ } else if (type.getNumber() == TransportMessageOuterClass.EnumMsgType.RequestContactsInfoTask.getNumber()) {
+ // 获取联系人详细信息(不一定是好友,如群聊成员)
+ }
+
+ } catch (Throwable e) {
+ log.error("error --> ", e);;
+ }
+ }
+
+}
diff --git a/src/main/java/com/eco/websocket/netty/common/Constant.java b/src/main/java/com/eco/websocket/netty/common/Constant.java
new file mode 100644
index 0000000..7d3648d
--- /dev/null
+++ b/src/main/java/com/eco/websocket/netty/common/Constant.java
@@ -0,0 +1,16 @@
+package com.eco.websocket.netty.common;
+
+public class Constant {
+
+ public static final String ERROR_MSG_ILLEGALDEVICE = "非法设备";
+ public static final String ERROR_MSG_DECODFAIL = "服务器繁忙";
+ public static final String ERROR_MSG_VERIFYWAY = "认证方式不支持";
+ public static final String ERROR_MSG_LOGINFAIL = "账号密码错误";
+ public static final String ERROR_MSG_NOT_ACCOUNT = "账号不存在或账号密码错误";
+ public static final String ERROR_MSG_NOTONLINE = "对方不在线";
+ public static final String ERROR_MSG_PARAMERROR = "参数传入错误";
+ public static final String ERROR_MSG_LOGINNORIGHT = "登录账号无权限";
+ public static final String ERROR_MSG_ELSEWHERELOGINN = "账号已在别处登录";
+ public static final String ERROR_MSG_DATABASENOTBIND = "数据库未绑定";
+
+}
diff --git a/src/main/java/com/eco/websocket/netty/common/listener/HttpEventListener.java b/src/main/java/com/eco/websocket/netty/common/listener/HttpEventListener.java
new file mode 100644
index 0000000..09a9ae3
--- /dev/null
+++ b/src/main/java/com/eco/websocket/netty/common/listener/HttpEventListener.java
@@ -0,0 +1,33 @@
+package com.eco.websocket.netty.common.listener;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.web.context.WebServerInitializedEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author qp
+ * @date 2023/5/31 12:03
+ */
+@Slf4j
+@Component
+public class HttpEventListener implements ApplicationListener {
+ private volatile boolean isServerStarted = false;
+
+ public boolean isServerStarted() {
+ return isServerStarted;
+ }
+
+ @Override
+ public void onApplicationEvent(@NonNull WebServerInitializedEvent event) {
+ if (event.getWebServer() == null) {
+ return;
+ }
+ int port = event.getWebServer().getPort();
+ if (port > 0) {
+ log.info("http server started, port:{}", port);
+ isServerStarted = true;
+ }
+ }
+}
diff --git a/src/main/java/com/eco/websocket/netty/decoder/SelfDecoder.java b/src/main/java/com/eco/websocket/netty/decoder/SelfDecoder.java
new file mode 100644
index 0000000..6e2c829
--- /dev/null
+++ b/src/main/java/com/eco/websocket/netty/decoder/SelfDecoder.java
@@ -0,0 +1,62 @@
+package com.eco.websocket.netty.decoder;
+
+import Jubo.JuLiao.IM.Wx.Proto.TransportMessageOuterClass;
+import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+public class SelfDecoder extends ByteToMessageDecoder {
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List