diff --git a/pom.xml b/pom.xml index 407844f8..ee91f6fa 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ org.springframework.boot spring-boot-starter-data-redis + @@ -209,21 +210,10 @@ 3.11.2 - - - - - - - - - - - - - - - + + + + diff --git a/yshop-api/pom.xml b/yshop-api/pom.xml index e6b05b25..af403cbb 100644 --- a/yshop-api/pom.xml +++ b/yshop-api/pom.xml @@ -83,6 +83,11 @@ emoji-java 4.0.0 + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.0.4 + diff --git a/yshop-api/src/main/java/co/yixiang/common/rocketmq/MqConsumer.java b/yshop-api/src/main/java/co/yixiang/common/rocketmq/MqConsumer.java new file mode 100644 index 00000000..16cfba4b --- /dev/null +++ b/yshop-api/src/main/java/co/yixiang/common/rocketmq/MqConsumer.java @@ -0,0 +1,48 @@ +package co.yixiang.common.rocketmq; + +import cn.hutool.core.util.ObjectUtil; +import co.yixiang.modules.order.entity.YxStoreOrder; +import co.yixiang.modules.order.service.YxStoreOrderService; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @ClassName 消费者 + * @Author hupeng <610796224@qq.com> + * @Date 2020/1/1 + **/ +@Component +@RocketMQMessageListener( + topic = "yshop-topic", + consumerGroup = "yshop-group", + selectorExpression = "*" +) +@Slf4j +public class MqConsumer implements RocketMQListener { + + @Autowired + private YxStoreOrderService storeOrderService; + + @Override + public void onMessage(String msg) { + log.info("系统开始处理延时任务---订单超时未付款---订单id:" + msg); + + Integer id = Integer.valueOf(msg); + + YxStoreOrder order = storeOrderService.getOne(new QueryWrapper() + .eq("id", id).eq("is_del",0).eq("paid",0)); + + if(ObjectUtil.isNull(order)) { + return; + } + + storeOrderService.cancelOrderByTask(id); + + log.info("=====处理成功======"); + + } +} diff --git a/yshop-api/src/main/java/co/yixiang/common/rocketmq/MqProducer.java b/yshop-api/src/main/java/co/yixiang/common/rocketmq/MqProducer.java new file mode 100644 index 00000000..8e8faa9a --- /dev/null +++ b/yshop-api/src/main/java/co/yixiang/common/rocketmq/MqProducer.java @@ -0,0 +1,49 @@ +package co.yixiang.common.rocketmq; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @ClassName 生成者 + * @Author hupeng <610796224@qq.com> + * @Date 2020/1/1 + **/ +@Component +public class MqProducer { + //注入rocketMQ的模板 + @Autowired + private RocketMQTemplate rocketMQTemplate; + + + /** + * 发送延时消息10分钟 + * + * @param topic + * @param msg + */ + public void sendMsg(String topic, String msg) { + DefaultMQProducer defaultMQProducer = rocketMQTemplate.getProducer(); + + Message message = new Message(topic,msg.getBytes()); + message.setDelayTimeLevel(14); + + try { + defaultMQProducer.send(message); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +} diff --git a/yshop-api/src/main/java/co/yixiang/modules/order/service/impl/YxStoreOrderServiceImpl.java b/yshop-api/src/main/java/co/yixiang/modules/order/service/impl/YxStoreOrderServiceImpl.java index e2546736..02969720 100644 --- a/yshop-api/src/main/java/co/yixiang/modules/order/service/impl/YxStoreOrderServiceImpl.java +++ b/yshop-api/src/main/java/co/yixiang/modules/order/service/impl/YxStoreOrderServiceImpl.java @@ -5,7 +5,7 @@ import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import co.yixiang.common.constant.CommonConstant; +import co.yixiang.common.rocketmq.MqProducer; import co.yixiang.common.service.impl.BaseServiceImpl; import co.yixiang.common.web.vo.Paging; import co.yixiang.domain.AlipayConfig; @@ -42,7 +42,6 @@ import co.yixiang.modules.shop.service.YxStoreProductReplyService; import co.yixiang.modules.shop.service.YxStoreProductService; import co.yixiang.modules.shop.service.YxSystemConfigService; import co.yixiang.modules.shop.web.vo.YxStoreCartQueryVo; -import co.yixiang.modules.task.DelayJobService; import co.yixiang.modules.user.entity.YxUser; import co.yixiang.modules.user.entity.YxUserBill; import co.yixiang.modules.user.entity.YxWechatUser; @@ -145,8 +144,6 @@ public class YxStoreOrderServiceImpl extends BaseServiceImpl delayJob = new HashMap<>(); - delayJob.put("delayJobName","CANCEL_ORVERTIME_ORDER"); - delayJob.put("orderId",storeOrder.getId()); - delayJobService.submitJob(delayJob, CommonConstant.ORDER_OUTTIME_UNPAY); - log.info("添加定时任务成功 订单id: [{}]:", storeOrder.getId()); + //使用MQ延时消息 + mqProducer.sendMsg("yshop-topic",storeOrder.getId().toString()); + log.info("投递延时订单id: [{}]:", storeOrder.getId()); + return storeOrder; } diff --git a/yshop-api/src/main/java/co/yixiang/modules/shop/web/controller/IndexController.java b/yshop-api/src/main/java/co/yixiang/modules/shop/web/controller/IndexController.java index 39987bc7..fd3a8ba1 100644 --- a/yshop-api/src/main/java/co/yixiang/modules/shop/web/controller/IndexController.java +++ b/yshop-api/src/main/java/co/yixiang/modules/shop/web/controller/IndexController.java @@ -32,10 +32,13 @@ public class IndexController { private final YxSystemConfigService systemConfigService; private final YxStoreProductService storeProductService; + + @GetMapping("/index") @ApiOperation(value = "首页数据",notes = "首页数据") public ApiResult> index(){ + Map map = new LinkedHashMap<>(); //banner map.put("banner",systemGroupDataService.getDatas("routine_home_banner")); diff --git a/yshop-api/src/main/java/co/yixiang/modules/task/CancelOrderService.java b/yshop-api/src/main/java/co/yixiang/modules/task/CancelOrderService.java deleted file mode 100644 index 01c7e15f..00000000 --- a/yshop-api/src/main/java/co/yixiang/modules/task/CancelOrderService.java +++ /dev/null @@ -1,41 +0,0 @@ -package co.yixiang.modules.task; - -import cn.hutool.core.util.ObjectUtil; -import co.yixiang.modules.order.entity.YxStoreOrder; -import co.yixiang.modules.order.service.YxStoreOrderService; -import co.yixiang.utils.BeanUtil; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.Map; - - -@Component -@Slf4j -public class CancelOrderService implements ExecuteJob { - @Override - public void execute(Map job) { - if(job.get("delayJobName").equals("CANCEL_ORVERTIME_ORDER")){ - log.info("系统开始处理延时任务---订单超时未付款---" + job.get("orderId")); - - YxStoreOrderService yxorderService = BeanUtil.getBean(YxStoreOrderService.class); - - YxStoreOrder order = null; - try { - order = yxorderService.getOne(new QueryWrapper().eq("id", job.get("orderId")).eq("is_del",0)); - } catch (Exception e) { - e.printStackTrace(); - } - if(ObjectUtil.isNull(order)) { - return; - } - if(order.getPaid() != 0){ - return; - } - yxorderService.cancelOrderByTask((int)job.get("orderId")); - log.info("系统结束处理延时任务---订单超时未付款取消---" + job.get("orderId")); - } - - } -} diff --git a/yshop-api/src/main/java/co/yixiang/modules/task/DelayJobService.java b/yshop-api/src/main/java/co/yixiang/modules/task/DelayJobService.java deleted file mode 100644 index 30d21471..00000000 --- a/yshop-api/src/main/java/co/yixiang/modules/task/DelayJobService.java +++ /dev/null @@ -1,33 +0,0 @@ -package co.yixiang.modules.task; - -import org.redisson.api.RDelayedQueue; -import org.redisson.api.RQueue; -import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * Created by kl on 2018/7/20. - * Content :订单延时job服务 - */ -@Component -public class DelayJobService { - - @Autowired - private RedissonClient client; - - /** - * 添加超时任务到redis队列 - * @param job 任务 - * @param delay 超时时间 - */ - public void submitJob(Map job, Long delay){ - RQueue> blockingQueue = client.getQueue(JobTimer.CUSTOMER_JOB_TIMER_JOBS); - RDelayedQueue> delayedQueue = client.getDelayedQueue(blockingQueue); - delayedQueue.offer(job,delay,TimeUnit.MILLISECONDS); - delayedQueue.destroy(); - } -} diff --git a/yshop-api/src/main/java/co/yixiang/modules/task/ExecuteJob.java b/yshop-api/src/main/java/co/yixiang/modules/task/ExecuteJob.java deleted file mode 100644 index e8fb9d14..00000000 --- a/yshop-api/src/main/java/co/yixiang/modules/task/ExecuteJob.java +++ /dev/null @@ -1,12 +0,0 @@ -package co.yixiang.modules.task; - -import java.util.Map; - -/** - * Created by kl on 2018/7/20. - * Content :延时job执行器接口 - */ -public interface ExecuteJob { - - void execute(Map job); -} diff --git a/yshop-api/src/main/java/co/yixiang/modules/task/JobTimer.java b/yshop-api/src/main/java/co/yixiang/modules/task/JobTimer.java deleted file mode 100644 index 0f31ea00..00000000 --- a/yshop-api/src/main/java/co/yixiang/modules/task/JobTimer.java +++ /dev/null @@ -1,66 +0,0 @@ -package co.yixiang.modules.task; - -import org.redisson.api.RBlockingQueue; -import org.redisson.api.RDelayedQueue; -import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** - * 消费已经到点的延时job服务,通过job参数调用业务执行器实现 - */ -@Component -public class JobTimer { - - static final String CUSTOMER_JOB_TIMER_JOBS = "customer_job_timer_jobs"; - @Autowired - private RedissonClient client; - - @Autowired - private ExecuteJob service; - - ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); - @PostConstruct - public void startJobTimer() { - RBlockingQueue> blockingQueue = client.getBlockingQueue(CUSTOMER_JOB_TIMER_JOBS); - RDelayedQueue> delayedQueue = client.getDelayedQueue(blockingQueue); - new Thread(() -> { - while (true) { - try { - Map job = blockingQueue.take(); - if(job != null){ - executorService.execute(new ExecutorTask(job)); - } - } catch (Exception e) { - e.printStackTrace(); - try { - TimeUnit.SECONDS.sleep(60); - } catch (Exception ex) { - - } - }finally { - delayedQueue.destroy(); - } - } - }).start(); - } - class ExecutorTask implements Runnable { - - - private Map delayJob; - - public ExecutorTask(Map delayJob) { - this.delayJob = delayJob; - } - @Override - public void run() { - service.execute(delayJob); - } - } -} diff --git a/yshop-api/src/main/java/co/yixiang/modules/wechat/web/controller/WechatController.java b/yshop-api/src/main/java/co/yixiang/modules/wechat/web/controller/WechatController.java index 7842e5db..01ad4248 100644 --- a/yshop-api/src/main/java/co/yixiang/modules/wechat/web/controller/WechatController.java +++ b/yshop-api/src/main/java/co/yixiang/modules/wechat/web/controller/WechatController.java @@ -321,6 +321,7 @@ public class WechatController extends BaseController { WxMpXmlMessage inMessage = WxMpXmlMessage.fromEncryptedXml(requestBody, wxService.getWxMpConfigStorage(), timestamp, nonce, msgSignature); WxMpXmlOutMessage outMessage = this.route(inMessage); + if(outMessage == null) return; out = outMessage.toEncryptedXml(wxService.getWxMpConfigStorage()); } diff --git a/yshop-api/src/main/resources/config/application.yml b/yshop-api/src/main/resources/config/application.yml index 0d860c47..2cb344ee 100644 --- a/yshop-api/src/main/resources/config/application.yml +++ b/yshop-api/src/main/resources/config/application.yml @@ -22,6 +22,12 @@ spring: redis: repositories: enabled: false +#配置rocketmq +rocketmq: + nameServer: localhost:9876 + producer: + group: yshop-group + sendMessageTimeout: 300000 #配置 Jpa jpa: