
除了之前提到的标准库方案和 aiohttp(异步库),还有多个 第三方库 可优雅处理多线程/并发请求的结果顺序问题,核心优势是 封装完善、无需手动处理线程安全/排序逻辑,同时兼顾并发效率。以下是 4 个高频实用库,附具体实现和场景适配:
grequests(requests + gevent,协程并发+有序结果)requests(兼容其所有 API)和 gevent(协程库),本质是“协程并发”(非多线程,但效率高于多线程,无 GIL 限制);grequests.map() 实现);requests 代码,即可实现高并发。pip install grequestsimport grequests
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
TOTAL_REQUESTS = 20
TIMEOUT = 5
def request_api(index: int) -> tuple:
"""单个请求函数:返回(索引,是否成功,结果信息)"""
url = API_URL.format(index % 10 + 1)
try:
# 用 grequests.get 替代 requests.get,支持协程并发
response = grequests.get(url, timeout=TIMEOUT)
response.send() # 发送请求(非阻塞)
response = response.response # 获取响应对象
response.raise_for_status()
return (index, True, f"响应:{response.json()['title'][:20]}...")
except Exception as e:
return (index, False, f"失败:{str(e)[:30]}")
if __name__ == "__main__":
start_time = time.time()
# 1. 构造请求列表(按顺序提交)
requests_list = [grequests.get(API_URL.format(i % 10 + 1), timeout=TIMEOUT) for i in range(TOTAL_REQUESTS)]
# 2. 并发执行,map 方法保证结果顺序与请求顺序一致
responses = grequests.map(requests_list, size=10) # size=并发数(类似线程池大小)
# 3. 按顺序处理结果(responses 顺序 = 请求提交顺序)
print("grequests 协程并发 - 按请求顺序输出:")
for idx, response in enumerate(responses):
if response and response.status_code == 200:
msg = f"响应:{response.json()['title'][:20]}..."
print(f"任务[{idx}]:✅ {msg}")
else:
err_msg = response.reason if response else "请求超时/失败"
print(f"任务[{idx}]:❌ 失败:{err_msg[:30]}")
total_cost = round(time.time() - start_time, 3)
print(f"\n总耗时:{total_cost}s")requests)、并发效率高(协程无线程切换开销)、天然有序;httpx(新一代 HTTP 客户端,支持同步/异步+有序结果)requests;asyncio,并发效率高,且 asyncio.gather() 天然保证结果顺序;requests。pip install httpximport httpx
import asyncio
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
TOTAL_REQUESTS = 20
TIMEOUT = 5
CONCURRENT_NUM = 10 # 并发数
async def request_api(client: httpx.AsyncClient, index: int) -> tuple:
"""异步请求函数"""
url = API_URL.format(index % 10 + 1)
try:
response = await client.get(url, timeout=TIMEOUT)
response.raise_for_status()
data = response.json()
return (index, True, f"响应:{data['title'][:20]}...")
except Exception as e:
return (index, False, f"失败:{str(e)[:30]}")
async def main():
start_time = time.time()
# 1. 创建异步客户端(支持连接池复用)
async with httpx.AsyncClient(limits=httpx.Limits(max_connections=CONCURRENT_NUM)) as client:
# 2. 构造异步任务列表(按顺序提交)
tasks = [request_api(client, i) for i in range(TOTAL_REQUESTS)]
# 3. 并发执行,gather 保证结果顺序与任务顺序一致
results = await asyncio.gather(*tasks)
# 4. 按顺序输出结果
print("httpx 异步并发 - 按请求顺序输出:")
for idx, is_success, msg in results:
print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")
total_cost = round(time.time() - start_time, 3)
print(f"\n总耗时:{total_cost}s")
if __name__ == "__main__":
# 兼容 Windows 系统
import sys
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())requests 且追求更高性能,同时要求结果有序的并发请求。tenacity(重试+结合多线程/异步,保证有序结果)requests、aiohttp、ThreadPoolExecutor 结合;pip install tenacityimport requests
from concurrent.futures import ThreadPoolExecutor
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5
# 配置重试策略:失败最多重试 2 次,每次间隔 1s、2s(指数退避)
@retry(
stop=stop_after_attempt(2), # 最多重试 2 次
wait=wait_exponential(multiplier=1, min=1, max=5), # 重试间隔:1s、2s、4s...
retry=retry_if_exception_type((requests.exceptions.Timeout, requests.exceptions.ConnectionError)) # 仅对超时/连接错误重试
)
def request_api(index: int) -> tuple:
"""单个请求函数(带重试)"""
url = API_URL.format(index % 10 + 1)
try:
response = requests.get(url, timeout=TIMEOUT)
response.raise_for_status()
return (index, True, f"响应:{response.json()['title'][:20]}...")
except Exception as e:
return (index, False, f"失败:{str(e)[:30]}")
if __name__ == "__main__":
start_time = time.time()
# 结合 ThreadPoolExecutor 有序收集结果
with ThreadPoolExecutor(max_workers=THREAD_NUM) as executor:
future_list = [executor.submit(request_api, i) for i in range(TOTAL_REQUESTS)]
print("ThreadPoolExecutor + tenacity 重试 - 按请求顺序输出:")
for future in future_list: # 按提交顺序获取结果(有序)
idx, is_success, msg = future.result()
print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")
total_cost = round(time.time() - start_time, 3)
print(f"\n总耗时:{total_cost}s")trio(异步并发库,更简洁的有序结果处理)asyncio 的替代方案,专注“结构化并发”,API 更简洁、错误处理更优雅;pip install trio httpx-trio # httpx-trio 是 httpx 的 trio 适配库import trio
import httpx
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
TOTAL_REQUESTS = 20
TIMEOUT = 5
CONCURRENT_NUM = 10 # 最大并发数
async def request_api(client: httpx.AsyncClient, index: int) -> tuple:
"""异步请求函数"""
url = API_URL.format(index % 10 + 1)
try:
response = await client.get(url, timeout=TIMEOUT)
response.raise_for_status()
data = response.json()
return (index, True, f"响应:{data['title'][:20]}...")
except Exception as e:
return (index, False, f"失败:{str(e)[:30]}")
async def main():
start_time = time.time()
# 1. 创建异步客户端(结合 trio)
async with httpx.AsyncClient(limits=httpx.Limits(max_connections=CONCURRENT_NUM)) as client:
# 2. 构造任务列表(按顺序),用 trio.gather 并发执行并保证顺序
results = await trio.gather(*[request_api(client, i) for i in range(TOTAL_REQUESTS)])
# 3. 按顺序输出结果
print("trio + httpx 异步并发 - 按请求顺序输出:")
for idx, is_success, msg in results:
print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")
total_cost = round(time.time() - start_time, 3)
print(f"\n总耗时:{total_cost}s")
if __name__ == "__main__":
trio.run(main()) # trio 自带事件循环,无需手动启动库名 | 核心机制 | 关键优势 | 适用场景 |
|---|---|---|---|
grequests | 协程(gevent) | 兼容 requests、语法极简、并发效率高 | 快速替换 requests 实现高并发+有序结果 |
httpx | 异步(asyncio) | 支持同步/异步、HTTP/2、连接池优化 | 替代 requests 且追求高性能+有序结果 |
tenacity | 重试机制 | 专注失败重试、可与任意并发方案结合 | 接口不稳定,需要重试+有序结果 |
trio | 结构化并发 | 代码简洁、错误处理优雅 | 异步编程爱好者,追求简洁性+有序结果 |
grequests、httpx、trio 均基于协程/异步,效率高于多线程(无 GIL 限制、无线程切换开销),是接口请求的最优解;grequests(最快上手);httpx(支持 HTTP/2);tenacity+任意并发库(重试保障);trio;ThreadPoolExecutor、asyncio),复杂场景再引入第三方库。原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。