当前位置: 首页>后端>正文

Thymeleaf 缓存redis flask使用redis缓存

项目缓存数据的设计

一. 用户的基本信息数据
  1. 多个用户的数据库记录是保存在redis中的一条还是多条?——>多条
    多条数据缓存放到多个redis记录中
  2. 字符串 or 复合型 ?
user_1  user_2 user_3

都保存到redis中一条 X(不采用)

users -> hash {
    1: user_1_cache_data,
    2: user_2_cache_data
}

users -> list [
    user_1_cache_data, user_2_cache_data
]

users -> zset {
    值                    分数 user_id
    user_1_cache_data      1
    user_2_cache_data      2
}
user_id =1
  • 从数据的存储角度考虑 可以使用hash 方便
  • 从有效期的角度考虑,如果放到一个redis记录中,只能有一个有效期,不是我们的需求
    综合来说,不使用多条缓存放到一个redis记录中

每个用户一条缓存记录

user1 user2 user3
redis键				值
user:{user_id} -> hash
user:1  ->  {
    name: python
    photo: xx
    mobile: xxx
}

user:{user_id}  ->  string
user:1 -> json.dumps({})  pickle
二、 用户关注列表的缓存
user1 -> user2 user3 user4
redis key                    值
user:{user_id}:following   list [user_3_id,user_2_id']
                           zset {
                               value      score  时间戳 1557986157.1353633
                               user_3_id   update_timestamp 关注的时间 
                               user_2_id   update_timestamp  
                           }

不是字符串的原因,是要考虑到应用程序可能分页获取

redis 性能1s 内可以执行 10000+ 读操作
mysql 大概0.12s 查询一次

优先选择zset,冲分利用score分数的价值,可以排序+过滤

三. 持久保存中的 统计数据
  • 发布数量
  • 关注数量
  • 粉丝数量
  • 点赞数量
redis key               值
user:{user_id}:count    hash -> {
	                      'article_count': 12w
	                      'folloing_count': 51
	                      "fans_count": 629w
						}

MIS后台管理系统:

文章数量有多到少的用户 显示出来 top10

关注数量由多到少的用户显示 top100

redis key    					 redis值
count:user:articles    zset -> [
    						值			score
    						user_id_1     12w
    						user_id_2    11w
					  ]
        
count:user:following    zset -> [
    						值			score
    						user_id_1     12w
    						user_id_2    11w
					  ]

四、## 缓存雪崩的解决方案

  1. 加锁方式 setnx -> set if not exist
def get_cache_data():
    # 尝试获取锁
    lock_flag = False
    while not lock_flag:
        lock_flag = redis.setnx('article:1:info:lock')  // 抢到锁,没抢到则等待,  并发变串行
    	if lock_flag:
    		ret = redis.get('article:1:info')
    		if not ret:
        		ret = db.query.all()
        		redis.set(ret)
            redis.delete('article:1:info:lock')		// 释放锁 ,别人才能抢到
    return ret
  1. 使用队列方式
    可以使用celery 进行操作,——>生产者消费者
queue = []

def get_cache_data():
    ret = redis.get('article:1:info')
    if not ret:
       	ret = db.query.all()
       	redis.set(ret)
    return ret   
    
queue.append(get_cache_data, get_cache_data, get_cache_data)

# 队列的消费进程
func = queue.pop()
func()
五、缓存的编写

以用户资料缓存数据为例

class UserCache(object):

	redis_key = ''user:1:profile'
	        
	def get_user_cache_data()  查询
		redis.get(redis_key)
	    
	def clear_user_cache_data()  删除
		redis.delete(redis_key)
	    
	def exists_user_id() 判断是否存在
		redis.get(redis_key)
POST  /followings/<user_id>  user_id=1
类的构造
  1. 确定是否要保存数据,即数据选择类属性还是对象属性
  2. 选择方法
  • 如果要构造的方法中需要处理对象属性和类属性,选择对象方法
  • 如果仅需要处理类属性,选择类方法既可以通过类名也可以通过cls处理类属性
@classmethod
def func(cls, ...)
  • 如果在方法中不处理类属性与对象数据,仅从逻辑角度考虑应该封装到一起,则选择静态方法,通过类名可以处理类型属性 UserCache.key
@staticmethod
  def func()
  • 类属性
    -> 所有对象共享类属性,也就是说所有对象中读取出的类属性 数据值都相同
  • 对象属性
    -> 每个对象单独所有,也就意味着每个对象的数据可以不同

python中构建对象属性的方法: __init__()

项目代码:

from flask import current_app
import json
from sqlalchemy.orm import load_only
import random
from redis.exceptions import RedisError
from sqlalchemy.exc import DatabaseError
from models.user import User
from . import constants

# user1  UserCache(1) -> key='user:1:profile' get
# user2  UserCache(2) -> key='user:2:profile' get

class UserCache(object):
    """
    用户基本信息缓存工具类
    """

    def __init__(self, user_id):
        # redis数据记录的key 键
        self.key = 'user:{}:profile'.format(user_id)
        self.user_id = user_id

    # 类属性
    #   -> 所有对象共享类属性,也就是说所有对象中读取出的类属性 数据值都相同
    # 对象属性
    #  -> 每个对象单独所有,也就意味着每个对象的数据可以不同

    # 实例方法 (对象方法)
    # 类方法
    # 静态方法
    # 选择的依据:
    # 不同方法可以处理的类中的数据是不一样的
    #  对象方法-> 可以处理对象属性、类属性
    #  类方法-> 既可以通过类名也可以通过cls处理类属性
    #     @classmethod
    #     def func(cls, ...)
    # 静态方法  -> 通过类名可以处理类型属性 UserCache.key
    #    @staticmethod
    #    def func()

    def save(self):
        """
        保存缓存记录
        :return:
        """
        r = current_app.redis_cluster
        try:
            user = User.query.options(load_only(
                User.mobile,
                User.name,
                User.profile_photo,
                User.introduction,
                User.certificate
            )).filter_by(id=self.user_id).first()
        except DatabaseError as e:
            current_app.logger.error(e)
            # 对于这个数据库异常,我们自己封装的get方法无法为调用者做决定,决定返回什么值,所以抛出异常给调用者,由调用者决定
            raise e

        # 在django中 查询单一对象,而数据库不存在,抛出异常  User.DoesNotExists
        # 在sqlalchemy中,查询单一对象,数据库不存爱,不抛出异常,只返回None
        if user is None:
            # 数据库不存在
            try:
                r.setex(self.key, constants.UserNotExistCacheTTL.get_value(), -1)
            except RedisError as e:
                current_app.logger.error(e)

            return None
        else:
            user_dict = {
                'mobile': user.mobile,
                'name': user.name,
                'photo': user.profile_photo,
                'intro': user.introduction,
                'certi': user.certificate
            }
            try:
                r.setex(self.key, constants.UserCacheDataTTL.get_value(), json.dumps(user_dict))
            except RedisError as e:
                current_app.logger.error(e)

            return user_dict


    def get(self):
        """
        获取缓存数据
        :return: user dict
        """
        # 先查询redis缓存记录
        # 如果有记录 直接返回
        # 如果没有记录,查询数据库
        #      数据库中如果有记录,设置redis记录  json string
        #      数据库中如果没有记录,设置redis保存不存在的记录 -1
        # 返回
        r = current_app.redis_cluster
        try:
            ret = r.get(self.key)
        except RedisError as e:
            # 记录日志
            current_app.logger.error(e)
            # 在redis出现异常的时候,为了保证我们封装的get方法还能有返回值,可以进入数据库查询的部分
            ret = None

        if ret is not None:
            # 表示redis中有记录值
            # 判断redis记录是表示数据库不存在的-1值还是有意义的缓存记录
            # 切记: python3 从redis中取出的字符串数据是python中的bytes
            if ret == b'-1':
                return None
            else:
                # json.loads方法可以接受bytes类型
                user_dict = json.loads(ret)
                return user_dict
        else:
            return self.save()

    def clear(self):
        """
        清除缓存记录
        :return:
        """
        try:
            r = current_app.redis_cluster
            r.delete(self.key)
        except RedisError as e:
            current_app.logger.error(e)

    def determine_user_exists(self):
        """
        通过缓存判断用户id是否存在
        :return: boolean True:存在  False:不存在
        """
        # 查询redis
        # 如果存在Redis记录
        #   如果redis记录为-1,表示不存在
        #   如果redis记录不为-1, 表示用户存在

        # 如果不存在redis记录
        #   去数据库查询,判断是否存在
        #   设置redis缓存记录
        r = current_app.redis_cluster
        try:
            ret = r.get(self.key)
        except RedisError as e:
            # 记录日志
            current_app.logger.error(e)
            # 在redis出现异常的时候,为了保证我们封装的get方法还能有返回值,可以进入数据库查询的部分
            ret = None

        if ret is not None:
            # 表示redis中有记录值
            # 判断redis记录是表示数据库不存在的-1值还是有意义的缓存记录
            # 切记: python3 从redis中取出的字符串数据是python中的bytes
            if ret == b'-1':
                return False
            else:
                return True
        else:
            ret = self.save()
            if ret is not None:
                return True
            else:
                return False

python3 从redis中取出的字符串数据是python中的bytes类型
小技巧: 两者比较时,没有必要非把查询到的bytes类型数据转成string或者其他类型,也可以将待比较的string类型加 b 转换成bytes类型

if ret == b'-1':

'-1'.encode() ---> b'-1'

json.loads方法可以接受bytes类型

user_dict = json.loads(ret)

缓存数据有效期的类
用到了类方法,因为这些类属性,每个user都是共用的,不需要变化

import random


class CacheTTLBase(object):
    """
    缓存数据有效期的父类
    """
    TTL = 2 * 60 * 60  # 有效期的基准值
    MAX_DELTA = 10 * 60  # 有效期的最大随机偏差值

    @classmethod
    def get_value(cls):
        return cls.TTL + random.randrange(0, cls.MAX_DELTA)


# # 用户基本信息的有效期, 秒
# USER_CACHE_DATA_TTL = 2 * 60 * 60
#
# def get_value():
#     return USER_CACHE_DATA_TTL + random.randrange(0, 600)

class UserCacheDataTTL(CacheTTLBase):
    """
    用户基本信息的有效期
    """
    pass


class UserNotExistCacheTTL(CacheTTLBase):
    """
    用户不存在缓存的有效期
    """
    TTL = 5 * 60
    MAX_DELTA = 60


class ArticleCacheDataTTL(CacheTTLBase):
    """
    文章基本信息的有效期
    """
    TTL = 60 * 60
    MAX_DELTA = 5 * 60
接口示例

定义获取当前用户信息的接口

GET /v1_0/user

返回JSON

toutiao/resources/user/__init__.py中定义路由

user_api.add_resource(profile.CurrentUserResource, '/v1_0/user', endpoint='CurrentUser')

toutiao/resources/ user/profile.py

class CurrentUserResource(Resource):
    """
    用户自己的数据
    """
    method_decorators = [login_required]

    def get(self):
        """
        获取当前用户自己的数据
        """
        user_data = cache_user.UserProfileCache(g.user_id).get()
        user_data['id'] = g.user_id
        return user_data
六、项目Redis持久存储实现

common/cache/statistic.py
将统计数据,例如:点赞数量、评论数量、发表文章数量等的统计

from flask import current_app

from redis.exceptions import ConnectionError


# class UserArticleCountStorage(object):
#     """
#     用户文章数量redis存储工具类
#     """
#
#     # 在redis中一条记录保存了所有用户的而文章数量
#     # 'count:user:arts' redis  zset
#     # [
#     #     值          score
#     #     user_id   文章数量
#     #     (user_id_1, 12w),
#     #     (user_id_3, 10)
#     # ]
#
#     key = 'count:user:arts'
#
#     # 方式一
#     # cache_static.UserArticleCountStorage(user_id).get()
#     # def __init__(self, user_id):
#     #     self.user_id = user_id
#     #
#     # def get(self):
#     #     pass
#
#     # 方式二
#     # cache_static.UserArticleCountStorage.get(user_id)
#
#     @classmethod
#     def get(cls, user_id):
#         # 查询redis记录
#         # 如果redis存在记录
#         # 返回
#         # 如果redis不存在记录,则返回0,表示用户没有发表过文章
#         try:
#             count = current_app.redis_master.zscore(cls.key, user_id)
#         except ConnectionError as e:
#             current_app.logger.error(e)
#             count = current_app.redis_slave.zscore(cls.key, user_id)
#
#         if count is None:
#             return 0
#         else:
#             return int(count)
#
#     @classmethod
#     def increment(cls, user_id, incr_num=1):
#         try:
#             current_app.redis_master.zincrby(cls.key, user_id, incr_num)
#         except ConnectionError as e:
#             current_app.logger.error(e)
#             raise e


class CountStorageBase(object):
    """
    统计数量存储的父类
    """
    key = ''

    @classmethod
    def get(cls, user_id):
        # 查询redis记录
        # 如果redis存在记录
        # 返回
        # 如果redis不存在记录,则返回0,表示用户没有发表过文章
        try:
            count = current_app.redis_master.zscore(cls.key, user_id)
        except ConnectionError as e:
            current_app.logger.error(e)
            count = current_app.redis_slave.zscore(cls.key, user_id)		//从库里再查一次

        if count is None:
            return 0
        else:
            return int(count)

    @classmethod
    def increment(cls, user_id, incr_num=1):
        try:
            current_app.redis_master.zincrby(cls.key, user_id, incr_num)
        except ConnectionError as e:
            current_app.logger.error(e)
            raise e


class UserArticleCountStorage(CountStorageBase):
    """
    用户文章数量redis存储工具类
    """
    key = 'count:user:arts'


class UserFollowingCountStorage(CountStorageBase):
    """
    用户关注数量
    """
    key = 'count:user:followings'


# POST /articles
#  视图 1. 保存新文章数据库数据
#       2. UserArticleCountStorage.increment(user_id, 1)

# 并不能一直保证数据库中保存的文章数量与redis中保存的统计数量是相同
# 需要使用定时任务,定时核查redis中的数量是否与数据库中数据一致
#   方式 构造定时任务
#     定时任务: 查询数据库,取出分组聚合数据,将数据设置到redis中

session、cookie、jwt
前后端不分离 选session
前后端分离 jwt 因为不止有web端 有可能有app端



https://www.xamrdz.com/backend/3fn1962935.html

相关文章: