Browse Source

#集中监控修改

zhongwei
yang_shj 2 years ago
parent
commit
b8e7c2d641
  1. 93
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebSocketClientConfig.java
  2. 124
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebsocketConfigClient.java
  3. 165
      hzims-service/operational/src/main/java/com/hnac/hzims/operational/station/service/impl/RealMonitorServiceImpl.java

93
hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebSocketClientConfig.java

@ -1,93 +0,0 @@
/*
package com.hnac.hzims.operational.config.ws;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject;
import com.hnac.hzims.operational.config.service.StFocusPropertiesService;
import com.hnac.hzims.operational.config.vo.StationRealVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
@EnableScheduling
@Component
public class WebSocketClientConfig {
@Value("${hzims.config.ws-url}")
String wsUrl;
List<WebsocketConfigClient> clientList;
@Autowired
private StFocusPropertiesService stFocusPropertiesService;
private static final Integer MAX_SEND = 40;
*/
/**
* 定时30秒推送一次消息
*//*
@Scheduled(cron = "0/30 * * * * ?")
private void keepAlive() throws InterruptedException, URISyntaxException {
*/
/*try {
List<StationRealVo> message = stFocusPropertiesService.getStationRealIds();
if (CollectionUtil.isEmpty(message)) {
return;
}
int limit = countStep(message.size());
List<List<StationRealVo>> list = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(a -> message.stream().skip(a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())).collect(Collectors.toList());
if (CollectionUtil.isEmpty(list)) {
return;
}
if (CollectionUtil.isEmpty(clientList)) {
clientList = new ArrayList<>();
for (int i = 0; i < limit; i++) {
WebsocketConfigClient client = new WebsocketConfigClient(new URI(wsUrl));
if (client.connectBlocking()) {
String json = JSONObject.toJSONString(list.get(i));
client.send(json);
clientList.add(client);
}
}
return;
}
if(CollectionUtil.isNotEmpty(clientList.stream().filter(o->!o.isOpen()).collect(Collectors.toList()))){
clientList.clear();
return;
}
for (int i = 0; i < limit; i++) {
String json = JSONObject.toJSONString(list.get(i));
WebsocketConfigClient client = clientList.get(i);
client.send(json);
}
}catch (Exception e){
log.error("[WebsocketConfigClient] 错误={}",e.toString());
}*//*
}
*/
/**
* 计算切分次数
*//*
private static Integer countStep(Integer size) {
return (size + MAX_SEND - 1) / MAX_SEND;
}
}
*/

124
hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebsocketConfigClient.java

@ -1,124 +0,0 @@
/*
package com.hnac.hzims.operational.config.ws;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.hnac.hzims.operational.config.service.StAlamRecordService;
import com.hnac.hzims.operational.station.service.IRealMonitorService;
import jodd.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springblade.core.tool.utils.SpringUtil;
import org.springframework.beans.factory.annotation.Value;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.net.URI;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class WebsocketConfigClient extends WebSocketClient {
@Value("${hzims.config.ws-url}")
String wsUrl;
StAlamRecordService stAlamRecordService;
private static final String MSG_SUCCESS_STATUS = "200";
public WebsocketConfigClient(URI serverUri) {
super(serverUri);
stAlamRecordService = SpringUtil.getBean(StAlamRecordService.class);
if (serverUri.toString().contains("wss://") && serverUri.toString().contains("/data")) {
trustAllHosts(this);
}
}
@Override
public void onOpen(ServerHandshake handshakedata) {
System.out.println("onOpen");
}
@Override
public void onMessage(String message) {
try {
manageHandleMsg(message);
} catch (Exception e) {
log.error("websocketConfigClient error{}",message,e);
}
log.error("onMessage");
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.error("-----------------------------------------------,123,onClose");
}
@Override
public void onError(Exception ex) {
log.error("------------------onError");
}
private void manageHandleMsg(String message) {
if(StringUtil.isBlank(message)) {
return;
}
// 将信息转换成json
JSONObject msg = JSONObject.parseObject(message);
String status = msg.getString("status");
String result = msg.getString("result");
if(StringUtil.isBlank(result)){
return;
}
if(StringUtil.isBlank(status) || !MSG_SUCCESS_STATUS.equals(status)){
return;
}
// 将结果响应结果转换成嵌套Map格式
Map<String, Map<String, String>> resultMap = JSONObject.parseObject(result, new TypeReference<Map<String, Map<String, String>>>(){});
if(MapUtils.isEmpty(resultMap)) {
return;
}
// 存储redis
stAlamRecordService.receiveDataStorage(resultMap);
}
void trustAllHosts(WebsocketConfigClient appClient) {
log.info("[websocket] wss 连接 ----- ");
try {
// wss需添加
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, new TrustManager[]{new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] chain,
String authType) {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}}, new SecureRandom());
SSLSocketFactory factory = sslContext.getSocketFactory();
appClient.setSocket(factory.createSocket());
} catch (Exception e) {
log.error("[websocket] trustAllHosts 错误 - " + e.toString());
}
}
}
*/

165
hzims-service/operational/src/main/java/com/hnac/hzims/operational/station/service/impl/RealMonitorServiceImpl.java

@ -50,8 +50,6 @@ import org.springframework.web.socket.TextMessage;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -163,45 +161,43 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
} }
// 数据切割 // 数据切割
int limit = countStep(stationRealVos.size()); int limit = countStep(stationRealVos.size());
List<List<StationRealVo>> list = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(a -> stationRealVos.stream().skip(a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())).collect(Collectors.toList()); List<List<StationRealVo>> list = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(a -> stationRealVos.stream().skip((long) a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())).collect(Collectors.toList());
ExecutorService pool = Executors.newFixedThreadPool(limit); ExecutorService pool = Executors.newFixedThreadPool(limit);
// <real,value> // <real,value>
Map<String,String> valueMap = new ConcurrentHashMap<>(); Map<String,String> valueMap = new ConcurrentHashMap<>();
// <real,<attribute,value>> // <real,<attribute,value>>
Map<String,Map<String,String>> keyMap = new ConcurrentHashMap<>(); Map<String,Map<String,String>> keyMap = new ConcurrentHashMap<>();
CountDownLatch countDownLatch = new CountDownLatch(limit); CountDownLatch countDownLatch = new CountDownLatch(limit);
pool.execute(()->{ pool.execute(()-> list.forEach(stations -> {
list.forEach(stations -> { stations.forEach(stationReal -> {
stations.forEach(stationReal -> { String[] realIdArr = stationReal.getRealId();
String[] realIdArr = stationReal.getRealId(); List<String> realIds = Stream.of(realIdArr).collect(Collectors.toList());
List<String> realIds = Stream.of(realIdArr).collect(Collectors.toList()); if(CollectionUtil.isEmpty(realIds)){
if(CollectionUtil.isEmpty(realIds)){ return;
return; }
} List<String> objects = redisClient.getBatchRealDataByRealId(stationReal.getStation(),realIds);
List<String> objects = redisClient.getBatchRealDataByRealId(stationReal.getStation(),realIds); if(CollectionUtil.isEmpty(objects)){
if(CollectionUtil.isEmpty(objects)){ return;
return; }
} for(int i = 0; i < objects.size() ;i++){
for(int i = 0; i < objects.size() ;i++){ if(ObjectUtil.isEmpty(objects.get(i))){
if(ObjectUtil.isEmpty(objects.get(i))){ log.error(realIds.get(i) + "is null");
log.error(realIds.get(i) + "is null"); continue;
continue;
}
Map<String,String> attribute = (Map<String, String>) JSONObject.parse(objects.get(i));
attribute.put("realId",attribute.get("k"));
attribute.put("value",attribute.get("v"));
attribute.put("time",attribute.get("t"));
attribute.remove("v");
attribute.remove("k");
attribute.remove("t");
this.getCheckMap(attribute,switchOnOff);
valueMap.put(realIdArr[i],attribute.get("value"));
keyMap.put(realIdArr[i],attribute);
} }
}); Map<String,String> attribute = (Map<String, String>) JSONObject.parse(objects.get(i));
countDownLatch.countDown(); attribute.put("realId",attribute.get("k"));
attribute.put("value",attribute.get("v"));
attribute.put("time",attribute.get("t"));
attribute.remove("v");
attribute.remove("k");
attribute.remove("t");
this.getCheckMap(attribute,switchOnOff);
valueMap.put(realIdArr[i],attribute.get("value"));
keyMap.put(realIdArr[i],attribute);
}
}); });
}); countDownLatch.countDown();
}));
// 等待所有线程执行完成 // 等待所有线程执行完成
try { try {
countDownLatch.await(); countDownLatch.await();
@ -224,24 +220,19 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
* @return * @return
*/ */
private void getCheckMap(Map<String, String> value,List<String> switchOnOff){ private void getCheckMap(Map<String, String> value,List<String> switchOnOff){
try{ // 不处理开机状态监测点
// 不处理开机状态监测点 if(switchOnOff.contains(value.get("realId"))){
if(switchOnOff.contains(value.get("realId"))){
return;
}
String time = value.get("time");
if(StringUtil.isEmpty(time)){
return;
}
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DateUtil.PATTERN_DATETIME);
Date date = simpleDateFormat.parse(time);
// 实时数据超出10分钟未刷新,值置为 0 显示
if(System.currentTimeMillis() - date.getTime() > 10 * 60 * 1000L){
value.put("value","0");
}
}catch (ParseException e){
return; return;
} }
String time = value.get("time");
if(StringUtil.isEmpty(time)){
return;
}
Date date = DateUtil.parse(time,DateUtil.PATTERN_DATETIME);
// 实时数据超出10分钟未刷新,值置为 0 显示
if(System.currentTimeMillis() - date.getTime() > 10 * 60 * 1000L){
value.put("value","0");
}
} }
/** /**
@ -285,45 +276,23 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
// key->机构编号 value -> 设备集合 // key->机构编号 value -> 设备集合
Map<Long,List<EminfoAndEmParamVo>> deviceMap = devices.stream().collect(Collectors.groupingBy(EminfoAndEmParamVo::getCreateDept)); Map<Long,List<EminfoAndEmParamVo>> deviceMap = devices.stream().collect(Collectors.groupingBy(EminfoAndEmParamVo::getCreateDept));
Map<Long, JointRelayVo> data = new HashMap<>(); Map<Long, JointRelayVo> data = new HashMap<>();
deviceMap.entrySet().forEach(entry -> { deviceMap.forEach((key, value) -> {
JointRelayVo value = new JointRelayVo(); JointRelayVo jointRelay = new JointRelayVo();
List<String> relays = entry.getValue().stream().filter(o-> MapUtils.isNotEmpty(o.getPoint()) && !StringUtil.isEmpty(o.getPoint().get(HomePageConstant.JOINT_RELAY))).map(relay-> map.get(relay.getPoint().get(HomePageConstant.JOINT_RELAY))).collect(Collectors.toList()); List<String> relays = value.stream().filter(o -> MapUtils.isNotEmpty(o.getPoint()) && !StringUtil.isEmpty(o.getPoint().get(HomePageConstant.JOINT_RELAY))).map(relay -> map.get(relay.getPoint().get(HomePageConstant.JOINT_RELAY))).collect(Collectors.toList());
if(CollectionUtil.isEmpty(relays)){ if (CollectionUtil.isEmpty(relays)) {
value.setUnitSum(0); jointRelay.setUnitSum(0);
value.setShutdownCount(0); jointRelay.setShutdownCount(0);
value.setStartingUpCount(0); jointRelay.setStartingUpCount(0);
}else{ } else {
value.setUnitSum(relays.size()); jointRelay.setUnitSum(relays.size());
Optional<List<String>> off = Optional.ofNullable(relays.stream().filter(relay -> "0".equals(relay)).collect(Collectors.toList())); Optional<List<String>> off = Optional.ofNullable(relays.stream().filter("0"::equals).collect(Collectors.toList()));
value.setShutdownCount(off.map(o->o.size()).orElse(0)); jointRelay.setShutdownCount(off.map(List::size).orElse(0));
Optional<List<String>> on = Optional.ofNullable(relays.stream().filter(relay -> "1".equals(relay)).collect(Collectors.toList())); Optional<List<String>> on = Optional.ofNullable(relays.stream().filter("1"::equals).collect(Collectors.toList()));
value.setStartingUpCount(on.map(o->o.size()).orElse(0)); jointRelay.setStartingUpCount(on.map(List::size).orElse(0));
} }
data.put(entry.getKey(),value); data.put(key, jointRelay);
}); });
redisTemplate.opsForValue().set(joint_relay_key, data); redisTemplate.opsForValue().set(joint_relay_key, data);
/*List<AnalyzeCodeBySignagesVO> jointRelayInfo = alertService.getJointRelayInfo();
//根据站点统计开关机
Map<String, List<AnalyzeCodeBySignagesVO>> jointRelayMap = jointRelayInfo.stream().collect(Collectors.groupingBy(AnalyzeCodeBySignagesVO::getStation));
jointRelayMap.forEach((key, value) -> {
List<String> readIdList = value.stream().map(AnalyzeCodeBySignagesVO::getRealId).collect(Collectors.toList());
vo = new JointRelayVo();
readIdList.forEach(iter -> {
String jointRelay = map.get(iter);
if ("0".equals(jointRelay)) {//关机
vo.setShutdownCount(vo.getShutdownCount() + 1);
vo.setShutdownRealId(iter + "," + vo.getShutdownRealId());
vo.setUnitSum(vo.getUnitSum() + 1);
} else if ("1".equals(jointRelay)) {//开机
vo.setStartingUpCount(vo.getStartingUpCount() + 1);
vo.setStartingUpRealId(iter + "," + vo.getStartingUpRealId());
vo.setUnitSum(vo.getUnitSum() + 1);
}
});
stationJointRelayMap.put(key, vo);
});*/
} }
/** /**
@ -393,7 +362,7 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
*/ */
@Override @Override
public void centralizedMonitoring(String param) { public void centralizedMonitoring(String param) {
Long beginTime = System.currentTimeMillis(); long beginTime = System.currentTimeMillis();
Thread thread = Thread.currentThread(); Thread thread = Thread.currentThread();
Object json = redisTemplate.opsForValue().get(real_id_key_gather_path); Object json = redisTemplate.opsForValue().get(real_id_key_gather_path);
if (ObjectUtil.isEmpty(json)) { if (ObjectUtil.isEmpty(json)) {
@ -440,19 +409,17 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
// 根据站点分组 // 根据站点分组
Map<String, List<StationAttributeEntity>> stationAttbtMap = list.stream().collect(Collectors.groupingBy(StationAttributeEntity::getStationId)); Map<String, List<StationAttributeEntity>> stationAttbtMap = list.stream().collect(Collectors.groupingBy(StationAttributeEntity::getStationId));
// 获取站点列表 // 获取站点列表
List<StationEntity> stationEntityList = stationService.getStationByInCode(stationAttbtMap.keySet().stream().collect(Collectors.toList())); List<StationEntity> stationEntityList = stationService.getStationByInCode(new ArrayList<>(stationAttbtMap.keySet()));
log.info(thread.getName() + "步骤8站点列表 耗时 : {}",System.currentTimeMillis() - beginTime); log.info(thread.getName() + "步骤8站点列表 耗时 : {}",System.currentTimeMillis() - beginTime);
beginTime = System.currentTimeMillis(); beginTime = System.currentTimeMillis();
// 隐藏设备列表 // 隐藏设备列表
List<String> hideList = stationAttrConfigService.getHideList(); List<String> hideList = stationAttrConfigService.getHideList();
// 分割,每个map限制10个长度 // 分割,每个map限制10个长度
List<Map<String, List<StationAttributeEntity>>> handleList = this.mapChunk(stationAttbtMap,MAX_SEND); List<Map<String, List<StationAttributeEntity>>> handleList = this.mapChunk(stationAttbtMap);
log.info(thread.getName() + "步骤9监测点数据切割 耗时 : {}",System.currentTimeMillis() - beginTime); log.info(thread.getName() + "步骤9监测点数据切割 耗时 : {}",System.currentTimeMillis() - beginTime);
beginTime = System.currentTimeMillis(); beginTime = System.currentTimeMillis();
// 创建线程池 // 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(handleList.size()); ExecutorService pool = Executors.newFixedThreadPool(handleList.size());
//ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("centralized_monitor-pool-%d").build();
//ExecutorService pool = new ThreadPoolExecutor(5, handleList.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
CountDownLatch countDownLatch = new CountDownLatch(handleList.size()); CountDownLatch countDownLatch = new CountDownLatch(handleList.size());
log.info(thread.getName() +"步骤10创建线程 耗时 : {}",System.currentTimeMillis() - beginTime); log.info(thread.getName() +"步骤10创建线程 耗时 : {}",System.currentTimeMillis() - beginTime);
beginTime = System.currentTimeMillis(); beginTime = System.currentTimeMillis();
@ -521,14 +488,14 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
/** /**
* map分割 * map分割
* @param chunkMap *
* @param chunkNum
* @return
* @param <k> * @param <k>
* @param <v> * @param <v>
* @param chunkMap
* @return
*/ */
private <k, v> List<Map<k, v>> mapChunk(Map<k, v> chunkMap, int chunkNum) { private <k, v> List<Map<k, v>> mapChunk(Map<k, v> chunkMap) {
if (chunkMap == null || chunkNum <= 0) { if (chunkMap == null || RealMonitorServiceImpl.MAX_SEND <= 0) {
List<Map<k, v>> list = new ArrayList<>(); List<Map<k, v>> list = new ArrayList<>();
list.add(chunkMap); list.add(chunkMap);
return list; return list;
@ -541,7 +508,7 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
while (iterator.hasNext()) { while (iterator.hasNext()) {
k next = iterator.next(); k next = iterator.next();
tem.put(next, chunkMap.get(next)); tem.put(next, chunkMap.get(next));
if (i == chunkNum) { if (i == RealMonitorServiceImpl.MAX_SEND) {
total.add(tem); total.add(tem);
tem = new HashMap<>(); tem = new HashMap<>();
i = 0; i = 0;
@ -677,9 +644,9 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
// 正常状态 // 正常状态
attest.setStatus(ConfigStatus.ConfigStatusEnum.NORMAL.getStatus()); attest.setStatus(ConfigStatus.ConfigStatusEnum.NORMAL.getStatus());
int quality = Optional.ofNullable(attest.getQuality()).orElse(-1); int quality = Optional.ofNullable(attest.getQuality()).orElse(-1);
String time = Optional.ofNullable(attest.getTime()).orElse(null); String time = Optional.ofNullable(attest.getTime()).orElse("");
// 延时状态 :质量为空 && 时间为空 // 延时状态 :质量为空 && 时间为空
if (-1 == quality && StringUtil.isBlank(time)) { if (-1 == quality && StringUtil.isEmpty(time)) {
attest.setStatus(ConfigStatus.ConfigStatusEnum.GRAY.getStatus()); attest.setStatus(ConfigStatus.ConfigStatusEnum.GRAY.getStatus());
} }
int type = item.getAttributeType(); int type = item.getAttributeType();
@ -842,7 +809,7 @@ public class RealMonitorServiceImpl implements IRealMonitorService {
*/ */
@Override @Override
public TextMessage getSendMessage(MessageParamVo message) { public TextMessage getSendMessage(MessageParamVo message) {
Long beginTime = System.currentTimeMillis(); long beginTime = System.currentTimeMillis();
Thread thread = Thread.currentThread(); Thread thread = Thread.currentThread();
// 权限机构过滤 // 权限机构过滤
if(ObjectUtil.isEmpty(message) || CollectionUtil.isEmpty(message.getDeptIds())){ if(ObjectUtil.isEmpty(message) || CollectionUtil.isEmpty(message.getDeptIds())){

Loading…
Cancel
Save