Advertisement

Redis相关知识点

阅读量:

Redis

NoSQL数据库

概述

NoSQL(Not Only SQL),不仅仅是SQL,泛指非关系型数据库。NoSQL不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。

  • 不遵循SQL标准
  • 不支持ACID
  • 远超SQL的性能

适用场景

  • 对数据高并发读写
  • 海量数据的读写
  • 对数据高可扩展性

不适用场景

需要事务支持

基于sql的结构化查询存储,处理复杂的关系

安装

安装gcc环境

复制代码
    yum install gcc

解压压缩包

复制代码
    tar -zxvf redis-6.2.1.tar.gz

进入目录

复制代码
    cd redis-6.2.1

在此目录下执行make

复制代码
    make

再执行安装

复制代码
    make install

默认安装目录在/usr/local/bin

后台启动 设置配置文件daemonize no 为yes

复制代码
    redis-server /etc/redis.conf

相关知识

端口6379,默认16个数据库,初始默认使用0号库

单线程+多路IO复用技术

多路复用是指使用一个线程老检查多个文件描述符的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一线程里执行,也可以启动线程执行。

常用五大数据类型

Redis键(key)

复制代码
    keys *

查看当前库中所有key

复制代码
    exists <key>

判断某个key是否存在

复制代码
    type <key>

查看你的key是什么类型

复制代码
    del <key>

删除指定的key数据

复制代码
    unlink <key>

根据value 选择非阻塞删除(仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作)

复制代码
    expire <key>  10

10秒:为给定的key设置过期时间

复制代码
    ttl <key>

查看还有多少秒过期,-1表示永不过期,-2表示已经过期

复制代码
    select

切换数据库

复制代码
    dbsize

查看当前数据库的key的数量

复制代码
    flushdb

清空当前库

复制代码
    flushall

通杀全部库

String

String是Redis最基本的类型,一个key对应一个value

String类型是二进制安全的,意味着Redis的string可以包含任何数据。比如jpg图片或者序列化对象

String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是512M

命令
复制代码
    set <key>  <value>

添加键值对

复制代码
    get <key>

查询对应键值

复制代码
    append <key>  <value>

将给定的value追加到原值的末尾

复制代码
    strlen <key>

key获得值得长度

复制代码
    setnx <key> <value>

只有key不存在得时候才能设置成功

复制代码
    incr <key>

将key中存储的数字值+1

复制代码
    decr <key>

将key中存储的数字值-1

复制代码
    incrby/decrby <key> <步长>

将key中存储的数字值增减,自定义步长。

原子操作是值不会被线程调度机制打断的操作,这种操作一旦开始就一直运行到结束,中间不会有任何context switch(切换到另一线程)

1、在单线程中,能够在单条指令中完成的操作都可以认为是原子操作,因为中断只能发生于指令之间。

2、在多线程中们不能被其他进程大段的操作就叫原子操作。

Redis单命令的原子性主要得益于Redis的单线程。

复制代码
    mset <key1> <value1> <key2> <value2>

添加多个键值对

复制代码
    mget <key1>  <key2>

获取多个value

复制代码
    msetnx <key1> <value1> <key2> <value2>

同时设置多个键值对,当且仅当给定key都不存在

复制代码
    getrange <key><开始位置><结束位置>

获得值的范围 前包后包

复制代码
    setrange <key> <开始位置>

用value覆写key所存储的字符串值,从起始位置开始

复制代码
    setex <key><过期时间> <value>

设置键值的同时,设置过期时间,单位秒。

复制代码
    getset <key><value>

以旧换新

数据结构

动态字符串,内部结构实现类似于ArrayList,采用预分配冗余空间的方式,来减少内存的频繁分配

内部为当前字符串时间分配的空间一般要高于实际字符串长度,当字符串长度小于1M的空间,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M。需要注意的是字符串最大长度是512M

Redis列表(List)

单键多值

Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部或尾部。

底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会很差

命令
复制代码
    lpush/rpush <key> <value> <value> <value>

从左边/右边插入值

复制代码
    lpop/rpop <key>

从左边/右边吐出一个值。值在键在,值光键亡

复制代码
    rpoplpush <key1><key2>

从key1列表右边吐出一个值,插到key2列表左边

复制代码
    lrange <key> <start><stop>

按照索引下标获得元素(从左到右)

复制代码
    lindex <key><index>

按照索引下标获得元素

复制代码
    llen <key>

获得列表长度

复制代码
    linsert <key> before/after <value> <newvalue>

在的前面后面插入插入值

复制代码
    lrem <key><n><value>

从左边删除n个value(从左到右)

复制代码
    lset <key> <index> <value>

将列表下标为index的值替换成value

数据结构

快速链表quickList

首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist,也就是压缩链表

它将所有的元素紧挨着一起存储,分配的是一块连续的内存。

当数据量比较多的时候才会改成quicklist

因为普通的链表需要的附加指针空间太大,会比较浪费空间,比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prev和next

redis将链表和ziplist结合起来组成quicklist,也就将多个ziplist使用双向指针串起来使用,这样既满足了快速的插入删除性能,又不会出现太大的空间冗余

Redis集合(set)

redis set对外提供的功能域list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的

set是string类型的无序集合,它底层其实是一个value为null的hash表,所以添加删除查找的复杂度都是O(1)

命令
复制代码
    sadd <key><value1><value2>

将一个或多个member元素加入到集合key中,已经存在的member元素将被忽略

复制代码
    smembers <key>

取出该集合的所有值

复制代码
    sismember <key> <value>

判断集合是否含有指定value

复制代码
    scard<key>

返回该集合的元素个数

复制代码
    srem <key> <value1><value2>

删除集合中的某个元素

复制代码
    spop<key>

随机从该集合中吐出一个值

复制代码
    srandmember <key> <n>

随机从该集合中取出n个值。不会从集合中删除。

复制代码
    smove <resource><destination> value

把集合中的一个值从一个集合移动到另一个集合

复制代码
    sinter <key1><key2>

返回两个集合交集元素

复制代码
    sunion<key1><key2>

返回两个集合并集元素

复制代码
    sdiff<key1><key2>

返回两个集合的差集元素 key1中的不包含key2中的

数据结构

set的数据结构是dict字典,字典使用哈希表实现的

java中的hashset的内部实现使用的是hashmap,之不够所有的value指向同一个对象,redis的set结构也是一样他的内部也是用hash结构,所有的value都指向同一个内部值

Redis哈希(Hash)

hash是一个键值对集合

hash是一个string类型的field和value的映射表,hash特别适合用于存储对象,类似于java里面的Map<String,Object>

命令
复制代码
    hset <key> <field><value>

给key集合中的键赋值

复制代码
    hget <key1><field>

从集合取出value

复制代码
    hmset <key><field1><value1><field2><value2>

批量设置hash的值

复制代码
    hexist <key1><field>

查看哈希表key中,给定的field是否存在

复制代码
    hkeys <key>

列出该hash集合所有field

复制代码
    hvals  <key>

列出该hash集合的所有value

复制代码
    hincrby <key><field><increment>

为哈希表key中额field 增加 指定增量

复制代码
    hsetnx <key><field><value>

将哈希表key中域field的值设置为value,当且仅当域field不存在

数据结构

Hash类型对应的数据结构是两种:ziplist,hashtable。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable

Redis有序集合Zset

和普通集合set相似,没有重复元素的字符串集合

有序集合的每个成员都关联了一个评分(sorce),这个评分被用于按照从最低分到最高分的方式排序集合中的成员,集合的成员是唯一的,但是评分可以重复。

因为元素是有序的,所以可以根据评分或次序来获取一个范围的元素

访问有序集合的中间元素也很快,因此可以使用有序集合作为一个没有重复成员的智能列表

命令
复制代码
    zadd <key><score1><value1><score2><value2>

将一个或多个member元素及其score值加入到有序集key中

复制代码
    zrange <key><start><stop>【withscores】

