1. 数据库
Flask 没有限定使用哪种数据库,不管是 SQL 还是 NoSQL。如果你不想自己编写负责的 SQL语句的话,你也可以使用 ORM
,通过 SQLALchemy
即可实现。
1.1 SQLALchemy 使用
安装
pip install flask-sqlalchemy
# 要连接mysql数据库,仍需要安装flask-mysqldb
pip install flask-mysqldb
# 或者你还还需 pymyql
pip3 install pymyql
相关网站:
- http://flask.pocoo.org/
- 基于 Celery 的后台任务
- flask-sqlalchemy
Flask-SQLAlchemy 数据库 URL
# hostname:主机、database:数据库,username、password 数据库用户名和密码
# mysql
mysql://username:password@hostname/database
# postgres
postgresql://username:password@hostname/database
# SQLite unix
sqlite:////absolute/path/to/database
# sqlite windows
sqlite:///c:/absolute/path/to/database
连接 MySQL
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 设置连接数据库的URL
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:xxx@localhost/t1'
#设置每次请求结束后会自动提交数据库中的改动
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_ECHO'] = True # 查询时会显示原始SQL语句
db = SQLAlchemy(app)
常用 sqlalchemy 列类型
常用 sqlalchemy 列选项
常用 SQLAlchemy 关系选项
1.2 基本操作
不管是插入、修改、还是删除操作,均由数据库会话管理,会话用 db.session
表示。数据库会话是为了保证数据的一致性,避免因部分更新导致数据不一致。提交操作把会话对象全部写入数据库,如果写入过程发生错误,整个会话都会失效。数据库会话也可以回滚,通过 db.session.rollback()
方法,实现会话提交数据前的状态。
创建、删除、插入
# 创建表
# 如果表存在,则帮助重新创建或更新
# 如果要修改模型后要把应用到现有数据库中,这个操作不行,更新现有数据库的粗暴方式是先删除再创建
db.create_all()
# 删除表
db.drop_all()
# 插入一条数据
ro1 = Role(name='admin')
db.session.add(ro1)
db.session.commit()
# 一次插入多条数据
us1 = User(name='wang',email='xxx@163.com',pswd='123456',role_id=ro1.id)
us2 = User(name='zhang',email='xxx@189.com',pswd='201512',role_id=ro2.id)
db.session.add_all([us1,us2])
db.session.commit()
查询
# 查询:filter_by精确查询
User.query.filter_by(name='wang').all()
# first()返回查询到的第一个对象
User.query.first()
# all()返回查询到的所有对象
User.query.all()
# filter模糊查询,返回名字结尾字符为g的所有数据。
User.query.filter(User.name.endswith('g')).all()
# get(),参数为主键,如果主键不存在没有返回内容
User.query.get()
# 逻辑非,返回名字不等于wang的所有数据。
User.query.filter(User.name!='wang').all()
# 逻辑与,需要导入and,返回and()条件满足的所有数据。
from sqlalchemy import and_
User.query.filter(and_(User.name!='wang',User.email.endswith('163.com'))).all()
# 逻辑或,需要导入or_
from sqlalchemy import or_
User.query.filter(or_(User.name!='wang',User.email.endswith('163.com'))).all()
# not_ 相当于取反
from sqlalchemy import not_
User.query.filter(not_(User.name=='chen')).all()
# 查询数据后删除
user = User.query.first()
db.session.delete(user)
db.session.commit()
User.query.all()
# 关联查询示例:角色和用户的关系是一对多的关系,一个角色可以有多个用户,一个用户只能属于一个角色。
查询角色的所有用户:
#查询roles表id为1的角色
ro1 = Role.query.get(1)
#查询该角色的所有用户
ro1.us
# 查询用户所属角色:
#查询users表id为3的用户
us1 = User.query.get(3)
#查询用户属于什么角色
us1.role
更新
# 更新数据
user = User.query.first()
user.name = 'dong'
db.session.commit()
User.query.first()
# 使用update
User.query.filter_by(name='zhang').update({'name':'li'})
查看原生 SQL
>>> str(User.query.filter_by(role=user_role)
... )
'SELECT users.id AS users_id, users.username AS users_username, users.role_id AS users_role_id \nFROM users \nWHERE %(
param_1)s = users.role_id'
常用 sqlAlchemy 查询过滤器
常用 SQLAlchemy 查询执行函数
示例:在视图函数中定义模型类
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
#设置连接数据库的URL
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:mysql@127.0.0.1:3306/Flask_test'
#设置每次请求结束后会自动提交数据库中的改动
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
class Role(db.Model):
# 定义表名
__tablename__ = 'roles'
# 定义列对象
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
us = db.relationship('User', backref='role')
#repr()方法显示一个可读字符串
def __repr__(self):
return 'Role:%s'% self.name
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True, index=True)
email = db.Column(db.String(64),unique=True)
pswd = db.Column(db.String(64))
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
def __repr__(self):
return 'User:%s'%self.name
if __name__ == '__main__':
db.drop_all()
db.create_all()
ro1 = Role(name='admin')
ro2 = Role(name='user')
db.session.add_all([ro1,ro2])
db.session.commit()
us1 = User(name='rose',email='rose@163.com',pswd='123',role_id=ro1.id)
us2 = User(name='lila',email='lila@189.com',pswd='456',role_id=ro2.id)
us3 = User(name='john',email='john@126.com',pswd='789',role_id=ro2.id)
db.session.add_all([us1,us2,us3])
db.session.commit()
app.run(debug=True)
1.3 使用 Flask-Migrate 实现数据库迁移
开发时,需要修改数据库模型,修改之后要更新数据库。表不存在时就创建,存在时,更新表的唯一方式是先删除后创建,但是会丢失数据。
更新表的最好方式是使用 数据库迁移框架,Flask 使用的是 Alembic
(集成在 Flask-Script
中:
# pip3 install flask-migrate
from flask_migrate import Migrate, MigrateCommand
app = Flask(__name__)
manager = Manager(app)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:xxxx@localhost/t1'
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
db = SQLAlchemy(app)
migrate = Migrate(app, db) # 第一个参数是Flask的实例,第二个参数是Sqlalchemy数据库实例
manager.add_command('db', MigrateCommand) # 这条语句在flask-Script中添加一个db命令
初始化数据库
在维护数据库迁移之前,使用 init
子命令创建迁移仓库:
此时你会看到项目中生成一个 migrations
的文件夹,所有迁移脚本都存放其中。
创建迁移脚本
Alembic 中,迁移脚本中有两个函数:upgrade()
he downgrade()
:
-
upgrade()
:把迁移中改动应用到数据库中 -
upgrade()
:将改动删除
revision
命令手动创建 Alembic 迁移,migrate
自动创建(自动创建可能会有错误,一定要检查下),自动创建迁移脚本:
发现出现:too many arguments
错误,用 python3 s1.py db migrate --help
命令查看 migrate
命令后面到底可以接什么参数,发现 -m
是可以的,但为什么会报错呢 ?
尝试不输入 'initial migration'
那么长,而是随便输入了一个 's'
,发现竟然成功了:
数据回滚
如果要回滚,那么首先要指定版本号,查看版本号:
# 由于版本号是随机字符串,为避免出错,建议先使用 history 命令查看历史版本的具体版本号,然后复制具体版本号执行回退。
python database.py db history
# 回滚
python database.py db downgrade 版本号
1.4 示例
在这里我们将创建一个简单的能够添加书籍的小程序,整个项目结构如下图所示:
1、定义模型,models.py
:
from flask_test.s4 import db
class Author(db.Model):
"""定义模型类-作者"""
__tablename__ = 'author'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32), unique=True)
email = db.Column(db.String(64))
au_book = db.relationship('Book', backref='author', lazy='dynamic')
def __str__(self):
return 'Author:%s' % self.name
class Book(db.Model):
"""定义模型类-书名"""
__tablename__ = 'books'
id = db.Column(db.Integer, primary_key=True)
info = db.Column(db.String(32),unique=True)
leader = db.Column(db.String(32))
au_book = db.Column(db.Integer, db.ForeignKey('author.id'))
def __str__(self):
return 'Book:%s' % self.info
2、定义 Form 表单,my_forms.py
:
from flask_wtf import FlaskForm
from wtforms.validators import DataRequired
from wtforms import StringField, SubmitField
class Append(FlaskForm):
"""添加信息"""
auth_info = StringField(label='作者', validators=[DataRequired()])
book_info = StringField(label='书名', validators=[DataRequired()])
submit = SubmitField(u'添加')
3、主程序,app.py
:
from flask import Flask, render_template, redirect, url_for, request
from flask_test.my_extend import models
from flask_test.my_extend.my_forms import Append
from flask_script import Manager
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
manager = Manager(app)
#设置连接数据
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:password@localhost/t2'
#设置每次请求结束后会自动提交数据库中的改动
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
#设置成 True,SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SECRET_KEY'] = 's'
#实例化SQLAlchemy对象
db = SQLAlchemy(app)
@app.route('/', methods=['GET', 'POST'])
def index():
# 查询所有作者和书名信息
author_list = models.Author.query.all()
book_list = models.Book.query.all()
return render_template('info.html', author_list=author_list, book_list=book_list)
@app.route('/add_book', methods=['GET', 'POST'])
def add_book():
"""添加书籍"""
form = Append() # 创建表单对象
if form.validate_on_submit():
# 获取表单输入数据
wtf_auth = form.auth_info.data
wtf_book = form.book_info.data
print(wtf_book, wtf_auth)
# 将表单输入的数据存入到模型中
db_auth = models.Author(name=wtf_auth)
db_book = models.Book(info=wtf_book)
# 提交会话
db.session.add_all([db_auth, db_book])
db.session.commit()
return redirect(url_for('index'))
else:
if request.method == 'GET':
return render_template('add_book.html', form=form)
if __name__ == '__main__':
manager.run()
4、info.html
```python
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<link rel="stylesheet" href="{{ url_for('static', filename='css/bootstrap.css') }}">
</head>
<body>
<div class="col-md-6" style="margin-left: 100px">
<h1>玄幻系列</h1>
<button class="btn pull-right" style="margin-bottom: 15px;"><a href="/add_book">添加</a> </button>
<table class="table table-striped table-bordered">
<thead>
<tr>
<th>作者</th>
<th>书名</th>
</tr>
</thead>
<tbody>
<tr>
{% for author in author_list %}
<td>{{ author.name }}</td>
{% endfor %}
{% for book in book_list %}
<td>{{ book.info }}</td>
{% endfor %}
</tr>
</tbody>
</table>
</div>
<script src="{{ url_for('static', filename='js/jquery-3.1.1.js') }}"></script>
<script src="{{ url_for('static', filename='js/bootstrap.js') }}"></script>
</body>
</html>
5、add_book.html
:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<form action="/add_book" method="post">
{{ form.csrf_token }}
<p>{{ form.auth_info.label }} {{ form.auth_info }}</p>
<p>{{ form.book_info.label }} {{ form.book_info }}</p>
<p>{{ form.submit }}</p>
</form>
</body>
</html>
效果图如下:
1.5 踩坑
flask 不能进入 shell
flask 进入 shell,命令:python 程序文件.py shell
要安装:pip3 install flask-script
,程序中使用:
from flask_script import Manager
from flask import Flask
app = Flask(__name__)
manager = Manager(app)
if __name__ == '__main__':
manager.run()
flask 在连接MySQL出现ModuleNotFoundError: No module named 'MySQLdb'错误
解决办法:只要在配置 SQLALCHEMY_DATABASE_URI
时,加上一个 pymysql
就可以了:
# 记得安装 pymysql
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:dzd123@localhost/你的数据库名'
参考文章:
进入 shell 时出现错误:SQLALCHEMY_TRACK_MODIFICATIONS adds significant overhead and '
解决办法:
# 修改 flask_sqlalchemy 的 __init__.py
# 查找方法:在程序中导入 flask_sqlalchemy,按住 Ctrl 键,鼠标点击 flask_sqlalchemy,就会定位到 __init__.py 中,然后 Ctrl + F 查找 track_modifications 即可
# 在init.py里面有 init_app方法,修改下面的一行
track_modifications = app.config.setdefault('SQLALCHEMY_TRACK_MODIFICATIONS', True)
参考文章:
2. 邮件拓展
Flask 的扩展包 Flask-Mail
通过包装了 Python
内置的 smtplib
包,可以用在Flask
程序中发送邮件。
Flask-Mail
连接到简单邮件协议(Simple Mail Transfer Protocol,SMTP
)服务器,并把邮件交给服务器发送。
示例:下面以 qq 邮箱为示例演示下如何在 Flask 程序中发送邮件
1、开启 IMAP/SMTP
服务,生成授权码:
2、app.py
pip3 install flask_mail
```
```python
from flask import Flask
from flask_mail import Mail, Message
app = Flask(__name__)
# 配置邮件:服务器/端口/传输层安全协议/邮箱名/密码
app.config.update(
DEBUG = True,
MAIL_SERVER='smtp.qq.com',
MAIL_PROT=465,
MAIL_USE_TLS=True,
MAIL_USERNAME='98xxx616@qq.com',
MAIL_PASSWORD='vnpyuevrhqyybccd', # 授权码
)
mail = Mail(app)
@app.route('/')
def index():
# sender 发送方,recipients 接收方列表,This is a test 为邮件标题
msg = Message("This is a test ", sender='9825xx616@qq.com', recipients=['146xx0@qq.com', 'junxx@outlook.com'])
#邮件内容
msg.body = "Flask 邮件测试"
#发送邮件
mail.send(msg)
print("邮件发送中...")
return "发生成功!"
if __name__ == "__main__":
app.run()
运行程序,访问:http://127.0.0.1:5000/
:
3. 蓝图 Blueprint
一个项目有很多的页面(视图),也就意味着会有很多路由,如果把所有代码写在一个文件中(启动文件、路由、配置等等),显然程序变得模块化,耦合性太差,后期维护过于困难。
采用 Python 模块导入的方式将程序拆分成多个文件,虽然降低程序耦合,但是也不能解决路由映射问题。Flask 提倡使用蓝图解决此类问题。那么什么是蓝图呢?
蓝图是用于实现单个应用的视图、模板、静态文件的集合,是模块化处理的类。简单来说,蓝图就是一个存储操作路由映射方法的容器,主要用来实现客户端请求和URL相互关联的功能。 在Flask中,使用蓝图可以帮助我们实现模块化应用的功能。
蓝图使用
一、创建蓝图对象。
#Blueprint必须指定两个参数,admin表示蓝图的名称,__name__表示蓝图所在模块
admin = Blueprint('admin',__name__)
二、注册蓝图路由。
@admin.route('/')
def admin_index():
return 'admin_index'
三、在程序实例中注册该蓝图。
app.register_blueprint(admin,url_prefix='/admin')
示例:
三个程序文件:login.py、user.py、manage.py
,相当于三个 app,其中 manage.py
作为主要程序文件(启动问)。
实现目标:在主程序与子程序中定义相同的路由 URL,能够同时访问,互不影响。
项目结构:
user.py
from flask import Blueprint,render_template
users = Blueprint('user',__name__)
@users.route('/user')
def user():
return render_template('user.html')
login.py
from flask import Blueprint,render_template
#创建蓝图
logins = Blueprint('login',__name__)
@logins.route('/login')
def login():
return render_template('login.html')
manage.py
from flask import Flask
#导入蓝图对象
from login import logins
from user import users
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hello World!'
@app.route('/login')
def sss():
return 'login'
#注册蓝图,第一个参数logins是蓝图对象,url_prefix参数默认值是根路由,如果指定,会在蓝图注册的路由url中添加前缀。
app.register_blueprint(logins, url_prefix='/ll')
app.register_blueprint(users, url_prefix='')
if __name__ == '__main__':
app.run(debug=True)
在蓝图 login.py
中 login()
函数的路由我们使用了:/login
,主程序 manage.py
中 sss()
函数也使用了 /login
如果蓝图 logins
没有指定 url_prefix=''
路由前缀,那么访问:http://127.0.0.1:5000/login
得到的会是函数 sss()
所渲染的值。
那么访问蓝图 logins
,应该使用:http://127.0.0.1:5000/ll/login
动态路由
login.py
@logins.route('/login/<id>')
def login(id):
print(id)
return render_template('login.html')
访问:http://127.0.0.1:5000/ll/login/2
4 单元测试
4.1 什么是单元测试
Web程序开发过程一般包括以下几个阶段:需求分析,设计阶段,实现阶段,测试阶段。其中测试阶段通过人工或自动来运行测试某个系统的功能。目的是检验其是否满足需求,并得出特定的结果,以达到弄清楚预期结果和实际结果之间的差别的最终目的。
测试的分类:
测试从软件开发过程可以分为:单元测试、集成测试、系统测试等。在众多的测试中,与程序开发人员最密切的就是单元测试,因为单元测试是由开发人员进行的,而其他测试都由专业的测试人员来完成。所以我们主要学习单元测试。
什么是单元测试?
程序开发过程中,写代码是为了实现需求。当我们的代码通过了编译,只是说明它的语法正确,功能能否实现则不能保证。 因此,当我们的某些功能代码完成后,为了检验其是否满足程序的需求。可以通过编写测试代码,模拟程序运行的过程,检验功能代码是否符合预期。
单元测试就是开发者编写一小段代码,检验目标代码的功能是否符合预期。通常情况下,单元测试主要面向一些功能单一的模块进行。在Web开发过程中,单元测试实际上就是一些“断言”(assert)代码。
断言就是判断一个函数或对象的一个方法所产生的结果是否符合你期望的那个结果。 python中assert断言是声明布尔值为真的判定,如果表达式为假会发生异常。单元测试中,一般使用assert来断言结果。
断言使用:
>>> a = [1, 2, 3, 4]
>>> b = 2
>>> assert b in a # 为真则通过
>>> assert b not in a # 为假则报错
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AssertionError
# 自定义异常
>>> assert b not in a, 'False'
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AssertionError: False
常用的断言方法:
assertEqual # 如果两个值相等,则pass
assertNotEqual # 如果两个值不相等,则pass
assertTrue # 判断bool值为True,则pass
assertFalse # 判断bool值为False,则pass
assertIsNone # 不存在,则pass
assertIsNotNone # 存在,则pass
简单测试用例:
>>> def add(x, y):
... return x + y
...
>>> assert add(2, 3) == 5
>>> assert add(2, 3) == 6, 'False'
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AssertionError: False
4.2 单元测试用例
单元测试的基本写法:
- 定义一个类,继承自unittest.TestCase
- 在测试类中,定义两个测试方法
- 在测试类中,编写测试代码
测试基本框架:
import unittest
class TestClass(unittest.TestCase):
def setUp(self):
"""该方法会首先执行,方法名为固定写法"""
pass
def tearDown(self):
"""该方法会在测试代码执行完后执行,方法名为固定写法"""
pass
def test_app_exists(self):
"""测试代码"""
pass
在 setUp()
方法的代码创建了一个新的测试客户端并且初始化了一个新的数据库。这个函数将会在每次独立的测试函数运行之前运行。要在测试之后删除这个数据库,我们在 tearDown()
函数当中关闭这个文件,并将它从文件系统中删除。同时,在初始化的时候 TESTING
配置标志被激活,这将会使得处理请求时的错误捕捉失效,以便于您在进行对应用发出请求的测试时获得更好的错误反馈。
1、测试响应内容是否与预期一致
def test_index(self):
resp = self.client.get('/')
print(resp.data)
self.assertEqual(resp.data, b'Hello World!')
2、测试登陆和登出
我们应用的大部分功能只允许具有管理员资格的用户访问。所以我们需要一种方法来帮助我们的测试客户端登陆和登出。为此,我们向登陆和登出页面发送一些请求,这些请求都携带了表单数据(用户名和密码),因为登陆和登出页面都会重定向,我们将客户端设置为 follow_redirects
。
def login(self, username, password):
return self.app.post('/login', data=dict(
username=username,
password=password
), follow_redirects=True)
def logout(self):
return self.app.get('/logout', follow_redirects=True)
def test_login_logout(self):
rv = self.login('admin', 'default')
assert 'You were logged in' in rv.data
rv = self.logout()
assert 'You were logged out' in rv.data
rv = self.login('adminx', 'default')
assert 'Invalid username' in rv.data
rv = self.login('admin', 'defaultx')
assert 'Invalid password' in rv.data
3、数据库测试
import unittest
from author_book import *
#自定义测试类,setUp方法和tearDown方法会分别在测试前后执行。以test_开头的函数就是具体的测试代码。
class DatabaseTest(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:mysql@localhost/test0'
self.app = app
db.create_all()
def tearDown(self):
db.session.remove()
db.drop_all()
#测试代码
def test_append_data(self):
au = Author(name='itcast')
bk = Book(info='python')
db.session.add_all([au,bk])
db.session.commit()
author = Author.query.filter_by(name='itcast').first()
book = Book.query.filter_by(info='python').first()
#断言数据存在
self.assertIsNotNone(author)
self.assertIsNotNone(book)
5. 部署
一直以来我们都是使用的 flask
自带的服务器,完成了web服务的启动。在生产环境中,flask 自带的服务器,无法满足性能要求。
这里采用 Gunicorn
做 wsgi
容器,来部署 flask
程序。Gunicorn
(绿色独角兽)是一个Python WSGI
的HTTP
服务器。从Ruby
的独角兽(Unicorn
)项目移植。该 Gunicorn
服务器与各种Web框架兼容,实现非常简单,轻量级的资源消耗。Gunicorn
直接用命令启动,不需要编写配置文件,相对 uWSGI
要容易很多。
几个概念:
- WSGI:全称是
Web Server Gateway Interface
(web服务器网关接口),它是一种规范,它是web
服务器和web
应用程序之间的接口。它的作用就像是桥梁,连接在 服务器和web
应用框架之间。 - uwsgi:是一种传输协议,用于定义传输信息的类型。
- uWSGI:是实现了
uwsgi
协议WSGI
的web
服务器。
在这里我们采用 nginx + gunicorn + flask
方式部署 flask,你也可以采用别的方式,比如:Django(uWSGI+Nginx)等等,只不过 gunicorn
相对来说更简单操作一些。
正向代理及反向代理
代理种类 | 说明 | 特点 | |
正向代理 | 请求经过代理服务器从局域网发出,然后到达互联网上的服务器 | 服务端并不知道真正的客户端是谁 | 客户端 |
反向代理 | 请求从互联网发出,先进入代理服务器,再转发给局域网内的服务器 | 客户端并不知道真正的服务端是谁 | 服务端 |
使用 Nginx 的优势:
- 实现分流、转发、负载均衡
- 分担服务器的压力
- 部署简单,内存消耗少,成本低
- 既可以做正向代理,也可以做反向代理
5.1 gunicorn
常用命令
# 安装
$ pip3 install gunicorn
# 查看命令行选项:
$ gunicorn -h
# 直接运行,默认启动的 127.0.0.1::8000
gunicorn 运行文件名称:Flask程序实例名
# 示例
gunicorn s2:app
# 指定进程和端口号: -w: 表示进程(worker)。 -b:表示绑定ip地址和端口号(bind)。
$ gunicorn -w 4 -b 127.0.0.1:5001 运行文件名称:Flask程序实例名
踩坑
运行 gunicorn
时发生如下错误:
# 错误
gunicorn.errors.HaltSever:<HaltServer 'Worker faild to boot.' 3
使用 debug
模式查看更多信息--log-level=debug
,也可以在后边加上参数 –preload
,便可看到详细的报错信息:
gunicorn -w 4 -b 192.168.21.128:5001 s2:app --log-level=debug
发现是 import error
,错误提醒说 s2.py
不是个包文件,这是 Python import 的问题。
解决方法
1、给这个执行文件添加环境变量
# 添加环境变量
vim /etc/profile
export PATH="$PATH:/home/hj/桌面/fk" # 将路径设置你的执行文件路径(在 profile 文件最后添加)
# 关闭保存 profile 文件
source /etc/profile
# 测试下有没有添加成功
echo $PATH
2、将这个执行文件变为包文件
# 在 fk 文件夹中新建一个 __init__.py 文件即可
vim __init__.py
hj@hj:~/桌面/fk$ ls
error.log __init__.py __pycache__ s s2.py
3、再运行:gunicorn -w 4 -b 192.168.21.128:5001 s2:app
,发现成功了:
参考文章:gunicorn启动报错gunicorn.errors.HaltServer
5.2 Nginx
安装 Nginx
$ sudo apt-get install nginx
Nginx 常用命令
Ubuntu 中你可以像任何其他 systemd 单位一样管理 Nginx 服务:
# 停止Nginx服务
sudo systemctl stop nginx
# 再次启动
sudo systemctl start nginx
# 重新启动Nginx服务:
sudo systemctl restart nginx
# 在进行一些配置更改后重新加载 Nginx 服务:
$sudo systemctl reload nginx
# 如果你想禁用Nginx服务在启动时启动:
$sudo systemctl disable nginx
# 并重新启用它:
$sudo systemctl enable nginx
# 查看
ps -ef | grep nginx
Nginx 配置
1、为 Nginx 添加配置文件,Ngnix 默认配置文件加载是在 /etc/nginx/conf.d/
目录下,新建一个配置文件(名字随意),编辑如下:
server {
# 监听80端口
listen 80;
# 本机
server_name 192.168.21.128;
# 默认请求的url
location / {
# 请求转发到gunicorn服务器
proxy_pass http://127.0.0.1:5001;
# 设置请求头,并将头信息传递给服务器端
proxy_set_header Host $host;
}
}
2、启动 Nginx 服务 /etc/init.d/nginx start
,访问:http://192.168.21.128:5001/
,效果如下图:
总结
- 客户端向 Nginx 发起请求,Nginx 将请求转发给
gunicorn
服务器 - Nginx 的
server_name
要和gunicorn
启动的-b
参数地址一致