|
|
|
@ -1,8 +1,8 @@
|
|
|
|
|
package com.hnac.hzims.operational.alert.service.impl; |
|
|
|
|
|
|
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
|
|
|
|
import com.hnac.hzims.message.MessageConstants; |
|
|
|
|
import com.hnac.hzims.message.dto.BusinessMessageDTO; |
|
|
|
|
import com.hnac.hzims.message.dto.MessagePushRecordDto; |
|
|
|
|
import com.hnac.hzims.message.fegin.IMessageClient; |
|
|
|
|
import com.hnac.hzims.operational.alert.constants.AbnormalAlarmConstant; |
|
|
|
|
import com.hnac.hzims.operational.alert.entity.AbnormalAlarmEntity; |
|
|
|
@ -33,8 +33,6 @@ import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
import java.time.LocalDateTime; |
|
|
|
|
import java.util.*; |
|
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
|
import java.util.function.Function; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -59,39 +57,57 @@ public class AbnormalAlarmServiceImpl extends BaseServiceImpl<AbnormalAlarmMappe
|
|
|
|
|
private final IMessageClient messageClient; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 数据中断、数据异常告警 |
|
|
|
|
* 故障、中断,异常告警数据处理 |
|
|
|
|
* @param param |
|
|
|
|
*/ |
|
|
|
|
@Override |
|
|
|
|
public void alarmDataHandle(String param) { |
|
|
|
|
// 查询代运维站点
|
|
|
|
|
// 查询站点站点
|
|
|
|
|
List<StationEntity> stations = stationService.list(); |
|
|
|
|
if(CollectionUtil.isEmpty(stations)){ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// 查询告警数据: 间隔3分钟
|
|
|
|
|
SoeQueryConditionByStation query = new SoeQueryConditionByStation(); |
|
|
|
|
query.setTypes(AbnormalAlarmConstant.SEND_MESSSAGE_TYPE_LIST); |
|
|
|
|
query.setStationIds(stations.stream().map(StationEntity::getCode).collect(Collectors.toList())); |
|
|
|
|
Calendar calendar = Calendar.getInstance(); |
|
|
|
|
query.setEndTime(LocalDateTime.parse(DateUtil.format(calendar.getTime(), DateUtil.PATTERN_DATETIME),DateUtil.DATETIME_FORMATTER)); |
|
|
|
|
calendar.add(Calendar.MINUTE,-2); |
|
|
|
|
calendar.add(Calendar.MINUTE,-3); |
|
|
|
|
query.setBeginTime(LocalDateTime.parse(DateUtil.format(calendar.getTime() , DateUtil.PATTERN_DATETIME),DateUtil.DATETIME_FORMATTER)); |
|
|
|
|
query.setNeedPage(false); |
|
|
|
|
log.error("alarm_data_handle_param : {}",query); |
|
|
|
|
query.setPage(1); |
|
|
|
|
query.setLimit(100000); |
|
|
|
|
Result<HzPage<SoeData>> result = soeClient.getByStationsAndTime(query); |
|
|
|
|
// 未查询到告警信息
|
|
|
|
|
if(!result.isSuccess() || ObjectUtil.isEmpty(result.getData().getRecords())){ |
|
|
|
|
if(!result.isSuccess() || ObjectUtil.isEmpty(result.getData()) || CollectionUtil.isEmpty(result.getData().getRecords())) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
log.error("alarm_data_handle_begin_result : {}",result.getData().getRecords()); |
|
|
|
|
// 遍历告警信息
|
|
|
|
|
List<SoeData> list = new ArrayList<>(result.getData().getRecords().stream().sorted(Comparator.comparing(SoeData::getTs).reversed()) |
|
|
|
|
.collect(Collectors.toMap(o -> o.getStation() + o.getSoeType(), Function.identity(), (o1, o2) -> o1)).values()); |
|
|
|
|
log.error("alarm_data_handle_end_result : {}",list); |
|
|
|
|
list.forEach(item -> { |
|
|
|
|
result.getData().getRecords().forEach(item -> { |
|
|
|
|
if(!AbnormalAlarmConstant.INTERRUPT_LIST.contains(item.getSoeType())){ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
this.interrupt(stations,item); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
// 历史数据处理
|
|
|
|
|
this.saveHistoryAlarm(stations,result.getData().getRecords()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 中断、异常数据处理 |
|
|
|
|
* @param stations |
|
|
|
|
* @param item |
|
|
|
|
*/ |
|
|
|
|
private void interrupt(List<StationEntity> stations,SoeData item) { |
|
|
|
|
// 查询中断数据
|
|
|
|
|
AbnormalAlarmEntity queryEntity = this.baseMapper.getAbnormalAlarm(item.getStation(),item.getSoeType()); |
|
|
|
|
// 是否为中断恢复
|
|
|
|
|
boolean flag = AbnormalAlarmConstant.ABNORMAL_STATUS.equals(item.getSoeAlarmType()); |
|
|
|
|
// 站点名称
|
|
|
|
|
String stationName = Optional.ofNullable(stations.stream().filter(o-> o.getCode().equals(item.getStation())).collect(Collectors.toList())).map(o->o.get(0).getName()).orElse(null); |
|
|
|
|
// 不存在记录进行保存
|
|
|
|
|
if(ObjectUtil.isEmpty(queryEntity)){ |
|
|
|
|
AbnormalAlarmEntity entity = new AbnormalAlarmEntity(); |
|
|
|
|
entity.setStationId(item.getStation()); |
|
|
|
@ -109,11 +125,9 @@ public class AbnormalAlarmServiceImpl extends BaseServiceImpl<AbnormalAlarmMappe
|
|
|
|
|
} |
|
|
|
|
// 保存告警信息
|
|
|
|
|
this.save(entity); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// 存在记录进行修改
|
|
|
|
|
}else{ |
|
|
|
|
queryEntity.setSoeExplain(item.getSoeExplain()); |
|
|
|
|
queryEntity.setStationName(stationName); |
|
|
|
|
queryEntity.setType(item.getSoeType()); |
|
|
|
|
queryEntity.setStartTime(queryEntity.getStartTime()); |
|
|
|
|
queryEntity.setUpdateTime(new Date()); |
|
|
|
|
queryEntity.setEndTime(null); |
|
|
|
@ -123,60 +137,46 @@ public class AbnormalAlarmServiceImpl extends BaseServiceImpl<AbnormalAlarmMappe
|
|
|
|
|
queryEntity.setStatus(1); |
|
|
|
|
} |
|
|
|
|
this.updateById(queryEntity); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
// 异步保存历史告警
|
|
|
|
|
CompletableFuture.supplyAsync(()-> this.saveHistoryAlarm(result.getData().getRecords(),stations)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 查询实时告警数据 |
|
|
|
|
* @return |
|
|
|
|
*/ |
|
|
|
|
@Override |
|
|
|
|
public List<String> getAbnormalAlarmList() { |
|
|
|
|
List<String> alarmList = this.baseMapper.getAbnormalAlarmList(); |
|
|
|
|
if(CollectionUtil.isEmpty(alarmList)){ |
|
|
|
|
return new ArrayList<>(); |
|
|
|
|
} |
|
|
|
|
return alarmList; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 保存告警历史信息 |
|
|
|
|
* 历史数据处理 |
|
|
|
|
* |
|
|
|
|
* @param stations |
|
|
|
|
* @param list |
|
|
|
|
* @return |
|
|
|
|
*/ |
|
|
|
|
private String saveHistoryAlarm(List<SoeData> list,List<StationEntity> stations) { |
|
|
|
|
private void saveHistoryAlarm(List<StationEntity> stations, List<SoeData> list) { |
|
|
|
|
List<HistoryAbnormalAlarmEntity> historys = historyAbnormalAlarmService.list(Wrappers.<HistoryAbnormalAlarmEntity>lambdaQuery() |
|
|
|
|
.in(HistoryAbnormalAlarmEntity::getAlarmId,list.stream().map(SoeData::getId).collect(Collectors.toList())) |
|
|
|
|
); |
|
|
|
|
Set<String> explainSet = new HashSet<>(); |
|
|
|
|
list.forEach(item->{ |
|
|
|
|
Date ts = DateUtil.parse(DateUtil.format(item.getTs(),DateUtil.PATTERN_DATETIME),DateUtil.DATETIME_FORMAT); |
|
|
|
|
// 历史数据异常查询
|
|
|
|
|
HistoryAbnormalAlarmEntity queryEntity = this.historyAbnormalAlarmService.getAbnormalAlarm(item.getStation(),item.getSoeType()); |
|
|
|
|
// 数据中断恢复
|
|
|
|
|
boolean flag = AbnormalAlarmConstant.ABNORMAL_STATUS.equals(item.getSoeAlarmType()); |
|
|
|
|
String stationName = Optional.ofNullable(stations.stream().filter(o-> o.getCode().equals(item.getStation())).collect(Collectors.toList())).map(o->o.get(0).getName()).orElse(null); |
|
|
|
|
if(ObjectUtil.isEmpty(queryEntity) || !flag){ |
|
|
|
|
if(CollectionUtil.isNotEmpty(historys) && historys.stream().map(HistoryAbnormalAlarmEntity::getAlarmId).collect(Collectors.toList()).contains(item.getId())){ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if(explainSet.contains(item.getSoeExplain())){ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
HistoryAbnormalAlarmEntity entity = new HistoryAbnormalAlarmEntity(); |
|
|
|
|
String stationName = Optional.ofNullable(stations.stream().filter(o-> o.getCode().equals(item.getStation())).collect(Collectors.toList())).map(o->o.get(0).getName()).orElse(null); |
|
|
|
|
entity.setAlarmId(item.getId()); |
|
|
|
|
entity.setStationId(item.getStation()); |
|
|
|
|
entity.setStationName(stationName); |
|
|
|
|
entity.setRealId(item.getRealId()); |
|
|
|
|
entity.setSoeExplain(item.getSoeExplain()); |
|
|
|
|
entity.setType(item.getSoeType()); |
|
|
|
|
entity.setStartTime(ts); |
|
|
|
|
entity.setStatus(0); |
|
|
|
|
entity.setStartTime(item.getTs()); |
|
|
|
|
if(AbnormalAlarmConstant.ABNORMAL_STATUS.equals(item.getSoeAlarmType())){ |
|
|
|
|
entity.setStatus(1); |
|
|
|
|
}else{ |
|
|
|
|
entity.setStatus(1); |
|
|
|
|
} |
|
|
|
|
this.historyAbnormalAlarmService.save(entity); |
|
|
|
|
// 消息推送
|
|
|
|
|
// 相同告警只允许添加一次,发送一次消息
|
|
|
|
|
explainSet.add(item.getSoeExplain()); |
|
|
|
|
this.sendAlarmMessage(Collections.singletonList(entity),stations); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
queryEntity.setSoeExplain(item.getSoeExplain()); |
|
|
|
|
queryEntity.setUpdateTime(new Date()); |
|
|
|
|
queryEntity.setEndTime(ts); |
|
|
|
|
queryEntity.setStatus(1); |
|
|
|
|
this.historyAbnormalAlarmService.updateById(queryEntity); |
|
|
|
|
}); |
|
|
|
|
return "success"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -203,12 +203,13 @@ public class AbnormalAlarmServiceImpl extends BaseServiceImpl<AbnormalAlarmMappe
|
|
|
|
|
} |
|
|
|
|
List<User> users = this.parentAuthUser(depts.get(0)); |
|
|
|
|
if(CollectionUtil.isEmpty(users)){ |
|
|
|
|
log.error("alarmmessagestation {} user is null",entity.getStationId()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
BusinessMessageDTO message = new BusinessMessageDTO(); |
|
|
|
|
message.setDeptId(depts.get(0)); |
|
|
|
|
message.setBusinessClassify("warning"); |
|
|
|
|
message.setUserIds(users.stream().map(o->String.valueOf(o.getId())).collect(Collectors.joining(","))); |
|
|
|
|
message.setUserIds(users.stream().map(o->String.valueOf(o.getId())).distinct().collect(Collectors.joining(","))); |
|
|
|
|
message.setBusinessKey(MessageConstants.BusinessClassifyEnum.WARNING.getKey()); |
|
|
|
|
message.setSubject(MessageConstants.BusinessClassifyEnum.WARNING.getDescription()); |
|
|
|
|
message.setTaskId(entity.getId()); |
|
|
|
@ -237,11 +238,25 @@ public class AbnormalAlarmServiceImpl extends BaseServiceImpl<AbnormalAlarmMappe
|
|
|
|
|
} |
|
|
|
|
R<Dept> dept = sysClient.getDept(deptId); |
|
|
|
|
if(dept.isSuccess() && ObjectUtil.isNotEmpty(dept.getData())){ |
|
|
|
|
R<List<User>> parents = userClient.userListByDeptId(dept.getData().getParentId()); |
|
|
|
|
R<List<User>> parents = userClient.userByDeptId("200000",deptId); |
|
|
|
|
if(parents.isSuccess() && CollectionUtil.isNotEmpty(parents.getData())){ |
|
|
|
|
users.addAll(parents.getData().stream().filter(o->!users.stream().map(User::getId).collect(Collectors.toList()).contains(o.getId())).collect(Collectors.toList())); |
|
|
|
|
users.addAll(parents.getData()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return users; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 查询实时告警数据 |
|
|
|
|
* @return |
|
|
|
|
*/ |
|
|
|
|
@Override |
|
|
|
|
public List<String> getAbnormalAlarmList() { |
|
|
|
|
List<String> alarmList = this.baseMapper.getAbnormalAlarmList(); |
|
|
|
|
if(CollectionUtil.isEmpty(alarmList)){ |
|
|
|
|
return new ArrayList<>(); |
|
|
|
|
} |
|
|
|
|
return alarmList; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|