Browse Source

#告警websocket

zhongwei
yang_shj 2 years ago
parent
commit
54f6ea559c
  1. 8
      hzims-service-api/hzims-operational-api/src/main/java/com/hnac/hzims/operational/alert/constants/AbnormalAlarmConstant.java
  2. 6
      hzims-service-api/hzims-operational-api/src/main/java/com/hnac/hzims/operational/alert/entity/HistoryAbnormalAlarmEntity.java
  3. 2
      hzims-service-api/hzims-operational-api/src/main/java/com/hnac/hzims/operational/alert/vo/AlarmDataVo.java
  4. 36
      hzims-service-api/hzims-operational-api/src/main/java/com/hnac/hzims/operational/alert/vo/AlarmMergeVo.java
  5. 15
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/AlarmMergeService.java
  6. 10
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/LevelAlarmService.java
  7. 1
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/impl/AlertDefectServiceImpl.java
  8. 106
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/impl/AlertMerageServiceImpl.java
  9. 1
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/impl/HistoryAbnormalAlarmServiceImpl.java
  10. 18
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/impl/LevelAlarmServiceImpl.java
  11. 21
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/config/WebSocketConfig.java
  12. 79
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/AlarmHandler.java
  13. 58
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/AlarmSessionManager.java
  14. 3
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/SessionManager.java
  15. 2
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/SocketPool.java

8
hzims-service-api/hzims-operational-api/src/main/java/com/hnac/hzims/operational/alert/constants/AbnormalAlarmConstant.java

@ -13,6 +13,13 @@ public interface AbnormalAlarmConstant {
/**通讯恢复*/ /**通讯恢复*/
String ABNORMAL_STATUS = "1"; String ABNORMAL_STATUS = "1";
/**事故**/
String FAULT = "2";
String WEBSOCKET_FAULT = "0";
List<String> ALARM_WARN_TYPES = Arrays.asList("0","1","2");
String[] TYPE_NAMES = {"默认", "系统", "告警", "故障", "用户操作", "遥测越限", "遥信变位", "注册信息", "信息提示", "设备巡检", "遥控操作", "遥测越限恢复","未定义","通讯中断","数据异常"}; String[] TYPE_NAMES = {"默认", "系统", "告警", "故障", "用户操作", "遥测越限", "遥信变位", "注册信息", "信息提示", "设备巡检", "遥控操作", "遥测越限恢复","未定义","通讯中断","数据异常"};
/**处理、延后、误报*/ /**处理、延后、误报*/
@ -22,4 +29,5 @@ public interface AbnormalAlarmConstant {
* 2-告警,3-故障,5-遥测越限,13-通讯异常,14-数据异常 * 2-告警,3-故障,5-遥测越限,13-通讯异常,14-数据异常
*/ */
List<Integer> SOE_TYPE_LIST = Arrays.asList(2,3,5,13,14); List<Integer> SOE_TYPE_LIST = Arrays.asList(2,3,5,13,14);
} }

6
hzims-service-api/hzims-operational-api/src/main/java/com/hnac/hzims/operational/alert/entity/HistoryAbnormalAlarmEntity.java

@ -9,6 +9,9 @@ import org.springblade.core.mp.base.BaseEntity;
import java.util.Date; import java.util.Date;
/**
* @author ysj
*/
@Data @Data
@TableName("hzims_history_abnormal_alarm") @TableName("hzims_history_abnormal_alarm")
@EqualsAndHashCode(callSuper = false) @EqualsAndHashCode(callSuper = false)
@ -24,6 +27,9 @@ public class HistoryAbnormalAlarmEntity extends BaseEntity {
@ApiModelProperty("站点名称") @ApiModelProperty("站点名称")
private String stationName; private String stationName;
@ApiModelProperty("告警编码")
private Long alarmId;
@ApiModelProperty("检查点") @ApiModelProperty("检查点")
private String realId; private String realId;

2
hzims-service-api/hzims-operational-api/src/main/java/com/hnac/hzims/operational/alert/vo/AlarmDataVo.java

@ -3,12 +3,14 @@ package com.hnac.hzims.operational.alert.vo;
import com.hnac.hzinfo.datasearch.soe.domian.SoeData; import com.hnac.hzinfo.datasearch.soe.domian.SoeData;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode;
/** /**
* @author ysj * @author ysj
* @date 2023/03/23 14:06:27 * @date 2023/03/23 14:06:27
* @version 4.0.0 * @version 4.0.0
*/ */
@EqualsAndHashCode(callSuper = true)
@Data @Data
public class AlarmDataVo extends SoeData { public class AlarmDataVo extends SoeData {

36
hzims-service-api/hzims-operational-api/src/main/java/com/hnac/hzims/operational/alert/vo/AlarmMergeVo.java

@ -0,0 +1,36 @@
package com.hnac.hzims.operational.alert.vo;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author ysj
* @date 2023/03/17 10:02:33
* @version 4.0.0
*/
@Data
public class AlarmMergeVo{
@ApiModelProperty("站点编号")
private String stationCode;
@ApiModelProperty("站点名称")
private String stationName;
@ApiModelProperty("检测点位")
private String realId;
@ApiModelProperty("告警编号")
private String alarmCode;
@ApiModelProperty("告警类型: 0 - 事故 、 1 - 一级告警、 2 - 二级告警")
private String type;
@ApiModelProperty("告警内容: 0 - 事故 、 1 - 一级告警、 2 - 二级告警")
private String content;
@ApiModelProperty("告警时间")
private Date date;
}

15
hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/AlarmMergeService.java

@ -0,0 +1,15 @@
package com.hnac.hzims.operational.alert.service;
import com.hnac.hzims.operational.config.vo.MessageParamVo;
import org.springframework.web.socket.TextMessage;
/**
* @author ysj
* @date 2023/03/09 09:19:13
* @version 4.0.0
*/
public interface AlarmMergeService {
// 获取发送消息
TextMessage getSendMessage(MessageParamVo param);
}

10
hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/LevelAlarmService.java

@ -0,0 +1,10 @@
package com.hnac.hzims.operational.alert.service;
/**
* 告警处理接口
* @author ysj
*/
public interface LevelAlarmService {
}

1
hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/impl/AlertDefectServiceImpl.java

@ -27,6 +27,7 @@ import java.util.stream.Collectors;
/** /**
* 告警缺陷处理实现类 * 告警缺陷处理实现类
* @author ysj
*/ */
@Slf4j @Slf4j
@Service @Service

106
hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/impl/AlertMerageServiceImpl.java

@ -0,0 +1,106 @@
package com.hnac.hzims.operational.alert.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.hnac.hzims.operational.alert.constants.AbnormalAlarmConstant;
import com.hnac.hzims.operational.alert.entity.AlarmHandleEntity;
import com.hnac.hzims.operational.alert.entity.HistoryAbnormalAlarmEntity;
import com.hnac.hzims.operational.alert.service.AlarmHandleService;
import com.hnac.hzims.operational.alert.service.AlarmMergeService;
import com.hnac.hzims.operational.alert.service.HistoryAbnormalAlarmService;
import com.hnac.hzims.operational.alert.service.LevelAlarmService;
import com.hnac.hzims.operational.alert.vo.AlarmMergeVo;
import com.hnac.hzims.operational.config.vo.MessageParamVo;
import com.hnac.hzims.operational.main.constant.HomePageConstant;
import com.hnac.hzims.operational.station.entity.StationEntity;
import com.hnac.hzims.operational.station.service.IStationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.utils.CollectionUtil;
import org.springblade.core.tool.utils.DateUtil;
import org.springblade.core.tool.utils.ObjectUtil;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.TextMessage;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
/**
* 告警合并处理实现类
* @author ysj
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class AlertMerageServiceImpl implements AlarmMergeService {
private final IStationService stationService;
private final LevelAlarmService levelAlarmService;
private final AlarmHandleService alarmHandleService;
private final HistoryAbnormalAlarmService faultAlarmService;
/**
* 根据用户获取hz3000事故告警平台一级告警二级告警
* @param param
* @return
*/
@Override
public TextMessage getSendMessage(MessageParamVo param) {
// 查询用户
if(ObjectUtil.isEmpty(param)){
return null;
}
List<Long> depts = param.getDeptIds();
if(CollectionUtil.isEmpty(depts)){
return null;
}
// 查询站点
List<StationEntity> stations = stationService.list(Wrappers.<StationEntity>lambdaQuery()
.in(StationEntity::getRefDept,depts)
.eq(StationEntity::getServeType, HomePageConstant.HYDROPOWER_SERVETYPE)
);
if(CollectionUtil.isEmpty(stations)){
return null;
}
String start = DateUtil.format(new Date(),DateUtil.PATTERN_DATE) + " 00:00:00";
String end = DateUtil.format(new Date(),DateUtil.PATTERN_DATE) + " 23:59:59";
// hz3000事故告警
List<HistoryAbnormalAlarmEntity> faults = faultAlarmService.list(Wrappers.<HistoryAbnormalAlarmEntity>lambdaQuery()
.in(HistoryAbnormalAlarmEntity::getStationId,stations.stream().map(StationEntity::getCode).collect(Collectors.toList()))
.eq(HistoryAbnormalAlarmEntity::getType, AbnormalAlarmConstant.FAULT)
.between(HistoryAbnormalAlarmEntity::getStartTime,start,end)
);
// 处理告警记录
List<AlarmHandleEntity> handles = alarmHandleService.list(Wrappers.<AlarmHandleEntity>lambdaQuery()
.in(AlarmHandleEntity::getStationCode,stations.stream().map(StationEntity::getCode).collect(Collectors.toList()))
.in(AlarmHandleEntity::getAlarmType, AbnormalAlarmConstant.ALARM_WARN_TYPES)
.between(AlarmHandleEntity::getCreateTime,start,end)
);
List<AlarmMergeVo> alarms = new ArrayList<>();
if(CollectionUtil.isNotEmpty(faults)){
alarms.addAll(faults.stream().filter(o -> CollectionUtil.isEmpty(handles)
|| handles.stream().map(AlarmHandleEntity::getAlarmId).collect(Collectors.toList()).contains(o.getAlarmId())).
map(fault->{
AlarmMergeVo alarm = new AlarmMergeVo();
alarm.setStationCode(fault.getStationId());
alarm.setStationName(fault.getStationName());
alarm.setContent(fault.getSoeExplain());
alarm.setDate(fault.getStartTime());
alarm.setRealId(fault.getRealId());
alarm.setType(AbnormalAlarmConstant.WEBSOCKET_FAULT);
return alarm;
}).collect(Collectors.toList()));
}
// hz3000事故告警
// faultAlarmService.list();
return new TextMessage(JSONObject.toJSONString(alarms.stream().sorted(Comparator.comparing(AlarmMergeVo::getDate)).collect(Collectors.toList())));
}
}

1
hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/impl/HistoryAbnormalAlarmServiceImpl.java

@ -49,6 +49,7 @@ import java.util.stream.Collectors;
/** /**
* 历史告警实现类 * 历史告警实现类
* @author ysj
*/ */
@Slf4j @Slf4j
@Service @Service

18
hzims-service/operational/src/main/java/com/hnac/hzims/operational/alert/service/impl/LevelAlarmServiceImpl.java

@ -0,0 +1,18 @@
package com.hnac.hzims.operational.alert.service.impl;
import com.hnac.hzims.operational.alert.service.LevelAlarmService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 等级告警实现类
* @author ysj
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class LevelAlarmServiceImpl implements LevelAlarmService {
}

21
hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/config/WebSocketConfig.java

@ -1,10 +1,8 @@
package com.hnac.hzims.operational.config.config; package com.hnac.hzims.operational.config.config;
import com.hnac.hzims.operational.config.ws.AlarmHandler;
import com.hnac.hzims.operational.config.ws.MessageHandler; import com.hnac.hzims.operational.config.ws.MessageHandler;
import com.hnac.hzims.operational.vRView.service.IHzImsRealDataService;
import com.hnac.hzims.operational.vRView.ws.HzWebSocketServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
@ -21,21 +19,24 @@ import org.springframework.web.socket.server.standard.ServerEndpointExporter;
public class WebSocketConfig implements WebSocketConfigurer{ public class WebSocketConfig implements WebSocketConfigurer{
@Override @Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/comprehensiveD/{uid}").setAllowedOrigins("*"); // 集中监控处理器
registry.addHandler(monitorHandler(), "/comprehensiveD/{uid}").setAllowedOrigins("*");
// 告警处理器
registry.addHandler(alarmHandler(), "/alarmHandler/{uid}").setAllowedOrigins("*");
} }
@Bean @Bean
public WebSocketHandler myHandler() { public WebSocketHandler monitorHandler() {
return new MessageHandler(); return new MessageHandler();
} }
@Bean @Bean
public ServerEndpointExporter serverEndpointExporter() { public WebSocketHandler alarmHandler() {
return new ServerEndpointExporter(); return new AlarmHandler();
} }
@Autowired @Bean
public void hzImsRealDataService(IHzImsRealDataService realDataService) { public ServerEndpointExporter serverEndpointExporter() {
HzWebSocketServer.hzImsRealDataService = realDataService; return new ServerEndpointExporter();
} }
} }

