当前位置: 首页>后端>正文

Sqlite 多进程 共享 sqlalchemy 多进程

1 sqlalchemy介绍和快速使用

# django 的orm框架,对象关系映射,只能在djagno中用
# sqlalchemy:独立的orm框架,轻松的集成到任意项目中去,SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果

# djagno 的orm,sqlalchemy,peewee。。。   目前异步的orm框架,没有特别好的

# 安装:pip3 install sqlalchemy

# 组成部分(了解)
    Engine,框架的引擎
    Connection Pooling ,数据库连接池
    Dialect,选择连接数据库的DB API种类
    Schema/Types,架构和类型
    SQL Exprression Language,SQL表达式语言
    
    
# SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]   
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

1.1 原生操作的快速使用

# 原生操作,写原生sql,用的比较少

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/luffy_api",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
def task(arg):
    # 从连接池中拿一个链接
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from luffy_banner"
    )
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()

for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()

2 创建操作数据表

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()

class Users(Base):
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    email = Column(String(32), unique=True)
    #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)


    __tablename__ = 'users'  # 数据库表名称
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
        Index('ix_id_name', 'name', 'email'), #索引
    )


# orm不能创建数据库,手动创建
#sqlalchemy只能创建表和删除表,不能新增,删除字段
# 创建表,同步到数据库
def init_db():
    """
    根据类创建数据库表
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 创建被Base管理的所有表
    Base.metadata.create_all(engine)


def drop_db():
    """
    根据类删除数据库表
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    # init_db()
    drop_db()

    
# sqlalchemy 只能创建表,删除表,不能增加删除字段  (在flask中,可以使用第三方支持)

3 scoped_session线程安全

# 1 新增数据
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
from sqlalchemy.orm import scoped_session
from threading import Thread

# 第一步:创建engine
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:生成session,每次执行数据库操作时,都需要创建一个Session
Session = sessionmaker(bind=engine)
# 原来的时候
# session = Session()
# 变成了它,它就是线程安全,session对象在flask框架中就可以是全局的,任意线程使用的是就是这个全局session,但是不会出现数据错乱问题
# 本质内部使用了local对象,保证了,每个线程实际上使用的是当前线程自己的session
session = scoped_session(Session)



# scoped_session类的对象,正常来讲是没有 add, close,commit...方法和属性的,但是实际上有,是通过create_proxy_methods装饰器,设置进去的(通过反射setattr写进去的)


def task(i):
    # 第三步:插入数据
    user = Users(name='lqz%s'%i, email='33%s@qq.com'%i, extra='lqz is handsome')
    session.add(user)
    # 第四步:提交
    session.commit()
    # 第五步:关闭链接
    session.close()


for i in range(10):
    t = Thread(target=task,args=[i,])
    t.start()

4 基本增删查改

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
from sqlalchemy.sql import text

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()

# 1 增加数据一条
# session.add(对象)

# 2 同时增加多条
# session.add_all([Users(name='lqz123', email='asdf@qq.com', extra='asfasdf'),Users(name='pyy', email='aseedf@qq.com', extra='asdfeedd')])


# 3  删除  ---》查出来删
# res = session.query(Users).filter(Users.id > 17).delete()
# res = session.query(Users).filter(Users.name == 'lqz').delete()
# print(res)

# 4 修改---->查出来改
# res=session.query(Users).filter(Users.id > 10).update({"name" : "lqz"})
# 类似于django的F查询
# session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
# session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")

# 5 查询 ----》查询特别多
# 5.1 查所有
# res = session.query(Users).all()

# 5.2 指定查询的字段select name as xx,age from users;
# res = session.query(Users.name.label('xx'), Users.email)


# 5.3 通过filter 过滤
# filter写条件  >  <   ==
# res=session.query(Users).filter(Users.id > 0).all()
# res=session.query(Users).filter(Users.name == 'lqz2099').all()

# 5.4 filter_by 过滤,filter_by表达式
# res = session.query(Users).filter_by(name='lqz',email='332@qq.com').all()
# res = session.query(Users).filter_by(name='lqz').first()

# 5.5 自定制where部分查询sql
# res = session.query(Users).filter(text("id>:value or name=:name")).params(value=16, name='lqz099').all()
# res = session.query(Users).filter(Users.id<13 , Users.name=='lqz099').all()

# 5.6 纯自定义sql---》django中支持这个
# res = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='lqz').all()


# 5.7 filter和filter_by的其他使用
#  条件
# res = session.query(Users).filter_by(name='lqz').all()
# 表达式,and条件连接
# ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all()
# between
# res = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz').all()
# res = session.query(Users).filter(Users.id.between(7, 9)).all()
# res = session.query(Users).filter(Users.id.between(7, 9))  # 原生sql
# res = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz') # 原生sql

# in
# res = session.query(Users).filter(Users.id.in_([7,8,9])).all()
# ~非,除。。外
# res = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()

# 二次筛选
# res = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz099'))).all()


# 5.8 与  或  非
# or_包裹的都是or条件,and_包裹的都是and条件
from sqlalchemy import and_, or_

# res = session.query(Users).filter(and_(Users.id > 3, Users.name == 'lqz099')).all()
# res = session.query(Users).filter(or_(Users.id < 2, Users.name == 'lqz099')).all()
# res = session.query(Users).filter(
#     or_(
#         Users.id < 2,
#         and_(Users.name == 'lqz099', Users.id > 3),
#         Users.extra != ""
#     ))

# 5.9 通配符,以e开头,不以e开头
# res = session.query(Users).filter(Users.name.like('lqz%')).all()
# res = session.query(Users).filter(~Users.name.like('%lqz%')).all()


# 5.10  限制,用于分页,区间
# res = session.query(Users)[1:2]
# 每页显示5条,第3页的数据
# res = session.query(Users)[3*5:3*5+5]


# 5.11 排序,根据name降序排列(从大到小)
# res = session.query(Users).order_by(Users.name.desc()).all()
# 第一个条件重复后,再按第二个条件升序排
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()


# 5.12 分组
# 分组
from sqlalchemy.sql import func

# select * from user group by name;
# res = session.query(Users).group_by(Users.name).all()
# 分组之后取最大id,id之和,最小id
# select max(id),sum(id),min(id) from user group by name;
# res = session.query(
#     func.max(Users.id),
#     func.sum(Users.id),
#     func.min(Users.id)).group_by(Users.name).all()
# haviing筛选
# select max(id),sum(id),min(id) from user where email like 33% group by name  having min(id) > 2;
res = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).filter(Users.email.like('33%')).group_by(Users.name).having(func.min(Users.id) > 2).all()







### 5.13 链表操作
# select * from users,favor where user.id= favor.nid;  # 先笛卡尔积再过滤
# ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
#join表,默认是inner join
# select * from person inner join favor on person.favor_id=favor.id;
# ret = session.query(Person).join(Favor).all()


#isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
# select * from person left join favor on person.favor_id=favor.id;
# ret = session.query(Person).join(Favor, isouter=True).all()
# 右链接
# ret = session.query(Favor).join(Person, isouter=True).all()

# 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# select * from person left join favor on person.id=favor.id;
# ret = session.query(Person).join(Favor,Person.id==Favor.id, isouter=True).all()


## 5.14  union和union all的区别?
# 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
#union和union all的区别?
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union(q2).all()
#
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union_all(q2).all()


print(res)

# 第四步:提交
session.commit()
# 第五步:关闭链接
session.close()

5 一对多

###一对多关系
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名,uselist=False
    hobby_id = Column(Integer, ForeignKey("hobby.id"))


    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 基于对象的跨表查询:
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')

    def __repr__(self):
        return self.name
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Person, Hobby
from sqlalchemy.sql import text

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()

# 1  一对多增加
# 方式一:
# session.add(Hobby(caption='足球'))
# session.add(Person(name='lqz',hobby_id=1))
# session.add(Person(name='pyy',hobby_id=2))  # 有约束,增加失败

# 方式二:
# session.add(Person(name='刘亦菲', hobby=Hobby(caption='橄榄球')))


