Browse Source

# 消息中心即时推送逻辑修改

zhongwei
haungxing 9 months ago
parent
commit
2424905687
  1. 7
      hzims-biz-common/src/main/java/com/hnac/hzims/common/invalid/Create.java
  2. 7
      hzims-biz-common/src/main/java/com/hnac/hzims/common/invalid/Update.java
  3. 15
      hzims-service-api/message-api/src/main/java/com/hnac/hzims/message/dto/BusinessMessageDTO.java
  4. 3
      hzims-service-api/message-api/src/main/java/com/hnac/hzims/message/entity/MessagePushRecordEntity.java
  5. 4
      hzims-service-api/pom.xml
  6. 10
      hzims-service/message/src/main/java/com/hnac/hzims/message/controller/MessageController.java
  7. 35
      hzims-service/message/src/main/java/com/hnac/hzims/message/fegin/MessageClient.java
  8. 32
      hzims-service/message/src/main/java/com/hnac/hzims/message/schedule/MessagePushSchedule.java
  9. 31
      hzims-service/message/src/main/java/com/hnac/hzims/message/service/IMessagePushRecordService.java
  10. 185
      hzims-service/message/src/main/java/com/hnac/hzims/message/service/impl/MessagePushRecordServiceImpl.java
  11. 56
      hzims-service/message/src/main/java/com/hnac/hzims/message/service/impl/PushMessageServiceImpl.java
  12. 9
      hzims-service/message/src/main/java/com/hnac/hzims/message/service/impl/SmsMessageServiceImpl.java
  13. 2
      hzims-service/message/src/main/resources/db/1.0.1.sql

7
hzims-biz-common/src/main/java/com/hnac/hzims/common/invalid/Create.java

@ -0,0 +1,7 @@
package com.hnac.hzims.common.invalid;
import javax.validation.groups.Default;
public interface Create extends Default {
}

7
hzims-biz-common/src/main/java/com/hnac/hzims/common/invalid/Update.java

@ -0,0 +1,7 @@
package com.hnac.hzims.common.invalid;
import javax.validation.groups.Default;
public interface Update extends Default {
}

15
hzims-service-api/message-api/src/main/java/com/hnac/hzims/message/dto/BusinessMessageDTO.java