返回有序集key中,下标在 之间的元素

带上withscores可以让分数一起和值返回到结果集

复制代码
    zrangebyscore <key> <min> <max> [withscores] [limit offset count]

返回有序集合key中,所有score值介于min和max之间(包括等于min或max的成员),有序集成员按score值递增从小到大次序排列

复制代码
    zrevrangebyscore key max min [withscore] [limit offset count]

同上。从大到小排列

复制代码
    zincrby <key><increment><value>

为元素的score加上增量

复制代码
    zrem <key><value>

删除该集合下指定值的元素

复制代码
    zcount <key><min><max>

统计该集合,分数区间内的元素个数

复制代码
    zrank <key><value>

返回该值在集合中的排名,从0开始

数据结构

zset是Redis提供的一个特别的数据结构,一方面等价于java中的Map<String, Double>,可以给每一个元素value 赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的返回来获取元素的列表

底层使用两个数据结构

  • hash,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score
  • 跳跃表,目的在于给元素value排序,根据score的返回获取元素列表

跳表

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8m9fjvuB-1657675688448)(D:\TyporaDocument\img\跳表.png)]

关于找51,从第二层开始找,1节点比51节点小,向后比较。21节点比51节点小,继续向后比较,后面就是null了,所以从21节点向下到第一层,在第一层,41节点比51节点小,继续向后,61节点比51节点大,所以从41向下,在第0层,51节点为要查找的节点,节点被找到,共查找4次

跳表比有序链表效率要高

配置文件

支持远程连接

注释掉bind 127.0.0.1 -::1

protected-mode yes改为no,支持远程访问

Redis的发布订阅

Redis发布订阅是一种消息通信模式:发送者发送消息,订阅者接收消息

Redis客户端可以订阅任意数量的频道

发布订阅命令行实现

打开一个客户端订阅channel1

复制代码
    subscribe channel1

打开另一个客户端,给channel1发布消息hello

复制代码
    publish channel1 hello

返回1是订阅者数量

打开第一个客户端可以看到发送的消息

发布的消息没有持久化,如果在订阅的客户端收不到hello,只能收到订阅后发布的消息

Redis6新数据类型

BitMaps

可以实现对位的操作

  • Bitmaps本身不是一种数据类型,实际上他就是字符串,但是他可以对字符串的位进行操作
  • Bitmaps单独提供了一套命令,所以在Redis中使用Bitmaps和使用字符串的方法不太相同,可以把Bitmaps想象成一个以位为单位的数组,数组的每个单元只能存储0和1,数组的下标在Bitmaps中叫做偏移量。
命令
复制代码
    setbit <key> <offset> <value>

设置Bitmaps中某个偏移量的值

复制代码
    getbit <key> <offset>

获取某个偏移量的值

复制代码
    bitcount <key> [start end]

统计字符串被设置为1的bit数,一般情况下,给定的整个字符串都会被计数,通过指定额外的start或end参数,可以让计数旨在特定的位上进行。start和end参数的设置,都可以使用负数值,二者皆包含

复制代码
    bittop and(or/not/xor) <destkey> [key...]

bittop是一个符合操作,它可以做多个Bitmaps的and、or、not、xor操作,并将结果保存到destkey中

HyperLogLog

工作中,我们经常会遇到与统计相关的功能需求,比如统计网站页面访问量,可以使用Redis的incr、increby实现

求集合中不重复元素个数的问题被称为基数问题

解决方案

  • 数据存储在Mysql中,使用distinct count计算不重复个数
  • 使用redis提供的hash、set、bitmaps等数据结构处理

但是以上方案随着数据增加导致占用空间越来越大,hyperloglog降低一定精度来平衡存储空间

hyperloglog是用来做基数统计的算法,hyperloglog的优点是在输入元素的数量或者体积非常大时,计算基数所需的空间总是固定的、并且是很小的

在redis中每个hyperloglog键只需要花费12kb内存,就可以计算接近2^64个不同元素的基数,但是只会根据输入元素来计算基数,不会存储输入元素本身,所以不能像集合那样返回输入的各个元素

基数

比如数据集{1,3,5,7,5,7,8},那么数据集的基数集是{1,3,5,7,8}.基数为5(不重复元素的个数),基数估计就是在误差可接受的范围内,快速计算基数

命令
复制代码
    pfadd <key> <element> [element...]

添加指定元素到HyperLogLog中

复制代码
    pfcount <key> [key...]

计算HLL的近似基数,可以计算多个HLL,比如HLL存储每天的UV,计算一周的UV可以使用7天UV合并计算即可

pfmerge [sourcekey]

将一个或多个JHLL合并后的结果存储在另一个HLL中,比如每月活跃用户可以使用每天的活跃用户来合并计算可得

Geospatial

经纬度,地理位置操作

命令

geoadd

添加地理位置(经度,纬度,名称)

两极无法直接添加,一般会下载城市数据,直接通过java 程序导入,有效的经度从-180到180,有效的纬度从-85.05到85.05

当坐标位置超出指定范围时,该命令将返回一个错误

已经添加的数据无法再次添加

geopos [member…]

获得指定地区的坐标值

geodist 【m|km|ft|mi】

获得两个位置之间的直线距离

georadius radius m|km|ft|mi

以给定的经纬度为中心,找出某半径内的元素

Jedis案例

复制代码
    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>
    </dependencies>
复制代码
    package com.atguigu.jedis;
    
    import redis.clients.jedis.Jedis;
    
    import javax.swing.*;
    import java.util.Random;
    
    /** * @author Kobatal
     * @version V1.0
     * @Package com.atguigu.jedis
     * @date 2022/7/8 19:19
     * @Copyright © 2020-2021 SIS
     */
    public class PhoneCode {
    public static void main(String[] args) {
        verifyCode("11111111111");
    //        getRedisCode("11111111111","282261");
    }
    
    public static void getRedisCode(String phone,String code){
        Jedis jedis = new Jedis("192.168.126.128",6379);
        String codeKey = "VerufyCode" + phone+":code";
        String redisCode = jedis.get(codeKey);
        if (redisCode.equals(code)){
            System.out.println("成功");
        }else {
            System.out.println("失败");
        }
        jedis.close();
    }
    public static void verifyCode(String phone){
        Jedis jedis = new Jedis("192.168.126.128",6379);
        String countKey = "VerifyCode"+ phone+":count";
        String codeKey = "VerufyCode" + phone+":code";
        String count = jedis.get(countKey);
        if (count == null){
            jedis.setex(countKey,24*60*60,"1");
        }else if (Integer.parseInt(count)<=2){
            jedis.incr(countKey);
        }else if (Integer.parseInt(count)>2){
            System.out.println("今天的发送次数已经超过三次");
            jedis.close();
        }
        String vcode = getCode();
        jedis.setex(codeKey,120,vcode);
        jedis.close();
    }
    
    public static String getCode(){
        Random random = new Random();
        StringBuilder code = new StringBuilder();
        for (int i =0; i<6;i++){
            int rand = random.nextInt(10);
            code.append(rand);
        }
        return code.toString();
    }
    }

springboot整合redis

复制代码
    <parent>
        <groupId>org.springframework.boot</groupId>
        <version>2.2.1.RELEASE</version>
        <artifactId>spring-boot-starter-parent</artifactId>
    </parent>
    <groupId>org.example</groupId>
    <artifactId>redis_springboot</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    
        <!-- redis依赖commons-pool 这个依赖一定要添加 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>
    
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
复制代码
    package com.atguigu.redis.springboot.config;
    
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.cache.CacheManager;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.cache.RedisCacheConfiguration;
    import org.springframework.data.redis.cache.RedisCacheManager;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializationContext;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    import java.time.Duration;
    
    /** * @author Kobatal
     * @version V1.0
     * @Package com.atguigu.redis.springboot.config
     * @date 2022/7/8 19:59
     * @Copyright © 2020-2021 SIS
     */
    @EnableCaching
    @Configuration
    public class RedisConfig extends CachingConfigurerSupport {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
    
    
        //设置“key"的序列化方式
        template.setKeySerializer(new StringRedisSerializer());
    
        //设置“值”的序列化方式
        template.setValueSerializer(jackson2JsonRedisSerializer);
    
        //设置“hash”类型数据的序列化方式
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
    
        return template;
    }
    
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        StringRedisSerializer redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
    
        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //配置序列化(解决乱码问题)过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
    }
复制代码
    package com.atguigu.redis.springboot.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.nio.channels.Pipe;
    
    /** * @author Kobatal
     * @version V1.0
     * @Package com.atguigu.redis.springboot.controller
     * @date 2022/7/8 20:19
     * @Copyright © 2020-2021 SIS
     */
    @RestController
    @RequestMapping("/redisTest")
    public class RedisTestController {
    @Autowired
    private RedisTemplate redisTemplate;
    @GetMapping
    public String testRedis(){
        //设置值到redis
        redisTemplate.opsForValue().set("name","lucy");
        //获取
        String name = (String) redisTemplate.opsForValue().get("name");
        return name;
    }
    }

Redis事务

redis事务是一个单独的隔离操作:事务中的所有命令都会序列化按顺序执行。食物在执行的过程中们不会被其他客户端发送来的命令请求所打断

从输入Multi命令开始,输入的命令都会依次进入命令队列,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。

组队的过程中可以通过discard来放弃组队

事务的错误处理

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

悲观锁,每次拿数据都认为别人会修改,所以每次拿数据的时候都会上锁,这样别人拿数据就会block直到它拿到锁。传统的关系型数据库里边就用到了,比如行锁,表锁,读锁,写锁,都是在做操作之前就先上锁。

乐观锁,每次拿数据都认为别人不会修改,但是在更新的时候会判断一下在此期间别人有没有去更新数据,可以使用版本号等机制。乐观锁适用于多读的应用,可以提高吞吐量,redis用的就是checkAndSet机制实现事务。

watch key

在执行multi之前,限制性watch key1[key2]可以监视一个或多个key,如果在事务执行之前这个key被其他命令所改动,那么事务将被打断。

unwatch

取消watch命令对所有key的监视

如果在执行watch命令之后,exec命令或discard命令先被执行力的话,就不需要再执行unwatch了

Redis事务三特性

单独的隔离操作

事务中所有命令都会序列化、按顺序的执行。事务在执行的过程中不会被其他客户端发送来的命令请求所打断

没有隔离级别的概念

队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

不保证原子性

事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

秒杀案例

  1. uid和prodid非空判断,判断用户和商品是否存在
  2. 连接redis
  3. 拼接key,包括拼接库存key和秒杀成功用户key
  4. 获取库存,如果库存为空则说明秒杀尚未开始
  5. 判断用户是否重复秒杀的操作
  6. 判断如果商品数量库存小于1,秒杀结束
  7. 秒杀成功,库存-1,秒杀成功用户添加到清单中

连接超时

通过连接池解决

超卖问题

  • 利用乐观锁淘汰用户
  • 监视库存,将减库存和添加成功用户使用事务
复制代码
    Transaction multi = jedis.multi();
    multi.decr(kcKey);
    multi.sadd(userKey,uid);
    List<Object> results = multi.exec();
    if(results == null || results.size == 0){
    	log.info("秒杀失败")
    	jedis.close;
    	return false;
    }

库存遗留

乐观锁造成库存遗留,2000人抢500库存,最后剩200

将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数,提升性能

lua脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。

利用lua脚本淘汰用户,解决超卖问题

RDB持久化

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里

如何备份

Redis会单独创建(fork)一个子进程来持久化,会先将数据写入到一个临时文件中,带持久化过程都结束了,再用这个临时文件替换上次持久化好的文件,整个过程中,主进程是不进行任何IO操作的,着酒确保了极高的性能,如果需要进行大规模数据恢复,且对于数据恢复的完整性不是非常敏感,RDB方式要比AOF方式更加高效。缺点是最后一次持久化后的数据可能丢失