# 2 一对多查询
# 2.1 基于对象的跨表查询
### 正向查询
# res = session.query(Person).filter(Person.name == '刘亦菲').first()
# print(res)
# print(res.hobby.caption)

## 反向查询
# res=session.query(Hobby).filter_by(caption='足球').first()
# print(res)
# # 取出喜欢足球的所有人---》反向查
# print(res.pers)


# 2.2 基于连表的跨表查询
# select * from person,hobby where person.hobby_id=hobby.id and person.name=彭于晏;
# res=session.query(Person,Hobby).filter(Person.hobby_id==Hobby.id,Person.name=='彭于晏').all()

#select * from person inner join hobby on person.hobby_id=hobby.id where person.name=彭于晏;
res = session.query(Person).join(Hobby).filter(Person.name=='彭于晏').all()

print(res)

session.commit()
session.close()

6 多对多

#多对多
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))



class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    #方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询
    girls = relationship('Girl', secondary='boy2girl', backref='boys')
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Boy,Girl,Boy2Girl
from sqlalchemy.sql import text

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()

# 1  多对多增加
# 方式一:所有表都有,一个个增加
# session.add(Boy(name='dxl'))
# session.add(Girl(name='zqh'))
# session.add_all([Boy(name='dxl'),Girl(name='zqh')])
# session.add_all([Boy2Girl(boy_id=1,girl_id=1),Boy2Girl(boy_id=1,girl_id=2)])


# 方式二:
# session.add(Boy(name='张涛',girls=[Girl(name='芙蓉姐姐'),Girl(name='凤姐')]))


# 2 一对多查询
# 2.1 基于对象的跨表查询
### 正向查询

# res=session.query(Boy).filter_by(name='张涛').first()
# print(res.girls)

## 反向查询
# res=session.query(Girl).filter_by(name='凤姐').first()
# print(res.boys)



# 2.2 基于连表的跨表查询


# print(res)

session.commit()
session.close()

7 flask-sqlalchemy使用和和flask-migrate使用

# flask-sqlalchemy:帮助我们快速把sqlalchemy集成到flask中
# 表字段增加删除不支持:flask-migrate,以后增加了字段,删除字段只需要两条迁移命令就完成了

### flask—sqlalchemy
# 第一步:导入SQLAlchemy,实例化得到对象db
# 第二步:注册
	db.init_app(app)
# 第三步:创建表模型,继承db.Model
# 第四步:使用session
	db.session   处理了线程安全
    
    
    
    
### 把表同步到数据库中---》flask-migrate
	## 基于flask_script,定制几个命令  python manage.py db init/ migrate/upgrade
    
    # 第一个命令:
    python38 manage.py db init  # 初始化,项目使用的时候,只敲一次,生成migrations文件夹
    # 第二个命令:只要改了表结构,增加表,删除表,增加字段,删除字段
    python38 manage.py db migrate # 不会在数据库中生成记录,只是记录变化等同于makemigrations
    
    # 第三个命令
	python38 manage.py db upgrade  # 等同于migrate

8 请求上下文分析

def wsgi_app(self, environ, start_response):
        # 读完了:ctx:是RequestContext的对象,内部有当次请求的request对象,session对象,app对象,flash对象
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)
请求上下文执行流程(ctx):
		-0 flask项目一启动,有6个全局变量
			-_request_ctx_stack:LocalStack对象
			-_app_ctx_stack :LocalStack对象
			-request : LocalProxy对象
			-session : LocalProxy对象
		-1 请求来了 app.__call__()---->内部执行:self.wsgi_app(environ, start_response)
		-2 wsgi_app()
			-2.1 执行:ctx = self.request_context(environ):返回一个RequestContext对象,并且封装了request(当次请求的request对象),session,flash,当前app对象
			-2.2 执行: ctx.push():RequestContext对象的push方法
				-2.2.1 push方法中中间位置有:_request_ctx_stack.push(self),self是ctx对象
				-2.2.2 去_request_ctx_stack对象的类中找push方法(LocalStack中找push方法)
				-2.2.3 push方法源码:
				    def push(self, obj):
						#通过反射找self._local,在init实例化的时候生成的:self._local = Local()
						#Local(),flask封装的支持线程和协程的local对象
						# 一开始取不到stack,返回None
						rv = getattr(self._local, "stack", None)
						if rv is None:
							#走到这,self._local.stack=[],rv=self._local.stack
							self._local.stack = rv = []
						# 把ctx放到了列表中
						#self._local={'线程id1':{'stack':[ctx,]},'线程id2':{'stack':[ctx,]},'线程id3':{'stack':[ctx,]}}
						rv.append(obj)
						return rv
		-3 如果在视图函数中使用request对象,比如:print(request)
			-3.1 会调用request对象的__str__方法,request类是:LocalProxy
			-3.2 LocalProxy中的__str__方法:lambda x: str(x._get_current_object())
				-3.2.1 内部执行self._get_current_object()
				-3.2.2 _get_current_object()方法的源码如下:
				    def _get_current_object(self):
						if not hasattr(self.__local, "__release_local__"):
							#self.__local()  在init的时候,实例化的,在init中:object.__setattr__(self, "_LocalProxy__local", local)
							# 用了隐藏属性
							#self.__local 实例化该类的时候传入的local(偏函数的内存地址:partial(_lookup_req_object, "request"))
							#加括号返回,就会执行偏函数,也就是执行_lookup_req_object,不需要传参数了
							#这个地方的返回值就是request对象(当此请求的request,没有乱)
							return self.__local()
						try:
							return getattr(self.__local, self.__name__)
						except AttributeError:
							raise RuntimeError("no object bound to %s" % self.__name__)
				-3.2.3 _lookup_req_object函数源码如下:
					def _lookup_req_object(name):
						#name是'request'字符串
						#top方法是把第二步中放入的ctx取出来,因为都在一个线程内,当前取到的就是当次请求的ctx对象
						top = _request_ctx_stack.top
						if top is None:
							raise RuntimeError(_request_ctx_err_msg)
						#通过反射,去ctx中把request对象返回
						return getattr(top, name)
				-3.2.4 所以:print(request) 实质上是在打印当此请求的request对象的__str__
		-4 如果在视图函数中使用request对象,比如:print(request.method):实质上是取到当次请求的reuquest对象的method属性
		
		-5 最终,请求结束执行: ctx.auto_pop(error),把ctx移除掉
		
	其他的东西:
		-session:
			-请求来了opensession
				-ctx.push()---->也就是RequestContext类的push方法的最后的地方:
					if self.session is None:
						#self是ctx,ctx中有个app就是flask对象,   self.app.session_interface也就是它:SecureCookieSessionInterface()
						session_interface = self.app.session_interface
						self.session = session_interface.open_session(self.app, self.request)
						if self.session is None:
							#经过上面还是None的话,生成了个空session
							self.session = session_interface.make_null_session(self.app)
			-请求走了savesession
				-response = self.full_dispatch_request() 方法内部:执行了before_first_request,before_request,视图函数,after_request,savesession
				-self.full_dispatch_request()---->执行:self.finalize_request(rv)-----》self.process_response(response)----》最后:self.session_interface.save_session(self, ctx.session, response)
		-请求扩展相关
			before_first_request,before_request,after_request依次执行
		-flask有一个请求上下文,一个应用上下文
			-ctx:
				-是:RequestContext对象:封装了request和session
				-调用了:_request_ctx_stack.push(self)就是把:ctx放到了那个位置
			-app_ctx:
				-是:AppContext(self) 对象:封装了当前的app和g
				-调用 _app_ctx_stack.push(self) 就是把:app_ctx放到了那个位置
	-g是个什么鬼?
		专门用来存储用户信息的g对象,g的全称的为global 
		g对象在一次请求中的所有的代码的地方,都是可以使用的 
		
		
	-代理模式
		-request和session就是代理对象,用的就是代理模式




https://www.xamrdz.com/backend/3xa1960007.html

相关文章: