Redis实战常见问题与解决方案

  Java   36分钟   124浏览   0评论

引言

你好呀,我是小邹。

Redis作为高性能的内存数据存储系统,在现代Java应用中广泛用于缓存、会话存储、消息队列等场景。然而在实际使用中,开发者常会遇到各种问题。本文将通过实际案例,探讨Redis在Java应用中的常见问题及其解决方案。

一、缓存穿透、击穿与雪崩

1.1 缓存穿透

问题场景:查询不存在的数据,导致请求绕过缓存直接访问数据库。

// 问题示例
public User getUserById(String userId) {
    String key = "user:" + userId;
    User user = redisTemplate.opsForValue().get(key);
    if (user == null) {
        // 数据库查询
        user = userDao.findById(userId);
        if (user != null) {
            redisTemplate.opsForValue().set(key, user, 5, TimeUnit.MINUTES);
        }
    }
    return user; // 可能返回null
}

解决方案

public User getUserByIdWithProtection(String userId) {
    String key = "user:" + userId;
    User user = redisTemplate.opsForValue().get(key);

    // 1. 布隆过滤器检查
    if (!bloomFilter.mightContain(userId)) {
        return null;
    }

    // 2. 缓存空值
    if (EMPTY_CACHE.equals(user)) {
        return null;
    }

    if (user == null) {
        user = userDao.findById(userId);
        if (user != null) {
            redisTemplate.opsForValue().set(key, user, 5, TimeUnit.MINUTES);
        } else {
            // 缓存空值,设置较短过期时间
            redisTemplate.opsForValue().set(key, EMPTY_CACHE, 1, TimeUnit.MINUTES);
            bloomFilter.put(userId); // 加入布隆过滤器
        }
    }
    return user;
}

1.2 缓存击穿

问题场景:热点key过期瞬间,大量请求同时访问数据库。

解决方案

public User getHotUser(String userId) {
    String key = "hot_user:" + userId;

    // 1. 互斥锁方案
    String lockKey = key + ":lock";
    try {
        // 尝试获取锁
        Boolean lockAcquired = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);

        if (lockAcquired != null && lockAcquired) {
            try {
                User user = redisTemplate.opsForValue().get(key);
                if (user == null) {
                    user = userDao.findById(userId);
                    redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
                }
                return user;
            } finally {
                redisTemplate.delete(lockKey);
            }
        } else {
            // 等待并重试
            Thread.sleep(50);
            return redisTemplate.opsForValue().get(key);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return userDao.findById(userId);
    }
}

// 2. 逻辑过期方案
public User getHotUserWithLogicExpire(String userId) {
    String key = "hot_user:" + userId;
    RedisData<User> redisData = redisTemplate.opsForValue().get(key);

    if (redisData == null) {
        return loadAndSetUser(userId);
    }

    // 检查逻辑过期时间
    if (redisData.getExpireTime().isBefore(LocalDateTime.now())) {
        // 异步更新缓存
        CompletableFuture.runAsync(() -> {
            loadAndSetUser(userId);
        });
    }

    return redisData.getData();
}

1.3 缓存雪崩

问题场景:大量key同时过期,导致数据库压力激增。

解决方案

@Component
public class CacheService {

    // 1. 差异化过期时间
    public void setWithRandomExpire(String key, Object value, long baseTime, TimeUnit unit) {
        // 添加随机时间偏移(±20%)
        Random random = new Random();
        double variation = 0.8 + random.nextDouble() * 0.4;
        long expireTime = (long) (unit.toMillis(baseTime) * variation);

        redisTemplate.opsForValue().set(
            key, 
            value, 
            expireTime, 
            TimeUnit.MILLISECONDS
        );
    }

    // 2. 热点key永不过期,后台异步更新
    @Scheduled(fixedDelay = 300000) // 每5分钟执行
    public void refreshHotKeys() {
        List<String> hotKeys = getHotKeysFromMonitor();
        hotKeys.forEach(key -> {
            Object data = loadDataFromDB(key);
            if (data != null) {
                redisTemplate.opsForValue().set(key, data);
            }
        });
    }
}

二、数据一致性问题

2.1 缓存与数据库一致性

@Service
public class UserService {

    // 问题:先更新数据库后删除缓存,可能产生脏数据
    public void updateUserProblematic(User user) {
        // 1. 更新数据库
        userDao.update(user);
        // 2. 删除缓存
        redisTemplate.delete("user:" + user.getId());
    }

    // 解决方案1:延迟双删
    public void updateUserWithDelayDoubleDelete(User user) {
        // 第一次删除
        redisTemplate.delete("user:" + user.getId());

        // 更新数据库
        userDao.update(user);

        // 延迟再次删除
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000); // 延迟1秒
                redisTemplate.delete("user:" + user.getId());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // 解决方案2:基于消息队列的最终一致性
    @Transactional
    public void updateUserWithMQ(User user) {
        // 更新数据库
        userDao.update(user);

        // 发送缓存更新消息
        redisTemplate.convertAndSend("cache-update", 
            CacheUpdateMessage.of("user:" + user.getId(), "delete"));
    }

    // 消息监听器
    @Component
    public class CacheUpdateListener {

        @EventListener
        public void handleCacheUpdate(CacheUpdateMessage message) {
            if ("delete".equals(message.getOperation())) {
                redisTemplate.delete(message.getKey());
            }
        }
    }
}

三、内存管理与优化

3.1 大key问题

@Component
public class BigKeySolution {

    // 问题:存储大对象
    public void storeBigObjectProblematic(String key, List<User> users) {
        redisTemplate.opsForValue().set(key, users); // 可能超内存
    }

    // 解决方案1:分片存储
    public void storeBigObjectWithSharding(String baseKey, List<User> users) {
        int shardSize = 100;
        int shardCount = (users.size() + shardSize - 1) / shardSize;

        List<List<User>> shards = Lists.partition(users, shardSize);
        for (int i = 0; i < shards.size(); i++) {
            String shardKey = baseKey + ":shard:" + i;
            redisTemplate.opsForValue().set(shardKey, shards.get(i));
        }
        // 存储元数据
        redisTemplate.opsForHash().put(baseKey + ":meta", 
            "shardCount", String.valueOf(shardCount));
    }

    // 解决方案2:使用合适的数据结构
    public void storeUserScores(String userId, Map<String, Double> scores) {
        // 使用Hash而不是String存储Map
        redisTemplate.opsForHash().putAll("user:scores:" + userId, scores);
    }
}

3.2 内存淘汰策略配置

# application.yml
spring:
  redis:
    # 使用allkeys-lru策略,防止内存溢出
    jedis:
      pool:
        max-active: 8
        max-wait: -1ms
        max-idle: 8
        min-idle: 0
    # 在Redis配置中设置
    # maxmemory-policy allkeys-lru

四、连接池优化

@Configuration
public class RedisConfig {

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();

        // 关键连接池配置
        poolConfig.setMaxTotal(50);           // 最大连接数
        poolConfig.setMaxIdle(20);            // 最大空闲连接
        poolConfig.setMinIdle(5);             // 最小空闲连接
        poolConfig.setMaxWaitMillis(2000);    // 最大等待时间
        poolConfig.setTestOnBorrow(true);     // 借出时测试
        poolConfig.setTestWhileIdle(true);    // 空闲时测试
        poolConfig.setTimeBetweenEvictionRunsMillis(30000); // 逐出扫描间隔

        JedisConnectionFactory factory = new JedisConnectionFactory(poolConfig);
        factory.setHostName("localhost");
        factory.setPort(6379);
        return factory;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());

        // 使用Jackson序列化
        Jackson2JsonRedisSerializer<Object> serializer = 
            new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.activateDefaultTyping(
            mapper.getPolymorphicTypeValidator(),
            ObjectMapper.DefaultTyping.NON_FINAL
        );
        serializer.setObjectMapper(mapper);

        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(serializer);
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(serializer);

        template.afterPropertiesSet();
        return template;
    }
}

五、监控与运维问题

5.1 慢查询监控

@Component
public class RedisMonitor {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    // 定期检查慢查询
    @Scheduled(fixedDelay = 60000)
    public void monitorSlowLog() {
        List<SlowLog> slowLogs = redisTemplate.execute(
            (RedisCallback<List<SlowLog>>) connection -> {
                Jedis jedis = (Jedis) connection.getNativeConnection();
                return jedis.slowlogGet();
            }
        );

        slowLogs.stream()
            .filter(log -> log.getExecutionTime() > 100) // 超过100ms
            .forEach(log -> {
                log.warn("Redis慢查询: {} - {}ms", 
                    log.getArgs(), log.getExecutionTime());
                // 发送告警
                sendAlert(log);
            });
    }

    // 内存监控
    @Scheduled(fixedDelay = 300000)
    public void monitorMemory() {
        Properties info = redisTemplate.getRequiredConnectionFactory()
            .getConnection().info("memory");

        Long usedMemory = Long.parseLong(info.getProperty("used_memory"));
        Long maxMemory = Long.parseLong(info.getProperty("maxmemory"));

        double usageRate = (double) usedMemory / maxMemory;
        if (usageRate > 0.8) {
            log.error("Redis内存使用率超过80%: {}%", usageRate * 100);
            // 触发清理或扩容
        }
    }
}

5.2 集群模式下的问题

@Component
public class RedisClusterService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // 处理集群重定向
    public Object getWithRetry(String key) {
        int retryCount = 0;
        while (retryCount < 3) {
            try {
                return redisTemplate.opsForValue().get(key);
            } catch (RedisConnectionFailureException e) {
                retryCount++;
                if (e.getMessage().contains("MOVED") || e.getMessage().contains("ASK")) {
                    // 处理集群重定向
                    try {
                        Thread.sleep(50 * retryCount);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    throw e;
                }
            }
        }
        throw new RedisOperationException("重试失败");
    }

    // 批量操作优化(集群模式下pipeline有限制)
    public Map<String, Object> batchGet(Set<String> keys) {
        // 按slot分组
        Map<Integer, List<String>> slotMap = new HashMap<>();

        keys.forEach(key -> {
            int slot = JedisClusterCRC16.getSlot(key);
            slotMap.computeIfAbsent(slot, k -> new ArrayList<>()).add(key);
        });

        Map<String, Object> result = new HashMap<>();
        slotMap.values().forEach(slotKeys -> {
            // 每个slot内使用pipeline
            List<Object> values = redisTemplate.executePipelined(
                (RedisCallback<Object>) connection -> {
                    slotKeys.forEach(key -> 
                        connection.stringCommands().get(key.getBytes())
                    );
                    return null;
                }
            );

            for (int i = 0; i < slotKeys.size(); i++) {
                result.put(slotKeys.get(i), values.get(i));
            }
        });

        return result;
    }
}

六、安全与配置最佳实践

@Configuration
public class RedisSecurityConfig {

    @Bean
    public RedisTemplate<String, Object> secureRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());

        // 1. 启用SSL(如果支持)
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName("redis.example.com");
        config.setPort(6379);

        // 2. 设置密码
        config.setPassword(RedisPassword.of("securePassword123"));

        // 3. 使用SSL连接
        JedisClientConfiguration clientConfig = JedisClientConfiguration.builder()
            .useSsl()
            .sslSocketFactory(SSLContext.getDefault().getSocketFactory())
            .build();

        JedisConnectionFactory factory = new JedisConnectionFactory(config, clientConfig);
        template.setConnectionFactory(factory);

        // 4. 设置键前缀,避免冲突
        template.setKeySerializer(new PrefixStringRedisSerializer("app:prod:"));

        return template;
    }
}

// 键前缀序列化器
class PrefixStringRedisSerializer implements RedisSerializer<String> {
    private final String prefix;
    private final StringRedisSerializer serializer = new StringRedisSerializer();

    public PrefixStringRedisSerializer(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public byte[] serialize(String key) {
        return serializer.serialize(prefix + key);
    }

    @Override
    public String deserialize(byte[] bytes) {
        String fullKey = serializer.deserialize(bytes);
        return fullKey.startsWith(prefix) ? 
            fullKey.substring(prefix.length()) : fullKey;
    }
}

总结与最佳实践

  1. 设计层面

    • 合理设计key结构,使用冒号分隔命名空间
    • 预估数据量,选择合适的数据结构
    • 设置合理的过期时间,避免雪崩
  2. 开发层面

    • 始终处理Redis异常,不要假设Redis永远可用
    • 使用连接池并合理配置参数
    • 监控慢查询和大key
  3. 运维层面

    • 启用持久化(AOF+RDB)
    • 设置内存上限和淘汰策略
    • 定期备份和监控
  4. 高可用

    • 生产环境使用哨兵或集群模式
    • 做好容灾和故障转移预案
    • 实施分级缓存策略(本地缓存+Redis)

通过以上方案,Java开发者可以有效避免Redis使用中的常见陷阱,构建稳定高效的应用系统。记住,没有银弹,最佳实践总是需要根据具体业务场景进行调整和优化。

如果你觉得文章对你有帮助,那就请作者喝杯咖啡吧☕
微信
支付宝
  0 条评论