diff --git a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebSocketClientConfig.java b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebSocketClientConfig.java deleted file mode 100644 index d887ad0..0000000 --- a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebSocketClientConfig.java +++ /dev/null @@ -1,93 +0,0 @@ -/* -package com.hnac.hzims.operational.config.ws; - -import cn.hutool.core.collection.CollectionUtil; -import com.alibaba.fastjson.JSONObject; -import com.hnac.hzims.operational.config.service.StFocusPropertiesService; -import com.hnac.hzims.operational.config.vo.StationRealVo; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -@Slf4j -@EnableScheduling -@Component -public class WebSocketClientConfig { - - @Value("${hzims.config.ws-url}") - String wsUrl; - - List clientList; - - @Autowired - private StFocusPropertiesService stFocusPropertiesService; - - - private static final Integer MAX_SEND = 40; - - */ -/** - * 定时30秒推送一次消息 - *//* - - @Scheduled(cron = "0/30 * * * * ?") - private void keepAlive() throws InterruptedException, URISyntaxException { - */ -/*try { - List message = stFocusPropertiesService.getStationRealIds(); - if (CollectionUtil.isEmpty(message)) { - return; - } - int limit = countStep(message.size()); - List> list = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(a -> message.stream().skip(a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())).collect(Collectors.toList()); - if (CollectionUtil.isEmpty(list)) { - return; - } - if (CollectionUtil.isEmpty(clientList)) { - clientList = new ArrayList<>(); - for (int i = 0; i < limit; i++) { - WebsocketConfigClient client = new WebsocketConfigClient(new URI(wsUrl)); - if (client.connectBlocking()) { - String json = JSONObject.toJSONString(list.get(i)); - client.send(json); - clientList.add(client); - } - } - return; - } - if(CollectionUtil.isNotEmpty(clientList.stream().filter(o->!o.isOpen()).collect(Collectors.toList()))){ - clientList.clear(); - return; - } - for (int i = 0; i < limit; i++) { - String json = JSONObject.toJSONString(list.get(i)); - WebsocketConfigClient client = clientList.get(i); - client.send(json); - } - }catch (Exception e){ - log.error("[WebsocketConfigClient] 错误={}",e.toString()); - }*//* - - } - - - */ -/** - * 计算切分次数 - *//* - - private static Integer countStep(Integer size) { - return (size + MAX_SEND - 1) / MAX_SEND; - } -} -*/ diff --git a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebsocketConfigClient.java b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebsocketConfigClient.java deleted file mode 100644 index 0860782..0000000 --- a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ws/WebsocketConfigClient.java +++ /dev/null @@ -1,124 +0,0 @@ -/* -package com.hnac.hzims.operational.config.ws; - -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; -import com.hnac.hzims.operational.config.service.StAlamRecordService; -import com.hnac.hzims.operational.station.service.IRealMonitorService; -import jodd.util.StringUtil; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.MapUtils; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; -import org.springblade.core.tool.utils.SpringUtil; -import org.springframework.beans.factory.annotation.Value; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import java.net.URI; -import java.security.SecureRandom; -import java.security.cert.X509Certificate; -import java.util.Map; -import java.util.concurrent.CompletableFuture; - - -@Slf4j -public class WebsocketConfigClient extends WebSocketClient { - - @Value("${hzims.config.ws-url}") - String wsUrl; - - StAlamRecordService stAlamRecordService; - - private static final String MSG_SUCCESS_STATUS = "200"; - - - public WebsocketConfigClient(URI serverUri) { - super(serverUri); - stAlamRecordService = SpringUtil.getBean(StAlamRecordService.class); - if (serverUri.toString().contains("wss://") && serverUri.toString().contains("/data")) { - trustAllHosts(this); - } - } - - @Override - public void onOpen(ServerHandshake handshakedata) { - System.out.println("onOpen"); - } - - @Override - public void onMessage(String message) { - try { - manageHandleMsg(message); - } catch (Exception e) { - log.error("websocketConfigClient error{}",message,e); - } - log.error("onMessage"); - } - - @Override - public void onClose(int code, String reason, boolean remote) { - log.error("-----------------------------------------------,123,onClose"); - } - - @Override - public void onError(Exception ex) { - log.error("------------------onError"); - } - - - private void manageHandleMsg(String message) { - if(StringUtil.isBlank(message)) { - return; - } - // 将信息转换成json - JSONObject msg = JSONObject.parseObject(message); - String status = msg.getString("status"); - String result = msg.getString("result"); - if(StringUtil.isBlank(result)){ - return; - } - if(StringUtil.isBlank(status) || !MSG_SUCCESS_STATUS.equals(status)){ - return; - } - // 将结果响应结果转换成嵌套Map格式 - Map> resultMap = JSONObject.parseObject(result, new TypeReference>>(){}); - if(MapUtils.isEmpty(resultMap)) { - return; - } - // 存储redis - stAlamRecordService.receiveDataStorage(resultMap); - } - - - - void trustAllHosts(WebsocketConfigClient appClient) { - log.info("[websocket] wss 连接 ----- "); - try { - // wss需添加 - SSLContext sslContext = SSLContext.getInstance("TLS"); - sslContext.init(null, new TrustManager[]{new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] chain, - String authType) { - } - - @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - }}, new SecureRandom()); - SSLSocketFactory factory = sslContext.getSocketFactory(); - appClient.setSocket(factory.createSocket()); - } catch (Exception e) { - log.error("[websocket] trustAllHosts 错误 - " + e.toString()); - } - } - -} -*/ diff --git a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/station/service/impl/RealMonitorServiceImpl.java b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/station/service/impl/RealMonitorServiceImpl.java index 5181197..4fe1b37 100644 --- a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/station/service/impl/RealMonitorServiceImpl.java +++ b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/station/service/impl/RealMonitorServiceImpl.java @@ -50,8 +50,6 @@ import org.springframework.web.socket.TextMessage; import javax.validation.constraints.NotNull; import java.math.BigDecimal; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; @@ -163,45 +161,43 @@ public class RealMonitorServiceImpl implements IRealMonitorService { } // 数据切割 int limit = countStep(stationRealVos.size()); - List> list = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(a -> stationRealVos.stream().skip(a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())).collect(Collectors.toList()); + List> list = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(a -> stationRealVos.stream().skip((long) a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())).collect(Collectors.toList()); ExecutorService pool = Executors.newFixedThreadPool(limit); // Map valueMap = new ConcurrentHashMap<>(); // > Map> keyMap = new ConcurrentHashMap<>(); CountDownLatch countDownLatch = new CountDownLatch(limit); - pool.execute(()->{ - list.forEach(stations -> { - stations.forEach(stationReal -> { - String[] realIdArr = stationReal.getRealId(); - List realIds = Stream.of(realIdArr).collect(Collectors.toList()); - if(CollectionUtil.isEmpty(realIds)){ - return; - } - List objects = redisClient.getBatchRealDataByRealId(stationReal.getStation(),realIds); - if(CollectionUtil.isEmpty(objects)){ - return; - } - for(int i = 0; i < objects.size() ;i++){ - if(ObjectUtil.isEmpty(objects.get(i))){ - log.error(realIds.get(i) + "is null"); - continue; - } - Map attribute = (Map) JSONObject.parse(objects.get(i)); - attribute.put("realId",attribute.get("k")); - attribute.put("value",attribute.get("v")); - attribute.put("time",attribute.get("t")); - attribute.remove("v"); - attribute.remove("k"); - attribute.remove("t"); - this.getCheckMap(attribute,switchOnOff); - valueMap.put(realIdArr[i],attribute.get("value")); - keyMap.put(realIdArr[i],attribute); + pool.execute(()-> list.forEach(stations -> { + stations.forEach(stationReal -> { + String[] realIdArr = stationReal.getRealId(); + List realIds = Stream.of(realIdArr).collect(Collectors.toList()); + if(CollectionUtil.isEmpty(realIds)){ + return; + } + List objects = redisClient.getBatchRealDataByRealId(stationReal.getStation(),realIds); + if(CollectionUtil.isEmpty(objects)){ + return; + } + for(int i = 0; i < objects.size() ;i++){ + if(ObjectUtil.isEmpty(objects.get(i))){ + log.error(realIds.get(i) + "is null"); + continue; } - }); - countDownLatch.countDown(); + Map attribute = (Map) JSONObject.parse(objects.get(i)); + attribute.put("realId",attribute.get("k")); + attribute.put("value",attribute.get("v")); + attribute.put("time",attribute.get("t")); + attribute.remove("v"); + attribute.remove("k"); + attribute.remove("t"); + this.getCheckMap(attribute,switchOnOff); + valueMap.put(realIdArr[i],attribute.get("value")); + keyMap.put(realIdArr[i],attribute); + } }); - }); + countDownLatch.countDown(); + })); // 等待所有线程执行完成 try { countDownLatch.await(); @@ -224,24 +220,19 @@ public class RealMonitorServiceImpl implements IRealMonitorService { * @return */ private void getCheckMap(Map value,List switchOnOff){ - try{ - // 不处理开机状态监测点 - if(switchOnOff.contains(value.get("realId"))){ - return; - } - String time = value.get("time"); - if(StringUtil.isEmpty(time)){ - return; - } - SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DateUtil.PATTERN_DATETIME); - Date date = simpleDateFormat.parse(time); - // 实时数据超出10分钟未刷新,值置为 0 显示 - if(System.currentTimeMillis() - date.getTime() > 10 * 60 * 1000L){ - value.put("value","0"); - } - }catch (ParseException e){ + // 不处理开机状态监测点 + if(switchOnOff.contains(value.get("realId"))){ return; } + String time = value.get("time"); + if(StringUtil.isEmpty(time)){ + return; + } + Date date = DateUtil.parse(time,DateUtil.PATTERN_DATETIME); + // 实时数据超出10分钟未刷新,值置为 0 显示 + if(System.currentTimeMillis() - date.getTime() > 10 * 60 * 1000L){ + value.put("value","0"); + } } /** @@ -285,45 +276,23 @@ public class RealMonitorServiceImpl implements IRealMonitorService { // key->机构编号 value -> 设备集合 Map> deviceMap = devices.stream().collect(Collectors.groupingBy(EminfoAndEmParamVo::getCreateDept)); Map data = new HashMap<>(); - deviceMap.entrySet().forEach(entry -> { - JointRelayVo value = new JointRelayVo(); - List relays = entry.getValue().stream().filter(o-> MapUtils.isNotEmpty(o.getPoint()) && !StringUtil.isEmpty(o.getPoint().get(HomePageConstant.JOINT_RELAY))).map(relay-> map.get(relay.getPoint().get(HomePageConstant.JOINT_RELAY))).collect(Collectors.toList()); - if(CollectionUtil.isEmpty(relays)){ - value.setUnitSum(0); - value.setShutdownCount(0); - value.setStartingUpCount(0); - }else{ - value.setUnitSum(relays.size()); - Optional> off = Optional.ofNullable(relays.stream().filter(relay -> "0".equals(relay)).collect(Collectors.toList())); - value.setShutdownCount(off.map(o->o.size()).orElse(0)); - Optional> on = Optional.ofNullable(relays.stream().filter(relay -> "1".equals(relay)).collect(Collectors.toList())); - value.setStartingUpCount(on.map(o->o.size()).orElse(0)); + deviceMap.forEach((key, value) -> { + JointRelayVo jointRelay = new JointRelayVo(); + List relays = value.stream().filter(o -> MapUtils.isNotEmpty(o.getPoint()) && !StringUtil.isEmpty(o.getPoint().get(HomePageConstant.JOINT_RELAY))).map(relay -> map.get(relay.getPoint().get(HomePageConstant.JOINT_RELAY))).collect(Collectors.toList()); + if (CollectionUtil.isEmpty(relays)) { + jointRelay.setUnitSum(0); + jointRelay.setShutdownCount(0); + jointRelay.setStartingUpCount(0); + } else { + jointRelay.setUnitSum(relays.size()); + Optional> off = Optional.ofNullable(relays.stream().filter("0"::equals).collect(Collectors.toList())); + jointRelay.setShutdownCount(off.map(List::size).orElse(0)); + Optional> on = Optional.ofNullable(relays.stream().filter("1"::equals).collect(Collectors.toList())); + jointRelay.setStartingUpCount(on.map(List::size).orElse(0)); } - data.put(entry.getKey(),value); + data.put(key, jointRelay); }); redisTemplate.opsForValue().set(joint_relay_key, data); - - /*List jointRelayInfo = alertService.getJointRelayInfo(); - - //根据站点统计开关机 - Map> jointRelayMap = jointRelayInfo.stream().collect(Collectors.groupingBy(AnalyzeCodeBySignagesVO::getStation)); - jointRelayMap.forEach((key, value) -> { - List readIdList = value.stream().map(AnalyzeCodeBySignagesVO::getRealId).collect(Collectors.toList()); - vo = new JointRelayVo(); - readIdList.forEach(iter -> { - String jointRelay = map.get(iter); - if ("0".equals(jointRelay)) {//关机 - vo.setShutdownCount(vo.getShutdownCount() + 1); - vo.setShutdownRealId(iter + "," + vo.getShutdownRealId()); - vo.setUnitSum(vo.getUnitSum() + 1); - } else if ("1".equals(jointRelay)) {//开机 - vo.setStartingUpCount(vo.getStartingUpCount() + 1); - vo.setStartingUpRealId(iter + "," + vo.getStartingUpRealId()); - vo.setUnitSum(vo.getUnitSum() + 1); - } - }); - stationJointRelayMap.put(key, vo); - });*/ } /** @@ -393,7 +362,7 @@ public class RealMonitorServiceImpl implements IRealMonitorService { */ @Override public void centralizedMonitoring(String param) { - Long beginTime = System.currentTimeMillis(); + long beginTime = System.currentTimeMillis(); Thread thread = Thread.currentThread(); Object json = redisTemplate.opsForValue().get(real_id_key_gather_path); if (ObjectUtil.isEmpty(json)) { @@ -440,19 +409,17 @@ public class RealMonitorServiceImpl implements IRealMonitorService { // 根据站点分组 Map> stationAttbtMap = list.stream().collect(Collectors.groupingBy(StationAttributeEntity::getStationId)); // 获取站点列表 - List stationEntityList = stationService.getStationByInCode(stationAttbtMap.keySet().stream().collect(Collectors.toList())); + List stationEntityList = stationService.getStationByInCode(new ArrayList<>(stationAttbtMap.keySet())); log.info(thread.getName() + "步骤8站点列表 耗时 : {}",System.currentTimeMillis() - beginTime); beginTime = System.currentTimeMillis(); // 隐藏设备列表 List hideList = stationAttrConfigService.getHideList(); // 分割,每个map限制10个长度 - List>> handleList = this.mapChunk(stationAttbtMap,MAX_SEND); + List>> handleList = this.mapChunk(stationAttbtMap); log.info(thread.getName() + "步骤9监测点数据切割 耗时 : {}",System.currentTimeMillis() - beginTime); beginTime = System.currentTimeMillis(); // 创建线程池 ExecutorService pool = Executors.newFixedThreadPool(handleList.size()); - //ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("centralized_monitor-pool-%d").build(); - //ExecutorService pool = new ThreadPoolExecutor(5, handleList.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); CountDownLatch countDownLatch = new CountDownLatch(handleList.size()); log.info(thread.getName() +"步骤10创建线程 耗时 : {}",System.currentTimeMillis() - beginTime); beginTime = System.currentTimeMillis(); @@ -521,14 +488,14 @@ public class RealMonitorServiceImpl implements IRealMonitorService { /** * map分割 - * @param chunkMap - * @param chunkNum - * @return + * * @param * @param + * @param chunkMap + * @return */ - private List> mapChunk(Map chunkMap, int chunkNum) { - if (chunkMap == null || chunkNum <= 0) { + private List> mapChunk(Map chunkMap) { + if (chunkMap == null || RealMonitorServiceImpl.MAX_SEND <= 0) { List> list = new ArrayList<>(); list.add(chunkMap); return list; @@ -541,7 +508,7 @@ public class RealMonitorServiceImpl implements IRealMonitorService { while (iterator.hasNext()) { k next = iterator.next(); tem.put(next, chunkMap.get(next)); - if (i == chunkNum) { + if (i == RealMonitorServiceImpl.MAX_SEND) { total.add(tem); tem = new HashMap<>(); i = 0; @@ -677,9 +644,9 @@ public class RealMonitorServiceImpl implements IRealMonitorService { // 正常状态 attest.setStatus(ConfigStatus.ConfigStatusEnum.NORMAL.getStatus()); int quality = Optional.ofNullable(attest.getQuality()).orElse(-1); - String time = Optional.ofNullable(attest.getTime()).orElse(null); + String time = Optional.ofNullable(attest.getTime()).orElse(""); // 延时状态 :质量为空 && 时间为空 - if (-1 == quality && StringUtil.isBlank(time)) { + if (-1 == quality && StringUtil.isEmpty(time)) { attest.setStatus(ConfigStatus.ConfigStatusEnum.GRAY.getStatus()); } int type = item.getAttributeType(); @@ -842,7 +809,7 @@ public class RealMonitorServiceImpl implements IRealMonitorService { */ @Override public TextMessage getSendMessage(MessageParamVo message) { - Long beginTime = System.currentTimeMillis(); + long beginTime = System.currentTimeMillis(); Thread thread = Thread.currentThread(); // 权限机构过滤 if(ObjectUtil.isEmpty(message) || CollectionUtil.isEmpty(message.getDeptIds())){