Fork

fork的作用是赋值一个与当前进程一样的进程,新进程的所有数据输值都和袁锦程一致,但是是一个全新的进程,并作为原进程的子进程。

在linux程序中fork会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,处于效率考虑,linux中引入了写时复制技术

一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容赋值一份给子进程

优势

  • 适合大规模的数据恢复
  • 对数据完整性和一致性要求不高更适合使用
  • 节省磁盘空间
  • 恢复速度快

劣势

  • fork阶段中内存数据被克隆一份,大致2倍的膨胀性需要考虑
  • redis在fork时使用了写时拷贝技术,但是如果数据庞大的时还是比较消耗性能
  • 在备份周期在一定间隔时间做一次备份,所以如果redis意外down掉的话,就会丢失最后一次快照后的所有修改

rdb备份

  • 关闭redis
  • 先把备份的文件拷贝到工作目录下
  • 启动redis,备份数据会加载

AOF持久化

以日志的形式来记录每个写操作,将Redis执行过的所有写指令记录下来,只许追加文件,但是不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次完成数据的恢复工作

持久化流程

  • 客户端的请求写命令会被append追加到AOF缓冲区内
  • AOF缓冲区根据AOF持久化策略将操作同步到磁盘的AOF文件中
  • AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite
  • redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的

AOF和RDB同时开启redis默认取AOF的数据(数据不存在丢失)

异常回复

修改默认的appendonly no,改为yes

如遇到AOF文件损坏通过/usr/local/bin/redis-check-aof–fix appendonly.aof进行恢复

备份被写坏的AOF文件

重启redis,重新加载

AOF同步频率设置

appendfsync always

始终同步,每次redis的写入都会立刻记入日志了性能较差但数据完整性比较好

appendfsync everysec

每秒同步,每秒计入日志一次,如果宕机,本秒的数据可能丢失

appendfsync no

redis不主动进行同步,把同步时机交给操作系统

Rewrite压缩

AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值,Redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集,可以使用命令bgrewriteaof

重写原理

AOF文件持续增长而过大时,会fork出一条新进程来将文件重写,redis4.0版本后的重写是指把rbd的快照,以二级制的形式附在新的aof头部,作为已有的历史数据,替换掉原来的流水账操作。

触发机制

Redis会记录上次重写时AOF的大小,默认配置是当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发

重写虽然可以解决大量磁盘空间,减少恢复时间。但是每次重写还是又一定的负担的,因此Redis要满足一定条件才会进行重写

auto-aof-rewrite-percentage:设置重写的基准值,文件达到100%时开始重写(文件是原来重写后文件的两倍时触发)

auto-aof-rewrite-min-size:设置重写的基准值,最小文件64MB。达到这个值开始重写

重写流程

  • bgrewriteaof触发重写,判断当前是否有bgsave或bgrewriteof在运行,如果有,则等待该命令结束后再继续执行
  • 主进程fork出子进程执行重写操作,保证主进程不会阻塞
  • 子进程遍历redis内存中数据到临时文件,客户端的写请求同时写入aof_buf缓冲区和aof_rewrite_buf重写缓冲区保证原AOF文件完整以及新AOF文件生成期间的新的数据修改动作不会丢失
  • 子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。主进程把aof_rewrite_buf中的数据写入到新的AOF文件。
  • 使用新的AOF文件覆盖旧的AOF文件,完成AOF重写。

优势

  • 备份机制更稳健,丢失数据概率更低
  • 可读的日志文本,通过操作AOF文件,可以处理误操作

劣势

  • 比RDB占用更多的磁盘空间
  • 恢复备份速度慢
  • 每次读写都同步有一定性能压力
  • 存在个别bug,造成恢复不能

官方推荐持久化方式

如果的对数据不敏感,可以单独使用RDB

不仅阿姨单独用AOF,因为可能出现bug

如果只做村数据缓存,可以都不用

RDB持久化方式能够在指定的时间间隔能对你的数据进行快照存储

AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以redis协议追加保存每次写的操作到文件末尾

redis还能对AOF文件进行后台重写,是的AOF文件的体积不至于过大

只做缓存:如果你只希望你的数据在服务器运行的时候存在,你也可以不适用任何持久化方式

同时开两种持久化方式,redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整

RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF?建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的bug,留着作为一个万一的手段

主从复制

主机数据更新后根据配置和策略,自动同步到备机的master/slaver机制,master主写,slave主读

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OIHPDxZE-1657675688455)(D:\TyporaDocument\img\微信截图_20220711190309.png)]

读写分离

容灾快速恢复(一台从服务器挂掉可以迅速转其他从服务器)

案例

创建myredis文件夹

复制redis.config配置文件

配置一主两从,创建三个配置文件

redis6379.conf

redis6380.conf

redis6381.conf

  • 在三个配置文件写入内容
复制代码
    include /myredis/redis.conf
    pidfile /var/run/redis_6379.pid
    port 6379
    dbfilename dump6379.rbdinclude /myredis/redis.conf
  • 启动三个redis服务
复制代码
    info replication

查看当前主机运行状况

复制代码
     slaveof ip port

设置从机

一主二从

从服务器挂掉之后,再次重启就变成主服务器了,再次加入到从服务器中会从头复制主服务器中的数据

主服务器挂掉,从服务器不做任何事,主服务器重启之后仍然是主服务器带两个小弟

主从复制的原理
  • 从服务器连接上主服务器之后,从服务器向主服务器发送进行数据同步消息
  • 主服务器接收到从服务器发送过来的同步消息,把主服务器数据进行持久化,生成rdb文件,把rdb文件发送给从服务器,从服务器拿到rdb进行读取
  • 每次主服务器进行写操作之后,和从服务器进行数据同步

全量复制:slave服务在接收到数据库文件数据之后,将其存盘并加载到内存中

增量复制:master继续降新的所有收集到的修改命令依次传给slave,完成同步

薪火相传

上一个slave可以是下一个slave的master,slave同样可以接收其他slaves的连接和同步请求

风险是一旦某个slave宕机,后面的slave都没法备份,主机挂了,从机还是从机,无法写数据

反客为主(手动)

slaveof no one

允许从服务器上位

哨兵模式

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票自动将从库转换为主库

步骤

一主二仆的环境

自定义/myredis目录下新建sentinel.conf

配置哨兵

sentinel monitor mymaster 127.0.0.1 6379 1

其中mymaster 为监控对象起的服务器名称,1为至少有多少个哨兵同意迁移的数量

redis-sentinel sentinel.conf

  • 启动哨兵

主服务器挂掉,从服务器选出一个当新主服务器,原来的主服务器会变成从服务器

复制延时(缺点)

由于所有写操作都是在master上操作,同步更新到slave上,所以master同步到slave机器上有一定的延迟,当系统繁忙的时候,延迟问题会严重,slave及其数量的增加也会使这个问题严重

选举规则
  • 优先级靠前

redis.conf中默认replica-priority 100 ,值越小优先级越高

  • 选择偏移量最大的

偏移量是指获得原主机数据最全的

  • 选择runid最小的从服务

每个redis实例启动后都会随机生成一个40位的runid

Redis集群

无中心化集群

Redis集群实现了对Redis的水平扩容,既启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数居的1/N

Redis集群通过分区来提供一定程度的可用性,即使集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求

部署

修改配置文件

复制代码
    include /myredis/redis.conf
    pidfile /var/run/redis_6379.pid
    port 6379
    dbfilename dump6379.rbdinclude /myredis/redis.conf
    #打开集群模式
    cluster-enabled yes
    #设置节点配置文件名
    cluster-config-file nodes-6379.conf
    #设定节点失联时间,超过时间自动进行主从切换
    cluster-node-timeout 15000

启动redis

将六个节点合成一个集群

复制代码
    cd /opt/redis-7.0.2/src
