在当今大数据时代,网络爬虫已成为数据采集的重要手段。然而,高并发爬虫在提升抓取效率的同时,也可能对目标服务器造成过大压力,甚至触发反爬机制(如IP封禁、验证码等)。因此,合理的限流策略(Rate Limiting)是爬虫开发中不可或缺的一环。
Python的aiohttp
库作为异步HTTP客户端,能够高效地处理高并发请求。本文将介绍如何在aiohttp
爬虫中实现请求限流,包括:
我们不仅会讲解算法原理,还会提供完整的代码实现,帮助开发者构建更稳定、高效的爬虫系统。
限流方式 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
固定窗口 | 简单限流 | 实现简单 | 存在临界问题 |
滑动窗口 | 精准限流 | 平滑控制 | 计算稍复杂 |
令牌桶 | 突发流量 | 允许短时突发 | 需维护令牌池 |
漏桶 | 恒定速率 | 稳定输出 | 无法应对突发 |
接下来,我们使用aiohttp
实现这些限流策略。
我们先构建一个简单的aiohttp
爬虫,后续再逐步加入限流逻辑。
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ["https://example.com"] * 100 # 模拟100个请求
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
await asyncio.gather(*tasks)
asyncio.run(main())
这个爬虫会同时发起100个请求,可能触发反爬机制。接下来我们加入限流。
固定窗口限流是指在固定时间窗口(如1秒)内限制请求数量。
from datetime import datetime
class FixedWindowLimiter:
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.window_start = datetime.now()
self.request_count = 0
async def wait(self):
now = datetime.now()
elapsed = (now - self.window_start).total_seconds()
if elapsed > self.window_seconds:
self.window_start = now # 重置窗口
self.request_count = 0
if self.request_count >= self.max_requests:
# 计算剩余时间
remaining = self.window_seconds - elapsed
await asyncio.sleep(remaining)
self.window_start = datetime.now()
self.request_count = 0
self.request_count += 1
# 使用示例
async def fetch_with_limiter(session, url, limiter):
await limiter.wait() # 等待限流
return await fetch(session, url)
async def main():
limiter = FixedWindowLimiter(max_requests=10, window_seconds=1) # 每秒10次
urls = ["https://example.com"] * 100
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limiter(session, url, limiter) for url in urls]
await asyncio.gather(*tasks)
优点:简单易实现。 缺点:窗口切换时可能出现临界问题(如1.9秒和2.1秒各发10次,实际2秒内发了20次)。
滑动窗口通过动态计算最近N秒的请求数,更精准地控制流量。
from collections import deque
class SlidingWindowLimiter:
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.request_times = deque()
async def wait(self):
now = datetime.now()
# 移除超出窗口的请求记录
while self.request_times and (now - self.request_times[0]).total_seconds() > self.window_seconds:
self.request_times.popleft()
if len(self.request_times) >= self.max_requests:
# 计算最早请求的剩余时间
oldest_request_time = self.request_times[0]
elapsed = (now - oldest_request_time).total_seconds()
remaining = self.window_seconds - elapsed
await asyncio.sleep(remaining)
# 递归检查
await self.wait()
self.request_times.append(now)
# 使用方式与FixedWindowLimiter相同
优点:避免临界问题,流量控制更平滑。 缺点:需维护请求队列,内存占用稍高。
令牌桶允许短时突发流量,适用于爬虫需要偶尔加速的场景。
class TokenBucketLimiter:
def __init__(self, tokens_per_second, max_tokens):
self.tokens_per_second = tokens_per_second
self.max_tokens = max_tokens
self.tokens = max_tokens
self.last_refill = datetime.now()
async def wait(self):
now = datetime.now()
elapsed = (now - self.last_refill).total_seconds()
new_tokens = elapsed * self.tokens_per_second
self.tokens = min(self.tokens + new_tokens, self.max_tokens)
self.last_refill = now
if self.tokens < 1:
# 计算需要等待的时间
deficit = 1 - self.tokens
wait_time = deficit / self.tokens_per_second
await asyncio.sleep(wait_time)
await self.wait() # 递归检查
else:
self.tokens -= 1
# 示例:每秒10个令牌,桶容量20
limiter = TokenBucketLimiter(tokens_per_second=10, max_tokens=20)
优点:允许短时突发(如爬取突发新闻)。 缺点:需动态计算令牌补充。
漏桶算法强制恒定速率,适用于需要稳定输出的场景
class LeakyBucketLimiter:
def __init__(self, rate_per_second, capacity):
self.rate_per_second = rate_per_second
self.capacity = capacity
self.tokens = 0
self.last_leak = datetime.now()
async def wait(self):
now = datetime.now()
elapsed = (now - self.last_leak).total_seconds()
leaked_tokens = elapsed * self.rate_per_second
self.tokens = max(self.tokens - leaked_tokens, 0)
self.last_leak = now
if self.tokens >= self.capacity:
# 计算需要等待的时间
excess = self.tokens - self.capacity + 1
wait_time = excess / self.rate_per_second
await asyncio.sleep(wait_time)
await self.wait()
else:
self.tokens += 1
# 示例:每秒处理5个请求,桶容量10
limiter = LeakyBucketLimiter(rate_per_second=5, capacity=10)
优点:输出速率恒定,防止突发流量。 缺点:无法应对短时高并发需求。
场景 | 推荐策略 |
---|---|
简单限流 | 固定窗口 |
精准控制 | 滑动窗口 |
允许突发 | 令牌桶 |
恒定速率 | 漏桶 |
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。