redis 缓存穿透,击穿,雪崩,redis面试题
I .配置文件redis.conf
1.1 Units单位,配置大小单位,开头定义一些基本的计量单位,只支持字节,不支持比特。
不区分大小写。
1.2包含,在多实例的情况下可以提取通用配置文件。
1.3网络网络相关配置。
1)绑定
在默认情况下,bind=127.0.0.1只能接受来自这台机器的访问请求。
如果不写,可以不受限制的接受访问任何ip地址。
生产环境必须写入应用服务器的地址。服务器需要远程访问,因此需要将其注释掉。
如果打开了保护模式,Redis只能接受这台机器的响应,而不设置绑定ip和密码。
2)保护模式
将本地访问保护模式设置为否.
3)港口
端口号,默认为6379。
4)tcp-backlog
设置tcp的积压。backlog实际上是一个连接队列。backlog queues的总和$=$未完成的三次握手队列$ $已完成三次握手队列。
在高并发环境中,您需要一个高backlog值来避免客户端连接缓慢的问题。
5)超时
空闲客户端保持关闭状态有多长时间?0表示该功能关闭。也就是永远不要靠近。
6)TCP-保持活动状态
对来访客户端的心跳检测,每n秒检测一次。
单位是秒。如果设置为0,将不会执行保活检测。建议设置为60。
1.4一般一般。
1)妖魔化
不管是不是后台进程,都设置为是。
后台启动的守护进程。
2)pidfile
pid文件的位置,每个实例都会产生一个不同的pid文件。
3)日志级别
指定日志记录级别。Redis总共支持四个级别:调试、详细、通知和警告,默认为通知。
4)日志文件
日志文件名。
5)数据库
默认的库数量是16,默认的数据库是0。您可以使用SELECT dbid命令在连接上指定数据库id。
1.5安全保障。
查看、设置和取消访问密码。
在命令中设置密码只是暂时的。重新启动redis服务器,密码将被恢复。
永久,需要在配置文件中设置。
1.6极限极限。
1)最大客户端数
设置redis可以同时连接到多少个客户端。
默认值为10000个客户端。
如果达到这个限制,redis将拒绝新的连接请求,并向这些连接请求者发送达到的最大客户端数量作为响应。
2)最大内存
建议设置,否则内存会满,导致服务器停机。
设置redis可以使用的内存量。一旦达到内存使用限制,redis将尝试删除内部数据,删除规则可以由maxmemory-policy指定。
如果redis不能按照移除规则移除内存中的数据,或者不允许移除,那么对于那些需要申请内存的指令,如SET、LPUSH等,redis会返回一个错误消息。
但是没有内存应用的指令仍然会正常响应,比如GET。如果您的redis是主redis(表明您的redis有从redis),那么在设置内存使用上限时,您需要在系统中留出一些内存空间用于同步队列缓存。只有设置了“不移除”,就不用考虑这个因素了。
3)maxmemory-策略
Volatile-lru:使用lru算法移除密钥,只针对设置了到期时间的密钥(最近最少使用)。
Allkeys-lru:在所有设置的密钥中,使用lru算法移除密钥。
Volatile-random:从过期的密钥集中删除随机密钥,只针对设置了过期时间的密钥。
Allkeys-random:在所有设置的密钥中,移除随机密钥。
Volatile-ttl:去掉那些ttl值最小的密钥,也就是那些最近就要过期的密钥。
无移除:不执行移除。对于写操作,只返回错误信息。
4)最大内存-样本
设置样本数,LRU算法和最小TTL算法都不是精确算法,而是估计值,所以可以设置样本大小。redis会默认检查这么多键,选择LRU键。
一般设置3到7的数字。数字越小,样本越不准确,但性能消耗越少。
二、常用的五种基本数据类型2.2键操作键*:查看当前库中的所有键。
Existkey:确定一个键是否存在。
类型:检查您的密钥的类型。
删除键:删除指定的键数据
Unlink key:根据值选择非阻塞删除,keyspace的元数据中只会删除键。真正的删除将在后续操作中异步执行。
过期密钥10:设置给定密钥的过期时间。
Ttl键:检查多少秒将到期。-1表示永不过期,-2表示过期。
选择:切换数据库的命令。
Dbsize:查看当前数据库中的键的数量。
清空当前库
Flushall:杀死所有库。
2.2 String(字符串)字符串类型是二进制安全的。这意味着Redis字符串可以包含任何数据。例如jpg图像或序列化对象。
类型字符串是Redis最基本的数据类型,一个Redis中的字符串值最大可达512M。
设置键值:添加键值对
Get key:查询对应的键值。
追加键值:将给定值追加到原始值的末尾。
Strenkey:获取值的长度。
Setnx key value:仅在key不存在时设置key的值。
Incr键:将key中存储的数字值加1,只有数字值可以操作。如果为空,新的增量为1(**原子**)。
Deckey:将存储在key中的数字值减1,只能对数字值进行运算。如果为空,则新添加的值为-1。
增加/减少键步长:增加或减少存储在键中的数字值。自定义步长
Mset key1 value1 key2 value2:同时设置一个或多个键值对
管理密钥1密钥2密钥3.同时获取一个或多个值
Msetnx键1值1键2值2.当且仅当所有给定的键都不存在时,才同时设置一个或多个键-值对。
Getrange键起始位置结束位置:获取的值的范围。
Setrange键起始位置值:用值覆盖key存储的字符串值。
设置密钥过期时间值:在设置密钥值时,以秒为单位设置过期时间。
Getkey value:以新值换,设置新值,同时获取旧值。
原子数
原子操作是指不会被线程调度机制中断的操作;
这个操作一旦开始,就会一直运行到最后,中间不会有上下文切换。
在单线程中,所有可以在一条指令中完成的操作都可以被认为是“原子操作”,因为中断只能发生在指令之间。在多线程中,不能被其他进程(线程)中断的操作称为原子操作。Redis单命令的原子性主要得益于Redis的单线程。
数据结构
内部结构类似Java中的ArrayList,预先分配冗余空间,减少内存的频繁分配。
2.3 List(列表)Redis list是一个简单的字符串列表,按照插入顺序排序。您可以将元素添加到列表的头部(左侧)或尾部(右侧)。
它的底层其实是一个双链表,两端性能都很高,通过索引下标操作中间的节点性能会差一些。
L/r按键值1值2值3.从左/右插入一个或多个值。
lpush k1 v1 v2 v3
范围k1 0 -1
输出:v3 v2 v1
-
rpush k1 v1 v2 v3
范围k1 0 -1
输出:v1 v2 v3
Lpop/rpop键:从左/右吐出一个值。值键是存在的,值灯键是死的。
Rpopusskey1 key2:从key1列表的右侧吐出一个值,插入到key2列表的左侧。
Lrange键start stop:根据索引下标获取元素(从左到右)
Lrange mylist 0 -1 0:左边的第一个,-1右边的第一个,(0 -1表示获取全部)
Lindex键索引:根据索引下标(从左到右)获取元素
len键:获取列表长度
l在值newvalue之前/之后插入键:在值之前/之后插入新值。
Lrmkey n value:从左边(从左到右)删除n个值
Ltimkey start end:根据索引截取下标元素(从左到右)
Lsekey索引值:用值替换列表键中的索引值。
数据结构
列表的数据结构是快速链表quickList。
首先,当列表元素很少时,将使用连续内存存储。这个结构就是ziplist,也就是压缩列表。
它存储所有的元素,并分配一个连续的内存。
数据量大的时候会改成quicklist。
因为普通链表需要过多的额外指针空间,会浪费空间。例如,这个列表中只存储int类型的数据,结构上需要两个额外的指针prev和next。
Redis将链表和ziplist结合起来形成快速列表。也就是说,使用双向指针将多个ziplist串在一起。这样既满足了快速插入和删除性能,又不会有太多的空间冗余。
2.4 Set(集合)集合提供的外部功能类似于列表,特殊之处在于集合可以自动复制数据。当需要存储一个列表数据而不希望有重复数据时,集合是一个很好的选择,集合提供了一个判断成员是否在集合集合中的重要接口,这是列表所不能提供的。
Redis集是字符串类型的无序集。它的底层其实是一个空值的哈希表,所以添加、删除、搜索的复杂度是***O(1)***。
一个算法,随着数据的增加,执行时间的长短,如果是***O(1)***,数据增加,找到数据的时间不变。
Sadd键值1值2.向set键添加一个或多个成员元素,现有的成员元素将被忽略。
Smembers键:取出这个集合的所有值。
Sismember key value:判断set key是否包含该值,如果返回1,则不返回0
Scard key:返回集合中元素的个数。
Emkey值1值2.删除集合中的元素。
Spokey:从这个集合中随机吐出一个值。
Srandmember key n: N个值随机取自集合,不会从集合中删除。
移动源目标值:将集合中的值从一个集合移动到另一个集合。
Sinter key1 key2:返回两个集合的交集元素。
Union key1key2:返回两个集合的联合元素。
Sdiff key1 key2:返回两个集合的差集元素(在key1中,但不在key2中)。
数据结构
设置的数据结构是字典,字典是用哈希表实现的。
2.5 Hash (hash) Redis hash是一组键值对。
Redis hash是字符串类型的字段和值的映射表,hash特别适合存储对象。
Et键字段值:为键集合中的字段键赋值。
Hget key1字段:从key1字段获取值。
设置hm key 1字段1值1字段2值2.批量设置hash值。
Existkey1field:检查哈希表键中是否存在给定的域字段。
HKEYSKY:列出这个散列集合的所有字段。
列出这个散列集的所有值。
Hincrby键字段增量:在哈希表键的字段字段值上增加1 -1的增量。
TNX关键字字段值:当且仅当字段不存在时,将哈希表关键字中的字段值设置为value。
数据结构
哈希类型对应的数据结构有两种:ziplist(压缩列表)和hashtable(哈希表)。
当字段值的长度较短且数量较少时,使用ziplist,否则使用hashtable。
2.6 zset(有序集)Redis有序集zset与普通集set非常相似,是一个没有重复元素的字符串集。
不同之处在于,有序集合的每个成员都与一个分数相关联,该分数用于将集合的成员从最低分数到最高分数进行排序。集合的成员是唯一的,但是分数可以重复。
因为元素是有序的,所以我们可以根据分数或者位置快速得到一个范围的元素。
访问有序集的中间元素也非常快,因此您可以将有序集用作没有重复成员的智能列表。
ZAKEY score 1 VALUE 1 SCORE 2 VALUE 2 …:将一个或多个成员元素及其分数值添加到有序集合键中。
Zrange key start stop [WITHSCORES]:返回有序集合键中的元素,在start stop之间带有下标。
使用SCORES,您可以将分数和值返回到结果集。
ZrAngeByScore key min max[with scores][limit offset count]:返回有序集键中所有得分值介于min和max之间的成员(包括等于min或max的成员)。有序集成员按分值递增的顺序排列(从小到大)。
evrangebyscore key max min[with scores][limit offset count]:同上,顺序由大变小。
Zincrby key increment value:为元素的分数增加一个增量。
Emkey value:删除此集合下具有指定值的元素。
Zcount key min max:计算集合中元素的数量,并给出分数区间。
Zrank键值:返回该值在集合中的排名,从0开始。
数据结构
SortedSet(zset)是Redis提供的一种非常特殊的数据结构。一方面相当于Java的数据结构***Map String,Double ***,可以给每个元素值一个权重分值。另一方面类似于TreeSet,将内部元素按照权重分值排序,这样就可以得到每个元素的排名,也可以通过分值的范围得到元素的列表。
zset的底层使用了两种数据结构。
Hash,hash的作用是将元素值和权重分值关联起来,保证元素值的唯一性。可以通过元素值找到对应的分值跳转表。跳转表的目的是根据分数的范围对元素值进行排序并获得元素列表。
三。Redis的出版和订阅。Redis发布和订阅(pub/sub)是一种消息通信模式:发送方(pub)发送消息,订阅方(sub)接收消息。
Redis客户可以订阅任意数量的频道。
客户可以订阅频道。
当您将消息发布到此通道时,该消息将被发送到订阅客户端。
订阅频道#订阅频道
在发布频道hello #频道上发送信息。
四。事务和锁机制Redis事务是一个独立的隔离操作:事务中的所有命令都是序列化的,按顺序执行。在事务执行过程中,不会被其他客户端发送的命令请求中断。
Redis事务的主要作用是将多个命令串联起来,防止其他命令插队。
4.1多重、执行、丢弃
多(前缀)
高级管理人员
抛弃
从多命令开始,输入命令将依次进入命令队列,但不会被执行。在输入exec命令之前,Redis会依次执行前面命令队列中的命令。
组队过程中,可以通过丢弃来放弃组队。
成功组队并成功提交。
放弃团队。
队里有命令错误,不会执行。
组队时不会报错,执行时会报错。
当团队中的一个命令报告错误时,执行该命令时,所有队列都将被取消。
4.2悲观锁(悲观锁),即每次去获取数据时,你都认为其他线程会修改它,所以每次获取数据时你都会锁,这样其他线程就会被阻塞,直到成功获取锁。(效率低)
4.3乐观锁定乐观锁定,就是每次去获取数据的时候,你都认为其他线程不会修改,所以不会锁定,但是在更新的时候,你会判断这段时间是否有其他线程更新这个数据,可以使用版本号等机制。
乐观锁定适用于多读应用程序,可以提高吞吐量。
Redis使用这种检查和设置机制来实现事务。
4.4 Watch,unwatch在执行multi之前,先执行watch key1 [key2],可以监控一个(或多个)键。如果在事务执行前该键被其他命令更改,事务将被中断。
取消监视命令以监视所有键。如果EXEC命令或DISCARD命令在WATCH命令之后首先执行,则不需要执行UNWATCH。
4.5交易三个特征分开隔离操作
事务中的所有命令都被序列化并按顺序执行。在事务执行过程中,不会被其他客户端发送的命令请求中断。没有隔离级别的概念。
队列中的命令在提交之前不会被实际执行,因为在提交事务之前,任何命令都不会被实际执行。不保证原子性
如果一个命令在事务中执行失败,后续命令仍将执行,而不会回滚。
动词(verb的缩写)持久性5.1 RDB在指定的时间间隔内将内存中数据集的快照写入磁盘,即Snapshot快照。恢复时,快照文件直接读入内存。
Redis将为持久性创建一个单独的子进程(fork)。
首先,将数据写入一个临时文件,然后在持久化过程完成后,将这个临时文件的内容覆盖到dump.rdb。
在整个过程中,主进程不执行任何IO操作,保证了极高的性能。如果需要大规模的数据恢复,并且对数据恢复的完整性不是很敏感,那么RDB比AOF更有效率。
RDB的缺点是最后一次持久化后的数据可能会丢失。
Fork用于复制与当前流程相同的流程。新过程的所有数据值(变量、环境变量、程序计数器等。)与原工艺相同,但却是全新的工艺。作为Linux中原进程的子进程,fork()会产生一个和父进程完全一样的子进程,但是子进程会在后面多次调用exec系统。为了提高效率,在Linux中引入了写时复制技术。通常,父进程和子进程会共享相同的物理内存,只有各自的进程空间。
在redis.conf中配置文件名,默认为dump.rdb
转储保存位置
可以修改rdb文件的保存路径。缺省值是Redis启动时命令行所在的目录。
bgsave出错时停止写入
即当redis无法写入磁盘时,关闭redis的写操作。
RDB压缩
持久性文件是否被压缩和存储。
rdbchecksum
完整性检查,即数据是否完整准确。
救援
指示写操作的次数。
一个
格式:保存第二次写操作拷贝数
优点和缺点
适合大规模数据恢复;对数据完整性和一致性要求不高,比较适合使用;节省磁盘空间;恢复速度快。劣势
Fork,内存中的数据被克隆,需要考虑2倍左右的扩展;Redis虽然在fork中使用了写时复制技术,但是在数据量巨大的情况下,仍然会消耗性能;在备份周期的特定时间间隔进行备份,这样,如果Redis意外关闭,最后一个快照之后的所有更改都将丢失。5.2 AOF以日志的形式记录每一次写操作(增量保存),记录Redis执行的所有写指令(不记录读操作)。只能追加文件,但不能重写文件。在Redis启动之初,将读取该文件以重建数据。换句话说,如果Redis重新启动,将根据日志文件的内容从前到后执行写指令,完成数据恢复。
执行过程
客户端的请求写命令将被附加到AOF缓冲区;AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件;当AOF文件大小超过重写策略或手动重写时,会重写AOF文件,压缩AOF文件容量;当Redis服务重新启动时,将重新加载AOF文件中的写操作以进行数据恢复。当a of和RDB同时打开时,系统会默认读取AOF的数据(数据不会丢失)
默认情况下,不会打开配置AOF。
文件名
AOF同步频率设置
appendfsync总是
永远同步,每次Redis写完,马上记录;
性能差但数据完整性好。
每秒appendfsync
每秒同步,每秒记录。如果停机,这一秒钟的数据可能会丢失。
appendfsync编号
Redis不主动同步,而是把同步的机会交给操作系统。
重写压缩
当AOF文件的大小超过设置的阈值时,Redis将启动AOF文件的内容压缩,只留下可以恢复数据的最小指令集。您可以使用命令bgrewriteaof。
优点和缺点
备份机制更健壮,丢失数据的概率更低;可读的日志文本,通过AOF健壮的操作,可以处理误操作。劣势
它比RDB占用更多的磁盘空间;备份速度慢;如果每次读写同步,有一定的性能压力;有个别bug,无法恢复。
5.3选择官方推荐来启用两者。
如果你对数据不敏感,你可以选择单独使用RDB。
不建议单独使用AOF,因为可能会出现bug。
如果只做纯内存缓存,可以不使用。
不及物动词主/从复制主机的数据更新后,会根据配置和策略自动同步到备机的主/从机制。主以写为主,从以读为主。
读写分离,性能扩展,容灾,快速恢复,一主多从!6.1设置一个主设备和两个从设备来创建文件目录/opt/etc
将redis.conf复制到当前目录cp /etc/redis.conf /opt/etc/
创建三个redis.conf配置文件redis6379.conf
redis6380.conf
redis6381.conf
启动三台redis服务器redis6379.conf
redis6380.conf
redis6381.conf# redis6379.conf
包括/opt/etc/redis.conf
pidfile /var/run/redis_6379.pid
端口6379
dbfilename dump6379.rdb
# redis6380.conf
包括/opt/etc/redis.conf
pidfile /var/run/redis_6380.pid
端口6380
dbfilename dump6380.rdb
# redis6381.conf
包括/opt/etc/redis.conf
pidfile /var/run/redis_6381.pid
端口6381
dbfilename dump6381.rdb
检查主机运行状况信息复制
永远配不上ip端口的主从
#成为实例的奴隶
再次检查主机操作。
成功构建。
6.2一主二仆主人6379,奴隶6380和6381。
假设从机6380挂机。当6380重启后,6380不再是6379的从机,而是新的主机;
当6380再次作为6379的从机添加时,从机从头到尾复制数据。假设主机6379挂机。680和6381还是6379的奴隶,什么都不会做;
当6379重新启动时,它仍然是主服务器。6.3代代相传
上一个从机可以是下一个从机的主机,从机也可以接收其他从机的连接和同步请求。那么这个从作为链条中的下一个主,可以有效的减轻主的写压力,去中心化可以降低风险。
从属ip端口
中途改变方向:之前的数据会被清除,重新建立最新的副本。
当一个奴隶倒下时,它后面的奴隶没有一个能被备份。
即当主设备挂机时,从设备仍然是从设备,但不能继续写数据。
6.4反客为主当一个主倒下的时候,后面的从马上可以升为主,后面的从不需要任何修改。
不奴役任何人
6.5自动版Sentinel模式,即可以在后台监控主机是否出现故障。如果失败,它会根据票数自动将从库转换为主库。
创建sentinel.conf文件以配置sentinel monitor my master 172 . 16 . 88 . 168 6379 1
# mymaster:监视对象的服务器的名称。
# 1:至少有多少哨兵同意迁移的数量。启动sentinel redis-sentinel/opt/etc/sentinel . conf
当主机挂起时,将从机器选举中生成新的主机。选举规则。
选举规则
根据优先级,从优先级/副本优先级,给优先级最高的一个优先级。
根据偏移量,优先选择偏移量大的。根据runid,最小的服务是首选的。复制延迟由于所有的写操作都是先在主机上执行,再同步到从机,所以从主机同步到从机有一定的延迟。当系统非常繁忙时,延迟问题会更加严重,从机的增加也会使这个问题更加严重。
6.6复制原则从机在成功连接到主机后会发送一个sync命令(同步命令)。在接收到命令后,主机启动后台保存过程,持久保存数据,并收集所有接收到的用于修改数据集的命令。后台进程完成后,主设备会将整个数据文件(rdb)传输到从设备,以完成完整的同步。当主服务写入时,它与从服务器同步数据。完整副本:在接收到数据库文件数据后,从属服务保存它并将其加载到内存中。增量:主站继续将所有新收集的修改命令依次发送给从站,完成同步。只要重新连接主服务器,就会自动执行完全同步(完全复制)。
七。集群容量不够。redis如何扩容?
redis是如何为并发写操作分配的?
主从模式,传递模式,主机停机,导致ip地址的改变,在应用中的配置需要修改相应的主机地址,端口等信息。
解决方法:
代理主机(以前)
分散式集群配置(redis3.0)
Redis集群实现了Redis的横向扩展,即启动N个Redis节点,将整个数据库存储在这N个节点中,每个节点存储总数据的1/N。
Redis集群通过分区提供一定程度的可用性。即使群集中的一些节点出现故障或无法通信,群集也可以继续处理命令请求。
7.1设置Redis集群创建配置文件#以redis6379.conf为例
包括/opt/etc/redis.conf
Pifile/var/run/redis _ 6379.pid #更改
端口6379 #更改
Dbfilename dump6379.rdb #更改
启用群集是#打开群集模式。
cluster-config-file nodes-6379 . conf #设置需要更改的节点配置文件名。
Cluster-node-timeout 15000 #设置节点丢失事件。在此时间(毫秒)之后,集群将自动从主服务器切换到从服务器。
开始
在将6个节点组合成一个cluster #组合之前,确保在启动所有redis实例之后正常生成nodes-xxxx.conf文件。
#转到redis安装目录
/opt/redis-6.2.6/src
#执行
redis-cli - cluster创建群集副本1 172 . 16 . 88 . 168:6379 172 . 16 . 88 . 168:6380 172 . 16 . 88 . 168:6381 172 . 16 . 88 . 168:6389 172 . 16 . 88 . 168:6390 172 . 16 . 88 . 168:6391
使用集群策略连接redis-cli -c -p端口
Cluster nodes #命令查看集群信息
7.2问题redis集群如何分配这六个节点?一个集群必须至少有三个主节点。
选项- cluster-replicas 1表示您希望为集群中的每个主节点创建一个从节点。
分配原则应该尽量确保每个主数据库运行在不同的IP地址,并且每个从数据库和主数据库不在同一个IP地址。
什么是老虎机?
Redis集群包含16,384个散列槽,数据库中的每个键都属于这16,384个槽中的一个。
使用群集公式CRC16(key)% 16384来计算密钥属于哪个槽,其中CRC16(key)语句用于计算密钥的CRC16校验和。
集群中的每个节点负责处理一些槽。例如,如果一个群集可以有一个主节点,其中:
节点A负责处理插槽0到5460。节点B负责处理时隙5461到10922。节点C负责处理槽10923到16383。如何在集群中输入值?每次在redis-cli中输入和查询密钥值时,redis都会计算密钥应该发送到的槽。如果不是客户端对应的服务器的插槽,redis会报错,通知redis实例地址和应该去的端口。
Redis-cli客户端提供c参数来实现自动重定向。
例如redis-cli -c -p 6379登录后,可以通过输入和查询来自动重定向键值对。
如何查询集群中的值?每个主机只能查询其自身范围内的插槽。
Cluster keyslot key:查询某个键的**slot**。
集群密钥槽:查询槽是否有值。
CLUSTER GETKEYSINSLOT槽计数:返回槽计数中的键。
故障恢复?如果主节点离线?从节点可以自动提升为主节点吗?注意:15秒超时。
第40号命令个6379挂掉后,6389成为新的主机。主节点恢复后,主从关系会如何?主节点回来变成从机。
第40号命令个6379重启后,6379成为6389的从机。如果所有某一段插槽的主从节点都宕掉,重复服务是否还能继续?
如果某一段插槽的主从都挂掉,什么事群集-要求完全复盖=是,那么,整个集群都挂掉。如果某一段插槽的主从都挂掉,什么事群集-要求完全复盖=否,那么,该插槽数据全都不能使用,也无法存储。redis.conf文件中的参数群集-要求完全复盖
S7-1200可编程控制器优缺点优点
实现扩容;分摊压力;无中心配置相对简单。缺点
多键操作是不被支持的;多键的再说一遍事务是不被支持的。左上臂脚本不被支持;由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至重定向群集,需要整体迁移而不是逐步过渡,复杂度较大。
八杰德!杰德操作Redis8.1 Java操作再说一遍。依赖依赖关系
组身份重定向客户端/组身份
artifactId jedis /artifactId
版本3 .2 .0/版本
/依赖
连接共和阶层杰迪斯德莫
public static void main(string[]args)>
杰迪斯杰迪斯=新绝地( 192。168 .57 .101 ,6379);
string pong=jedis。ping();
System.out.println(连接成功*乒乓球;
杰迪斯。close();
}
}
关键点
jedis.set(k1 , v1 );
绝地。set( k2 , v2 );
绝地。set( k3 , v3 );
设置字符串密钥=jedis。按键(*);
系统。出去。println(键。size());
对于(字符串关键点:关键点)
System.out.println(密钥):
}
系统。出去。println(杰迪斯。存在( k1 );
系统。出去。println(杰迪斯。TTL( k1 );
系统。出去。println(杰迪斯。get( k1 );字串(字符串)
绝地。mset( str 1 , v1 , str2 , v2 , str3 , v3 ;
系统。出去。println(杰迪斯。mget( str 1 , str2 , str 3 );列表(列表)
list字符串list=jedis。lrenge( mylist ,0,-1);
对于(字符串元素:列表)}
System.out.println元素:
}集
绝地。sadd( orders , order 01 );
绝地。sadd( orders , order 02 );
绝地。sadd( orders , order 03 );
绝地。sadd( orders , order 04 );
设置字符串smembers=jedis。成员(订单);
对于(字符串顺序:成员)
System.out.println(命令);
}
绝地。srem(订单,订单02 );杂凑(哈希)
jedis.hset(hash1 , userName , lis );
系统。出去。println(杰迪斯。hget(哈希1 ,用户名);
Map String,String map=new HashMap String,String();
地图。put(电话, 138101699999 );
map.put(address , at guigu );
map.put(email , ABC @ 163。com’);
jedis.hmset(hash2 ,map);
列表字符串结果=jedis。hmget(哈希2 ,电话,电子邮件):
对于(字符串元素:结果)}
System.out.println元素:
} zset
绝地。zad( zset 01 ,100d, z3 );
绝地。zad( zset 01 ,90d, l4 );
绝地。zad( zset 01 ,80d, w5 );
绝地。zad( zset 01 ,70d, z6 );
设置字符串zrange=jedis.zrange(zset01 ,0,-1);
对于(字符串e:zrange)}
系统。输出。println(e);
8.2杰德主从复制专用静态jediedinelpool jediedinelpool=null;
公共静态jedis getjedisfromssentinel()& gt;
if(jedisstineltool==null)}
set string语句set=new hashset();
哨兵。添加( 172。16 .88 .168:26379 );//端口为感觉到了吗
jedispoolconfig jedispoolconfig=new jedispoolconfig();
jedispoolconfig。setmaxtotal(10);//最大可用连接数
jedispooconfig。setmaxidle(5);//最大闲置连接数
jedispol配置。setminidle(5);//最小闲置连接数
jedispoolconfig。setblockwhenexhausted(true);//连接耗尽是否等待
jedispooconfig。setmaxwaitmillis(2000年年);//等待时间
jedispoolconfig。settstonbrow(true);//取连接的时候进行测试
jeiditinelpool=new jeiditinelpool( my master ,sentinelset,jedispoolconfig//服务主机名
返回jedisstineltool。getresome();
}
其他
返回jedisstineltool。getresome();
}
}
8.3版集群的杰迪丝开发即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。
无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。
公共类jedisclustertest
public static void main(string[]args)>
set HostAndPort set=new HashSet HostAndPort
set.add(新主机和端口
t("172.16.88.168",6379)); // 任何一个端口
JedisCluster jedisCluster = new JedisCluster(set);
jedisCluster.set("k1", "v1");
System.out.println(jedisCluster.get("k1"));
}
}
SpringBoot整合Redis依赖
!-- redis --
dependency
groupId org.springframework.boot /groupId
artifactId spring-boot-starter-data-redis /artifactId
/dependency
!-- spring2.X集成redis所需common-pool2--
dependency
groupId org.apache.commons /groupId
artifactId commons-pool2 /artifactId
version 2.6.0 /version
/dependency
配置文件配置Redis
#Redis服务器地址
spring.redis.host= ip
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
Redis配置类(需要继承CachingConfigurerSupport)@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate String, Object redisTemplate(RedisConnectionFactory factory){
RedisTemplate String, Object template = new RedisTemplate ();
RedisSerializer String redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
// key序列化方式
template.setKeySerializer(redisSerializer);
// value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
// value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory){
RedisSerializer String redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
// 解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config =
RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}九、应用问题解决9.1 缓存穿透
1)现象:
key 对应的数据在数据源并不存在,每次针对此 key 的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。
比如用一个不存在的用户 id 获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。
造成:
应用服务器压力变大。redis命中率下降 $\longrightarrow$ 查询数据库 。2)如何解决:
对空值缓存
如果一个查询返回的数据为空(不管是数据是否不存在),仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟。设置可访问的名单(白名单):
使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,则不允许访问。采用布隆过滤器
布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。进行实时监控
当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。9.2 缓存击穿
key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。
数据库访问压力瞬间增大。redis中没有出现大量key过期,redis正常运行。(即某个经常访问的key过期,突然有大量访问这个数据)如何解决:
预先设置热门数据
在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长。实时调整
现场监控哪些数据热门,实时调整key的过期时长。使用锁9.3 缓存雪崩key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。
缓存雪崩与缓存击穿的区别在于这里针对很多 key 缓存,前者则是某一个 key。
数据库压力变大。即极少的时间段,查询大量key的集中过期情况。如何解决:
构建多级缓存架构
nginx缓存 +redis缓存 + 其他缓存(ehcache等)使用锁或队列:
用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况。设置过期标志更新缓存:
记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。将缓存失效时间分散开:
比如我们可以在原有的失效时间基础上增加一个随机值,比如 1~5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
9.4 分布式锁问题描述
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程的特点以及分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁主流的实现方案:
基于数据库实现分布式锁基于缓存(Redis 等)基于 Zookeeper根据实现方式,分布式锁还可以分为类 CAS 自旋式分布式锁以及 event 事件类型分布式锁:
类 CAS 自旋式分布式锁:询问的方式,类似 java 并发编程中的线程获询问的方式尝试加锁,如 mysql、redis。另外一类是 event 事件通知进程后续锁的变化,轮询向外的过程,如 zookeeper、etcd。每一种分布式锁解决方案都有各自的优缺点:
性能:redis 最高可靠性:zookeeper 最高
优化设置锁和过期时间
设置锁的命令
SETNX KEY VALUE # 设置锁
del key # 删除锁给锁设置过期时间
expire users 30
这样设置的问题:如果设置时间和上锁分开进行的话,可能存在上完锁,服务器down了,就没有设置过期时间。
优化:上锁和设置过期时间同时
set key value nx ex timejava代码实现
@GetMapping("testLock")
public void testLock(){
//1获取锁,setne ,顺便设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3,TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4释放锁,del
redisTemplate.delete("lock");
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
优化之UUID防止误删
不过代码除了修改的设置过期时间问题,还存在问题,入下图所示:
解决方法:
代码实现:
@GetMapping("testLock")
public void testLock(){
String uuid = UUID.randomUUID().toString();
//1获取锁,setne ,顺便设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3,TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
...
String lockUuid = (String)redisTemplate.opsForValue().get("lock");
if(uuid.equals(lockUuid)){
//2.4释放锁,del
redisTemplate.delete("lock");
}
}else{
...
}
}
优化之LUA脚本保证删除的原子性
原因:
解决方案:使用lua脚本保证删除的原子性
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call(get, KEYS[1]) == ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript Long redisScript = new DefaultRedisScript ();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
总结
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
互斥性。在任意时刻,只有一个客户端能持有锁。不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。加锁和解锁必须具有原子性。
9.5 RedLockRedlock 是一种算法,Redlock 也就是 Redis Distributed Lock,可用实现多节点 redis 的分布式锁。RedLock 官方推荐,Redisson 完成了对 Redlock 算法封装。
此种方式具有以下特性:
互斥访问:即永远只有一个 client 能拿到锁。避免死锁:最终 client 都可能拿到锁,不会出现死锁的情况,即使锁定资源的服务崩溃或者分区,仍然能释放锁。容错性:只要大部分 Redis 节点存活(一半以上),就可以正常提供服务RedLock 原理(了解)
获取当前 Unix 时间,以毫秒为单位。依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。在步骤 2,当向 Redis 设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个 Redis 实例。客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。如果因为某些原因,获取锁失败(没有在至少 N/2+1 个 Redis 实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功)。
©
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。