当前位置: 首页>大数据>正文

爬虫16:多线程threading&queue

爬虫:IO密集型程序<=涉及很多网络IO以及本地磁盘IO操作=>消耗时间,降低效率
多线程,一定程度上提升执行效率(IO密集型)

  • scrapy-redis
    Scrapy-redis是为了更方便地实现Scrapy分布式爬取,而提供了一些以redis为基础的组件(pip install scrapy-redis
  • 分布式策略
    Master端(核心服务器) :搭建一个Redis数据库,不负责爬取,只负责url指纹判重、Request的分配,以及数据的存储。
  1. _threading:偏底层,较threading功能有限
  2. threading=>推荐*;=_thread+其他方法

1、多线程使用流程

  • threading.currentThread() # 返回当前的线程变量
  • threading.enumerate() # 返回一个所有正在运行的线程的列表
  • threading.activeCount() # 返回正在运行的线程数量
    具体方法:
from threading import Thread
# 线程创建、启动、回收
t = Thread(target="函数名")  # 创建线程对象
t.start() # 创建并启动线程
t.join() # 阻塞等待回收线程

创建多线程:

def function_name():
    # print(time.strptime()time.time())
    time.sleep(1)
    print(time.strftime("%Y/%m/%d %H:%M:%S"))

from threading import Thread
# 线程创建、启动、回收
t_list = []
for _ in range(5):
    t = Thread(target=function_name)  # 创建线程对象
    t_list.append(t)
    t.start() # 创建并启动线程
for t in t_list:
    t.join() # 阻塞等待回收线程

输出结果:


爬虫16:多线程threading&amp;queue,第1张
多线程实例输出结果

注意:
多线程不能用于操作同一数据=>数据不确定=>通过threading模块的Lock对象保证数据正确性
即执行完操作时主动释放,继续让其他线程获取。

没有搞懂
# 上锁操作
from threading import Lock
lock = Lock()
lock.acquire()  # 获取锁
---
lock.release()

2、Queue队列模型

GIL全局解释器=>同一时刻仅一个线程占据解释器
线程遇到 IO 操作时就会主动让出解释器,让其他处于等待状态的线程去获取解释器来执行程序,而该线程则回到等待状态,这主要是通过线程的调度机制实现的。
queue:队列先进先出,提供创建共享数据的队列模型

from queue import Queue  # 导入模块

q = Queue() # 创建队列对象
q.put(url) # 向队列中添加爬取的url
q.get() # 获取一个url;当队列为空时,阻塞
q.empty()  # 判断是否为空,True/false
# 其他
from queue import Queue,LifoQueue,PriorityQueue
# 队列,用于多线程间的数据交换
q = Queue(maxsize=10)  # FIFO 队列,先进先出;maxsize默认是0,即不指定队列的大小
LifoQueue(10) # LIFO类似堆,即先进后出
PriorityQueue(10) # 优先级队列,优先级别越低越先出

q.qsize() # 返回队列的大小
q.empty() # 队列空,返回True;不空,返回False
q.full() # 队列满了,返回True;不满,返回False # 与maxsize对应

q.get() # 获取队列,立即取出一个元素,并返回该元素,timeout是超时时间
# get(self, block=True, timeout=None)
# Remove and return an item from the queue.
#  默认在必要时,阻塞至项目可取到
#         If optional args 'block' is true and 'timeout' is None (the default),
#         block if necessary until an item is available. If 'timeout' is
#         a non-negative number, it blocks at most 'timeout' seconds and raises
#         the Empty exception if no item was available within that time.
#         Otherwise ('block' is false), return an item if one is immediately
#         available, else raise the Empty exception ('timeout' is ignored
#         in that case).即block=false,则item立即取出,若无法取出,则报错
q.put("item") # block默认True,timeout默认None

q.get_nowait()  # = q.get(False) 即立即取出
q.put_nowait("item") # =q.put(item, False) 即立即入队

q.join()
# def join(self):
#     '''Blocks until all items in the Queue have been gotten and processed.
#    阻塞调用线程,直到队列中所有任务被处理掉。即等队列为空,再执行其他操作
#     The count of unfinished tasks goes up whenever an item is added to the
#     queue. The count goes down whenever a consumer thread calls task_done()
#     to indicate the item was retrieved and all work on it is complete.
#
#     When the count of unfinished tasks drops to zero, join() unblocks.'''

q.task_done()
#     def task_done(self):
#         '''Indicate that a formerly enqueued task is complete.
#   在完成一项任务之后,该函数向已经完成的队列发送一个信号
#         Used by Queue consumer threads.  For each get() used to fetch a task,
#         a subsequent call to task_done() tells the queue that the processing
#         on the task is complete.
#
#         If a join() is currently blocking, it will resume when all items
#         have been processed (meaning that a task_done() call was received
#         for every item that had been put() into the queue).
#
#         Raises a ValueError if called more times than there were items
#         placed in the queue.'''

实例:

# https://app.mi.com/

"""
标签 
<h5>
   <a href="/details?id=com.tencent.tmgp.jx3m">剑网3:指尖江湖</a>
</h5>
跳转链接 https://app.mi.com/details?id=com.tencent.tmgp.jx3m
xpath //ul[@class="applist"]//h5/a
- 实例:queue的使用
"""
from urllib.request import urlopen, Request
from faker import Faker
from pyquery import PyQuery
from queue import Queue


class XiaoMi:
   def __init__(self):
       self.url = "https://app.mi.com/"
       self.q = Queue()

   def get_html(self, url):
       # 获取相应内容
       req = Request(url, headers={"User-Agent": Faker(local="zh_CN").user_agent()})
       resp = urlopen(req)
       return resp.read()

   def para_html(self, html):
       # 解析html
       # tag = PyQuery(html)(".applist")
       tags = PyQuery((PyQuery(html)(".applist")))("h5")
       for tag in tags:
           url_tail = PyQuery(tag)("a").attr("href")
           self.q.put(self.url + url_tail)
           # print(url_tail)

   def run(self):
       """主函数"""
       html = self.get_html(self.url)  # 获取主页面
       self.para_html(html)
       # print(self.q)  # <queue.Queue object at 0x000001B60E2B83D0>
       while not (self.q.empty()):  # 非空时执行
           detail_url = self.q.get()  # 先出
           # detail_url = "https://app.mi.com/details?id=com.kimoo.shizi"
           detail_html = self.get_html(detail_url)
           res = PyQuery(detail_html)(".intro-titles")
           name = PyQuery(res)("h3").text()  # 获取h3标签对应的文本信息
           clas = PyQuery(res)(".special-font").text()
           print(name, clas, detail_url)  # name、clas存在获取失败情况


if __name__ == "__main__":
   a = XiaoMi()
   a.run()

  • Lock+queue+threading的使用
import time
from queue import Queue
import csv
from threading import Lock
from faker import Faker
from pyquery import PyQuery
from urllib.request import urlopen, Request
from threading import Thread


class Spider:

    def __init__(self):
        """
        爬虫:https://app.mi.com/catTopList/0?page=3
        """
        self.url = "https://app.mi.com"
        self.q = Queue()  # 存放所有URL的队列
        self.i = 0
        self.id_list = []  # 存放所有类型id的空列表
        self.f = open("Xiaomi.csv", "a", encoding="utf-8", newline="")  # 打开文件
        self.writer = csv.writer(self.f)
        self.lock = Lock()  # 创建锁
        self.fake = Faker(local="zh_CN")  # 获取ua

    # 获取包链接
    def get_cateid(self):
        """获取类名id;=>获取包名+app分类+下载详情页"""
        resp = Request(self.url, headers={"User-Agent": self.fake.user_agent()})
        html = urlopen(resp).read()
        res = PyQuery(html)(".applist")
        for i in res:
            tags = PyQuery(i)("h5")
            for tag in tags:
                url_tail = PyQuery(tag)("a").attr("href")
                self.id_list.append(self.url + url_tail)
        # print(self.id_list)


    # url加入队函数,拼接url并将url加入队列
    def url_in(self):
        self.get_cateid()
        for url in self.id_list:
            self.q.put(url)

    # 请求:多线程处理
    def get_req(self):
        while not self.q.empty():
            url = self.q.get()
            resp = Request(url, headers={"User-Agent": self.fake.user_agent()})
            html = urlopen(resp).read()
            self.parse_html(html)

    # 解析函数
    def parse_html(self, html):
        res = PyQuery(html)(".intro-titles")
        name = PyQuery(res)("h3").text()  # 获取h3标签对应的文本信息
        clas = PyQuery(res)(".special-font").text()
        # clas1, clas2 = clas.split("|")
        # print(name, clas)  # name、clas存在获取失败情况
        # 写入文件
        self.lock.acquire()
        self.writer.writerow([name, clas])
        self.lock.release()

    # 主函数
    def main(self):
        self.url_in()  # url加入队列
        t_list = []
        # 创建多线程
        for i in range(1):
            t = Thread(target=self.get_req)
            t_list.append(t)
            t.start()  # 启动线程

        # 回收线程
        for t in t_list:
            t.join()

        # 文件保存
        self.f.close()


if __name__ == "__main__":
    start = time.time()
    s = Spider()
    s.main()
    end = time.time()
    print("执行时间:", end - start)
    # 执行时间: 10.322103023529053
爬虫16:多线程threading&amp;queue,第2张

参考

  1. 总结的太到位:python 多线程系列详解
  2. Python多线程爬虫详解
    3.python 队列queue
    4.Python3 queue模块详解

https://www.xamrdz.com/bigdata/7fr1890014.html

相关文章: