本文基于黑马2022的Redis课程实战篇编写,课程地址:黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目
课程资料:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA&pwd=eh11
教程中将以6.2.6版本的Redis进行演示
引言
本文将基于黑马点评项目的各种场景进行Redis学习:
会使用redis共享session来实现
结合代码理解缓存击穿,缓存穿透,缓存雪崩等问题
学会Redis的计数器功能,结合Lua完成高性能的redis操作,同时学会Redis分布式锁的原理,包括Redis的三种消息队列
利用Redis的GEOHash来完成对于地理坐标的操作
主要是使用Redis来完成统计功能
使用Redis的BitMap数据统计功能
基于Set集合的关注、取消关注,共同关注等等功能
基于List来完成点赞列表的操作,同时基于SortedSet来完成点赞的排行榜功能

短信登录
导入黑马点评项目
导入MySQL表

导入后端项目
资料中提供了一个项目源码:

注意,jdk版本为1.8,启动失败可能是没设置好jdk版本
导入前端工程

在文件目录下打开cmd窗口,输入start nginx.exe启动前端项目

基于Session实现登录流程
发送验证码:
用户在提交手机号后,会校验手机号是否合法,如果不合法,则要求用户重新输入手机号
如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存,然后再通过短信的方式将验证码发送给用户
短信验证码登录、注册:
用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验,如果不一致,则无法通过校验,如果一致,则后台根据手机号查询用户,如果用户不存在,则为用户创建账号信息,保存到数据库,无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息
校验登录状态:
用户在请求时候,会从cookie中携带者JsessionId到后台,后台通过JsessionId从session中拿到用户信息,如果没有session信息,则进行拦截,如果有session信息,则将用户信息保存到threadLocal中,并且放行

实现模拟发送短信验证码功能
页面流程:

代码实现:
在com.hmdp.service.IUserService中创建Result sendCode(String phone, HttpSession session);方法并在com.hmdp.service.impl.UserServiceImpl中实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
@Override
public Result sendCode(String phone, HttpSession session) {
// 1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 3.符合,生成验证码
String code = RandomUtil.randomNumbers(6);
// 4.保存验证码到 session
session.setAttribute("code", code);
// 5.用日志模拟发送验证码
log.debug("发送短信验证码成功,验证码:{}", code);
// 返回处理成功
return Result.ok();
}
}
|
然后修改com.hmdp.controller.UserController中的sendCode方法
1
2
3
4
5
6
7
|
/**
* 发送手机验证码
*/
@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
return userService.sendCode(phone, session);
}
|
同理,在服务层实现login方法,并在控制层直接调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1.校验手机号(没做手机号是否相同的校验,不过这不是重点)
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
// 如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 2.校验验证码
Object cacheCode = session.getAttribute("code");
String code = loginForm.getCode();
if(cacheCode == null || !cacheCode.toString().equals(code)){
// 不一致,报错
return Result.fail("验证码错误");
}
// 一致,根据手机号查询用户
User user = query().eq("phone", phone).one();
// 3.判断用户是否存在
if(user == null){
// 不存在,则创建
user = createUserWithPhone(phone);
}
// 4.保存用户信息到session中(只存放必要属性)
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
return Result.ok();
}
private User createUserWithPhone(String phone) {
User user = new User();
user.setPhone(phone);
// 默认昵称:user_随机字符串
user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
// 保存用户到数据库
save(user);
return user;
}
|
实现登录校验拦截功能
tomcat的运行原理:

当用户发起请求时,会访问我们像tomcat注册的端口,任何程序想要运行,都需要有一个线程对当前端口号进行监听,tomcat也不例外,当监听线程知道用户想要和tomcat连接连接时,那会由监听线程创建socket连接,socket都是成对出现的,用户通过socket互相传递数据,当tomcat端的socket接受到数据后,此时监听线程会从tomcat的线程池中取出一个线程执行用户请求,在我们的服务部署到tomcat后,线程会找到用户想要访问的工程,然后用这个线程转发到工程中的controller,service,dao中,并且访问对应的DB,在用户执行完请求后,再统一返回,再找到tomcat端的socket,再将数据写回到用户端的socket,完成请求和响应。
通过以上讲解,我们可以得知:每个用户其实对应都是去找tomcat线程池中的一个线程来完成工作的, 使用完成后再进行回收,既然每个请求都是独立的,所以在每个用户去访问我们的工程时,我们可以使用threadlocal来做到线程隔离,每个线程操作自己的一份数据

可以在com.hmdp.config包下编写登录拦截器类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.hmdp.config;
import com.hmdp.dto.UserDTO;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
@Component
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 获取session
HttpSession session = request.getSession();
// 获取session中的用户
Object user = session.getAttribute("user");
// 判断用户是否存在
if(user == null) {
// 不存在,拦截,返回401
response.setStatus(401);
return false;
}
// 存在,则将用户信息保存到ThreadLocal(UserHolder类中已实现通过ThreadLocal存取用户信息)
UserHolder.saveUser((UserDTO) user);
// 放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户,避免内存泄漏
UserHolder.removeUser();
}
}
|
在com.hmdp.config包中创建一个配置类,在其中配置登录拦截器,使其生效:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package com.hmdp.config;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
@RequiredArgsConstructor
public class MvcConfig implements WebMvcConfigurer {
private final LoginInterceptor loginInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(loginInterceptor)
.excludePathPatterns( // 排除一些不需要拦截的路径
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
);
}
}
|
现在已经实现存入当前用户信息的功能,也就是说同时也能获取用户信息了,可以修改com.hmdp.controller.UserController中的me方法了:
1
2
3
4
5
6
|
@GetMapping("/me")
public Result me(){
// 获取当前登录的用户并返回
UserDTO user = UserHolder.getUser();
return Result.ok(user);
}
|
session共享问题
每个tomcat中都有一份属于自己的session,假设用户第一次访问第一台tomcat,并且把自己的信息存放到第一台服务器的session中,但是第二次这个用户访问到了第二台tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的session,所以此时整个登录拦截功能就会出现问题,*如何解决这个问题呢?*早期的方案是session拷贝,就是说虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了
但是这种方案具有两个大问题
1、每台服务器中都有完整的一份session数据,服务器压力过大。
2、session拷贝数据时,可能会出现延迟
所以后来采用的方案都是基于redis来完成,把session换成redis,redis数据本身就是共享的,就可以避免session共享的问题了

Redis代替session的业务流程
设计key的结构
首先要思考一下*此处redis到底使用哪种结构来存储数据呢?*由于存入的数据比较简单,可以考虑使用String,或者是使用哈希,如下图,如果使用String,value会多占用一点空间,如果使用哈希,则它的value中只会存储数据本身,而且可以单独对某个字段进行修改。不过如果不是特别在意内存,其实使用String就可以了。

所以下面使用String结构存储数据,接下来考虑如何给key命名
在设计这个key的时候,需要满足两点:
1、key要具有唯一性
2、key要方便携带
如果采用phone:手机号这种形式存储当然是可以的,但是如果把这样的敏感数据存储到redis中并且从页面中带过来毕竟不太合适,所以选择在后台生成一个随机串token,然后让前端带来这个token就能完成我们的整体逻辑了
整体访问流程
当注册完成后,用户去登录会去校验用户提交的手机号和验证码,是否一致,如果一致,则根据手机号查询用户信息,不存在则新建,最后将用户数据保存到redis,并且生成token作为redis的key,当我们校验用户是否登录时,会去携带着token进行访问,从redis中取出token对应的value,判断是否存在这个数据,如果没有则拦截,如果存在则将其保存到threadLocal中,并且放行。

基于Redis实现短信登录
首先对com.hmdp.service.impl.UserServiceImpl进行修改:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
package com.hmdp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.dto.LoginFormDTO;
import com.hmdp.dto.Result;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.User;
import com.hmdp.mapper.UserMapper;
import com.hmdp.service.IUserService;
import com.hmdp.utils.RedisConstants;
import com.hmdp.utils.RegexUtils;
import com.hmdp.utils.SystemConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.hmdp.utils.RedisConstants.*;
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result sendCode(String phone, HttpSession session) {
// 1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 3.符合,生成验证码
String code = RandomUtil.randomNumbers(6);
// 4.保存验证码到 session
// session.setAttribute("code", code);
// 4.保存验证码到 redis , 2分钟有效
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);
// 5.用日志模拟发送验证码
log.debug("发送短信验证码成功,验证码:{}", code);
// 返回处理成功
return Result.ok();
}
// 基于session的登录功能
// @Override
// public Result login(LoginFormDTO loginForm, HttpSession session) {
// // 1.校验手机号(没做手机号是否相同的校验,不过这不是重点)
// String phone = loginForm.getPhone();
// if (RegexUtils.isPhoneInvalid(phone)) {
// // 如果不符合,返回错误信息
// return Result.fail("手机号格式错误!");
// }
// // 2.校验验证码
// Object cacheCode = session.getAttribute("code");
// String code = loginForm.getCode();
// if(cacheCode == null || !cacheCode.toString().equals(code)){
// // 不一致,报错
// return Result.fail("验证码错误");
// }
// // 一致,根据手机号查询用户
// User user = query().eq("phone", phone).one();
//
// // 3.判断用户是否存在
// if(user == null){
// // 不存在,则创建
// user = createUserWithPhone(phone);
// }
//
// // 4.保存用户信息到session中
// session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
//
// return Result.ok();
// }
// 基于redis的登录功能
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1. 校验手机号(没做手机号是否相同的校验,不过这不是重点)
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
// 如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 2. 校验验证码
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
String code = loginForm.getCode();
if(cacheCode == null || !cacheCode.equals(code)){
// 不一致,报错
return Result.fail("验证码错误");
}
// 一致,根据手机号查询用户
User user = query().eq("phone", phone).one();
// 3. 判断用户是否存在
if(user == null){
// 不存在,则创建
user = createUserWithPhone(phone);
}
// 4. 保存用户信息到redis中
// 4.1 随机生成token,作为前端的登录令牌
String token = UUID.randomUUID().toString(true);
// 4.2 将User对象转为HashMap存储,方便用putAll方法一次性存入redis,不用多次访问
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
// !!!注意:因为id是long类型,直接转换会报错,所以需要将id的值转为string类型,beanToMap方法正好支持自定义功能!!!
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create().setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); // 将字段值转为string类型
// 4.3 存储
stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, userMap);
// 4.4 设置token有效期
stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 5. 返回token
return Result.ok(token);
}
private User createUserWithPhone(String phone) {
User user = new User();
user.setPhone(phone);
// 默认昵称:user_随机字符串
user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
// 保存用户到数据库
save(user);
return user;
}
}
|
然后对com.hmdp.config.LoginInterceptor进行修改:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package com.hmdp.config;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.hmdp.dto.UserDTO;
import com.hmdp.utils.UserHolder;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL;
@Component
@RequiredArgsConstructor
public class LoginInterceptor implements HandlerInterceptor {
// 基于session获取用户信息的前置拦截器
// @Override
// public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// // 获取session
// HttpSession session = request.getSession();
// // 获取session中的用户
// Object user = session.getAttribute("user");
// // 判断用户是否存在
// if(user == null) {
// // 不存在,拦截,返回401
// response.setStatus(401);
// return false;
// }
// // 存在,则将用户信息保存到ThreadLocal(UserHolder类中已实现通过ThreadLocal存取用户信息)
// UserHolder.saveUser((UserDTO) user);
// // 放行
// return true;
// }
private final StringRedisTemplate stringRedisTemplate;
// 基于redis获取用户信息的前置拦截器
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 获取请求头中的token
String token = request.getHeader("authorization");
// 判断token是否存在
if(StrUtil.isBlank(token)) {
// 不存在,拦截,返回401
response.setStatus(401);
return false;
}
// 基于token获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
// 判断用户是否存在
if(userMap.isEmpty()) {
// 不存在,拦截,返回401
response.setStatus(401);
return false;
}
// 将用户信息从HashMap转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 将用户信息保存到ThreadLocal(UserHolder类中已实现通过ThreadLocal存取用户信息)
UserHolder.saveUser(userDTO);
// 刷新token有效期
stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户,避免内存泄漏
UserHolder.removeUser();
}
}
|
注意,在拦截器中添加刷新token有效期功能,每次校验通过后重置token过期时间,不然用户正常使用过程中突然token过期,要重新登录就很不友好了
现在就可以基于redis保存验证码和用户信息了

解决状态登录刷新问题
还是前面提到的token过期问题,前面的拦截器只是对需要登录的路径进行拦截,也就是只会在需要登录的路径下刷新token过期时间,如果用户一直不亦乐乎地看不需要登录的路径(如主页)的话,等到一段时间后点开个人中心却发现要重新登录,那就很难受了

优化方案
既然之前的拦截器无法对不需要拦截的路径生效,那么我们可以添加一个拦截器,在第一个拦截器中拦截所有的路径,把第二个拦截器做的事情放入到第一个拦截器中,同时刷新令牌,因为第一个拦截器有了threadLocal的数据,所以此时第二个拦截器只需要判断拦截器中的user对象是否存在即可,完成整体刷新功能。

代码实现
复制一份com.hmdp.config.LoginInterceptor然后重命名为RefreshTokenInterceptor放在config包下,然后进行修改:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
package com.hmdp.config;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.hmdp.dto.UserDTO;
import com.hmdp.utils.UserHolder;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_KEY;
import static com.hmdp.utils.RedisConstants.LOGIN_USER_TTL;
@Component
@RequiredArgsConstructor
public class RefreshTokenInterceptor implements HandlerInterceptor {
private final StringRedisTemplate stringRedisTemplate;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 获取请求头中的token
String token = request.getHeader("authorization");
// 判断token是否存在
if(StrUtil.isBlank(token)) {
// 不存在也放行
return true;
}
// 基于token获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
// 判断用户是否存在
if(userMap.isEmpty()) {
return false;
}
// 将用户信息从HashMap转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 将用户信息保存到ThreadLocal(UserHolder类中已实现通过ThreadLocal存取用户信息)
UserHolder.saveUser(userDTO);
// 刷新token有效期
stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户,避免内存泄漏
UserHolder.removeUser();
}
}
|
然后修改原本的LoginInterceptor拦截器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package com.hmdp.config;
// ...
@Component
public class LoginInterceptor implements HandlerInterceptor {
// 基于redis获取用户信息的前置拦截器(经过RefreshTokenInterceptor优化版)
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 判断是否需要拦截:若ThreadLocal中有用户信息就放行,否则拦截
if(UserHolder.getUser() == null) {
// 没有用户信息则拦截,并返回401
response.setStatus(401);
return false;
}
// 有用户信息则放行
return true;
}
}
|
写完依旧是要在配置类中进行配置后才能生效,所以修改com.hmdp.config.MvcConfig:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package com.hmdp.config;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
@RequiredArgsConstructor
public class MvcConfig implements WebMvcConfigurer {
private final LoginInterceptor loginInterceptor;
private final RefreshTokenInterceptor refreshTokenInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
// 添加登录拦截器
registry.addInterceptor(loginInterceptor)
.excludePathPatterns( // 排除一些不需要拦截的路径
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
).order(1); // 修改执行优先级(默认是0),值越小优先级越高,值越大优先级越低
// 添加刷新token拦截器
registry.addInterceptor(refreshTokenInterceptor)
.addPathPatterns("/**") // 拦截所有
.order(0);
}
}
|
商户查询缓存
缓存简介
什么是缓存
缓存(Cache),就是数据交换的缓冲区,是临时存储数据的地方,一般读写性能较高。
为什么要用缓存
因为速度快,好用。缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力
实际开发过程中,企业的数据量,少则几十万,多则几千万,这么大数据量,如果没有缓存来作为缓冲,系统是几乎撑不住的,所以企业会大量运用到缓存技术
但是缓存也会增加代码复杂度和运营的成本:

如何使用缓存
实际开发中,会构筑多级缓存来使系统运行速度进一步提升,例如本地缓存与redis中的缓存并发使用
浏览器缓存:主要是存在于浏览器端的缓存
应用层缓存:可以分为tomcat本地缓存,比如之前提到的map,或者是使用redis作为缓存
数据库缓存:在数据库中有一片空间是 buffer pool,增改查数据都会先加载到mysql的缓存中
CPU缓存:当代计算机最大的问题是 cpu性能提升了,但内存读写速度没有跟上,所以为了适应当下的情况,增加了cpu的L1,L2,L3级的缓存

添加商户缓存
当前代码在查询商户信息时,是直接从数据库中去进行查询的,效率很低,所以需要增加缓存
当前方案:
1
2
3
4
5
|
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
// 无论什么情况都是直接查询数据库
return Result.ok(shopService.getById(id));
}
|
缓存模型和思路
标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入redis。

代码实现
在服务层实现,在com.hmdp.service.IShopService中创建queryById方法,并在其实现类中实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
// 从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
// 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 存在,直接返回
// 返回之前得先把json转换为Shop对象
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 不存在,则根据id查询数据库
Shop shop = getById(id);
if (shop == null) {
// 若没从数据库查询到数据,返回错误
return Result.fail("店铺不存在");
}
// 若查到,则缓存到redis中再返回
// 存入redis之前得先将shop对象转换为json
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop));
return Result.ok(shop);
}
}
|
然后修改控制层的queryShopById方法:
1
2
3
4
5
6
7
8
9
|
/**
* 根据id查询商铺信息
* @param id 商铺id
* @return 商铺详情数据
*/
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
return shopService.queryById(id);
}
|
练习
可以参考前面的商户缓存,给/shop-type/list接口也写一个缓存,这个接口是点开主页就会调用,也很适合使用缓存,接口在com.hmdp.controller.ShopTypeController
我的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
// com.hmdp.controller.ShopTypeController
@RestController
@RequestMapping("/shop-type")
public class ShopTypeController {
@Resource
private IShopTypeService typeService;
@GetMapping("list")
public Result queryTypeList() {
return typeService.queryTypeList();
}
}
// com.hmdp.service.impl.ShopTypeServiceImpl
@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
@Resource
public StringRedisTemplate stringRedisTemplate;
@Override
public Result queryTypeList() {
// 查询redis缓存
List<String> shopList = stringRedisTemplate.opsForList().range(CACHE_SHOP_TYPE_LIST_KEY, 0, -1);
if (!shopList.isEmpty()) {
// 若缓存中有数据,则先转换为ShopType集合,然后返回
List<ShopType> shopTypeList = shopList.stream()
.map(shopJson -> JSONUtil.toBean(shopJson, ShopType.class))
.collect(Collectors.toList());
return Result.ok(shopTypeList);
}
// 若缓存中不存在,则查询数据库
List<ShopType> typeList = query().orderByAsc("sort").list();
if (typeList.isEmpty()) {
// 若数据库中不存在,则返回错误
return Result.fail("店铺类型列表查询结果为空");
}
// 写入缓存,然后返回
stringRedisTemplate.opsForList().leftPushAll(CACHE_SHOP_TYPE_LIST_KEY, typeList.stream()
.map(JSONUtil::toJsonStr) // 转换为json
.collect(Collectors.toList()));
return Result.ok(typeList);
}
}
|
缓存更新策略
缓存更新是redis为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向redis插入太多数据,此时就可能会导致缓存中的数据过多,所以redis会对部分数据进行更新,或者把它叫为淘汰更合适。
内存淘汰:redis自动进行,当redis内存达到设定的max-memery的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以自己设置策略方式)
超时剔除:当我们给redis设置了过期时间ttl之后,redis会将超时的数据进行删除,方便我们继续使用缓存
主动更新:我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题

数据库缓存不一致解决方案
由于我们的缓存的数据源来自于数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在,其后果是:
用户使用缓存中的过时数据,就会产生类似多线程数据安全问题,从而影响业务,产品口碑等
*怎么解决呢?*有如下几种方案
Cache Aside Pattern:人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案
Read/Write Through Pattern:由系统本身完成,数据库与缓存的问题交由系统本身去处理
Write Behind Caching Pattern:调用者只操作缓存,其他线程去异步处理数据库,实现最终一致

数据库和缓存不一致采用什么方案
综合考虑使用方案一,但是*方案一调用者如何处理呢?*这里有几个问题
操作缓存和数据库时有三个问题需要考虑:
-
删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存,一般使用这个方案
-
如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC等分布式事务方案
-
先操作缓存还是先操作数据库?
缓存更新策略最佳实践
-
低一致性需求:使用Redis自带的内存淘汰机制
-
高一致性需求:主动更新,并以超时剔除作为兜底方案
- 读操作:
- 缓存命中则直接返回
- 缓存未命中则查询数据库,并写入缓存,设置超时时间
- 写操作:
- 先写数据库,然后删除缓存
- 要确保数据库与缓存操作的原子性
实现商铺和缓存与数据库双写一致
修改ShopController中的业务逻辑,满足下面的需求:
①根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
②根据id修改店铺时,先修改数据库,再删除缓存
修改com.hmdp.service.impl.ShopServiceImpl#queryById方法
1
|
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
|
然后修改商户的更新数据库的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// Controller层
@PutMapping
public Result updateShop(@RequestBody Shop shop) {
// 写入数据库
// shopService.updateById(shop);
// return Result.ok();
return shopService.update(shop);
}
// Service层略,下面是ServiceImpl层
@Override
@Transactional // 保证原子性
public Result update(Shop shop) {
Long id = shop.getId();
if (id == null) {
return Result.fail("店铺id不能为空");
}
// 更新数据库
updateById(shop);
// 删除缓存
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok();
}
|
缓存穿透问题
缓存穿透问题的解决思路
缓存穿透:缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
若是不做防护,不怀好意者就能通过大量请求不存在的数据,从而让数据库进行大量查询操作,影响数据库性能。
常见的解决方案有两种:
缓存空对象思路分析:当我们客户端访问不存在的数据时,先请求redis,但是此时redis中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如redis这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到redis中去,这样,下次用户过来访问这个不存在的数据,那么在redis中也能找到这个数据就不会进入到缓存了。不过若是之后真的存入了这个数据,但没有及时更新redis的话,就还是会查到空数据,导致数据不一致,所以需要设置一个合适的过期时间。
布隆过滤:布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,哪怕此时redis中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到redis中,
假设布隆过滤器判断这个数据不存在,则直接返回
这种方式优点在于节约内存空间,存在误判,误判原因在于:布隆过滤器走的是哈希思想,只要哈希思想,就可能存在哈希冲突

前面这两种都是被动解决缓存穿透,是等别人穿透时再进行防护,也可以主动进行防护,比如增强id的复杂度,避免被猜测id规律、做好数据的基础格式校验、加强用户权限校验、做好热点参数的限流等
解决商品查询的缓存穿透问题
接下来使用缓存空对象的方法来解决缓存穿透问题
核心思路如下:
在原来的逻辑中,如果发现这个数据在mysql中不存在,直接就返回查询失败了,这样是会存在缓存穿透问题
优化后的逻辑:如果这个数据不存在,会把这个数据写入到Redis中,并且将value设置为空,当再次发起查询时,如果查到的value不是null,则判断这个value是否是空字符串,如果是空字符串,则证明是缓存穿透数据,如果不是,则直接返回数据。

修改com.hmdp.service.impl.ShopServiceImpl#queryById的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
@Override
public Result queryById(Long id) {
// 从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
// 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 存在,直接返回
// 返回之前得先把json转换为Shop对象
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 缓存中不存在对应数据时:
// 现在有可能是为了防止缓存穿透而设置的""字符串,所以需判断一下
if (shopJson != null && shopJson.equals("")) {
// 返回错误信息
return Result.fail("店铺不存在");
}
// 到这说明是真不存在
// 根据id查询数据库
Shop shop = getById(id);
if (shop == null) {
// 若没从数据库查询到数据,返回错误
// 返回错误信息之前,将空值写入缓存
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return Result.fail("店铺不存在");
}
// 若查到,则缓存到redis中再返回
// 存入redis之前得先将shop对象转换为json
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}
|
小结
缓存穿透产生的原因是什么?
- 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
- 缓存null值
- 布隆过滤
- 增强id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
缓存雪崩问题及解决思路
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存

缓存击穿问题
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

逻辑分析:假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法,那么这些线程都不能从缓存中查询到数据,那么他们就会在同一时刻来访问查询缓存,然后都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大
常见的解决方案有两种:
利用互斥锁解决缓存击穿问题
因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用tryLock方法 + double check来解决这样的问题。

假设现在线程1过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程1就会一个人去执行逻辑,假设现在线程2过来,线程2在执行过程中,并没有获得到锁,那么线程2就可以进行到休眠,直到线程1把锁释放后,线程2获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。

核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿
互斥锁可以利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true,如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false。
我们可以通过true,或者是false,来表示线程是否成功插入key,成功插入key的线程就是获得到锁的线程。
下面是修改后的com.hmdp.service.impl.ShopServiceImpl:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
package com.hmdp.service.impl;
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
// 从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
// 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 存在,直接返回
// 返回之前得先把json转换为Shop对象
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 缓存中不存在对应数据时:
// 现在有可能是为了防止缓存穿透而设置的""字符串,所以需判断一下
if (shopJson != null && shopJson.equals("")) {
// 返回错误信息
return Result.fail("店铺不存在");
}
// 到这说明是真不存在
// 然后开始实现缓存重建
try {
while(true) {
// 先获取互斥锁
// 判断是否获取成功
boolean isLockSuccess = tryLock(LOCK_SHOP_KEY + id);
if (isLockSuccess) {
try {
// 加锁成功,然后进行DoubleCheck,再次检查redis缓存是否存在
// 从redis中查询商铺缓存
String shopJson4Check = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
// 判断是否存在
if (StrUtil.isNotBlank(shopJson4Check)) {
// 存在,直接返回
// 返回之前得先把json转换为Shop对象
Shop shop = JSONUtil.toBean(shopJson4Check, Shop.class);
return Result.ok(shop);
}
// 缓存中不存在对应数据时:
// 现在有可能是为了防止缓存穿透而设置的""字符串,所以需判断一下
if (shopJson4Check != null && shopJson4Check.equals("")) {
// 返回错误信息
return Result.fail("店铺不存在");
}
// 根据id查询数据库
Shop shop = getById(id);
if (shop == null) {
// 若没从数据库查询到数据,返回错误
// 返回错误信息之前,将空值写入缓存
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return Result.fail("店铺不存在");
}
// 若查到,则缓存到redis中再返回
// 存入redis之前得先将shop对象转换为json
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
} finally {
// 释放互斥锁
unlock(LOCK_SHOP_KEY + id);
}
} else {
Thread.sleep(50);
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// ...update方法不变
/**
* 尝试获取锁
* @param key
* @return true表示加锁成功,false表示加锁失败
*/
private boolean tryLock(String key) {
// setIfAbsent()为key不存在时则设置,存在则返回false
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10L, TimeUnit.SECONDS);
// 返回flag(flag为true时返回true,为false或null时都返回false)
return BooleanUtil.isTrue(flag);
}
// 释放锁
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
}
|
可以使用Jmeter进行压力测试,正常情况下日志里最多只会有一条mysql查询记录



利用逻辑过期解决缓存击穿问题
之所以会出现这个缓存击穿问题,主要原因是在于我们对key设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿的问题,但是不设置过期时间,这样数据不就一直占用内存了吗,所以我们可以采用逻辑过期方案。

可以把过期时间设置在redis的value中,注意:这个过期时间并不会直接作用于redis,而是我们后续通过逻辑去处理。假设线程1去查询缓存,然后从value中判断出来当前的数据已经过期了,此时线程1会获得互斥锁,那么其他线程会被阻塞,获得了锁的线程会开启一个线程去进行以前的重构数据的逻辑,直到新开的线程完成这个逻辑后,才释放锁,而线程1不会在旁边等这些操作,而是会直接返回查到的过期数据。假设现在线程3过来访问,由于线程线程1持有着锁,所以线程3无法获得锁,线程3也直接返回数据,只有等到新开的线程2把重建数据构建完后,其他线程才能走返回正确的数据。
这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。
需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题

当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value是否过期,如果没有过期,则直接返回redis中的数据,如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。
首先定义一个RedisData类,用于存放逻辑过期的数据(下载的代码中应该自带了)
1
2
3
4
5
6
7
8
9
10
11
|
package com.hmdp.utils;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
|
然后修改com.hmdp.service.impl.ShopServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package com.hmdp.service.impl;
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
// 创建线程池用于逻辑过期方案的缓存重建
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 查询店铺数据——使用逻辑过期解决缓存击穿
* 前提:redis中已存在对应店铺信息
* @param id
* @return
*/
@Override
public Result queryById(Long id) {
// 从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
// 判断是否存在
if (StrUtil.isBlank(shopJson)) {
// 不存在,直接返回
return Result.fail("店铺不存在");
}
// 存在,则判断是否过期
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
if (LocalDateTime.now().isBefore(expireTime)) {
// 未过期,直接返回商铺信息
return Result.ok(shop);
}
// 已过期,继续判断能否获取锁
boolean isLockSuccess = tryLock(LOCK_SHOP_KEY + id);
if (isLockSuccess) {
// 加锁成功
// 进行DoubleCheck
// 从redis中查询商铺缓存
shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
// 判断是否存在
if (StrUtil.isBlank(shopJson)) {
// 不存在,直接返回
return Result.fail("店铺不存在");
}
// 存在,则判断是否过期
redisData = JSONUtil.toBean(shopJson, RedisData.class);
shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
expireTime = redisData.getExpireTime();
if (LocalDateTime.now().isBefore(expireTime)) {
// 未过期,直接返回商铺信息
return Result.ok(shop);
}
// DoubleCheck结束,确定需要缓存重建
// 开启一个独立线程来重建缓存
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 重建缓存
saveShop2Redis(id, 20L);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁,写在finally中保证一定能释放
unlock(LOCK_SHOP_KEY + id);
}
});
}
// 返回商铺信息(成功、失败最后都要先返回商铺信息)
return Result.ok(shop);
}
// ...update方法不变
// ...tryLock和unlock方法也不变
/**
* 将携带逻辑过期时间的店铺数据写入redis(逻辑过期方案的缓存重建)
* @param id
* @param expireSeconds
*/
private void saveShop2Redis(Long id, Long expireSeconds) {
// 查询店铺数据
Shop shop = getById(id);
// 封装shop对象和逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
// 写入redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
}
|
这种方案需要redis中已经存在数据,才能更新逻辑过期时间,所以得给saveShop2Redis方法写个单元测试,给redis中先写缓存,使用单元测试写数据的时候记得把saveShop2Redis的修饰改为public
1
2
3
4
5
6
7
8
9
10
11
12
13
|
package com.hmdp.service.impl;
@SpringBootTest
class ShopServiceImplTest {
@Resource
private ShopServiceImpl shopService;
@Test
void saveShop2RedisTest() {
shopService.saveShop2Redis(1L, 3L);
}
}
|
接下来就可以运行项目使用Jmeter进行测试了,这次调整为100个线程,然后我把数据库中的商铺名从“102茶餐厅”改为“103茶餐厅”





可以看到,前三次查询返回的是旧数据,从第四次查询开始,第一次查询时开启的线程修改完了数据,所以返回的就是更新之后的数据了,而且看日志确实只查询了一次MySQL数据库,与预期一致
两者对比
互斥锁方案:由于保证了互斥性,所以数据一致,且实现简单,因为仅仅只需要加一把锁而已,也没其他的事情需要操心,所以没有额外的内存消耗,缺点在于有锁就有死锁问题的发生,且只能串行执行性能肯定受到影响
逻辑过期方案:线程读取过程中不需要等待,性能好,有一个额外的线程持有锁去进行重构数据,但是在重构数据完成前,其他的线程只能返回之前的数据,且实现起来麻烦

封装Redis工具类
如果每次都需要重复写这些缓存击穿、缓存穿透等解决方案的话,还是比较麻烦的,一般会封装成一个工具类来直接调用
所以接下来基于StringRedisTemplate封装一个简易的缓存工具类,满足下列需求:
- 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
- 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓
存击穿问题
- 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
代码实现:
在com.hmdp.utils包下创建一个CacheClient类,在其中编写上面的四个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
package com.hmdp.utils;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static com.hmdp.utils.RedisConstants.*;
@Slf4j
@Component
@RequiredArgsConstructor
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
/**
* 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
* @param key
* @param value
* @param time
* @param unit
*/
public void set(String key, Object value, Long time, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
}
/**
* 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间
* @param key
* @param value
* @param time
* @param unit
*/
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
// 封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
// 写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
/**
* 根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
* @param keyPrefix 前缀
* @param id 要查询的id
* @param type 返回的数据类型
* @param dbFallback MySQL数据库查询逻辑
* @param time 过期时间
* @param unit 时间单位
* @return
* @param <R> 返回类型
* @param <ID> id的数据类型
*/
public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
// 获取redis中的数据
String json = stringRedisTemplate.opsForValue().get(keyPrefix + id);
// 判断redis中是否有数据
if (StrUtil.isNotBlank(json)) {
// 存在,将json反序列化为对象后返回
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是用于防止缓存穿透的空值(能到这不是null就是空字符串)
if(json != null) {
return null;
}
// 如果是null,根据id查询数据库
R result = dbFallback.apply(id);
if (result == null) {
// 数据库没有,将空值写入redis,用于防止缓存穿透
stringRedisTemplate.opsForValue().set(keyPrefix + id, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 数据库中有,将数据写入redis
this.set(keyPrefix + id, JSONUtil.toJsonStr(result), time, unit);
return result;
}
// 创建线程池用于逻辑过期方案的缓存重建
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 根据指定的key查询缓存,并反序列化为指定类型,利用逻辑过期解决缓存击穿问题
* @param keyPrefix 前缀
* @param lockKeyPrefix 锁的前缀
* @param id 要查询的id
* @param type 返回的数据类型
* @param dbFallback MySQL数据库查询逻辑
* @param time 过期时间
* @param unit 时间单位
* @return
* @param <R> 返回类型
* @param <ID> id的数据类型
*/
public <R, ID> R queryWithLogicalExpire(String keyPrefix, String lockKeyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 从redis中查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 判断是否存在
if (StrUtil.isBlank(json)) {
// 不存在,直接返回
return null;
}
// 存在,则判断是否过期
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
R result = JSONUtil.toBean(data, type);
LocalDateTime expireTime = redisData.getExpireTime();
if (LocalDateTime.now().isBefore(expireTime)) {
// 未过期,直接返回商铺信息
return result;
}
// 已过期,继续判断能否获取锁
String lockKey = lockKeyPrefix + id;
boolean isLockSuccess = tryLock(lockKey);
if (isLockSuccess) {
// 加锁成功
// 进行DoubleCheck
// 从redis中查询商铺缓存
json = stringRedisTemplate.opsForValue().get(key);
// 判断是否存在
if (StrUtil.isBlank(json)) {
// 不存在,直接返回
return null;
}
// 存在,则判断是否过期
redisData = JSONUtil.toBean(json, RedisData.class);
result = JSONUtil.toBean((JSONObject) redisData.getData(), type);
expireTime = redisData.getExpireTime();
if (LocalDateTime.now().isBefore(expireTime)) {
// 未过期,直接返回商铺信息
return result;
}
// DoubleCheck结束,确定需要缓存重建
// 开启一个独立线程来重建缓存
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 重建缓存:
// 查询数据库
R r = dbFallback.apply(id);
// 写入redis
this.setWithLogicalExpire(keyPrefix + id, r, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁,写在finally中保证一定能释放
unlock(lockKey);
}
});
}
// 返回商铺信息(成功、失败最后都要先返回商铺信息)
return result;
}
/**
* 尝试获取锁
* @param key
* @return true表示加锁成功,false表示加锁失败
*/
private boolean tryLock(String key) {
// setIfAbsent()为key不存在时则设置,存在则返回false
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10L, TimeUnit.SECONDS);
// 返回flag(flag为true时返回true,为false或null时都返回false)
return BooleanUtil.isTrue(flag);
}
// 释放锁
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
}
|
然后可以在com.hmdp.service.impl.ShopServiceImpl中直接使用封装好的queryWithLogicalExpire方法了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
package com.hmdp.service.impl;
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private CacheClient cacheClient;
/**
* 查询店铺数据——使用逻辑过期解决缓存击穿——利用封装好的CacheClient
* 前提:redis中已存在对应店铺信息
* @param id
* @return
*/
@Override
public Result queryById(Long id) {
Shop shop = cacheClient.queryWithLogicalExpire(CACHE_SHOP_KEY, LOCK_SHOP_KEY, id, Shop.class,
this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
if (shop == null) {
return Result.fail("店铺不存在");
}
return Result.ok(shop);
}
// ...update方法保持不变
// tryLock、unlock、saveShop2Redis方法都可以删了
}
|
优惠券秒杀
全局唯一ID
实现策略
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用MySQL数据库的自增ID就存在一些问题:
场景分析一:如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。
场景分析二:随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

ID的组成部分:
Redis实现
在com.hmdp.utils下创建一个类RedisIdWorker用来实现全局唯一id:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package com.hmdp.utils;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@Component
@RequiredArgsConstructor
public class RedisIdWorker {
/**
* 时间戳起始时间 2026年1月1日 00:00:00
*/
private static final long BEGIN_TIMESTAMP = 1767225600L;
/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;
private final StringRedisTemplate stringRedisTemplate;
/**
* 获取下一个id
* @param keyPrefix 区分不同业务的前缀
* @return
*/
public long nextId(String keyPrefix) {
// 获取当前时间戳
LocalDateTime now = LocalDateTime.now();
long nowEpochSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowEpochSecond - BEGIN_TIMESTAMP;
// 生成序列号
// 获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
// 找到名为"icr:{keyPrefix}:{date}"的key,然后将其value+1后返回
// 若没找到则创建并将其value设置为1后返回
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 拼接并返回
// 先将时间戳左移32位(序列号位数),然后将空出来的部分利用或运算填入序列号
return timestamp << COUNT_BITS | count;
}
// public static void main(String[] args) {
// // 获取一个起始时间戳
// LocalDateTime time = LocalDateTime.of(2026, 1, 1, 0, 0, 0);
// long epochSecond = time.toEpochSecond(ZoneOffset.UTC);
// System.out.println(epochSecond); // 输出:1767225600
// }
}
|
可以在测试类中测试一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.hmdp;
import com.hmdp.utils.RedisIdWorker;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es = Executors.newFixedThreadPool(300);
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}
}
|
可以从输出结果中看出,即使有300个线程一起获取id也不会出现重复id
添加优惠券功能介绍
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
平价卷由于优惠力度并不是很大,所以是可以任意领取
而特价代金券由于优惠力度大,就得限制数量,从表结构上也能看出,特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段
代码中已经在com.hmdp.controller.VoucherController实现了新增两种优惠券的方法了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
* 新增普通券
* @param voucher 优惠券信息
* @return 优惠券id
*/
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
voucherService.save(voucher);
return Result.ok(voucher.getId());
}
/**
* 新增秒杀券
* @param voucher 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}
|
可以先用/voucher/seckill接口添加一张秒杀券用于后面测试:
1
2
3
4
5
6
7
8
9
10
11
12
|
{
"shopId": 1,
"title": "100元代金券",
"subTitle": "周一至周五均可使用",
"rules": "全场通用\\n无需预约\\n可无限叠加\\n不兑现、不找零\\n仅限堂食",
"payValue": 8000,
"actualValue": 10000,
"type": 1,
"stock": 100,
"beginTime": "2026-05-05T10:09:17",
"endTime": "2026-05-07T12:09:04"
}
|
实现秒杀券下单功能

下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
下单核心逻辑分析:
当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件
比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。

修改com.hmdp.controller.VoucherOrderController:
1
2
3
4
5
6
7
8
9
10
11
12
|
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Resource
private IVoucherOrderService voucherOrderService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.seckillVoucher(voucherId);
}
}
|
然后编写seckillVoucher方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package com.hmdp.service.impl;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
// 查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 判断秒杀时间是否开启和结束
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime()) || LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
// 若在正确时间则直接结束
return Result.fail("不在秒杀时间");
}
// 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
// 若库存不足则直接结束
return Result.fail("库存不足");
}
// 库存充足则扣减库存
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.update();
// 如果扣减库存失败则返回错误信息
if (!isSuccess) {
return Result.fail("库存扣减失败");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 设置订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 设置用户id
voucherOrder.setUserId(UserHolder.getUser().getId());
// 设置代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}
}
|
现在就可以在下图页面中完成简单的秒杀券下单功能了

解决并发场景下超卖问题
超卖问题说明
有关超卖问题分析:在前面的代码中是这么写的
1
2
3
4
5
6
7
8
9
10
|
if (voucher.getStock() < 1) {
return Result.fail("库存不足!");
}
boolean isSuccess = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!isSuccess) {
//扣减库存
return Result.fail("库存不足!");
}
|
假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

解决方案
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁,而对于加锁,通常有两种解决方案

悲观锁:
悲观锁可以实现对于数据的串行化执行(一个一个依次执行),比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等。但性能较低
乐观锁:


在CAS法中,直接用库存代替版本,在每次修改前确认库存是否与之前查到的一致
乐观锁解决超卖问题
那就按照上面的思路,对原有代码进行修改:
1
2
3
4
5
|
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.eq("stock", seckillVoucher.getStock()) // 判断库存是否与先前查到的一致
.update();
|
但在进行并发测试后会发现,能成功下单的线程是小于库存量的,也就是说明明有库存,但无法下单,从超卖变成卖不出去了
因为以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的。
但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败
优化上面的代码:
前面的方式成功的概率太低,所以我们的乐观锁需要变一下,改成stock大于0即可
1
2
3
4
5
|
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
|
现在只要库存小于零,就会下单失败,大于0时不影响正常下单
使用200个线程来测试:

与预期一致,50%异常表示就下单成功了100单,与库存一致,并没有出现超卖或卖不出去问题
一人一单
需求分析
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
现在的问题在于:
优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单
具体操作逻辑如下:根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则无法再下单

逐步完善
现在先按照上面的思路来继续完善com.hmdp.service.impl.VoucherOrderServiceImpl#seckillVoucher:
1
2
3
4
5
6
7
8
9
10
|
// 查询该用户是否已下过该秒杀券单(限一人一单)
Long userId = UserHolder.getUser().getId();
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
// 根据查询结果判断该用户是否已下过该秒杀券单
if (count > 0) {
return Result.fail("当前用户已购买过该秒杀券");
}
|
在其中添加上述逻辑后,继续用JMeter进行并发测试看看,期望结果应该是库存只扣减了1。

但实际测试后会发现,在一个用户使用大量线程下单时,依旧会出现一个用户下了多单的情况,与预期不符。
所以还是需要加锁,乐观锁比较适合更新数据,而现在是插入数据,所以使用悲观锁操作,但如果直接给新增秒杀券订单业务加锁会导致性能严重下降,所以只针对用户id对单个用户加锁
所以继续对代码进行修改:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 判断秒杀时间是否开启和结束
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime()) || LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
// 若在正确时间则直接结束
return Result.fail("不在秒杀时间");
}
// 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
// 若库存不足则直接结束
return Result.fail("库存不足");
}
// 提取业务为一个单独的方法
return createVoucherOrder(voucherId);
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 查询该用户是否已下过该秒杀券单(限一人一单)
Long userId = UserHolder.getUser().getId();
// 为什么要用 intern()?
// 如果直接用 userId.toString(),每次都会 new 一个新的 String 对象。
// synchronized 锁的是对象实例,不同的 String 对象会导致锁失效(无法互斥)。
// intern() 会利用字符串常量池,保证值相同的字符串在内存中是同一个对象,从而确保锁有效。
// 不使用 intern() 的话就和没锁差不多
synchronized (userId.toString().intern()) {
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
// 根据查询结果判断该用户是否已下过该秒杀券单
if (count > 0) {
return Result.fail("当前用户已购买过该秒杀券");
}
// 库存充足,扣减库存
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0) // 确保不超卖
.update();
// 如果扣减库存失败则返回错误信息
if (!isSuccess) {
return Result.fail("库存扣减失败");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 设置订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 设置用户id
voucherOrder.setUserId(userId);
// 设置代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
} // 到这就释放锁了
}
}
|
但现在依旧存在问题,依旧是并发安全问题,按照当前逻辑,synchronized锁会在返回订单id后立刻释放,但执行到这里的时候,事务还没有提交,也就是这些操作还没保存到数据库中。要是在这一瞬间又有新的线程进来,查到的数据依然是旧数据,就会认为当前用户还没有购买过该秒杀券,导致同一用户重复下单,出现并发安全问题。
所以要修改synchronized锁的范围:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 判断秒杀时间是否开启和结束
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime()) || LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
// 若在正确时间则直接结束
return Result.fail("不在秒杀时间");
}
// 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
// 若库存不足则直接结束
return Result.fail("库存不足");
}
// 查询该用户是否已下过该秒杀券单(限一人一单)
Long userId = UserHolder.getUser().getId();
// 为什么要用 intern()?
// 如果直接用 userId.toString(),每次都会 new 一个新的 String 对象。
// synchronized 锁的是对象实例,不同的 String 对象会导致锁失效(无法互斥)。
// intern() 会利用字符串常量池,保证值相同的字符串在内存中是同一个对象,从而确保锁有效。
// 不使用 intern() 的话就和没锁差不多
synchronized (userId.toString().intern()) {
return createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
// 根据查询结果判断该用户是否已下过该秒杀券单
if (count > 0) {
return Result.fail("当前用户已购买过该秒杀券");
}
// 库存充足,扣减库存
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0) // 确保不超卖
.update();
// 如果扣减库存失败则返回错误信息
if (!isSuccess) {
return Result.fail("库存扣减失败");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 设置订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 设置用户id
voucherOrder.setUserId(userId);
// 设置代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}
}
|
现在将整个createVoucherOrder方法都锁起来后,才能确保事务一定提交到了数据库,确保线程安全
但是,现在的代码还是存在问题,现在seckillVoucher方法没有加@Transactional事务注解,只有提取出来的createVoucherOrder方法了事务注解,而在seckillVoucher方法中是直接用this.createVoucherOrder(voucherId)这种方式调用的(this省略了而已),而this拿到的是当前类对象,而不是当前类的代理对象,事务注解的原理是Spring对当前类进行了动态代理,利用代理对象实现的事务管理。而this拿到的当前类对象并不是代理对象,所以就没有代理功能,此时事务就会失效
简单来说,当一个类中的普通方法(seckillVoucher)直接调用同类中带有 @Transactional 注解的方法(createVoucherOrder)时,事务注解会失效。这个问题一般称为Spring AOP 的“自调用” (Self-Invocation) 问题
最终结果
可以利用AopContext.currentProxy()方法拿到当前Service(Bean)的代理对象,也就是IVoucherOrderService接口的代理对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
package com.hmdp.service.impl;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 判断秒杀时间是否开启和结束
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime()) || LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
// 若在正确时间则直接结束
return Result.fail("不在秒杀时间");
}
// 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
// 若库存不足则直接结束
return Result.fail("库存不足");
}
// 查询该用户是否已下过该秒杀券单(限一人一单)
Long userId = UserHolder.getUser().getId();
// 为什么要用 intern()?
// 如果直接用 userId.toString(),每次都会 new 一个新的 String 对象。
// synchronized 锁的是对象实例,不同的 String 对象会导致锁失效(无法互斥)。
// intern() 会利用字符串常量池,保证值相同的字符串在内存中是同一个对象,从而确保锁有效。
// 不使用 intern() 的话就和没锁差不多
synchronized (userId.toString().intern()) {
// Spring 默认使用 JDK 动态代理,其实现原理是给目标类所实现的接口生成动态代理(动态代理类会实现目标类所实现的所有接口)
// 所以拿到的代理对象,可以强转为当前类所实现的接口,而不能转为当前实现类
// 注意:需在启动类添加 @EnableAspectJAutoProxy(exposeProxy = true) 才能获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
// 根据查询结果判断该用户是否已下过该秒杀券单
if (count > 0) {
return Result.fail("当前用户已购买过该秒杀券");
}
// 库存充足,扣减库存
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0) // 确保不超卖
.update();
// 如果扣减库存失败则返回错误信息
if (!isSuccess) {
return Result.fail("库存扣减失败");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 设置订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 设置用户id
voucherOrder.setUserId(userId);
// 设置代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}
}
|
需要在接口中也写上createVoucherOrder方法:
1
2
3
4
5
6
|
public interface IVoucherOrderService extends IService<VoucherOrder> {
Result seckillVoucher(Long voucherId);
Result createVoucherOrder(Long voucherId);
}
|
还需要在pom.xml文件中补充相关依赖,以及在启动类中添加@EnableAspectJAutoProxy(exposeProxy = true)注解才行
1
2
3
4
5
|
<!-- 导入AopContext.currentProxy()所需的 aspectj 依赖 -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
|

测试
同样使用同一个用户账号,开200个线程对/voucher-order/seckill/{id}接口进行测试

结果与预期一致,只有第一次能成功下单,后续均下单失败
集群环境下的并发问题
模拟集群环境
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了
接下来模拟集群环境,进行测试
1.将服务启动两份,端口分别为8081和8082:
找到idea的服务菜单:


2.修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/json;
sendfile on;
keepalive_timeout 65;
server {
listen 8080;
server_name localhost;
# 指定前端项目所在的位置
location / {
root html/hmdp;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
location /api {
default_type application/json;
#internal;
keepalive_timeout 30s;
keepalive_requests 1000;
#支持keep-alive
proxy_http_version 1.1;
rewrite /api(/.*) $1 break;
proxy_pass_request_headers on;
#more_clear_input_headers Accept-Encoding;
proxy_next_upstream error timeout;
#proxy_pass http://127.0.0.1:8081;
proxy_pass http://backend;
}
}
upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}
}
|

如果在修改前已经启动nginx,可以在nginx根目录打开命令行输入nginx.exe -s reload重新加载配置
可以在synchronized锁内部打个断点,此时用一个用户的id连续对/voucher-order/seckill/{id}接口发起两次请求(先清空之前的购买记录),会发现两个请求都进到断点里面了,并没有成功锁住第二个请求

有关锁失效原因分析
由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

分布式锁
基本原理和实现方式
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么就能锁住不同JVM的线程,不让线程并行,让程序串行执行,这就是分布式锁的核心思路

那么分布式锁应该满足一些什么样的条件呢?
-
可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
-
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
-
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
-
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
-
安全性:安全也是程序中必不可少的一环
常见的分布式锁有三种:
-
Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见
-
Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
-
Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本篇文章并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述

Redis分布式锁的实现核心思路
实现分布式锁时需要实现的两个基本方法:
-
获取锁:
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
-
释放锁:
1
2
3
4
|
# 获取锁,NX表示互斥,EX是设置超时时间,seckill:lock:1是key,locked是value
SET seckill:lock:1 locked NX EX 10
# 释放锁
DEL seckill:lock:1
|
大致流程如下:

初步实现分布式锁
先在com.hmdp.utils包下创建一个锁的基本接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
package com.hmdp.utils;
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁的过期时间,秒单位
* @return true表示加锁成功,false表示加锁失败
*/
boolean tryLock(Long timeoutSec);
/**
* 释放锁
*/
void unLock();
}
|
然后实现这个接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(Long timeoutSec) {
// 获取线程标识
long theadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
// 不存在才设置
.setIfAbsent(KEY_PREFIX + name, String.valueOf(theadId), timeoutSec, TimeUnit.SECONDS);
// 避免空指针
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
|
然后修改之前VoucherOrderServiceImpl的业务代码,尝试现在是否能成功加锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package com.hmdp.service.impl;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result seckillVoucher(Long voucherId) {
// 查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 判断秒杀时间是否开启和结束
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime()) || LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
// 若在正确时间则直接结束
return Result.fail("不在秒杀时间");
}
// 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
// 若库存不足则直接结束
return Result.fail("库存不足");
}
// 查询该用户是否已下过该秒杀券单(限一人一单)
Long userId = UserHolder.getUser().getId();
// 使用分布式锁
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁
boolean isLock = lock.tryLock(120L);
// 判断是否获取锁成功
if (!isLock) {
// 获取锁失败,直接返回失败(当前业务是防止一个用户重复下单,所以不用重试)
return Result.fail("请勿重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unLock();
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 代码不变
}
}
|
在39行if (!isLock)这行打个断点测试:


isLock的值一个是true,一个是false。确实是只有一个线程拿到了锁,成功在不同JVM中加锁了
Redis分布式锁的误删问题
出现误删的情况
持有锁的线程在锁的内部出现了阻塞,导致它的锁超时自动释放,这时线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况

解决方案
在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果不属于自己,则不进行锁的删除。假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后准备删除锁,但是线程1一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。

修改后的流程:

