From e71ee9a69f072d1ea5773190bdc3aca819d07090 Mon Sep 17 00:00:00 2001 From: yang_shj <1069818635@QQ.com> Date: Mon, 12 Aug 2024 08:37:06 +0800 Subject: [PATCH] =?UTF-8?q?#=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../operation/monitor/impl/MonitorServiceImpl.java | 54 +++++++++++++--------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/hzims-service/hzims-scheduled/src/main/java/com/hnac/hzims/scheduled/service/operation/monitor/impl/MonitorServiceImpl.java b/hzims-service/hzims-scheduled/src/main/java/com/hnac/hzims/scheduled/service/operation/monitor/impl/MonitorServiceImpl.java index 3fbfb56..bb23b2d 100644 --- a/hzims-service/hzims-scheduled/src/main/java/com/hnac/hzims/scheduled/service/operation/monitor/impl/MonitorServiceImpl.java +++ b/hzims-service/hzims-scheduled/src/main/java/com/hnac/hzims/scheduled/service/operation/monitor/impl/MonitorServiceImpl.java @@ -78,7 +78,7 @@ public class MonitorServiceImpl implements MonitorService { private final IAnalyseDataSearchClient analyseDataSearchClient; - private static final ExecutorService pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("load-monitoring-pool-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy()); + private static final ExecutorService pool = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(256), new ThreadFactoryBuilder().setNameFormat("load-monitoring-pool-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy()); @Value("${hzims.equipment.emInfo.emInfoList}") @@ -196,7 +196,7 @@ public class MonitorServiceImpl implements MonitorService { // > Map> keyMap = new ConcurrentHashMap<>(); CountDownLatch countDownLatch = new CountDownLatch(limit); - pool.execute(()-> list.forEach(stations -> { + pool.execute(()-> list.forEach(stations -> { stations.forEach(stationReal -> { String[] realIdArr = stationReal.getRealId(); List realIds = Stream.of(realIdArr).collect(Collectors.toList()); @@ -222,9 +222,12 @@ public class MonitorServiceImpl implements MonitorService { }); countDownLatch.countDown(); })); - // 等待所有线程执行完成 try { - countDownLatch.await(); + // 等待所有线程执行完成(至多等待三秒,超过三秒任务认为线程发生阻塞) + boolean flag = countDownLatch.await(3,TimeUnit.SECONDS); + if(!flag){ + log.error("实时数据调度发生阻塞 : {}" , DateUtil.format(new Date(),DateUtil.PATTERN_DATETIME)); + } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); @@ -299,27 +302,36 @@ public class MonitorServiceImpl implements MonitorService { for(Map> item : limit){ // 提交线程任务 pool.submit(()->{ - item.forEach((key,value)->{ - RealStationVo realStation = new RealStationVo(); - // 设备信息 - List devices = this.monitorRealDevice(value,realTimeData); - // 设备状态 - this.deviceStatus(devices,deviceClassifyMap); - realStation.setDeviceList(devices.stream().sorted(Comparator.comparing(RealDeviceVo::getDeviceName)).collect(Collectors.toList())); - // 站点基础信息: 编码、名称、限制水位、服务类型、机构、排序 - this.stationBaseInfo(stations.stream().filter(station->key.equals(station.getCode())).findFirst(),sorts,realStation); - // 站点总功率计算 - this.stationActivePower(devices,realStation); - // 站点铃铛、数据中断状态 - this.stationStatus(aborts,bells,realStation); - realStations.add(realStation); - }); - countDownLatch.countDown(); + try{ + item.forEach((key,value)->{ + RealStationVo realStation = new RealStationVo(); + // 设备信息 + List devices = this.monitorRealDevice(value,realTimeData); + // 设备状态 + this.deviceStatus(devices,deviceClassifyMap); + realStation.setDeviceList(devices.stream().sorted(Comparator.comparing(RealDeviceVo::getDeviceName)).collect(Collectors.toList())); + // 站点基础信息: 编码、名称、限制水位、服务类型、机构、排序 + this.stationBaseInfo(stations.stream().filter(station->key.equals(station.getCode())).findFirst(),sorts,realStation); + // 站点总功率计算 + this.stationActivePower(devices,realStation); + // 站点铃铛、数据中断状态 + this.stationStatus(aborts,bells,realStation); + realStations.add(realStation); + }); + countDownLatch.countDown(); + }catch (Exception e){ + countDownLatch.countDown(); + Thread.currentThread().interrupt(); + } }); } // 等待所有线程执行完成 try { - countDownLatch.await(); + // 等待所有线程执行完成(至多等待二秒,超过二秒任务认为线程发生阻塞) + boolean flag = countDownLatch.await(2,TimeUnit.SECONDS); + if(!flag){ + log.error("集中监控数据调度发生阻塞 : {}" , DateUtil.format(new Date(),DateUtil.PATTERN_DATETIME)); + } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt();