1 功能目标
JWT 用于业务请求认证
Refresh Token 用于续期 JWT(长会话)
防重放:JWT 的 jti 存 Redis 防止重复使用
单点登录(Single Sign-On, SSO):一用户一个有效 refresh token,踢掉旧设备
2 数据结构设计
Redis Key 设计
auth:used_jti:{user_id}:{jti} # 防重放
auth:refresh_token:{user_id} # 当前 refresh token,有效期长
auth:sso_session:{user_id} # 当前 sso 会话标记(可选,防多设备)
3 代码示例
jwt_utils.py
import jwt
import uuid
import time
SECRET_KEY = 'my_super_secret_key'
JWT_EXPIRE_SECONDS = 300 # 5分钟
REFRESH_EXPIRE_SECONDS = 86400 * 7 # 7天
def generate_jwt(user_id):
now = int(time.time())
payload = {
'sub': str(user_id),
'iat': now,
'exp': now + JWT_EXPIRE_SECONDS,
'jti': str(uuid.uuid4())
}
token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
return token
def decode_jwt(token):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
raise ValueError("Token expired")
except jwt.InvalidTokenError:
raise ValueError("Invalid token")
refresh_token_utils.py
import uuid
import time
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
REFRESH_TOKEN_NAMESPACE = "auth:refresh_token"
def build_refresh_token_key(user_id):
return f"{REFRESH_TOKEN_NAMESPACE}:{user_id}"
def generate_refresh_token(user_id):
refresh_token = str(uuid.uuid4())
redis_key = build_refresh_token_key(user_id)
redis_client.setex(redis_key, 86400 * 7, refresh_token) # 7天
return refresh_token
def validate_refresh_token(user_id, refresh_token):
redis_key = build_refresh_token_key(user_id)
stored_token = redis_client.get(redis_key)
if not stored_token:
raise ValueError("No refresh token stored or expired")
if stored_token.decode() != refresh_token:
raise ValueError("Invalid refresh token")
return True
replay_protection.py
import redis
import time
redis_client = redis.Redis(host='localhost', port=6379, db=0)
JTI_NAMESPACE = "auth:used_jti"
def build_jti_key(user_id, jti):
return f"{JTI_NAMESPACE}:{user_id}:{jti}"
def check_and_store_jti(user_id, jti, exp_timestamp):
redis_key = build_jti_key(user_id, jti)
ttl = exp_timestamp - int(time.time())
if ttl <= 0:
raise ValueError("Token already expired")
result = redis_client.set(redis_key, 1, nx=True, ex=ttl)
if result is None:
raise ValueError("Replay attack detected: token already used")
else:
return True
main.py
from auth.jwt_utils import generate_jwt, decode_jwt
from auth.replay_protection import check_and_store_jti
# 生成 token
token = generate_jwt(user_id='user_1234', ttl_seconds=300)
print("Generated token:", token)
# 客户端发请求,服务端处理验证:
try:
payload = decode_jwt(token)
user_id = payload['sub']
jti = payload['jti']
exp_timestamp = payload['exp']
# 防重放检查
check_and_store_jti(user_id, jti, exp_timestamp)
# 通过,正常业务处理
print("Token valid and first use, process business logic.")
except ValueError as e:
print("Token verification failed:", str(e))
4 单点登录(SSO)
用户只能在一个设备登录(踢掉旧设备)
SSO_SESSION_NAMESPACE = "auth:sso_session"
def build_sso_key(user_id):
return f"{SSO_SESSION_NAMESPACE}:{user_id}"
def store_sso_session(user_id, session_id):
redis_key = build_sso_key(user_id)
redis_client.setex(redis_key, 86400 * 7, session_id)
def validate_sso_session(user_id, session_id):
redis_key = build_sso_key(user_id)
stored_session_id = redis_client.get(redis_key)
if not stored_session_id:
raise ValueError("SSO session expired")
if stored_session_id.decode() != session_id:
raise ValueError("SSO session conflict (another device logged in)")
return True
登录成功后,session_id 存 Redis,token 里带 session_id,每次校验对齐。
5 综合使用流程
登录接口
- 验证账号密码
- 生成 JWT、Refresh Token、SSO Session
- 返回给客户端:
{
"access_token": "JWT",
"refresh_token": "refresh_token",
"session_id": "session_id"
}
API 请求流程
- 客户端带 JWT + session_id
- 服务端:
payload = decode_jwt(token)
check_and_store_jti(payload['sub'], payload['jti'], payload['exp'])
validate_sso_session(payload['sub'], payload.get('session_id'))
# 通过后执行业务逻辑
Refresh Token 刷新流程
- 客户端请求刷新 token,带 refresh_token
- 服务端:
validate_refresh_token(user_id, refresh_token)
new_jwt = generate_jwt(user_id)
防重放 + SSO 设计总结
安全设计点 | 对应做法 |
JWT 防重放 | jti + Redis |
Refresh Token | Redis 存储 + 校验 |
SSO 防多设备登录 | session_id + Redis 对比 |
Token 过期策略 | JWT 短期 5-15 min,Refresh 长期 7-14 天 |
自动踢下线 | 新登录刷新 session_id |
客户端什么时候刷新 token
分场景讲:
1 JWT 本质是短期 token
- 例如 JWT_EXPIRE_SECONDS = 5分钟
- 为什么?为了减少被盗用窗口,提升安全性
- 所以需要 定期刷新
2 刷新 token 典型策略
策略 1:到期前自动刷新(最推荐)
- 客户端维护 JWT 过期时间
- 每次请求前判断:
- 如果 token 将在 N 秒后过期(例如 30 秒),提前刷新
- 优点:
- 用户体验无感知,不会突然 token 失效
- 减少请求过程中遇到 token 失效的错误
策略 2:接口 401 刷新
- 如果请求 API 返回 401(Token Expired):
- 自动用 Refresh Token 请求新 JWT
- 重新请求原 API
- 缺点:
- 体验略差,用户可能看到轻微延迟
- 但代码实现简单
策略 3:定时后台刷新
- 客户端启动后 定时器轮询刷新 token
- 例如每 3 分钟刷新一次 JWT
- 优点:
- 不依赖业务请求
- 缺点:
- 增加后台轮询请求,适合 App 场景,不适合 Web 前端
典型流程图
+------------------+ +-----------------------+
| Client | | Server |
+------------------+ +-----------------------+
| |
| --- Request with JWT --------------> |
| |
| <--- 200 OK / 401 Expired -----------|
| |
if JWT expiring soon or 401:
| --- Refresh Token Request ---------> |
| |
| <--- New JWT + Optional Refresh ---- |
| |
| --- Re-send Original API ----------> |
| |
综合建议
Web 前端 (Vue/React/SPA)
- 推荐 策略 1 + 2
- 请求拦截器(axios interceptors)
- 自动判断 JWT 是否快过期
- 401 自动刷新重试
App 端 (iOS/Android)
- 推荐 策略 1 + 3
- App 启动后定时后台刷新 token
- 网络请求同样提前判断 JWT 是否快过期
Websocket 场景
- 初始连接时带上 JWT
- 后端定时推送 "Token Expire Soon" 通知
- 客户端收到后用 Refresh Token 主动换新 JWT + 重连 Websocket
场景 | 推荐策略组合 |
Web API 调用 | 策略 1 + 策略 2 |
App 长会话 | 策略 1 + 策略 3 |
Websocket 长连接 | 后端通知 + 主动刷新 |