当前位置: 首页>后端>正文

pyflink 集群版redis读取数据 flask连接redis

一、flask-session

安装flask-session组件

使用流程:



import redis
from flask_session import Session

app.config["SESSION_TYPE"] = "redis"
app.config["SESSION_REDIS"] = redis.Redis(host="", port=6379,password="")
Session(app)



 

二、DBUtils

DBUtils是html" class="superseo">python用来实现数据库连接的模块。有两种实现方式,一是为每一个线程创建一个连接,二是创建一个连接池。

模式一:




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

POOL = PersistentDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)

def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

func()


View Code


模式二:




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)


def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()

    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()


func()


View Code


 Flask中的使用:




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

class Conf(object):
    SALT = b"asd"
    SECRET_KEY = "asdf"

    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,
        # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='ywj971020',
        database='code_count',
        charset='utf8'
    )

conn = Pool.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  #如果不传参数,默认返回的数据格式是元祖
cursor.execute(sql)
data = cursor.fetchone()
data_list = cursor.fetchall()
cursor.close()
conn.close()


Flask


 

三、wtform

1、用户登录实例




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True


class LoginForm(Form):
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空.'),
            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'}

    )
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            validators.Length(min=8, message='用户名长度必须大于%(min)d'),
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )



@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'GET':
        form = LoginForm()
        return render_template('login.html', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('login.html', form=form)

if __name__ == '__main__':
    app.run()

app.py


app.py



pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
    <!--<input type="text" name="name">-->
    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

    <!--<input type="password" name="pwd">-->
    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
    <input type="submit" value="提交">
</form>
</body>
</html>

login.html


login.html


2、用户注册实例




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='alex'
    )

    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重复密码',
        validators=[
            validators.DataRequired(message='重复密码不能为空.'),
            validators.EqualTo('pwd', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性别',
        choices=(
            (1, '男'),
            (2, '女'),
        ),
        coerce=int
    )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )

    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))

    def validate_pwd_confirm(self, field):
        """
        自定义pwd_confirm字段规则,例:与pwd字段是否一致
        :param field: 
        :return: 
        """
        # 最开始初始化时,self.data中已经有所有的值

        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密码不一致") # 继续后续验证
            raise validators.StopValidation("密码不一致")  # 不再继续后续验证


@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 1})
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('register.html', form=form)



if __name__ == '__main__':
    app.run()

app.py


app.py



pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0  50px">
    {% for item in form %}
    <p>{{item.label}}: {{item}} {{item.errors[0] }}</p>
    {% endfor %}
    <input type="submit" value="提交">
</form>
</body>
</html>

register.html


register.html


3、其他

数据库数据实时更新:自定义一个__init__方法,因为静态字段只在程序开始时加载一次,构造方法会实时更新。

设置input框的默认值:在实例化Form对象时传参:data={'字段名' : '默认值'}

 

四、SQLAlchemy

SQLAlchemy是一个ORM框架,ORM是关系对象映射(类对应数据库的表,类里的字段对应表中的列,对象对应数据库的行)

1、创建表




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext..declarative import declarative_base
from sqlalchemy import Column,Datetime

Base = declarative_base()

class User(Base):

    __tablename__ = 'users'

    id = Column(Integer,primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    ctime = Column(Datetime,default=datetime.datetime.now)

def create_tables():
        engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)

def drop_tables():
        engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)

if __name__ == "__main__":
    create_tables()
    #drop_tables()


创建表


2、基本增删改查



#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
  
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
  
# 每次执行数据库操作时,都需要创建一个session
session = Session()

#1.添加
obj = Users(name='xx')
session.add(obj)
#session.add_all(Users(name='xx'),User(name='xx'))
session.commit()

#2.查询
result = session.query(Users).all()
result = session.query(Users).filter(Users.id=1).first()

#3.删除
session.query(Users).filter(User.id=1).delete()
session.commit()

#4.修改
session.query(Users).filter(User.id=1).update({'name':'xx'})
session.query(Users).filter(User.id=1).update({Users.name:'xx'})



 3.常用查询操作




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

#查询时只取某些字段
result = session.query(Users.id,Users.name).all()

#为查询的字段起别名
result = session.query(Users.name.label('cname')).all()
for item in result:
    print(item.cname)


# 条件
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()    #默认的逗号就是and
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()    #between
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()    #in
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()    #not in
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()    #子查询
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()    #默认的and可以不加and_
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()

#模糊匹配like
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 切片/分页
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()


View Code


给查询字段起别名:session.query(Users,name.label("n")).all()

4.一对多和多对多




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

#创建FK字段
class Depart(Base):
    __tablename__ = 'depart'
    id = Column(Int, primary_key=True)
    name = Column(String(32), nullable=False)

class Users(Base):
    __tablename__ = 'users'
    id = Column(Int, primary_key=True)
    name = Column(String(32), nullable=False)
    depart_id = Column(Int, ForeignKey("depart.id"))

    dp = relationship('Depart', backref='us')

#1.查询用户名+所在部门
ret = session.query(Users.name, Depart.name).join(Depart).all()
#ret = session.query(Users.name, Depart.name).join(Depart, isouter=True).all()  #表示使用left join

#2.relation字段 查询用户名+所在部门
ret = session.query(Users).all()
for row in ret:
    print(ret.name, ret.dp.name)

#3.relation字段 查找某个部门所有员工
ret = session.query(Depart).filter(Depart.name='销售').first()
for row in ret.us:
    print(row.name)

#4.添加一个IT部门,同时增加一名员工
user = Users(name='xxx', dp.name='IT')    #自动创建IT部门
session.add(user)
session.commit()

#5.添加一个IT部门,同时增加多名员工
dep = Depart(name='IT')
dep.us = [Users(name=xxx), Users(name=xxx)]
session.add(dep)
session.commit()

一对多示例


一对多示例



pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

#创建表
class Server2Group(Base):
    __tablename__ = 'server2group'
    id = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))

    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),     #创建联合唯一索引
        # Index('ix_id_name', 'name', 'extra'),
    )


class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便
    servers = relationship('Server', secondary='server2group', backref='groups')


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)


多对多示例


5.两种连接方式




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

def task():
    session = Session()    #从连接池中取一个连接
    xxxxx
    session.close()    #把连接返回给连接池

from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()


方式一



pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = scoped_session(Session)    #这里相当于是基于threading.Local为每个线程创建独立的内存空间,去连接池中取连接,这个连接只有当前的线程可以使用。

def task():
    xxxxx
    session.remove()

from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()


方式二


6.原生sql




pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

#查询
cursor = session.execute('select * from xx')
result = cursor.fetchall()

#添加
cursor = session.execute('insert into xxxxx')
session.commit()


方式一



pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_连接池,第1张

pyflink 集群版redis读取数据 flask连接redis,pyflink 集群版redis读取数据 flask连接redis_数据库_02,第2张

conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(xxxx)
result = cursor.fetchall()
cursor.close()
conn.close()


方式二


https://www.xamrdz.com/backend/39g1963516.html

相关文章: