解决方案
我先把解决方案放在前面,后面慢慢阐明拖了我三天的研究过程和原因:
若出现带锁的查询修改前,先行commit一次。
model.db.session.commit() # 先行commit一次
user = models.User.query.with_for_update().get(user_id) # 我把所有数据库模型放在model里 所以用model.User取出
user.money -= 1 # 扣费操作
model.db.session.commit()
问题的来源
事实上,数据库行锁是一个很常见的需求。
我高中还在用PHP做一个财务系统的时候,就发现“在相对高并发对用户账户进行扣费”时会出现扣减额不正确的情况,例如每笔订单0.1元,同时发起100笔,理应扣费10元,但系统实际扣费小于10元。
在sqlalchemy上的发现
最近,用python开发一套分销系统的时候,再次出现了这个问题,回想高中的解决方案是直接在接口进入的地方对外部文件加锁,这样所有的扣费操作必须在一个订单完成后再进行,但这样显然降低了并发效率,于是我查到了sqlalchemy通行的行锁方案 with_for_update()。
user = models.User.query.filter(
models.User.username == user_username
).with_for_update().first()
# 或者是用get主键的方式直接带锁取出
user = models.User.query.with_for_update().get(user_id)
本以为 这样就轻轻松松解决了,但是,实际发现,依然存在扣费不正确的情况。
同时,这个问题还出现在订单下单后,商品库存应该减少的情况,同样的,这个行锁“没有生效”。
你知道,这是一件非常糟糕的事情,因为这玩意原理就是如此,不过是行锁罢了,锁住这一行数据让其他所有想要查询/修改本行的请求进行等待来保证数据的时效性。但很糟糕,我这样的写法在我的业务中,并没有生效。
资料查询
事实上,遇到这种全网都差不多解决方案但我并没有复现成功的问题,第一反应肯定是我的实现问题。所以我在查询过程中,发现这个行锁和sqlalchemy有另一种写法(或许是原生sqlalchemy的写法):
db.session.query(模型).filter().with_for_update().first()
因为我一直写的是
模型.query.filter().with_for_update().first()
这就很迷呀,难不成是我对sqlalchemy的理解不深然后写法问题导致行锁无法执行?
遂换成db.session的写法,发现依然失败。正好那时候在图书馆,去翻了一下Jack Stouffer的《深入理解Flask》,结果发现其中有这样一段
(Model.query 实际上是 db.session.query(Model))
那么也就是说,并不是写法问题。
事实上这两篇文章给我了一些启发,证明我的写法没有问题。
from flask import Blueprint, request, current_app, g # 蓝图
from flask_restful import Api, Resource, reqparse, fields, marshal_with # restful对象
from server.models import models # 引入数据库文件
import time # time库
import threading # 线程库
hello_bp = Blueprint('hello_bp', __name__)
api = Api(hello_bp)
channel_id = 35
class Hello(Resource):
def get(self):
start_time = time.time()
order = models.db.session.query(models.Order).with_for_update().get(583)
channel = models.Channel.query.with_for_update().get(channel_id)
channel.inventory -= 1
# time.sleep(0.1)
models.db.session.commit()
end_time = time.time()
print("扣减后库存:", channel.inventory, "结束时间:", end_time, "消耗时间:", end_time - start_time)
return {"status": 200}
# channel_raw = models.Channel.query.get(channel_id)
# print("开始扣减之前库存:", channel_raw.inventory)
# for i in range(1, 20):
# my_thread = threading.Thread(target=sub_inventory, args=(i,))
# my_thread.start()
api.add_resource(Hello, '/hello')
运行结果:
哇哦,居然行锁生效的。并且,若是其中增加time.sleep()效果更佳明显,因为你会发现不同进程的等待时间是不一样的,基本正好差sleep的时间,也就是行锁是生效的。
那就更迷了呀
这意味着行锁是生效的,但数据确滞后更新了(我print出了每次扣费前后的情况,发现不同进程有一定概率是相同数值,也就是出现了看起来像行锁没有锁住的情况)。
在扣减库存的时候出现了同样的问题:
你知道,事实上在这里的update是一个先查出来再修改回去的过程,那么有没有可能是查的时候的数据问题?遂意识到,这似乎是一个数据滞后的缓存问题,于是,这时候出现了一篇文章:
http://muhongqiao.top/?id=380
它遇到的是查询的时候有概率碰到数据滞后的情况,并且提供了解决方案:
1、每次查询后进行commit操作
2、创建connect连接时,设autocommit=True,自动进行commit提交。下面是flask中的设置例子:
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'mysql://root:12333@127.0.0.1:3306/test?charset=utf8mb4&autocommit=true'
但问题在于,我需要行锁,也就是查出来不能直接commit呀,直接commit就把锁解开了。
我注意到这个博主穆琪的文章中有这样一段话:
经过搜索的原因如下:
SQLAlchemy 带有对象缓存机制,在重复查询相同的对象时,直接先查询本地的缓存,而不需要从数据库加载数据。
而且SQLAlchemy并没有什么参数开关设置关闭缓存,那么一定是有深层次的原因。
原因的核心是InnoDB的事务隔离。
InnoDB 的默认隔离级别。它可以防止任何被查询的行被其他事务更改,从而阻止不可重复的读取,而不是 幻读取。它使用中度严格的锁定策略,以便事务内的所有查询都会查看同一快照中的数据,即数据在事务开始时的数据。
当创建查询事务时,事务一直没有进行更新,每次查询到的数据都是之前查询结果的快照,所有才会出现多进程时候数据不同步的情况。
这意味着,sqlalchemy是存在一个缓存机制的!
拨云见日有没有,知道了缓存问题的存在,那就很好解决了,我们要解决缓存问题。如何解决缓存问题?穆琪博主提到的commit,那直接就在带锁的查询之前加一句
db.session.commit()
就好了嘛!
于是问题就此解决,后来又回去看我带锁查询之前的业务逻辑,发现其中有通过外键关联对我要锁的表进行过查询,我怀疑就是在这个时候sqlalchemy的缓存机制把数据存下来了,使得我业务环境中的效果和helloworld级代码中的效果不一样。遂修改业务中所有遇到的行锁问题,解决了这个所谓“不生效”问题。
另外,关于表锁
其实还有一个问题没有解决,或者说解决起来很奇怪。
打个比方,在用户下单之前,我需要判断这个用户当日下的单数有没有超出限额,于是我需要查询order表。
这个时候,若有多条订单同时进入,显然可能会被同时加入数据库,但我又不可能锁掉整个order表(因为还有其他对order的业务存在),于是我重新建了一个表,在那个表里面新建了一条。每次要判断限额的时候,我去锁那条数据,这样每次的判断都是串行的不会冲突。
然鹅不幸的事情又发生了,即使我通过time.sleep()判断行锁的确生效了的情况,而且若有50ms的延迟情况下,这个机制运行得很可靠,但我一旦删掉了这50ms的延迟,就开始同样出现数据滞后的情况了。
无奈之下,我保留了那50ms的延迟,但带来的副作用意味着每秒最多下20个订单。