tyty
1 year ago
8 changed files with 413 additions and 132 deletions
@ -0,0 +1,12 @@
|
||||
package com.hnac.hzims.alarm.show.service; |
||||
|
||||
/** |
||||
* 告警处理接口 |
||||
* @author ysj |
||||
*/ |
||||
public interface FdpAlarmService { |
||||
|
||||
String sendMessage(); |
||||
|
||||
void receiveMessage(String message); |
||||
} |
@ -0,0 +1,126 @@
|
||||
package com.hnac.hzims.alarm.show.service.impl; |
||||
|
||||
import com.alibaba.fastjson.JSONObject; |
||||
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; |
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
||||
import com.hnac.hzims.alarm.entity.AlarmEntity; |
||||
import com.hnac.hzims.alarm.show.service.AlarmService; |
||||
import com.hnac.hzims.alarm.show.service.FdpAlarmService; |
||||
import com.hnac.hzims.alarm.show.service.MessageService; |
||||
import com.hnac.hzims.alarm.vo.FdpAlarmVo; |
||||
import com.hnac.hzims.message.fegin.IMessageClient; |
||||
import com.hnac.hzims.operational.station.entity.StationEntity; |
||||
import com.hnac.hzims.operational.station.feign.IStationClient; |
||||
import lombok.RequiredArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.core.log.exception.ServiceException; |
||||
import org.springblade.core.tool.api.R; |
||||
import org.springblade.core.tool.utils.CollectionUtil; |
||||
import org.springblade.core.tool.utils.DateUtil; |
||||
import org.springblade.system.feign.ISysClient; |
||||
import org.springblade.system.user.feign.IUserClient; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import static com.hnac.hzims.alarm.constants.AlarmConstants.EARLY; |
||||
import static com.hnac.hzims.alarm.constants.AlarmConstants.EARLY_WARNING; |
||||
|
||||
/** |
||||
* 等级告警实现类 |
||||
* @author ysj |
||||
*/ |
||||
@Slf4j |
||||
@Service |
||||
@RequiredArgsConstructor |
||||
public class FdpAlarmServiceImpl implements FdpAlarmService { |
||||
|
||||
|
||||
private final MessageService messageService; |
||||
private final IUserClient userClient; |
||||
private final ISysClient sysClient; |
||||
|
||||
private final IMessageClient messageClient; |
||||
private final AlarmService alarmService; |
||||
private final IStationClient stationClient; |
||||
/** |
||||
* 定时发送消息内容 |
||||
* @return String |
||||
*/ |
||||
@Override |
||||
public String sendMessage() { |
||||
R<List<StationEntity>> listAll = stationClient.getListAll(); |
||||
if (!listAll.isSuccess()||CollectionUtil.isEmpty(listAll.getData())){ |
||||
throw new ServiceException("FdpAlarm send message is null"); |
||||
} |
||||
List<StationEntity> stations = listAll.getData(); |
||||
Map<String,String> map = new ConcurrentHashMap<>(); |
||||
map.put("stations",stations.stream().map(StationEntity::getCode).collect(Collectors.joining(","))); |
||||
return JSONObject.toJSONString(map); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* 接收服务推送消息 |
||||
* @param message |
||||
*/ |
||||
@Override |
||||
public void receiveMessage(String message) { |
||||
// 对象转换
|
||||
List<FdpAlarmVo> alarms = JSONObject.parseArray(message, FdpAlarmVo.class); |
||||
if(CollectionUtil.isEmpty(alarms)){ |
||||
return; |
||||
} |
||||
|
||||
R<List<StationEntity>> listAll = stationClient.getListAll(); |
||||
if (!listAll.isSuccess()||CollectionUtil.isEmpty(listAll.getData())){ |
||||
throw new ServiceException("FdpAlarm send message is null"); |
||||
} |
||||
List<StationEntity> stations = listAll.getData(); |
||||
// 查询当天已经记录的告警
|
||||
List<AlarmEntity> historys = alarmService.list(Wrappers.<AlarmEntity>lambdaQuery(). |
||||
ge(AlarmEntity::getCreateTime, DateUtil.format(new Date(), DateUtil.PATTERN_DATE) + " 00:00:00") |
||||
.eq(AlarmEntity::getAlarmSource, EARLY_WARNING)); |
||||
|
||||
// 数据过滤
|
||||
List<AlarmEntity> entitys = alarms.stream() |
||||
.filter(alarm -> CollectionUtil.isNotEmpty(historys) || !historys.stream().map(AlarmEntity::getAlarmId).collect(Collectors.toList()).contains(alarm.getFaultId())) |
||||
.map(item->{ |
||||
//转换对象
|
||||
AlarmEntity entity = getAlarmEntity(item); |
||||
// 短信
|
||||
CompletableFuture.runAsync(() -> messageService.message(entity)); |
||||
// web/app消息推送
|
||||
CompletableFuture.runAsync(() -> messageService.webAppMessage(entity)); |
||||
// 微信公众号推送
|
||||
CompletableFuture.runAsync(() -> messageService.weChatMessage(entity)); |
||||
return entity; |
||||
}).collect(Collectors.toList()); |
||||
if(CollectionUtil.isEmpty(entitys)){ |
||||
return; |
||||
} |
||||
// 批量保存
|
||||
alarmService.saveBatch(entitys); |
||||
} |
||||
|
||||
private AlarmEntity getAlarmEntity(FdpAlarmVo item) { |
||||
AlarmEntity entity = new AlarmEntity(); |
||||
entity.setAlarmId(item.getFaultId()); |
||||
entity.setAlarmTime(item.getCreateTime()); |
||||
entity.setAlarmContext(item.getFinfo()); |
||||
entity.setAlarmType(EARLY); |
||||
entity.setAlarmStatus(Integer.valueOf(String.valueOf(item.getStatus()))); |
||||
entity.setStationId(item.getStation()); |
||||
entity.setAlarmStatus(EARLY_WARNING); |
||||
R<StationEntity> stationByCode = stationClient.getStationByCode(item.getStation()); |
||||
if (stationByCode.isSuccess()&& ObjectUtils.isNotEmpty(stationByCode.getData())){ |
||||
entity.setCreateDept(stationByCode.getData().getCreateDept()); |
||||
} |
||||
return entity; |
||||
} |
||||
} |
@ -0,0 +1,51 @@
|
||||
package com.hnac.hzims.alarm.ws.fdp; |
||||
|
||||
import com.hnac.hzims.alarm.show.service.FdpAlarmService; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.core.tool.utils.ObjectUtil; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.scheduling.annotation.EnableScheduling; |
||||
import org.springframework.scheduling.annotation.Scheduled; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.net.URI; |
||||
|
||||
/** |
||||
* 等级告警获取数据长链接 |
||||
* @author ty |
||||
*/ |
||||
@Slf4j |
||||
@Component |
||||
@EnableScheduling |
||||
public class FdpAlarmRegular { |
||||
|
||||
@Value("${hzims.fdp.alarm-url}") |
||||
private String fdp_alarm_url; |
||||
|
||||
private FdpAlarmWebSocket client; |
||||
|
||||
@Autowired |
||||
private FdpAlarmService fdpAlarmService; |
||||
|
||||
// 定时发送消息
|
||||
@Scheduled(cron = "0 0/30 * * * ?") |
||||
private void regular(){ |
||||
// 检查链接存活状态
|
||||
if(ObjectUtil.isNotEmpty(client) && client.isOpen()){ |
||||
client.send(fdpAlarmService.sendMessage()); |
||||
}else { |
||||
this.createClient(); |
||||
} |
||||
} |
||||
|
||||
// 创建websocket链接
|
||||
private void createClient() { |
||||
try{ |
||||
client = new FdpAlarmWebSocket(new URI(fdp_alarm_url)); |
||||
client.connectBlocking(); |
||||
}catch (Exception e){ |
||||
log.error("FdpAlarm create error : {}",e.getMessage()); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,125 @@
|
||||
package com.hnac.hzims.alarm.ws.fdp; |
||||
|
||||
import com.hnac.hzims.alarm.show.service.SystemAlarmService; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.java_websocket.client.WebSocketClient; |
||||
import org.java_websocket.handshake.ServerHandshake; |
||||
import org.springblade.core.tool.utils.SpringUtil; |
||||
import org.springblade.core.tool.utils.StringUtil; |
||||
|
||||
import javax.net.ssl.SSLContext; |
||||
import javax.net.ssl.TrustManager; |
||||
import javax.net.ssl.X509TrustManager; |
||||
import java.net.Socket; |
||||
import java.net.URI; |
||||
import java.security.SecureRandom; |
||||
import java.security.cert.X509Certificate; |
||||
import java.util.Optional; |
||||
|
||||
/** |
||||
* 等级告警获取数据长链接 |
||||
* @author ty |
||||
*/ |
||||
@Slf4j |
||||
public class FdpAlarmWebSocket extends WebSocketClient { |
||||
|
||||
private final SystemAlarmService systemAlarmService; |
||||
|
||||
/** |
||||
* 构造等级告警websocket |
||||
* @param uri |
||||
*/ |
||||
public FdpAlarmWebSocket(URI uri) { |
||||
super(uri); |
||||
systemAlarmService = SpringUtil.getBean(SystemAlarmService.class); |
||||
connection(this); |
||||
} |
||||
|
||||
// 链接到服务器回调接口
|
||||
@Override |
||||
public void onOpen(ServerHandshake handshakedata) { |
||||
log.error("FdpAlarm websocket open"); |
||||
} |
||||
|
||||
// 接收到服务器消息回调接口
|
||||
@Override |
||||
public void onMessage(String message) { |
||||
if(StringUtil.isEmpty(message)){ |
||||
log.error("FdpAlarm on message is null"); |
||||
}else{ |
||||
// 等级告警数据处理
|
||||
systemAlarmService.receiveMessage(message); |
||||
} |
||||
} |
||||
|
||||
// 与服务器链接中断回调接口
|
||||
@Override |
||||
public void onClose(int code, String reason, boolean remote) { |
||||
log.error("FdpAlarm websocket close"); |
||||
} |
||||
|
||||
// 与服务器通讯异常触发
|
||||
@Override |
||||
public void onError(Exception e) { |
||||
log.error("FdpAlarm websocket error : {}",e.getMessage()); |
||||
} |
||||
|
||||
/** |
||||
* 建立链接 |
||||
* @param webSocket |
||||
*/ |
||||
private void connection(FdpAlarmWebSocket webSocket) { |
||||
SSLContext context = init(); |
||||
if(Optional.ofNullable(context).isPresent()){ |
||||
Socket socket = create(context); |
||||
if(Optional.ofNullable(socket).isPresent()){ |
||||
webSocket.setSocket(socket); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* 创建Socket |
||||
* @param context |
||||
* @return |
||||
*/ |
||||
private Socket create(SSLContext context) { |
||||
Socket socket = null; |
||||
try{ |
||||
socket = context.getSocketFactory().createSocket(); |
||||
}catch (Exception e){ |
||||
log.error("FdpAlarm socket create error : {}",e.getMessage()); |
||||
} |
||||
return socket; |
||||
} |
||||
|
||||
/** |
||||
* 协议初始化 |
||||
* @return SSLContext |
||||
*/ |
||||
private SSLContext init() { |
||||
SSLContext SSL = null; |
||||
try{ |
||||
SSL = SSLContext.getInstance("TLS"); |
||||
SSL.init(null, new TrustManager[]{new X509TrustManager() { |
||||
@Override |
||||
public void checkClientTrusted(X509Certificate[] chain, |
||||
String authType) { |
||||
} |
||||
|
||||
@Override |
||||
public void checkServerTrusted(X509Certificate[] chain, String authType) { |
||||
} |
||||
|
||||
@Override |
||||
public X509Certificate[] getAcceptedIssuers() { |
||||
return new X509Certificate[0]; |
||||
} |
||||
}}, new SecureRandom()); |
||||
}catch (Exception e){ |
||||
log.error("FdpAlarm SSL init error : {}",e.getMessage()); |
||||
} |
||||
return SSL; |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue