项目缓存数据的设计
一. 用户的基本信息数据
user_1 user_2 user_3
都保存到redis中一条 X(不采用)
users -> hash {
1: user_1_cache_data,
2: user_2_cache_data
}
users -> list [
user_1_cache_data, user_2_cache_data
]
users -> zset {
值 分数 user_id
user_1_cache_data 1
user_2_cache_data 2
}
user_id =1
- 从数据的存储角度考虑 可以使用
hash
方便 - 从有效期的角度考虑,如果放到一个redis记录中,只能有一个有效期,不是我们的需求,
综合来说,不使用多条缓存放到一个redis记录中
每个用户一条缓存记录
user1 user2 user3
redis键 值
user:{user_id} -> hash
user:1 -> {
name: python
photo: xx
mobile: xxx
}
user:{user_id} -> string
user:1 -> json.dumps({}) pickle
二、 用户关注列表的缓存
user1 -> user2 user3 user4
redis key 值
user:{user_id}:following list [user_3_id,user_2_id']
zset {
value score 时间戳 1557986157.1353633
user_3_id update_timestamp 关注的时间
user_2_id update_timestamp
}
不是字符串的原因,是要考虑到应用程序可能分页获取
redis 性能1s 内可以执行 10000+ 读操作
mysql 大概0.12s 查询一次
优先选择zset
,冲分利用score
分数的价值,可以排序+过滤
三. 持久保存中的 统计数据
- 发布数量
- 关注数量
- 粉丝数量
- 点赞数量
redis key 值
user:{user_id}:count hash -> {
'article_count': 12w
'folloing_count': 51
"fans_count": 629w
}
MIS后台管理系统:
文章数量有多到少的用户 显示出来 top10
关注数量由多到少的用户显示 top100
redis key redis值
count:user:articles zset -> [
值 score
user_id_1 12w
user_id_2 11w
]
count:user:following zset -> [
值 score
user_id_1 12w
user_id_2 11w
]
四、## 缓存雪崩的解决方案
- 加锁方式
setnx -> set if not exist
def get_cache_data():
# 尝试获取锁
lock_flag = False
while not lock_flag:
lock_flag = redis.setnx('article:1:info:lock') // 抢到锁,没抢到则等待, 并发变串行
if lock_flag:
ret = redis.get('article:1:info')
if not ret:
ret = db.query.all()
redis.set(ret)
redis.delete('article:1:info:lock') // 释放锁 ,别人才能抢到
return ret
- 使用队列方式
可以使用celery
进行操作,——>生产者消费者
queue = []
def get_cache_data():
ret = redis.get('article:1:info')
if not ret:
ret = db.query.all()
redis.set(ret)
return ret
queue.append(get_cache_data, get_cache_data, get_cache_data)
# 队列的消费进程
func = queue.pop()
func()
五、缓存的编写
以用户资料缓存数据为例
class UserCache(object):
redis_key = ''user:1:profile'
def get_user_cache_data() 查询
redis.get(redis_key)
def clear_user_cache_data() 删除
redis.delete(redis_key)
def exists_user_id() 判断是否存在
redis.get(redis_key)
POST /followings/<user_id> user_id=1
类的构造
- 确定是否要保存数据,即数据选择类属性还是对象属性
- 选择方法
- 如果要构造的方法中需要处理对象属性和类属性,选择对象方法
- 如果仅需要处理类属性,选择类方法,既可以通过类名也可以通过cls处理类属性
@classmethod
def func(cls, ...)
- 如果在方法中不处理类属性与对象数据,仅从逻辑角度考虑应该封装到一起,则选择静态方法,通过类名可以处理类型属性
UserCache.key
@staticmethod
def func()
- 类属性
-> 所有对象共享类属性,也就是说所有对象中读取出的类属性 数据值都相同 - 对象属性
-> 每个对象单独所有,也就意味着每个对象的数据可以不同
python中构建对象属性的方法: __init__()
项目代码:
from flask import current_app
import json
from sqlalchemy.orm import load_only
import random
from redis.exceptions import RedisError
from sqlalchemy.exc import DatabaseError
from models.user import User
from . import constants
# user1 UserCache(1) -> key='user:1:profile' get
# user2 UserCache(2) -> key='user:2:profile' get
class UserCache(object):
"""
用户基本信息缓存工具类
"""
def __init__(self, user_id):
# redis数据记录的key 键
self.key = 'user:{}:profile'.format(user_id)
self.user_id = user_id
# 类属性
# -> 所有对象共享类属性,也就是说所有对象中读取出的类属性 数据值都相同
# 对象属性
# -> 每个对象单独所有,也就意味着每个对象的数据可以不同
# 实例方法 (对象方法)
# 类方法
# 静态方法
# 选择的依据:
# 不同方法可以处理的类中的数据是不一样的
# 对象方法-> 可以处理对象属性、类属性
# 类方法-> 既可以通过类名也可以通过cls处理类属性
# @classmethod
# def func(cls, ...)
# 静态方法 -> 通过类名可以处理类型属性 UserCache.key
# @staticmethod
# def func()
def save(self):
"""
保存缓存记录
:return:
"""
r = current_app.redis_cluster
try:
user = User.query.options(load_only(
User.mobile,
User.name,
User.profile_photo,
User.introduction,
User.certificate
)).filter_by(id=self.user_id).first()
except DatabaseError as e:
current_app.logger.error(e)
# 对于这个数据库异常,我们自己封装的get方法无法为调用者做决定,决定返回什么值,所以抛出异常给调用者,由调用者决定
raise e
# 在django中 查询单一对象,而数据库不存在,抛出异常 User.DoesNotExists
# 在sqlalchemy中,查询单一对象,数据库不存爱,不抛出异常,只返回None
if user is None:
# 数据库不存在
try:
r.setex(self.key, constants.UserNotExistCacheTTL.get_value(), -1)
except RedisError as e:
current_app.logger.error(e)
return None
else:
user_dict = {
'mobile': user.mobile,
'name': user.name,
'photo': user.profile_photo,
'intro': user.introduction,
'certi': user.certificate
}
try:
r.setex(self.key, constants.UserCacheDataTTL.get_value(), json.dumps(user_dict))
except RedisError as e:
current_app.logger.error(e)
return user_dict
def get(self):
"""
获取缓存数据
:return: user dict
"""
# 先查询redis缓存记录
# 如果有记录 直接返回
# 如果没有记录,查询数据库
# 数据库中如果有记录,设置redis记录 json string
# 数据库中如果没有记录,设置redis保存不存在的记录 -1
# 返回
r = current_app.redis_cluster
try:
ret = r.get(self.key)
except RedisError as e:
# 记录日志
current_app.logger.error(e)
# 在redis出现异常的时候,为了保证我们封装的get方法还能有返回值,可以进入数据库查询的部分
ret = None
if ret is not None:
# 表示redis中有记录值
# 判断redis记录是表示数据库不存在的-1值还是有意义的缓存记录
# 切记: python3 从redis中取出的字符串数据是python中的bytes
if ret == b'-1':
return None
else:
# json.loads方法可以接受bytes类型
user_dict = json.loads(ret)
return user_dict
else:
return self.save()
def clear(self):
"""
清除缓存记录
:return:
"""
try:
r = current_app.redis_cluster
r.delete(self.key)
except RedisError as e:
current_app.logger.error(e)
def determine_user_exists(self):
"""
通过缓存判断用户id是否存在
:return: boolean True:存在 False:不存在
"""
# 查询redis
# 如果存在Redis记录
# 如果redis记录为-1,表示不存在
# 如果redis记录不为-1, 表示用户存在
# 如果不存在redis记录
# 去数据库查询,判断是否存在
# 设置redis缓存记录
r = current_app.redis_cluster
try:
ret = r.get(self.key)
except RedisError as e:
# 记录日志
current_app.logger.error(e)
# 在redis出现异常的时候,为了保证我们封装的get方法还能有返回值,可以进入数据库查询的部分
ret = None
if ret is not None:
# 表示redis中有记录值
# 判断redis记录是表示数据库不存在的-1值还是有意义的缓存记录
# 切记: python3 从redis中取出的字符串数据是python中的bytes
if ret == b'-1':
return False
else:
return True
else:
ret = self.save()
if ret is not None:
return True
else:
return False
python3 从redis中取出的字符串数据是python中的bytes类型
小技巧: 两者比较时,没有必要非把查询到的bytes
类型数据转成string或者其他类型,也可以将待比较的string类型加 b 转换成bytes类型
if ret == b'-1':
'-1'.encode() ---> b'-1'
json.loads
方法可以接受bytes类型
user_dict = json.loads(ret)
缓存数据有效期的类
用到了类方法,因为这些类属性,每个user都是共用的,不需要变化
import random
class CacheTTLBase(object):
"""
缓存数据有效期的父类
"""
TTL = 2 * 60 * 60 # 有效期的基准值
MAX_DELTA = 10 * 60 # 有效期的最大随机偏差值
@classmethod
def get_value(cls):
return cls.TTL + random.randrange(0, cls.MAX_DELTA)
# # 用户基本信息的有效期, 秒
# USER_CACHE_DATA_TTL = 2 * 60 * 60
#
# def get_value():
# return USER_CACHE_DATA_TTL + random.randrange(0, 600)
class UserCacheDataTTL(CacheTTLBase):
"""
用户基本信息的有效期
"""
pass
class UserNotExistCacheTTL(CacheTTLBase):
"""
用户不存在缓存的有效期
"""
TTL = 5 * 60
MAX_DELTA = 60
class ArticleCacheDataTTL(CacheTTLBase):
"""
文章基本信息的有效期
"""
TTL = 60 * 60
MAX_DELTA = 5 * 60
接口示例
定义获取当前用户信息的接口
GET /v1_0/user
返回JSON
在toutiao/resources/user/__init__.py
中定义路由
user_api.add_resource(profile.CurrentUserResource, '/v1_0/user', endpoint='CurrentUser')
在toutiao/resources/ user/profile.py
中
class CurrentUserResource(Resource):
"""
用户自己的数据
"""
method_decorators = [login_required]
def get(self):
"""
获取当前用户自己的数据
"""
user_data = cache_user.UserProfileCache(g.user_id).get()
user_data['id'] = g.user_id
return user_data
六、项目Redis持久存储实现
common/cache/statistic.py
将统计数据,例如:点赞数量、评论数量、发表文章数量等的统计
from flask import current_app
from redis.exceptions import ConnectionError
# class UserArticleCountStorage(object):
# """
# 用户文章数量redis存储工具类
# """
#
# # 在redis中一条记录保存了所有用户的而文章数量
# # 'count:user:arts' redis zset
# # [
# # 值 score
# # user_id 文章数量
# # (user_id_1, 12w),
# # (user_id_3, 10)
# # ]
#
# key = 'count:user:arts'
#
# # 方式一
# # cache_static.UserArticleCountStorage(user_id).get()
# # def __init__(self, user_id):
# # self.user_id = user_id
# #
# # def get(self):
# # pass
#
# # 方式二
# # cache_static.UserArticleCountStorage.get(user_id)
#
# @classmethod
# def get(cls, user_id):
# # 查询redis记录
# # 如果redis存在记录
# # 返回
# # 如果redis不存在记录,则返回0,表示用户没有发表过文章
# try:
# count = current_app.redis_master.zscore(cls.key, user_id)
# except ConnectionError as e:
# current_app.logger.error(e)
# count = current_app.redis_slave.zscore(cls.key, user_id)
#
# if count is None:
# return 0
# else:
# return int(count)
#
# @classmethod
# def increment(cls, user_id, incr_num=1):
# try:
# current_app.redis_master.zincrby(cls.key, user_id, incr_num)
# except ConnectionError as e:
# current_app.logger.error(e)
# raise e
class CountStorageBase(object):
"""
统计数量存储的父类
"""
key = ''
@classmethod
def get(cls, user_id):
# 查询redis记录
# 如果redis存在记录
# 返回
# 如果redis不存在记录,则返回0,表示用户没有发表过文章
try:
count = current_app.redis_master.zscore(cls.key, user_id)
except ConnectionError as e:
current_app.logger.error(e)
count = current_app.redis_slave.zscore(cls.key, user_id) //从库里再查一次
if count is None:
return 0
else:
return int(count)
@classmethod
def increment(cls, user_id, incr_num=1):
try:
current_app.redis_master.zincrby(cls.key, user_id, incr_num)
except ConnectionError as e:
current_app.logger.error(e)
raise e
class UserArticleCountStorage(CountStorageBase):
"""
用户文章数量redis存储工具类
"""
key = 'count:user:arts'
class UserFollowingCountStorage(CountStorageBase):
"""
用户关注数量
"""
key = 'count:user:followings'
# POST /articles
# 视图 1. 保存新文章数据库数据
# 2. UserArticleCountStorage.increment(user_id, 1)
# 并不能一直保证数据库中保存的文章数量与redis中保存的统计数量是相同
# 需要使用定时任务,定时核查redis中的数量是否与数据库中数据一致
# 方式 构造定时任务
# 定时任务: 查询数据库,取出分组聚合数据,将数据设置到redis中
session、cookie、jwt
前后端不分离 选session
前后端分离 jwt 因为不止有web端 有可能有app端