Browse Source

#定时任务线程优化

zhongwei
yang_shj 6 months ago
parent
commit
e71ee9a69f
  1. 54
      hzims-service/hzims-scheduled/src/main/java/com/hnac/hzims/scheduled/service/operation/monitor/impl/MonitorServiceImpl.java

54
hzims-service/hzims-scheduled/src/main/java/com/hnac/hzims/scheduled/service/operation/monitor/impl/MonitorServiceImpl.java

@ -78,7 +78,7 @@ public class MonitorServiceImpl implements MonitorService {
private final IAnalyseDataSearchClient analyseDataSearchClient; private final IAnalyseDataSearchClient analyseDataSearchClient;
private static final ExecutorService pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("load-monitoring-pool-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy()); private static final ExecutorService pool = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(256), new ThreadFactoryBuilder().setNameFormat("load-monitoring-pool-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());
@Value("${hzims.equipment.emInfo.emInfoList}") @Value("${hzims.equipment.emInfo.emInfoList}")
@ -196,7 +196,7 @@ public class MonitorServiceImpl implements MonitorService {
// <real,<attribute,value>> // <real,<attribute,value>>
Map<String,Map<String,String>> keyMap = new ConcurrentHashMap<>(); Map<String,Map<String,String>> keyMap = new ConcurrentHashMap<>();
CountDownLatch countDownLatch = new CountDownLatch(limit); CountDownLatch countDownLatch = new CountDownLatch(limit);
pool.execute(()-> list.forEach(stations -> { pool.execute(()-> list.forEach(stations -> {
stations.forEach(stationReal -> { stations.forEach(stationReal -> {
String[] realIdArr = stationReal.getRealId(); String[] realIdArr = stationReal.getRealId();
List<String> realIds = Stream.of(realIdArr).collect(Collectors.toList()); List<String> realIds = Stream.of(realIdArr).collect(Collectors.toList());
@ -222,9 +222,12 @@ public class MonitorServiceImpl implements MonitorService {
}); });
countDownLatch.countDown(); countDownLatch.countDown();
})); }));
// 等待所有线程执行完成
try { try {
countDownLatch.await(); // 等待所有线程执行完成(至多等待三秒,超过三秒任务认为线程发生阻塞)
boolean flag = countDownLatch.await(3,TimeUnit.SECONDS);
if(!flag){
log.error("实时数据调度发生阻塞 : {}" , DateUtil.format(new Date(),DateUtil.PATTERN_DATETIME));
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -299,27 +302,36 @@ public class MonitorServiceImpl implements MonitorService {
for(Map<String, List<StationAttributeEntity>> item : limit){ for(Map<String, List<StationAttributeEntity>> item : limit){
// 提交线程任务 // 提交线程任务
pool.submit(()->{ pool.submit(()->{
item.forEach((key,value)->{ try{
RealStationVo realStation = new RealStationVo(); item.forEach((key,value)->{
// 设备信息 RealStationVo realStation = new RealStationVo();
List<RealDeviceVo> devices = this.monitorRealDevice(value,realTimeData); // 设备信息
// 设备状态 List<RealDeviceVo> devices = this.monitorRealDevice(value,realTimeData);
this.deviceStatus(devices,deviceClassifyMap); // 设备状态
realStation.setDeviceList(devices.stream().sorted(Comparator.comparing(RealDeviceVo::getDeviceName)).collect(Collectors.toList())); this.deviceStatus(devices,deviceClassifyMap);
// 站点基础信息: 编码、名称、限制水位、服务类型、机构、排序 realStation.setDeviceList(devices.stream().sorted(Comparator.comparing(RealDeviceVo::getDeviceName)).collect(Collectors.toList()));
this.stationBaseInfo(stations.stream().filter(station->key.equals(station.getCode())).findFirst(),sorts,realStation); // 站点基础信息: 编码、名称、限制水位、服务类型、机构、排序
// 站点总功率计算 this.stationBaseInfo(stations.stream().filter(station->key.equals(station.getCode())).findFirst(),sorts,realStation);
this.stationActivePower(devices,realStation); // 站点总功率计算
// 站点铃铛、数据中断状态 this.stationActivePower(devices,realStation);
this.stationStatus(aborts,bells,realStation); // 站点铃铛、数据中断状态
realStations.add(realStation); this.stationStatus(aborts,bells,realStation);
}); realStations.add(realStation);
countDownLatch.countDown(); });
countDownLatch.countDown();
}catch (Exception e){
countDownLatch.countDown();
Thread.currentThread().interrupt();
}
}); });
} }
// 等待所有线程执行完成 // 等待所有线程执行完成
try { try {
countDownLatch.await(); // 等待所有线程执行完成(至多等待二秒,超过二秒任务认为线程发生阻塞)
boolean flag = countDownLatch.await(2,TimeUnit.SECONDS);
if(!flag){
log.error("集中监控数据调度发生阻塞 : {}" , DateUtil.format(new Date(),DateUtil.PATTERN_DATETIME));
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

Loading…
Cancel
Save