Browse Source

add:大模型管理

zhongwei
haungxing 9 months ago
parent
commit
300b496dbf
  1. 2
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/BigModelInvokeUrl.java
  2. 34
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/ThreadPoolManager.java
  3. 14
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/controller/InteractiveController.java
  4. 6
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/IInteractiveService.java
  5. 20
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/impl/InteractiveServiceImpl.java
  6. 47
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/vo/AnswerVO.java
  7. 65
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/schedule/InteractiveSchedule.java
  8. 14
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/handler/InteractiveHandler.java
  9. 32
      hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/sessionManager/InteractiveSessionManager.java

2
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/BigModelInvokeUrl.java

@ -15,4 +15,6 @@ public class BigModelInvokeUrl {
private String assistantAsk;
private String assistantStatus;
}

34
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/configuration/ThreadPoolManager.java

@ -0,0 +1,34 @@
package com.hnac.hzims.bigmodel.configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
/**
* @Author: huangxing
* @Date: 2024/05/06 11:58
*/
@Component
public class ThreadPoolManager {
@Bean
public ThreadPoolExecutor getAnswerPoolExecutor() {
// 核心线程数
int corePoolSize = 5;
// 最大线程数
int maximumPoolSize = 10;
// 线程空闲时的存活时间
long keepAliveTime = 60L;
// 时间单位
TimeUnit unit = TimeUnit.SECONDS;
// 任务队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
// 线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 拒绝策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
return new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
}
}

14
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/controller/InteractiveController.java

@ -1,5 +1,6 @@
package com.hnac.hzims.bigmodel.interactive.controller;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import com.hnac.hzims.bigmodel.BigModelConstants;
import com.hnac.hzims.bigmodel.interactive.req.ModelFunctionReq;
@ -38,17 +39,24 @@ public class InteractiveController {
@ApiOperation("提问")
@ApiOperationSupport(order = 2)
@GetMapping("/ask")
public R ask(@RequestParam @ApiParam("用户提出问题") String question, HttpServletRequest request) {
return interactiveService.ask(request, question);
public R ask(@RequestParam @ApiParam("用户提出问题") String question,@RequestParam @ApiParam("问答sessionId") String sessionId) {
return interactiveService.ask(question, sessionId);
}
@ApiOperation("站点、菜单鉴权")
@ApiOperationSupport(order = 3)
@GetMapping("/authentication")
@RequestMapping(value = "/authentication",method = {RequestMethod.GET,RequestMethod.POST})
public R authentication(@RequestParam(required = false) @ApiParam("站点编号") String stationId,
@RequestParam @ApiParam("用户ID") String userId,
@RequestParam(required = false) @ApiParam("菜单ID") String menuId) {
return R.status(interactiveService.authentication(stationId,userId,menuId));
}
@ApiOperation("获取问答sessionId")
@ApiOperationSupport(order = 4)
@GetMapping("/getSessionId")
public R<String> getSessionId() {
return R.data(IdWorker.get32UUID());
}
}

6
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/IInteractiveService.java

@ -1,11 +1,13 @@
package com.hnac.hzims.bigmodel.interactive.service;
import com.hnac.hzims.bigmodel.interactive.req.ModelFunctionReq;
import com.hnac.hzims.bigmodel.interactive.vo.AnswerVO;
import io.swagger.annotations.ApiParam;
import org.springblade.core.tool.api.R;
import org.springframework.web.bind.annotation.RequestParam;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
/**
* @Author: huangxing
@ -15,7 +17,9 @@ public interface IInteractiveService {
R resolve(ModelFunctionReq req);
R ask(HttpServletRequest request, String question);
R ask(String question,String sessionId);
List<AnswerVO> getAnswerBySessionIds(String sessionIds);
Boolean authentication(String stationId, String userId, String menuId);

20
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/service/impl/InteractiveServiceImpl.java

@ -3,6 +3,7 @@ package com.hnac.hzims.bigmodel.interactive.service.impl;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.hnac.hzims.bigmodel.configuration.BigModelInvokeUrl;
import com.hnac.hzims.bigmodel.entity.FunctionEntity;
@ -10,6 +11,7 @@ import com.hnac.hzims.bigmodel.interactive.req.ModelFunctionReq;
import com.hnac.hzims.bigmodel.interactive.service.IInteractiveService;
import com.hnac.hzims.bigmodel.interactive.service.IJumpPageService;
import com.hnac.hzims.bigmodel.function.service.IFunctionService;
import com.hnac.hzims.bigmodel.interactive.vo.AnswerVO;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -28,6 +30,7 @@ import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.hnac.hzims.bigmodel.interactive.constants.FunctionConstants.*;
@ -65,9 +68,7 @@ public class InteractiveServiceImpl implements IInteractiveService {
}
@Override
public R ask(HttpServletRequest request, String question) {
HttpSession session = request.getSession(true);
String sessionId = session.getId();
public R ask(String question,String sessionId) {
Long userId = AuthUtil.getUserId();
//TODO 保存问题
Map<String,String> params = new HashMap<>();
@ -83,6 +84,19 @@ public class InteractiveServiceImpl implements IInteractiveService {
}
@Override
public List<AnswerVO> getAnswerBySessionIds(String sessionIds) {
Map<String,String> params = new HashMap<>();
params.put("ids",sessionIds);
HttpResponse response = HttpRequest.post(fdpHost + bigModelInvokeUrl.getAssistantStatus())
.body(JSON.toJSONString(params)).execute();
Assert.isTrue(response.getStatus() == HttpServletResponse.SC_OK && "1".equals(JSONObject.parseObject(response.body()).getString("success")), () -> {
throw new ServiceException("远程调用大模型【发起问答】接口失败!");
});
String data = JSONObject.parseObject(response.body()).getString("data");
return JSONArray.parseArray(data,AnswerVO.class);
}
@Override
public Boolean authentication(String stationId, String userId, String menuId) {
//TODO 鉴权逻辑完善
return true;

47
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/interactive/vo/AnswerVO.java

@ -0,0 +1,47 @@
package com.hnac.hzims.bigmodel.interactive.vo;
import com.alibaba.fastjson.annotation.JSONField;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
/**
* @Author: huangxing
* @Date: 2024/05/06 14:37
*/
@Data
@ApiModel("HZLLM答案VO对象")
@EqualsAndHashCode
public class AnswerVO implements Serializable {
@ApiModelProperty("发起问答时的随机ID")
@JSONField(name = "id")
private String sessionId;
@ApiModelProperty("发起问答时的用户ID")
@JSONField(name = "userid")
private String userId;
@ApiModelProperty("1代表代表正在进行问答,0代表已完成")
private Integer running;
/**
* 正常发起一次问答status会经历012390的过程如果问答需要执行多次指令则可能会是01232390
* 9为大模型回复中answer中会不断填充内容
*/
@ApiModelProperty("1代表代表正在进行问答,0代表已完成")
private Integer status;
@ApiModelProperty("当running为1时,用于显示状态文本")
private String text;
@ApiModelProperty("最近一次发起的问题")
private String query;
@ApiModelProperty("query对应的答案")
private String answer;
}

65
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/schedule/InteractiveSchedule.java

@ -1,11 +1,14 @@
package com.hnac.hzims.bigmodel.schedule;
import com.alibaba.fastjson.JSON;
import com.hnac.hzims.bigmodel.interactive.service.IInteractiveService;
import com.hnac.hzims.bigmodel.interactive.vo.AnswerVO;
import com.hnac.hzims.bigmodel.websocket.server.InteractiveWsServer;
import com.hnac.hzims.bigmodel.websocket.service.InteractiveWsService;
import com.hnac.hzims.bigmodel.websocket.sessionManager.InteractiveSessionManager;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.AllArgsConstructor;
import org.springblade.core.log.exception.ServiceException;
import org.springblade.core.tool.utils.Func;
@ -16,8 +19,13 @@ import org.springframework.util.Assert;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Enumeration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import static com.hnac.hzims.bigmodel.schedule.XxlJobHandlerConstant.*;
@ -31,30 +39,47 @@ public class InteractiveSchedule {
private final RedisTemplate redisTemplate;
private final InteractiveWsService wsService;
private final IInteractiveService interactiveService;
private final ThreadPoolExecutor getAnswerPoolExecutor;
// @XxlJob(GET_INTERACTIVE_RESULT)
// public ReturnT execute(String params) {
// String resultKey = ParamCache.getValue(GET_INTERACTIVE_RESULT);
// Set<String> keySet = redisTemplate.keys(resultKey + "*");
// keySet.parallelStream().forEach(key -> {
// // 根据Key获取sessionId
// List<String> keySplits = Func.toStrList(":", key);
// String sessionId = keySplits.get(2);
// // 查询websocket是否存在连接session
// WebSocketSession session = InteractiveSessionManager.get(sessionId);
// if(session == null) {
// return;
// }
// TextMessage message = new TextMessage(JSON.toJSONString(redisTemplate.opsForValue().get(key)));
// Boolean sendResult = wsService.sendMessage(sessionId, message);
// Assert.isTrue(sendResult, () -> {
// throw new ServiceException(key + "推送消息失败,推送消息体为:" + JSON.toJSONString(redisTemplate.opsForValue().get(key)));
// });
// redisTemplate.delete(key);
// });
// return ReturnT.SUCCESS;
// }
@XxlJob(GET_INTERACTIVE_RESULT)
public ReturnT execute(String params) {
String resultKey = ParamCache.getValue(GET_INTERACTIVE_RESULT);
Set<String> keySet = redisTemplate.keys(resultKey + "*");
keySet.parallelStream().forEach(key -> {
//TODO 保存问题结果
// 根据Key获取sessionId
List<String> keySplits = Func.toStrList(":", key);
String sessionId = keySplits.get(2);
// 查询websocket是否存在连接session
WebSocketSession session = InteractiveSessionManager.get(sessionId);
if(session == null) {
return;
}
TextMessage message = new TextMessage(JSON.toJSONString(redisTemplate.opsForValue().get(key)));
Boolean sendResult = wsService.sendMessage(sessionId, message);
Assert.isTrue(sendResult, () -> {
throw new ServiceException(key + "推送消息失败,推送消息体为:" + JSON.toJSONString(redisTemplate.opsForValue().get(key)));
});
redisTemplate.delete(key);
List<String> sessionIds = InteractiveSessionManager.getSessionIds();
List<AnswerVO> AnswerList = interactiveService.getAnswerBySessionIds(sessionIds.stream().collect(Collectors.joining(",")));
AnswerList.parallelStream().forEach(answerVO -> {
CompletableFuture.runAsync(() -> {
WebSocketSession session = InteractiveSessionManager.get(answerVO.getSessionId());
TextMessage message = new TextMessage(JSON.toJSONString(answerVO));
try {
session.sendMessage(message);
} catch (IOException e) {
XxlJobLogger.log("消息中心推送失败,推送内容为:" + JSON.toJSONString(answerVO));
}
}, getAnswerPoolExecutor);
});
return ReturnT.SUCCESS;
}
}

14
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/handler/InteractiveHandler.java

@ -1,12 +1,22 @@
package com.hnac.hzims.bigmodel.websocket.handler;
import com.alibaba.fastjson.JSON;
import com.hnac.hzims.bigmodel.interactive.service.IInteractiveService;
import com.hnac.hzims.bigmodel.websocket.sessionManager.InteractiveSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.log.exception.ServiceException;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.utils.SpringUtil;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* @Author: huangxing
* @Date: 2024/04/28 13:45
@ -36,6 +46,8 @@ public class InteractiveHandler extends TextWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
log.info("message handle successful!");
IInteractiveService interactiveService = SpringUtil.getBean(IInteractiveService.class);
R askResult = interactiveService.ask(message.getPayload(), InteractiveSessionManager.getEntryBySession(session).getKey());
log.info("message handle successful!返回结果为:"+ JSON.toJSONString(askResult));
}
}

32
hzims-service/hzims-big-model/src/main/java/com/hnac/hzims/bigmodel/websocket/sessionManager/InteractiveSessionManager.java

@ -1,16 +1,24 @@
package com.hnac.hzims.bigmodel.websocket.sessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.log.exception.ServiceException;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* @Author: huangxing
* @Date: 2024/04/28 13:58
*/
@Slf4j
public class InteractiveSessionManager {
/** ws会话池 **/
public static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
@ -18,6 +26,30 @@ public class InteractiveSessionManager {
private static final Lock lock = new ReentrantLock();
/**
* 获取sessionIds
* @return sessionIds
*/
public static List<String> getSessionIds() {
Set<Map.Entry<String, WebSocketSession>> sessionEntrySet = InteractiveSessionManager.SESSION_POOL.entrySet();
return sessionEntrySet.stream().map(entry -> entry.getKey()).collect(Collectors.toList());
}
/**
* 根据Session获取Entry
* @param session websocketSession
* @return Entry
*/
public static Map.Entry<String, WebSocketSession> getEntryBySession(WebSocketSession session) {
Set<Map.Entry<String, WebSocketSession>> sessionEntrySet = InteractiveSessionManager.SESSION_POOL.entrySet();
Optional<Map.Entry<String, WebSocketSession>> sessionIdOptional = sessionEntrySet.stream().filter(sessionEntry -> session.equals(sessionEntry.getValue())).findFirst();
if(sessionIdOptional.isPresent()) {
log.error("当前Session Pool未查询到相关session,消息推送失败");
throw new ServiceException("当前Session Pool未查询到相关session,消息推送失败");
}
return sessionIdOptional.get();
}
/**
* 添加会话
* @param sessionId 会话ID
* @param session 会话对象

Loading…
Cancel
Save