diff --git a/manage/ad-platform-task/src/main/java/com/baiye/api/WebSocketController.java b/manage/ad-platform-task/src/main/java/com/baiye/api/WebSocketController.java new file mode 100644 index 00000000..9fe67350 --- /dev/null +++ b/manage/ad-platform-task/src/main/java/com/baiye/api/WebSocketController.java @@ -0,0 +1,33 @@ +package com.baiye.api; + +import com.baiye.http.CommonResponse; +import com.baiye.model.dto.SendWebSocketDTO; +import com.baiye.service.WebSocketService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +/** + * @author wujingtao + * @date 2021/12/30 + */ +@RestController +@RequestMapping("/api") +@Slf4j +@Api(tags = "websocket消息") +public class WebSocketController { + @Resource + private WebSocketService webSocketService; + + @PostMapping("/websocket/message") + @ApiOperation("发送消息") + public CommonResponse sendWebSocket(@RequestBody SendWebSocketDTO sendWebSocketDTO) { + return webSocketService.sendWebSocket(sendWebSocketDTO); + } +} diff --git a/manage/ad-platform-task/src/main/java/com/baiye/job/WebSocketHeartbeat.java b/manage/ad-platform-task/src/main/java/com/baiye/job/WebSocketHeartbeat.java index 16a53794..3d64e9b7 100644 --- a/manage/ad-platform-task/src/main/java/com/baiye/job/WebSocketHeartbeat.java +++ b/manage/ad-platform-task/src/main/java/com/baiye/job/WebSocketHeartbeat.java @@ -1,6 +1,5 @@ package com.baiye.job; -import cn.hutool.core.date.DateUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baiye.constant.DefaultNumberConstants; @@ -35,7 +34,7 @@ public class WebSocketHeartbeat implements SimpleJob { public void execute(ShardingContext shardingContext) { JSONObject jsonObject = new JSONObject(); jsonObject.putOpt("message", "心跳检查"); - jsonObject.putOpt("code", 1); + jsonObject.putOpt("code", DefaultNumberConstants.ONE_NUMBER); WebSocketHeartbeat.webSocketServer.broadCastInfo(JSONUtil.toJsonStr(jsonObject)); } } diff --git a/manage/ad-platform-task/src/main/java/com/baiye/service/WebSocketService.java b/manage/ad-platform-task/src/main/java/com/baiye/service/WebSocketService.java new file mode 100644 index 00000000..135a2f69 --- /dev/null +++ b/manage/ad-platform-task/src/main/java/com/baiye/service/WebSocketService.java @@ -0,0 +1,18 @@ +package com.baiye.service; + +import com.baiye.http.CommonResponse; +import com.baiye.model.dto.SendWebSocketDTO; + +/** + * @author wujingtao + * @date 2021/12/30 + */ +public interface WebSocketService { + /** + * 发送消息 + * + * @param sendWebSocketDTO + * @return + */ + CommonResponse sendWebSocket(SendWebSocketDTO sendWebSocketDTO); +} diff --git a/manage/ad-platform-task/src/main/java/com/baiye/service/impl/WebSocketServiceImpl.java b/manage/ad-platform-task/src/main/java/com/baiye/service/impl/WebSocketServiceImpl.java new file mode 100644 index 00000000..a11fe894 --- /dev/null +++ b/manage/ad-platform-task/src/main/java/com/baiye/service/impl/WebSocketServiceImpl.java @@ -0,0 +1,41 @@ +package com.baiye.service.impl; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.json.JSONUtil; +import com.baiye.constant.DefaultNumberConstants; +import com.baiye.http.CommonResponse; +import com.baiye.model.dto.SendWebSocketDTO; +import com.baiye.service.WebSocketService; +import com.baiye.socket.WebSocketServer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author wujingtao + * @date 2021/12/30 + */ +@Service +@Slf4j +public class WebSocketServiceImpl implements WebSocketService { + @Resource + private WebSocketServer webSocketServer; + + @Override + public CommonResponse sendWebSocket(SendWebSocketDTO sendWebSocketDTO) { + try { + if (CollUtil.isEmpty(sendWebSocketDTO.getUserIds())) { + SendWebSocketDTO.SendMessage data = sendWebSocketDTO.getData(); + data.setCode(DefaultNumberConstants.TWO_HUNDRED); + webSocketServer.broadCastInfo(JSONUtil.toJsonStr(data)); + } else { + webSocketServer.sendMessage(sendWebSocketDTO.getData(), sendWebSocketDTO.getUserIds()); + } + } catch (Exception e) { + log.error("Method 【sendWebSocket】发送 websocket 错误 :{}", e.getMessage()); + return CommonResponse.createByErrorMessage(e.getMessage()); + } + return CommonResponse.createBySuccess(); + } +} diff --git a/manage/ad-platform-task/src/main/java/com/baiye/socket/WebSocketServer.java b/manage/ad-platform-task/src/main/java/com/baiye/socket/WebSocketServer.java index edded82b..b2422508 100644 --- a/manage/ad-platform-task/src/main/java/com/baiye/socket/WebSocketServer.java +++ b/manage/ad-platform-task/src/main/java/com/baiye/socket/WebSocketServer.java @@ -4,6 +4,7 @@ import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baiye.constant.DefaultNumberConstants; import com.baiye.entity.vo.ReportMessageInfoVO; +import com.baiye.model.dto.SendWebSocketDTO; import com.baiye.service.impl.ReportServiceImpl; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -14,6 +15,7 @@ import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; @@ -82,8 +84,10 @@ public class WebSocketServer { public void onMessage(String message, Session session) { log.info("来自客户端的消息:{}", message); JSONObject jsonObject = JSONUtil.parseObj(message); - - if ("adminStatistics".equals(jsonObject.getStr("type"))) { + //todo 后面建常量类 + String adminStatistics = "adminStatistics"; + String type = "type"; + if (adminStatistics.equals(jsonObject.getStr(type))) { webSocketServer.reportServiceImpl.reportHour(); } @@ -166,4 +170,15 @@ public class WebSocketServer { } } + public void sendMessage(SendWebSocketDTO.SendMessage data, List ids) throws IOException { + for (long id : ids) { + if (SESSIONS.containsKey(String.valueOf(id))) { + Session session = SESSIONS.get(String.valueOf(id)); + data.setCode(DefaultNumberConstants.TWO_HUNDRED); + session.getBasicRemote().sendText(JSONUtil.toJsonStr(data)); + } else { + log.warn("没有找到你指定ID的会话:{}", id); + } + } + } }