yang_shj
3 months ago
16 changed files with 377 additions and 31 deletions
@ -0,0 +1,20 @@
|
||||
package com.hnac.hzims.bigmodel.api.constants; |
||||
|
||||
/** |
||||
* @Author: huangxing |
||||
* @Date: 2024/08/09 13:48 |
||||
*/ |
||||
public interface MqttTopicConstants { |
||||
|
||||
/** |
||||
* 新增视频主题 |
||||
*/ |
||||
String TOPIC_VIDEO_INSERT = "topic_video_insert"; |
||||
|
||||
/** |
||||
* 删除视频主题 |
||||
*/ |
||||
String TOPIC_VIDEO_DELETE = "topic_video_delete"; |
||||
|
||||
|
||||
} |
@ -0,0 +1,28 @@
|
||||
package com.hnac.hzims.bigmodel.api.constants; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Getter; |
||||
|
||||
/** |
||||
* @Author: huangxing |
||||
* @Date: 2024/08/09 09:13 |
||||
*/ |
||||
@AllArgsConstructor |
||||
@Getter |
||||
public enum SyncTableEnum { |
||||
/**视频**/ |
||||
VIDEO("video"), |
||||
/**实时画面**/ |
||||
CANVAS("canvas"), |
||||
/**故障**/ |
||||
FAULT("fault"), |
||||
/**站点**/ |
||||
STATION("station"), |
||||
/**实时数据属性**/ |
||||
RECORD("record"), |
||||
/**遥控**/ |
||||
REMOTE("yk") |
||||
; |
||||
private final String tableName; |
||||
|
||||
} |
@ -0,0 +1,40 @@
|
||||
package com.hnac.hzims.bigmodel.business.consumer; |
||||
|
||||
import com.alibaba.fastjson.JSONArray; |
||||
import com.alibaba.fastjson.JSONObject; |
||||
import com.google.common.collect.Lists; |
||||
import com.hnac.hzims.bigmodel.api.dto.VideoSyncDTO; |
||||
import com.hnac.hzims.bigmodel.api.feign.ISyncClient; |
||||
import com.hnac.hzims.bigmodel.api.feign.VideoSyncClient; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.core.tool.utils.CollectionUtil; |
||||
import org.springblade.core.tool.utils.Func; |
||||
import org.springblade.mqtt.customer.IMqttReceive; |
||||
import org.springblade.mqtt.customer.annotation.MqttReceive; |
||||
import org.springframework.stereotype.Service; |
||||
import com.hnac.hzims.bigmodel.api.constants.MqttTopicConstants; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* @Author: huangxing |
||||
* @Date: 2024/08/09 10:33 |
||||
*/ |
||||
@MqttReceive(topicName = MqttTopicConstants.TOPIC_VIDEO_INSERT) |
||||
@Service |
||||
@Slf4j |
||||
@AllArgsConstructor |
||||
public class VideoAddConsumer implements IMqttReceive { |
||||
|
||||
private final VideoSyncClient videoSyncClient; |
||||
|
||||
@Override |
||||
public void handlerMessage(String message) { |
||||
log.info("收到新增视频信息,消息内容体为:{}", message); |
||||
List<VideoSyncDTO> videoSyncDTOS = JSONArray.parseArray(message, VideoSyncDTO.class); |
||||
if(Func.isNotEmpty(videoSyncDTOS)) { |
||||
videoSyncClient.saveBatch(videoSyncDTOS); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,36 @@
|
||||
package com.hnac.hzims.bigmodel.business.consumer; |
||||
|
||||
import com.alibaba.fastjson.JSONArray; |
||||
import com.hnac.hzims.bigmodel.api.constants.MqttTopicConstants; |
||||
import com.hnac.hzims.bigmodel.api.dto.VideoSyncDTO; |
||||
import com.hnac.hzims.bigmodel.api.feign.VideoSyncClient; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.core.tool.utils.Func; |
||||
import org.springblade.mqtt.customer.IMqttReceive; |
||||
import org.springblade.mqtt.customer.annotation.MqttReceive; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* @Author: huangxing |
||||
* @Date: 2024/08/12 09:29 |
||||
*/ |
||||
@MqttReceive(topicName = MqttTopicConstants.TOPIC_VIDEO_DELETE) |
||||
@Service |
||||
@Slf4j |
||||
@AllArgsConstructor |
||||
public class VideoRemoveConsumer implements IMqttReceive { |
||||
|
||||
private final VideoSyncClient videoSyncClient; |
||||
|
||||
|
||||
@Override |
||||
public void handlerMessage(String message) { |
||||
log.info("收到删除视频信息,消息内容体为:{}", message); |
||||
if(Func.isNotEmpty(message)) { |
||||
Func.toStrList(",",message).forEach(id -> videoSyncClient.deleteByIds(id)); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,76 @@
|
||||
package com.hnac.hzims.operational.station.aspect; |
||||
|
||||
import com.alibaba.fastjson.JSON; |
||||
import com.google.common.collect.Lists; |
||||
import com.hnac.hzims.bigmodel.api.constants.MqttTopicConstants; |
||||
import com.hnac.hzims.bigmodel.api.dto.VideoSyncDTO; |
||||
import com.hnac.hzims.operational.station.entity.StationVideoTypeEntity; |
||||
import com.hnac.hzims.operational.station.feign.IStationClient; |
||||
import com.hnac.hzims.operational.station.service.IStationService; |
||||
import com.hnac.hzims.operational.station.vo.StationVO; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.aspectj.lang.JoinPoint; |
||||
import org.aspectj.lang.annotation.After; |
||||
import org.aspectj.lang.annotation.Aspect; |
||||
import org.aspectj.lang.annotation.Pointcut; |
||||
import org.springblade.core.tool.utils.BeanUtil; |
||||
import org.springblade.core.tool.utils.Func; |
||||
import org.springblade.mqtt.producer.IMqttSender; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.stream.Collectors; |
||||
|
||||
/** |
||||
* @Author: huangxing |
||||
* @Date: 2024/08/09 14:02 |
||||
*/ |
||||
@Aspect |
||||
@Component |
||||
@AllArgsConstructor |
||||
@Slf4j |
||||
public class AddVideoAspect { |
||||
|
||||
private final IStationService stationService; |
||||
private final IMqttSender mqttSender; |
||||
|
||||
@Pointcut("execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.saveBatch(..)) " + |
||||
"|| execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.save(..)) " + |
||||
"|| execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.updateById(..)) " + |
||||
"|| execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.updateBatchById(..))") |
||||
private void pointCut() { |
||||
|
||||
} |
||||
|
||||
@After("pointCut()") |
||||
public void after(JoinPoint point) { |
||||
Object[] args = point.getArgs(); |
||||
if(args.length == 1) { |
||||
Object arg = args[0]; |
||||
if(arg instanceof StationVideoTypeEntity) { |
||||
List<VideoSyncDTO> videoSyncDTOS = Lists.newArrayList(this.convert((StationVideoTypeEntity) arg)); |
||||
mqttSender.sendToMqtt(MqttTopicConstants.TOPIC_VIDEO_INSERT, JSON.toJSONString(videoSyncDTOS)); |
||||
} |
||||
else if(arg instanceof List) { |
||||
List<StationVideoTypeEntity> videoList = (List<StationVideoTypeEntity>) args[0]; |
||||
List<VideoSyncDTO> videoSyncDTOS = videoList.stream().map(this::convert).collect(Collectors.toList()); |
||||
mqttSender.sendToMqtt(MqttTopicConstants.TOPIC_VIDEO_INSERT, JSON.toJSONString(videoSyncDTOS)); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private VideoSyncDTO convert(StationVideoTypeEntity entity) { |
||||
VideoSyncDTO videoSyncDTO = BeanUtil.copy(entity,VideoSyncDTO.class); |
||||
StationVO station = stationService.getStationByCode(entity.getStationId()); |
||||
String stationName = Optional.ofNullable(station).map(StationVO::getName).orElse(""); |
||||
videoSyncDTO.setId(String.valueOf(entity.getId())); |
||||
videoSyncDTO.setItemId(String.valueOf(entity.getId())); |
||||
videoSyncDTO.setStationName(stationName); |
||||
videoSyncDTO.setItemName(stationName + " " + videoSyncDTO.getName()); |
||||
return videoSyncDTO; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,56 @@
|
||||
package com.hnac.hzims.operational.station.aspect; |
||||
|
||||
import com.alibaba.fastjson.JSON; |
||||
import com.google.common.collect.Lists; |
||||
import com.hnac.hzims.bigmodel.api.constants.MqttTopicConstants; |
||||
import com.hnac.hzims.bigmodel.api.dto.VideoSyncDTO; |
||||
import com.hnac.hzims.operational.station.entity.StationVideoTypeEntity; |
||||
import com.hnac.hzims.operational.station.service.IStationService; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.aspectj.lang.JoinPoint; |
||||
import org.aspectj.lang.annotation.After; |
||||
import org.aspectj.lang.annotation.Aspect; |
||||
import org.aspectj.lang.annotation.Pointcut; |
||||
import org.springblade.mqtt.producer.IMqttSender; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
|
||||
/** |
||||
* @Author: huangxing |
||||
* @Date: 2024/08/12 09:17 |
||||
*/ |
||||
@Aspect |
||||
@Component |
||||
@AllArgsConstructor |
||||
@Slf4j |
||||
public class RemoveVideoAspect { |
||||
|
||||
private final IMqttSender mqttSender; |
||||
|
||||
@Pointcut("execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.removeById(..)) " + |
||||
"|| execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.removeByIds(..)) ") |
||||
private void pointCut() { |
||||
|
||||
} |
||||
|
||||
@After("pointCut()") |
||||
public void after(JoinPoint point) { |
||||
Object[] args = point.getArgs(); |
||||
if(args.length == 1) { |
||||
Object arg = args[0]; |
||||
if(arg instanceof Long) { |
||||
// 若调用接口为removeById 则传参为long类型 将ID传给大模型进行数据同步
|
||||
mqttSender.sendToMqtt(MqttTopicConstants.TOPIC_VIDEO_DELETE, String.valueOf(arg)); |
||||
} |
||||
else if(arg instanceof Collection) { |
||||
// 若调用接口为removeByIds 则传参为String类型 将ID传给大模型进行数据同步
|
||||
mqttSender.sendToMqtt(MqttTopicConstants.TOPIC_VIDEO_DELETE, JSON.toJSONString(arg)); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue