yshop-pro init
This commit is contained in:
26
yshop-framework/yshop-spring-boot-starter-mq/pom.xml
Normal file
26
yshop-framework/yshop-spring-boot-starter-mq/pom.xml
Normal file
@ -0,0 +1,26 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>co.yixiang.boot</groupId>
|
||||
<artifactId>yshop-framework</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>yshop-spring-boot-starter-mq</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费</description>
|
||||
<url>https://github.com/YunaiV/ruoyi-vue-pro</url>
|
||||
|
||||
<dependencies>
|
||||
<!-- DB 相关 -->
|
||||
<dependency>
|
||||
<groupId>co.yixiang.boot</groupId>
|
||||
<artifactId>yshop-spring-boot-starter-redis</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -0,0 +1,162 @@
|
||||
package co.yixiang.yshop.framework.mq.config;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import co.yixiang.yshop.framework.common.enums.DocumentEnum;
|
||||
import co.yixiang.yshop.framework.mq.core.RedisMQTemplate;
|
||||
import co.yixiang.yshop.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import co.yixiang.yshop.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
||||
import co.yixiang.yshop.framework.mq.core.stream.AbstractStreamMessageListener;
|
||||
import co.yixiang.yshop.framework.mq.job.RedisPendingMessageResendJob;
|
||||
import co.yixiang.yshop.framework.redis.config.YshopRedisAutoConfiguration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.data.redis.connection.RedisServerCommands;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* 消息队列配置类
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
@Slf4j
|
||||
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
|
||||
@AutoConfiguration(after = YshopRedisAutoConfiguration.class)
|
||||
public class YshopMQAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
|
||||
List<RedisMessageInterceptor> interceptors) {
|
||||
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
|
||||
// 添加拦截器
|
||||
interceptors.forEach(redisMQTemplate::addInterceptor);
|
||||
return redisMQTemplate;
|
||||
}
|
||||
|
||||
// ========== 消费者相关 ==========
|
||||
|
||||
/**
|
||||
* 创建 Redis Pub/Sub 广播消费的容器
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
|
||||
// 创建 RedisMessageListenerContainer 对象
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
// 设置 RedisConnection 工厂。
|
||||
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
|
||||
// 添加监听器
|
||||
listeners.forEach(listener -> {
|
||||
listener.setRedisMQTemplate(redisMQTemplate);
|
||||
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
|
||||
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
|
||||
listener.getChannel(), listener.getClass().getName());
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 重新消费的任务
|
||||
*/
|
||||
@Bean
|
||||
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractStreamMessageListener<?>> listeners,
|
||||
RedisMQTemplate redisTemplate,
|
||||
@Value("${spring.application.name}") String groupName,
|
||||
RedissonClient redissonClient) {
|
||||
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 集群消费的容器
|
||||
* <p>
|
||||
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
|
||||
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
||||
checkRedisVersion(redisTemplate);
|
||||
// 第一步,创建 StreamMessageListenerContainer 容器
|
||||
// 创建 options 配置
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
||||
.batchSize(10) // 一次性最多拉取多少条消息
|
||||
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
|
||||
.build();
|
||||
// 创建 container 对象
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
|
||||
// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
|
||||
DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
|
||||
|
||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
||||
String consumerName = buildConsumerName();
|
||||
listeners.parallelStream().forEach(listener -> {
|
||||
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
|
||||
listener.getStreamKey(), listener.getClass().getName());
|
||||
// 创建 listener 对应的消费者分组
|
||||
try {
|
||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
// 设置 listener 对应的 redisTemplate
|
||||
listener.setRedisMQTemplate(redisMQTemplate);
|
||||
// 创建 Consumer 对象
|
||||
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
|
||||
// 设置 Consumer 消费进度,以最小消费进度为准
|
||||
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
|
||||
// 设置 Consumer 监听
|
||||
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
|
||||
.builder(streamOffset).consumer(consumer)
|
||||
.autoAcknowledge(false) // 不自动 ack
|
||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
||||
container.register(builder.build(), listener);
|
||||
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
|
||||
listener.getStreamKey(), listener.getClass().getName());
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建消费者名字,使用本地 IP + 进程编号的方式。
|
||||
* 参考自 RocketMQ clientId 的实现
|
||||
*
|
||||
* @return 消费者名字
|
||||
*/
|
||||
private static String buildConsumerName() {
|
||||
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验 Redis 版本号,是否满足最低的版本号要求!
|
||||
*/
|
||||
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
|
||||
// 获得 Redis 版本
|
||||
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
|
||||
String version = MapUtil.getStr(info, "redis_version");
|
||||
// 校验最低版本必须大于等于 5.0.0
|
||||
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
|
||||
if (majorVersion < 5) {
|
||||
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
|
||||
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
package co.yixiang.yshop.framework.mq.core;
|
||||
|
||||
import co.yixiang.yshop.framework.common.util.json.JsonUtils;
|
||||
import co.yixiang.yshop.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import co.yixiang.yshop.framework.mq.core.message.AbstractRedisMessage;
|
||||
import co.yixiang.yshop.framework.mq.core.pubsub.AbstractChannelMessage;
|
||||
import co.yixiang.yshop.framework.mq.core.stream.AbstractStreamMessage;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis MQ 操作模板类
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class RedisMQTemplate {
|
||||
|
||||
@Getter
|
||||
private final RedisTemplate<String, ?> redisTemplate;
|
||||
/**
|
||||
* 拦截器数组
|
||||
*/
|
||||
@Getter
|
||||
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis pub/sub 实现
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public <T extends AbstractChannelMessage> void send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis Stream 实现
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 消息记录的编号对象
|
||||
*/
|
||||
public <T extends AbstractStreamMessage> RecordId send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
||||
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加拦截器
|
||||
*
|
||||
* @param interceptor 拦截器
|
||||
*/
|
||||
public void addInterceptor(RedisMessageInterceptor interceptor) {
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
|
||||
private void sendMessageBefore(AbstractRedisMessage message) {
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
|
||||
}
|
||||
|
||||
private void sendMessageAfter(AbstractRedisMessage message) {
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
package co.yixiang.yshop.framework.mq.core.interceptor;
|
||||
|
||||
import co.yixiang.yshop.framework.mq.core.message.AbstractRedisMessage;
|
||||
|
||||
/**
|
||||
* {@link AbstractRedisMessage} 消息拦截器
|
||||
* 通过拦截器,作为插件机制,实现拓展。
|
||||
* 例如说,多租户场景下的 MQ 消息处理
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
public interface RedisMessageInterceptor {
|
||||
|
||||
default void sendMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void sendMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package co.yixiang.yshop.framework.mq.core.message;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Redis 消息抽象基类
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
@Data
|
||||
public abstract class AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 头
|
||||
*/
|
||||
private Map<String, String> headers = new HashMap<>();
|
||||
|
||||
public String getHeader(String key) {
|
||||
return headers.get(key);
|
||||
}
|
||||
|
||||
public void addHeader(String key, String value) {
|
||||
headers.put(key, value);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package co.yixiang.yshop.framework.mq.core.pubsub;
|
||||
|
||||
import co.yixiang.yshop.framework.mq.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Channel Message 抽象类
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
public abstract class AbstractChannelMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Channel
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
|
||||
public abstract String getChannel();
|
||||
|
||||
}
|
@ -0,0 +1,103 @@
|
||||
package co.yixiang.yshop.framework.mq.core.pubsub;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import co.yixiang.yshop.framework.common.util.json.JsonUtils;
|
||||
import co.yixiang.yshop.framework.mq.core.RedisMQTemplate;
|
||||
import co.yixiang.yshop.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import co.yixiang.yshop.framework.mq.core.message.AbstractRedisMessage;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Pub/Sub 监听器抽象类,用于实现广播消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
public abstract class AbstractChannelMessageListener<T extends AbstractChannelMessage> implements MessageListener {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
private final String channel;
|
||||
/**
|
||||
* RedisMQTemplate
|
||||
*/
|
||||
@Setter
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractChannelMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获得 Sub 订阅的 Redis Channel 通道
|
||||
*
|
||||
* @return channel
|
||||
*/
|
||||
public final String getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onMessage(Message message, byte[] bytes) {
|
||||
T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
|
||||
try {
|
||||
consumeMessageBefore(messageObj);
|
||||
// 消费消息
|
||||
this.onMessage(messageObj);
|
||||
} finally {
|
||||
consumeMessageAfter(messageObj);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
||||
}
|
||||
|
||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).consumeMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package co.yixiang.yshop.framework.mq.core.stream;
|
||||
|
||||
import co.yixiang.yshop.framework.mq.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Stream Message 抽象类
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
public abstract class AbstractStreamMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Stream Key
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化
|
||||
public abstract String getStreamKey();
|
||||
|
||||
}
|
@ -0,0 +1,113 @@
|
||||
package co.yixiang.yshop.framework.mq.core.stream;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import co.yixiang.yshop.framework.common.util.json.JsonUtils;
|
||||
import co.yixiang.yshop.framework.mq.core.RedisMQTemplate;
|
||||
import co.yixiang.yshop.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import co.yixiang.yshop.framework.mq.core.message.AbstractRedisMessage;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.stream.StreamListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Stream 监听器抽象类,用于实现集群消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
public abstract class AbstractStreamMessageListener<T extends AbstractStreamMessage>
|
||||
implements StreamListener<String, ObjectRecord<String, String>> {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
@Getter
|
||||
private final String streamKey;
|
||||
|
||||
/**
|
||||
* Redis 消费者分组,默认使用 spring.application.name 名字
|
||||
*/
|
||||
@Value("${spring.application.name}")
|
||||
@Getter
|
||||
private String group;
|
||||
/**
|
||||
* RedisMQTemplate
|
||||
*/
|
||||
@Setter
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractStreamMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, String> message) {
|
||||
// 消费消息
|
||||
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
|
||||
try {
|
||||
consumeMessageBefore(messageObj);
|
||||
// 消费消息
|
||||
this.onMessage(messageObj);
|
||||
// ack 消息消费完成
|
||||
redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
|
||||
// TODO yshop:需要额外考虑以下几个点:
|
||||
// 1. 处理异常的情况
|
||||
// 2. 发送日志;以及事务的结合
|
||||
// 3. 消费日志;以及通用的幂等性
|
||||
// 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
|
||||
} finally {
|
||||
consumeMessageAfter(messageObj);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
||||
}
|
||||
|
||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).consumeMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
package co.yixiang.yshop.framework.mq.job;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import co.yixiang.yshop.framework.mq.core.RedisMQTemplate;
|
||||
import co.yixiang.yshop.framework.mq.core.stream.AbstractStreamMessageListener;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.MapRecord;
|
||||
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.StreamOperations;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 这个任务用于处理,crash 之后的消费者未消费完的消息
|
||||
*/
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
public class RedisPendingMessageResendJob {
|
||||
|
||||
private static final String LOCK_KEY = "redis:pending:msg:lock";
|
||||
|
||||
private final List<AbstractStreamMessageListener<?>> listeners;
|
||||
private final RedisMQTemplate redisTemplate;
|
||||
private final String groupName;
|
||||
private final RedissonClient redissonClient;
|
||||
|
||||
/**
|
||||
* 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题
|
||||
*/
|
||||
@Scheduled(cron = "35 * * * * ?")
|
||||
public void messageResend() {
|
||||
RLock lock = redissonClient.getLock(LOCK_KEY);
|
||||
// 尝试加锁
|
||||
if (lock.tryLock()) {
|
||||
try {
|
||||
execute();
|
||||
} catch (Exception ex) {
|
||||
log.error("[messageResend][执行异常]", ex);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void execute() {
|
||||
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
|
||||
listeners.forEach(listener -> {
|
||||
PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName);
|
||||
// 每个消费者的 pending 队列消息数量
|
||||
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
|
||||
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
|
||||
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
|
||||
|
||||
// 从消费者的 pending 队列中读取消息
|
||||
List<MapRecord<String, Object, Object>> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0")));
|
||||
if (CollUtil.isEmpty(records)) {
|
||||
return;
|
||||
}
|
||||
for (MapRecord<String, Object, Object> record : records) {
|
||||
// 重新投递消息
|
||||
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(record.getValue()) // 设置内容
|
||||
.withStreamKey(listener.getStreamKey()));
|
||||
|
||||
// ack 消息消费完成
|
||||
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,6 @@
|
||||
/**
|
||||
* 消息队列,基于 Redis 提供:
|
||||
* 1. 基于 Pub/Sub 实现广播消费
|
||||
* 2. 基于 Stream 实现集群消费
|
||||
*/
|
||||
package co.yixiang.yshop.framework.mq;
|
@ -0,0 +1,62 @@
|
||||
package org.springframework.data.redis.stream;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.stream.ByteRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.Record;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 拓展 DefaultStreamMessageListenerContainer 实现,解决 Spring Data Redis + Redisson 结合使用时,Redisson 在 Stream 获得不到数据时,返回 null 而不是空 List,导致 NPE 异常。
|
||||
* 对应 issue:https://github.com/spring-projects/spring-data-redis/issues/2147 和 https://github.com/redisson/redisson/issues/4006
|
||||
* 目前看下来 Spring Data Redis 不肯加 null 判断,Redisson 暂时也没改返回 null 到空 List 的打算,所以暂时只能自己改,哽咽!
|
||||
*
|
||||
* @author yshop
|
||||
*/
|
||||
public class DefaultStreamMessageListenerContainerX<K, V extends Record<K, ?>> extends DefaultStreamMessageListenerContainer<K, V> {
|
||||
|
||||
/**
|
||||
* 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现
|
||||
*/
|
||||
public static <K, V extends Record<K, ?>> StreamMessageListenerContainer<K, V> create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) {
|
||||
Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!");
|
||||
Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!");
|
||||
return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options);
|
||||
}
|
||||
|
||||
public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions<K, V> containerOptions) {
|
||||
super(connectionFactory, containerOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现
|
||||
*/
|
||||
@Override
|
||||
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
|
||||
return this.doRegisterX(getReadTaskX(streamRequest, listener));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private StreamPollTask<K, V> getReadTaskX(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
|
||||
StreamPollTask<K, V> task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener);
|
||||
// 修改 readFunction 方法
|
||||
Function<ReadOffset, List<ByteRecord>> readFunction = (Function<ReadOffset, List<ByteRecord>>) ReflectUtil.getFieldValue(task, "readFunction");
|
||||
ReflectUtil.setFieldValue(task, "readFunction", (Function<ReadOffset, List<ByteRecord>>) readOffset -> {
|
||||
List<ByteRecord> records = readFunction.apply(readOffset);
|
||||
//【重点】保证 records 不是空,避免 NPE 的问题!!!
|
||||
return records != null ? records : Collections.emptyList();
|
||||
});
|
||||
return task;
|
||||
}
|
||||
|
||||
private Subscription doRegisterX(Task task) {
|
||||
return ReflectUtil.invoke(this, "doRegister", task);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1 @@
|
||||
co.yixiang.yshop.framework.mq.config.YshopMQAutoConfiguration
|
Reference in New Issue
Block a user