解决误删问题
修改之前的分布式锁实现,在获取锁时存入线程标示(可以用UUID表示)
能不能直接用前面的线程ID作为标识?
不能,因为线程的id是从1开始,JVM每次创建一个新线程时自动递增的,在集群环境有多个JVM的情况下,有可能会出现两个线程id相同的情况
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。
修改之前编写的SimpleRedisLock类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.hmdp.utils;
import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
// 用于区分不同JVM中的锁
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(Long timeoutSec) {
// 获取线程标识
String theadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
// 不存在才设置
.setIfAbsent(KEY_PREFIX + name, theadId, timeoutSec, TimeUnit.SECONDS);
// 避免空指针
return Boolean.TRUE.equals(success);
}
@Override
public void unLock() {
// 获取线程标识
String theadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断锁标识是否一致,一致则释放锁,不一致就无操作
if (theadId.equals(id)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
|
分布式锁的原子性问题
出现问题的情景说明
更为极端的误删逻辑说明:
线程1现在持有锁之后,在执行业务逻辑过程中,已经通过了锁标识是否一致的条件判断,正准备释放锁的时候被阻塞了(比如JVM在执行Full GC时会触发“Stop-The-World”(STW),暂停所有应用线程),阻塞期间它的锁到期了,那么此时线程2进来开始执行业务,但是线程1阻塞结束后会接着往后执行,就会执行释放锁的那行代码,相当于条件判断并没有起到作用,这就是释放锁时的原子性问题。
之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的

Lua脚本解决多条命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,下面重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了
Redis提供的调用函数,语法如下:
1
|
redis.call('命令名称', 'key', '其它参数', ...)
|
例如,我们要执行set name jack,则脚本是这样:
1
2
|
# 执行 set name jack
redis.call('set', 'name', 'jack')
|
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
1
2
3
4
5
6
|
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
|
写好脚本以后,需要用Redis命令来调用脚本
例如,要执行 redis.call(‘set’, ’name’, ‘jack’) 这个脚本,语法如下:

如果脚本中的key、value不想写死,也可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

释放锁的业务流程是这样的
1、获取锁中的线程标示
2、判断是否与指定的标示(当前线程标示)一致
3、如果一致则释放锁(删除)
4、如果不一致则什么都不做
如果用Lua脚本来表示则是这样的:
1
2
3
4
5
6
7
8
|
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
|
利用Java代码调用Lua脚本改造分布式锁
RedisTemplate中,可以利用execute方法去执行lua脚本

在resources目录下编写释放锁的lua脚本:

1
2
3
4
5
6
7
8
|
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
|
优化SimpleRedisLock中的释放锁逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.hmdp.utils;
import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
// 用于区分不同JVM中的锁
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(Long timeoutSec) {
// 不变
}
@Override
public void unLock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
// 设置key,要传入一个列表
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
}
|
分布式锁小结
分布式锁这一小节到目前为止,先是利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是通过在删之前拿锁、比锁、删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是又出现了原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题
但是目前依旧存在一些问题,将在接下来的redisson小节中介绍并解决
分布式锁-redisson
当前存在的问题
基于setnx实现的分布式锁存在下面的问题:

重入问题:就是当一个线程已经持有了锁,如果它再次尝试获取同一把锁,非重入锁会直接导致线程阻塞,从而引发死锁。可重入锁的意义在于防止死锁,比如在 Java 的 HashTable 中,方法都使用 synchronized 修饰。如果一个已持有锁的方法内部,又调用了另一个同样需要这把锁的方法,若锁不可重入,线程就会在等待自己释放锁,导致程序永久卡死。因此,标准的 synchronized 和 ReentrantLock 都是可重入的,而基于 setnx 的简单实现如果不做额外处理,就无法满足这一需求。
不可重试:当前的实现缺乏重试机制。当线程首次尝试获取锁失败后,会立即返回,而不是等待一段时间后再次尝试。一个健壮的分布式锁应当支持可重试:在获取锁失败后,线程可以在一个指定的时间内,以一定的间隔反复尝试获取锁,直到成功或超过总等待时间。这能有效应对短暂的资源竞争,提升系统的可用性。
超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候误删别人的锁,但提前释放本身就是一个严重的安全隐患。它意味着在业务逻辑尚未执行完毕时,其他线程就可能获取到锁并操作共享资源,从而破坏了锁的互斥性,可能导致数据不一致。
主从一致性: 在Redis主从集群架构下,存在数据同步延迟的风险。当主节点(Master)成功处理了加锁请求后,需要异步地将这个“锁键”同步给从节点(Slave)。如果在这个过程中,主节点突然宕机,而哨兵(Sentinel)机制将一个尚未同步到该锁信息的从节点提升为新的主节点,那么这把锁就会“丢失”。随后,其他线程便能从新的主节点成功获取到同一把锁,导致多个线程同时认为自己是锁的持有者,彻底破坏了分布式锁的互斥性。
redisson简介
那么什么是Redisson呢?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
可以把它理解为 Redis 的一个“超级增强版”Java 客户端。它的核心目标是将 Redis 的能力封装成开发者熟悉的 Java 对象,让你能像操作本地的 HashMap、ArrayList 或 ReentrantLock 一样,轻松实现复杂的分布式功能。
Redisson提供了分布式锁的多种多样的功能,如可重入锁、公平锁、读写锁、红锁(RedLock)等,内置看门狗(Watchdog)机制自动续期,有效防止死锁。
快速入门
引入依赖
1
2
3
4
5
|
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
|
redisson也提供了Spring Boot Starter类型的依赖,但它会修改Spring对Redis的默认配置与实现,如果不希望这些配置与实现被修改,就可以引入这个依赖
配置Redisson客户端
可以在com.hmdp.config包下创建一个Redisson配置类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
package com.hmdp.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://Redis所在ip地址:6379")
.setPassword("你的密码");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
|
Redisson分布式锁示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
// 判断获取锁是否成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
|
改造先前的业务代码
改造com.hmdp.service.impl.VoucherOrderServiceImpl中的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package com.hmdp.service.impl;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
// 可以删掉stringRedisTemplate了
// @Resource
// private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// -----------------不变-----------------
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime()) || LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
return Result.fail("不在秒杀时间");
}
if (seckillVoucher.getStock() < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
// -----------------不变-----------------
// 使用分布式锁
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("order:" + userId);
// 获取锁
// boolean isLock = lock.tryLock(120L);
boolean isLock = lock.tryLock(); // 无参时,默认不重试,30s过期
// 判断是否获取锁成功
if (!isLock) {
return Result.fail("请勿重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
// lock.unLock();
lock.unlock();
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 不变
}
}
|
回顾一下,本业务就是要防止一个用户多次下单,所以获取锁失败后不用重试
可重入锁原理
要理解 Redisson 的可重入锁,我们可以先从 Java 原生的锁机制说起。
在 Java 的 ReentrantLock 中,其可重入性是通过一个 volatile 的 state 变量来维护的。state 记录了锁被持有的次数:当 state 为 0 时,表示锁是空闲的;当某个线程首次获取锁时,state 变为 1;如果同一个线程再次获取这把锁,state 就会自增。释放锁时则相反,state 递减,直到归零才表示锁被完全释放。synchronized 关键字的底层实现也遵循类似的逻辑,通过一个计数器来判断锁的重入状态。
那么,在分布式环境下,Redisson 是如何实现这一特性的呢?
Redisson 的核心思想是将“线程标识”和“重入次数”这两个关键信息存储在 Redis 中,从而在分布式环境下模拟出本地锁的可重入行为。
- 数据结构:Redisson 使用 Redis 的 Hash 结构 来存储锁的信息。
- Key:锁的名称(例如
"myLock")。
- Field:持有锁的线程的唯一标识,通常是
UUID:ThreadId 的组合。这能确保不同客户端、不同线程之间的隔离。
- Value:一个计数器,记录该线程重入这把锁的次数。

- 加锁流程:
- 首次加锁:当一个线程尝试加锁时,Redisson 会通过 Lua 脚本检查 Redis 中是否存在这把锁。如果不存在,它会在 Hash 结构中创建一个新的条目,将当前线程的标识作为 Field,并将 Value(重入次数)设为 1。
- 再次加锁(重入):如果锁已存在,Redisson 会再次通过 Lua 脚本检查 Hash 结构中是否已经包含当前线程的标识。如果存在,说明是当前线程在重入,它会将对应的 Value 加 1。
获取锁的lua脚本:

- 解锁流程:
- 每次解锁时,Redisson 会将当前线程对应的重入次数减 1。
- 只有当重入次数减到 0 时,才会真正地从 Redis 中删除这把锁的整个 Hash 结构,从而释放锁。
释放锁的lua脚本:

通过这种方式,Redisson 利用 Redis 的 Hash 结构和保证原子性的 Lua 脚本,完美地在分布式环境中实现了与本地 ReentrantLock 类似的可重入特性,有效避免了同一线程因重复获取锁而导致的死锁问题。

秒杀优化
异步秒杀思路
先来回顾一下下单流程
当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤
1、查询优惠卷
2、判断秒杀库存是否足够
3、查询订单
4、校验是否是一人一单
5、扣减库存
6、创建订单

在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行,这样就会导致我们的程序执行的很慢,所以我们需要通过程序异步执行来优化响应速度
优化方案:我们将耗时的数据库操作(如创建订单、扣减库存)与快速的资格校验(如库存判断、一人一单)进行解耦。资格校验通过 Redis 和 Lua 脚本原子性地完成,一旦校验通过,我们便将订单信息放入一个阻塞队列中,并立即向用户返回成功。同时,我们启动一个独立的后台线程池,它持续地从队列中获取订单信息,并异步地完成后续的下单流程。这种方式极大地提升了接口的响应速度,并且通过线程池管理后台任务,有效避免了资源耗尽的风险。不过这里边有两个难点

- 第一个难点是我们怎么在redis中去快速校验一人一单,还有库存判断?
一人一单:可以在Redis中利用set集合判断,这个set集合的key存放优惠券id,value存放用户id,每次下单时根据优惠券id判断该用户id是否已在value中,若不在,则可以下单,并将id放入value。若在,则说明已经下过单,无法继续下单。
库存判断:直接在Redis中存放一个键值对,key是对应优惠券id,value是库存,每次下单减1,若为0则说明无库存了。
- 第二个难点是由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成?
可以在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。
我们现在来看看整体思路:当用户下单之后,判断库存是否充足只需要到redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,并将userId和优惠卷存入到redis中,并且返回0。整个过程需要保证是原子性的,我们可以使用lua来操作

当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
优化秒杀逻辑
当前需求:
- 新增秒杀优惠券的同时,将优惠券的库存信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
修改新增秒杀券逻辑
修改com.hmdp.service.impl.VoucherServiceImpl的addSeckillVoucher方法,新增一个将库存信息存入redis的操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package com.hmdp.service.impl;
// ...
import static com.hmdp.utils.RedisConstants.SECKILL_STOCK_KEY;
@Service
public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements IVoucherService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryVoucherOfShop(Long shopId) {
// 不变
}
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存库存到Redis中
// SECKILL_STOCK_KEY ="seckill:stock:"
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
}
|
编写Lua脚本
先编写lua脚本,实现资格判断与库存扣减
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key(在lua脚本中用..拼接)
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户id)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
|
初步改造下单秒杀券业务代码
基于Redis完成秒杀资格判断与库存扣减,修改com.hmdp.service.impl.VoucherOrderServiceImpl的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
package com.hmdp.service.impl;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 我把lua脚本命名为seckill.lua,放在resources目录下
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户id
Long userId = UserHolder.getUser().getId();
// 生成订单id
long orderId = redisIdWorker.nextId("order");
// 执行lua脚本
Long executeResult = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString());
// 判断结果
int intExecuteResult = executeResult.intValue(); // 转为int类型便于后续判断
if (intExecuteResult != 0) {
// 若不为0,则返回错误信息
return Result.fail(intExecuteResult == 1 ? "库存不足" : "无法重复下单");
}
// TODO 保存下单信息到阻塞队列
// 返回订单id
return Result.ok(orderId);
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 不变
}
}
|
目前只是在Redis里完成了下单的库存扣减与用户id记录,并没有同步到MySQL数据库中,接下来实现这个功能
基于阻塞队列进一步优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
package com.hmdp.service.impl;
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
// 创建阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
// 创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct // 在当前类初始化完成后执行
private void init() {
// 让当前类初始化完成后就开始执行VocherOrderHandler中的run方法
SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());
}
// 编写内部类来执行下单任务
private class VocherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 获取队列中的订单信息
// take()方法在orderTasks中没有元素的时候会阻塞,所以不用担心死循环造成巨大负担
VoucherOrder voucherOrder = orderTasks.take();
// 创建订单
handleVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.error("处理订单异常:", e);
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 创建锁对象(再次加锁作为兜底,以防Redis出了某些问题)
RLock lock = redissonClient.getLock("order:" + voucherOrder.getUserId());
// 获取锁
boolean isLock = lock.tryLock(); // 无参时,默认不重试,30s过期
// 判断是否获取锁成功
if (!isLock) {
// 获取锁失败,直接返回失败(当前业务是防止一个用户重复下单,所以不用重试)
log.error("不允许重复下单");
}
try {
// 创建订单
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
lock.unlock();
}
}
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户id
Long userId = UserHolder.getUser().getId();
// 生成订单id
long orderId = redisIdWorker.nextId("order");
// 执行lua脚本
Long executeResult = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString());
// 判断结果
int intExecuteResult = executeResult.intValue(); // 转为int类型便于后续判断
if (intExecuteResult != 0) {
// 若不为0,则返回错误信息
return Result.fail(intExecuteResult == 1 ? "库存不足" : "无法重复下单");
}
// 结果为0
// 保存下单信息到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
// 设置订单id
voucherOrder.setId(orderId);
// 设置用户id
voucherOrder.setUserId(userId);
// 设置代金券id
voucherOrder.setVoucherId(voucherId);
// 放入阻塞队列(VocherOrderHandler会从阻塞队列中取出来执行创建订单逻辑)
orderTasks.add(voucherOrder);
// 获取代理对象(只能在这获取,线程池里拿不到)
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 返回订单id
return Result.ok(orderId);
}
// createVoucherOrder方法的参数和返回值都修改了,记得接口那里也要改
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId())
.count();
// 根据查询结果判断该用户是否已下过该秒杀券单
if (count > 0) {
log.error("当前用户已购买过该秒杀券");
}
// 库存充足,扣减库存
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0) // 确保不超卖
.update();
// 如果扣减库存失败则返回错误信息
if (!isSuccess) {
log.error("库存扣减失败");
}
// 创建订单
save(voucherOrder);
}
}
|
小总结:
秒杀业务的优化思路是什么?
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
- 基于阻塞队列的异步秒杀存在哪些问题?
上面的两个问题将在接下来的内容中解决
Redis消息队列
认识消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息

使用消息队列的核心价值在于 解耦 与 异步。
- 耦合的场景:快递员(生产者)必须把包裹亲手交到你(消费者)手上。这种方式要求你们双方必须同时在线,如果此时你不在家,快递员就只能等待,这极大地浪费了他的时间,也降低了整个配送系统的效率。
- 解耦的场景:快递员将包裹放入快递柜(消息队列),然后离开。你可以在任何方便的时候,凭取件码从快递柜中取出包裹。这样,快递员和你就不再需要“同时在线”,双方的工作被有效地分离开来。
将这个思想应用到我们的秒杀系统中,流程就变得非常清晰:
- 快速校验与投递:用户下单后,系统首先利用 Redis 进行快速的资格校验(如库存、限购等)。
- 异步处理:校验通过后,系统会立即将订单信息作为一个“消息”投递到消息队列中,并立刻向用户返回“下单成功”。
- 后台消费:与此同时,一个独立的后台线程(消费者)会持续地从队列中获取订单消息,并从容地完成创建订单、扣减库存等耗时的数据库操作。
通过这种方式,我们将耗时的下单逻辑与用户请求解耦,极大地提升了系统的响应速度和吞吐量。
在实际开发中,我们通常会使用 Kafka、RabbitMQ 等成熟的专业消息队列中间件。为了降低部署与学习成本,也可以使用Redis提供的MQ方案,Redis提供了三种不同的方式来实现消息队列:
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
基于List的消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果(B代表Block阻塞)。
基于List的消息队列有哪些优缺点? 优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
基于PubSub的消息队列
Redis Pub/Sub(发布订阅)本质上是一个即时广播系统,就像我们平时听的电台节目一样。
在这个模型里,发布者(Producer)和订阅者(Subscriber)互不认识,他们只需要约定好一个“频道”(Channel)。发布者负责向频道里“喊话”(发送消息),而所有正在“收听”这个频道的订阅者,都能在同一时刻收到这条消息。
Redis 提供了三个最基础的指令来实现这套机制:
- SUBSCRIBE channel:这是订阅者的动作。就像你拿起收音机调频到“新闻台”,一旦订阅,你就会一直监听这个频道,等待消息推送。
- PUBLISH channel msg:这是发布者的动作。就像电台主持人向“新闻台”播报一条新闻,消息发出后,Redis 会立刻把这条消息推送给所有正在收听的人。
- PSUBSCRIBE pattern:这是模糊订阅。如果你不想只听“新闻台”,而是想听所有带“音乐”二字的频道(比如“音乐台”、“经典音乐台”),就可以使用通配符(如
music.*)一次性订阅一类频道。

基于PubSub的消息队列有哪些优缺点? 优点:
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息
发送消息的命令:

例如:

读消息
读取消息的方式之一:XREAD

注意,等待时长如果传入0,将一直等待,直到有人发送消息
并且读取消息后依旧可以再次读取,消息会永久存在,并不会在读取后消失
例如,使用XREAD读取第一个消息:

XREAD阻塞方式,读取最新的消息,$表示最新消息:

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

注意:当我们指定起始ID为$时,代表读取最新的一条消息,如果在处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,最新一条消息与上次读的消息之间的消息就漏掉了,这就出现了漏读消息的问题
XREAD命令特点
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
将在后续的内容中解决消息漏镀的问题
基于Stream的消息队列-消费者组
消费者组简介
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
具备下列特点:

创建消费者组的命令
1
2
3
4
5
6
|
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
|
其它常见命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
# 从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始ID:
1.ID传入">",表示从下一个未消费的消息开始
2.ID传入其它,表示根据指定id从pending-list中获取已消费但未确认的消息,例如传入0,就是从pending-list中的第一个消息开始
# 处理后确认消息
XACK key group ID [ID ...]
|
消费者监听消息的基本思路
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
while(true){
// 尝试监听队列,使用阻塞模式,最长等待 2000 毫秒
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if(msg == null){ // null说明没有消息,继续下一次
continue;
}
try {
// 处理消息,完成后一定要ACK
handleMessage(msg);
} catch(Exception e){
while(true){
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if(msg == null){ // null说明没有异常消息,所有消息都已确认,结束循环
break;
}
try {
// 说明有异常消息,再次处理
handleMessage(msg);
} catch(Exception e){
// 再次出现异常,记录日志,继续循环
continue;
}
}
}
}
|
XREADGROUP命令特点
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
三种方式对比

优化异步秒杀
基于Redis的Stream结构作为消息队列,进一步优化异步秒杀下单
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
在redis创建消息队列
在Redis中输入:
1
|
XGROUP CREATE stream.orders g1 0 MKSTREAM
|
修改Lua脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key(在lua脚本中用..拼接)
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
|
修改秒杀业务
继续优化com.hmdp.service.impl.VoucherOrderServiceImpl中的秒杀券下单业务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
package com.hmdp.service.impl;
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
// 创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct // 在当前类初始化完成后执行
private void init() {
// 让当前类初始化完成后就开始执行VocherOrderHandler中的run方法
SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());
}
// 编写内部类来执行下单任务
private class VocherOrderHandler implements Runnable {
String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
// 获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), // 组名g1,消费者为c1
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), // 每次读取1条消息,阻塞2秒
StreamOffset.create(queueName, ReadOffset.lastConsumed()) // 读key为“stream.orders”的队列,从未消费的消息开始读取
);
// 判断是否成功获取到消息
if (list == null || list.isEmpty()) {
// 获取失败,说明消息队列中无消息,继续下一次循环
continue;
}
// 获取成功,解析消息中的订单信息
// 因为每次只读一条消息,所以直接拿list中的第一个
MapRecord<String, Object, Object> record = list.get(0);
// 取到的value就是lua脚本中存入的键值对,'userId': userId, 'voucherId': voucherId, 'id': orderId
Map<Object, Object> value = record.getValue();
// 将map对象转成VoucherOrder对象,前面将orderId的key设为'id'而不是'orderId'就是为了这里转换方便
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 创建订单
handleVoucherOrder(voucherOrder);
// ACK确认,SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常:", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 获取pending-list中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), // 组名g1,消费者为c1
StreamReadOptions.empty().count(1), // 每次读取1条消息
StreamOffset.create(queueName, ReadOffset.from("0")) // 读pending-list中的第一条消息
);
// 判断是否成功获取到消息
if (list == null || list.isEmpty()) {
// 获取失败,说明pending-list中无异常消息,跳出循环
break;
}
// 获取成功,解析消息中的订单信息
// 因为每次只读一条消息,所以直接拿list中的第一个
MapRecord<String, Object, Object> record = list.get(0);
// 取到的value就是lua脚本中存入的键值对,'userId': userId, 'voucherId': voucherId, 'id': orderId
Map<Object, Object> value = record.getValue();
// 将map对象转成VoucherOrder对象,前面将orderId的key设为'id'而不是'orderId'就是为了这里转换方便
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 创建订单
handleVoucherOrder(voucherOrder);
// ACK确认,SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理pending-list订单异常:", e);
// 如果担心一直处理失败一直循环导致较大性能消耗,可以给一点休眠时间
try {
Thread.sleep(30);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 创建锁对象(再次加锁作为兜底,以防Redis出了某些问题)
RLock lock = redissonClient.getLock("order:" + voucherOrder.getUserId());
// 获取锁
boolean isLock = lock.tryLock(); // 无参时,默认不重试,30s过期
// 判断是否获取锁成功
if (!isLock) {
// 获取锁失败,直接返回失败(当前业务是防止一个用户重复下单,所以不用重试)
log.error("不允许重复下单");
}
try {
// 创建订单
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
lock.unlock();
}
}
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户id
Long userId = UserHolder.getUser().getId();
// 生成订单id
long orderId = redisIdWorker.nextId("order");
// 执行lua脚本
Long executeResult = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId));
// 判断结果
int intExecuteResult = executeResult.intValue(); // 转为int类型便于后续判断
if (intExecuteResult != 0) {
// 若不为0,则返回错误信息
return Result.fail(intExecuteResult == 1 ? "库存不足" : "无法重复下单");
}
// 获取代理对象(只能在这获取,线程池里拿不到)
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 返回订单id
return Result.ok(orderId);
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
// 根据查询结果判断该用户是否已下过该秒杀券单
if (count > 0) {
return Result.fail("当前用户已购买过该秒杀券");
}
// 库存充足,扣减库存
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0) // 确保不超卖
.update();
// 如果扣减库存失败则返回错误信息
if (!isSuccess) {
return Result.fail("库存扣减失败");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 设置订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 设置用户id
voucherOrder.setUserId(userId);
// 设置代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId())
.count();
// 根据查询结果判断该用户是否已下过该秒杀券单
if (count > 0) {
log.error("当前用户已购买过该秒杀券");
}
// 库存充足,扣减库存
boolean isSuccess = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0) // 确保不超卖
.update();
// 如果扣减库存失败则返回错误信息
if (!isSuccess) {
log.error("库存扣减失败");
}
// 创建订单
save(voucherOrder);
}
}
|
达人探店
发布探店笔记
探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个: tb_blog:探店笔记表,包含笔记中的标题、文字、图片等 tb_blog_comments:其他用户对探店笔记的评价

这两个接口代码中已经实现了,因为本教程主要是学习redis,所以上传的图片就直接保存在本地,所以需要修改一下上传文件的本地保存位置,找到com.hmdp.utils.SystemConstants,修改其中的IMAGE_UPLOAD_DIR:
1
2
3
4
5
6
7
8
|
package com.hmdp.utils;
public class SystemConstants {
public static final String IMAGE_UPLOAD_DIR = "你的nginx所在目录\\nginx-1.18.0\\html\\hmdp\\imgs";
public static final String USER_NICK_NAME_PREFIX = "user_";
public static final int DEFAULT_PAGE_SIZE = 5;
public static final int MAX_PAGE_SIZE = 10;
}
|
查看探店笔记
实现探店笔记查看接口
实现查看发布探店笔记的接口

在com.hmdp.controller.BlogController中新建一个接口:
1
2
3
4
|
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id) {
return blogService.queryBlogById(id);
}
|
实现在IBlogService接口中创建并在其实现类中实现queryBlogById方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package com.hmdp.service.impl;
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Override
public Result queryBlogById(Long id) {
// 获取笔记信息
Blog blog = getById(id);
if (blog == null) {
return Result.fail("笔记不存在");
}
// 获取用户信息
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
return Result.ok(blog);
}
}
|
修改主页热门探店笔记接口
当前com.hmdp.controller.BlogController中的queryHotBlog方法是查询热门笔记,首页显示的笔记就是用这个接口实现的。
当前这个接口把业务写在控制层,不太合适,而且获取用户信息部分的代码和前面的查看探店笔记中逻辑一样,可以封装为一个方法
修改后代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
/*
* com.hmdp.controller.BlogController中的queryHotBlog方法
*/
@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryHotBlog(current);
}
/*
* com.hmdp.service.IBlogService接口
*/
package com.hmdp.service;
import com.hmdp.dto.Result;
import com.hmdp.entity.Blog;
import com.baomidou.mybatisplus.extension.service.IService;
public interface IBlogService extends IService<Blog> {
Result queryHotBlog(Integer current);
Result queryBlogById(Long id);
}
/*
* com.hmdp.service.impl.BlogServiceImpl接口实现类
*/
package com.hmdp.service.impl;
// import ...
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(this::queryBlogUser);
return Result.ok(records);
}
@Override
public Result queryBlogById(Long id) {
// 获取笔记信息
Blog blog = getById(id);
if (blog == null) {
return Result.fail("笔记不存在");
}
// 获取用户信息
queryBlogUser(blog);
return Result.ok(blog);
}
private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
}
|
点赞功能
业务分析
当前点赞功能代码:
1
2
3
4
5
6
7
|
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
// 修改点赞数量
blogService.update()
.setSql("liked = liked + 1").eq("id", id).update();
return Result.ok();
}
|
问题分析:当前并没有做用户校验,发送请求赞的数量就会+1,这种方式会导致一个用户无限点赞,明显是不合理的,

完善点赞功能
需求:
- 同一个用户只能点赞一次,再次点击则取消点赞
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现步骤:
- 给Blog类中添加一个isLike字段,标示是否被当前用户点赞
- 修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
- 修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
- 修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
具体实现:
在com.hmdp.entity.Blog中添加isLike字段(原代码里应该已经有了):
1
2
3
4
5
|
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;
|
在com.hmdp.controller.BlogController中修改点赞接口:
1
2
3
4
|
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}
|
在IBlogService中创建likeBlog方法,并在其实现类中实现,修改后的com.hmdp.service.impl.BlogServiceImpl:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
package com.hmdp.service.impl;
// import ...
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户 ----------------- 有改动
records.forEach(blog -> {
queryBlogUser(blog);
isBlogLiked(blog);
});
return Result.ok(records);
}
@Override
public Result queryBlogById(Long id) {
// 获取笔记信息
Blog blog = getById(id);
if (blog == null) {
return Result.fail("笔记不存在");
}
// 获取用户信息
queryBlogUser(blog);
// 查询当前blog是否被当前用户点赞 ----------------- 有改动
isBlogLiked(blog);
return Result.ok(blog);
}
@Override
public Result likeBlog(Long id) {
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 判断当前登录用户是否已点赞
Boolean isLike = stringRedisTemplate.opsForSet().isMember(BLOG_LIKED_KEY + id, userId.toString());
// 如果未点赞,则MySQL中点赞数加1,并保存用户信息到redis的set集合中
if (BooleanUtil.isFalse(isLike)) {
boolean isSuccess = update()
.setSql("liked = liked + 1")
.eq("id", id).update();
if (isSuccess) {
stringRedisTemplate.opsForSet().add(BLOG_LIKED_KEY + id, userId.toString());
}
}
// 如果已点赞,则MySQL中点赞数减1,并从redis的set集合中删除用户信息
if (BooleanUtil.isTrue(isLike)) {
boolean isSuccess = update()
.setSql("liked = liked - 1")
.eq("id", id).update();
if (isSuccess) {
stringRedisTemplate.opsForSet().remove(BLOG_LIKED_KEY + id, userId.toString());
}
}
return Result.ok();
}
private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
private void isBlogLiked(Blog blog) {
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 判断当前登录用户是否已点赞
Boolean isLike = stringRedisTemplate.opsForSet().isMember(BLOG_LIKED_KEY + blog.getId(), userId.toString());
// 不直接传isLike,防止空指针
blog.setIsLike(BooleanUtil.isTrue(isLike));
}
}
|
点赞列表
业务分析
在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞列表
之前的点赞是放到set集合,但是set集合是不能排序的,所以此时可以采用一个可以排序的set集合,就是sortedSet

下图是Redis中List、Set、SortedSet的简单对比:

所有点赞的人,需要是唯一的,所以我们应当使用set或者是sortedSet,其次我们需要排序,所以sortedSet最合适
修改点赞功能
因为把集合从Set改为了SortedSet,两种不同的集合使用方式也不太一样,所以需要修改之前的点赞功能的代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
package com.hmdp.service.impl;
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryHotBlog(Integer current) {
// 不变
}
@Override
public Result queryBlogById(Long id) {
// 不变
}
@Override
public Result likeBlog(Long id) {
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 判断当前登录用户是否已点赞
Double score = stringRedisTemplate.opsForZSet().score(BLOG_LIKED_KEY + id, userId.toString());
// 如果未点赞,则MySQL中点赞数加1,并保存用户信息到redis的set集合中
if (score == null) {
boolean isSuccess = update()
.setSql("liked = liked + 1")
.eq("id", id).update();
if (isSuccess) {
// zadd key value score
stringRedisTemplate.opsForZSet().add(BLOG_LIKED_KEY + id, userId.toString(), System.currentTimeMillis());
}
}
else {
// 如果已点赞,则MySQL中点赞数减1,并从redis的set集合中删除用户信息
boolean isSuccess = update()
.setSql("liked = liked - 1")
.eq("id", id).update();
if (isSuccess) {
stringRedisTemplate.opsForZSet().remove(BLOG_LIKED_KEY + id, userId.toString());
}
}
return Result.ok();
}
private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
private void isBlogLiked(Blog blog) {
// 获取当前登录用户
UserDTO user = UserHolder.getUser();
// 若用户未登录,则无需查询是否点赞
if (user == null) return;
// 获取当前用户id
Long userId = user.getId();
// 判断当前登录用户是否已点赞
Double score = stringRedisTemplate.opsForZSet().score(BLOG_LIKED_KEY + blog.getId(), userId.toString());
blog.setIsLike(score != null);
}
}
|
其中score之间存点赞的时间,方便后续实现点赞列表功能
实现点赞列表
实现显示某个笔记的最近几个点赞用户的功能
在com.hmdp.controller.BlogController中创建一个/likes/{id}路径的接口:
1
2
3
4
|
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id) {
return blogService.queryBlogLikes(id);
}
|
实现queryBlogLikes方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
package com.hmdp.service.impl;
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryHotBlog(Integer current) {
// 一样
}
@Override
public Result queryBlogById(Long id) {
// 一样
}
@Override
public Result likeBlog(Long id) {
// 一样
}
@Override
public Result queryBlogLikes(Long id) {
// 查询score(点赞时间)前五的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(BLOG_LIKED_KEY + id, 0, 4);
// 若为空则返回空列表
if (top5 == null || top5.isEmpty()) {
return Result.ok(Collections.emptyList());
}
// 解析出用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
// 根据用户id查询用户
// 要用 WHERE id IN (x, y, ...) ORDER BY FIELD(id, x, y, ...) 手动指定排序方式,否则查询结果会以id升序排序
List<User> users = userService.query()
.in("id", ids)
// 在sql最后拼接 ORDER BY FIELD(id, x, y, ...)
.last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")")
.list();
// 转为为UserDTO
List<UserDTO> userDTOS = users.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(userDTOS);
}
private void queryBlogUser(Blog blog) {
// 一样
}
private void isBlogLiked(Blog blog) {
// 一样
}
}
|
好友关注
关注和取消关注
业务分析
可以对其他用户进行关注和取消关注。