@ -2,6 +2,8 @@ package com.hnac.hzims.message.dto;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import com.hnac.hzims.common.invalid.Create;
import com.hnac.hzims.common.invalid.Update;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
@ -11,6 +13,7 @@ import org.springblade.core.mp.support.SqlCondition;
import org.springblade.core.tool.utils.DateUtil; import org.springblade.core.tool.utils.DateUtil;
import org.springframework.format.annotation.DateTimeFormat; import org.springframework.format.annotation.DateTimeFormat;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDate; import java.time.LocalDate;
@ -28,33 +31,33 @@ import java.time.LocalDateTime;
@EqualsAndHashCode @EqualsAndHashCode
public class BusinessMessageDTO implements Serializable { public class BusinessMessageDTO implements Serializable {
@ApiModelProperty(value = "机构ID",required = true) @ApiModelProperty(value = "机构ID",required = true)
@NotNull(message = "机构ID不能为空") @NotNull(message = "机构ID不能为空",groups = {Create.class})
private Long deptId; private Long deptId;
@ApiModelProperty("机构名称") @ApiModelProperty("机构名称")
private String deptName; private String deptName;
@ApiModelProperty(value = "业务关键字",required = true) @ApiModelProperty(value = "业务关键字",required = true)
@NotNull(message = "业务关键字不能为空") @NotBlank(message = "业务关键字不能为空",groups = {Create.class})
private String businessKey; private String businessKey;
@ApiModelProperty(value = "业务分类。系统通知:system,事务消息:business,日常提醒:dailyRemind,巡检消息:inspect",required = true) @ApiModelProperty(value = "业务分类。系统通知:system,事务消息:business,日常提醒:dailyRemind,巡检消息:inspect",required = true)
@NotNull(message = "业务分类不能为空") @NotBlank(message = "业务分类不能为空",groups = {Create.class})
private String businessClassify; private String businessClassify;
@ApiModelProperty("业务任务ID") @ApiModelProperty("业务任务ID")
@QueryField(condition = SqlCondition.EQUAL) @QueryField(condition = SqlCondition.EQUAL)
private Long taskId; private Long taskId;
@NotNull @NotBlank(message = "内容不能为空",groups = {Create.class})
@ApiModelProperty(value = "内容") @ApiModelProperty(value = "内容")
private String content; private String content;
@NotNull @NotBlank(message = "主题不能为空",groups = {Create.class})
@ApiModelProperty(value = "主题") @ApiModelProperty(value = "主题")
private String subject; private String subject;
@NotNull @NotBlank(message = "推送用户不能为空",groups = {Create.class})
@ApiModelProperty(value = "推送用户") @ApiModelProperty(value = "推送用户")
private String userIds; private String userIds;

3
hzims-service-api/message-api/src/main/java/com/hnac/hzims/message/entity/MessagePushRecordEntity.java

@ -91,6 +91,9 @@ public class MessagePushRecordEntity extends MessageTemplateEntity {
@ApiModelProperty(value = "响应说明") @ApiModelProperty(value = "响应说明")
private String respondRemark; private String respondRemark;
@ApiModelProperty("推送失败结果")
private String faultResult;
@ApiModelProperty("查询开始时间") @ApiModelProperty("查询开始时间")
@DateTimeFormat(pattern = DateUtil.PATTERN_DATE) @DateTimeFormat(pattern = DateUtil.PATTERN_DATE)
@JsonFormat(pattern = DateUtil.PATTERN_DATE) @JsonFormat(pattern = DateUtil.PATTERN_DATE)

4
hzims-service-api/pom.xml

@ -33,6 +33,10 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>com.hnac.hzims</groupId>
<artifactId>hzims-biz-common</artifactId>
</dependency>
<dependency>
<groupId>org.springblade</groupId> <groupId>org.springblade</groupId>
<artifactId>blade-starter-mybatis</artifactId> <artifactId>blade-starter-mybatis</artifactId>
<exclusions> <exclusions>

10
hzims-service/message/src/main/java/com/hnac/hzims/message/controller/MessageController.java

@ -81,14 +81,4 @@ public class MessageController extends BladeController {
return messageService.send(wsPushDto); return messageService.send(wsPushDto);
} }
@GetMapping("/sendByUsers")
@ApiOperation("发送APP消息(多人)")
@ApiOperationSupport(order=5)
public R sendByUsers(@ApiParam("消息主题") String subject,
@ApiParam("消息内容") String content,
@ApiParam("用户ID列表") String userIds,
@ApiParam("租户ID") String tenantId) throws Exception {
PushMessageServiceImpl messageService = SpringUtil.getBean(PushMessageServiceImpl.class);
return R.status(messageService.sendByUsers(subject,content, Func.toStrList(",",userIds),tenantId));
}
} }

