diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/BigModelInvokeUrl.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/BigModelInvokeUrl.java index 5ff1f3e..70a539d 100644 --- a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/BigModelInvokeUrl.java +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/BigModelInvokeUrl.java @@ -15,4 +15,6 @@ public class BigModelInvokeUrl { private String assistantAsk; + private String assistantStatus; + } diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/ThreadPoolManager.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/ThreadPoolManager.java new file mode 100644 index 0000000..69c0d43 --- /dev/null +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/ThreadPoolManager.java @@ -0,0 +1,34 @@ +package com.hnac.hzims.bigmodel.configuration; + +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.concurrent.*; + +/** + * @Author: huangxing + * @Date: 2024/05/06 11:58 + */ +@Component +public class ThreadPoolManager { + + @Bean + public ThreadPoolExecutor getAnswerPoolExecutor() { + // 核心线程数 + int corePoolSize = 5; + // 最大线程数 + int maximumPoolSize = 10; + // 线程空闲时的存活时间 + long keepAliveTime = 60L; + // 时间单位 + TimeUnit unit = TimeUnit.SECONDS; + // 任务队列 + BlockingQueue workQueue = new LinkedBlockingQueue<>(100); + // 线程工厂 + ThreadFactory threadFactory = Executors.defaultThreadFactory(); + // 拒绝策略 + RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); + return new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler); + } + +} diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/controller/InteractiveController.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/controller/InteractiveController.java index 9c6e453..eebb4bf 100644 --- a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/controller/InteractiveController.java +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/controller/InteractiveController.java @@ -1,5 +1,6 @@ package com.hnac.hzims.bigmodel.interactive.controller; +import com.baomidou.mybatisplus.core.toolkit.IdWorker; import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport; import com.hnac.hzims.bigmodel.BigModelConstants; import com.hnac.hzims.bigmodel.interactive.req.ModelFunctionReq; @@ -38,17 +39,24 @@ public class InteractiveController { @ApiOperation("提问") @ApiOperationSupport(order = 2) @GetMapping("/ask") - public R ask(@RequestParam @ApiParam("用户提出问题") String question, HttpServletRequest request) { - return interactiveService.ask(request, question); + public R ask(@RequestParam @ApiParam("用户提出问题") String question,@RequestParam @ApiParam("问答sessionId") String sessionId) { + return interactiveService.ask(question, sessionId); } @ApiOperation("站点、菜单鉴权") @ApiOperationSupport(order = 3) - @GetMapping("/authentication") + @RequestMapping(value = "/authentication",method = {RequestMethod.GET,RequestMethod.POST}) public R authentication(@RequestParam(required = false) @ApiParam("站点编号") String stationId, @RequestParam @ApiParam("用户ID") String userId, @RequestParam(required = false) @ApiParam("菜单ID") String menuId) { return R.status(interactiveService.authentication(stationId,userId,menuId)); } + @ApiOperation("获取问答sessionId") + @ApiOperationSupport(order = 4) + @GetMapping("/getSessionId") + public R getSessionId() { + return R.data(IdWorker.get32UUID()); + } + } diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/IInteractiveService.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/IInteractiveService.java index 7916509..1c718ac 100644 --- a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/IInteractiveService.java +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/IInteractiveService.java @@ -1,11 +1,13 @@ package com.hnac.hzims.bigmodel.interactive.service; import com.hnac.hzims.bigmodel.interactive.req.ModelFunctionReq; +import com.hnac.hzims.bigmodel.interactive.vo.AnswerVO; import io.swagger.annotations.ApiParam; import org.springblade.core.tool.api.R; import org.springframework.web.bind.annotation.RequestParam; import javax.servlet.http.HttpServletRequest; +import java.util.List; /** * @Author: huangxing @@ -15,7 +17,9 @@ public interface IInteractiveService { R resolve(ModelFunctionReq req); - R ask(HttpServletRequest request, String question); + R ask(String question,String sessionId); + + List getAnswerBySessionIds(String sessionIds); Boolean authentication(String stationId, String userId, String menuId); diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/impl/InteractiveServiceImpl.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/impl/InteractiveServiceImpl.java index 8ad00e6..91c6820 100644 --- a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/impl/InteractiveServiceImpl.java +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/impl/InteractiveServiceImpl.java @@ -3,6 +3,7 @@ package com.hnac.hzims.bigmodel.interactive.service.impl; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.hnac.hzims.bigmodel.configuration.BigModelInvokeUrl; import com.hnac.hzims.bigmodel.entity.FunctionEntity; @@ -10,6 +11,7 @@ import com.hnac.hzims.bigmodel.interactive.req.ModelFunctionReq; import com.hnac.hzims.bigmodel.interactive.service.IInteractiveService; import com.hnac.hzims.bigmodel.interactive.service.IJumpPageService; import com.hnac.hzims.bigmodel.function.service.IFunctionService; +import com.hnac.hzims.bigmodel.interactive.vo.AnswerVO; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -28,6 +30,7 @@ import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; import java.util.HashMap; +import java.util.List; import java.util.Map; import static com.hnac.hzims.bigmodel.interactive.constants.FunctionConstants.*; @@ -65,9 +68,7 @@ public class InteractiveServiceImpl implements IInteractiveService { } @Override - public R ask(HttpServletRequest request, String question) { - HttpSession session = request.getSession(true); - String sessionId = session.getId(); + public R ask(String question,String sessionId) { Long userId = AuthUtil.getUserId(); //TODO 保存问题 Map params = new HashMap<>(); @@ -83,6 +84,19 @@ public class InteractiveServiceImpl implements IInteractiveService { } @Override + public List getAnswerBySessionIds(String sessionIds) { + Map params = new HashMap<>(); + params.put("ids",sessionIds); + HttpResponse response = HttpRequest.post(fdpHost + bigModelInvokeUrl.getAssistantStatus()) + .body(JSON.toJSONString(params)).execute(); + Assert.isTrue(response.getStatus() == HttpServletResponse.SC_OK && "1".equals(JSONObject.parseObject(response.body()).getString("success")), () -> { + throw new ServiceException("远程调用大模型【发起问答】接口失败!"); + }); + String data = JSONObject.parseObject(response.body()).getString("data"); + return JSONArray.parseArray(data,AnswerVO.class); + } + + @Override public Boolean authentication(String stationId, String userId, String menuId) { //TODO 鉴权逻辑完善 return true; diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/vo/AnswerVO.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/vo/AnswerVO.java new file mode 100644 index 0000000..ae34551 --- /dev/null +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/vo/AnswerVO.java @@ -0,0 +1,47 @@ +package com.hnac.hzims.bigmodel.interactive.vo; + +import com.alibaba.fastjson.annotation.JSONField; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; + +/** + * @Author: huangxing + * @Date: 2024/05/06 14:37 + */ +@Data +@ApiModel("HZLLM答案VO对象") +@EqualsAndHashCode +public class AnswerVO implements Serializable { + + @ApiModelProperty("发起问答时的随机ID") + @JSONField(name = "id") + private String sessionId; + + @ApiModelProperty("发起问答时的用户ID") + @JSONField(name = "userid") + private String userId; + + @ApiModelProperty("1代表代表正在进行问答,0代表已完成") + private Integer running; + + /** + * 正常发起一次问答,status会经历0、1、2、3、9、0的过程,如果问答需要执行多次指令,则可能会是0、1、2、3、2、3、9、0。 + * 9为大模型回复中,answer中会不断填充内容 + */ + @ApiModelProperty("1代表代表正在进行问答,0代表已完成") + private Integer status; + + @ApiModelProperty("当running为1时,用于显示状态文本") + private String text; + + @ApiModelProperty("最近一次发起的问题") + private String query; + + @ApiModelProperty("query对应的答案") + private String answer; + +} diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/schedule/InteractiveSchedule.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/schedule/InteractiveSchedule.java index 771cfd5..a9c3ba1 100644 --- a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/schedule/InteractiveSchedule.java +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/schedule/InteractiveSchedule.java @@ -1,11 +1,14 @@ package com.hnac.hzims.bigmodel.schedule; import com.alibaba.fastjson.JSON; +import com.hnac.hzims.bigmodel.interactive.service.IInteractiveService; +import com.hnac.hzims.bigmodel.interactive.vo.AnswerVO; import com.hnac.hzims.bigmodel.websocket.server.InteractiveWsServer; import com.hnac.hzims.bigmodel.websocket.service.InteractiveWsService; import com.hnac.hzims.bigmodel.websocket.sessionManager.InteractiveSessionManager; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; +import com.xxl.job.core.log.XxlJobLogger; import lombok.AllArgsConstructor; import org.springblade.core.log.exception.ServiceException; import org.springblade.core.tool.utils.Func; @@ -16,8 +19,13 @@ import org.springframework.util.Assert; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; +import java.io.IOException; +import java.util.Enumeration; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; import static com.hnac.hzims.bigmodel.schedule.XxlJobHandlerConstant.*; @@ -31,30 +39,47 @@ public class InteractiveSchedule { private final RedisTemplate redisTemplate; private final InteractiveWsService wsService; + private final IInteractiveService interactiveService; + private final ThreadPoolExecutor getAnswerPoolExecutor; + +// @XxlJob(GET_INTERACTIVE_RESULT) +// public ReturnT execute(String params) { +// String resultKey = ParamCache.getValue(GET_INTERACTIVE_RESULT); +// Set keySet = redisTemplate.keys(resultKey + "*"); +// keySet.parallelStream().forEach(key -> { +// // 根据Key获取sessionId +// List keySplits = Func.toStrList(":", key); +// String sessionId = keySplits.get(2); +// // 查询websocket是否存在连接session +// WebSocketSession session = InteractiveSessionManager.get(sessionId); +// if(session == null) { +// return; +// } +// TextMessage message = new TextMessage(JSON.toJSONString(redisTemplate.opsForValue().get(key))); +// Boolean sendResult = wsService.sendMessage(sessionId, message); +// Assert.isTrue(sendResult, () -> { +// throw new ServiceException(key + "推送消息失败,推送消息体为:" + JSON.toJSONString(redisTemplate.opsForValue().get(key))); +// }); +// redisTemplate.delete(key); +// }); +// return ReturnT.SUCCESS; +// } @XxlJob(GET_INTERACTIVE_RESULT) public ReturnT execute(String params) { - String resultKey = ParamCache.getValue(GET_INTERACTIVE_RESULT); - Set keySet = redisTemplate.keys(resultKey + "*"); - keySet.parallelStream().forEach(key -> { - //TODO 保存问题结果 - - // 根据Key获取sessionId - List keySplits = Func.toStrList(":", key); - String sessionId = keySplits.get(2); - // 查询websocket是否存在连接session - WebSocketSession session = InteractiveSessionManager.get(sessionId); - if(session == null) { - return; - } - TextMessage message = new TextMessage(JSON.toJSONString(redisTemplate.opsForValue().get(key))); - Boolean sendResult = wsService.sendMessage(sessionId, message); - Assert.isTrue(sendResult, () -> { - throw new ServiceException(key + "推送消息失败,推送消息体为:" + JSON.toJSONString(redisTemplate.opsForValue().get(key))); - }); - redisTemplate.delete(key); + List sessionIds = InteractiveSessionManager.getSessionIds(); + List AnswerList = interactiveService.getAnswerBySessionIds(sessionIds.stream().collect(Collectors.joining(","))); + AnswerList.parallelStream().forEach(answerVO -> { + CompletableFuture.runAsync(() -> { + WebSocketSession session = InteractiveSessionManager.get(answerVO.getSessionId()); + TextMessage message = new TextMessage(JSON.toJSONString(answerVO)); + try { + session.sendMessage(message); + } catch (IOException e) { + XxlJobLogger.log("消息中心推送失败,推送内容为:" + JSON.toJSONString(answerVO)); + } + }, getAnswerPoolExecutor); }); return ReturnT.SUCCESS; } - } diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/handler/InteractiveHandler.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/handler/InteractiveHandler.java index 1e4aae7..cf4a92f 100644 --- a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/handler/InteractiveHandler.java +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/handler/InteractiveHandler.java @@ -1,12 +1,22 @@ package com.hnac.hzims.bigmodel.websocket.handler; +import com.alibaba.fastjson.JSON; +import com.hnac.hzims.bigmodel.interactive.service.IInteractiveService; import com.hnac.hzims.bigmodel.websocket.sessionManager.InteractiveSessionManager; import lombok.extern.slf4j.Slf4j; +import org.springblade.core.log.exception.ServiceException; +import org.springblade.core.tool.api.R; +import org.springblade.core.tool.utils.SpringUtil; +import org.springframework.util.Assert; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + /** * @Author: huangxing * @Date: 2024/04/28 13:45 @@ -36,6 +46,8 @@ public class InteractiveHandler extends TextWebSocketHandler { @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { - log.info("message handle successful!"); + IInteractiveService interactiveService = SpringUtil.getBean(IInteractiveService.class); + R askResult = interactiveService.ask(message.getPayload(), InteractiveSessionManager.getEntryBySession(session).getKey()); + log.info("message handle successful!返回结果为:"+ JSON.toJSONString(askResult)); } } diff --git a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/sessionManager/InteractiveSessionManager.java b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/sessionManager/InteractiveSessionManager.java index 6b13bbf..5e3d1a6 100644 --- a/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/sessionManager/InteractiveSessionManager.java +++ b/hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/sessionManager/InteractiveSessionManager.java @@ -1,16 +1,24 @@ package com.hnac.hzims.bigmodel.websocket.sessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springblade.core.log.exception.ServiceException; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; /** * @Author: huangxing * @Date: 2024/04/28 13:58 */ +@Slf4j public class InteractiveSessionManager { /** ws会话池 **/ public static ConcurrentHashMap SESSION_POOL = new ConcurrentHashMap<>(); @@ -18,6 +26,30 @@ public class InteractiveSessionManager { private static final Lock lock = new ReentrantLock(); /** + * 获取sessionIds + * @return sessionIds + */ + public static List getSessionIds() { + Set> sessionEntrySet = InteractiveSessionManager.SESSION_POOL.entrySet(); + return sessionEntrySet.stream().map(entry -> entry.getKey()).collect(Collectors.toList()); + } + + /** + * 根据Session获取Entry + * @param session websocketSession + * @return Entry + */ + public static Map.Entry getEntryBySession(WebSocketSession session) { + Set> sessionEntrySet = InteractiveSessionManager.SESSION_POOL.entrySet(); + Optional> sessionIdOptional = sessionEntrySet.stream().filter(sessionEntry -> session.equals(sessionEntry.getValue())).findFirst(); + if(sessionIdOptional.isPresent()) { + log.error("当前Session Pool未查询到相关session,消息推送失败"); + throw new ServiceException("当前Session Pool未查询到相关session,消息推送失败"); + } + return sessionIdOptional.get(); + } + + /** * 添加会话 * @param sessionId 会话ID * @param session 会话对象