Browse Source

fix:向量数据库同步时,超时不刷新更新时间的问题

zhongwei
luyie 4 months ago
parent
commit
e7b3676abc
  1. 3
      hzims-service/gglm-big-model/src/main/java/com/hnac/gglm/bigmodel/maintenance/mapper/VectorParamMapper.java
  2. 2
      hzims-service/gglm-big-model/src/main/java/com/hnac/gglm/bigmodel/maintenance/service/VectorParamService.java
  3. 66
      hzims-service/gglm-big-model/src/main/java/com/hnac/gglm/bigmodel/maintenance/service/impl/VectorParamServiceImpl.java

3
hzims-service/gglm-big-model/src/main/java/com/hnac/gglm/bigmodel/maintenance/mapper/VectorParamMapper.java

@ -1,5 +1,6 @@
package com.hnac.gglm.bigmodel.maintenance.mapper;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hnac.gglm.bigmodel.maintenance.entity.VectorParamEntity;
@ -7,5 +8,7 @@ import com.hnac.gglm.bigmodel.maintenance.entity.VectorParamEntity;
* @Author: ypj
* @Date: 2024/09/02 15:12
*/
@DS("hznlm")
public interface VectorParamMapper extends BaseMapper<VectorParamEntity> {
}

2
hzims-service/gglm-big-model/src/main/java/com/hnac/gglm/bigmodel/maintenance/service/VectorParamService.java

@ -19,4 +19,6 @@ public interface VectorParamService extends IService<VectorParamEntity> {
List<String> getUrlResponseKeyList(String url, String bladeToken, String hzinfoToken);
Boolean synchronization(List<Long> id ,String bladeToken, String hzinfoToken);
Boolean synchronization(Long id, String bladeToken, String hzinfoToken);
}

66
hzims-service/gglm-big-model/src/main/java/com/hnac/gglm/bigmodel/maintenance/service/impl/VectorParamServiceImpl.java

@ -12,10 +12,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hnac.gglm.bigmodel.database.service.WeaviateService;
import com.hnac.gglm.bigmodel.maintenance.vo.VectorUrlResponse;
import com.hnac.gglm.bigmodel.maintenance.entity.VectorParamEntity;
import com.hnac.gglm.bigmodel.maintenance.mapper.VectorParamMapper;
import com.hnac.gglm.bigmodel.maintenance.service.VectorParamService;
import com.hnac.gglm.bigmodel.maintenance.vo.VectorUrlResponse;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -23,6 +23,9 @@ import org.springframework.stereotype.Service;
import javax.servlet.http.HttpServletResponse;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author: ypj
@ -115,32 +118,47 @@ public class VectorParamServiceImpl extends ServiceImpl<VectorParamMapper, Vecto
if (null == ids && ids.isEmpty()) {
return false;
}
for (Long id : ids) {
VectorParamEntity entity = getById(id);
if (null == entity) {
return false;
}
VectorUrlResponse response = getUrlResponse(entity.getUrl(), bladeToken, hzinfoToken);
if (response != null) {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = null;
try {
rootNode = mapper.readTree(entity.getAttributeMap());
} catch (JsonProcessingException e) {
log.error("getUrlResponseKeyList error", e);
}
Map<String, String> attrMap = new HashMap<>();
Iterator<String> iterator = rootNode.fieldNames();
while (iterator.hasNext()) {
String key = iterator.next();
attrMap.put(key, rootNode.findValue(key).textValue());
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, ids.size() > 5 ? ids.size() : 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
ids.forEach(id -> {
executor.execute(() -> {
if (synchronization(id, bladeToken, hzinfoToken)) {
log.info("synchronization vector param id:{} success", id);
} else {
log.error("synchronization vector param id:{} fail", id);
}
String tableName = entity.getTableName().replace(entity.getProjectPrefix() + "_","");
weaviateService.saveBatch(response.getOriginalData(), tableName, attrMap);
this.update(Wrappers.<VectorParamEntity>lambdaUpdate().eq(VectorParamEntity::getId, id).set(VectorParamEntity::getUpdateTime, new Date()));
});
});
executor.shutdown();
return true;
}
@Override
public Boolean synchronization(Long id, String bladeToken, String hzinfoToken) {
VectorParamEntity entity = baseMapper.selectById(id);
if (null == entity) {
return false;
}
log.info("synchronization vector param id:{}", id);
VectorUrlResponse response = getUrlResponse(entity.getUrl(), bladeToken, hzinfoToken);
if (response != null) {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = null;
try {
rootNode = mapper.readTree(entity.getAttributeMap());
} catch (JsonProcessingException e) {
log.error("getUrlResponseKeyList error", e);
}
Map<String, String> attrMap = new HashMap<>();
Iterator<String> iterator = rootNode.fieldNames();
while (iterator.hasNext()) {
String key = iterator.next();
attrMap.put(key, rootNode.findValue(key).textValue());
}
String tableName = entity.getTableName().replace(entity.getProjectPrefix() + "_", "");
weaviateService.saveBatch(response.getOriginalData(), tableName, attrMap);
return this.update(Wrappers.<VectorParamEntity>lambdaUpdate().eq(VectorParamEntity::getId, id).set(VectorParamEntity::getUpdateTime, new Date()));
}
return true;
}
}

Loading…
Cancel
Save