35
hzims-service/message/src/main/java/com/hnac/hzims/message/fegin/MessageClient.java

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.IdWorker; import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.hnac.hzims.common.invalid.Create;
import com.hnac.hzims.message.MessageConstants; import com.hnac.hzims.message.MessageConstants;
import com.hnac.hzims.message.dto.*; import com.hnac.hzims.message.dto.*;
import com.hnac.hzims.message.entity.MessagePushRecordEntity; import com.hnac.hzims.message.entity.MessagePushRecordEntity;
@ -27,6 +28,7 @@ import org.springblade.system.feign.ISysClient;
import org.springblade.system.user.cache.UserCache; import org.springblade.system.user.cache.UserCache;
import org.springblade.system.user.entity.User; import org.springblade.system.user.entity.User;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.validation.Valid; import javax.validation.Valid;
@ -146,37 +148,8 @@ public class MessageClient extends BladeController implements IMessageClient{
@Override @Override
@PostMapping(value = SEND_APP_AND_WS_MSG , produces="application/json; charset=UTF-8") @PostMapping(value = SEND_APP_AND_WS_MSG , produces="application/json; charset=UTF-8")
public R<Boolean> sendAppAndWsMsgByUsers(@RequestBody BusinessMessageDTO request) { public R<Boolean> sendAppAndWsMsgByUsers(@RequestBody @Validated(Create.class) BusinessMessageDTO request) {
// 保存消息记录 return R.status(recordService.sendAppAndWsMsgByUsers(request));
List<MessagePushRecordEntity> pushRecords = Func.toLongList(request.getUserIds()).stream().flatMap(userId -> {
long messageId = IdWorker.getId();
return Lists.newArrayList(MessageConstants.APP_PUSH, MessageConstants.WS_PUSH).stream().map(messageType -> {
MessagePushRecordEntity record = BeanUtil.copy(request, MessagePushRecordEntity.class);
record.setDeptName(Func.isNotEmpty(record.getDeptName()) ? record.getDeptName() : this.getDeptNameById(record.getDeptId()));
record.setMessageId(messageId);
record.setPusher(userId.toString());
record.setPusherName(Optional.ofNullable(UserCache.getUser(userId)).map(User::getName).orElse(null));
record.setPushType(MessageConstants.IMMEDIATELY);
record.setAccount(userId.toString());
record.setPlanTime(LocalDateTime.now());
record.setType(messageType);
record.setCreateDept(record.getDeptId());
record.setStatus(MessageConstants.NOT_PUSH);
return record;
});
}).collect(Collectors.toList());
boolean saveResult = recordService.saveBatch(pushRecords);
if(saveResult) {
List<MessagePushRecordEntity> appRecords = pushRecords.stream().filter(record -> MessageConstants.APP_PUSH.equals(record.getType())).collect(Collectors.toList());
// 推送消息 - app
Boolean appFlag = recordService.sendAppMsgByUsers(request, appRecords);
// 推送消息 - web
Boolean wsFlag = recordService.sendWsMsgByUsers(new ArrayList<>(CollectionUtils.subtract(pushRecords, appRecords)));
return R.data(appFlag && wsFlag);
}
else {
return R.data(false);
}
} }
@Override @Override

32
hzims-service/message/src/main/java/com/hnac/hzims/message/schedule/MessagePushSchedule.java

@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker; import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.hnac.hzims.common.constant.CommonConstant;
import com.hnac.hzims.common.utils.DateUtil; import com.hnac.hzims.common.utils.DateUtil;
import com.hnac.hzims.message.MessageConstants; import com.hnac.hzims.message.MessageConstants;
import com.hnac.hzims.message.config.MessageFactory; import com.hnac.hzims.message.config.MessageFactory;
@ -16,12 +17,14 @@ import com.hnac.hzims.message.service.IMessagePushRecordService;
import com.hnac.hzims.message.service.IMessageService; import com.hnac.hzims.message.service.IMessageService;
import com.hnac.hzims.message.service.IMessageTemplateService; import com.hnac.hzims.message.service.IMessageTemplateService;
import com.hnac.hzims.message.service.impl.PushMessageServiceImpl; import com.hnac.hzims.message.service.impl.PushMessageServiceImpl;
import com.hnac.hzinfo.core.push.model.PushResponse;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.utils.CollectionUtil; import org.springblade.core.tool.utils.CollectionUtil;
import org.springblade.core.tool.utils.Func; import org.springblade.core.tool.utils.Func;
import org.springblade.core.tool.utils.SpringUtil; import org.springblade.core.tool.utils.SpringUtil;
@ -93,18 +96,27 @@ public class MessagePushSchedule {
.collect(Collectors.groupingBy(MessagePushRecordEntity::getSubject)); .collect(Collectors.groupingBy(MessagePushRecordEntity::getSubject));
listMap.forEach((subject,list) -> { listMap.forEach((subject,list) -> {
try { try {
boolean pushFlag = pushMessageService.sendByUsers( String content = list.stream().map(MessagePushRecordEntity::getContent).collect(Collectors.joining("\r\n"));
subject, ArrayList<String> pushers = Lists.newArrayList(pusher.toString());
list.stream().map(MessagePushRecordEntity::getContent).collect(Collectors.joining("\r\n")), R<PushResponse> androidPush = pushMessageService.sendAndroidMsg(subject, content, pushers, CommonConstant.TENANT_ID);
Lists.newArrayList(pusher.toString()), R<PushResponse> iosPush = pushMessageService.sendIOSMsg(subject, content, pushers, CommonConstant.TENANT_ID);
"200000"
);
if(pushFlag) {
XxlJobLogger.log("消息推送失败,subject为"+subject+";list为:"+JSON.toJSONString(list));
}
LambdaUpdateWrapper<MessagePushRecordEntity> updateWrapper = Wrappers.<MessagePushRecordEntity>lambdaUpdate() LambdaUpdateWrapper<MessagePushRecordEntity> updateWrapper = Wrappers.<MessagePushRecordEntity>lambdaUpdate()
.set(MessagePushRecordEntity::getStatus, MessageConstants.PUSH_SUCCESS)
.in(MessagePushRecordEntity::getId, list.stream().map(MessagePushRecordEntity::getId).collect(Collectors.toList())); .in(MessagePushRecordEntity::getId, list.stream().map(MessagePushRecordEntity::getId).collect(Collectors.toList()));
String faultResult = "";
if(!androidPush.isSuccess()) {
updateWrapper.set(MessagePushRecordEntity::getStatus,MessageConstants.PUSH_FAILED);
faultResult += "ANDROID消息推送失败,推送结果为:"+androidPush.getMsg()+";";
XxlJobLogger.log("ANDROID消息推送失败,subject为"+subject+";list为:"+JSON.toJSONString(list));
}
if(!iosPush.isSuccess()) {
updateWrapper.set(MessagePushRecordEntity::getStatus,MessageConstants.PUSH_FAILED);
faultResult += "IOS消息推送失败,推送结果为:"+androidPush.getMsg()+";";
XxlJobLogger.log("IOS消息推送失败,subject为"+subject+";list为:"+JSON.toJSONString(list));
}
else {
updateWrapper.set(MessagePushRecordEntity::getStatus, MessageConstants.PUSH_SUCCESS);
}
updateWrapper.set(MessagePushRecordEntity::getFaultResult,faultResult);
recordService.update(updateWrapper); recordService.update(updateWrapper);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);

31
hzims-service/message/src/main/java/com/hnac/hzims/message/service/IMessagePushRecordService.java

@ -105,21 +105,6 @@ public interface IMessagePushRecordService extends BaseService<MessagePushRecord
Boolean sendWxMessageByUser(WxMessageDTO message,List<MessagePushRecordEntity> records); Boolean sendWxMessageByUser(WxMessageDTO message,List<MessagePushRecordEntity> records);
/**
* App多人推送消息
* @param request 消息内容
* @param records 消息记录
* @return 推送结果
*/
Boolean sendAppMsgByUsers(BusinessMessageDTO request,List<MessagePushRecordEntity> records);
/**
* web多人推送消息
* @param request 消息记录
* @return 推送结果
*/
Boolean sendWsMsgByUsers(List<MessagePushRecordEntity> request);
Boolean sendMailMsgByUsers(List<MessagePushRecordEntity> request); Boolean sendMailMsgByUsers(List<MessagePushRecordEntity> request);
@ -136,4 +121,20 @@ public interface IMessagePushRecordService extends BaseService<MessagePushRecord
* @return 推送结果 * @return 推送结果
*/ */
Boolean sendSmsImmediatelyMsg(MessagePushRecordEntity record); Boolean sendSmsImmediatelyMsg(MessagePushRecordEntity record);
/**
* 推送即时消息
* @param record 消息内容
* @return 推送结果
*/
Boolean sendImmediatelyMSg(MessagePushRecordEntity record);
/**
* 推送事务消息即时推送APP消息存入redis进行消费推送
* @param request 事务消息
* @return 推送结果
*/
Boolean sendAppAndWsMsgByUsers(BusinessMessageDTO request);
} }

185
hzims-service/message/src/main/java/com/hnac/hzims/message/service/impl/MessagePushRecordServiceImpl.java

@ -26,15 +26,23 @@ import com.hnac.hzims.message.vo.UnreadMessageVO;
import com.hnac.hzims.message.vo.msgpushrecord.*; import com.hnac.hzims.message.vo.msgpushrecord.*;
import io.minio.messages.Item; import io.minio.messages.Item;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springblade.core.log.exception.ServiceException; import org.springblade.core.log.exception.ServiceException;
import org.springblade.core.mp.base.BaseServiceImpl; import org.springblade.core.mp.base.BaseServiceImpl;
import com.hnac.hzims.common.utils.Condition; import com.hnac.hzims.common.utils.Condition;
import org.springblade.core.mp.support.Query; import org.springblade.core.mp.support.Query;
import org.springblade.core.secure.utils.AuthUtil; import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.utils.BeanUtil;
import org.springblade.core.tool.utils.Func; import org.springblade.core.tool.utils.Func;
import org.springblade.core.tool.utils.SpringUtil; import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.system.cache.DictCache; import org.springblade.system.cache.DictCache;
import org.springblade.system.feign.ISysClient;
import org.springblade.system.user.cache.UserCache;
import org.springblade.system.user.entity.User;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -43,15 +51,20 @@ import javax.validation.Valid;
import java.time.*; import java.time.*;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Service @Service
@AllArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
public class MessagePushRecordServiceImpl extends BaseServiceImpl<MessagePushRecordMapper, MessagePushRecordEntity> implements IMessagePushRecordService { public class MessagePushRecordServiceImpl extends BaseServiceImpl<MessagePushRecordMapper, MessagePushRecordEntity> implements IMessagePushRecordService {
private final MprNoAurhScopeMapper noAurhScopeMapper; private final MprNoAurhScopeMapper noAurhScopeMapper;
private final IMessageService smsMessageService; private final IMessageService smsMessageService;
private final ISysClient sysClient;
private final RedisTemplate redisTemplate;
@Value("${hzims.message.redis-key.app-push}")
private String appPushRedisKey;
@Override @Override
public List<MessagePushRecordEntity> list(MessagePushRecordEntity request) { public List<MessagePushRecordEntity> list(MessagePushRecordEntity request) {
@ -94,14 +107,13 @@ public class MessagePushRecordServiceImpl extends BaseServiceImpl<MessagePushRec
} }
/** /**
* @return java.lang.Boolean 推送解雇 * @return java.lang.Boolean 推送结果
* @Author hx * @Author hx
* @Description 保存并推送消息 * @Description 保存并推送消息
* @Date 2023/4/12 10:23 * @Date 2023/4/12 10:23
* @Param [request] 消息记录 * @Param [request] 消息记录
**/ **/
@Override @Override
@Transactional(rollbackFor = Exception.class)
public Boolean saveAndSend(MessagePushRecordEntity request) { public Boolean saveAndSend(MessagePushRecordEntity request) {
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
request.setStatus(MessageConstants.NOT_PUSH); request.setStatus(MessageConstants.NOT_PUSH);
@ -109,10 +121,7 @@ public class MessagePushRecordServiceImpl extends BaseServiceImpl<MessagePushRec
request.setPlanTime(now); request.setPlanTime(now);
request.setPushTime(now); request.setPushTime(now);
// 立即推送消息 // 立即推送消息
if (this.save(request) && Func.isNotEmpty(request.getId())) { return this.sendImmediatelyMSg(request);
return this.send(request.getId());
}
throw new ServiceException("保存消息失败!");
} else if (MessageConstants.PLAN.equals(request.getPushType())) { } else if (MessageConstants.PLAN.equals(request.getPushType())) {
return this.save(request); return this.save(request);
} else { } else {
@ -296,19 +305,131 @@ public class MessagePushRecordServiceImpl extends BaseServiceImpl<MessagePushRec
public Boolean sendSmsImmediatelyMsg(MessagePushRecordEntity record) { public Boolean sendSmsImmediatelyMsg(MessagePushRecordEntity record) {
// 完善推送Account // 完善推送Account
record.setAccount(smsMessageService.getAccountByPusher(Long.parseLong(record.getPusher()),record.getType())); record.setAccount(smsMessageService.getAccountByPusher(Long.parseLong(record.getPusher()),record.getType()));
if(this.save(record)) { SmsMessageServiceImpl smsService = SpringUtil.getBean(SmsMessageServiceImpl.class);
if(smsMessageService.send(record)) { if(this.saveOrUpdate(record)) {
this.update( R sendResult = smsService.sendSmsByRecord(record);
Wrappers.<MessagePushRecordEntity>lambdaUpdate() if(sendResult.isSuccess()) {
.set(MessagePushRecordEntity::getStatus,MessageConstants.PUSH_SUCCESS) record.setStatus(MessageConstants.PUSH_SUCCESS);
.set(MessagePushRecordEntity::getPushTime,LocalDateTime.now()) record.setPushTime(LocalDateTime.now());
.eq(MessagePushRecordEntity::getId,record.getId()) }
); else {
record.setStatus(MessageConstants.PUSH_FAILED);
record.setFaultResult(sendResult.getMsg());
}
this.updateById(record);
}
return true;
}
/**
* 推送APP即时消息
* @param record 消息记录
* @return 推送结果
*/
private Boolean sendAppImmediatelyMsg(MessagePushRecordEntity record) {
if(this.saveOrUpdate(record)) {
PushMessageServiceImpl pushService = SpringUtil.getBean(PushMessageServiceImpl.class);
if(pushService.send(record)) {
record.setStatus(MessageConstants.PUSH_SUCCESS);
record.setPushTime(LocalDateTime.now());
}
else {
record.setStatus(MessageConstants.PUSH_FAILED);
}
this.updateById(record);
}
return true;
}
private Boolean sendWsImmediatelyMsg(MessagePushRecordEntity record) {
if(this.saveOrUpdate(record)) {
WebsocketServiceImpl wsService = SpringUtil.getBean(WebsocketServiceImpl.class);
if(wsService.send(record)) {
record.setStatus(MessageConstants.PUSH_SUCCESS);
record.setPushTime(LocalDateTime.now());
}
else {
record.setStatus(MessageConstants.PUSH_FAILED);
}
this.updateById(record);
}
return true;
}
@Override
public Boolean sendImmediatelyMSg(MessagePushRecordEntity record) {
Assert.isTrue(MessageConstants.IMMEDIATELY.equals(record.getPushType()),() -> {
throw new ServiceException("该消息记录推送类型错误,只能为即时推送!");
});
switch(record.getType()) {
case MessageConstants.APP_PUSH:
return this.sendAppImmediatelyMsg(record);
case MessageConstants.WX_PUSH:
break;
case MessageConstants.WS_PUSH:
return this.sendWsImmediatelyMsg(record);
case MessageConstants.SMS_PUSH:
return this.sendSmsImmediatelyMsg(record);
default:
break;
}
return true;
}
@Override
public Boolean sendAppAndWsMsgByUsers(BusinessMessageDTO request) {
// 保存消息记录
List<MessagePushRecordEntity> pushRecords = Func.toLongList(request.getUserIds()).stream().flatMap(userId -> {
long messageId = IdWorker.getId();
return Lists.newArrayList(MessageConstants.APP_PUSH, MessageConstants.WS_PUSH).stream().map(messageType -> {
MessagePushRecordEntity record = BeanUtil.copy(request, MessagePushRecordEntity.class);
record.setDeptName(Func.isNotEmpty(record.getDeptName()) ? record.getDeptName() : this.getDeptNameById(record.getDeptId()));
record.setMessageId(messageId);
record.setPusher(userId.toString());
record.setPusherName(Optional.ofNullable(UserCache.getUser(userId)).map(User::getName).orElse(null));
record.setPushType(MessageConstants.IMMEDIATELY);
record.setAccount(userId.toString());
record.setPlanTime(LocalDateTime.now());
record.setType(messageType);
record.setCreateDept(record.getDeptId());
record.setStatus(MessageConstants.NOT_PUSH);
return record;
});
}).collect(Collectors.toList());
if(this.saveBatch(pushRecords)) {
// 推送消息
WebsocketServiceImpl wsMessageService = SpringUtil.getBean(WebsocketServiceImpl.class);
pushRecords.forEach(record -> {
if(MessageConstants.APP_PUSH.equals(record.getType())) {
// APP消息存入redis中进行消费
redisTemplate.opsForList().leftPush(appPushRedisKey.concat(":").concat(record.getPusher()), record);
} else if (MessageConstants.WS_PUSH.equals(record.getType())) {
// WEB消息调用接口直接进行推送
if(wsMessageService.send(record)) {
record.setPushTime(LocalDateTime.now());
record.setStatus(MessageConstants.PUSH_SUCCESS);
} }
else {
record.setStatus(MessageConstants.PUSH_FAILED);
} }
this.updateById(record);
}
});
return true; return true;
} }
return false;
}
/**
* 根据机构ID获取机构名称
* @param deptId 机构ID
* @return 机构名称
*/
private String getDeptNameById(Long deptId) {
return Optional.ofNullable(sysClient.getDeptName(deptId)).filter(r -> r.isSuccess()).map(R::getData).orElse(null);
}
/** /**
* 推送成功失败统计 * 推送成功失败统计
@ -408,40 +529,6 @@ public class MessagePushRecordServiceImpl extends BaseServiceImpl<MessagePushRec
} }
@Override @Override
public Boolean sendAppMsgByUsers(BusinessMessageDTO request, List<MessagePushRecordEntity> records) {
PushMessageServiceImpl service = SpringUtil.getBean(PushMessageServiceImpl.class);
try {
boolean sendFlag = service.sendByUsers(request.getSubject(), request.getContent(), Arrays.asList(request.getUserIds().split(",")), request.getTenantId());
if (sendFlag) {
return this.update(Wrappers.<MessagePushRecordEntity>lambdaUpdate()
.set(MessagePushRecordEntity::getPushTime, LocalDateTime.now())
.set(MessagePushRecordEntity::getStatus, MessageConstants.PUSH_SUCCESS)
.in(MessagePushRecordEntity::getId, records.stream().map(MessagePushRecordEntity::getId).collect(Collectors.toList()))
);
}
return false;
} catch (Exception e) {
throw new ServiceException(e.getMessage());
}
}
@Override
public Boolean sendWsMsgByUsers(List<MessagePushRecordEntity> request) {
WebsocketServiceImpl service = SpringUtil.getBean(WebsocketServiceImpl.class);
request.forEach(record -> {
boolean sendFlag = service.send(record);
if (sendFlag) {
this.update(Wrappers.<MessagePushRecordEntity>lambdaUpdate()
.set(MessagePushRecordEntity::getPushTime, LocalDateTime.now())
.set(MessagePushRecordEntity::getStatus, MessageConstants.PUSH_SUCCESS)
.in(MessagePushRecordEntity::getId, record.getId())
);
}
});
return true;
}
@Override
public Boolean sendMailMsgByUsers(List<MessagePushRecordEntity> request) { public Boolean sendMailMsgByUsers(List<MessagePushRecordEntity> request) {
MailMessageServiceImpl service = SpringUtil.getBean(MailMessageServiceImpl.class); MailMessageServiceImpl service = SpringUtil.getBean(MailMessageServiceImpl.class);
request.forEach(record -> { request.forEach(record -> {

56
hzims-service/message/src/main/java/com/hnac/hzims/message/service/impl/PushMessageServiceImpl.java

@ -33,6 +33,7 @@ import org.springframework.util.Assert;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -99,13 +100,6 @@ public class PushMessageServiceImpl implements IMessageService {
// App消息推送因腾讯云推送限制同一标签一段时间内无法推送多条消息,现将app推送改造成由redis定时推送 // App消息推送因腾讯云推送限制同一标签一段时间内无法推送多条消息,现将app推送改造成由redis定时推送
Long pushResult = redisTemplate.opsForList().leftPush(appPushRedisKey.concat(":").concat(request.getPusher()), request); Long pushResult = redisTemplate.opsForList().leftPush(appPushRedisKey.concat(":").concat(request.getPusher()), request);
return Func.isNotEmpty(pushResult); return Func.isNotEmpty(pushResult);
// String tenantId = Func.isNotEmpty(AuthUtil.getTenantId()) ? AuthUtil.getTenantId() : request.getTenantId();
// try {
// return this.sendByUsers(request.getSubject(), request.getContent(), Func.toStrList(",",request.getPusher()),tenantId);
// }
// catch (Exception e) {
// throw new ServiceException(e.getMessage());
// }
} }
/** /**
@ -116,33 +110,51 @@ public class PushMessageServiceImpl implements IMessageService {
* @param tenantId 租户ID * @param tenantId 租户ID
* @return * @return
*/ */
public boolean sendByUsers(String subject, String content, List<String> userIds,String tenantId) throws Exception { // public boolean sendByUsers(String subject, String content, List<String> userIds,String tenantId) throws Exception {
// 安卓推送 // // 安卓推送
FutureTask<Boolean> androidPush = new FutureTask<>(() -> { // FutureTask<Boolean> androidPush = new FutureTask<>(() -> this.sendAndroidMsg(subject,content,userIds,tenantId));
// appMessagePushExecutor.submit(new Thread(androidPush,"安卓推送"));
//
// // IOS推送
// FutureTask<Boolean> iosPush = new FutureTask<>(() -> this.sendIOSMsg(subject,content,userIds,tenantId));
// appMessagePushExecutor.submit(new Thread(iosPush,"ios推送"));
//
// return androidPush.get() && iosPush.get();
// }
/**
* 发送APP安卓消息
* @param subject 消息主题
* @param content 消息内容
* @param userIds 发送人员
* @param tenantId 租户ID
* @return 推送结果
*/
public R<PushResponse> sendAndroidMsg(String subject,String content,List<String> userIds,String tenantId) {
PushInfoVO pushInfoVO = new PushInfoVO(); PushInfoVO pushInfoVO = new PushInfoVO();
PushPlatform pushPlatform = PushPlatform.newBuilder().addPlatformType(PlatformType.Android).build(); PushPlatform pushPlatform = PushPlatform.newBuilder().addPlatformType(PlatformType.Android).build();
PushAudience pushAudience = PushAudience.newBuilder().addPushAudienceType(PushAudienceType.TAG, userIds).build(); PushAudience pushAudience = PushAudience.newBuilder().addPushAudienceType(PushAudienceType.TAG, userIds).build();
PushInfo pushInfo = new PushInfo("ops-push-android", subject, content,"", null, pushPlatform, pushAudience); PushInfo pushInfo = new PushInfo("ops-push-android", subject, content,"", null, pushPlatform, pushAudience);
pushInfoVO.setPushInfo(pushInfo); pushInfoVO.setPushInfo(pushInfo);
pushInfoVO.setTenantId(tenantId); pushInfoVO.setTenantId(tenantId);
R<PushResponse> pushResult = pushClient.tenantPush(pushInfoVO); return pushClient.tenantPush(pushInfoVO);
return pushResult.isSuccess(); }
});
appMessagePushExecutor.submit(new Thread(androidPush,"安卓推送"));
// IOS推送 /**
FutureTask<Boolean> iosPush = new FutureTask<>(() -> { * 发送IOS安卓消息
* @param subject 消息主题
* @param content 消息内容
* @param userIds 发送人员
* @param tenantId 租户ID
* @return 推送结果
*/
public R<PushResponse> sendIOSMsg(String subject,String content,List<String> userIds,String tenantId) {
PushInfoVO pushInfoVO = new PushInfoVO(); PushInfoVO pushInfoVO = new PushInfoVO();
PushPlatform pushPlatform = PushPlatform.newBuilder().addPlatformType(PlatformType.IOS).build(); PushPlatform pushPlatform = PushPlatform.newBuilder().addPlatformType(PlatformType.IOS).build();
PushAudience pushAudience = PushAudience.newBuilder().addPushAudienceType(PushAudienceType.TAG, userIds).build(); PushAudience pushAudience = PushAudience.newBuilder().addPushAudienceType(PushAudienceType.TAG, userIds).build();
PushInfo pushInfo = new PushInfo("ops-push-ios", subject, content,"", null, pushPlatform, pushAudience); PushInfo pushInfo = new PushInfo("ops-push-ios", subject, content,"", null, pushPlatform, pushAudience);
pushInfoVO.setPushInfo(pushInfo); pushInfoVO.setPushInfo(pushInfo);
pushInfoVO.setTenantId(tenantId); pushInfoVO.setTenantId(tenantId);
R<PushResponse> pushResult = pushClient.tenantPush(pushInfoVO); return pushClient.tenantPush(pushInfoVO);
return pushResult.isSuccess();
});
appMessagePushExecutor.submit(new Thread(iosPush,"ios推送"));
return androidPush.get() && iosPush.get();
} }
} }

9
hzims-service/message/src/main/java/com/hnac/hzims/message/service/impl/SmsMessageServiceImpl.java

@ -66,4 +66,13 @@ public class SmsMessageServiceImpl implements IMessageService {
return response.getData().isSuccess(); return response.getData().isSuccess();
} }
} }
public R sendSmsByRecord(MessagePushRecordEntity request) {
return smsClient.sendMessage(
Func.isNotEmpty(AuthUtil.getTenantId()) ? AuthUtil.getTenantId() : request.getTenantId(),
request.getResourceCode(),
request.getSmsParam(),
this.getAccountByPusher(Long.parseLong(request.getPusher()),request.getType())
);
}
} }

2
hzims-service/message/src/main/resources/db/1.0.1.sql

@ -0,0 +1,2 @@
alter table `hzims_message_push_record` add column `fault_result` varchar(1000) comment '推送失败信息';
alter table `hzims_message_push_record` MODIFY `TASK_ID` BIGINT(20) NULL;
Loading…
Cancel
Save