79
hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/AlarmHandler.java

@ -0,0 +1,79 @@
package com.hnac.hzims.operational.config.ws;
import com.alibaba.fastjson.JSONObject;
import com.hnac.hzims.operational.alert.service.AlarmHandleService;
import com.hnac.hzims.operational.alert.service.AlarmMergeService;
import com.hnac.hzims.operational.config.vo.MessageParamVo;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.utils.ObjectUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
/**
* @author ysj
*/
@Slf4j
public class AlarmHandler extends TextWebSocketHandler {
private final Long defaultRealDataRefreshTime = 20000L;
@Autowired
private AlarmMergeService alarmMergeService;
//WebSocket连接建立成功之后调用
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String[] split = session.getUri().toString().split("/");
String uid = split[split.length - 1];
// session 参数设置用户进行标识
session.getAttributes().put("userId", uid);
AlarmSessionManager.add(uid, session);
SocketPool.alarm_pool.put(session.getId(), this);
}
// 连接关闭时调用
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
// 移除会话
AlarmSessionManager.removeAndClose(session.getId());
// map移除用户
SocketPool.alarm_pool.remove(session.getId());
}
// 消息传输错误触发
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
// 移除会话
SessionManager.removeAndClose(session.getId());
// map移除用户
SocketPool.pool.remove(session.getId());
}
// 接收消息触发
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) throws IOException {
// 获取会话用户编号
String userId = (String) session.getAttributes().get("userId");
String message = textMessage.getPayload();
if(StringUtil.isBlank(message)){
return;
}
MessageParamVo depts = JSONObject.parseObject(message, MessageParamVo.class);
if(ObjectUtil.isEmpty(depts)){
return;
}
TextMessage sendMessage = alarmMergeService.getSendMessage(depts);
if(ObjectUtil.isEmpty(sendMessage)){
return;
}
session.sendMessage(sendMessage);
}
}