实现思路:
需求:基于该表数据结构,实现两个接口:
关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来标示:
1
2
3
4
5
6
7
8
9
10
|
-- auto-generated definition
create table tb_follow
(
id bigint auto_increment comment '主键' primary key,
user_id bigint unsigned not null comment '用户id',
follow_user_id bigint unsigned not null comment '关联的用户id',
create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间'
)
collate = utf8mb4_general_ci
row_format = COMPACT;
|
功能实现
先在com.hmdp.controller.FollowController中创建两个接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package com.hmdp.controller;
import com.hmdp.dto.Result;
import com.hmdp.service.IFollowService;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@RestController
@RequestMapping("/follow")
public class FollowController {
@Resource
private IFollowService followService;
/**
* 关注或取关
* @param followUserId 要关注或取关的用户id
* @param isFollow 要关注还是取关,true为关注,false为取关
* @return
*/
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow) {
return followService.follow(followUserId, isFollow);
}
/**
* 判断当前用户是否关注了某个用户
* @param followUserId 要判断的用户id
* @return
*/
@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId) {
return followService.isFollow(followUserId);
}
}
|
然后在IFollowService接口中创建对应方法,并在其实现类中实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package com.hmdp.service.impl;
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 获取当前用户
Long loginUserId = UserHolder.getUser().getId();
// 判断当前请求是关注还是取关
if (isFollow) {
// 关注
Follow follow = new Follow();
follow.setUserId(loginUserId);
follow.setFollowUserId(followUserId);
save(follow);
} else {
// 取关 delete from tb_follow where user_id = ? and follow_user_id = ?
remove(new QueryWrapper<Follow>()
.eq("user_id", loginUserId)
.eq("follow_user_id", followUserId));
}
return Result.ok();
}
@Override
public Result isFollow(Long followUserId) {
// 获取当前用户
Long loginUserId = UserHolder.getUser().getId();
// 查询当前登录用户是否关注了某个用户
Integer count = query().eq("user_id", loginUserId)
.eq("follow_user_id", followUserId)
.count();
// 查到了就是关注了,返回true。没查到就是未关注,返回false
return Result.ok(count > 0);
}
}
|
共同关注
前置条件
想要去看共同关注的好友,需要首先进入到这个页面,这个页面会发起两个请求
1、去查询用户的详情
2、去查询用户的笔记

以上两个接口不是本节重点,就不多说了,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// UserController 根据id查询用户
@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id") Long userId){
// 查询详情
User user = userService.getById(userId);
if (user == null) {
return Result.ok();
}
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
// 返回
return Result.ok(userDTO);
}
// BlogController 根据id查询博主的探店笔记
@GetMapping("/of/user")
public Result queryBlogByUserId(
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam("id") Long id) {
// 根据用户查询
Page<Blog> page = blogService.query()
.eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}
|
业务分析
接下来我们来看看共同关注如何实现:
需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同关注。
可以使用set集合,在set集合中,有交集并集补集的api,我们可以把两人的关注的人分别放入到一个set集合中,然后再通过api去查看这两个set集合中的交集数据。

功能实现
先来改造当前的关注列表,改造原因是我们需要在用户关注了某位用户后,将数据放入到set集合中,方便后续共同关注功能的实现,同时当取消关注时,也需要从set集合中进行删除
修改com.hmdp.service.impl.FollowServiceImpl中的follow方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package com.hmdp.service.impl;
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 获取当前用户
Long loginUserId = UserHolder.getUser().getId();
// 判断当前请求是关注还是取关
if (isFollow) {
// 关注
Follow follow = new Follow();
follow.setUserId(loginUserId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
if (isSuccess) {
// 将关注的用户id放入redis的set集合
stringRedisTemplate.opsForSet().add("follows:" + loginUserId, followUserId.toString());
}
} else {
// 取关 delete from tb_follow where user_id = ? and follow_user_id = ?
boolean isSuccess = remove(new QueryWrapper<Follow>()
.eq("user_id", loginUserId)
.eq("follow_user_id", followUserId));
if (isSuccess) {
// 将取关的用户id从redis的set集合中移除
stringRedisTemplate.opsForSet().remove("follows:" + loginUserId, followUserId.toString());
}
}
return Result.ok();
}
@Override
public Result isFollow(Long followUserId) {
// 不变
}
}
|
然后是实现查看共同关注的用户功能:
在com.hmdp.controller.FollowController中新增路径为/common/{id}的查看共同关注接口:
1
2
3
4
|
@GetMapping("/common/{id}")
public Result followCommons(@PathVariable("id") Long id) {
return followService.followCommons(id);
}
|
在com.hmdp.service.impl.FollowServiceImpl实现其中的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
package com.hmdp.service.impl;
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private IUserService userService;
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 不变
}
@Override
public Result isFollow(Long followUserId) {
// 不变
}
@Override
public Result followCommons(Long id) {
// 获取当前用户
Long loginUserId = UserHolder.getUser().getId();
// 求交集
Set<String> followUserIds = stringRedisTemplate.opsForSet()
.intersect("follows:" + loginUserId, "follows:" + id);
// 判断结果
if (followUserIds == null || followUserIds.isEmpty()) {
// 若没查到结果,则返回空列表
return Result.ok(new ArrayList<>());
}
// 解析查到的id
List<Long> ids = followUserIds.stream().map(Long::valueOf).collect(Collectors.toList());
// 根据id查询用户,并转换为UserDTO类型(只存必要信息),保护数据隐私
List<UserDTO> users = userService.listByIds(ids).stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
// 返回结果
return Result.ok(users);
}
}
|
Feed流
简介
关注推送也叫做Feed流,直译为投喂。为用户持续提供新的信息。
对于传统的模式的内容解锁:需要用户去通过搜索引擎或者是其他的方式去解锁想要看的内容

对于新型的Feed流的的效果:不需要用户自己去搜索信息,而是系统分析用户到底想要什么,然后直接把内容推送给用户,从而使用户能够更加的节约时间,不用主动去寻找。

两种模式
Feed流的实现有两种模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
- 优点:信息全面,不会有缺失。并且实现也相对简单
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
- 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
- 缺点:如果算法不精准,可能起到反作用
Timeline的实现方案
本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:

拉模式:也叫做读扩散
该模式的核心含义就是:当张三和李四和王五发了消息后,都会保存在自己的发件箱中,假设赵六要读取他自己的收件箱,此时系统会拉取他关注的人发布的信息,然后再进行排序
优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清除。
缺点:延迟较高,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。

推模式:也叫做写扩散。
推模式是没有发件箱的,当张三发送了一个内容后,此时系统会主动把张三写的内容发送到他粉丝的收件箱中,此时张三的粉丝再来读取,就不用再去临时拉取了
优点:时效快,不用临时拉取
缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去

推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。
推拉模式是一个折中的方案,在发件人这一端,如果是个普通的人,那么采用写扩散的方式,直接把数据写入到他的粉丝的收件箱中去,因为普通人的粉丝量比较少,所以这样做没有压力。如果是大V,那么先将数据写入一份到他的发件箱里去,然后再写一份到活跃粉丝的收件箱里去。现在站在收件人这端来看,如果是大V的活跃粉丝,那么大V和普通人发的内容都会直接写入到自己的收件箱里边来,而如果是大V的普通粉丝(不常上线),他们关注的普通博主平时依旧直接发送消息到他们的收件箱,由于他们上线不是很频繁,所以等他们上线时,再拉取大V的信息到收件箱中。

关注推送
业务分析
接下来将基于推模式实现关注推送功能
需求:
- 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱(保存完整数据到数据库,但只需要推送blog的id到粉丝收件箱即可,到时候粉丝根据id查询数据库就会快很多,因为一般id都有索引)
- 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
- 查询收件箱数据时,可以实现分页查询
但Feed流中的数据会不断更新,所以数据的下标也在变化,因此不能采用传统的分页模式。

假设在t1时刻,我们去读取第一页,此时page = 1 ,size = 5,那么我们拿到的就是106这几条记录,假设t2时刻又发布了一条记录,然后到了t3时刻,我们来读取第二页,读取第二页传入的参数是page=2 ,size=5,那么此时读取到的第二页实际上是从6开始,结果是62,那么我们就读取到了重复的数据,所以feed流的分页,不能采用原始方案来做。
所以Feed流应该采用滚动分页的方式来查询,我们需要记录每次操作的最后一条,然后从这个位置开始去读取数据

我们从t1时刻开始,拿第一页数据,拿到了10~6,然后记录下当前最后一次拿取的记录,就是6。t2时刻发布了新的记录,此时这个11放到最顶上,但是不会影响我们之前记录的6,到了t3时刻来拿第二页,还是从6后一位的5去拿,就拿到了5-1的记录。我们这个地方可以采用sortedSet来进行范围查询,每次记录查询到的最小时间戳,下次(下一页)就查询更小的时间戳,就可以实现滚动分页了
修改新增笔记业务
修改com.hmdp.controller.BlogController中的saveBlog方法:
1
2
3
4
|
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
return blogService.saveBlog(blog);
}
|
实现对应方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
package com.hmdp.service.impl;
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Resource
private IFollowService followService;
@Resource
private StringRedisTemplate stringRedisTemplate;
// ...
@Override
public Result saveBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
boolean isSuccess = save(blog);
if (!isSuccess) {
return Result.fail("发布笔记失败");
}
// 查询当前登录用户的粉丝 select * from tb_follow where follow_user_id = ?
List<Follow> fans = followService.query().eq("follow_user_id", user.getId()).list();
// 推送笔记id给粉丝
fans.forEach(fan -> {
// 获取粉丝的用户id
Long fanId = fan.getUserId();
// 推送笔记id到粉丝收件箱
stringRedisTemplate.opsForZSet()
.add(FEED_KEY + fanId, blog.getId().toString(), System.currentTimeMillis());
});
// 返回id
return Result.ok(blog.getId());
}
// ...
}
|
业务进一步分析
需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息
具体操作如下:
1、每次查询完成后,我们要分析出查询到数据的最小时间戳,这个值会作为下一次查询的条件
2、我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据
综上:我们的请求参数中就需要携带上一次查询的最小时间戳和偏移量这两个参数。
这两个参数第一次会由前端来指定,以后的查询就根据后端结果作为条件,再次传递到后台。

第一次查询时偏移量是0,因为要从头开始查,但为什么后续查询时需要找到与上一次查询相同的查询个数作为偏移量,而不是直接用1作偏移量呢?
举个例子:

当偏移量固定为1的话,如果有多个重复的score就有可能出现重复查询的情况

如图,m6被查询了两次。所以偏移量应该为上一次查询到的最小时间戳出现的次数,比如这里m7、m6的score都是6,那偏移量就应该为2
实现分页查询收件箱
先定义返回值的实体类:(当前代码中应该已经有了)
1
2
3
4
5
6
7
8
9
10
11
12
|
package com.hmdp.dto;
import lombok.Data;
import java.util.List;
@Data
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}
|
然后在com.hmdp.controller.BlogController定义接口:
1
2
3
4
5
6
7
8
9
10
11
12
|
/**
* 查询当前登录用户的收件箱(关注的用户所发布的笔记)
* @param max 本次查询的最大时间(上次查询的最小时间)
* @param offset 偏移量(要跳过的元素个数)
* @return
*/
@GetMapping("/of/follow")
public Result queryBlogOfFollow(
@RequestParam("lastId") Long max,
@RequestParam(value = "offset", defaultValue = "0") Integer offset){
return blogService.queryBlogOfFollow(max, offset);
}
|
然后实现对应方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package com.hmdp.service.impl;
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Resource
private IFollowService followService;
@Resource
private StringRedisTemplate stringRedisTemplate;
// ...
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
// 获取当前用户
Long loginUserId = UserHolder.getUser().getId();
// 获取当前用户收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
// 参数顺序:K key, double min, double max, long offset, long count
.reverseRangeByScoreWithScores(FEED_KEY + loginUserId, 0, max, offset, 2);
// 非空判断
if (typedTuples == null || typedTuples.isEmpty()) {
return Result.ok(Collections.emptyList());
}
// 解析数据:blogId、minTime(时间戳)、offset
ArrayList<Long> blogIds = new ArrayList<>(typedTuples.size());
long minTime = 0;
int offsetResult = 1;
for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
// 获取blogId
blogIds.add(Long.valueOf(typedTuple.getValue()));
// 获取时间戳
long time = typedTuple.getScore().longValue();
// 时间戳相同,offset+1,否则offset重置为1
if (time == minTime) {
offsetResult++;
} else {
minTime = time;
offsetResult = 1;
}
}
// 根据blogId查询笔记,因为结果是有序的,不能直接用listByIds方法查询,需要指定排序方式
List<Blog> blogs = query()
.in("id", blogIds)
.last("ORDER BY FIELD(id," + StrUtil.join(",", blogIds) + ")")
.list();
// 封装笔记信息
blogs.forEach(blog -> {
// 查询blog有关的用户
queryBlogUser(blog);
// 查询blog是否被当前登录用户点赞
isBlogLiked(blog);
});
// 封装并返回
ScrollResult scrollResult = new ScrollResult();
scrollResult.setList(blogs);
scrollResult.setMinTime(minTime);
scrollResult.setOffset(offsetResult);
return Result.ok(scrollResult);
}
// ...
}
|
附近商户
GEO数据结构的基本用法
简介
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。
Redis中常见的命令有:
- GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
- GEODIST:计算指定的两个点之间的距离并返回
- GEOHASH:将指定member的坐标转为hash字符串形式并返回
- GEOPOS:返回指定member的坐标
- GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
- GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
- GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
练手
- 添加下面几条数据:
- 北京南站( 116.378248 39.865275 )
- 北京站( 116.42803 39.903738 )
- 北京西站( 116.322287 39.893729 )
- 计算北京南站到北京西站的距离
- 搜索天安门( 116.397904 39.909005 )附近10km内的所有火车站,并按照距离升序排序
添加数据:

计算距离:

搜索天安门附近火车站:

导入店铺数据到GEO
业务分析
在首页点击某个频道后,会自动发送一个携带当前坐标的请求

当我们点击美食之后,会出现一系列的商家,商家中可以按照多种排序方式,我们此时关注的是距离,这个地方就需要使用到GEO,向后台传入当前app收集的地址(我们此处是写死的) ,以当前坐标作为圆心,同时绑定相同的店家类型type,以及分页信息,把这几个条件传入后台,后台查询出对应的数据再返回。
现在需要把各个店铺的坐标导入到Redis的GEO中,GEO在redis中的存储结构是一个member和一个经纬度,我们把x和y轴传入到redis的经纬度位置去,但我们不能把所有的数据都放入到member中去,毕竟作为redis是一个内存级数据库,如果存海量数据,redis还是力不从心,所以我们在这个地方存储他的id即可。
但是这个时候还有一个问题,就是在redis中并没有存储店铺的类型,所以我们无法根据type来对数据进行筛选
解决办法:可以按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可

导入数据
写个单元测试将MySQL里的坐标导入Redis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
package com.hmdp;
import com.hmdp.entity.Shop;
import com.hmdp.service.IShopService;
import com.hmdp.utils.RedisIdWorker;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static com.hmdp.utils.RedisConstants.SHOP_GEO_KEY;
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private IShopService shopService;
@Test
void loadShopData() {
// 查询店铺信息
List<Shop> list = shopService.list();
// 将店铺按typeId分组
Map<Long, List<Shop>> shopMap = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
// 按组写入Redis
for (Map.Entry<Long, List<Shop>> entry : shopMap.entrySet()) {
// 获取类型id
Long typeId = entry.getKey();
String key = SHOP_GEO_KEY + typeId;
// 获取同类型的店铺的集合
List<Shop> value = entry.getValue();
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
// 写入redis:GEOADD key 经度 纬度 member
value.forEach(shop -> {
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),
new Point(shop.getX(), shop.getY())));
// 简单但消耗性能的写法
// stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
});
stringRedisTemplate.opsForGeo().add(key, locations);
}
}
}
|
实现附近商户功能
修改依赖版本
SpringDataRedis的2.3.9版本并不支持Redis 6.2提供的GEOSEARCH命令,因此我们需要手动指定版本,修改自己的pom.xml文件
主要修改:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-data-redis</artifactId>
<groupId>org.springframework.data</groupId>
</exclusion>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>
|
修改后的完整pom.xml文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.hmdp</groupId>
<artifactId>hm-dianping</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hm-dianping</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
<exclusion>
<artifactId>spring-data-redis</artifactId>
<groupId>org.springframework.data</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3</version>
</dependency>
<!--hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.17</version>
</dependency>
<!-- 导入AopContext.currentProxy()所需的 aspectj 依赖 -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<!-- 导入redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
|
功能实现
修改com.hmdp.controller.ShopController中的/of/type接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/**
* 根据商铺类型分页查询商铺信息
* @param typeId 商铺类型
* @param current 页码
* @return 商铺列表
*/
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value = "x", required = false) Double x,
@RequestParam(value = "y", required = false) Double y
) {
return shopService.queryShopByType(typeId, current, x, y);
}
|
创建并实现其中的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package com.hmdp.service.impl;
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
// ...
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
// 判断是否需要根据坐标进行查询
if (x == null || y == null) {
// 不需要坐标查询,按类型分页查询
Page<Shop> page = query()
.eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
// 返回数据
return Result.ok(page.getRecords());
}
// 计算分页参数
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
// 查询redis,按照距离排序、分页。结果:shopId、distance
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 5000 WITHDISTANCE
.search(
SHOP_GEO_KEY + typeId,
GeoReference.fromCoordinate(x, y),
new Distance(5000), // 5000米
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs() // 添加参数
.includeDistance() // 需要返回距离,相当于添加WITHDISTANCE
.limit(end)); // 从查询0开始,查询end个(无法指定起始位置)
// 解析出店铺id
if (results == null) {
// 没查到就直接返回
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
// 因为查的是 from 到 end 部分,如果ids.size() <= from,则说明没有下一页数据了
if (list.size() <= from) {
// 没有下一页数据,结束
return Result.ok(Collections.emptyList());
}
List<Long> ids = new ArrayList<>(list.size());
Map<String, Distance> idDistanceMap = new HashMap<>(list.size());
// 截取 from 到 end 部分进行处理
list.stream().skip(from).forEach(result -> {
// 获取店铺id
String shopIdStr = result.getContent().getName();// 这里的name就是存的member,也就是店铺id
ids.add(Long.valueOf(shopIdStr));
// 获取距离
Distance distance = result.getDistance();
idDistanceMap.put(shopIdStr, distance);
});
// 根据店铺id查询店铺数据
List<Shop> shops = query()
.in("id", ids)
.last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")")
.list();
// 封装距离数据到店铺数据中
shops.forEach(shop -> shop.setDistance(idDistanceMap.get(shop.getId().toString()).getValue()));
// 返回结果
return Result.ok(shops);
}
// ...
}
|
用户签到
BitMap
签到功能可以通过MySQL来实现,比如使用下面这张表:
1
2
3
4
5
6
7
8
9
|
CREATE TABLE `tb_sign` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
`year` year(4) NOT NULL COMMENT '签到的年',
`month` tinyint(2) NOT NULL COMMENT '签到的月',
`date` date NOT NULL COMMENT '签到的日期',
`is_backup` tinyint(1) unsigned DEFAULT NULL COMMENT '是否补签',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
在这张表中,用户一次签到,就是一条记录,假如有1000万用户,平均每人每年签到次数为10次,则这张表一年的数据量为1亿条
每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共22 字节的内存,一个月则最多需要600多字节
这种方案太占用空间了,能否简化一点呢?
可以按月来统计用户签到信息,每天签到记录为1,未签到则记录为0。
把每一个bit位对应当月的每一天,形成了映射关系。用0和1表示业务状态,这种思路就称为位图(BitMap)。这样我们就用极小的空间,来实现了大量数据的表示
Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位。

BitMap的操作命令有:
- SETBIT:向指定位置(offset)存入一个0或1
- GETBIT :获取指定位置(offset)的bit值
- BITCOUNT :统计BitMap中值为1的bit位的数量
- BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
- BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
- BITOP :将多个BitMap的结果做位运算(与 、或、异或)
- BITPOS :查找bit数组中指定范围内第一个0或1出现的位置
用户签到
业务分析
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
思路:我们可以把年和月作为bitMap的key,然后保存到一个bitMap中,每次签到就到对应的位上把数字从0变成1,只要对应是1,就表明说明这一天已经签到了,反之则没有签到。

功能实现
在com.hmdp.controller.UserController中新建签到接口:
1
2
3
4
5
6
7
8
|
/**
* 用户签到
* @return
*/
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}
|
创建对应方法并实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.hmdp.service.impl;
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
@Resource
private StringRedisTemplate stringRedisTemplate;
// ...
@Override
public Result sign() {
// 获取当前登录用户
Long loginUserId = UserHolder.getUser().getId();
// 获取当前日期
LocalDateTime now = LocalDateTime.now();
// 拼接key,sign:id:日期
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + loginUserId + keySuffix;
// 获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 写入redis,SETBIT key offset 1
stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true); // true表示1,false表示0
return Result.ok();
}
// ...
}
|
我然后测试这个接口,我现在是16号,所以应该是15个0,然后是1

结果与预期一致,注意要用二进制形式查看,图中我用的工具是Another Redis Desktop Manager
连续签到统计
业务分析
从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。

Java逻辑代码:获得当前这个月的最后一次签到数据,定义一个计数器,然后不停的向前统计,直到获得第一个非0的数字即可,每得到一个非0的数字计数器+1,直到遍历完所有的数据,就可以获得当前月的签到总天数了
假设今天是16号,那么我们就可以从当前月的第一天开始,统计到当前这一天的位数,今天是16号,那么就是16位,然后统计其中有多少个1即可,可以用如下redis命令:
1
2
|
# u表示转换为无符号十进制数
BITFIELD key GET u[dayOfMonth] 0
|
注意:bitMap返回的数据是10进制,假如返回8,*那么如何知道哪些是0,哪些是1呢?*只需要让得到的10进制数字和1做与运算就可以了,因为与运算时,1只有遇见1才是1,其他数字都是0。把签到结果和1进行与运算,每与一次,就把签到结果向右移动一位,以此类推,就能完成从后往前逐个遍历的效果了。

需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数

有用户有时间我们就可以组织出对应的key,此时就能找到这个用户截止这天的所有签到记录,再根据前面提到的方法,就能统计出来他连续签到的次数了
功能实现
在com.hmdp.controller.UserController中新建签到统计接口:
1
2
3
4
5
6
7
8
|
/**
* 连续签到统计
* @return
*/
@GetMapping("/sign/count")
public Result signCount(){
return userService.signCount();
}
|
创建对应方法并实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
package com.hmdp.service.impl;
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
@Resource
private StringRedisTemplate stringRedisTemplate;
// ...
@Override
public Result signCount() {
// 获取当前登录用户
Long loginUserId = UserHolder.getUser().getId();
// 获取当前日期
LocalDateTime now = LocalDateTime.now();
// 拼接key,sign:id:日期
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + loginUserId + keySuffix;
// 获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 获取本月截止今天为止的签到数据,拿到被转换为无符号十进制的数值 BITFIELD sign:5:202605 GET u16 0
List<Long> results = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
// 获取从0开始16位的无符号十进制值
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));
// 判断结果是否为空
if (results == null || results.isEmpty()) {
// 没有任何签到结果
return Result.ok(0);
}
// 当前情况results只会有一个元素,所以直接get(0)
Long num = results.get(0);
// 非空判断
if (num == null || num == 0) {
return Result.ok(0);
}
// 遍历num的二进制表示,1的个数就是签到天数
int count = 0;
// while (true) {
// // 让这个数字与1做与运算,得到数字的最后一个bit位
// if ((num & 1) == 0) {
// // 如果为0,说明未签到,结束循环
// break;
// } else {
// // 如果为1,说明有签到,计数器加1
// count++;
// }
// // 把数字无符号右移一位,相当于把数字除以2,抛弃最后一位bit位,继续判断下一个bit位
// num = num >>> 1;
// }
// 简洁版
while ((num & 1) != 0) {
count++;
num = num >>> 1;
}
return Result.ok(count);
}
// ...
}
|
扩展-使用bitmap来解决缓存穿透的方案
回顾缓存穿透
回顾一下什么是缓存穿透:这通常是指用户发起的请求,查询的是数据库中根本不存在的数据(Redis中自然也没有)。这种情况很容易被恶意利用,演变成一种针对系统的攻击。
针对缓存穿透,常见的解决方案及其局限性如下:
- 基础校验:例如直接判断 id < 0 并拦截。
- 局限性:如果攻击者使用的是格式正确但数据库中不存在的正数 ID,这种校验就无法生效。
- 缓存空对象:当数据库查询结果为空时,依然将这个空结果写入 Redis。
- 局限性:这种方法只能防止同一个不存在的 ID 重复攻击。如果攻击者每次使用不同的 ID 进行请求,依然会穿透缓存,直接击穿数据库。
更优的解决方案:
我们可以将数据库中所有有效数据的 ID 预先提取出来,存储在一个独立的集合(如 List 或专门的过滤器结构)中。
当用户发起查询请求时,处理流程如下:
- 首先去判断该 ID 是否存在于这个集合中。
- 如果集合中不包含该 ID,说明该数据在数据库中绝对不存在,直接拦截并返回,不再访问 Redis 和数据库。
- 如果集合中包含该 ID,说明这是一次合法的查询请求,则直接放行,按正常流程继续访问缓存或数据库。

bitmap方案
上面的方案面临的挑战: 在实际业务中,主键(ID)往往不是短整型,而是较长的字符串或大整数。早在2011年左右,淘宝的商品总量就已突破10亿。如果继续采用之前“将所有ID存入List集合”的方案,这个List会占用极大的存储空间,给服务器内存带来巨大压力。
优化方案:引入Bitmap(位图) 为了大幅减少存储开销,我们可以将庞大的List数据抽象成一个巨大的Bitmap(位图)。我们不再直接存储ID本身,而是利用哈希思想进行空间映射:
- 数据映射(写入阶段): 通过哈希算法(例如
id % bitmap.size),计算出当前ID在Bitmap中对应的索引位置,并将该位置的值从 0 置为 1。
- 查询拦截(读取阶段): 当用户发起查询时,我们使用相同的哈希算法,计算出该ID在Bitmap中对应的索引位:
- 如果该位的值是
0,则说明该ID绝对不存在,直接拦截请求。
- 如果该位的值是
1,则说明该ID可能存在,放行请求进行后续的缓存或数据库查询。
核心考量:误差率(哈希冲突) 采用这种方式需要重点考虑误差率的问题。由于Bitmap的空间是有限的,不同的ID经过哈希计算后可能会映射到同一个索引位置(即发生哈希冲突)。这会导致一种情况:某个ID实际上并不存在,但因为其他ID占用了它对应的位(将其置为了1),系统会误判该ID“可能存在”。这种误判的概率,就是我们所说的误差率。
著名的布隆过滤器就是“Bitmap + 哈希思想”方案的深度优化和标准化实现,不过它并不是简单地把 id % bitmap.size 算出一个位置,而是采用了**“一个超长的位数组(Bit Array) + 多个相互独立的哈希函数”**的组合方案,显著降低了哈希冲突发生的概率。
UV统计
HyperLogLog
首先我们搞懂两个概念:
- UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
- PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
通常来说UV会比PV大很多,所以衡量同一个网站的访问量,我们需要综合考虑很多因素,所以我们只是单纯的把这两个值作为一个参考值
UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那该怎么处理呢?
Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0 Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低得令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。
Redis中的命令如下:

测试百万数据的统计
测试思路:我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@Test
void testHyperLogLog() {
// 准备数组,装用户数据
String[] users = new String[1000];
// 数组角标
int index = 0;
for (int i = 1; i <= 1000000; i++) {
// 赋值
users[index++] = "user_" + i;
// 每1000条发送一次
if (i % 1000 == 0) {
index = 0;
stringRedisTemplate.opsForHyperLogLog().add("hll1", users);
}
}
// 统计数量
Long size = stringRedisTemplate.opsForHyperLogLog().size("hll1");
System.out.println("size = " + size);
}
|
输出结果:

经过测试:可以发现它的误差是在允许范围内,并且内存占用极小