新增rocketmq,原有的订单延时队列修改为mq,移除redisson

This commit is contained in:
hupeng
2020-01-02 10:48:29 +08:00
parent a65577bfd4
commit f5a5f2d5a5
12 changed files with 125 additions and 178 deletions

20
pom.xml
View File

@ -74,6 +74,7 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--Spring boot end-->
<!--spring2.0集成redis所需common-pool2-->
@ -209,21 +210,10 @@
<version>3.11.2</version>
</dependency>
<!--webservice-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-web-services</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.cxf</groupId>-->
<!-- <artifactId>cxf-rt-frontend-jaxws</artifactId>-->
<!-- <version>3.1.6</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.cxf</groupId>-->
<!-- <artifactId>cxf-rt-transports-http</artifactId>-->
<!-- <version>3.1.6</version>-->
<!-- </dependency>-->
</dependencies>

View File

@ -83,6 +83,11 @@
<artifactId>emoji-java</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,48 @@
package co.yixiang.common.rocketmq;
import cn.hutool.core.util.ObjectUtil;
import co.yixiang.modules.order.entity.YxStoreOrder;
import co.yixiang.modules.order.service.YxStoreOrderService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName 消费者
* @Author hupeng <610796224@qq.com>
* @Date 2020/1/1
**/
@Component
@RocketMQMessageListener(
topic = "yshop-topic",
consumerGroup = "yshop-group",
selectorExpression = "*"
)
@Slf4j
public class MqConsumer implements RocketMQListener<String> {
@Autowired
private YxStoreOrderService storeOrderService;
@Override
public void onMessage(String msg) {
log.info("系统开始处理延时任务---订单超时未付款---订单id:" + msg);
Integer id = Integer.valueOf(msg);
YxStoreOrder order = storeOrderService.getOne(new QueryWrapper<YxStoreOrder>()
.eq("id", id).eq("is_del",0).eq("paid",0));
if(ObjectUtil.isNull(order)) {
return;
}
storeOrderService.cancelOrderByTask(id);
log.info("=====处理成功======");
}
}

View File