复制代码
    redis-cli --cluster create --cluster-replicas 1 192.168.126.128:6379 192.168.126.128:6380 192.168.126.128:6381 192.168.126.128:6389 192.168.126.128:6390 192.168.126.128:6391

一个集群至少要有三个主节点

–cluster-replicas 1表示我们希望为集群中的每个主节点创建一个从节点。

分配原则尽量保证每个主数据库运行在不同的ip地址,每个从库和主库不在一个ip地址上

slots

All 16384 slots covered

一个Redis集群包含16384个插槽,数据库中的每个键都属于这16384个插槽的其中一个

集群使用公式CRC16(key)%16384来计算键key属于哪个槽,其中CRC16(key)语句用于计算key的CRC16校验和

集群中的每个节点负责处理一部分插槽。举个例子,如果一个集群可以有主节点,其中节点A负责处理0号-5460号插槽

集群加入多个值用组添加,插槽计算也是通过组的名称进行计算

故障恢复

master挂掉,slaver上,原master重启后变slaver

主从都挂掉,而cluster-require-full-coverage为yes,那么整个集群都挂掉

如果某一段插槽的主从都挂掉,而cluster-require-full-coverage为no,那么,该插槽数据全都不能使用,也无法存储

redis.conf中的参数 cluster-require-full-coverage

优点

实现扩容、分摊压力、无中心配置相对简单

不足

多键操作时不被支持的

多键的Redis事务时不被支持的。lua脚本不被支持

由于集群方案出现较晚,很多公司已经采用其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

应用问题

缓存穿透

应用服务器压力变大

redis命中率降低

一直查询数据库,又查询不到,大量的访问导致数据库崩溃

解决方案

一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义

  • 对空值缓存:如果一个查询返回的数据为空(不管数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
  • 设置白名单:使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmaps里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
  • 采用布隆过滤器:布隆过滤器可以用于检索一个元素是否在一个集合中。他的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被这个bitmaps拦截掉,从未避免了对底层存储系统的查询压力。
  • 实时监控:当发现redis的命中率开始急速降低,需要排查访问对象和访问数据,设置黑名单

缓存击穿

key对应的数据存在,但是redis中过期,此时若又大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这时候大并发的请求可能瞬间压垮DB

解决方案
  • 预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
  • 实时调整:现场监控哪些数据热门,实时调整key的过期时长
  • 用锁:在缓存失效的时候不立即去访问数据库,先使用缓存工具的某些带成功操作的返回值的操作,当操作返回成功时再加载数据库,并回设缓存,最后删除mutexkey,当操作返回失败证明有线程再loaddb,当前线程睡眠一段时间再重试

缓存雪崩

缓存雪崩和缓存击穿的区别在于大量key集中过期

解决方案

构建多级缓存架构:nginx+redis+其他缓存

  • 使用锁或者队列:用锁或者队列方式保证不会有大量的线程对数据库一次性读写,避免失效时大量请求落到DB上,不适合高并发
  • 设置过期标志:记录缓存数据是否过期(设置提前量),如果过期就触发另外线程更新实际key的缓存
  • 将缓存失效时间分散开:在原有的失效时间基础上加一个随机值,这样缓存的过期时间的重复率会降低,降低集体失效的概率

分布式锁

redis实现的分布式锁

设置锁的过期时间

用setnx,或者setxx,nx只在键不存在的时候进行设置操作,xx相反

用以上命令可以上锁,用del释放锁

如果一直没有释放,可以设置key的过期时间,自动释放

防止uuid误删

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bKBmLH0w-1657675688456)(D:\TyporaDocument\img\微信截图_20220712195551.png)]

删除操作缺乏原子性

使用lua脚本保证删除原子性

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mch6LgXZ-1657675688456)(D:\TyporaDocument\img\微信截图_20220712200010.png)]

为保证分布式锁可用,需要满足四个条件:

  • 互斥性:任意时刻只有一个客户端持有锁
  • 不发生死锁:即使一个客户端持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁
  • 加锁解锁必须是同一个客户端
  • 加锁解锁有原子性

全部评论 (0)

还没有任何评论哟~