当前位置: 首页>后端>正文

大文件上传分治上传前端分片

首先,安装Vue3.0以上版本:

npm install -g @vue/cli

安装异步请求库axios:

npm install axios --save

随后,安装Ant-desgin:

npm i --save ant-design-vue@next -S

Ant-desgin虽然因为曾经的圣诞节“彩蛋门”事件而声名狼藉,但客观地说,它依然是业界不可多得的优秀UI框架之一。

接着在项目程序入口文件引入使用:

import{createApp}from'vue'importAppfrom'./App.vue'import{router}from'./router/index'importaxiosfrom'axios'importqsfrom'qs'importAntdfrom'ant-design-vue';import'ant-design-vue/dist/antd.css';constapp=createApp(App)app.config.globalProperties.axios=axios;app.config.globalProperties.upload_dir="https://localhost/static/";app.config.globalProperties.weburl="http://localhost:8000";app.use(router);app.use(Antd);app.mount('#app')

随后,参照Ant-desgin官方文档:https://antdv.com/components/overview-cn构建上传控件:

<a-upload@change="fileupload":before-upload="beforeUpload"><a-button><upload-outlined></upload-outlined>上传文件</a-button></a-upload>

注意这里需要将绑定的before-upload强制返回false,设置为手动上传:

beforeUpload:function(file){returnfalse;}

接着声明分片方法:

fileupload:function(file){varsize=file.file.size;//总大小? varshardSize=200*1024;//分片大小? this.shardCount=Math.ceil(size/shardSize);//总片数? console.log(this.shardCount);for(vari=0;i<this.shardCount;++i){//计算每一片的起始与结束位置? varstart=i*shardSize;varend=Math.min(size,start+shardSize);vartinyfile=file.file.slice(start,end);letdata=newFormData();data.append('file',tinyfile);data.append('count',i);data.append('filename',file.file.name);constaxiosInstance=this.axios.create({withCredentials:false});axiosInstance({method:'POST',url:'http://localhost:8000/upload/',//上传地址? data:data}).then(data=>{this.finished+=1;console.log(this.finished);if(this.finished==this.shardCount){this.mergeupload(file.file.name);}}).catch(function(err){//上传失败? });}}

具体分片逻辑是,大文件总体积按照单片体积的大小做除法并向上取整,获取到文件的分片个数,这里为了测试方便,将单片体积设置为200kb,可以随时做修改。

随后,分片过程中使用Math.min方法计算每一片的起始和结束位置,再通过slice方法进行切片操作,最后将分片的下标、文件名、以及分片本体异步发送到后台。

当所有的分片请求都发送完毕后,封装分片合并方法,请求后端发起合并分片操作:

mergeupload:function(filename){this.myaxios(this.weburl+"/upload/","put",{"filename":filename}).then(data=>{console.log(data);});}

至此,前端分片逻辑就完成了。

后端异步IO写入

为了避免同步写入引起的阻塞,安装aiofiles库:

pip3 install aiofiles

aiofiles用于处理asyncio应用程序中的本地磁盘文件,配合Tornado的异步非阻塞机制,可以有效的提升文件写入效率:

importaiofiles# 分片上传? classSliceUploadHandler(BaseHandler):asyncdefpost(self):file=self.request.files["file"][0]filename=self.get_argument("filename")count=self.get_argument("count")filename='%s_%s'%(filename,count)# 构成该分片唯一标识符? contents=file['body']#异步读取文件? asyncwithaiofiles.open('./static/uploads/%s'%filename,"wb")asf:awaitf.write(contents)return{"filename":file.filename,"errcode":0}

这里后端获取到分片实体、文件名、以及分片标识后,将分片文件以文件名_分片标识的格式异步写入到系统目录中,以一张378kb大小的png图片为例,分片文件应该顺序为200kb和178kb,如图所示:

[图片上传失败...(image-1affef-1658759510726)]

当分片文件都写入成功后,触发分片合并接口:

importaiofiles# 分片上传? classSliceUploadHandler(BaseHandler):asyncdefpost(self):file=self.request.files["file"][0]filename=self.get_argument("filename")count=self.get_argument("count")filename='%s_%s'%(filename,count)# 构成该分片唯一标识符? contents=file['body']#异步读取文件? asyncwithaiofiles.open('./static/uploads/%s'%filename,"wb")asf:awaitf.write(contents)return{"filename":file.filename,"errcode":0}asyncdefput(self):filename=self.get_argument("filename")chunk=0asyncwithaiofiles.open('./static/uploads/%s'%filename,'ab')astarget_file:whileTrue:try:source_file=open('./static/uploads/%s_%s'%(filename,chunk),'rb')awaittarget_file.write(source_file.read())source_file.close()exceptExceptionase:print(str(e))breakchunk=chunk+1self.finish({"msg":"ok","errcode":0})

这里通过文件名进行寻址,随后遍历合并,注意句柄写入模式为增量字节码写入,否则会逐层将分片文件覆盖,同时也兼具了断点续写的功能。有些逻辑会将分片个数传入后端,让后端判断分片合并个数,其实并不需要,因为如果寻址失败,会自动抛出异常并且跳出循环,从而节约了一个参数的带宽占用。

轮询服务

在真实的超大文件传输场景中,由于网络或者其他因素,很可能导致分片任务中断,此时就需要通过降级快速响应,返回托底数据,避免用户的长时间等待,这里我们使用基于Tornado的Apscheduler库来调度分片任务:

pip install apscheduler

随后编写job.py轮询服务文件:

fromdatetimeimportdatetimefromtornado.ioloopimportIOLoop,PeriodicCallbackfromtornado.webimportRequestHandler,Applicationfromapscheduler.schedulers.tornadoimportTornadoScheduler? ? ? scheduler=Nonejob_ids=[]# 初始化? definit_scheduler():globalscheduler? ? ? scheduler=TornadoScheduler()scheduler.start()print('[Scheduler Init]APScheduler has been started')# 要执行的定时任务在这里? deftask1(options):print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),options))classMainHandler(RequestHandler):defget(self):self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')classSchedulerHandler(RequestHandler):defget(self):globaljob_ids? ? ? ? ? job_id=self.get_query_argument('job_id',None)action=self.get_query_argument('action',None)ifjob_id:# add? if'add'==action:ifjob_idnotinjob_ids:job_ids.append(job_id)scheduler.add_job(task1,'interval',seconds=3,id=job_id,args=(job_id,))self.write('[TASK ADDED] - {}'.format(job_id))else:self.write('[TASK EXISTS] - {}'.format(job_id))# remove? elif'remove'==action:ifjob_idinjob_ids:scheduler.remove_job(job_id)job_ids.remove(job_id)self.write('[TASK REMOVED] - {}'.format(job_id))else:self.write('[TASK NOT FOUND] - {}'.format(job_id))else:self.write('[INVALID PARAMS] INVALID job_id or action')if__name__=="__main__":routes=[(r"/",MainHandler),(r"/scheduler/?",SchedulerHandler),]init_scheduler()app=Application(routes,debug=True)app.listen(8888)IOLoop.current().start()

每一次分片接口被调用后,就建立定时任务对分片文件进行监测,如果分片成功就删除分片文件,同时删除任务,否则就启用降级预案。

作者:刘悦的技术博客

链接:https://www.jianshu.com/p/acf3e12c99d2

来源:简书

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


https://www.xamrdz.com/backend/3vj1944770.html

相关文章: