沉醉未知,唤醒感官:一场“日日夜夜”的极致触感之旅

核心内容摘要

色伊人
老司机漫画不止于车,更在于“道”的幽默解读

镜子里的陌生人:当这种“消耗”透支了你的最后一格电

在网络爬虫开发中生产者 - 消费者模型是经典且高效的架构模式。

它将 “任务生产URL 采集” 和 “任务消费页面爬取” 解耦能有效控制并发、避免资源浪费。

而 Python 的asyncio异步 I/O结合asyncio.Queue可以打造出高性能的异步爬虫相比传统多线程 / 多进程爬虫异步模型在 IO 密集型的爬虫场景下效率提升显著。

核心原理

生产者 - 消费者模型生产者负责生成待爬取的 URL将其放入队列中是爬虫的 “任务源”消费者从队列中取出 URL执行页面请求、数据解析等操作是爬虫的 “执行端”队列asyncio.Queue作为生产者和消费者之间的缓冲解耦两者的执行节奏控制并发数量避免瞬间请求量过大导致的被封 IP 或服务器压力过高。

asyncio 的优势asyncio是 Python 内置的异步编程框架基于事件循环实现无需创建大量线程 / 进程仅通过单线程的协程切换就能处理大量 IO 操作如网络请求资源开销远低于多线程适合爬虫这类高 IO 场景。

完整实现代码以下是一个可直接运行的异步爬虫示例以爬取某测试站点的页面标题为例完整实现生产者 - 消费者模型python运行import asyncio import aiohttp from aiohttp import ClientTimeout from typing import List # 全局配置 MAX_CONCURRENT 5 # 最大并发消费者数量 QUEUE_MAXSIZE 10 # 队列最大容量控制缓冲大小 BASE_URL https://httpbin.org/get?page{} # 测试URL PAGE_RANGE range(1,

# 待爬取的页面范围

页 class AsyncCrawler: def __init__(self): # 初始化异步队列设置最大容量 self.queue asyncio.Queue(maxsizeQUEUE_MAXSIZE) # 存储爬取结果 self.results [] async def producer(self, urls: List[str]): 生产者将待爬取的URL放入队列 for url in urls: await self.queue.put(url) print(f生产者已放入URL - {url}) # 放入结束标记数量等于消费者数量通知消费者退出 for _ in range(MAX_CONCURRENT): await self.queue.put(None) async def consumer(self, session: aiohttp.ClientSession, consumer_id: int): 消费者从队列取出URL并爬取 while True: # 从队列获取URL异步阻塞直到有数据 url await self.queue.get() # 检测结束标记 if url is None: print(f消费者{consumer_id}收到结束标记退出) self.queue.task_done() break try: # 异步请求页面 async with session.get(url, timeoutClientTimeout(total

) as response: if response.status 200: # 解析响应数据此处仅示例可替换为实际解析逻辑 data await response.json() page data[args][page] self.results.append(f页面{page}爬取成功) print(f消费者{consumer_id}成功爬取 - {url}) else: print(f消费者{consumer_id}爬取失败状态码 - {response.status}) except Exception as e: print(f消费者{consumer_id}爬取异常 - {url}错误{str(e)}) finally: # 标记任务完成用于队列的join()方法 self.queue.task_done() async def run(self): 启动爬虫主流程 #

生成待爬取的URL列表 urls [BASE_URL.format(page) for page in PAGE_RANGE] #

创建异步HTTP会话复用连接提升性能 async with aiohttp.ClientSession() as session: #

创建并启动生产者任务 producer_task asyncio.create_task(self.producer(urls)) #

创建并启动多个消费者任务 consumer_tasks [] for i in range(MAX_CONCURRENT): task asyncio.create_task(self.consumer(session, i

) consumer_tasks.append(task) #

等待生产者任务完成 await producer_task #

等待队列中所有任务处理完成 await self.queue.join() #

等待所有消费者任务退出 await asyncio.gather(*consumer_tasks) #

输出爬取结果 print(\n 爬取完成 ) for res in self.results: print(res) print(f总计成功爬取{len(self.results)} 条数据) if __name__ __main__: # 运行异步爬虫 crawler AsyncCrawler() asyncio.run(crawler.run())

代码核心解析

队列初始化self.queue asyncio.Queue(maxsizeQUEUE_MAXSIZE)初始化异步队列maxsize限制队列最大容量避免生产者生产过快导致内存溢出。

生产者逻辑遍历 URL 列表通过await self.queue.put(url)将 URL 放入队列生产完成后放入与消费者数量相同的None作为结束标记确保每个消费者都能收到退出信号。

消费者逻辑循环从队列取 URLurl await self.queue.get()异步阻塞无数据时等待检测到None时退出循环完成消费者任务使用aiohttp.ClientSession发起异步 HTTP 请求复用连接池提升效率无论爬取成功 / 失败最终调用self.queue.task_done()标记任务完成供队列join()方法检测。

主流程控制asyncio.create_task()创建协程任务实现生产者和多个消费者的并发执行await self.queue.join()阻塞直到队列中所有任务都被task_done()标记确保所有 URL 都被处理asyncio.gather(*consumer_tasks)等待所有消费者任务正常退出。

关键优化点连接复用使用aiohttp.ClientSession而非每次请求创建新会话减少 TCP 连接建立开销并发控制通过MAX_CONCURRENT限制消费者数量避免并发过高触发反爬异常处理捕获请求过程中的异常避免单个 URL 爬取失败导致消费者退出队列缓冲设置QUEUE_MAXSIZE当队列满时生产者会阻塞实现生产消费的速率匹配。

适用场景与扩展

适用场景高并发爬取大量网页如电商商品页、新闻列表页需控制爬取速率避免目标网站反爬IO 密集型爬取任务大部分时间消耗在网络请求上。

扩展方向去重机制添加 URL 去重如使用集合避免重复爬取重试机制对失败的 URL 进行重试提升爬取成功率数据存储将爬取结果写入数据库如 MySQL、MongoDB需使用异步数据库驱动代理池集成添加异步代理池随机切换代理 IP避免被封任务持久化将队列中的 URL 持久化到文件 / 数据库实现断点续爬。

总结asyncio Queue实现的生产者 - 消费者爬虫核心是通过异步队列解耦生产URL 生成和消费页面爬取利用异步 IO 提升爬取效率关键控制点包括并发数限制、队列容量限制、异常处理、结束标记传递该模型兼具高性能和可控性是异步爬虫开发的经典架构可根据实际需求扩展去重、重试、代理等功能。

相比传统同步爬虫异步生产者 - 消费者模型在处理大量 IO 密集型任务时能以更低的资源开销实现更高的并发量是 Python 爬虫开发中值得掌握的核心技术。

9.1隐藏网站官方版-9.1隐藏网站官方版应用

百度百家号客服电话人工服务

123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123 123