58
hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/AlarmSessionManager.java

@ -0,0 +1,58 @@
package com.hnac.hzims.operational.config.ws;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Lch
*/
@Slf4j
public class AlarmSessionManager {
public static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
/**
* 添加会话
* @param uid 用户
* @param session 会话对象
*/
public static void add(String uid, WebSocketSession session) {
if (SESSION_POOL.containsKey(uid)) {
AlarmSessionManager.removeAndClose(uid);
}
SESSION_POOL.put(uid, session);
log.info("添加 WebSocketSession 会话成功,uid=" + uid);
}
/**
* 获取会话
* @param uid 用户
* @return
*/
public static WebSocketSession get(String uid) {
return SESSION_POOL.get(uid);
}
/**
* 移除会话并关闭会话
* @param uid 用户
*/
public static void removeAndClose(String uid) {
WebSocketSession session = SESSION_POOL.get(uid);
if (session != null) {
try {
//关闭连接
session.close();
} catch (IOException ex) {
throw new RuntimeException("关闭ws会话失败!", ex);
}
}
SESSION_POOL.remove(uid);
}
}

3
hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/SessionManager.java

@ -7,9 +7,6 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/**
* @author ysj
*/
@Slf4j @Slf4j
public class SessionManager { public class SessionManager {
/** /**

2
hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/SocketPool.java

@ -10,4 +10,6 @@ public class SocketPool {
public static Map<String, MessageHandler> pool = new ConcurrentHashMap<>(); public static Map<String, MessageHandler> pool = new ConcurrentHashMap<>();
public static Map<String, AlarmHandler> alarm_pool = new ConcurrentHashMap<>();
} }

Loading…
Cancel
Save