redssion move

This commit is contained in:
hupeng
2019-12-08 20:24:36 +08:00
parent 907c0e7d97
commit 8dcfbd0a7a
5 changed files with 4 additions and 19 deletions

View File

@ -1,34 +0,0 @@
package co.yixiang.redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Created by kl on 2018/7/20.
* Content :订单延时job服务
*/
@Component
public class DelayJobService {
@Autowired
private RedissonClient client;
/**
* 添加超时任务到redis队列
* @param job 任务
* @param delay 超时时间
*/
public void submitJob(Map<String,Object> job, Long delay){
RQueue<Map<String,Object>> blockingQueue = client.getQueue(JobTimer.CUSTOMER_JOB_TIMER_JOBS);
RDelayedQueue<Map<String,Object>> delayedQueue = client.getDelayedQueue(blockingQueue);
delayedQueue.offer(job,delay,TimeUnit.MILLISECONDS);
delayedQueue.destroy();
}
}

View File

@ -1,12 +0,0 @@
package co.yixiang.redisson;
import java.util.Map;
/**
* Created by kl on 2018/7/20.
* Content :延时job执行器接口
*/
public interface ExecuteJob {
void execute(Map<String,Object> job);
}

View File

@ -1,64 +0,0 @@
package co.yixiang.redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 消费已经到点的延时job服务通过job参数调用业务执行器实现
*/
@Component
public class JobTimer {
static final String CUSTOMER_JOB_TIMER_JOBS = "customer_job_timer_jobs";
@Autowired
private RedissonClient client;
@Autowired
private ExecuteJob service;
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
@PostConstruct
public void startJobTimer() {
RBlockingQueue<Map<String,Object>> blockingQueue = client.getBlockingQueue(CUSTOMER_JOB_TIMER_JOBS);
RDelayedQueue<Map<String,Object>> delayedQueue = client.getDelayedQueue(blockingQueue);
new Thread(() -> {
while (true) {
try {
Map<String,Object> job = blockingQueue.take();
executorService.execute(new ExecutorTask(job));
} catch (Exception e) {
e.printStackTrace();
try {
TimeUnit.SECONDS.sleep(60);
} catch (Exception ex) {
}
}finally {
delayedQueue.destroy();
}
}
}).start();
}
class ExecutorTask implements Runnable {
private Map<String,Object> delayJob;
public ExecutorTask(Map<String,Object> delayJob) {
this.delayJob = delayJob;
}
@Override
public void run() {
service.execute(delayJob);
}
}
}