在上一篇文章中对QQ音乐爬虫的逻辑进行分析,是用单线程单进程写的,这里对此进行改进,因为要对全网的歌曲进行爬取,所以为提高效率,设计成分布式爬虫。
Pathon标准库为我们提供了threading和multiprossing来实现多线程,自从Python3.2之后,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPollExecutor两个类,实现了对threading和multiprocessing更高级的抽象,对编写线程、进程提供了直接的支持。
下面简单讲述一下concurrent.futures的属性和方法:
Executor:Executor是一个抽象类,不能被直接使用。为具体的异步执行定义了基本方法:ThreadingPoolExecutor和ProcessingPoolExecutor继承了Executor,分别被用来创建线程和进程。
创建线程和进程后,Executor提供了submit()和map()方法对其操作。submit()和map()方法最大的区别是参数类型,map()的参数必须是列表、元组和迭代器的数据类型。
Future:可以理解为在未来完成的操作,这是异步编程的基础。通常情况下,我们执行IO操作和访问URL时,在等待结果返回之前会产生阻塞,CPU不能做其他事情,而Future的引入帮助我们在等待的这段时间完成其他操作。
现在我们知道,爬取全站歌曲信息是按照字母A-Z的顺序循环爬取的,这是在单进程单线程的情况下运行的。如果将这26次循环分为26个进程同时执行,每个进程只需执行对应的字母分类,假设执行一个分类的时间相同,那么多进程并发的效率是单进程的26倍。
除了运行多进程外,项目代码大部分是IO密集型的,那么在每个进程下使用多线程可以提高每个进程的运行效率。我们知道歌手列表页是通过两层循环实现的,第一层是循环每个分类字母,现将每个分类字母当做一个单独的进程处理。第二层是循环每个分类的歌手总页数,可将这个循环使用多线程处理。假设每个进程使用10条线程(线程数可自行设定),那么每个进程的效率也相对提高10倍。
分布式策略考虑的因素有网站服务器负载量、网速快慢、硬件配置和数据库最大连接量。举个例子,爬取某个网站1000万数据,从数据量分析,当然进程数和线程越多,爬取的速度越快。但往往忽略了网站服务器的的并发量。假设设定10个进程,每个进程200条线程,每秒并发量为200*10=2000,若网站服务器并发量远远低于该并发量,在请求网站的时候,就会出现卡死的情况,导致请求超时,无形之中增加等待时间。除此之外,进程和线程越多,对运行程序的系统压力越大,若涉及数据入库,还要考虑并发数是否超出数据库连接数。
根据上述分布式策略,在music.py中分别添加函数myThread和myProcess,分别代表多线程和多进程:
# 多线程
# 修改了请求地址URL以及数据获取
def myThread(index, cookie_dict):
cookie_dict = getCookies()
# 每个字母分类的歌手列表页数
url = 'https://u.y.qq.com/cgi-bin/musicu.fcg?-=getUCGI771604139451213&g_tk=5381&loginUin=0&hostUin=0&format=json&inCharset=utf8&outCharset=utf-8¬ice=0&platform=yqq.json&needNewCode=0&data={"comm":{"ct":24,"cv":0},"singerList":{"module":"Music.SingerListServer","method":"get_singer_list","param":{"area":-100,"sex":-100,"genre":-100,"index":'+str(index)+',"sin":0,"cur_page":1}}}'
r = session.get(url, headers=headers)
total = r.json()['singerList']['data']['total']
pagecount = math.ceil(int(total) / 80)
page_list = [x for x in range(1, pagecount+1)]
thread_number = 10
# 将每个分类总页数平均分给线程数
list_interval = math.ceil(len(page_list) / thread_number)
# 设置线程对象
Thread = ThreadPoolExecutor(max_workers=thread_number)
for i in range(thread_number):
# 计算每条线程应执行的页数
start_num = list_interval * i
if list_interval * (i + 1) <= len(page_list):
end_num = list_interval * (i + 1)
else:
end_num = len(page_list)
# 每个线程各自执行不同的歌手列表页数
Thread.submit(get_genre_singer, index, page_list[start_num: end_num],cookie_dict)
# 多进程
# 添加Cookies获取和循环方式
def myProcess():
with ProcessPoolExecutor(max_workers=27) as executor:
cookie_dict = getCookies()
for index in range(1, 28):
# 创建27个进程,分别执行A-Z分类
executor.submit(myThread, index, cookie_dict)
- 多进程myProcess()函数:主要循环字母A-Z和#,将每个字母独立创建一个进程,每个进程执行的方法函数是myThread()。
- 多线程myThread()函数:首先根据传入函数参数获取当前分类的歌手总页数,然后根据得到的总页数和设定的线程数计算每条线程应执行的页数,最后遍历设定线程数,让每条线程执行相应页数。例如总页数100页、10条线程,每条线程应执行10页,第一条线程执行0-10页,第二条线程执行10-20页,以此类推。线程调用的方法函数是get_genre_singer()。