分布式缓存的实践原理
技术选型
分布式缓存方案默认采用的是主流的缓存框架:Redis,即将缓存数据存储在另一台Redis服务器上。 系统在使用缓存时,依赖的是缓存的接口,而非具体的实现;
分布式缓存在更新时不允许并发更新,防止缓存击穿,因此我们在Redis的基础上,采用了基于Redisson的分布式锁,在更新分布式缓存前必须先获得锁。缓存生命周期
更新机制被动刷新:基于Redis的数据驱逐策略,包括LRU和TTL等;主动刷新:业务数据驱动的数据更新。当业务侧有数据变更时,将会主动刷新分布式缓存。比如当秒杀品下线时,会发出相应的领域事件,而在领域事件的处理中就会刷新缓存。
分布式缓存在刷新的过程中,并不会主动刷新所有服务器上的本地缓存,本地缓存将遵循单机的刷新策略。这意味着,本地缓存可能会有秒级或毫秒级的滞后,对于数据一致性非绝对敏感的场景,这种短时间的延迟下的脏数据是可以接受的,它只是会对用户侧的展示有所影响,而不会影响到服务端的数据状态。分布式锁基于Redis实现分布式锁利用set nx ex获取锁,并设置过期时间,保存线程标识释放锁时先判断线程标识是否与自己一致,一致则删除锁/** * 基于redis的分布式锁 */ public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString() + "-"; /** * 锁 * * @param time 锁的过期时间 * @return */ @Override public boolean tryLock(long time) { long id = Thread.currentThread().getId(); // 值用 uuid + 线程id拼接 String value = ID_PREFIX + id; // 自动拆箱有空指针问题 Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, value, time, TimeUnit.SECONDS); return Boolean.TRUE.equals(aBoolean); } /** * 解锁 */ @Override public void unLock() { String valueInRedis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); long id = Thread.currentThread().getId(); // 值用 uuid + 线程id拼接 String value = ID_PREFIX + id; // 两者相同才释放锁,要先做判断再进行释放 if (value.equals(valueInRedis)) { stringRedisTemplate.delete(KEY_PREFIX + name); } } }基于Redison实现分布式锁
导包 org.redisson redisson 3.16.3
配置文件@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer() .setAddress("redis://localhost:6379") .setPassword("ezreal") .setDatabase(0); return Redisson.create(config); } }
代码实现@Component public class RedissonLockService implements DistributedLockFactoryService { private final Logger logger = LoggerFactory.getLogger(RedissonLockService.class); @Resource private RedissonClient redissonClient; @Override public DistributedLock getDistributedLock(String key) { RLock rLock = redissonClient.getLock(key); return new DistributedLock() { @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { boolean isLockSuccess = rLock.tryLock(waitTime, leaseTime, unit); logger.info("{} get lock result:{}", key, isLockSuccess); return isLockSuccess; } @Override public void lock(long leaseTime, TimeUnit unit) { rLock.lock(leaseTime, unit); } @Override public void unlock() { if (isLocked() && isHeldByCurrentThread()) { rLock.unlock(); } } @Override public boolean isLocked() { return rLock.isLocked(); } @Override public boolean isHeldByThread(long threadId) { return rLock.isHeldByThread(threadId); } @Override public boolean isHeldByCurrentThread() { return rLock.isHeldByCurrentThread(); } }; } }
实现原理可重入:利用hash结构记录线程id和重入次数可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制超时续约::利用watchDog,每隔一段时间 (releaseTime/3),重置超时时间
分布式缓存基本逻辑
每次获取缓存的时候,先从本地缓存中获取,再从分布式缓存中获取。
若分布式缓存中不存在对应的值,则需要获取分布式锁,然后对分布式缓存数据进行更新若获取锁成功,则查询数据库,获取最新的数据放入缓存;
若数据库中的数据也为空,则也要存储入数据库,防止缓存穿透若获取锁失败,则直接返回,不要等待重新获取锁,客户端对这次请求进行静默处理;实现代码
缓存的接口
实现部分Redis数据结构的存储接口public interface DistributedCacheService { void put(String key, String value); void put(String key, Object value); void put(String key, Object value, long timeout, TimeUnit unit); void put(String key, Object value, long expireTime); T getObject(String key, Class targetClass); String getString(String key); List getList(String key, Class targetClass); Boolean delete(String key); Boolean hasKey(String key); }
基本逻辑代码private SeckillGoodCache updateDistributedSeckillGood(Long itemId) { logger.info("更新远程缓存|{}", itemId); DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(UPDATE_ITEMS_CACHE_LOCK_KEY + itemId); try { boolean tryLock = distributedLock.tryLock(1, 5, TimeUnit.SECONDS); // 如果没有获得到锁,就返回重试 if (!tryLock) { return new SeckillGoodCache().tryLater(); } // 再次检查 SeckillGoodCache distributedSeckillCache = distributedCacheService.getObject(buildItemCacheKey(itemId), SeckillGoodCache.class); if (distributedSeckillCache != null) { return distributedSeckillCache; } // 查询数据库 SeckillGood seckillGood = seckillGoodMapper.selectById(itemId); SeckillGoodCache seckillGoodCache = new SeckillGoodCache(); if (seckillGood == null) { // 数据不存在 也要返回 也要存缓存 防止缓存穿透 seckillGoodCache.notExist(); } else { seckillGoodCache.with(seckillGood).setVersion(System.currentTimeMillis()); } logger.info("itemCache|远程缓存已更新|{}", itemId); distributedCacheService.put(buildItemCacheKey(itemId), JSON.toJSONString(seckillGoodCache)); return seckillGoodCache; } catch (InterruptedException e) { logger.error("itemCache|远程缓存更新失败|{}", itemId); return new SeckillGoodCache().tryLater(); } finally { distributedLock.unlock(); } }