diff --git a/hzims-service-api/alarm-api/src/main/java/com/hnac/hzims/business/ws/alart/vo/AlartParamVo.java b/hzims-service-api/alarm-api/src/main/java/com/hnac/hzims/business/ws/alart/vo/AlartParamVo.java index 06123de..4c5a13b 100644 --- a/hzims-service-api/alarm-api/src/main/java/com/hnac/hzims/business/ws/alart/vo/AlartParamVo.java +++ b/hzims-service-api/alarm-api/src/main/java/com/hnac/hzims/business/ws/alart/vo/AlartParamVo.java @@ -10,12 +10,9 @@ import java.util.List; * @author ysj */ @Data -@ApiModel(value = "告警弹框参数", description = "") +@ApiModel(value = "告警websocket参数", description = "") public class AlartParamVo { - @ApiModelProperty("站点类型: 2-云服务 1-代运维") - private Integer serveType; - - @ApiModelProperty("用户权限机构集合") - private List depts; + @ApiModelProperty("站点集合") + private List codes; } \ No newline at end of file diff --git a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/monitor/listener/AlarmListener.java b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/monitor/listener/AlarmListener.java index e8393f4..b7a2e08 100644 --- a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/monitor/listener/AlarmListener.java +++ b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/monitor/listener/AlarmListener.java @@ -12,6 +12,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; + /** * @author ysj */ @@ -21,6 +24,9 @@ import org.springframework.stereotype.Service; public class AlarmListener implements IQueueConsume { @Autowired + private ThreadPoolExecutor pool; + + @Autowired private MessageService messageService; @Autowired @@ -36,22 +42,30 @@ public class AlarmListener implements IQueueConsume { // 步骤2.websocket消息推送 if(ObjectUtil.isNotEmpty(alarm.getIsRightTabulation()) && alarm.getIsRightTabulation() == 0){ - + CompletableFuture.runAsync(() -> { + messageService.webRightMessage(alarm); + },pool); } // 步骤3.WEB/APP消息推送 if(ObjectUtil.isNotEmpty(alarm.getIsPlatformMessage()) && alarm.getIsPlatformMessage() == 0){ - messageService.webAppMessage(alarm); + CompletableFuture.runAsync(() -> { + messageService.webAppMessage(alarm); + },pool); } // 步骤4.短信推送 - if(ObjectUtil.isNotEmpty(alarm.getIsShortMessage()) && alarm.getIsShortMessage() == 0){ - messageService.shortMessage(alarm); + if (ObjectUtil.isNotEmpty(alarm.getIsShortMessage()) && alarm.getIsShortMessage() == 0) { + CompletableFuture.runAsync(() -> { + messageService.shortMessage(alarm); + },pool); } // 步骤5.微信公众号发送 - if(ObjectUtil.isNotEmpty(alarm.getIsWxMessage()) && alarm.getIsWxMessage() == 0){ - messageService.weChatMessage(alarm); + if (ObjectUtil.isNotEmpty(alarm.getIsWxMessage()) && alarm.getIsWxMessage() == 0) { + CompletableFuture.runAsync(() -> { + messageService.weChatMessage(alarm); + },pool); } } } \ No newline at end of file diff --git a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/monitor/listener/AlarmPoolManager.java b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/monitor/listener/AlarmPoolManager.java new file mode 100644 index 0000000..3fd1e35 --- /dev/null +++ b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/monitor/listener/AlarmPoolManager.java @@ -0,0 +1,34 @@ + +package com.hnac.hzims.alarm.monitor.listener; + +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.concurrent.*; + +/** + * @author: ysj + */ +@Component +public class AlarmPoolManager { + + @Bean + public ThreadPoolExecutor threadPoolExecutor() { + // 核心线程数 + int corePoolSize = 8; + // 最大线程数 + int maximumPoolSize = 16; + // 线程空闲时的存活时间 + long keepAliveTime = 60L; + // 时间单位 + TimeUnit unit = TimeUnit.SECONDS; + // 任务队列 + BlockingQueue workQueue = new LinkedBlockingQueue<>(100); + // 线程工厂 + ThreadFactory threadFactory = Executors.defaultThreadFactory(); + // 等待策略 + RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); + return new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler); + } + +} diff --git a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/show/service/AlarmService.java b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/show/service/AlarmService.java index b114c17..2d142a7 100644 --- a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/show/service/AlarmService.java +++ b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/show/service/AlarmService.java @@ -22,7 +22,7 @@ public interface AlarmService extends IService { List broadcast(String startTime, String endTime,Integer serveType); - TextMessage majorAlarm(AlartParamVo param); + TextMessage majorAlarm(List stations); List alarmVideos(String stationCode, String deviceCode, String realId,Integer alarmType); } \ No newline at end of file diff --git a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/show/service/impl/AlarmServiceImpl.java b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/show/service/impl/AlarmServiceImpl.java index be038f1..e4c4169 100644 --- a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/show/service/impl/AlarmServiceImpl.java +++ b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/show/service/impl/AlarmServiceImpl.java @@ -36,6 +36,7 @@ import org.apache.commons.collections4.MapUtils; import org.springblade.core.log.exception.ServiceException; import org.springblade.core.tool.api.R; import org.springblade.core.tool.utils.CollectionUtil; +import org.springblade.core.tool.utils.DateUtil; import org.springblade.core.tool.utils.ObjectUtil; import org.springblade.core.tool.utils.StringUtil; import org.springblade.message.fegin.IMessageClient; @@ -138,6 +139,18 @@ public class AlarmServiceImpl extends ServiceImpl impl } /** + * 获取权限站点 + * @return + */ + private List stationsByCodes(List stations) { + R> result = stationClient.querySatationByCodes(stations); + if(!result.isSuccess() || CollectionUtil.isEmpty(result.getData())){ + return new ArrayList<>(); + } + return result.getData(); + } + + /** * 查询告警数量 * @return */ @@ -219,26 +232,25 @@ public class AlarmServiceImpl extends ServiceImpl impl /** * 弹框告警 - * @param param + * @param codes * @return */ @Override - public TextMessage majorAlarm(AlartParamVo param) { + public TextMessage majorAlarm(List codes) { // 站点查询 - List stations = this.stations(param.getServeType()); - if(CollectionUtil.isEmpty(stations) || CollectionUtil.isEmpty(param.getDepts())){ - return new TextMessage(new ArrayList().toString()); - } - List effectives = stations.stream().filter(station->param.getDepts().contains(station.getRefDept())).map(StationEntity::getCode).collect(Collectors.toList()); - if(CollectionUtil.isEmpty(effectives)){ + List stations = this.stationsByCodes(codes); + if(CollectionUtil.isEmpty(stations)){ return new TextMessage(new ArrayList().toString()); } // 条件过滤 QueryWrapper queryWrapper = new QueryWrapper<>(); - queryWrapper.lambda().in(AlarmEntity::getStationId,effectives); - queryWrapper.lambda().in(AlarmEntity::getAlarmType, Arrays.asList(AlarmConstants.FAULT,AlarmConstants.EARLY)); - queryWrapper.lambda().eq(AlarmEntity::getIsShowAlert,0); - queryWrapper.lambda().eq(AlarmEntity::getStatus,0); + queryWrapper.lambda().in(AlarmEntity::getStationId,stations.stream().map(StationEntity::getCode).collect(Collectors.toList())); + queryWrapper.lambda().in(AlarmEntity::getAlarmType, Arrays.asList(AlarmConstants.WARNING,AlarmConstants.FAULT,AlarmConstants.OFFSIDE,AlarmConstants.INTERRUPT,AlarmConstants.ABNORMAL,AlarmConstants.EARLY,AlarmConstants.START,AlarmConstants.STOP)); + queryWrapper.lambda().eq(AlarmEntity::getIsRightTabulation,0); + queryWrapper.lambda().ge(AlarmEntity::getAlarmTime, DateUtil.format(new Date(),DateUtil.PATTERN_DATE) + " 00:00:00"); + queryWrapper.lambda().le(AlarmEntity::getAlarmTime,DateUtil.format(new Date(),DateUtil.PATTERN_DATETIME)); + queryWrapper.lambda().orderByDesc(AlarmEntity::getAlarmTime); + //queryWrapper.lambda().eq(AlarmEntity::getStatus,0); // 处理告警过滤 :当天处理告警 List handles = alarmHandleService.handles(); if(!CollectionUtil.isEmpty(handles)){ @@ -248,7 +260,7 @@ public class AlarmServiceImpl extends ServiceImpl impl if(CollectionUtil.isEmpty(alarms)){ return new TextMessage(new ArrayList().toString()); } - return new TextMessage(JSONObject.toJSONString(alarms.stream().sorted(Comparator.comparing(AlarmEntity::getAlarmTime).reversed()).collect(Collectors.toList()))); + return new TextMessage(JSONObject.toJSONString(alarms.stream().skip(0).limit(1000).collect(Collectors.toList()))); } /** diff --git a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/source/service/MessageService.java b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/source/service/MessageService.java index 71b8661..87b2426 100644 --- a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/source/service/MessageService.java +++ b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/source/service/MessageService.java @@ -7,6 +7,8 @@ import com.hnac.hzims.alarm.config.entity.AlarmEntity; */ public interface MessageService { + void webRightMessage(AlarmEntity alarm); + void webAppMessage(AlarmEntity entity); void shortMessage(AlarmEntity entity); diff --git a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/source/service/impl/MessageServiceImpl.java b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/source/service/impl/MessageServiceImpl.java index 21a8d5d..f881882 100644 --- a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/source/service/impl/MessageServiceImpl.java +++ b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/source/service/impl/MessageServiceImpl.java @@ -1,8 +1,10 @@ package com.hnac.hzims.alarm.source.service.impl; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; import com.hnac.hzims.alarm.config.entity.AlarmEntity; import com.hnac.hzims.alarm.source.service.MessageService; +import com.hnac.hzims.alarm.ws.alart.AlarmSessionManager; import com.hnac.hzims.common.constant.CommonConstant; import com.hnac.hzims.operational.station.entity.StationEntity; import com.hnac.hzims.operational.station.feign.IStationClient; @@ -17,13 +19,16 @@ import org.springblade.message.dto.SmsImmediatelyPushDTO; import org.springblade.message.dto.WxMessageDTO; import org.springblade.message.fegin.IMessageClient; import org.springblade.system.cache.DictCache; +import org.springblade.system.feign.IDeptClient; import org.springblade.system.feign.ISysClient; import org.springblade.system.user.entity.User; import org.springblade.system.user.entity.UserInfo; import org.springblade.system.user.feign.IUserClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.springframework.web.socket.TextMessage; +import java.io.IOException; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @@ -44,6 +49,8 @@ public class MessageServiceImpl implements MessageService { private final IUserClient userClient; + private final IDeptClient deptClient; + private final IMessageClient messageClient; private final IStationClient stationClient; @@ -56,6 +63,34 @@ public class MessageServiceImpl implements MessageService { private String wxPushTemplate; @Value("${hzims.wxPush.enabled}") private Boolean wxPushEnabled; + + /** + * web告警展示消息推送 + * @param alarm + */ + @Override + public void webRightMessage(AlarmEntity alarm) { + if(CollectionUtil.isEmpty(AlarmSessionManager.SESSION_POOL)){ + return; + } + // 遍历websocket链接,拥有告警站点权限进行推送消息 + AlarmSessionManager.SESSION_POOL.forEach((key, session) -> { + if(!session.getAttributes().containsKey("codes")){ + return; + } + List codes = (List) session.getAttributes().get("codes"); + if(CollectionUtil.isEmpty(codes) || !codes.contains(alarm.getStationId())){ + return; + } + try { + session.sendMessage(new TextMessage(JSONObject.toJSONString(Collections.singletonList(alarm)))); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + /** * web/app消息推送 * @param entity : 告警对象 @@ -161,8 +196,6 @@ public class MessageServiceImpl implements MessageService { return; } userList.addAll(result.getData()); - log.info("短信发送用户列表:{}",userList); - log.info("短信发送用户列表,{}",userList); }else { //内测只推给对应的3个用户 List phoneList = Arrays.asList("18351807087", "18163793336", "18285121497"); @@ -172,9 +205,9 @@ public class MessageServiceImpl implements MessageService { userList.add(userByPhone.getData().getUser()); } } - log.info("短信发送用户列表:{}",userList); - log.info("短信发送用户列表,{}",userList); } + log.info("短信发送用户列表:{}",userList); + log.info("短信发送用户列表,{}",userList); String userIds = userList.stream().map(o -> String.valueOf(o.getId())).distinct().collect(Collectors.joining(",")); WxMessageDTO message = new WxMessageDTO(); // 模板Id @@ -191,8 +224,8 @@ public class MessageServiceImpl implements MessageService { message.setTaskId(entity.getId()); // 微信参数键值对 HashMap map = new HashMap<>(); - map.put("thing18",getTruncateString(entity.getStationName(), 20)); - map.put("thing11",getTruncateString(entity.getAlarmContext(), 20)); + map.put("thing18",getTruncateString(entity.getStationName())); + map.put("thing11",getTruncateString(entity.getAlarmContext())); map.put("time2",entity.getAlarmTime()); map.put("thing14", DictCache.getValue("alarm_type", String.valueOf(entity.getAlarmType()))); DateTimeFormatter format = DateTimeFormatter.ofPattern(DateUtil.PATTERN_DATETIME); @@ -211,11 +244,12 @@ public class MessageServiceImpl implements MessageService { message.setTenantId(station.getData().getTenantId()); messageClient.sendWxMessage(message); } - private String getTruncateString(String ruleDefName, int maxSize) { - if (ruleDefName.length()<= maxSize){ + + private String getTruncateString(String ruleDefName) { + if (ruleDefName.length()<= 20){ return ruleDefName; }else { - return ruleDefName.substring(0, maxSize - 3) + "..."; + return ruleDefName.substring(0, 20 - 3) + "..."; } } } \ No newline at end of file diff --git a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/ws/alart/AlarmHandler.java b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/ws/alart/AlarmHandler.java index 668a4b8..34d6196 100644 --- a/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/ws/alart/AlarmHandler.java +++ b/hzims-service/hzims-alarm/src/main/java/com/hnac/hzims/alarm/ws/alart/AlarmHandler.java @@ -4,11 +4,9 @@ import com.alibaba.fastjson.JSONObject; import com.hnac.hzims.alarm.show.service.AlarmService; import com.hnac.hzims.business.ws.alart.vo.AlartParamVo; import lombok.extern.slf4j.Slf4j; +import org.springblade.core.tool.utils.CollectionUtil; import org.springblade.core.tool.utils.ObjectUtil; import org.springblade.core.tool.utils.StringUtil; -import org.springblade.system.feign.IDeptClient; -import org.springblade.system.feign.ISysClient; -import org.springblade.system.user.feign.IUserClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; @@ -16,6 +14,8 @@ import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; +import java.util.Collections; +import java.util.List; /** * @author ysj @@ -56,11 +56,13 @@ public class AlarmHandler extends TextWebSocketHandler { if(StringUtil.isBlank(message)){ return; } - AlartParamVo param = JSONObject.parseObject(message,AlartParamVo.class); - if(ObjectUtil.isEmpty(param)){ + AlartParamVo param = JSONObject.parseObject(textMessage.getPayload(), AlartParamVo.class); + if(ObjectUtil.isEmpty(param) || CollectionUtil.isEmpty(param.getCodes())){ return; } - TextMessage sendMessage = alarmService.majorAlarm(param); + session.getAttributes().put("codes", param.getCodes()); + AlarmSessionManager.SESSION_POOL.put(userId,session); + TextMessage sendMessage = alarmService.majorAlarm(param.getCodes()); session.sendMessage(sendMessage); } }