Browse Source

fix:优化工作流消息消费逻辑

zhongwei
haungxing 9 months ago
parent
commit
57389e7be2
  1. 65
      hzims-service/hzims-middle/src/main/java/com/hnac/hzims/middle/processflow/consumer/StandardWorkTicketConsumer.java
  2. 9
      hzims-service/hzims-middle/src/main/java/com/hnac/hzims/middle/processflow/entity/WorkflowOperationLog.java
  3. 3
      hzims-service/hzims-middle/src/main/java/com/hnac/hzims/middle/processflow/strategy/entity/WorkflowQueue.java

65
hzims-service/hzims-middle/src/main/java/com/hnac/hzims/middle/processflow/consumer/StandardWorkTicketConsumer.java

@ -9,13 +9,18 @@ import com.hnac.hzims.middle.processflow.service.WorkflowOperationLogService;
import com.hnac.hzims.middle.processflow.strategy.entity.WorkflowQueue;
import com.hnac.hzims.middle.processflow.strategy.service.ProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.log.exception.ServiceException;
import org.springblade.core.tool.utils.BeanUtil;
import org.springblade.core.tool.utils.Func;
import org.springblade.queue.annotation.RedisQueue;
import org.springblade.queue.consume.IQueueConsume;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.List;
import java.util.Optional;
/**
* 监听数据的
@ -30,7 +35,6 @@ import java.util.List;
@Slf4j
public class StandardWorkTicketConsumer implements IQueueConsume {
@Autowired
private List<ProcessService> ticketServiceList;
@ -39,42 +43,39 @@ public class StandardWorkTicketConsumer implements IQueueConsume {
@Override
public void handlerMessage(String message) {
log.info("监听到数据:{}", message);
ProcessWorkFlowResponse response = JSONObject.parseObject(message, ProcessWorkFlowResponse.class);
String taskDefinitionKey = response.getProcessDefinitionKey();
//记录操作日志
WorkflowOperationLog regularWorkflowOperationLog = new WorkflowOperationLog();
BeanUtils.copyProperties(response, regularWorkflowOperationLog);
if (ObjectUtils.isNotEmpty(regularWorkflowOperationLog)) {
regularWorkflowOperationLog.setVariables(JSON.toJSONString(response.getVariables()));
regularWorkflowOperationLog.setTakeId(response.getTaskId());
regularWorkflowOperationLog.setTakeName(response.getTaskName());
regularWorkflowOperationLog.setIsOperationLog(true);
regularWorkflowOperationLog.setProcessInstanceKey(response.getProcessInstanceId());
log.info("正在记录操作日志WorkflowOperationLog.... :{}", regularWorkflowOperationLog);
}
try {
WorkflowQueue ticker = new WorkflowQueue();
ticker.setProcessDefinitionKey(taskDefinitionKey);
//判断执行具体的实现类
ProcessService processService =
ticketServiceList.stream().filter(item -> item.isWorkflowProcess(ticker)).findFirst().orElse(null);
if (ObjectUtils.isNotEmpty(processService)) {
//执行业务方法
ProcessWorkFlowResponse response = JSONObject.parseObject(message, ProcessWorkFlowResponse.class);
Assert.isTrue(Func.isNotEmpty(response),() -> {
throw new ServiceException("消息转换失败,消息内容为:" + message);
});
// 保存日志
WorkflowOperationLog log = BeanUtil.copy(response, WorkflowOperationLog.class);
log.setVariables(JSON.toJSONString(response.getVariables()));
log.setIsOperationLog(true);
log.setProcessInstanceKey(response.getProcessInstanceId());
workflowOperationLogService.save(log);
WorkflowQueue queue = WorkflowQueue.builder().processDefinitionKey(response.getProcessDefinitionKey()).build();
Optional<ProcessService> serviceOptional = ticketServiceList.stream().filter(processService -> processService.isWorkflowProcess(queue)).findFirst();
if(serviceOptional.isPresent()) {
try {
processService.calculate(response);
} catch (Exception e) {
serviceOptional.get().calculate(response);
}
catch (Exception e) {
log.setFaultLog(e.getMessage());
log.setIsOperationLog(false);
workflowOperationLogService.updateById(log);
e.printStackTrace();
//todo 业务出错误 做补偿
log.error("业务出错,StandardWorkTicketConsumer: {}", e.getMessage());
log.error("业务出错,StandardWorkTicketConsumer: {} ", response);
regularWorkflowOperationLog.setIsOperationLog(false);//调用消费方抛出异常
}
}
} finally {
workflowOperationLogService.save(regularWorkflowOperationLog);
else {
log.setFaultLog("未获取到相关执行方法,key值为:" + response.getProcessDefinitionKey());
log.setIsOperationLog(false);
workflowOperationLogService.updateById(log);
}
}
catch(Exception e) {
log.error("消息消费失败,消息体为:{}",JSON.toJSONString(message));
e.printStackTrace();
}
}
}

9
hzims-service/hzims-middle/src/main/java/com/hnac/hzims/middle/processflow/entity/WorkflowOperationLog.java

@ -4,6 +4,8 @@ import com.baomidou.mybatisplus.annotation.*;
import java.time.LocalDateTime;
import java.io.Serializable;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
@ -92,10 +94,11 @@ public class WorkflowOperationLog implements Serializable {
private Boolean Deleted;
/**
* 记录正常日志 或错误日志
*/
@ApiModelProperty("是否消费成功")
private Boolean isOperationLog;
@ApiModelProperty("错误日志")
private String faultLog;
}

3
hzims-service/hzims-middle/src/main/java/com/hnac/hzims/middle/processflow/strategy/entity/WorkflowQueue.java

@ -1,5 +1,6 @@
package com.hnac.hzims.middle.processflow.strategy.entity;
import lombok.Builder;
import lombok.Data;
import lombok.experimental.Accessors;
@ -10,7 +11,7 @@ import lombok.experimental.Accessors;
* @Date 2023/3/27 16:49
*/
@Data
@Accessors(chain = true)
@Builder
public class WorkflowQueue {
/**

Loading…
Cancel
Save