haungxing
3 months ago
16 changed files with 377 additions and 31 deletions
@ -0,0 +1,20 @@ |
|||||||
|
package com.hnac.hzims.bigmodel.api.constants; |
||||||
|
|
||||||
|
/** |
||||||
|
* @Author: huangxing |
||||||
|
* @Date: 2024/08/09 13:48 |
||||||
|
*/ |
||||||
|
public interface MqttTopicConstants { |
||||||
|
|
||||||
|
/** |
||||||
|
* 新增视频主题 |
||||||
|
*/ |
||||||
|
String TOPIC_VIDEO_INSERT = "topic_video_insert"; |
||||||
|
|
||||||
|
/** |
||||||
|
* 删除视频主题 |
||||||
|
*/ |
||||||
|
String TOPIC_VIDEO_DELETE = "topic_video_delete"; |
||||||
|
|
||||||
|
|
||||||
|
} |
@ -0,0 +1,28 @@ |
|||||||
|
package com.hnac.hzims.bigmodel.api.constants; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Getter; |
||||||
|
|
||||||
|
/** |
||||||
|
* @Author: huangxing |
||||||
|
* @Date: 2024/08/09 09:13 |
||||||
|
*/ |
||||||
|
@AllArgsConstructor |
||||||
|
@Getter |
||||||
|
public enum SyncTableEnum { |
||||||
|
/**视频**/ |
||||||
|
VIDEO("video"), |
||||||
|
/**实时画面**/ |
||||||
|
CANVAS("canvas"), |
||||||
|
/**故障**/ |
||||||
|
FAULT("fault"), |
||||||
|
/**站点**/ |
||||||
|
STATION("station"), |
||||||
|
/**实时数据属性**/ |
||||||
|
RECORD("record"), |
||||||
|
/**遥控**/ |
||||||
|
REMOTE("yk") |
||||||
|
; |
||||||
|
private final String tableName; |
||||||
|
|
||||||
|
} |
@ -0,0 +1,40 @@ |
|||||||
|
package com.hnac.hzims.bigmodel.business.consumer; |
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONArray; |
||||||
|
import com.alibaba.fastjson.JSONObject; |
||||||
|
import com.google.common.collect.Lists; |
||||||
|
import com.hnac.hzims.bigmodel.api.dto.VideoSyncDTO; |
||||||
|
import com.hnac.hzims.bigmodel.api.feign.ISyncClient; |
||||||
|
import com.hnac.hzims.bigmodel.api.feign.VideoSyncClient; |
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
import org.springblade.core.tool.utils.CollectionUtil; |
||||||
|
import org.springblade.core.tool.utils.Func; |
||||||
|
import org.springblade.mqtt.customer.IMqttReceive; |
||||||
|
import org.springblade.mqtt.customer.annotation.MqttReceive; |
||||||
|
import org.springframework.stereotype.Service; |
||||||
|
import com.hnac.hzims.bigmodel.api.constants.MqttTopicConstants; |
||||||
|
|
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
/** |
||||||
|
* @Author: huangxing |
||||||
|
* @Date: 2024/08/09 10:33 |
||||||
|
*/ |
||||||
|
@MqttReceive(topicName = MqttTopicConstants.TOPIC_VIDEO_INSERT) |
||||||
|
@Service |
||||||
|
@Slf4j |
||||||
|
@AllArgsConstructor |
||||||
|
public class VideoAddConsumer implements IMqttReceive { |
||||||
|
|
||||||
|
private final VideoSyncClient videoSyncClient; |
||||||
|
|
||||||
|
@Override |
||||||
|
public void handlerMessage(String message) { |
||||||
|
log.info("收到新增视频信息,消息内容体为:{}", message); |
||||||
|
List<VideoSyncDTO> videoSyncDTOS = JSONArray.parseArray(message, VideoSyncDTO.class); |
||||||
|
if(Func.isNotEmpty(videoSyncDTOS)) { |
||||||
|
videoSyncClient.saveBatch(videoSyncDTOS); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,36 @@ |
|||||||
|
package com.hnac.hzims.bigmodel.business.consumer; |
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONArray; |
||||||
|
import com.hnac.hzims.bigmodel.api.constants.MqttTopicConstants; |
||||||
|
import com.hnac.hzims.bigmodel.api.dto.VideoSyncDTO; |
||||||
|
import com.hnac.hzims.bigmodel.api.feign.VideoSyncClient; |
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
import org.springblade.core.tool.utils.Func; |
||||||
|
import org.springblade.mqtt.customer.IMqttReceive; |
||||||
|
import org.springblade.mqtt.customer.annotation.MqttReceive; |
||||||
|
import org.springframework.stereotype.Service; |
||||||
|
|
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
/** |
||||||
|
* @Author: huangxing |
||||||
|
* @Date: 2024/08/12 09:29 |
||||||
|
*/ |
||||||
|
@MqttReceive(topicName = MqttTopicConstants.TOPIC_VIDEO_DELETE) |
||||||
|
@Service |
||||||
|
@Slf4j |
||||||
|
@AllArgsConstructor |
||||||
|
public class VideoRemoveConsumer implements IMqttReceive { |
||||||
|
|
||||||
|
private final VideoSyncClient videoSyncClient; |
||||||
|
|
||||||
|
|
||||||
|
@Override |
||||||
|
public void handlerMessage(String message) { |
||||||
|
log.info("收到删除视频信息,消息内容体为:{}", message); |
||||||
|
if(Func.isNotEmpty(message)) { |
||||||
|
Func.toStrList(",",message).forEach(id -> videoSyncClient.deleteByIds(id)); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,76 @@ |
|||||||
|
package com.hnac.hzims.operational.station.aspect; |
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON; |
||||||
|
import com.google.common.collect.Lists; |
||||||
|
import com.hnac.hzims.bigmodel.api.constants.MqttTopicConstants; |
||||||
|
import com.hnac.hzims.bigmodel.api.dto.VideoSyncDTO; |
||||||
|
import com.hnac.hzims.operational.station.entity.StationVideoTypeEntity; |
||||||
|
import com.hnac.hzims.operational.station.feign.IStationClient; |
||||||
|
import com.hnac.hzims.operational.station.service.IStationService; |
||||||
|
import com.hnac.hzims.operational.station.vo.StationVO; |
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
import org.aspectj.lang.JoinPoint; |
||||||
|
import org.aspectj.lang.annotation.After; |
||||||
|
import org.aspectj.lang.annotation.Aspect; |
||||||
|
import org.aspectj.lang.annotation.Pointcut; |
||||||
|
import org.springblade.core.tool.utils.BeanUtil; |
||||||
|
import org.springblade.core.tool.utils.Func; |
||||||
|
import org.springblade.mqtt.producer.IMqttSender; |
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Optional; |
||||||
|
import java.util.stream.Collectors; |
||||||
|
|
||||||
|
/** |
||||||
|
* @Author: huangxing |
||||||
|
* @Date: 2024/08/09 14:02 |
||||||
|
*/ |
||||||
|
@Aspect |
||||||
|
@Component |
||||||
|
@AllArgsConstructor |
||||||
|
@Slf4j |
||||||
|
public class AddVideoAspect { |
||||||
|
|
||||||
|
private final IStationService stationService; |
||||||
|
private final IMqttSender mqttSender; |
||||||
|
|
||||||
|
@Pointcut("execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.saveBatch(..)) " + |
||||||
|
"|| execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.save(..)) " + |
||||||
|
"|| execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.updateById(..)) " + |
||||||
|
"|| execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.updateBatchById(..))") |
||||||
|
private void pointCut() { |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
@After("pointCut()") |
||||||
|
public void after(JoinPoint point) { |
||||||
|
Object[] args = point.getArgs(); |
||||||
|
if(args.length == 1) { |
||||||
|
Object arg = args[0]; |
||||||
|
if(arg instanceof StationVideoTypeEntity) { |
||||||
|
List<VideoSyncDTO> videoSyncDTOS = Lists.newArrayList(this.convert((StationVideoTypeEntity) arg)); |
||||||
|
mqttSender.sendToMqtt(MqttTopicConstants.TOPIC_VIDEO_INSERT, JSON.toJSONString(videoSyncDTOS)); |
||||||
|
} |
||||||
|
else if(arg instanceof List) { |
||||||
|
List<StationVideoTypeEntity> videoList = (List<StationVideoTypeEntity>) args[0]; |
||||||
|
List<VideoSyncDTO> videoSyncDTOS = videoList.stream().map(this::convert).collect(Collectors.toList()); |
||||||
|
mqttSender.sendToMqtt(MqttTopicConstants.TOPIC_VIDEO_INSERT, JSON.toJSONString(videoSyncDTOS)); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private VideoSyncDTO convert(StationVideoTypeEntity entity) { |
||||||
|
VideoSyncDTO videoSyncDTO = BeanUtil.copy(entity,VideoSyncDTO.class); |
||||||
|
StationVO station = stationService.getStationByCode(entity.getStationId()); |
||||||
|
String stationName = Optional.ofNullable(station).map(StationVO::getName).orElse(""); |
||||||
|
videoSyncDTO.setId(String.valueOf(entity.getId())); |
||||||
|
videoSyncDTO.setItemId(String.valueOf(entity.getId())); |
||||||
|
videoSyncDTO.setStationName(stationName); |
||||||
|
videoSyncDTO.setItemName(stationName + " " + videoSyncDTO.getName()); |
||||||
|
return videoSyncDTO; |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,56 @@ |
|||||||
|
package com.hnac.hzims.operational.station.aspect; |
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON; |
||||||
|
import com.google.common.collect.Lists; |
||||||
|
import com.hnac.hzims.bigmodel.api.constants.MqttTopicConstants; |
||||||
|
import com.hnac.hzims.bigmodel.api.dto.VideoSyncDTO; |
||||||
|
import com.hnac.hzims.operational.station.entity.StationVideoTypeEntity; |
||||||
|
import com.hnac.hzims.operational.station.service.IStationService; |
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
import org.aspectj.lang.JoinPoint; |
||||||
|
import org.aspectj.lang.annotation.After; |
||||||
|
import org.aspectj.lang.annotation.Aspect; |
||||||
|
import org.aspectj.lang.annotation.Pointcut; |
||||||
|
import org.springblade.mqtt.producer.IMqttSender; |
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
|
||||||
|
import java.util.Collection; |
||||||
|
import java.util.List; |
||||||
|
import java.util.stream.Collectors; |
||||||
|
|
||||||
|
/** |
||||||
|
* @Author: huangxing |
||||||
|
* @Date: 2024/08/12 09:17 |
||||||
|
*/ |
||||||
|
@Aspect |
||||||
|
@Component |
||||||
|
@AllArgsConstructor |
||||||
|
@Slf4j |
||||||
|
public class RemoveVideoAspect { |
||||||
|
|
||||||
|
private final IMqttSender mqttSender; |
||||||
|
|
||||||
|
@Pointcut("execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.removeById(..)) " + |
||||||
|
"|| execution(* com.hnac.hzims.operational.station.service.IStationVideoTypeService.removeByIds(..)) ") |
||||||
|
private void pointCut() { |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
@After("pointCut()") |
||||||
|
public void after(JoinPoint point) { |
||||||
|
Object[] args = point.getArgs(); |
||||||
|
if(args.length == 1) { |
||||||
|
Object arg = args[0]; |
||||||
|
if(arg instanceof Long) { |
||||||
|
// 若调用接口为removeById 则传参为long类型 将ID传给大模型进行数据同步
|
||||||
|
mqttSender.sendToMqtt(MqttTopicConstants.TOPIC_VIDEO_DELETE, String.valueOf(arg)); |
||||||
|
} |
||||||
|
else if(arg instanceof Collection) { |
||||||
|
// 若调用接口为removeByIds 则传参为String类型 将ID传给大模型进行数据同步
|
||||||
|
mqttSender.sendToMqtt(MqttTopicConstants.TOPIC_VIDEO_DELETE, JSON.toJSONString(arg)); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
Loading…
Reference in new issue