@ -0,0 +1,49 @@
package co.yixiang.common.rocketmq;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName 生成者
* @Author hupeng <610796224@qq.com>
* @Date 2020/1/1
**/
@Component
public class MqProducer {
//注入rocketMQ的模板
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送延时消息10分钟
*
* @param topic
* @param msg
*/
public void sendMsg(String topic, String msg) {
DefaultMQProducer defaultMQProducer = rocketMQTemplate.getProducer();
Message message = new Message(topic,msg.getBytes());
message.setDelayTimeLevel(14);
try {
defaultMQProducer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -5,7 +5,7 @@ import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import co.yixiang.common.constant.CommonConstant;
import co.yixiang.common.rocketmq.MqProducer;
import co.yixiang.common.service.impl.BaseServiceImpl;
import co.yixiang.common.web.vo.Paging;
import co.yixiang.domain.AlipayConfig;
@ -42,7 +42,6 @@ import co.yixiang.modules.shop.service.YxStoreProductReplyService;
import co.yixiang.modules.shop.service.YxStoreProductService;
import co.yixiang.modules.shop.service.YxSystemConfigService;
import co.yixiang.modules.shop.web.vo.YxStoreCartQueryVo;
import co.yixiang.modules.task.DelayJobService;
import co.yixiang.modules.user.entity.YxUser;
import co.yixiang.modules.user.entity.YxUserBill;
import co.yixiang.modules.user.entity.YxWechatUser;
@ -145,8 +144,6 @@ public class YxStoreOrderServiceImpl extends BaseServiceImpl<YxStoreOrderMapper,
@Autowired
private YxStoreCouponUserMapper yxStoreCouponUserMapper;
@Autowired
private DelayJobService delayJobService;
@Autowired
private YxStoreCombinationService combinationService;
@ -178,6 +175,9 @@ public class YxStoreOrderServiceImpl extends BaseServiceImpl<YxStoreOrderMapper,
@Autowired
private Sid sid;
@Autowired
private MqProducer mqProducer;
/**
* 订单退款
@ -1361,13 +1361,10 @@ public class YxStoreOrderServiceImpl extends BaseServiceImpl<YxStoreOrderMapper,
orderStatusService.create(storeOrder.getId(),"cache_key_create_order","订单生成");
// 添加订单支付超期延时任务
// 添加订单支付超期延时任务
Map<String,Object> delayJob = new HashMap<>();
delayJob.put("delayJobName","CANCEL_ORVERTIME_ORDER");
delayJob.put("orderId",storeOrder.getId());
delayJobService.submitJob(delayJob, CommonConstant.ORDER_OUTTIME_UNPAY);
log.info("添加定时任务成功 订单id [{}]", storeOrder.getId());
//使用MQ延时消息
mqProducer.sendMsg("yshop-topic",storeOrder.getId().toString());
log.info("投递延时订单id [{}]", storeOrder.getId());
return storeOrder;
}

View File

@ -32,10 +32,13 @@ public class IndexController {
private final YxSystemConfigService systemConfigService;
private final YxStoreProductService storeProductService;
@GetMapping("/index")
@ApiOperation(value = "首页数据",notes = "首页数据")
public ApiResult<Map<String,Object>> index(){
Map<String,Object> map = new LinkedHashMap<>();
//banner
map.put("banner",systemGroupDataService.getDatas("routine_home_banner"));

View File

@ -1,41 +0,0 @@
package co.yixiang.modules.task;
import cn.hutool.core.util.ObjectUtil;
import co.yixiang.modules.order.entity.YxStoreOrder;
import co.yixiang.modules.order.service.YxStoreOrderService;
import co.yixiang.utils.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@Slf4j
public class CancelOrderService implements ExecuteJob {
@Override
public void execute(Map<String,Object> job) {
if(job.get("delayJobName").equals("CANCEL_ORVERTIME_ORDER")){
log.info("系统开始处理延时任务---订单超时未付款---" + job.get("orderId"));
YxStoreOrderService yxorderService = BeanUtil.getBean(YxStoreOrderService.class);
YxStoreOrder order = null;
try {
order = yxorderService.getOne(new QueryWrapper<YxStoreOrder>().eq("id", job.get("orderId")).eq("is_del",0));
} catch (Exception e) {
e.printStackTrace();
}
if(ObjectUtil.isNull(order)) {
return;
}
if(order.getPaid() != 0){
return;
}
yxorderService.cancelOrderByTask((int)job.get("orderId"));
log.info("系统结束处理延时任务---订单超时未付款取消---" + job.get("orderId"));
}
}
}

View File

@ -1,33 +0,0 @@
package co.yixiang.modules.task;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Created by kl on 2018/7/20.
* Content :订单延时job服务
*/
@Component
public class DelayJobService {
@Autowired
private RedissonClient client;
/**
* 添加超时任务到redis队列
* @param job 任务
* @param delay 超时时间
*/
public void submitJob(Map<String,Object> job, Long delay){
RQueue<Map<String,Object>> blockingQueue = client.getQueue(JobTimer.CUSTOMER_JOB_TIMER_JOBS);
RDelayedQueue<Map<String,Object>> delayedQueue = client.getDelayedQueue(blockingQueue);
delayedQueue.offer(job,delay,TimeUnit.MILLISECONDS);
delayedQueue.destroy();
}
}

View File

@ -1,12 +0,0 @@
package co.yixiang.modules.task;
import java.util.Map;
/**
* Created by kl on 2018/7/20.
* Content :延时job执行器接口
*/
public interface ExecuteJob {
void execute(Map<String,Object> job);
}

View File

@ -1,66 +0,0 @@
package co.yixiang.modules.task;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 消费已经到点的延时job服务通过job参数调用业务执行器实现
*/
@Component
public class JobTimer {
static final String CUSTOMER_JOB_TIMER_JOBS = "customer_job_timer_jobs";
@Autowired
private RedissonClient client;
@Autowired
private ExecuteJob service;
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
@PostConstruct
public void startJobTimer() {
RBlockingQueue<Map<String,Object>> blockingQueue = client.getBlockingQueue(CUSTOMER_JOB_TIMER_JOBS);
RDelayedQueue<Map<String,Object>> delayedQueue = client.getDelayedQueue(blockingQueue);
new Thread(() -> {
while (true) {
try {
Map<String,Object> job = blockingQueue.take();
if(job != null){
executorService.execute(new ExecutorTask(job));
}
} catch (Exception e) {
e.printStackTrace();
try {
TimeUnit.SECONDS.sleep(60);
} catch (Exception ex) {
}
}finally {
delayedQueue.destroy();
}
}
}).start();
}
class ExecutorTask implements Runnable {
private Map<String,Object> delayJob;
public ExecutorTask(Map<String,Object> delayJob) {
this.delayJob = delayJob;
}
@Override
public void run() {
service.execute(delayJob);
}
}
}

View File

@ -321,6 +321,7 @@ public class WechatController extends BaseController {
WxMpXmlMessage inMessage = WxMpXmlMessage.fromEncryptedXml(requestBody, wxService.getWxMpConfigStorage(),
timestamp, nonce, msgSignature);
WxMpXmlOutMessage outMessage = this.route(inMessage);
if(outMessage == null) return;
out = outMessage.toEncryptedXml(wxService.getWxMpConfigStorage());
}

View File

@ -22,6 +22,12 @@ spring:
redis:
repositories:
enabled: false
#配置rocketmq
rocketmq:
nameServer: localhost:9876
producer:
group: yshop-group
sendMessageTimeout: 300000
#配置 Jpa
jpa: