企业实战

架构设计

组件选择/多级

缓存的设计要分多个层次,在不同的层次上选择不同的缓存,包括JVM缓存、文件缓存和Redis缓存

JVM缓存

JVM缓存就是本地缓存,设计在应用服务器中(tomcat)。 通常可以采用Ehcache和Guava Cache,在互联网应用中,由于要处理高并发,通常选择Guava Cache。 适用本地(JVM)缓存的场景:

1、对性能有非常高的要求。

2、不经常变化

3、占用内存不大

4、有访问整个集合的需求

5、数据允许不实时一致

文件缓存

这里的文件缓存是基于http协议的文件缓存,一般放在nginx中。 因为静态文件(比如css,js, 图片)中,很多都是不经常更新的。nginx使用proxy_cache将用户的请 求缓存到本地一个目录。下一个相同请求可以直接调取缓存文件,就不用去请求服务器了。

server {
        listen 80 default_server;
        server_name localhost;
        root /mnt/note/;
        location / {
        }
        #要缓存文件的后缀,可以在以下设置。
        location ~ .*\.(gif|jpg|png|css|js)(.*) {
            proxy_pass http://ip地址:90;
            proxy_redirect off;
            proxy_set_header Host $host;
            proxy_cache cache_one;
            proxy_cache_valid 200 302 24h;
            proxy_cache_valid 301 30d;
            proxy_cache_valid any 5m;
            expires 90d;
            add_header wall "hello lagou.";
        }
}

Redis缓存

分布式缓存,采用主从+哨兵或RedisCluster的方式缓存数据库的数据。 在实际开发中 作为数据库使用,数据要完整 作为缓存使用 作为Mybatis的二级缓存使用

缓存大小

GuavaCache的缓存设置方式:

CacheBuilder.newBuilder().maximumSize(num) // 超过num会按照LRU算法来移除缓存

Nginx的缓存设置方式:

http {
    ...
    proxy_cache_path /path/to/cache levels=1:2 keys_zone=my_cache:10m max_size=10g
    inactive=60m use_temp_path=off;
    server {
    proxy_cache mycache;
    location / {
    proxy_pass http://localhost:8000;
    }
    }
}

Redis缓存设置:

maxmemory=num # 最大缓存量 一般为内存的3/4
maxmemory-policy allkeys lru #

缓存淘汰策略的选择

  • allkeys-lru : 在不确定时一般采用策略。
  • volatile-lru : 比allkeys-lru性能差 存 : 过期时间
  • allkeys-random : 希望请求符合平均分布(每个元素以相同的概率被访问)
  • 自己控制:volatile-ttl 缓存穿透
  • 禁止驱逐 用作DB 不设置maxmemory

key数量

官方说Redis单例能处理key:2.5亿个 一个key或是value大小最大是512M

读写峰值

Redis采用的是基于内存的采用的是单进程单线程模型的 KV 数据库,由C语言编写,官方提供的数据是 可以达到110000+的QPS(每秒内查询次数)。80000的写

命中率

命中:可以直接通过缓存获取到需要的数据。

不命中:无法直接通过缓存获取到想要的数据,需要再次查询数据库或者执行其它的操作。原因可能是 由于缓存中根本不存在,或者缓存已经过期。

通常来讲,缓存的命中率越高则表示使用缓存的收益越高,应用的性能越好(响应时间越短、吞吐量越 高),抗并发的能力越强。 由此可见,在高并发的互联网系统中,缓存的命中率是至关重要的指标。 通过info命令可以监控服务器状态

127.0.0.1:6379> info
# Server
redis_version:5.0.5
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:e188a39ce7a16352
redis_mode:standalone
os:Linux 3.10.0-229.el7.x86_64 x86_64
arch_bits:64
#缓存命中
keyspace_hits:1000
#缓存未命中
keyspace_misses:20
used_memory:433264648
expired_keys:1333536
evicted_keys:1547380

命中率=1000/1000+20=83%

一个缓存失效机制,和过期时间设计良好的系统,命中率可以做到95%以上。

影响缓存命中率的因素:

1、缓存的数量越少命中率越高,比如缓存单个对象的命中率要高于缓存集合

2、过期时间越长命中率越高

3、缓存越大缓存的对象越多,则命中的越多

过期策略

Redis的过期策略是定时删除+惰性删除。

性能监控指标

利用info命令就可以了解Redis的状态了,主要监控指标有:

connected_clients:68 #连接的客户端数量
used_memory_rss_human:847.62M #系统给redis分配的内存
used_memory_peak_human:794.42M #内存使用的峰值大小
total_connections_received:619104 #服务器已接受的连接请求数量
instantaneous_ops_per_sec:1159 #服务器每秒钟执行的命令数量 qps
instantaneous_input_kbps:55.85 #redis网络入口kps
instantaneous_output_kbps:3553.89 #redis网络出口kps
rejected_connections:0 #因为最大客户端数量限制而被拒绝的连接请求数量
expired_keys:0 #因为过期而被自动删除的数据库键数量
evicted_keys:0 #因为最大内存容量限制而被驱逐(evict)的键数量
keyspace_hits:0 #查找数据库键成功的次数
keyspace_misses:0 #查找数据库键失败的次

Redis监控平台: grafana、prometheus以及redis_exporter。

缓存预热

缓存预热就是系统启动前,提前将相关的缓存数据直接加载到缓存系统。避免在用户请求的时候,先查询 数据库,然后再将数据缓存的问题!用户直接查询实现被预热的缓存数据。

加载缓存思路:

  • 数据量不大,可以在项目启动的时候自动进行加载
  • 利用定时任务刷新缓存,将数据库的数据刷新到缓存中

缓存问题

缓存穿透

一般的缓存系统,都是按照key去缓存查询,如果不存在对应的value,就应该去后端系统查找(比如 DB)。

缓存穿透是指在高并发下查询key不存在的数据,会穿过缓存查询数据库。导致数据库压力过大而宕机

解决方案:

  • 对查询结果为空的情况也进行缓存,缓存时间(ttl)设置短一点,或者该key对应的数据insert了 之后清理缓存。

    问题:缓存太多空值占用了更多的空间

  • 使用布隆过滤器。在缓存之前在加一层布隆过滤器,在查询的时候先去布隆过滤器查询 key 是否 存在,如果不存在就直接返回,存在再查缓存和DB。

image-20210813182300818

布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机hash映射函数。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法。

image-20210813182418912

布隆过滤器的原理是,当一个元素被加入集合时,通过K个Hash函数将这个元素映射成一个数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

把字符串———>位 省空间 (1或0)

不用循环———>比较位置 省时间

缓存雪崩

当缓存服务器重启或者大量缓存集中在某一个时间段失效,这样在失效的时候,也会给后端系统(比如DB)带来很大压力。
突然间大量的key失效了或redis重启,大量访问数据库,数据库崩溃
解决方案:

  1. key的失效期分散开 不同的key设置不同的有效期
  2. 设置二级缓存(数据不一定一致)
  3. 高可用(脏读)

缓存击穿

对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题,这个和缓存雪崩的区别在于这里针对某一key缓存,前者则是很多key。
缓存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓
存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
解决方案:

  1. 用分布式锁控制访问的线程
    使用redis的setnx互斥锁先进行判断,这样其他线程就处于等待状态,保证不会有大并发操作去操作数据库。
  2. 不设超时时间,volatile-lru 但会造成写一致问题
    当数据库数据发生更新时,缓存中的数据不会及时更新,这样会造成数据库中的数据与缓存中的数据的不一致,应用会从缓存中读取到脏数据。可采用延时双删策略处理,这个我们后面会详细讲到。

数据不一致

缓存和DB的数据不一致的根源 : 数据源不一样
如何解决
强一致性很难,追求最终一致性(时间)
互联网业务数据处理的特点
高吞吐量
低延迟
数据敏感性低于金融业
时序控制是否可行?
先更新数据库再更新缓存或者先更新缓存再更新数据库
本质上不是一个原子操作,所以时序控制不可行

高并发情况下会产生不一致

保证数据的最终一致性(延时双删)
1、先更新数据库同时删除缓存项(key),等读的时候再填充缓存
2、2秒后再删除一次缓存项(key)
3、设置缓存过期时间 Expired Time 比如 10秒 或1小时
4、将缓存删除失败记录到日志中,利用脚本提取失败记录再次删除(缓存失效期过长 7*24)
升级方案
通过数据库的binlog来异步淘汰key,利用工具(canal)将binlog日志采集发送到MQ中,然后通过ACK机
制确认处理删除缓存。

数据并发竞争

这里的并发指的是多个redis的client同时set 同一个key引起的并发问题。
多客户端(Jedis)同时并发写一个key,一个key的值是1,本来按顺序修改为2,3,4,最后是4,但是顺序变成了4,3,2,最后变成了2。

第一种方案:分布式锁+时间戳
1.整体技术方案

这种情况,主要是准备一个分布式锁,大家去抢锁,抢到锁就做set操作。
加锁的目的实际上就是把并行读写改成串行读写的方式,从而来避免资源竞争

image-20210813182938600

2.Redis分布式锁的实现
主要用到的redis函数是setnx()
用SETNX实现分布式锁
时间戳
由于上面举的例子,要求key的操作需要顺序执行,所以需要保存一个时间戳判断set顺序。

系统A key 1 {ValueA 7:00}
系统B key 1 { ValueB 7:05}

假设系统B先抢到锁,将key1设置为{ValueB 7:05}。接下来系统A抢到锁,发现自己的key1的时间戳早于缓存中的时间戳(7:00<7:05),那就不做set操作了。

第二种方案:利用消息队列

在并发量过大的情况下,可以通过消息中间件进行处理,把并行读写进行串行化。
把Redis的set操作放在队列中使其串行化,必须的一个一个执行。

Hot Key

当有大量的请求(几十万)访问某个Redis某个key时,由于流量集中达到网络上限,从而导致这个redis的服务器宕机。造成缓存击穿,接下来对这个key的访问将直接访问数据库造成数据库崩溃,或者访问数据库回填Redis再访问Redis,继续崩溃。

image-20210813183117663

如何发现热key
1、预估热key,比如秒杀的商品、火爆的新闻等
2、在客户端进行统计,实现简单,加一行代码即可
3、如果是Proxy,比如Codis,可以在Proxy端收集
4、利用Redis自带的命令,monitor、hotkeys。但是执行缓慢(不要用)
5、利用基于大数据领域的流式计算技术来进行实时数据访问次数的统计,比如 Storm、Spark
Streaming、Flink,这些技术都是可以的。发现热点数据后可以写到zookeeper中

image-20210813183142992

如何处理热Key:
1、变分布式缓存为本地缓存
发现热key后,把缓存数据取出后,直接加载到本地缓存中。可以采用Ehcache、Guava Cache都可
以,这样系统在访问热key数据时就可以直接访问自己的缓存了。(数据不要求时时一致)
2、在每个Redis主节点上备份热key数据,这样在读取时可以采用随机读取的方式,将访问压力负载到每个Redis上。
3、利用对热点数据访问的限流熔断保护措施
每个系统实例每秒最多请求缓存集群读操作不超过 400 次,一超过就可以熔断掉,不让请求缓存集群,直接返回一个空白信息,然后用户稍后会自行再次重新刷新页面之类的。(首页不行,系统友好性差)
通过系统层自己直接加限流熔断保护措施,可以很好的保护后面的缓存集群。

Big Key

大key指的是存储的值(Value)非常大,常见场景:

  • 热门话题下的讨论
  • 大V的粉丝列表
  • 序列化后的图片
  • 没有及时处理的垃圾数据

大key的影响:

  • 大key会大量占用内存,在集群中无法均衡
  • Redis的性能下降,主从复制异常
  • 在主动删除或过期删除时会操作时间过长而引起服务阻塞

如何发现大key:
1、redis-cli —bigkeys命令。可以找到某个实例5种数据类型(String、hash、list、set、zset)的最大
key。

但如果Redis 的key比较多,执行该命令会比较慢
2、获取生产Redis的rdb文件,通过rdbtools分析rdb生成csv文件,再导入MySQL或其他数据库中进行
分析统计,根据size_in_bytes统计bigkey

大key的处理:
优化big key的原则就是string减少字符串长度,list、hash、set、zset等减少成员数。
1、string类型的big key,尽量不要存入Redis中,可以使用文档型数据库MongoDB或缓存到CDN上。
如果必须用Redis存储,最好单独存储,不要和其他的key一起存储。采用一主一从或多从。
2、单个简单的key存储的value很大,可以尝试将对象分拆成几个key-value, 使用mget获取值,这样
分拆的意义在于分拆单次操作的压力,将操作压力平摊到多次操作中,降低对redis的IO影响。
2、hash, set,zset,list 中存储过多的元素,可以将这些元素分拆。(常见)

以hash类型举例来说,对于field过多的场景,可以根据field进行hash取模,生成一个新的key,例如原
来的
hash_key:{filed1:value, filed2:value, filed3:value ...},可以hash取模后形成如下
key:value形式
hash_key:1:{filed1:value}
hash_key:2:{filed2:value}
hash_key:3:{filed3:value}
...
取模后,将原先单个key分成多个key,每个key filed个数为原先的1/N

3.删除大key时不要使用del,因为del是阻塞命令,删除时会影响性能。
4、使用 lazy delete (unlink命令)
删除指定的key(s),若key不存在则该key被跳过。但是,相比DEL会产生阻塞,该命令会在另一个线程中
回收内存,因此它是非阻塞的。 这也是该命令名字的由来:仅将keys从key空间中删除,真正的数据删
除会在后续异步操作。

redis> SET key1 "Hello"
"OK"
redis> SET key2 "World"
"OK"
redis> UNLINK key1 key2 key3
(integer) 2

缓存与数据库一致性

缓存更新策略

  • 利用Redis的缓存淘汰策略被动更新 LRU 、LFU
  • 利用TTL被动更新
  • 在更新数据库时主动更新 (先更数据库再删缓存——延时双删)
  • 异步更新 定时任务 数据不保证时时一致 不穿DB

不同策略之间的优缺点

策略 一致性 维护成本
利用Redis的缓存淘汰策略被动更新 最差 最低
利用TTL被动更新 较差 较低
在更新数据库时主动更新 较强 最高

与Mybatis整合

可以使用Redis做Mybatis的二级缓存,在分布式环境下可以使用。
框架采用springboot+Mybatis+Redis。框架的搭建就不赘述了。
1、在pom.xml中添加Redis依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、在application.yml中添加Redis配置

#开发配置
spring:
  #数据源配置
  datasource:
    url: jdbc:mysql://192.168.127.128:3306/test?
    serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8
    username: root
    password: root
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
  redis:
    host: 192.168.127.128
    port: 6379
    jedis:
      pool:
        min-idle: 0
        max-idle: 8
        max-active: 8
        max-wait: -1ms
#公共配置与profiles选择无关
mybatis:
  typeAliasesPackage: com.lagou.rcache.entity
  mapperLocations: classpath:mapper/*.xml

3、缓存实现
ApplicationContextHolder 用于注入RedisTemplate

package com.lagou.rcache.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextHolder implements ApplicationContextAware {
private static ApplicationContext ctx;
    @Override
    //向工具类注入applicationContext
    public void setApplicationContext(ApplicationContext applicationContext)
    throws BeansException {
    	ctx = applicationContext; //ctx就是注入的applicationContext
    } 
    //外部调用ctx
    public static ApplicationContext getCtx() {
    return ctx;
    } 
    public static <T> T getBean(Class<T> tClass) {
		return ctx.getBean(tClass);
    } 
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
    return (T) ctx.getBean(name);
    }
}

RedisCache 使用redis实现mybatis二级缓存

package com.lagou.rcache.utils;
import org.apache.ibatis.cache.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 使用redis实现mybatis二级缓存
*/
public class RedisCache implements Cache {
    //缓存对象唯一标识
    private final String id; //orm的框架都是按对象的方式缓存,而每个对象都需要一个唯一标.
    //用于事务性缓存操作的读写锁
    private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    //处理事务性缓存中做的
    //操作数据缓存的--跟着线程走的
    private RedisTemplate redisTemplate; //Redis的模板负责将缓存对象写到redis服务器里面去
    //缓存对象的是失效时间,30分钟
    private static final long EXPRIRE_TIME_IN_MINUT = 30;
    //构造方法---把对象唯一标识传进来
    public RedisCache(String id) {
    if (id == null) {
    	throw new IllegalArgumentException("缓存对象id是不能为空的");
    } 
        this.id = id;
    } 
    @Override
    public String getId() {
    	return this.id;
    } 
    //给模板对象RedisTemplate赋值,并传出去
    private RedisTemplate getRedisTemplate() {
        if (redisTemplate == null) { 
            //每个连接池的连接都要获得RedisTemplate
        	redisTemplate = ApplicationContextHolder.getBean("redisTemplate");
        } 
        return redisTemplate;
    } 
    /*
	保存缓存对象的方法
	*/
    @Override
    public void putObject(Object key, Object value) {
        try {
            RedisTemplate redisTemplate = getRedisTemplate();
            //使用redisTemplate得到值操作对象
            ValueOperations operation = redisTemplate.opsForValue();
            //使用值操作对象operation设置缓存对象
            operation.set(key, value, EXPRIRE_TIME_IN_MINUT, TimeUnit.MINUTES);
            //TimeUnit.MINUTES系统当前时间的分钟数
            System.out.println("缓存对象保存成功");
        } catch (Throwable t) {
            System.out.println("缓存对象保存失败" + t);
        }
    }
    /*
    获取缓存对象的方法
    */
    @Override
    public Object getObject(Object key) {
        try {
            RedisTemplate redisTemplate = getRedisTemplate();
            ValueOperations operations = redisTemplate.opsForValue();
            Object result = operations.get(key);
            System.out.println("获取缓存对象");
            return result;
        } catch (Throwable t) {
            System.out.println("缓存对象获取失败" + t);
            return null;
        }
    }
    @Override
    public Object (Object key) {
        try {
            RedisTemplate redisTemplate = getRedisTemplate();
            redisTemplate.delete(key);
            System.out.println("删除缓存对象成功!");
        } catch (Throwable t) {
            System.out.println("删除缓存对象失败!" + t);
        } 
            return null;
	}
    @Override
    public void clear() {
        RedisTemplate redisTemplate = getRedisTemplate();
        //回调函数
        redisTemplate.execute((RedisCallback) collection -> {
        collection.flushDb();
        return null;
        });
        System.out.println("清空缓存对象成功!");
    }
    @Override
    public int getSize() {
    	return 0;
    } 
    @Override
    public ReadWriteLock getReadWriteLock() {
    	return readWriteLock;
    }
}

4、在mapper中增加二级缓存开启(默认不开启)

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.lagou.rcache.dao.UserDao" >
<cache type="com.lagou.rcache.utils.RedisCache" />
<resultMap id="BaseResultMap" type="com.lagou.rcache.entity.TUser" >
    <id column="id" property="id" jdbcType="INTEGER" />
    <result column="name" property="name" jdbcType="VARCHAR" />
    <result column="address" property="address" jdbcType="VARCHAR" />
</resultMap>
    <sql id="Base_Column_List" >
    id, name, address
    </sql>
    <select id="selectUser" resultMap="BaseResultMap">
    select
    <include refid="Base_Column_List" />
    from tuser
    </select>
</mapper>

5、在启动时允许缓存

package com.lagou.rcache;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
@SpringBootApplication
@MapperScan("com.lagou.rcache.dao")
@EnableCaching
public class RcacheApplication {
    public static void main(String[] args) {
    	SpringApplication.run(RcacheApplication.class, args);
    }
}

运行结果:
控制台:

image-20210813184441873

Redis客户端:

127.0.0.1:6379> keys *
1) "\xac\xed\x00\x05sr\x00
org.apache.ibatis.cache.CacheKey\x0f\xe9\xd5\xb4\xcd3\xa8\x82\x02\x00\x05J\x00\b
checksumI\x00\x05countI\x00\bhashcodeI\x00\nmultiplierL\x00\nupdateListt\x00\x10
Ljava/util/List;xp\x00\x00\x00\x00\x87\xd8u%\x00\x00\x00\x05\xb0\xf6RU\x00\x00\x
00%sr\x00\x13java.util.ArrayListx\x81\xd2\x1d\x99\xc7a\x9d\x03\x00\x01I\x00\x04s
izexp\x00\x00\x00\x05w\x04\x00\x00\x00\x05t\x00'com.lagou.rcache.dao.UserDao.sel
ectUsersr\x00\x11java.lang.Integer\x12\xe2\xa0\xa4\xf7\x81\x878\x02\x00\x01I\x00
\x05valuexr\x00\x10java.lang.Number\x86\xac\x95\x1d\x0b\x94\xe0\x8b\x02\x00\x00x
p\x00\x00\x00\x00sq\x00~\x00\x06\x7f\xff\xff\xfft\x00Cselect\n \n
id, name, address\n \n from tusert\x00\x15SqlSessionFactoryBeanx"

分布式锁

watch

利用Watch实现Redis乐观锁

乐观锁基于CAS(Compare And Swap)思想(比较并替换),是不具有互斥性,不会产生锁等待而消
耗资源,但是需要反复的重试,但也是因为重试的机制,能比较快的响应。因此我们可以利用redis来实
现乐观锁。具体思路如下:
1、利用redis的watch功能,监控这个redisKey的状态值
2、获取redisKey的值
3、创建redis事务
4、给这个key的值+1
5、然后去执行这个事务,如果key的值被修改过则回滚,key不加1
Redis乐观锁实现秒杀

public class Second {
    public static void main(String[] arg) {
        String redisKey = "lock";
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        try {
            Jedis jedis = new Jedis("127.0.0.1", 6378);
            // 初始值
            jedis.set(redisKey, "0");
            jedis.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 1000; i++) {
            executorService.execute(() -> {
                Jedis jedis1 = new Jedis("127.0.0.1", 6378);
                try {
                    jedis1.watch(redisKey);
                    String redisValue = jedis1.get(redisKey);
                    int valInteger = Integer.valueOf(redisValue);
                    String userInfo = UUID.randomUUID().toString();
                    // 没有秒完
                    if (valInteger < 20) {
                        Transaction tx = jedis1.multi();
                        tx.incr(redisKey);
                        List list = tx.exec();
                        // 秒成功 失败返回空list而不是空
                        if (list != null && list.size() > 0) {
                            System.out.println("用户:" + userInfo + ",秒杀成功!
                                    当前成功人数:" + (valInteger + 1));
                        } // 版本变化,被别人抢了。
                        else {
                            System.out.println("用户:" + userInfo + ",秒杀失败");
                        }
                    } // 秒完了
                    else {
                        System.out.println("已经有20人秒杀成功,秒杀结束");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    jedis1.close();
                }
            });
        }
        executorService.shutdown();
    }
}

setnx

实现原理

共享资源互斥
共享资源串行化
单应用中使用锁:(单进程多线程)
synchronized、ReentrantLock
分布式应用中使用锁:(多进程多线程)
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
利用Redis的单线程特性对共享资源进行串行化处理

实现方式

获取锁
方式1(使用set命令实现)—推荐

/**
    * 使用redis的set命令实现获取分布式锁
    *
    * @param lockKey    可以就是锁
    * @param requestId  请求ID,保证同一性 uuid+threadID
    * @param expireTime 过期时间,避免死锁
    * @return
    */
   public boolean getLock(String lockKey, String requestId, int expireTime) {
       //NX:保证互斥性
       // hset 原子性操作 只要lockKey有效 则说明有进程在使用分布式锁
       String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime);
       if ("OK".equals(result)) {
           return true;
       }
       return false;
   }

方式2(使用setnx命令实现) — 并发会产生问题

public boolean getLock(String lockKey,String requestId,int expireTime) {
	Long result = jedis.setnx(lockKey, requestId);
    if(result == 1) {
    //成功设置 进程down 永久有效 别的进程就无法获得锁
    jedis.expire(lockKey, expireTime);
    	return true;
    } 
    return false;
}

释放锁
方式1(del命令实现) — 并发

/
**
* 释放分布式锁
* @param lockKey
* @param requestId
*/
public static void releaseLock(String lockKey,String requestId) {
    if (requestId.equals(jedis.get(lockKey))) {
    	jedis.del(lockKey);
    }
}

问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。

方式2(redis+lua脚本实现)—推荐

public static boolean releaseLock(String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return
redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(requestId));
    if (result.equals(1L)) {
    return true;
    } 
    return false;
}

存在问题

单机
无法保证高可用
主—从
无法保证数据的强一致性,在主机宕机时会造成锁的重复获得。

image-20210813185301017

无法续租
超过expireTime后,不能继续使用

本质分析

CAP模型分析
在分布式环境下不可能满足三者共存,只能满足其中的两者共存,在分布式下P不能舍弃(舍弃P就是单
机了)。
所以只能是CP(强一致性模型)和AP(高可用模型)。
分布式锁是CP模型,Redis集群是AP模型。 (base)
Redis集群不能保证数据的随时一致性,只能保证数据的最终一致性。
为什么还可以用Redis实现分布式锁?

与业务有关
当业务不需要数据强一致性时,比如:社交场景,就可以使用Redis实现分布式锁
当业务必须要数据的强一致性,即不允许重复获得锁,比如金融场景(重复下单,重复转账)就不要使用
可以使用CP模型实现,比如:zookeeper和etcd。

分布式锁特性

  • 互斥性
    任意时刻,只能有一个客户端获取锁,不能同时有两个客户端获取到锁。
  • 同一性
    锁只能被持有该锁的客户端删除,不能由其它客户端删除。
  • 可重入性
    持有某个锁的客户端可继续对该锁加锁,实现锁的续租
  • 容错性
    锁失效后(超过生命周期)自动释放锁(key失效),其他客户端可以继续获得该锁,防止死锁

分布式锁的实际应用

数据并发竞争
利用分布式锁可以将处理串行化,前面已经讲过了。
防止库存超卖

image-20210813185548268

订单1下单前会先查看库存,库存为10,所以下单5本可以成功;
订单2下单前会先查看库存,库存为10,所以下单8本可以成功;
订单1和订单2 同时操作,共下单13本,但库存只有10本,显然库存不够了,这种情况称为库存超卖。
可以采用分布式锁解决这个问题。

image-20210813185604783

订单1和订单2都从Redis中获得分布式锁(setnx),谁能获得锁谁进行下单操作,这样就把订单系统下单的顺序串行化了,就不会出现超卖的情况了。伪码如下:

//加锁并设置有效期
if(redis.lock("RDL",200)){
    //判断库存
    if (orderNum<getCount()){
        //加锁成功 ,可以下单
        order(5);
        //释放锁
        redis,unlock("RDL");
    }
}

注意此种方法会降低处理效率,这样不适合秒杀的场景,秒杀可以使用CAS和Redis队列的方式。

Zookeeper分布式锁的对比

  • 基于Redis的set实现分布式锁
  • 基于zookeeper临时节点的分布式锁

image-20210813185715898

基于etcd实现
三者的对比,如下表

Redis zookeeper etcd
一致性算法 paxos(ZAB) raft
CAP AP CP CP
高可用 主从集群 n+1 (n至少为2) n+1
接口类型 客户端 客户端 http/grpc
实现 setNX createEphemeral restful API

分布式集群架构中的session分离

传统的session是由tomcat自己进行维护和管理,但是对于集群或分布式环境,不同的tomcat管理各自
的session,很难进行session共享,通过传统的模式进行session共享,会造成session对象在各个
tomcat之间,通过网络和Io进行复制,极大的影响了系统的性能。
可以将登录成功后的Session信息,存放在Redis中,这样多个服务器(Tomcat)可以共享Session信息。
利用spring-session-data-redis(SpringSession),可以实现基于redis来实现的session分离。