Redis

前言:依据狂神说视频整理

一、Nosql概述

1、初识Nosql

为什么会使用Nosql呢?我们现在所处于大数据时代,用户的个人信息,社交网络,地理位置,用户自己产生的数据等,数据量十分的庞大,而一般的数据库无法进行分析处理。当访问量过大后,一个服务器承受不了,当出现这种情况后,于是Nosql便应运而生了,Nosql 的全程是Not only sql 不仅仅是sql,因此,他能做的sql 更多。

2、Nosql的特点

  • 方便扩展

  • 大数据量的高性能

  • 数据类型是多样性的

传统RDBMS和Nosql

传统的RDBMS

  • 结构化的组织

  • SQL

  • 数据和关系都存在单独的表中

  • 数据定义语言

  • 严格的一致性

  • 基础的事务

Nosql

  • 不仅仅是数据
  • 没有固定的查询语言
  • 键值对存储,列存储、文档存储、图形数据库
  • 最终一致性
  • CAP定理和BASE(异地多活)
  • 高性能、高可用、高可扩

3、3V+3高

大数据时代的3V:主要是描述问题的
1、海量的Volume
2、多样的Variety
3、实时的Velocity
大数据时代的3高:主要是对程序的要求
1、高并发
2、高可拓
3、高性能

4、Nosql的四大分类

  • KV键值对 例如:redis

  • 文档型数据库 例如:MongoDB

  • 列存储数据库 例如:Hbase

  • 图关系型数据库 例如 :Neo4j

二、Redis入门

1、概述

Redis全称remote dictionary server 远程字典服务,是一个使用C语言编写、支持网络、可基于内存亦可持久化的日志型key-Value数据库,并提供了多种语言的API。Redis 是一个开源的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动分区(Cluster)提供高可用性(high availability)。

链接: redis官网 | GitHub上的开源地址

不建议windows ,windows端的github上面已经停更很久了,redis推荐linux

image-20241106153635577

2、安装

CSDN上面有很多教程,这里不再赘述

1
2
# 阿里云服务器redis安装位置
/usr/local/bin

三、基本指令

启动redis

1
2
../redis-server redis.conf  //启动服务端
redis-cli //进入客户端

1、切换数据库

redis 默认有16个数据库

可以使用select进行切换数据库

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> select 3                 # 切换数据库

OK

127.0.0.1:6379[3]> dbsize #查看数据库所占的大小

(integer) 0

127.0.0.1:6379[3]>

2、set和get

因为是key-vakue类型的数据库。所以需要使用set和get来进行操作

1
2
3
4
5
6
7
127.0.0.1:6379[3]>set name chenning      #set key value

OK

127.0.0.1:6379[3]> get name

"chenning"

3、查看所有键值

1
2
3
127.0.0.1:6379[3]>keys *

1) "name"

4、清除

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379[3]>flushdb               # 清除当前数据库的数据

OK

127.0.0.1:6379[3]> keys *

(empty array)

127.0.0.1:6379> flushall #清空所有数据库的数据

OK

补充说明:

redis是单线程的 redis是基于内存操作的,cpu不是redis的性能瓶颈 redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程 于是就用了单线程

为什么单线程还这么快?

  • 误区一 高性能的服务器一定 是多线程的

  • 误区二 多线程的一定比单线程的效率高

redis是将所有的数据放在内存中的所有说使用单线程去擦欧总就是最高的 多线程cpu会上下文切换 对于内存系统来说,没有上下文的切换效率就是最高的,在内存情况下,单线程就是效率最高的方案

四、Redis入门

1、key 相关指令

1.1 查看是否拥有key

1
2
3
4
5
6
7
127.0.0.1:6379> exists name                        #查看是否拥有 name这个键

(integer) 1 #拥有则返回1

127.0.0.1:6379> exists name1

(integer) 0 #不拥有则返回0

1.2、移动key

1
2
3
4
5
6
7
127.0.0.1:6379> move name 1                        #move key db  将key移动到对应的数据库

(integer) 1

127.0.0.1:6379> keys *

(empty array)

1.3、查看key值是否存在

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> exists name                        #查看name这个key在这里存在吗

(integer) 1 #存在

127.0.0.1:6379> exists name1

(integer) 0 #不存在


1.4、设置存活时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

127.0.0.1:6379> set name chenning

OK

127.0.0.1:6379> expire name 10 # 设置存活时间为10

(integer) 1

127.0.0.1:6379> ttl name # 查看剩余的时间

(integer) 5 #还剩5

127.0.0.1:6379> ttl name

(integer) -2 #已经过期

127.0.0.1:6379> get name

(nil) #值已经无法获取

1.5、查看value的类型

1
2
3
4
5
6
7
127.0.0.1:6379> set name chenning

OK

127.0.0.1:6379> type name #查看value的类型

string

2、String

2.1、追加

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> get name

"chenning"

127.0.0.1:6379> append name "niubi" #追加 如果没有的话就会新建

(integer) 13 #这里返回的是字符串的长度

127.0.0.1:6379> get name

"chenningniubi"

2.2、查询长度

1
2
3
127.0.0.1:6379> strlen name

(integer) 13

2.3、自增和自减

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
127.0.0.1:6379> set views 0

OK

127.0.0.1:6379> get views

"0"

127.0.0.1:6379> incr views # 增加 1

(integer) 1

127.0.0.1:6379> incr views

(integer) 2

127.0.0.1:6379> decr views # 减少 1

(integer) 1

127.0.0.1:6379> decr views

(integer) 0

增加和减少的时候是可以设置步长的

1
2
3
4
5
6
7
127.0.0.1:6379> incrby views 10        # 设置步长为10  一次增加10

(integer) 12

127.0.0.1:6379> decrby views 4 # 设置步长为4 一次减少4

(integer) 8

2.4、截取字符串

1
2
3
4
5
6
7
127.0.0.1:6379> getrange name 0 3      #截取 03 位置的字符串

"chen"

127.0.0.1:6379> getrange name 0 -1 #获取全部的额字符串

"chenningniubi"

2.5、替代字符串

1
2
3
4
5
6
7
127.0.0.1:6379> SETRANGE name 1 xx     #将1后的部分字符串替换为xx

(integer) 13

127.0.0.1:6379> get name

"cxxnningniubi" # 结果

2.6、设置有存活时间的字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
127.0.0.1:6379> SETEX yy2 30 "hello"   #setex key time string

OK

127.0.0.1:6379> ttl yy2

(integer) 23

127.0.0.1:6379> ttl yy2

(integer) 17

127.0.0.1:6379> ttl yy2

(integer) 1

127.0.0.1:6379> ttl yy2

(integer) -2 #已过期

2.7、设置不存在的key

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> SETNX mykey "db"

(integer) 1

127.0.0.1:6379> SETNX mykey "mdu" #如果key已经存在,则不会做出修改,返回0失败

(integer) 0

127.0.0.1:6379>get mykey

"db" #依旧还是原来的值

2.8、批量操作key和value

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> mset k1 v1 k2 v3 k3 v3   #使用 mset 操作,一个key后面跟随一个value值

OK

127.0.0.1:6379> mget k1 k2 k3 #批量获取key的值

1) "v1"

2) "v3"

3) "v3
1
2
3
4
127.0.0.1:6379> MSETNX k1 v1 k4 v4       #这里的 msetnx 和上面的是一样的操作,需要注意的就是,
#这里是原子性操作,要么一起成功 要么一起失败,例如这里因为设置过k1所以失败了
(integer) 0

2.9、设置对象

1
set user:1 {name:zhangsan,age:3}         #设置一个json字符串来保存对象

或者有更巧妙的设计

1
2
3
4
5
6
7
8
127.0.0.1:6379> mset user:1:name zhangsan user:1:age 2

ok

127.0.0.1:6379> mget user:1:name user:1:age

1) "zhangsan"
2) "2"

2.10、getset

1
2
3
4
5
6
7
8
127.0.0.1:6379> getset db redis    # 如果不存在,则返回 nil
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mongodb #如果存在值,获取原来的值,并设置新的值
"redis"
127.0.0.1:6379> get db
"mongodb "

3、List

所有的List命令都是L开头的

3.1、push

像队列一样增加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
127.0.0.1:6379> LPUSH list one               #在左边增加one

(integer) 1

127.0.0.1:6379> LPUSH list two

(integer) 2

127.0.0.1:6379> LPUSH list three

(integer) 3

127.0.0.1:6379> LRANGE list 0 -1 #查看list全部的信息

1) "three"

2) "two"

3) "one"

127.0.0.1:6379> LRANGE list 0 1 #查看list位置 01 的信息

1) "three"

2) "two"


从右边push

1
2
3
4
5
6
7
8
9
10
11
12
13
14

127.0.0.1:6379> RPUSH list four

(integer) 4

127.0.0.1:6379> LRANGE list 0 -1

1) "three"

2) "two"

3) "one"

4) "four" #可以看到从右边push后这个four是在最后边的

3.2、 pop

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> RPOP list                       #从右边pop一个数据出来

"four"

127.0.0.1:6379> LRANGE list 0 -1

1) "three"

2) "two"

3) "one"

3.3、根据下标获取值

1
2
3
4
5
6
7
127.0.0.1:6379> LINDEX list 1

"two"

127.0.0.1:6379> LINDEX list 0

"three"

3.4、获取长度

1
2
3
127.0.0.1:6379> LLEN list

(integer) 3

3.5、删除指定的元素

1
2
3
4
5
6
7
8
9
10
11
12

127.0.0.1:6379> lrem list 1 three #从list里面删除一个three,

(integer) 1

127.0.0.1:6379> LRANGE list 0 -1 #可以看到list里面仍然还有一个three

1) "three"

2) "two"

3) "one"

3.6、修建枝叶

1
2
3
4
5
6
7
8
9
10

127.0.0.1:6379> ltrim list 1 2 #只保留list 里面 12 的元素 list是从0开始的

OK

127.0.0.1:6379> LRANGE list 0 -1

1) "two"

2) "one" #已经是修改过的了

3.7、组合命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

127.0.0.1:6379> LRANGE list 0 -1

1) "two"

2) "one"

127.0.0.1:6379> RPOPLPUSH list otherlist # 使右边的元素弹出并push到otherlist

"one"

127.0.0.1:6379> LRANGE list 0 -1

1) "two"

127.0.0.1:6379> LRANGE otherlist 0 -1

1) "one"

3.8、Lset更新操作

1
2
3
4
5
6
7
8
9
10
11
12
13
127.0.0.1:6379> LSET list 1 item        #将list中的 1 号元素改为item

(error) ERR index out of range #不存在则会报错

127.0.0.1:6379> lset list 0 item

OK

127.0.0.1:6379> LRANGE list 0 -1

1) "item" #已经成功的变成了item

#相当于是一个更新操作,如果不存在则会报错

3.9、insert插入操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
127.0.0.1:6379> RPUSH mylist hello

(integer) 1

127.0.0.1:6379> RPUSH mylist world

(integer) 2

127.0.0.1:6379> LINSERT mylist before world niubi #往world前面插入niubi

(integer) 3

127.0.0.1:6379> LRANGE mylist 0 -1

1) "hello"

2) "niubi"

3) "world"

127.0.0.1:6379> LINSERT mylist after world gan #往world后面插入gan

(integer) 4

127.0.0.1:6379> LRANGE mylist 0 -1

1) "hello"

2) "niubi"

3) "world"

4) "gan"

4、Set

set命令都是S开头,存储的元素不可以重复

4.1、添加元素和查看元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
127.0.0.1:6379> sadd myset "hello"        # 往set里面添加元素

(integer) 1

127.0.0.1:6379> sadd myset "world"

(integer) 1

127.0.0.1:6379> SMEMBERS myset #查看所有的元素

1) "world"

2) "hello"

127.0.0.1:6379> SISMEMBER myset hello #查看该元素是否在set中

(integer) 1 #含有该元素

4.2、查询set的长度

1
2
3
127.0.0.1:6379> SCARD myset

(integer) 2 #含有两个元素

4.3、移除元素

1
2
3
4
5
6
7
8
127.0.0.1:6379> SREM myset hello          #remove指定元素

(integer) 1

127.0.0.1:6379> SMEMBERS myset

1) "world"

4.4、抽随机

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> SRANDMEMBER myset         # 从set里面随机抽取一个元素出来

"world"

127.0.0.1:6379> SRANDMEMBER myset

"william"

127.0.0.1:6379> SRANDMEMBER myset

"world"

4.5、随机移除(弹出)一个元素

1
2
3
4
5
6
7
127.0.0.1:6379> SPOP myset                #经典的pop   

"world"

127.0.0.1:6379> SPOP myset

"hello"

4.6、将一个元素转移到另外一个set

1
2
3
4
5
6
7
8
9
10
11
12
13
127.0.0.1:6379> SMOVE myset myset2 hello        # 组合命令 将hello元素从myset转移到myset2

(integer) 1

127.0.0.1:6379> SMEMBERS myset

1) "world"

2) "william"

127.0.0.1:6379> SMEMBERS myset2

1) "hello"

4.7、差,交,并集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

127.0.0.1:6379> sadd myset a

(integer) 1

127.0.0.1:6379> sadd myset b

(integer) 1

127.0.0.1:6379> sadd myset1 b

(integer) 1

127.0.0.1:6379> sadd myset1 c

(integer) 1

127.0.0.1:6379> SDIFF myset myset1 #查询 myset之于myset1的差集

1) "a"

127.0.0.1:6379> SINTER myset myset1 #查询 myset和myset1的交集

1) "b"

127.0.0.1:6379> SUNION myset myset1 #查询 myset和myset1的并集

1) "c"

2) "a"

3) "b"

127.0.0.1:6379> SDIFF myset1 myset #查询 myset1之于myset的差集

1) "c"

5、hash

map集合 key-map

5.1、增加和获取元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
127.0.0.1:6379> hset myhash field1 chenning          #在myhash中增加 field1 -- chenning 这一对键值对 

(integer) 1

127.0.0.1:6379> hget myhash field1 # 在myhash中查询fields所对应的值

"chenning"

127.0.0.1:6379> hmset myhash field1 hello field2 world #批量设置myhash 中的key--value

OK

127.0.0.1:6379> hmget myhash field1 field2 #批量获取 myhash 中的key所对应的value

1) "hello"

2) "world"

127.0.0.1:6379> hgetall myhash #获取所有的key和value

1) "field1"

2) "hello"

3) "field2"

4) "world"

5.2、删除元素

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> hdel myhash field1                    #删除 myhash 中该 key 所对应的 value

(integer) 1

127.0.0.1:6379> hgetall myhash

1) "field2"

2) "world"

5.3、查询hash的长度

1
2
3
127.0.0.1:6379> hlen myhash

(integer) 1

5.4、查询是否存在对应的key

1
2
3
4
5
6
7
127.0.0.1:6379> HEXISTS myhash field1                #经典的 exists 命令

(integer) 0

127.0.0.1:6379> HEXISTS myhash field2

(integer) 1

5.5、查询所有的key和value

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> HKEYS myhash                        # 查询所有的key值

1) "field2"

2) "field1"

127.0.0.1:6379> hvals myhash # 查询所有的value值

1) "world"

2) "chenning"

5.6、自增和自减

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> hset myhash field3 5

(integer) 1

127.0.0.1:6379> HINCRBY myhash field3 1 #incrby 命令

(integer) 6

127.0.0.1:6379> HINCRBY myhash field3 -1 #增加负值

(integer) 5

5.7、setnx

1
2
3
4
5
6
7
8

127.0.0.1:6379> HSETNX myhash field4 hello #和之前一样,不存在才可以设置,若存在则不可以使用该命令更改

(integer) 1

127.0.0.1:6379> HSETNX myhash field4 world

(integer) 0

hash适合变更的数据user、name、age 尤其是用户信息之类的经常变动的信息,hash更适合对象的存储,String更适合字符串的存储

6、Zset 有序集合

相对于set 就是增加了排序的功能,其他功能set有的,他都有

6.1、增加和查看元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

127.0.0.1:6379> ZADD myset 1 one

(integer) 1

127.0.0.1:6379> ZADD myset 2 two 3 three

(integer) 2

127.0.0.1:6379> ZRANGE myset 0 -1

1) "one"

2) "two"

3) "three"

6.2、排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
127.0.0.1:6379> ZADD salary 2500 xiaohong 

(integer) 1

127.0.0.1:6379> ZADD salary 5000 zhangsan

(integer) 1

127.0.0.1:6379> ZADD salary 500 chenning

(integer) 1

127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf #按照从小到大的顺序排序 -inf 负无穷 +inf 正无穷

1) "chenning"

2) "xiaohong"

3) "zhangsan"

排序的其他操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

127.0.0.1:6379> ZADD salary 2500 xiaohong

(integer) 1

127.0.0.1:6379> ZADD salary 5000 zhangsan

(integer) 1

127.0.0.1:6379> ZADD salary 500 chenning

(integer) 1

127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf

1) "chenning"

2) "xiaohong"

3) "zhangsan"

127.0.0.1:6379> ZRANGEBYSCORE salary 0 -1

(empty array)

127.0.0.1:6379> ZRANGEBYSCORE salary -inf 2500 withscores #显示从负无穷到2500的元素,并把数据显示出来

1) "chenning"

2) "500"

3) "xiaohong"

4) "2500"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
127.0.0.1:6379> ZREVRANGE salary 0 -1

1) "zhangsan"

2) "chenning"

127.0.0.1:6379> ZREVRANGE salary 0 -1 withscores #使用 revrange 进行倒序排序

1) "zhangsan"

2) "5000"

3) "chenning"

4) "500"

6.3、移除元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

127.0.0.1:6379> ZRANGE salary 0 -1

1) "chenning"

2) "xiaohong"

3) "zhangsan"

127.0.0.1:6379> zrem salary xiaohong # remove 这个元素

(integer) 1

127.0.0.1:6379> ZRANGE salary 0 -1

1) "chenning"

2) "zhangsan"

6.4、获取元素个数

1
2
3
127.0.0.1:6379> zcard salary

(integer) 2 获取个数

五、三大特殊数据类型

1、geospatial

1.1、 getadd

添加国家城市的经纬度信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
127.0.0.1:6379> GEOADD china:city 116.40 39.90 beijing

(integer) 1

127.0.0.1:6379> GEOADD china:city 121.47 31.23 shanghai

(integer) 1

127.0.0.1:6379> GEOADD china:city 106.50 29.53 chongqing

(integer) 1

127.0.0.1:6379> GEOADD china:city 114.05 22.52 shenzheng

(integer) 1

127.0.0.1:6379> GEOADD china:city 120.16 30.24 hangzhou

(integer) 1

127.0.0.1:6379> GEOADD china:city 108.96 34.26 xian

(integer) 1

注意事项

  • 两级无法直接添加,一般可以可以通过Java一键导入所有城市的信息
  • 有效的经度从-180度到180度
  • 有效的维度信息从-85.05112878到85.05112878度
  • 当坐标位置超出上述的指定的范围的时候,该命令回返回一个错误

1.2、getpos

获取城市的位置信息

1
2
3
4
5
6
7
8
9
10

127.0.0.1:6379> GEOPOS china:city beijing chongqing

1) 1) "116.39999896287918091"

2) "39.90000009167092543"

2) 1) "106.49999767541885376"

2) "29.52999957900659211"

1.3、geodist

获取两点之间的距离信息

1
2
3
4
5
6
7
127.0.0.1:6379> GEODIST china:city beijing shanghai km       # 这后面的km是可以换的, 比如 m 等都可以

"1067.3788"

127.0.0.1:6379> GEODIST china:city beijing chongqing km

"1464.0708"

1.4、georadius

查询指定范围内的城市信息

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km          #查询以经纬度110 30为圆心周围1000km的城市位置信息 

1) "chongqing"

2) "xian"

3) "shenzheng"

4) "hangzhou"

其他的附带参数

1
2
3
4
5
6
7
8
9
10
11
12
13
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withcoord     #跟上面的一样,但是带经纬度的参数

1) 1) "chongqing"

2) 1) "106.49999767541885376"

2) "29.52999957900659211"

2) 1) "xian"

2) 1) "108.96000176668167114"

2) "34.25999964418929977"
1
2
3
4
5
6
7
8
9
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withdist    #带距离指定位置的距离参数

1) 1) "chongqing"

2) "341.9374"

2) 1) "xian"

2) "483.8340"
1
2
3
4
5
6
7
8
9
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withdist withcoord count 1   #带只查询一个的指令参数

1) 1) "chongqing"

2) "341.9374"

3) 1) "106.49999767541885376"

2) "29.52999957900659211"

通过指定元素来查找元素

1
2
3
4
5
127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 1000 km      #距离北京1000km的所有元素

1) "beijing"

2) "xian"

1.5、将经纬度转化

1
2
3
4
5
127.0.0.1:6379> GEOHASH china:city beijing chongqing       #二维的经纬度转换为一位的字符串 字符串长得越想就越接近

1) "wx4fbxxfke0"

2) "wm5xzrybty0"

1.6、像set一样操作

因为其实本质上就是zset所有下面这些操作是都可以进行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
127.0.0.1:6379> ZRANGE china:city 0 -1

1) "chongqing"

2) "xian"

3) "shenzheng"

4) "hangzhou"

5) "shanghai"

6) "beijing"

127.0.0.1:6379> ZREM china:city beijing

(integer) 1

127.0.0.1:6379> ZRANGE china:city 0 -1

1) "chongqing"

2) "xian"

3) "shenzheng"

4) "hangzhou"

5) "shanghai"

2、hyperloglog

hyperloglog主要就是关于基数的操作,而基数就是不重复的元素

优点就是占用的内存固定,2^64不同的元素的技术,只需要12kb的内存,例如在统计网站的访问的次数的时候,一个人访问多次,就可以记为一次。虽然有0.81%的错误率,但是可以忽略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
127.0.0.1:6379> PFADD mykey a b c d e f g h i j     # 添加元素

(integer) 1

127.0.0.1:6379> PFCOUNT mykey # 计数元素的个数

(integer) 10

127.0.0.1:6379> PFADD mykey2 i j z x c v b n m

(integer) 1

127.0.0.1:6379> PFMERGE mykey3 mykey mykey2 #将mykey mykey2中的元素合并到mykey3中

OK

127.0.0.1:6379> PFCOUNT mykey3 #合并后只有不重复的元素,所有这里只有15个元素

(integer) 15

3、bitmaps

位存储
统计用户信息,是否活跃,是否登录,两个状态的,都可以使用Bitmaps,Bitmap 位图是数据结构,都是操作二进制来进行记录,只有0和1两个状态,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
127.0.0.1:6379> setbit sign 0 1                   #为每一位设置状态

(integer) 0

127.0.0.1:6379> setbit sign 1 0

(integer) 0

127.0.0.1:6379> setbit sign 2 0

(integer) 0

127.0.0.1:6379> setbit sign 3 1

(integer) 0

127.0.0.1:6379> setbit sign 4 1

(integer) 0

127.0.0.1:6379> setbit sign 5 0

(integer) 0

127.0.0.1:6379> setbit sign 6 0

(integer) 0

127.0.0.1:6379> getbit sign 3 #获取状态

(integer) 1

127.0.0.1:6379> getbit sign 5

(integer) 0

获取sign为1的数量

1
2
3
4
127.0.0.1:6379> BITCOUNT sign

(integer) 3

六、事务

  • Redis事务的本质就是一组命令的组合,一个事务中所有的命令都会被序列化,在事务执行的过程中,会按照顺序执行,一次性,顺序性,排他性的执行一些列的命令
  • Redis的事务没有隔离级别的概念,所有的命令在事务中,只有发起执行命令exec 才会执行
  • Redis单条命令式保留原子性,但是事务并不保留原子性,

redis的事务

  • 开启事务(multi)
  • 命令入队
  • 执行事务(exec)

1、正常的执行事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
127.0.0.1:6379> multi               #开启事务命令

OK

127.0.0.1:6379(TX)> set k1 v1

QUEUED

127.0.0.1:6379(TX)> set k2 v2

QUEUED

127.0.0.1:6379(TX)> get k2

QUEUED

127.0.0.1:6379(TX)> set k3 v3

QUEUED

127.0.0.1:6379(TX)> exec #执行事务命令

1) OK

2) OK

3) "v2"

4) OK

2、放弃事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

127.0.0.1:6379> MULTI

OK

127.0.0.1:6379(TX)> set k1 v1

QUEUED

127.0.0.1:6379(TX)> set k4 v4

QUEUED

127.0.0.1:6379(TX)> DISCARD #放弃事务命令

OK

127.0.0.1:6379> get k4

(nil)

3、执行事务时的异常

事务的异常分为两种,编译时的异常和执行时的异常,下面主要写一下两种异常的不同点

3.1、编译时异常

编译时的异常即代表你的语法可能就有问题,一旦一句有问题,整个事务都不能正常运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
127.0.0.1:6379> multi

OK

127.0.0.1:6379(TX)> set k1 v1

QUEUED

127.0.0.1:6379(TX)> set k2 v2

QUEUED

127.0.0.1:6379(TX)> getset k3

(error) ERR wrong number of arguments for 'getset' command #这里就开始提示有err

127.0.0.1:6379(TX)> set k4 v4

QUEUED

127.0.0.1:6379(TX)> exec

(error) EXECABORT Transaction discarded because of previous errors. # 导致整个事务都不能正常运行

127.0.0.1:6379> get k5

(nil)

3.2、运行时错误

运行时错误表示语法没有啥问题,是其他地方除了问题,仅仅只影响那一句话,其他的依旧可以正常的执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
127.0.0.1:6379> MULTI

OK

127.0.0.1:6379(TX)> incr k1 #这里的k1对应的value是一个字符串,是不能增加 1

QUEUED

127.0.0.1:6379(TX)> set k2 v2

QUEUED

127.0.0.1:6379(TX)> set k3 v3

QUEUED

127.0.0.1:6379(TX)> get k3

QUEUED #前面的语句语法都没有任何的问题

127.0.0.1:6379(TX)> exec

1) (error) ERR value is not an integer or out of range #事务的第一句不能正常执行

2) OK

3) OK

4) "v3"

127.0.0.1:6379> get k2

"v2"

127.0.0.1:6379> get k3

"v3" #事务的其他语句是可以正常运行的

七、Redis实现乐观锁

悲观锁

认为什么时候都会出错,无论做什么都会枷锁,效率低下

乐观锁

-认为什么时候都不会出问题,所以不会上锁,更新数据的时候去判断一下,在此期间是否有人去修改过这个数据

  • 获取version
  • 更新的时候比较version

单线程的时候

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
127.0.0.1:6379> set money 100

OK

127.0.0.1:6379> set out 0

OK

127.0.0.1:6379> watch money # 加锁

OK

127.0.0.1:6379> multi

OK

127.0.0.1:6379(TX)> DECRBY money 20

QUEUED

127.0.0.1:6379(TX)> INCRBY out 20

QUEUED

127.0.0.1:6379(TX)> exec

1) (integer) 80

2) (integer) 20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
127.0.0.1:6379> watch money

OK

127.0.0.1:6379> multi

OK

127.0.0.1:6379(TX)> DECRBY money 10

QUEUED

127.0.0.1:6379(TX)> INCRBY out 10

QUEUED

127.0.0.1:6379(TX)> exec # 在执行这个事务前,有其他的线程改变了这个值,则会导致执行失败

(nil)

若执行失败,则正确操作为:
解锁

八、Jedis

就是使用Java操作redis

1、简单的使用一下Jedis

首先导入包

image-20220414100910101

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.2.2</version>
</dependency>

image-20220414100943921

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
</dependency>

简单的使用

1
2
3
4
public void static main(String[] args){}
jedis jedis = new Jedis("127.0.0.1");//使用对象来操作
Sysrtem.out.printn(jedis.ping);//测试链接
}

2、常用的api

String

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import redis.clients.jedis.Jedis;

import java.util.concurrent.TimeUnit;

public class TestString {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);

jedis.flushDB();
System.out.println("===========增加数据===========");
System.out.println(jedis.set("key1","value1"));
System.out.println(jedis.set("key2","value2"));
System.out.println(jedis.set("key3", "value3"));
System.out.println("删除键key2:"+jedis.del("key2"));
System.out.println("获取键key2:"+jedis.get("key2"));
System.out.println("修改key1:"+jedis.set("key1", "value1Changed"));
System.out.println("获取key1的值:"+jedis.get("key1"));
System.out.println("在key3后面加入值:"+jedis.append("key3", "End"));
System.out.println("key3的值:"+jedis.get("key3"));
System.out.println("增加多个键值对:"+jedis.mset("key01","value01","key02","value02","key03","value03"));
System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03"));
System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03","key04"));
System.out.println("删除多个键值对:"+jedis.del("key01","key02"));
System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03"));

jedis.flushDB();
System.out.println("===========新增键值对防止覆盖原先值==============");
System.out.println(jedis.setnx("key1", "value1"));
System.out.println(jedis.setnx("key2", "value2"));
System.out.println(jedis.setnx("key2", "value2-new"));
System.out.println(jedis.get("key1"));
System.out.println(jedis.get("key2"));

System.out.println("===========新增键值对并设置有效时间=============");
System.out.println(jedis.setex("key3", 2, "value3"));
System.out.println(jedis.get("key3"));
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(jedis.get("key3"));

System.out.println("===========获取原值,更新为新值==========");
System.out.println(jedis.getSet("key2", "key2GetSet"));
System.out.println(jedis.get("key2"));

System.out.println("获得key2的值的字串:"+jedis.getrange("key2", 2, 4));
}
}

List

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import redis.clients.jedis.Jedis;

public class TestList {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
System.out.println("===========添加一个list===========");
jedis.lpush("collections", "ArrayList", "Vector", "Stack", "HashMap", "WeakHashMap", "LinkedHashMap");
jedis.lpush("collections", "HashSet");
jedis.lpush("collections", "TreeSet");
jedis.lpush("collections", "TreeMap");
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));//-1代表倒数第一个元素,-2代表倒数第二个元素,end为-1表示查询全部
System.out.println("collections区间0-3的元素:"+jedis.lrange("collections",0,3));
System.out.println("===============================");
// 删除列表指定的值 ,第二个参数为删除的个数(有重复时),后add进去的值先被删,类似于出栈
System.out.println("删除指定元素个数:"+jedis.lrem("collections", 2, "HashMap"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("删除下表0-3区间之外的元素:"+jedis.ltrim("collections", 0, 3));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections列表出栈(左端):"+jedis.lpop("collections"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections添加元素,从列表右端,与lpush相对应:"+jedis.rpush("collections", "EnumMap"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections列表出栈(右端):"+jedis.rpop("collections"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("修改collections指定下标1的内容:"+jedis.lset("collections", 1, "LinkedArrayList"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("===============================");
System.out.println("collections的长度:"+jedis.llen("collections"));
System.out.println("获取collections下标为2的元素:"+jedis.lindex("collections", 2));
System.out.println("===============================");
jedis.lpush("sortedList", "3","6","2","0","7","4");
System.out.println("sortedList排序前:"+jedis.lrange("sortedList", 0, -1));
System.out.println(jedis.sort("sortedList"));
System.out.println("sortedList排序后:"+jedis.lrange("sortedList", 0, -1));
}
}

Set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import redis.clients.jedis.Jedis;

public class TestSet {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
System.out.println("============向集合中添加元素(不重复)============");
System.out.println(jedis.sadd("eleSet", "e1","e2","e4","e3","e0","e8","e7","e5"));
System.out.println(jedis.sadd("eleSet", "e6"));
System.out.println(jedis.sadd("eleSet", "e6"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("删除一个元素e0:"+jedis.srem("eleSet", "e0"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("删除两个元素e7和e6:"+jedis.srem("eleSet", "e7","e6"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet"));
System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("eleSet中包含元素的个数:"+jedis.scard("eleSet"));
System.out.println("e3是否在eleSet中:"+jedis.sismember("eleSet", "e3"));
System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e1"));
System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e5"));
System.out.println("=================================");
System.out.println(jedis.sadd("eleSet1", "e1","e2","e4","e3","e0","e8","e7","e5"));
System.out.println(jedis.sadd("eleSet2", "e1","e2","e4","e3","e0","e8"));
System.out.println("将eleSet1中删除e1并存入eleSet3中:"+jedis.smove("eleSet1", "eleSet3", "e1"));//移到集合元素
System.out.println("将eleSet1中删除e2并存入eleSet3中:"+jedis.smove("eleSet1", "eleSet3", "e2"));
System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));
System.out.println("eleSet3中的元素:"+jedis.smembers("eleSet3"));
System.out.println("============集合运算=================");
System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));
System.out.println("eleSet2中的元素:"+jedis.smembers("eleSet2"));
System.out.println("eleSet1和eleSet2的交集:"+jedis.sinter("eleSet1","eleSet2"));
System.out.println("eleSet1和eleSet2的并集:"+jedis.sunion("eleSet1","eleSet2"));
System.out.println("eleSet1和eleSet2的差集:"+jedis.sdiff("eleSet1","eleSet2"));//eleSet1中有,eleSet2中没有
jedis.sinterstore("eleSet4","eleSet1","eleSet2");//求交集并将交集保存到dstkey的集合
System.out.println("eleSet4中的元素:"+jedis.smembers("eleSet4"));
}
}

Hash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

public class TestHash {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
Map<String,String> map = new HashMap<String,String>();
map.put("key1","value1");
map.put("key2","value2");
map.put("key3","value3");
map.put("key4","value4");
//添加名称为hash(key)的hash元素
jedis.hmset("hash",map);
//向名称为hash的hash中添加key为key5,value为value5元素
jedis.hset("hash", "key5", "value5");
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));//return Map<String,String>
System.out.println("散列hash的所有键为:"+jedis.hkeys("hash"));//return Set<String>
System.out.println("散列hash的所有值为:"+jedis.hvals("hash"));//return List<String>
System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:"+jedis.hincrBy("hash", "key6", 6));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:"+jedis.hincrBy("hash", "key6", 3));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("删除一个或者多个键值对:"+jedis.hdel("hash", "key2"));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("散列hash中键值对的个数:"+jedis.hlen("hash"));
System.out.println("判断hash中是否存在key2:"+jedis.hexists("hash","key2"));
System.out.println("判断hash中是否存在key3:"+jedis.hexists("hash","key3"));
System.out.println("获取hash中的值:"+jedis.hmget("hash","key3"));
System.out.println("获取hash中的值:"+jedis.hmget("hash","key3","key4"));
}
}

Key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import redis.clients.jedis.Jedis;

import java.util.Set;

public class TestKey {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);

System.out.println("清空数据:"+jedis.flushDB());
System.out.println("判断某个键是否存在:"+jedis.exists("username"));
System.out.println("新增<'username','kuangshen'>的键值对:"+jedis.set("username", "kuangshen"));
System.out.println("新增<'password','password'>的键值对:"+jedis.set("password", "password"));
System.out.print("系统中所有的键如下:");
Set<String> keys = jedis.keys("*");
System.out.println(keys);
System.out.println("删除键password:"+jedis.del("password"));
System.out.println("判断键password是否存在:"+jedis.exists("password"));
System.out.println("查看键username所存储的值的类型:"+jedis.type("username"));
System.out.println("随机返回key空间的一个:"+jedis.randomKey());
System.out.println("重命名key:"+jedis.rename("username","name"));
System.out.println("取出改后的name:"+jedis.get("name"));
System.out.println("按索引查询:"+jedis.select(0));
System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB());
System.out.println("返回当前数据库中key的数目:"+jedis.dbSize());
System.out.println("删除所有数据库中的所有key:"+jedis.flushAll());
}
}

Password

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import redis.clients.jedis.Jedis;

public class TestPassword {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);

//验证密码,如果没有设置密码这段代码省略
// jedis.auth("password");

jedis.connect(); //连接
jedis.disconnect(); //断开连接

jedis.flushAll(); //清空所有的key
}
}

3、事务

让我们看看在boot里面如何使用事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class TestMulti {
public static void main(String[] args) {
//创建客户端连接服务端,redis服务端需要被开启
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();

JSONObject jsonObject = new JSONObject();
jsonObject.put("hello", "world");
jsonObject.put("name", "java");
//开启事务
Transaction multi = jedis.multi();
String result = jsonObject.toJSONString();
try{
//向redis存入一条数据
multi.set("json", result);
//再存入一条数据
multi.set("json2", result);
//这里引发了异常,用0作为被除数
int i = 100/0;
//如果没有引发异常,执行进入队列的命令
multi.exec();
}catch(Exception e){
e.printStackTrace();
//如果出现异常,回滚
multi.discard();
}finally{
System.out.println(jedis.get("json"));
System.out.println(jedis.get("json2"));
//最终关闭客户端
jedis.close();
}
}
}

4、Springboot 整合

在spring boot2.x之后,原来使用的jedis被替换成了lettuce

  • jedis所采用的直连,多个线程操作的时候不安全,如果想要追求安全 ,可以使用jedis pool连接池

  • lettuce 采用netty 实例可以在多个线程中进行共享,不存在线程不安全的情况,更像NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
@ConditionalOnMissingBean(
name = {"redisTemplate"}
)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
//两个泛型都是object 类,我们后使用需要强制转换
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}

@Bean
@ConditionalOnMissingBean//由于String时redis最常使用的类型,所以单独提出来了一个方法
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}

1、整合步骤

1.导入依赖 --上面的jedis的包

2.配置链接

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
redis:
host: 127.0.0.1
port: 6379
password:
jedis:
pool:
max-active: 8
max-wait: -1ms
max-idle: 500
min-idle: 0
lettuce:
shutdown-timeout: 0ms

3.测试

2、编写自己的 redisTemplate

这里需要注意一下要直接传输对象的话需要将对象序列化,比如下面这样讲pojo的类实现序列化的接口

1
2
3
4
public class user implements Serializer{
int age;
String name;
}

自己的template

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();

template.setConnectionFactory(factory);

//序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);

//String序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();

return template;
}


}

使用工具类封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtil {

@Autowired
@Qualifier("redisTemplate")//指定那个redisTemplate,可以不写
private RedisTemplate<String, Object> redisTemplate;

// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}


/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}


// ============================String=============================

/**
* 普通缓存获取
* @param key 键
* @return
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}

/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/

public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/

public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}


/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}


// ================================Map=================================

/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}

/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}

/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}


/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}


/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}


/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}


// ============================set=============================

/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/

public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}

// ===============================list=================================

/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}

}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}

}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/

public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/

public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}

}

}

测试

1
2
3
4
5
6
7
8
@Autowird
private RedisUtil redisUtil; //装配工具类

@Test
public void test1(){
redisUtil.set("name","chenning");//直接使用就行
System.out.println(redisUtil.get("name"));
}

重要的是去理解redis的思想和使用情景

九、redis.conf详解

文件的位置

默认路径 /usr/local/bin/Nconfig

image-20220414174009767

配置文件对于大小写不敏感

1、包含 INCLUDES

1
2
3
include /path/to/local.conf

include /path/to/other.conf

好比是spring里面的import

2、网络 NETWORK

1
2
3
4
5
6
7
8
9
bind 127.0.0.1 -::1


# 是否开启保护模式
protected-mode yes


# 端口设置
port 6379

3、通用配置 GENERAL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 以守护进程的方式进行
daemonize yes
# 默认是关闭的,我们这里选择开启



# 如果以后台的形式运行 需要指定一个pid文件
pidfile /var/run/redis_6379.pid



# 日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)

loglevel notice

#日志文件位置名
logfile ""

#默认库的数量是16个
databases 16

4、快照 SNAPSHOT

所谓的快照就是持久化

在持久化的过程中,redis中的数据会持久化到以 .rdb 或者 .aof 

因为redis是内存数据库,因此数据断电即失,所以需要隔一段时间就持久化一下

1
2
3
4
5
save 3600 1       # 如果3600内有1次修改数据就持久化一次

save 300 100 # 如果300内有100次修改数据就持久化一次

save 60 10000 # 如果60内有10000次修改数据就持久化一次
1
2
3
4
5
6
7
stop-writes-on-bgsave-error yes # 持久化操作错误后是否操作

rdbcompression yes # 是否压缩持久化数据

rdbchecksum yes # 持久化后是否检查

dir ./ # .rdb文件保存的目录

5、安全

限制client

1
2
3
4
5
6
7
8
# 最大的连接数
# maxclients 10000

# 控制节点能使用的最大内存
# maxmemory <bytes>

# 内存满之后的处理策略
# maxmemory-policy noeviction

有六种淘汰策略

  1. noeviction(默认策略):对于写请求不再提供服务,直接返回错误(DEL请求和部分特殊请求除外)

  2. allkeys-lru:从所有key中使用LRU算法进行淘汰

  3. volatile-lru:从设置了过期时间的key中使用LRU算法进行淘汰

  4. allkeys-random:从所有key中随机淘汰数据

  5. volatile-random:从设置了过期时间的key中随机淘汰

  6. volatile-ttl:在设置了过期时间的key中,淘汰过期时间剩余最短的

6、append only 模式 aof配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# 默认使用rdb 因为完全够用 不开启aof  
appendonly no

# aof文件的名字
appendfilename "appendonly.aof"


#下面三个只选择一个开
# appendfsync always # 每次修改都会同步,但过于消耗性能

appendfsync everysec # 每秒执行一次同步,但会丢失这1s的数据

# appendfsync no # 不执行同步,操作系统自己同步数据,速度最快

十、持久化

持久化是 重点

为什么需要持久化

redis是一个内存数据库,如果不将数据库里面的数据保存到磁盘,服务一旦停止,那么数据就会全部丢失!因此redis需要通过持久化将数据都保存下来,redis的持久化有两种方式,RDB和AOF

1、RDB

RDB的全程就是redis database,它通过再指定的时间间隔内将内存种的数据集快照写入磁盘,也就是snapshot快照,他恢复时是将快照文件直接读取到内存里面(感觉像是我以前写的租车管理系统😐),Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件种,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程时是不进行任何IO操作的。这就确保了极高的性能。如果要进行大规模数据的恢复,且对于数据恢复的完整性不是特别的敏感,那么RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化的数据可能会丢失,Redis默认的就是RDB,一般需要修改。

1.1 触发机制

RDB的触发机制有下面三种

  1. save规则满足(save的配置上一节有)

  2. 执行flushAll 命令

  3. 退出redis

1.2 如何恢复

这个比较容易,只需要将rdb文件放在我们的启动目录下就行,redis启动的时候会自动检查dump.rdb恢复其中的数据

1
2
3
127.0.0.1:6379> config get dir  #输入这个指令就可以得到启动目录,将rdb放里面就行
1) "dir"
2) "/usr/local/bin"

1.3 优缺点

优点

  1. 适合大规模的数据恢复

  2. 对数据的完整性要不高

缺点

  1. 需要一定的时间间隔进程操作,如果redis意外宕机,最后一次的数据就无了

  2. fork进程的时候,会占用一定的内容空间

2、AOF

AOF全称appendonly file,只追加保存,默认是不保存的,需要手动配置然后 重启redis就行了

1
appendonly no # 把这里改成yes就行

2.1 AOF文件受损恢复

redis提供了一个修复aof的工具 “redis-check-aof --fix”

1
redis-check-aof --fix appendonly.aof     # 输入这个指令进行AOf修复

修复截图

修复之后可能会损失一些数据,但总比全部丢了好

2.2 AOF的rewrite

aof默认就是文件无限制的追加,因此如果一个文件太大之后,AOF会fork一个新的进程进行重写

1
2
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

2.2 优点以及缺点

1
2
3
4
5
6
appendoly no   # 默认不开启
appendfilename "appendoly.aof" #aof文件的名字

appendfsync always # 每次修改都执行同步操作,消耗性能
appendfsync everysec # 每秒执行一次,可能会丢失那一秒的数据
appendfsync no # 不执行同步,操作系统进行同步,速度最快

优点

  1. 每一次修改都同步,文件完整性会更加的好
  2. 每秒同步一次,可能会丢失一秒的数据
  3. 从来不同步,速度最快

缺点

  1. 相对于数据文件来说,aof远远大于rdb,修复的速度也比rdb慢

  2. AOF运行效率也要比rdb慢,所以redis默认的配置就是rdb持久化

3、总结以及拓展

  1. RDB持久化方式能够再指定的时间间隔内对你的数据进行快照存储
  2. AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来会恢复原始的数据,AOF命令以Redis协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的额体积不至于太大
  3. 只做缓存,如果你只希望你的数据再服务器运行的时候存在,你也可以不使用持久化
  4. 同时开启两种持久化模式
    • 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常的情况下AOF文件保存的数据集要比RDB文件保存的数据集更加的完整
    • RDB的数据不实时,同时使用两种服务器重启也只会找AOF文件,那要不要只使用AOF呢🙌?答案是建议不要,因为RDB更加的适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的BUG,作为一个留着以防万一的手段
  5. 性能建议
    • 因为RDB文件只用作后备用途,建议旨在Slave(附机)持久化RDB文件,而且只要15分钟备份一次就行了,只需要保留save 900 1 这条规则
    • 如果Enable AOF,好处是在最恶劣情况下也只会失去不超过两秒的数据,启动脚本只load自己的文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产生的新数据写到新文件造成的阻塞时不可避免的,只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值太小了,可以设置到5G以上,默认超过原大小100%大小重写可以改到适当的数值
    • 如果不Enable AOF,仅靠 Master-Slave实现高可用性也可以,能够省掉一大笔的IO,也减少了rewrite时带来的系统波动,代价时如果Master/Slave同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个Master/Slave中的RDB文件,载入较新的那个,微博就是这种架构

十一、发布订阅

redis中提供了类似消息队列的功能–发布订阅,但是更加专业的功能仍然需要消息队列来做,比如Kafaka等

1. 简单的来实现一下

订阅一个频道

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> SUBSCRIBE chenning

Reading messages... (press Ctrl-C to quit)

1) "subscribe"

2) "chenning"

3) (integer) 1

发送端

1
2
3
4
5
127.0.0.1:6379> PUBLISH chenning "hell chenning"  # 发送一则消息

(integer) 1

127.0.0.1:6379>

再次回到刚刚订阅的那个窗口,可以看到以下信息

1
2
3
4
5
1) "message"

2) "chenning"

3) "hell chenning"

上述就是一个简单的发布订阅

2. 原理

Redis 是使用c实现的,通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加加深对 Redis 的理解。Redis 通过 PUBLISH 、 SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。

通过 SUBSCRIBE 命令订阅某频道后, redis - server 里维护了一个字典,字典的键就是一个个 channel 而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。 SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中。

通过PUBLSH 命令向订订阅者发送消息, redis-server 会使用给定的频道作为键,在它所维护的 channel 字典中查找记录了订阅这个频道的所有客户的链表,遍历这个链表,将消息发布给所有订阅者。

Pub / Sub 从字面上理解就是发布( Publish )与订阅( Subscribe ),在 Redis 中,你可以设定对某一个 key 值进行消息发布及消息订阅,当一个 key 值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。

十二、主从复制

1. 概念

主从复制,是指将一台 Redis 服务器的数据,复制到其他的 Redis 服务器。前者称为主节点( master / leader ),后者称为从节点( slave / folower );数据的复制是单向的,只能由主节点到从节点。 Master 以写为主, Slave 以读为主。

默认情况下,每台 Redis 服务器都是王节点;且一个王节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

主从复制的作用主要包括:

1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写 Redis 数据时应用连接主节点,读 Redis 数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高 Redis 服务器的并发量。
4、高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是 Redis 高可用的基础。

一般来说,要将 Redis 运用于工程项目中,只使用一台 Redis 是万万不能的,原因如下:

1、从结构上,单个 Redis 服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;
2、从容量上,单个 Redis 服务器内存容量有限,就算一台 Redis 服务器内存容量为256G,也不能将所有内存用作 Redis 存储内存,
一般来说,单台 Redis 最大使用内存不应该超过20G。

2. 环境配置

只配置从库 不配置主库,因为默认自己为主库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
127.0.0.1:6379> info replication

# Replication

role:master # 可以看到这里默认自己是主机

connected_slaves:0

master_failover_state:no-failover

master_replid:4fbf9a6eb1fa637393710a004aa2839976897b2c

master_replid2:0000000000000000000000000000000000000000

master_repl_offset:0

second_repl_offset:-1

repl_backlog_active:0

repl_backlog_size:1048576

repl_backlog_first_byte_offset:0

repl_backlog_histlen:0

接下来我们使用三个端口启动三个redis服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
[root@Mercury Nconfig]# cp redis.conf redis79.conf 

[root@Mercury Nconfig]# cp redis.conf redis80.conf

[root@Mercury Nconfig]# cp redis.conf redis81.conf # 复制了三份配置文件

[root@Mercury Nconfig]# ll

total 368

-rw-r--r-- 1 root root 93725 Apr 15 18:56 redis79.conf

-rw-r--r-- 1 root root 93725 Apr 15 18:56 redis80.conf

-rw-r--r-- 1 root root 93725 Apr 15 18:56 redis81.conf

-rw-r--r-- 1 root root 93725 Apr 14 17:22 redis.conf

# 为保证不重名,配置文件里需要修改 端口 pid名字 log文件名字 dump.rdb名字

[root@Mercury bin]# redis-server Nconfig/redis79.conf
[root@Mercury bin]# redis-server Nconfig/redis80.conf
[root@Mercury bin]# redis-server Nconfig/redis81.conf #分别指定配置文件进行启动


[root@Mercury bin]# ps -ef|grep redis

root 6093 1 0 19:05 ? 00:00:00 redis-server 127.0.0.1:6380

root 6108 1 0 19:05 ? 00:00:00 redis-server 127.0.0.1:6381

root 6277 19173 0 19:06 pts/1 00:00:00 grep --color=auto redis

root 27191 1 0 Apr14 ? 00:02:28 redis-server 127.0.0.1:6379 # 可以看到已经分别启动了

3. 一主二从

所谓一主二从就是一台主机两台从机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379          # 通过这个命令将6380设为6379的从机

OK

127.0.0.1:6380> info replication

# Replication

role:slave # 可以看到已经变为从机

master_host:127.0.0.1

master_port:6379

master_link_status:up

master_last_io_seconds_ago:5

master_sync_in_progress:0

slave_read_repl_offset:14

slave_repl_offset:14

slave_priority:100

slave_read_only:1

replica_announced:1

connected_slaves:0

master_failover_state:no-failover

master_replid:1f474dfc54aedd694b971cefafd50927b55f97a4

master_replid2:0000000000000000000000000000000000000000

master_repl_offset:14

second_repl_offset:-1

repl_backlog_active:1

repl_backlog_size:1048576

repl_backlog_first_byte_offset:1

repl_backlog_histlen:14

再次查看6379

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
[root@Mercury bin]# redis-cli -p 6379

127.0.0.1:6379> info replication

# Replication

role:master

connected_slaves:1 # 显示已连接从机数量

slave0:ip=127.0.0.1,port=6380,state=online,offset=84,lag=1 # 显示从机状态

master_failover_state:no-failover

master_replid:1f474dfc54aedd694b971cefafd50927b55f97a4

master_replid2:0000000000000000000000000000000000000000

master_repl_offset:84

second_repl_offset:-1

repl_backlog_active:1

repl_backlog_size:1048576

repl_backlog_first_byte_offset:1

repl_backlog_histlen:84

以同样的步骤连接上6381

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
127.0.0.1:6379> info replication

# Replication

role:master

connected_slaves:2 # 两个从机

slave0:ip=127.0.0.1,port=6380,state=online,offset=280,lag=0

slave1:ip=127.0.0.1,port=6381,state=online,offset=280,lag=1

master_failover_state:no-failover

master_replid:1f474dfc54aedd694b971cefafd50927b55f97a4

master_replid2:0000000000000000000000000000000000000000

master_repl_offset:280

second_repl_offset:-1

repl_backlog_active:1

repl_backlog_size:1048576

repl_backlog_first_byte_offset:1

repl_backlog_histlen:280


因为这里我们是通过指令操作的,所以关闭服务后就不再是从机了,而是会变回主机,如果想一直是从机,我们可以在配置文件里面修改

配置文件里面

1
replicaof   <masterip>   <masterport>

3.1 细节了解

主机可以设置值,从机不能写只能读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
127.0.0.1:6379> set k1 "v1"

OK

127.0.0.1:6379> get k1

"v1"

# 主机设置 key-value



127.0.0.1:6380> get k1

"v1"

# 从机可以拿到


127.0.0.1:6380> set k2 "v2"

(error) READONLY You can't write against a read only replica.

# 从机不可以写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 主机断开

127.0.0.1:6379> SHUTDOWN

not connected>



127.0.0.1:6380> info replication

# Replication

role:slave # 依然是从机

master_host:127.0.0.1

master_port:6379

master_link_status:down

master_last_io_seconds_ago:-1

master_sync_in_progress:0

slave_read_repl_offset:962

slave_repl_offset:962

master_link_down_since_seconds:33

slave_priority:100

slave_read_only:1

replica_announced:1

connected_slaves:0

master_failover_state:no-failover

master_replid:1f474dfc54aedd694b971cefafd50927b55f97a4

master_replid2:0000000000000000000000000000000000000000

master_repl_offset:962

second_repl_offset:-1

repl_backlog_active:1

repl_backlog_size:1048576

repl_backlog_first_byte_offset:1

repl_backlog_histlen:962
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 主机重新启动

[root@Mercury bin]# redis-server Nconfig/redis79.conf

[root@Mercury bin]# redis-cli -p 6379

127.0.0.1:6379> set k2 "v2"

OK

#再次设置值

127.0.0.1:6380> get k2

"v2"

# 依然可以拿到

主机断开连接,从机依然连接到主机的,但是没有写操作,这个时候,主机如果回来了,从机依旧可以直接获取到主机写的信息,但如果是使用命令行来配置的从机,重启后就会变回主机,不再和原来的主机同步,无法获取之后写入的信息,但只要再变回从机,立刻就可以从主机中获取值。

4. 复制原理

Slave 启动成功连接到 master 后会发送一个 sync 命令

Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后, master 将传送整个数据文件到 slave ,并完成一次完全同步。

全量复制:而 slave 服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制: Master 继续将新的所有收集到的修改命令依次传给 slave ,完成同步但是只要是重新连接 master ,一次完全同步(全量复制)将被自动执行🚗

5. 层层链路

使用下面的方式将三个 redis 服务连接起来

1
2
3
79-----------80---------81 

M M S S

此时的80端口依旧是从节点,输入命令可以看到依然显示为Salve

一般层层链路不常用

6. 谋朝篡位 🤪

如果主机断开了连接,我们可以使用SLAVEOF no one命令使自己变成主机,其他主机可以手动连接到这个主节点

十三、哨兵模式

1. 概念

简单的来说,哨兵模式就是自动选举主机的模式。一般情况下,当主服务器宕机后,需要手动把一台服务器切换为主服务器,但是手动对于程序员来讲还是太麻烦了🙌,所以Redis从2.8正式提供了Sentinel哨兵模式来解决这个问题

哨兵模式能够在后台自动监测主机是否故障,如果故障了将根据投票数自动将从机转换为主机,哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程。原理就是哨兵通过发送命令,等待服务器响应,从而监控多个redis实例

哨兵示意图

过程:假设主服务器宕机(比如断电啥的),哨兵会检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象称为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

2. 实际测试一波

简单编辑配置文件

1
2
3
4
5
6
[root@Mercury Nconfig]# vim sentinel.conf

sentinel monitor myredis 127.0.0.1 6379 1

# 后面这个数字1代表有多少个哨兵认为这个主机挂了就执行换主机的操作

启动哨兵服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 启动命令
[root@Mercury bin]# redis-sentinel Nconfig/sentinel.conf
32048:X 17 Apr 2022 18:15:13.108 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
32048:X 17 Apr 2022 18:15:13.108 # Redis version=6.2.6, bits=64, commit=00000000, modified=0, pid=32048, just started
32048:X 17 Apr 2022 18:15:13.108 # Configuration loaded
32048:X 17 Apr 2022 18:15:13.109 * monotonic clock: POSIX clock_gettime
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 6.2.6 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in sentinel mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 26379
| `-._ `._ / _.-' | PID: 32048
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | https://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'

32048:X 17 Apr 2022 18:15:13.110 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
32048:X 17 Apr 2022 18:15:13.114 # Sentinel ID is b7a639b54bcccf0219a5c5390726703b88a2d628
32048:X 17 Apr 2022 18:15:13.114 # +monitor master myredis 127.0.0.1 6379 quorum 1
32048:X 17 Apr 2022 18:15:13.115 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379

然后我们关闭6379这个主机

1
2
127.0.0.1:6379> SHUTDOWN
not connected>

等待一段时间后可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
32048:X 17 Apr 2022 18:21:49.348 # +sdown master myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.348 # +odown master myredis 127.0.0.1 6379 #quorum 1/1 #满足了条件

32048:X 17 Apr 2022 18:21:49.348 # +new-epoch 1

32048:X 17 Apr 2022 18:21:49.348 # +try-failover master myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.351 # +vote-for-leader b7a639b54bcccf0219a5c5390726703b88a2d628 1

32048:X 17 Apr 2022 18:21:49.351 # +elected-leader master myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.351 # +failover-state-select-slave master myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.442 # +selected-slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.442 * +failover-state-send-slaveof-noone slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.508 * +failover-state-wait-promotion slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.716 # +promoted-slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.716 # +failover-state-reconf-slaves master myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.789 # +failover-end master myredis 127.0.0.1 6379

32048:X 17 Apr 2022 18:21:49.789 # +switch-master myredis 127.0.0.1 6379 127.0.0.1 6380 # 6380成为了主机

32048:X 17 Apr 2022 18:21:49.790 * +slave slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6380

32048:X 17 Apr 2022 18:21:50.420 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6380

32048:X 17 Apr 2022 18:22:19.834 # +sdown slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6380

我们再来看看6380的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
127.0.0.1:6380> info replication

# Replication

role:master # 很明显这里已经成为了主机

connected_slaves:1

slave0:ip=127.0.0.1,port=6381,state=online,offset=265004,lag=1

master_failover_state:no-failover

master_replid:4e0b279d0bae3d59691444c3d8a264de3859340b

master_replid2:9ea51c7e5cefa6a5b8e281bb4fa433fb34f9292d

master_repl_offset:265136

second_repl_offset:258784

repl_backlog_active:1

repl_backlog_size:1048576

repl_backlog_first_byte_offset:1

repl_backlog_histlen:265136

以上就是哨兵模式的实际测试,如果主机此时回来了,只能归到新的主机下

3. 优缺点

优点

  1. 哨兵集群,基于主从复制模式,所有的主从配置的优点,它全都有
  2. 主从是可以切换的,故障可以转移,系统的可用性会更好
  3. 哨兵模式就是主从模式的升级,手动到自动

缺点

  1. Redis不好在线扩容,集群的数量一旦达到上限,在线扩容就十分的麻烦
  2. 实现哨兵模式的配置比较麻烦,里面有很多选择

哨兵模式的全部配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Example sentinel.conf

# 哨兵sentinel实例运行的端口 默认26379

port 26379



# 哨兵sentinel的工作目录

dir /tmp



# 哨兵sentinel监控的redis主节点的 ip port

# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。

# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了

# sentinel monitor <master-name> <ip> <redis-port> <quorum>

sentinel monitor mymaster 127.0.0.1 6379 2



# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码

# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码

# sentinel auth-pass <master-name> <password>

sentinel auth-pass mymaster MySUPER--secret-0123passw0rd



# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒

# sentinel down-after-milliseconds <master-name> <milliseconds>

sentinel down-after-milliseconds mymaster 30000



# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,这个数字越小,完成failover所需的时间就越长,但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。

# sentinel parallel-syncs <master-name> <numslaves>

sentinel parallel-syncs mymaster 1



# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:

#1. 同一个sentinel对同一个master两次failover之间的间隔时间。

#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。

#3.当想要取消一个正在进行的failover所需要的时间。

#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了

# 默认三分钟

# sentinel failover-timeout <master-name> <milliseconds>

sentinel failover-timeout mymaster 180000



# SCRIPTS EXECUTION

#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。

#对于脚本的运行结果有以下规则:

#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10

#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。

#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。

#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。

#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。



#通知脚本

# shell编程

# sentinel notification-script <master-name> <script-path>

sentinel notification-script mymaster /var/redis/notify.sh



# 客户端重新配置主节点参数脚本

# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。

# 以下参数将会在调用脚本时传给脚本:

# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>

# 目前<state>总是“failover”,

# <role>是“leader”或者“observer”中的一个。

# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的

# 这个脚本应该是通用的,能被多次调用,不是针对性的。

# sentinel client-reconfig-script <master-name> <script-path>

sentinel client-reconfig-script mymaster /var/redis/reconfig.sh # 一般都是由运维来配置!


十四、缓存穿透和雪崩

1. 缓存穿透

1.1 概念

缓存穿透的概念很简单,用户想要查询一个数据,发现 redis 内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中(秒杀!),于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

1.2 解决方案

布隆过滤器

布隆过滤器是一种数据结构,对所有可能査询的参数以 hash 形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;

布隆过滤器

缓存空对象

当存储层不命中后,即使返回的空对象也将其缓存起来,同时设置一个过期时间,之后再访问这个数据会从缓存中获取,保护了后端数据源

缓存空对象

但是这种方法会存在两个问题:
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会
有影响。

2. 缓存击穿

2.1 概述

这里需要注意和缓存击穿的区别,缓存击穿,是指一个 key 非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个 key 在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。当某个 key 在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。

2.2 解决方案

设置热点数据永不过期

从缓存层面来看,没有设置过期时间,所以不会出现热点 key 过期后产生的问题。

加互斥锁

分布式锁:使用分布式锁,保证对于每个 key 同时只有一个线程去査询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

3. 缓存雪崩

3.1 概念

缓存雪崩,是指在某一个时间段,缓存集中过期失效。 Redis 宕机!发生雪崩的原因之一,比如在双十一零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知知的,很有可能瞬间就把数据库压垮。

3.2 解决方案

redis 高可用

这个思想的含义是,既然 redis 有可能挂掉,那我多增设几台 redis ,这样一台挂掉之后其他的还可以继续工作,实就是搭建的集群。(异地多活!)

限流降级

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个 key 只允许一个线程查询数据和写缓存,其他线程等待。

数据预热

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的 key ,设置不同的过期时间,让缓存失效的时间点尽量均匀。


Redis
http://example.com/2022/03/07/Redis/
作者
Mercury
发布于
2022年3月7日
许可协议