redis作为nosql数据库,应用其实就是存取数据,但是凭借着其优越的性能与多维的api支持,已经支持主从集群部署,因此在不同业务场景下往往都会有不同的使用方式。而应用往往是关系的具体的业务才能算的上有价值,今天从几个方面更进一步认识redis。
通过其客户端提供的api进行基础的数据存取。针对不同编程语言提供了对应的客户端工具。
通过string数据类型,通过incr decr 实现
>set count 1
>incr count
>get count
当然针对的是整数类型,字符串则会出错
通过list数据类型实现
>lpush list a b c d
>ltrim list 0 500
因为list是链表,且遵循FIFO,最新的记录会在最前面,可以通过ltrim取出最新的N条记录
通过sorted set实现
>zadd net 0 baidu
>zadd net 0 ali
>zadd net 0 tencent
>zincrby net 1 baidu
>...
>zrevrange net 0 N-1
每次有新的点击时,将scroe加一,然后根据score排序
通过sorted set实现
>zadd user 20190101122100 jack
>zrangebyscore user (20190101000000 (20190101235959
查看2019-01-01 00:00:00~2019-01-01 23:59:59用户
一般的加锁最简单的是通过synchronized来实现通过同步取锁,并进行阻塞,但是在分布式系统中,各个系统独立存在,synchronized显然无法作用,我们则需要在方法层面实现同步拿锁。
加锁:对应key不存在,为key设置值;key已经超时;
释放锁:删除key
-订阅频道(匹配频道) PSUBSCRIBE pattern [pattern ...]
-订阅多频道(具体频道) SUBSCRIBE channel [channel ...]
-发布消息 PUBLISH channel message
-退订频道 UNSUBSCRIBE [channel [channel ...]]
-退订频道 PUNSUBSCRIBE [pattern [pattern ...]]
-查看状态 PUBSUB subcommand [argument [argument ...]]
可以看到,很多应用只是对redis基本的api进行使用,所有主要针对其中几个方面进行下一步的讨论:
配合springboot使用,只需要引入相关jar,
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
通过用户服务看下具体实现:
@Cacheable(value = CACHE_USER,key = "#id")
public User getUser(String id){
log.info("getUser by {}",id);
return userMap.get(id);
}
public List<User> getUsers(){
log.info("getUsers");
return users;
}
public User saveUser(User user){
log.info("saveUser {}",user);
if(user.getUserId()!=null && !"".equals(user.getUserId())){
updateUser(user);
}else{
user.setUserId(UUID.randomUUID().toString());
users.add(user);
userMap.put(user.getUserId(),user);
}
return user;
}
@CachePut(value = CACHE_USER,key = "#user.userId")
public User updateUser(User user){
log.info("updateUser {}",user);
String id = user.getUserId();
if(user.getUserId()!=null && !"".equals(user.getUserId())){
if(userMap.get(user.getUserId())==null){
throw new RuntimeException(String.format("id:【%s】没有对应对象!" ,user.getUserId()));
}else{
User oldUser = userMap.get(user.getUserId());
userMap.put(user.getUserId(),user);
users.remove(oldUser);
users.add(user);
}
userMap.put(user.getUserId(),user);
}else{
throw new RuntimeException("用户id不能为空!");
}
return user;
}
@CacheEvict(value = CACHE_USER,key = "#id")
public int deleteUser(String id){
log.info("deleteUser by {}",id);
User user = userMap.get(id);
users.remove(user);
userMap.remove(id);
return 1;
}
主要通过spring提供的三个注解实现:
当然这些注解是针对spring的所有缓存,不单针对redis
@Cacheable :如果没有则存,如果有则从缓存去
@CachePut :不管缓存没有,都会去方法取,然后缓存
@CacheEvict :删除
问题(参考https://www.cnblogs.com/raichen/p/7750165.html):
1、穿透
缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。
解决方案:
有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被 这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。另外也有一个更为简单粗暴的方法(我们采用的就是这种),如果一个查询返回的数据为空(不管是数 据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。
2、雪崩
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决方案:
缓存失效时的雪崩效应对底层系统的冲击非常可怕。大多数系统设计者考虑用加锁或者队列的方式保证缓存的单线 程(进程)写,从而避免失效时大量的并发请求落到底层存储系统上。这里分享一个简单方案就时讲缓存失效时间分散开,比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
3、击穿
对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题,这个和缓存雪崩的区别在于这里针对某一key缓存,前者则是很多key。
缓存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
结局方案:
尽量少的线程构建缓存(甚至是一个) + 数据一致性 + 较少的潜在危险。
分析:
1、多个系统在任意时刻,只有系统能持有锁
2、因为系统发生故障,不会导致死锁。设置超时时间
3、因为redis支持集群环境,多个节点发生故障,同样能够保证系统正常
4、加锁和解锁必须是同一个业务
5、业务没有结束,如果锁超时,能够及时续期
其实我们可以分几种情况进行设计,但是要保证前提条件。在取锁和释放锁时,最终保证取锁的对象只有一个,在但系统中,我们可以分别对两个方法通过同步机制来 实现,但是对于分布式系统,同一个应用会部署在多个服务环境中,这时借助redis提供的一些原子操作来帮我们实现。
先看一种实现方式:
public static boolean getLock2( String lockKey, int expireTime) {
long expires = System.currentTimeMillis() + expireTime;
String expiresStr = String.valueOf(expires);
// 如果当前锁不存在,返回加锁成功
if (jedis().setnx(lockKey, expiresStr) == 1) {
return true;
}
// 如果锁存在,获取锁的过期时间
String currentValueStr = jedis().get(lockKey);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
String oldValueStr = jedis().getSet(lockKey, expiresStr);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
return true;
}
}
return false;
}
大致逻辑如下:
1、判断key是否存在,如果不存在则设置值(当前系统时间+过期时间,可以当做锁id,因为同一时刻,只有一个系统持有锁),返回取锁成功;如果存在则下一步
2、取出key对应的值,比较该值是否过期,如果过期,则删除key;如果没有过期看下一步
3、存入新的时间,并取出原来的值,将原来的值与比较前的原值一致,则该系统加锁成功
4、否则取锁失败
看下这里的问题:
1、由于多个系统可能部署在不同服务器上,虽然获取锁的时间理论上是不可能相同,但是如果服务器时间的不一致,那么锁可能提前超时了。
例如:系统A设置超时时间3s,业务还在处理中,才过了1s,但是系统B与比系统A早2s,次数系统B取锁,发现所超时了,并且成功取得了锁,A还没运行完锁还没释放又被B拿到了锁,这样自然出现问题了。
2、如果锁过期,在getset方法出会覆盖超时时间
例如:当前值为1,系统A取出1,判断过期了,然后设置值为2,并返回1,两次比较相同返回true,持有锁;同时系统B也进行了getset,设置为3,返回2,由于1!=2,返回false;如果在系统A取锁时还没返回但B也做了处理,虽然A获取了锁,但是超时时间却改变了。
2、从严谨性上讲,由于没有系统标识,任何系统都可以释放其他系统的锁。因为锁之后有一个系统持有,那么释放锁应该也之后是那个系统。
其实这种设计方式,处理保证时间的一致性,一般也不会有很多问题
看另一种实现:
public boolean getLock1(String clientId){
String v = jedis().set(scene, clientId, "nx", TIME_UNIT, timeout);//30秒超时
//如果没有值,且设置成功,则返回OK,否则返回null
if("OK".equals(v)){
return true;
}
return false;
}
通过设置过期时间,不依赖于系统时间,但是一定要确保对值设定与过期时间应该是原子操作,像下面这样试不行的:
Long r = jedis().setnx(scene, String.valueOf(new Date().getTime()));//没有超时时间,设置为值
if(r==1){
jedis().expire(scene,30);//如果这里异常了,则死锁了
}
如果设置过期时间时出现系统问题,锁没有过期时间,如果锁有没有释放,那么其他系统用于没法拿到锁了。
释放锁其实就是删除key就行了:
public boolean releaseLock(String clientId){
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis().eval(script, Collections.singletonList(scene), Collections.singletonList(clientId));
if ("1".equals(result.toString())) {
return true;
}
return false;
}
逻辑上是删除对应系统的锁,如果先查找后删除,又是非原子操作,则会存在与上面一样的风险。