增加字段和模型本地缓存

This commit is contained in:
2025-02-12 14:03:24 +08:00
parent 554286c88c
commit 58204692a6
9 changed files with 302 additions and 2 deletions

View File

@@ -0,0 +1,182 @@
package com.fibo.ddp.common.service.cache;
import com.fibo.ddp.common.dao.datax.datamanage.FieldMapper;
import com.fibo.ddp.common.dao.strategyx.aimodel.MachineLearningModelsMapper;
import com.fibo.ddp.common.model.datax.datamanage.Field;
import com.fibo.ddp.common.model.strategyx.aimodel.MachineLearningModels;
import com.fibo.ddp.common.model.strategyx.aimodel.ModelDTO;
import com.fibo.ddp.common.service.common.CommUtil;
import com.fibo.ddp.common.service.strategyx.aimodel.PMMLExecutor.PMMLExecutor;
import org.jpmml.evaluator.Evaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class LocalCacheService {
private static final Logger logger = LoggerFactory.getLogger(LocalCacheService.class);
//
private static final long ONE_SECOND_MS = 1000L;
// 10s
private static final long UPDATE_SLEEP_TIME_MS = 10 * ONE_SECOND_MS;
//
private final AtomicLong runCount = new AtomicLong(0);
// key is id
private volatile Map<String, Field> fieldsMap = new ConcurrentHashMap<>();
private volatile Map<String, ModelDTO> modelMap = new ConcurrentHashMap<>();
private final Map<String, String> infoMap = new ConcurrentHashMap<>();
//
@Resource
FieldMapper fieldMapper;
@Autowired
MachineLearningModelsMapper machineLearningModelsMapper;
@Resource
PMMLExecutor pmmlExecutor;
@PostConstruct
public void init() {
logger.info("LocalCacheService_init_start");
updateAllCache(true);
//
CommUtil.doSleep(UPDATE_SLEEP_TIME_MS);
Thread t = new Thread();
t.start();
logger.info("LocalCacheService_init_done");
}
public Map<String,Object> getInfo(){
Map<String,Object> info = new HashMap<>();
info.put("runCount",runCount.longValue());
info.put("info",infoMap);
Map fieldsMapTmp = fieldsMap;
Map modelMapTmp = modelMap;
if(fieldsMapTmp!=null){
info.put("fieldsNum",fieldsMapTmp.size());
}
if(modelMapTmp!=null){
info.put("modelNum",modelMapTmp.size());
}
return info;
}
public Map<String, Field> getFieldsMap(){
return fieldsMap;
}
public Map<String, ModelDTO> getModelMap(){
return modelMap;
}
public void updateModelCache()throws Exception{
Map<String, ModelDTO> modelMapTmp = new ConcurrentHashMap<>();
List<MachineLearningModels> models = machineLearningModelsMapper.getAllModelList();
if(models!=null){
for(MachineLearningModels item:models){
Integer id = item.getId();
if(id==null){
continue;
}
String filePath = item.getFilePath();
// ModelDTO modelDTOOld = modelMap.get(id+"");
ModelDTO modelDTO = new ModelDTO();
modelDTO.setModel(item);
try {
Evaluator evaluator = pmmlExecutor.loadPmml(filePath);
modelDTO.setEvaluator(evaluator);
if (evaluator == null) {
modelDTO.setErrorMsg("evaluator_null");
}
}catch(Throwable e){
modelDTO.setErrorMsg("loadPmml_error,"+e);
}
modelMapTmp.put(id+"",modelDTO);
}
}
modelMap = modelMapTmp;
}
public void updateAllCache(boolean throwsError){
boolean error = false;
try{
updateFieldsCache();
}catch(Throwable e){
error = true;
logger.error("updateFieldsCache_error",e);
}
try{
updateModelCache();
}catch(Throwable e){
error = true;
logger.error("updateModelCache_error",e);
}
if(throwsError && error){
throw new RuntimeException("updateAllCacheError");
}
}
public void updateFieldsCache(){
Map<String, Field> fieldsMapTmp = new ConcurrentHashMap<>();
List<Field> fields = fieldMapper.getAllFieldList();
if(fields!=null) {
for(Field item:fields){
Long id = item.getId();
if(id!=null){
fieldsMapTmp.put(id+"",item);
}
}
}
fieldsMap = fieldsMapTmp;
}
private class LocalCacheUpdateThread extends Thread{
public void run(){
runCount.getAndIncrement();
updateAllCache(false);
}
}
}

View File

@@ -0,0 +1,18 @@
package com.fibo.ddp.common.service.common;
public class CommUtil {
public static void doSleep(long time){
if(time<=0){
return;
}
try{
Thread.sleep(time);
}catch(Throwable e){
// do nothing
}
}
}