当前位置: 首页>后端>正文

python flask mysql 教程

一 、
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from datetime import datetime
from waymon import res

post request.form.get("")

get request.args.get("")

app = Flask(name)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:123456@127.0.0.1:3306/test"

追踪数据库 一般不开启

app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

db = SQLAlchemy(app)

class Todo(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.String(200), nullable=False)
completed = db.Column(db.Integer, default=0)
pub_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)

def __repr__(self):
    return "<Task %r>" % self.id

migrate = Migrate(app=app, db=db)

@app.route("/info", methods=["GET"])
def info():
id = request.args.get("id")
task = Todo.query.filter(Todo.id == id).first()
return res(task, "ok", 0)

@app.route("/list", methods=['GET'])
def list():
page = request.args.get("page")
print(page)
size = request.args.get("size")
print(size)
tasks = Todo.query.order_by(Todo.pub_date).all()
return res(tasks, "ok", 0)

@app.route("/add", methods=["POST", "GET"])
def add():
if request.method == "POST":
task_content = request.form.get("content")
new_task = Todo(content=task_content)
try:
db.session.add(new_task)
db.session.commit()
return res(None, "ok", 0)
except:
return res(None, "err", 1)
else:
task_content = request.args.get("content")
new_task = Todo(content=task_content)
try:
db.session.add(new_task)
db.session.commit()
return res(None, "ok", 0)
except:
return res(None, "err", 1)

@app.route("/update/<int:id>", methods=["POST"])
def update(id):
task = Todo.query.get_or_404(id)
if request.method == "POST":
task.content = request.form["task"]
try:
db.session.commit()
return res(None, "ok", 0)
except:
return res(None, "ok", 1)
else:
return res(None, "不支持GET", 2)

@app.route("/delete/<int:id>", methods=["POST"])
def delete(id):
task = Todo.query.get_or_404(id)
try:
db.session.delete(task)
db.session.commit()
return res(data=None, msg="ok", status=0)
except:
return res(None, "err", 1)

if name == 'main':
app.run(port=5000)

二 、 json&get&post 返回值封装

from flask import jsonify
from sqlalchemy.orm import DeclarativeMeta
import requests
import json

def res(data=None, msg="ok", status=0):
"""
json 返回值
"""
response = {
"data": data,
"msg": msg,
"status": status
}
try:
response['data'] = serializer(data)
return jsonify(response)
except SerializationError as e:
response['status'] = e.code
response['msg'] = e.message
return jsonify(response)

def serializer(obj):
"""
将对象转换为可以序列化为JSON的数据类型
"""
if obj is None:
return None
try:
# 如果对象本身就是可以序列化为JSON的类型,则直接返回
if isinstance(obj, (str, int, float, bool, list, tuple, dict)):
return obj
# 如果对象是ORM对象,则将其转换为字典并返回
elif isinstance(obj.class, DeclarativeMeta):
return {c.name: getattr(obj, c.name) for c in obj.table.columns}
# 如果对象实现了dict方法,则将其转换为字典并返回
elif hasattr(obj, 'dict'):
return obj.dict
else:
raise SerializationError(code=500, message="Cannot serializer obj")
except Exception as e:
raise SerializationError(code=500, message=str(e))

class SerializationError(Exception):
"""
自定义的异常类,用于处理序列化错误
"""
def init(self, code, message):
self.code = code
self.message = message

def get(uri, params):
"""
get 请求
"""
response = requests.get(uri, data=params)
if response.status_code == 200:
result = json.loads(response.content)
return result
else:
return res(None, response.text, response.status_code)

def post(uri, data):
"""
post 请求
"""
response = requests.post(uri, json=data)
if response.status_code == 200:
result = json.loads(response.content)
return result
else:
return res(None, response.text, response.status_code)

三、基础组件

python3.11 -m pip install flask

python3.11 -m pip install SQLalchemy

python3.11 -m pip install flask-sqlalchemy

python3.11 -m pip install flask-migrate

四 数据库迁移

1). 创建迁移仓库(migrations目录)
python3.11 app.py db init
2). 读取类的内容, 生成版本文件, 并没有真正在数据库中添加或删除;
python3.11 app.py db migrate -m "版本名后缀"
1
3). 在数据库中增删改, 也就是将迁移应用于数据库;
python3.11 app.py db upgrade

五 、orm 使用

1 创建表
class Todo(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.String(200), nullable=False)
completed = db.Column(db.Integer, default=0)
pub_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)

def __repr__(self):
    return "<Task %r>" % self.id

2 单条查询
Todo.query.filter(Todo.id == id).first()
3 多条查询
Todo.query.order_by(Todo.pub_date).all()
3 添加
db.session.add(new_task)
4 修改

task.content = request.form["task"]
db.session.commit()
5 删除


https://www.xamrdz.com/backend/3wv1918358.html

相关文章: