一 、
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from datetime import datetime
from waymon import res
post request.form.get("")
get request.args.get("")
app = Flask(name)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:123456@127.0.0.1:3306/test"
追踪数据库 一般不开启
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
class Todo(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.String(200), nullable=False)
completed = db.Column(db.Integer, default=0)
pub_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
def __repr__(self):
return "<Task %r>" % self.id
migrate = Migrate(app=app, db=db)
@app.route("/info", methods=["GET"])
def info():
id = request.args.get("id")
task = Todo.query.filter(Todo.id == id).first()
return res(task, "ok", 0)
@app.route("/list", methods=['GET'])
def list():
page = request.args.get("page")
print(page)
size = request.args.get("size")
print(size)
tasks = Todo.query.order_by(Todo.pub_date).all()
return res(tasks, "ok", 0)
@app.route("/add", methods=["POST", "GET"])
def add():
if request.method == "POST":
task_content = request.form.get("content")
new_task = Todo(content=task_content)
try:
db.session.add(new_task)
db.session.commit()
return res(None, "ok", 0)
except:
return res(None, "err", 1)
else:
task_content = request.args.get("content")
new_task = Todo(content=task_content)
try:
db.session.add(new_task)
db.session.commit()
return res(None, "ok", 0)
except:
return res(None, "err", 1)
@app.route("/update/<int:id>", methods=["POST"])
def update(id):
task = Todo.query.get_or_404(id)
if request.method == "POST":
task.content = request.form["task"]
try:
db.session.commit()
return res(None, "ok", 0)
except:
return res(None, "ok", 1)
else:
return res(None, "不支持GET", 2)
@app.route("/delete/<int:id>", methods=["POST"])
def delete(id):
task = Todo.query.get_or_404(id)
try:
db.session.delete(task)
db.session.commit()
return res(data=None, msg="ok", status=0)
except:
return res(None, "err", 1)
if name == 'main':
app.run(port=5000)
二 、 json&get&post 返回值封装
from flask import jsonify
from sqlalchemy.orm import DeclarativeMeta
import requests
import json
def res(data=None, msg="ok", status=0):
"""
json 返回值
"""
response = {
"data": data,
"msg": msg,
"status": status
}
try:
response['data'] = serializer(data)
return jsonify(response)
except SerializationError as e:
response['status'] = e.code
response['msg'] = e.message
return jsonify(response)
def serializer(obj):
"""
将对象转换为可以序列化为JSON的数据类型
"""
if obj is None:
return None
try:
# 如果对象本身就是可以序列化为JSON的类型,则直接返回
if isinstance(obj, (str, int, float, bool, list, tuple, dict)):
return obj
# 如果对象是ORM对象,则将其转换为字典并返回
elif isinstance(obj.class, DeclarativeMeta):
return {c.name: getattr(obj, c.name) for c in obj.table.columns}
# 如果对象实现了dict方法,则将其转换为字典并返回
elif hasattr(obj, 'dict'):
return obj.dict
else:
raise SerializationError(code=500, message="Cannot serializer obj")
except Exception as e:
raise SerializationError(code=500, message=str(e))
class SerializationError(Exception):
"""
自定义的异常类,用于处理序列化错误
"""
def init(self, code, message):
self.code = code
self.message = message
def get(uri, params):
"""
get 请求
"""
response = requests.get(uri, data=params)
if response.status_code == 200:
result = json.loads(response.content)
return result
else:
return res(None, response.text, response.status_code)
def post(uri, data):
"""
post 请求
"""
response = requests.post(uri, json=data)
if response.status_code == 200:
result = json.loads(response.content)
return result
else:
return res(None, response.text, response.status_code)
三、基础组件
python3.11 -m pip install flask
python3.11 -m pip install SQLalchemy
python3.11 -m pip install flask-sqlalchemy
python3.11 -m pip install flask-migrate
四 数据库迁移
1). 创建迁移仓库(migrations目录)
python3.11 app.py db init
2). 读取类的内容, 生成版本文件, 并没有真正在数据库中添加或删除;
python3.11 app.py db migrate -m "版本名后缀"
1
3). 在数据库中增删改, 也就是将迁移应用于数据库;
python3.11 app.py db upgrade
五 、orm 使用
1 创建表
class Todo(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.String(200), nullable=False)
completed = db.Column(db.Integer, default=0)
pub_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
def __repr__(self):
return "<Task %r>" % self.id
2 单条查询
Todo.query.filter(Todo.id == id).first()
3 多条查询
Todo.query.order_by(Todo.pub_date).all()
3 添加
db.session.add(new_task)
4 修改
task.content = request.form["task"]
db.session.commit()
5 删除