diff --git a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ThreadConfig.java b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ThreadConfig.java new file mode 100644 index 0000000..a9877fe --- /dev/null +++ b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/config/ThreadConfig.java @@ -0,0 +1,36 @@ +package com.hnac.hzims.operational.config; + +import com.hnac.hzims.operational.propperties.ThreadPoolConfigProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @Author WL + * @Version v1.0 + * @Serial 1.0 + * @Date 2023/5/10 12:49 + */ +@Configuration +public class ThreadConfig { + + @Autowired + private ThreadPoolConfigProperties threadPoolConfigProperties; + + @Bean + public ThreadPoolExecutor threadPoolExecutor() { + return new ThreadPoolExecutor(threadPoolConfigProperties.getCorePoolSize(), + threadPoolConfigProperties.getMaxSize(), + threadPoolConfigProperties.getKeepAliveTime(), + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(threadPoolConfigProperties.getCapacity()), + Executors.defaultThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); + } + +} diff --git a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/maintenance/service/impl/MaintenanceServiceImpl.java b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/maintenance/service/impl/MaintenanceServiceImpl.java index ffcd825..12f48bb 100644 --- a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/maintenance/service/impl/MaintenanceServiceImpl.java +++ b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/maintenance/service/impl/MaintenanceServiceImpl.java @@ -37,8 +37,12 @@ import org.springblade.system.feign.ISysClient; import org.springblade.system.user.cache.UserCache; import org.springblade.system.user.entity.User; import org.springframework.beans.BeanUtils; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.DefaultTransactionDefinition; import java.time.Instant; import java.time.LocalDate; @@ -46,6 +50,9 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -71,41 +78,43 @@ public class MaintenanceServiceImpl implements MaintenanceService { private final IFlowClient flowClient; private final ISysClient sysClient; + private final ThreadPoolExecutor executor; + + + private final DataSourceTransactionManager dataSourceTransactionManager; + /** * 日常维护生成任务 * * @param ids */ @Override - @Transactional(rollbackFor = Exception.class) public void createTask(List ids) { - //获取日常维护计划 - LambdaQueryWrapper planEntityLambdaQueryWrapper = new LambdaQueryWrapper<>(); - planEntityLambdaQueryWrapper.in(BaseEntity::getId, ids); - List planEntities = maintenancePlanService - .list(planEntityLambdaQueryWrapper); - //筛选当月没生成任务的计划 - List finalPlanEntities = planEntities.stream().filter(planEntity -> - ObjectUtil.isEmpty(planEntity.getCreateTaskTime()) - || !DateUtil.judgeSameDay( - DateUtil.DateToLocalDateTime(planEntity.getCreateTaskTime()), LocalDate.now()) - ).collect(Collectors.toList()); - if (CollectionUtil.isEmpty(finalPlanEntities)) { - throw new ServiceException("所选计划当天已生成任务"); - } - for (OperMaintenancePlanEntity finalPlanEntity : finalPlanEntities) { - // 通过计划生成任务 - this.createTaskByPlan(finalPlanEntity); - //更新任务派发时间 - LambdaUpdateWrapper planEntityLambdaUpdateWrapper = - new LambdaUpdateWrapper<>(); - planEntityLambdaUpdateWrapper.set(OperMaintenancePlanEntity::getCreateTaskTime, new Date()); - planEntityLambdaUpdateWrapper.eq(OperMaintenancePlanEntity::getId, finalPlanEntity.getId()); - boolean update = maintenancePlanService.update(planEntityLambdaUpdateWrapper); - if (!update) { - log.error("maintenance:generateTask 更新任务派发失败"); - throw new ServiceException("更新任务派发失败"); + try { + //获取日常维护计划 + LambdaQueryWrapper planEntityLambdaQueryWrapper = new LambdaQueryWrapper<>(); + planEntityLambdaQueryWrapper.in(BaseEntity::getId, ids); + List planEntities = maintenancePlanService.list(planEntityLambdaQueryWrapper); + //筛选当月没生成任务的计划 + List finalPlanEntities = planEntities.stream().filter(planEntity -> ObjectUtil.isEmpty(planEntity.getCreateTaskTime()) || !DateUtil.judgeSameDay(DateUtil.DateToLocalDateTime(planEntity.getCreateTaskTime()), LocalDate.now())).collect(Collectors.toList()); + if (CollectionUtil.isEmpty(finalPlanEntities)) { + throw new ServiceException("所选计划当天已生成任务"); + } + for (OperMaintenancePlanEntity finalPlanEntity : finalPlanEntities) { + // 通过计划生成任务 + this.createTaskByPlan(finalPlanEntity); + //更新任务派发时间 + LambdaUpdateWrapper planEntityLambdaUpdateWrapper = new LambdaUpdateWrapper<>(); + planEntityLambdaUpdateWrapper.set(OperMaintenancePlanEntity::getCreateTaskTime, new Date()); + planEntityLambdaUpdateWrapper.eq(OperMaintenancePlanEntity::getId, finalPlanEntity.getId()); + boolean update = maintenancePlanService.update(planEntityLambdaUpdateWrapper); + if (!update) { + log.error("maintenance:generateTask 更新任务派发失败"); + throw new ServiceException("更新任务派发失败"); + } } + } catch (Exception e) { + e.printStackTrace(); } } @@ -117,107 +126,108 @@ public class MaintenanceServiceImpl implements MaintenanceService { @Override @Transactional(rollbackFor = RuntimeException.class) public void findPending(ProcessWorkFlowResponse response) { - log.info("获取businessKey: {}", response.getBusinessKey()); - log.info("获取taskId: {} ", response.getTaskId()); - log.info("获取下一个审批人是: {} ", response.getNextStepOperator()); - log.info("获取下一个用户Id是: {} ", response.getUserId()); - log.info("获取当前任务名称是: {} ", response.getTaskName()); - log.info("获取根据handleType区分是用户还是候选组角色: {}", response.getHandleType()); - //json转换表单 - String formData = JSON.toJSONString(response.getVariables()); - log.info("获取表单的数据:{}", formData); - OperMaintenanceTaskEntityVo standardTicketInfoVo = null; try { - JSONObject jsonObject = JSONObject.parseObject(formData); - standardTicketInfoVo = JSONObject.parseObject(jsonObject.getString("operMaintenanceTaskEntityVo"), - new TypeReference(){}); - // standardTicketInfoVo = (OperMaintenanceTaskEntityVo) jsonObject.get("operMaintenanceTaskEntityVo"); - // standardTicketInfoVo = JSONObject.toJavaObject(jsonObject, OperMaintenanceTaskEntityVo.class); - } catch (Exception e) { - log.error("获取表单出现异常了~~~~"); - throw new IllegalArgumentException(e.getMessage()); - } - //1.查询日常维护信息 - Long id = NumberUtils.toLong(response.getBusinessKey()); - OperMaintenanceTaskEntity dbOperMaintenanceTaskEntity = taskService.getById(id); - if (ObjectUtils.isEmpty(dbOperMaintenanceTaskEntity)) { - log.error("获取日常维护数据不存在"); - return; - } - OperMaintenanceTaskEntity entity = new OperMaintenanceTaskEntity(); - BeanUtils.copyProperties(standardTicketInfoVo, entity); - entity.setId(id); - //填充日常维护信息 - saveOperMaintenanceTaskEntity(entity, response); - entity.setProcessInstanceId(response.getProcessInstanceId()); - taskService.updateById(entity); - - //推送消息 - if (response.getTaskId() != null) { - // MessagePushRecordDto message = new MessagePushRecordDto(); - // message.setBusinessClassify("business"); - // message.setBusinessKey(MessageConstants.BusinessClassifyEnum.OPERATIONTICKETMESSAGE.getKey()); - // message.setSubject(MessageConstants.BusinessClassifyEnum.OPERATIONTICKETMESSAGE.getDescription()); - // message.setTaskId(entity.getId()); - // message.setTenantId("200000"); - // message.setTypes(Arrays.asList(MessageConstants.APP_PUSH, MessageConstants.WS_PUSH)); - // message.setPushType(MessageConstants.IMMEDIATELY); - // //您有一张工作票待审批,工作内容:*****,审批环节:*****; - // String countent = - // "您有一条日常维护任务待审批,工作内容:".concat(entity.getTitle()) - // .concat(",审批环节:") - // .concat(response.getTaskName()); - // message.setContent(countent); - // message.setDeptId(entity.getCreateDept()); - // R deptName = sysClient.getDeptName(entity.getCreateDept()); - // if (deptName.isSuccess()) { - // message.setDeptName(deptName.getData()); - // } - // String userIds = response.getUserId(); - // if (com.hnac.hzims.common.logs.utils.StringUtils.isBlank(userIds)) { - // log.error("推送的消息不能为空哦,{}", userIds); - // return; - // } - // String[] split = userIds.split(","); - // for (String userId : split) { - // message.setPusher(userId); - // User user = UserCache.getUser(NumberUtils.toLong(userId)); - // if (ObjectUtils.isNotEmpty(user)) { - // message.setPusherName(user.getName()); - // } - // message.setAccount(userId); - // message.setCreateUser(NumberUtils.toLong(userId)); - // messageClient.sendMessage(message); - // } - BusinessMessageDTO businessMessageDTO = new BusinessMessageDTO(); - businessMessageDTO.setBusinessClassify("business"); - businessMessageDTO.setBusinessKey(MessageConstants.BusinessClassifyEnum.ROUTINEMAINTENANCE.getKey()); - businessMessageDTO.setSubject(MessageConstants.BusinessClassifyEnum.ROUTINEMAINTENANCE.getDescription()); - businessMessageDTO.setTaskId(dbOperMaintenanceTaskEntity.getId()); - businessMessageDTO.setTenantId("200000"); - //您有一张工作票待审批,工作内容:*****,审批环节:*****; - String countent = - "您有一条日常维护任务待审批,工作内容:".concat(dbOperMaintenanceTaskEntity.getTitle()) - .concat(",审批环节:") - .concat(response.getTaskName()); - businessMessageDTO.setContent(countent); - businessMessageDTO.setDeptId(dbOperMaintenanceTaskEntity.getCreateDept()); - R deptName = sysClient.getDeptName(dbOperMaintenanceTaskEntity.getCreateDept()); - if (deptName.isSuccess()) { - businessMessageDTO.setDeptName(deptName.getData()); + log.info("获取businessKey: {}", response.getBusinessKey()); + log.info("获取taskId: {} ", response.getTaskId()); + log.info("获取下一个审批人是: {} ", response.getNextStepOperator()); + log.info("获取下一个用户Id是: {} ", response.getUserId()); + log.info("获取当前任务名称是: {} ", response.getTaskName()); + log.info("获取根据handleType区分是用户还是候选组角色: {}", response.getHandleType()); + //json转换表单 + String formData = JSON.toJSONString(response.getVariables()); + log.info("获取表单的数据:{}", formData); + OperMaintenanceTaskEntityVo standardTicketInfoVo = null; + try { + JSONObject jsonObject = JSONObject.parseObject(formData); + standardTicketInfoVo = JSONObject.parseObject(jsonObject.getString("operMaintenanceTaskEntityVo"), new TypeReference() { + }); + // standardTicketInfoVo = (OperMaintenanceTaskEntityVo) jsonObject.get("operMaintenanceTaskEntityVo"); + // standardTicketInfoVo = JSONObject.toJavaObject(jsonObject, OperMaintenanceTaskEntityVo.class); + } catch (Exception e) { + log.error("获取表单出现异常了~~~~"); + throw new IllegalArgumentException(e.getMessage()); + } + //1.查询日常维护信息 + Long id = NumberUtils.toLong(response.getBusinessKey()); + OperMaintenanceTaskEntity dbOperMaintenanceTaskEntity = taskService.getById(id); + if (ObjectUtils.isEmpty(dbOperMaintenanceTaskEntity)) { + log.error("获取日常维护数据不存在"); + return; } - String userIds = response.getUserId(); - businessMessageDTO.setUserIds(userIds); - businessMessageDTO.setCreateUser(dbOperMaintenanceTaskEntity.getCreateUser()); - - log.info("================================================"); - log.info("businessMessageDTO = " + businessMessageDTO); - log.info("================================================"); - R booleanR = messageClient.sendAppAndWsMsgByUsers(businessMessageDTO); - if (!booleanR.isSuccess()) { - throw new ServiceException("消息推送失败"); + OperMaintenanceTaskEntity entity = new OperMaintenanceTaskEntity(); + BeanUtils.copyProperties(standardTicketInfoVo, entity); + entity.setId(id); + //填充日常维护信息 + saveOperMaintenanceTaskEntity(entity, response); + entity.setProcessInstanceId(response.getProcessInstanceId()); + taskService.updateById(entity); + + //推送消息 + if (response.getTaskId() != null) { + // MessagePushRecordDto message = new MessagePushRecordDto(); + // message.setBusinessClassify("business"); + // message.setBusinessKey(MessageConstants.BusinessClassifyEnum.OPERATIONTICKETMESSAGE.getKey()); + // message.setSubject(MessageConstants.BusinessClassifyEnum.OPERATIONTICKETMESSAGE.getDescription()); + // message.setTaskId(entity.getId()); + // message.setTenantId("200000"); + // message.setTypes(Arrays.asList(MessageConstants.APP_PUSH, MessageConstants.WS_PUSH)); + // message.setPushType(MessageConstants.IMMEDIATELY); + // //您有一张工作票待审批,工作内容:*****,审批环节:*****; + // String countent = + // "您有一条日常维护任务待审批,工作内容:".concat(entity.getTitle()) + // .concat(",审批环节:") + // .concat(response.getTaskName()); + // message.setContent(countent); + // message.setDeptId(entity.getCreateDept()); + // R deptName = sysClient.getDeptName(entity.getCreateDept()); + // if (deptName.isSuccess()) { + // message.setDeptName(deptName.getData()); + // } + // String userIds = response.getUserId(); + // if (com.hnac.hzims.common.logs.utils.StringUtils.isBlank(userIds)) { + // log.error("推送的消息不能为空哦,{}", userIds); + // return; + // } + // String[] split = userIds.split(","); + // for (String userId : split) { + // message.setPusher(userId); + // User user = UserCache.getUser(NumberUtils.toLong(userId)); + // if (ObjectUtils.isNotEmpty(user)) { + // message.setPusherName(user.getName()); + // } + // message.setAccount(userId); + // message.setCreateUser(NumberUtils.toLong(userId)); + // messageClient.sendMessage(message); + // } + BusinessMessageDTO businessMessageDTO = new BusinessMessageDTO(); + businessMessageDTO.setBusinessClassify("business"); + businessMessageDTO.setBusinessKey(MessageConstants.BusinessClassifyEnum.ROUTINEMAINTENANCE.getKey()); + businessMessageDTO.setSubject(MessageConstants.BusinessClassifyEnum.ROUTINEMAINTENANCE.getDescription()); + businessMessageDTO.setTaskId(dbOperMaintenanceTaskEntity.getId()); + businessMessageDTO.setTenantId("200000"); + //您有一张工作票待审批,工作内容:*****,审批环节:*****; + String countent = "您有一条日常维护任务待审批,工作内容:".concat(dbOperMaintenanceTaskEntity.getTitle()).concat(",审批环节:").concat(response.getTaskName()); + businessMessageDTO.setContent(countent); + businessMessageDTO.setDeptId(dbOperMaintenanceTaskEntity.getCreateDept()); + R deptName = sysClient.getDeptName(dbOperMaintenanceTaskEntity.getCreateDept()); + if (deptName.isSuccess()) { + businessMessageDTO.setDeptName(deptName.getData()); + } + String userIds = response.getUserId(); + businessMessageDTO.setUserIds(userIds); + businessMessageDTO.setCreateUser(dbOperMaintenanceTaskEntity.getCreateUser()); + + log.info("================================================"); + log.info("businessMessageDTO = " + businessMessageDTO); + log.info("================================================"); + R booleanR = messageClient.sendAppAndWsMsgByUsers(businessMessageDTO); + if (!booleanR.isSuccess()) { + throw new ServiceException("消息推送失败"); + } + log.info("推送成功~"); } - log.info("推送成功~"); + } catch (Exception e) { + e.printStackTrace(); } } @@ -281,6 +291,9 @@ public class MaintenanceServiceImpl implements MaintenanceService { * @param finalPlanEntity */ private void fillTask(OperMaintenanceTaskEntity taskEntity, OperMaintenancePlanEntity finalPlanEntity) { + DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition(); + defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + TransactionStatus transaction = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition); taskEntity.setId(null); taskEntity.setCreateTime(new Date()); taskEntity.setUpdateTime(new Date()); @@ -299,8 +312,7 @@ public class MaintenanceServiceImpl implements MaintenanceService { if (ObjectUtil.isNotEmpty(finalPlanEntity.getMaintenanceModel()) && finalPlanEntity.getMaintenanceModel() == 2) { //查询值班信息 LambdaQueryWrapper entityLambdaQueryWrapper = new LambdaQueryWrapper<>(); - entityLambdaQueryWrapper.eq(ImsDutyMainEntity::getDutyDate, - DateTimeFormatter.ofPattern(PATTERN_DATE).format(disposeTime)); + entityLambdaQueryWrapper.eq(ImsDutyMainEntity::getDutyDate, DateTimeFormatter.ofPattern(PATTERN_DATE).format(disposeTime)); entityLambdaQueryWrapper.eq(ImsDutyMainEntity::getClassId, finalPlanEntity.getImsDutyClassId()); ImsDutyMainEntity entity = mainService.getOne(entityLambdaQueryWrapper); if (ObjectUtil.isNotEmpty(entity) && StringUtils.isNotEmpty(entity.getDutyPersonIds())) { @@ -312,16 +324,40 @@ public class MaintenanceServiceImpl implements MaintenanceService { } } taskService.save(taskEntity); - //生成工作流实例 - String processInstanceId = this.startProcess(finalPlanEntity.getProcDefId(), taskEntity); - taskEntity.setProcessInstanceId(processInstanceId); - taskService.updateById(taskEntity); - finalPlanEntity.setCreateTaskTime(new Date()); - maintenancePlanService.updateById(finalPlanEntity); + dataSourceTransactionManager.commit(transaction); + + OperMaintenanceTaskEntity[] finalTaskEntity = {taskEntity}; + + CompletableFuture processInstanceIdCompletableFuture = CompletableFuture.supplyAsync(() -> { + //生成工作流实例 + String processInstanceId = this.startProcess(finalPlanEntity.getProcDefId(), finalTaskEntity[0]); + return processInstanceId; + }, executor); + + //启动流程 + CompletableFuture operMaintenanceTaskEntityCompletableFuture = processInstanceIdCompletableFuture.thenApplyAsync((processInstanceId) -> { + finalTaskEntity[0].setProcessInstanceId(processInstanceId); + taskService.updateById(finalTaskEntity[0]); + finalPlanEntity.setCreateTaskTime(new Date()); + maintenancePlanService.updateById(finalPlanEntity); + finalTaskEntity[0] = taskService.getById(finalTaskEntity[0].getId()); + return finalTaskEntity[0]; + }, executor); - taskEntity = taskService.getById(taskEntity.getId()); - //推送消息 - this.pushTaskMessage(taskEntity); + + CompletableFuture thennedAcceptAsync = operMaintenanceTaskEntityCompletableFuture.thenAcceptAsync((task) -> { + //推送消息 + this.pushTaskMessage(task); + }, executor); + + + //启动队列 + try { + CompletableFuture.allOf(processInstanceIdCompletableFuture, operMaintenanceTaskEntityCompletableFuture, + thennedAcceptAsync).get(); + } catch (Exception e) { + e.printStackTrace(); + } } @@ -343,10 +379,7 @@ public class MaintenanceServiceImpl implements MaintenanceService { params.put("initUserIds", taskUsers); params.put("operMaintenanceTaskEntityVo", operMaintenanceTaskEntityVo); - return flowClient.startProcessInstanceContainNameByKey(processDefinitionKey, - String.valueOf(taskEntity.getId()), taskEntity.getTitle(), params) - .getData() - .getProcessInstanceId(); + return flowClient.startProcessInstanceContainNameByKey(processDefinitionKey, String.valueOf(taskEntity.getId()), taskEntity.getTitle(), params).getData().getProcessInstanceId(); } diff --git a/hzims-service/operational/src/main/java/com/hnac/hzims/operational/propperties/ThreadPoolConfigProperties.java b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/propperties/ThreadPoolConfigProperties.java new file mode 100644 index 0000000..7a949bc --- /dev/null +++ b/hzims-service/operational/src/main/java/com/hnac/hzims/operational/propperties/ThreadPoolConfigProperties.java @@ -0,0 +1,32 @@ +package com.hnac.hzims.operational.propperties; + +import lombok.Data; +import org.springframework.context.annotation.Configuration; + +/** + * + * @Author WL + * @Version v1.0 + * @Serial 1.0 + * @Date 2023/5/10 12:51 + */ +@Configuration +@Data +public class ThreadPoolConfigProperties { + /** + * 核心数 + */ + private Integer corePoolSize = 20; + /** + * 最大数 + */ + private Integer maxSize = 200; + /** + * 存活时间 + */ + private Integer keepAliveTime = 20; + /** + * 容量 + */ + private Integer capacity = 10000; +} \ No newline at end of file