This commit is contained in:
2023-07-17 17:38:06 +08:00
parent a9b02b5a4d
commit 1b3a3fdfb7
17 changed files with 318 additions and 69 deletions

View File

@ -0,0 +1,28 @@
package com.qiaoba.common.database.config;
import com.qiaoba.common.database.listener.SwitchPrimaryListener;
import com.qiaoba.common.redis.manager.RedisChannelManager;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* DatasourceRedisMessageConfig
*
* @author ailanyin
* @version 1.0
* @since 2023/7/17 10:38
*/
@Configuration
@RequiredArgsConstructor
public class DatasourceRedisMessageConfig {
private final RedisChannelManager redisChannelManager;
@Bean
public SwitchPrimaryListener switchPrimaryListener() {
SwitchPrimaryListener switchPrimaryListener = new SwitchPrimaryListener();
redisChannelManager.addChannelListener("test", switchPrimaryListener);
return switchPrimaryListener;
}
}

View File

@ -84,7 +84,7 @@ public class BackupDatasourceContext {
*/
public static void updateBackupMap(String tenantId, DynamicDataSource dataSource) {
List<DynamicDataSource> dataSourceList = get(tenantId);
if (CollUtil.isEmpty(dataSourceList)) {
if (CollUtil.isNotEmpty(dataSourceList)) {
for (DynamicDataSource dynamicDataSource : dataSourceList) {
if (dataSource.getDatasourceId().equals(dynamicDataSource.getDatasourceId())) {
dataSourceList.remove(dynamicDataSource);

View File

@ -5,6 +5,8 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 动态数据源实体
*
@ -16,7 +18,9 @@ import lombok.NoArgsConstructor;
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DynamicDataSource {
public class DynamicDataSource implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 数据源ID

View File

@ -0,0 +1,37 @@
package com.qiaoba.common.database.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 切换主要的数据源
*
* @author ailanyin
* @version 1.0
* @since 2023/7/17 10:56
*/
@Component
@Slf4j
public class SwitchPrimaryListener implements MessageListener {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msgObj = redisTemplate.getValueSerializer().deserialize(messageBody);
// 渠道名称转换
String channel = new String(pattern);
log.info("channel: {}", channel);
log.info("msg: {}", msgObj);
}
}

View File

@ -8,6 +8,7 @@ import com.qiaoba.common.database.config.DynamicDataSourceConfig;
import com.qiaoba.common.database.context.BackupDatasourceContext;
import com.qiaoba.common.database.entity.DynamicDataSource;
import com.qiaoba.common.database.util.JdbcUtil;
import com.qiaoba.common.redis.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -28,18 +29,13 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class NotOnlineDatasourceMonitor {
/**
* 错误数据源(连接中断)
* <p>
* key: 租户ID
* value: 错误数据源 list
*/
public static Map<String, List<DynamicDataSource>> ERROR_DATASOURCE_MAP = new ConcurrentHashMap<>();
private static final String LOCK_KEY = "lock4j:notOnlineDatasourceMonitor";
public static final String ERROR_DATASOURCE_KEY = "error_datasource";
@Resource
private LockTemplate lockTemplate;
@Resource
private RedisService redisService;
@PostConstruct
@ -52,10 +48,6 @@ public class NotOnlineDatasourceMonitor {
if (!DynamicDataSourceConfig.COMPLETE_LOAD_DATASOURCE) {
return;
}
// 没有暂时失联的数据源, 直接结束
if (CollUtil.isEmpty(ERROR_DATASOURCE_MAP)) {
return;
}
// expire = -1 锁自动续期, 防止数据源过多或异常等待, 超过默认锁 30s
final LockInfo lockInfo = lockTemplate.lock(LOCK_KEY, -1, 1000);
//申请锁失败 说明集群中其他设备正在执行监控
@ -77,34 +69,34 @@ public class NotOnlineDatasourceMonitor {
}
private void tryConnect() {
Set<String> tenantIds = ERROR_DATASOURCE_MAP.keySet();
for (String tenantId : tenantIds) {
List<DynamicDataSource> errorDatasourceList = ERROR_DATASOURCE_MAP.get(tenantId);
for (int i = 0; i < errorDatasourceList.size(); i++) {
DynamicDataSource errorDatasource = errorDatasourceList.get(i);
// 说明连接成功
boolean check = JdbcUtil.checkConnect(errorDatasource.getDriver(), errorDatasource.getUrl(), errorDatasource.getUsername(), errorDatasource.getPassword());
if (check) {
log.info("数据源重连成功, Url: {}", errorDatasource.getUrl());
// 从errorMap中删除
errorDatasourceList.remove(errorDatasource);
if (CollUtil.isEmpty(errorDatasourceList)) {
ERROR_DATASOURCE_MAP.remove(tenantId);
Map<String, List<DynamicDataSource>> map = redisService.getHashList(ERROR_DATASOURCE_KEY, DynamicDataSource.class);
if (Objects.nonNull(map)) {
for (String tenantId : map.keySet()) {
List<DynamicDataSource> errorDatasourceList = map.get(tenantId);
List<DynamicDataSource> delList = new ArrayList<>();
for (int i = 0; i < errorDatasourceList.size(); i++) {
DynamicDataSource errorDatasource = errorDatasourceList.get(i);
// 说明连接成功
boolean check = JdbcUtil.checkConnect(errorDatasource.getDriver(), errorDatasource.getUrl(), errorDatasource.getUsername(), errorDatasource.getPassword());
if (check) {
log.info("数据源重连成功, Url: {}", errorDatasource.getUrl());
delList.add(errorDatasource);
// 加入到备用Map中
BackupDatasourceContext.addBackupMap(tenantId, errorDatasource);
log.info("租户断线数据源重连成功, 已重新加入备用数据源,租户ID: {}", tenantId);
}
// 加入到备用Map中
BackupDatasourceContext.addBackupMap(tenantId, errorDatasource);
}
errorDatasourceList.removeAll(delList);
removeErrorDatasource(tenantId, errorDatasourceList);
}
}
}
public static void addErrorDatasource(String tenantId, DynamicDataSource dataSource) {
List<DynamicDataSource> errorDataSourceList = NotOnlineDatasourceMonitor.ERROR_DATASOURCE_MAP.get(tenantId);
if (CollUtil.isEmpty(errorDataSourceList)) {
NotOnlineDatasourceMonitor.ERROR_DATASOURCE_MAP.put(tenantId, ListUtil.toList(dataSource));
private void removeErrorDatasource(String tenantId, List<DynamicDataSource> errorDatasourceList) {
if (CollUtil.isNotEmpty(errorDatasourceList)) {
redisService.hSet(ERROR_DATASOURCE_KEY, tenantId, errorDatasourceList);
} else {
errorDataSourceList.add(dataSource);
redisService.hDel(ERROR_DATASOURCE_KEY, tenantId);
}
}
}

View File

@ -14,6 +14,7 @@ import com.qiaoba.common.database.entity.DynamicDataSource;
import com.qiaoba.common.database.service.DynamicDatasourceService;
import com.qiaoba.common.database.util.DatasourceUtil;
import com.qiaoba.common.database.util.JdbcUtil;
import com.qiaoba.common.redis.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -22,7 +23,6 @@ import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 在线的数据源监控
@ -34,13 +34,15 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Component
@Slf4j
@SuppressWarnings("unchecked")
public class OnlineDatasourceMonitor {
private static final String LOCK_KEY = "lock4j:onlineDatasourceMonitor";
private static Map<String, String> WAIT_UPDATE_DATASOURCE_STATUS = new ConcurrentHashMap<>();
private static Map<String, List<String>> WAIT_ADD_ERROR_MAP = new ConcurrentHashMap<>();
private static final String WAIT_UPDATE_DATASOURCE_STATUS_KEY = "wait_update_datasource_status";
private static final String WAIT_ADD_ERROR_DATASOURCE = "wait_add_error_datasource";
@Resource
private RedisService redisService;
@Resource
private LockTemplate lockTemplate;
@Resource
@ -154,7 +156,7 @@ public class OnlineDatasourceMonitor {
// 更改数据库中该数据源为主要数据源
if (Objects.nonNull(dynamicDataSource.getTenantId()) && !TenantConstant.DEFAULT_TENANT_ID.equals(dynamicDataSource.getTenantId())) {
// 添加到待处理任务中
WAIT_UPDATE_DATASOURCE_STATUS.put(dynamicDataSource.getTenantId(), dynamicDataSource.getDatasourceId());
addWaitUpdateDatasourceStatus(dynamicDataSource.getTenantId(), dynamicDataSource.getDatasourceId());
}
// 备用数据源集合删除该数据源
dataSources.remove((int) backupIndex);
@ -167,29 +169,36 @@ public class OnlineDatasourceMonitor {
}
private void handleUpdateDatasourceStatus() {
Set<String> keys = WAIT_UPDATE_DATASOURCE_STATUS.keySet();
for (String key : keys) {
try {
log.info("开始更新数据库中租户数据源状态, 租户ID: {}", key);
// 防止更新过程中主数据挂了
dynamicDatasourceService.changePrimaryDatasource(key, WAIT_UPDATE_DATASOURCE_STATUS.get(key));
// 处理完成 删除任务
WAIT_UPDATE_DATASOURCE_STATUS.remove(key);
log.info("更新数据库中租户数据源状态完成, 租户ID: {}", key);
} catch (Exception e) {
log.error("更新数据库中租户数据源状态未完成, 租户ID: {}, 失败原因: {}", key, e.getMessage());
Map<String, List<String>> map = getWaitUpdateDatasourceStatus();
if (Objects.nonNull(map)) {
for (String tenantId : map.keySet()) {
List<String> datasourceIds = map.get(tenantId);
List<String> delIds = new ArrayList<>();
for (String datasourceId : datasourceIds) {
try {
dynamicDatasourceService.changePrimaryDatasource(tenantId, datasourceId);
delIds.add(datasourceId);
log.info("更新数据库中租户数据源状态完成, 数据源ID: {}", datasourceId);
} catch (Exception e) {
log.error("更新数据库中租户数据源状态未完成, 数据源ID: {}, 失败原因: {}", datasourceId, e.getMessage());
}
}
datasourceIds.removeAll(delIds);
removeWaitUpdateDatasourceStatus(tenantId, datasourceIds);
}
}
}
private void handleErrorDatasource() {
Set<String> tenantIds = WAIT_ADD_ERROR_MAP.keySet();
for (String tenantId : tenantIds) {
List<String> ipList = WAIT_ADD_ERROR_MAP.get(tenantId);
for (String ip : ipList) {
NotOnlineDatasourceMonitor.addErrorDatasource(tenantId, dynamicDatasourceService.selectByIp(tenantId, ip));
Map<String, List<String>> map = getWaitAddErrorDatasource();
if (Objects.nonNull(map)) {
for (String tenantId : map.keySet()) {
List<String> ipList = map.get(tenantId);
for (String ip : ipList) {
redisService.addHashList(NotOnlineDatasourceMonitor.ERROR_DATASOURCE_KEY, tenantId, dynamicDatasourceService.selectByIp(tenantId, ip), DynamicDataSource.class);
}
redisService.hDel(WAIT_ADD_ERROR_DATASOURCE, tenantId);
}
WAIT_ADD_ERROR_MAP.remove(tenantId);
}
}
@ -205,18 +214,40 @@ public class OnlineDatasourceMonitor {
dynamicDataSource.setInitialSize(dataSource.getInitialSize());
dynamicDataSource.setMaxActive(dataSource.getMaxActive());
dynamicDataSource.setMinIdle(dataSource.getMinIdle());
NotOnlineDatasourceMonitor.addErrorDatasource(tenantId, dynamicDataSource);
redisService.addHashList(NotOnlineDatasourceMonitor.ERROR_DATASOURCE_KEY, tenantId, dynamicDataSource, DynamicDataSource.class);
return;
}
// 普通租户
String ip = DataBaseEnum.getIp(dataSource.getUrl(), dataSource.getDriverClassName());
List<String> errorIpList = WAIT_ADD_ERROR_MAP.get(tenantId);
if (CollUtil.isEmpty(errorIpList)) {
WAIT_ADD_ERROR_MAP.put(tenantId, CollUtil.toList(ip));
addWaitAddErrorDatasource(tenantId, ip);
}
private void addWaitUpdateDatasourceStatus(String tenantId, String datasourceId) {
redisService.addHashList(WAIT_UPDATE_DATASOURCE_STATUS_KEY, tenantId, datasourceId, String.class);
}
private Map<String, List<String>> getWaitUpdateDatasourceStatus() {
return redisService.getHashList(WAIT_UPDATE_DATASOURCE_STATUS_KEY, String.class);
}
private void removeWaitUpdateDatasourceStatus(String tenantId, List<String> datasourceIds) {
if (CollUtil.isNotEmpty(datasourceIds)) {
redisService.hSet(WAIT_UPDATE_DATASOURCE_STATUS_KEY, tenantId, datasourceIds);
} else {
errorIpList.add(ip);
redisService.hDel(WAIT_UPDATE_DATASOURCE_STATUS_KEY, tenantId);
}
}
private void addWaitAddErrorDatasource(String tenantId, String ip) {
redisService.addHashList(WAIT_ADD_ERROR_DATASOURCE, tenantId, ip, String.class);
}
private Map<String, List<String>> getWaitAddErrorDatasource() {
return redisService.getHashList(WAIT_ADD_ERROR_DATASOURCE, String.class);
}
}

View File

@ -2,14 +2,12 @@ package com.qiaoba.common.database.util;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.qiaoba.common.base.enums.DataBaseEnum;
import com.qiaoba.common.base.exception.ServiceException;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.*;
/**
* JdbcUtil
@ -82,4 +80,19 @@ public class JdbcUtil {
return DriverManager.getConnection(url, username, password);
}
public static boolean checkConnect(DruidDataSource druidDataSource) {
Connection connection = null;
try {
connection = druidDataSource.getConnection();
if (checkConnect(connection)) {
return true;
}
} catch (SQLException e) {
return false;
} finally {
IoUtil.close(connection);
}
return false;
}
}

View File

@ -1,4 +1,7 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.qiaoba.common.database.factory.DynamicDataSourceFactory,\
com.qiaoba.common.database.util.DatasourceUtil,\
com.qiaoba.common.database.config.MybatisPlusConfig
com.qiaoba.common.database.config.MybatisPlusConfig,\
com.qiaoba.common.database.monitor.OnlineDatasourceMonitor,\
com.qiaoba.common.database.monitor.NotOnlineDatasourceMonitor,\
com.qiaoba.common.database.config.DatasourceRedisMessageConfig

View File

@ -0,0 +1,25 @@
package com.qiaoba.common.redis.config;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* @author ailanyin
* @version 1.0
* @since 2023/7/17 10:24
*/
@Component
@RequiredArgsConstructor
public class RedisSubConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
return container;
}
}

View File

@ -0,0 +1,32 @@
package com.qiaoba.common.redis.manager;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* RedisChannelManager
*
* @author ailanyin
* @version 1.0
* @since 2023/7/17 10:18
*/
@Component
@RequiredArgsConstructor
public class RedisChannelManager {
private final RedisMessageListenerContainer messageListenerContainer;
public void addChannelListener(String channelName, MessageListener listener) {
ChannelTopic topic = new ChannelTopic(channelName);
messageListenerContainer.addMessageListener(listener, topic);
messageListenerContainer.start();
}
public void removeChannelListener(String channelName, MessageListener listener) {
ChannelTopic topic = new ChannelTopic(channelName);
messageListenerContainer.removeMessageListener(listener, topic);
}
}

View File

@ -141,6 +141,25 @@ public interface RedisService {
*/
Map<Object, Object> hGetAll(String key);
/**
* 往hash集合里面插入一个值
*
* @param key key
* @param hashKey hashKey
* @param value value
* @param clazz clazz
*/
<T> void addHashList(String key, String hashKey, T value, Class<T> clazz);
/**
* 获取hash集合数据
*
* @param key key
* @param clazz clazz
* @return Map<String, List < T>>
*/
<T> Map<String, List<T>> getHashList(String key, Class<T> clazz);
/**
* 获取整个hash结构的大小
*
@ -353,6 +372,14 @@ public interface RedisService {
*/
Collection<String> getKeys(String key);
/**
* 发布消息
*
* @param channel 管道 (类似于topic)
* @param message 消息内容
*/
void convertAndSend(String channel, String message);
/**
* get object
*

View File

@ -1,6 +1,7 @@
package com.qiaoba.common.redis.service.impl;
import cn.hutool.core.collection.ListUtil;
import com.qiaoba.common.base.constant.BaseConstant;
import com.qiaoba.common.base.constant.TenantConstant;
import com.qiaoba.common.base.context.BaseContext;
@ -102,6 +103,33 @@ public class RedisServiceImpl implements RedisService {
return redisTemplate.opsForHash().entries(addTenantPrefix(key));
}
@Override
@SuppressWarnings("unchecked")
public <T> void addHashList(String key, String hashKey, T value, Class<T> clazz) {
if (hHasKey(key, hashKey)) {
List<T> list = (List<T>) hGet(key, hashKey);
list.add(value);
hSet(key, hashKey, list);
} else {
hSet(key, hashKey, ListUtil.toList(value));
}
}
@Override
@SuppressWarnings("unchecked")
public <T> Map<String, List<T>> getHashList(String key, Class<T> clazz) {
if (hasKey(key)) {
Map<Object, Object> map = hGetAll(key);
Map<String, List<T>> returnMap = new HashMap<>(map.size());
for (Object tempKey : map.keySet()) {
returnMap.put(tempKey.toString(), (List<T>) map.get(tempKey));
}
return returnMap;
}
return null;
}
@Override
public Long hSize(String key) {
return redisTemplate.opsForHash().size(addTenantPrefix(key));
@ -240,6 +268,11 @@ public class RedisServiceImpl implements RedisService {
return redisTemplate.keys(addTenantPrefix(key));
}
@Override
public void convertAndSend(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
@Override
@SuppressWarnings("unchecked")
public <T> T getObject(String key, Class<T> clazz) {

View File

@ -1,5 +1,7 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.qiaoba.common.redis.config.RedisConfig,\
com.qiaoba.common.redis.config.RedisSubConfig,\
com.qiaoba.common.redis.manager.RedisChannelManager,\
com.qiaoba.common.redis.service.impl.RedisServiceImpl

View File

@ -10,6 +10,7 @@ import com.qiaoba.common.base.validate.AddGroup;
import com.qiaoba.common.base.validate.EditGroup;
import com.qiaoba.common.database.entity.PageQuery;
import com.qiaoba.common.database.entity.TableDataInfo;
import com.qiaoba.common.redis.service.RedisService;
import com.qiaoba.module.tenant.entity.dto.TenantSettingDto;
import com.qiaoba.module.tenant.service.SysTenantService;
import io.swagger.v3.oas.annotations.Operation;
@ -33,6 +34,7 @@ import org.springframework.web.bind.annotation.*;
public class SysTenantController {
private final SysTenantService sysTenantService;
private final RedisService redisService;
@PreAuthorize("hasAuthority('tenant:add')")
@PostMapping
@ -93,4 +95,11 @@ public class SysTenantController {
return AjaxResult.success();
}
@Operation(summary = "测试Redis消息")
@GetMapping("/test")
public AjaxResult msg(String msg) {
redisService.convertAndSend("switch_primary_datasource",msg);
redisService.convertAndSend("test",msg);
return AjaxResult.success();
}
}

View File

@ -57,6 +57,14 @@ public interface SysTenantDatasourceService {
*/
int updateById(SysTenantDatasource sysTenantDatasource);
/**
* 更新状态
*
* @param sysTenantDatasource sysTenantDatasource
* @return 结果
*/
int updateStatus(SysTenantDatasource sysTenantDatasource);
/**
* 批量删除
*

View File

@ -50,7 +50,7 @@ public class DynamicDatasourceServiceImpl implements DynamicDatasourceService {
SysTenantDatasource sysTenantDatasource = new SysTenantDatasource();
sysTenantDatasource.setDatasourceId(datasourceId);
sysTenantDatasource.setIsPrimary(BaseEnum.YES.getCode());
sysTenantDatasourceService.updateById(sysTenantDatasource);
sysTenantDatasourceService.updateStatus(sysTenantDatasource);
// 将该租户下的其他数据源设置为非主要数据源
sysTenantDatasourceService.setBackupDatasourceExcludeId(tenantId, datasourceId);
}

View File

@ -92,6 +92,11 @@ public class SysTenantDatasourceServiceImpl implements SysTenantDatasourceServic
return result;
}
@Override
public int updateStatus(SysTenantDatasource sysTenantDatasource) {
return sysTenantDatasourceMapper.updateById(sysTenantDatasource);
}
@Override
public int deleteByIds(List<String> ids) {
// 删除备用Map