Pitfalls of Python Multithreading and Queue-Based File Writing

南乔 · 2024-04-23

1. Preface

A while ago I was tinkering with a web crawler, and crawling was painfully slow, so I dug into Python's multithreading and queues and stepped on quite a few pitfalls along the way. This post records the ones I have already solved.

2. Lost Files When Writing from Multiple Threads

Some downloaded files go missing because the writing step itself runs on multiple threads: each writer thread keeps its own filename counter starting at 1, so several threads end up writing the same file at the same time and overwrite one another's output. The fix is to keep multithreading plus a queue for downloading the file streams, but hand all writing to a single dedicated writer thread.
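
Before the full listings, here is a minimal, self-contained sketch of that single-writer pattern (the fake byte payloads and worker counts are illustrative stand-ins, not the original crawler's code): several producer threads put downloaded bytes on a Queue, and exactly one consumer thread takes them off and writes files, so the filename counter cannot race.

#!/usr/bin/python3
# Minimal sketch: several downloader threads feed one queue,
# and exactly ONE thread writes files. The byte payloads are
# fake stand-ins for requests.get(...).content.
from os import makedirs
from queue import Queue
from threading import Thread

q_write = Queue()


def fetch_worker():
    # Producer: pretend to download three images and hand the bytes over.
    for _ in range(3):
        q_write.put(b"fake-jpg-bytes")


def write_worker():
    # Single consumer: the only thread that touches the filesystem.
    w_count = 1
    while True:
        data = q_write.get()
        with open("./images/" + str(w_count) + ".jpg", "wb") as f:
            f.write(data)
        w_count += 1
        q_write.task_done()


if __name__ == "__main__":
    makedirs("./images", exist_ok=True)
    Thread(target=write_worker, daemon=True).start()  # daemon: exits with main
    producers = [Thread(target=fetch_worker) for _ in range(4)]
    for p in producers:
        p.start()
    for p in producers:
        p.join()
    q_write.join()  # block until every queued payload has been written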

Incorrect example:

#!/usr/bin/python3
from requests import get
from lxml import etree
from queue import Queue
from threading import Thread
from time import sleep
from fake_useragent import UserAgent
import threading
from os import makedirs

count = 1
q_image = Queue()
q_url = Queue()
q_write = Queue()
t_list = []
t_count = 8
w_list = []
ua = UserAgent()


def write_image():
    num = 0
    w_count = 1
    while True:
        if num >= 15:
            break
        if q_write.empty():
            num += 1
            sleep(1)
            print(threading.current_thread().name + " sleep " + str(num) + "s")
            continue
        num = 0
        res = q_write.get()
        print("Current writing[" + str(w_count) + "]: " + str(w_count) + ".jpg")
        with open("./images/" + str(w_count) + ".jpg", "wb") as f:
            f.write(res)
        w_count += 1


def download_image():
    global count
    num = 0
    # print(threading.current_thread().name + ": " + str(threading.current_thread().is_alive()))
    while True:
        if num >= 15:
            break
        if q_image.empty():
            num += 1
            sleep(1)
            print(threading.current_thread().name + " sleep " + str(num) + "s")
            continue
        num = 0
        img = q_image.get()
        if "http" in img:
            response = get(headers={"User-Agent": ua.random}, url=img, stream=True)
            if response.status_code == 200:
                q_write.put(response.content)
                print("Current download[" + str(count) + "]: " + img + "...")
            else:
                continue
        else:
            response = get(headers={"User-Agent": ua.random}, url="https://pic.netbian.com" + img, stream=True)
            if response.status_code == 200:
                q_write.put(response.content)
                print("Current download[" + str(count) + "]: " + "https://pic.netbian.com" + img + "...")
            else:
                continue
        count += 1
    print(threading.current_thread().name + " Exiting...")


def image_parse(url):
    if "http" in url:
        target = get(headers={"User-Agent": ua.random}, url=url)
        # print(etree.HTML(target.text).xpath("//*[@id=\"img\"]/img/@src"))
        image = etree.HTML(target.text).xpath("//*[@id=\"img\"]/img/@src")[0]
        q_image.put(image)
    else:
        target = get(headers={"User-Agent": ua.random}, url="https://pic.netbian.com" + url)
        # print(etree.HTML(target.text).xpath("//*[@id=\"img\"]/img/@src"))
        image = etree.HTML(target.text).xpath("//*[@id=\"img\"]/img/@src")[0]
        q_image.put(image)


def url_parse():
    while not q_url.empty():
        u_list = q_url.get()
        for l in u_list:
            if "index" not in l:
                image_parse(l)


if __name__ == "__main__":
    makedirs("./images", exist_ok=True)  # ensure the output directory exists
    for i in range(1, 115):

        if i == 1:
            index = get(headers={"User-Agent": ua.random}, url="https://pic.netbian.com/4kdongman/index.html").text
        else:
            index = get(headers={"User-Agent": ua.random},
                        url="https://pic.netbian.com/4kdongman/index_" + str(i) + ".html").text
        root = etree.HTML(index)
        url_list = root.xpath("//*[@id=\"main\"]/div[3]/ul/li/a//@href")
        print("put url list: " + str(url_list))
        q_url.put(url_list)

    for j in range(0, t_count):
        s = Thread(target=url_parse, name="url_parse_thread_" + str(j))
        t = Thread(target=download_image, name="download_image_thread_" + str(j))
        t_list.append(s)
        t_list.append(t)
        s.start()
        t.start()
        print(t.name + " starting...")
        print(s.name + " starting...")
    # Pitfall: multiple writer threads are started here; each keeps its own
    # w_count starting at 1, so they open the same filenames and overwrite
    # each other's images.
    for j in range(0, t_count):
        w = Thread(target=write_image, name="write_image_thread_" + str(j))
        w_list.append(w)
        w.start()
        print(w.name + " starting...")

Correct example:

#!/usr/bin/python3
from requests import get
from lxml import etree
from queue import Queue
from threading import Thread
from time import sleep
from fake_useragent import UserAgent
import threading
from os import makedirs

count = 1
q_image = Queue()
q_url = Queue()
q_write = Queue()
t_list = []
t_count = 8
ua = UserAgent()


def write_image():
    num = 0
    w_count = 1
    while True:
        if num >= 15:
            break
        if q_write.empty():
            num += 1
            sleep(1)
            print(threading.current_thread().name + " sleep " + str(num) + "s")
            continue
        num = 0
        res = q_write.get()
        print("Current writing[" + str(w_count) + "]: " + str(w_count) + ".jpg")
        with open("./images/" + str(w_count) + ".jpg", "wb") as f:
            f.write(res)
        w_count += 1


def download_image():
    global count
    num = 0
    # print(threading.current_thread().name + ": " + str(threading.current_thread().is_alive()))
    while True:
        if num >= 15:
            break
        if q_image.empty():
            num += 1
            sleep(1)
            print(threading.current_thread().name + " sleep " + str(num) + "s")
            continue
        num = 0
        img = q_image.get()
        if "http" in img:
            response = get(headers={"User-Agent": ua.random}, url=img, stream=True)
            if response.status_code == 200:
                q_write.put(response.content)
                print("Current download[" + str(count) + "]: " + img + "...")
            else:
                continue
        else:
            response = get(headers={"User-Agent": ua.random}, url="https://pic.netbian.com" + img, stream=True)
            if response.status_code == 200:
                q_write.put(response.content)
                print("Current download[" + str(count) + "]: " + "https://pic.netbian.com" + img + "...")
            else:
                continue
        count += 1
    print(threading.current_thread().name + " Exiting...")


def image_parse(url):
    if "http" in url:
        target = get(headers={"User-Agent": ua.random}, url=url)
        # print(etree.HTML(target.text).xpath("//*[@id=\"img\"]/img/@src"))
        image = etree.HTML(target.text).xpath("//*[@id=\"img\"]/img/@src")[0]
        q_image.put(image)
    else:
        target = get(headers={"User-Agent": ua.random}, url="https://pic.netbian.com" + url)
        # print(etree.HTML(target.text).xpath("//*[@id=\"img\"]/img/@src"))
        image = etree.HTML(target.text).xpath("//*[@id=\"img\"]/img/@src")[0]
        q_image.put(image)


def url_parse():
    while not q_url.empty():
        u_list = q_url.get()
        for l in u_list:
            if "index" not in l:
                image_parse(l)


if __name__ == "__main__":
    makedirs("./images", exist_ok=True)  # ensure the output directory exists
    for i in range(1, 115):

        if i == 1:
            index = get(headers={"User-Agent": ua.random}, url="https://pic.netbian.com/4kdongman/index.html").text
        else:
            index = get(headers={"User-Agent": ua.random},
                        url="https://pic.netbian.com/4kdongman/index_" + str(i) + ".html").text
        root = etree.HTML(index)
        url_list = root.xpath("//*[@id=\"main\"]/div[3]/ul/li/a//@href")
        print("put url list: " + str(url_list))
        q_url.put(url_list)

    for j in range(0, t_count):
        s = Thread(target=url_parse, name="url_parse_thread_" + str(j))
        t = Thread(target=download_image, name="download_image_thread_" + str(j))
        t_list.append(s)
        t_list.append(t)
        s.start()
        t.start()
        print(t.name + " starting...")
        print(s.name + " starting...")
    w = Thread(target=write_image, name="write_image")
    w.start()
    print(w.name + " starting...")
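
One more note on the shutdown logic: both listings let a thread exit after it has polled an empty queue for 15 seconds, which is simple but fragile, since a single download that stalls longer than that can kill the writer before the work is done. A common alternative, sketched below as an illustration rather than the post's actual code, is to push one sentinel per consumer once all producers have been joined:

# Sketch: sentinel-based shutdown instead of sleep-polling timeouts.
from queue import Queue
from threading import Thread

q = Queue()
SENTINEL = None  # unique marker meaning "no more work"


def consumer():
    while True:
        item = q.get()
        if item is SENTINEL:
            break  # producers have finished; exit deterministically
        print("processing", item)


if __name__ == "__main__":
    workers = [Thread(target=consumer) for _ in range(4)]
    for w in workers:
        w.start()
    for i in range(20):
        q.put(i)  # enqueue the real work
    for _ in workers:
        q.put(SENTINEL)  # one sentinel per consumer thread
    for w in workers:
        w.join()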

3. Multithreaded Crawling Reports "Max retries exceeded" from the Connection Pool

Setting the header Connection: close to disable HTTP keep-alive resolves this: each request then releases its connection instead of exhausting the pool. Using a random User-Agent for each request can also work around the problem.
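
For reference, a minimal sketch of both workarounds on top of requests and fake_useragent (the URL is a placeholder, not from the original post):

#!/usr/bin/python3
from requests import get
from fake_useragent import UserAgent

ua = UserAgent()


def fetch(url):
    # "Connection: close" disables HTTP keep-alive, so each request
    # releases its socket instead of occupying a pooled connection;
    # a random User-Agent per request is the second workaround.
    headers = {
        "User-Agent": ua.random,
        "Connection": "close",
    }
    return get(url=url, headers=headers, timeout=10)


if __name__ == "__main__":
    print(fetch("https://example.com").status_code)  # placeholder URL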
