除了“无序收集+统一排序”的方案一,处理多线程请求接口结果顺序的核心思路是 “确保结果与请求提交顺序对齐”,以下是 4 种实用方案(含进阶优化和第三方库方案),覆盖不同场景需求,且均保证线程安全和并发效率:
提前创建一个与请求总数长度一致的结果列表,每个线程携带唯一的“请求索引”,执行完成后直接将结果写入列表的对应索引位置(如任务 5 的结果写入 results[5])。由于索引与提交顺序一一对应,所有线程完成后,列表自然是有序的。
None 的位置判断);import requests
import threading
import time
from typing import List
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5
# 1. 初始化:固定长度的结果列表(与请求顺序对应)+ 互斥锁(保护列表写入)
results: List[tuple] = [None] * TOTAL_REQUESTS # 初始值为 None,完成后写入结果
lock = threading.Lock() # 线程安全:避免多线程同时修改同一列表位置
def request_api(index: int):
"""线程执行函数:按索引写入结果到固定位置"""
url = API_URL.format(index % 10 + 1)
try:
response = requests.get(url, timeout=TIMEOUT)
response.raise_for_status()
result = (index, True, f"响应:{response.json()['title'][:20]}...")
except Exception as e:
result = (index, False, f"失败:{str(e)[:30]}")
# 2. 加锁写入结果(仅锁定写入操作,不影响请求并发)
with lock:
results[index] = result # 关键:按请求索引写入对应位置
if __name__ == "__main__":
start_time = time.time()
threads = []
# 3. 创建并启动线程(控制线程池大小)
for i in range(TOTAL_REQUESTS):
# 限制同时运行的线程数,避免创建过多线程
if len(threads) >= THREAD_NUM:
# 等待任意线程完成后再创建新线程
threading.Thread.join(threading.Thread.wait(threads))
threads = [t for t in threads if t.is_alive()]
t = threading.Thread(target=request_api, args=(i,), name=f"Thread-{i}")
t.start()
threads.append(t)
# 4. 等待所有线程完成
for t in threads:
t.join()
# 5. 直接按列表顺序输出(已与提交顺序一致)
print("固定位置存储 - 按请求顺序输出:")
for idx, is_success, msg in results:
print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")
print(f"\n总耗时:{round(time.time() - start_time, 3)}s")用两个线程安全的队列:
queue.Queue 内部已实现锁),无需手动加锁;import requests
import threading
from queue import Queue
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5
def worker(task_queue: Queue, result_queue: Queue):
"""工作线程:从任务队列取任务,执行后存入结果队列"""
while not task_queue.empty():
try:
index = task_queue.get(timeout=1) # 非阻塞取任务(超时1秒退出)
url = API_URL.format(index % 10 + 1)
try:
response = requests.get(url, timeout=TIMEOUT)
response.raise_for_status()
result = (index, True, f"响应:{response.json()['title'][:20]}...")
except Exception as e:
result = (index, False, f"失败:{str(e)[:30]}")
result_queue.put(result) # 存入结果队列(无序)
except Exception:
break
if __name__ == "__main__":
start_time = time.time()
# 1. 初始化队列
task_queue = Queue() # 按顺序存入请求索引(0~19)
result_queue = Queue() # 存储无序的结果
# 2. 提交任务(按顺序入队)
for i in range(TOTAL_REQUESTS):
task_queue.put(i)
# 3. 启动工作线程
threads = [threading.Thread(target=worker, args=(task_queue, result_queue)) for _ in range(THREAD_NUM)]
for t in threads:
t.start()
# 4. 主线程按顺序提取结果(流式输出)
print("队列流式处理 - 按请求顺序实时输出:")
expected_index = 0 # 期望的下一个任务索引(从0开始)
completed = 0 # 已完成的任务数
while completed < TOTAL_REQUESTS:
if not result_queue.empty():
index, is_success, msg = result_queue.get()
# 匹配期望索引则输出,否则放回队列
if index == expected_index:
print(f"任务[{index}]:{'✅' if is_success else '❌'} {msg}")
expected_index += 1
completed += 1
else:
result_queue.put((index, is_success, msg)) # 未匹配则放回
else:
time.sleep(0.01) # 避免空循环占用CPU
# 5. 等待所有线程结束
for t in threads:
t.join()
print(f"\n总耗时:{round(time.time() - start_time, 3)}s")concurrent.futures + 有序结果收集ThreadPoolExecutor 提交任务后会返回一个 Future 列表,该列表的顺序与提交顺序一致(即使任务完成顺序无序)。通过遍历 Future 列表(而非 as_completed),直接调用 future.result(),会按提交顺序阻塞等待每个任务完成,从而自然得到有序结果。
import requests
from concurrent.futures import ThreadPoolExecutor
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5
def request_api(index: int) -> tuple:
"""单个请求函数:返回(索引,是否成功,结果信息)"""
url = API_URL.format(index % 10 + 1)
try:
response = requests.get(url, timeout=TIMEOUT)
response.raise_for_status()
return (index, True, f"响应:{response.json()['title'][:20]}...")
except Exception as e:
return (index, False, f"失败:{str(e)[:30]}")
if __name__ == "__main__":
start_time = time.time()
# 1. 提交任务并获取 Future 列表(顺序与提交一致)
with ThreadPoolExecutor(max_workers=THREAD_NUM) as executor:
future_list = [executor.submit(request_api, i) for i in range(TOTAL_REQUESTS)]
# 2. 按 Future 列表顺序获取结果(阻塞等待,顺序与提交一致)
print("ThreadPoolExecutor 有序收集 - 按请求顺序输出:")
for future in future_list:
index, is_success, msg = future.result() # 按提交顺序阻塞等待
print(f"任务[{index}]:{'✅' if is_success else '❌'} {msg}")
total_cost = round(time.time() - start_time, 3)
print(f"\n总耗时:{total_cost}s")aiohttp(异步并发+有序结果)虽然是“异步”而非“多线程”,但 aiohttp 是 IO 密集型接口请求的更优选择(单线程异步并发,无 GIL 影响,效率更高),且天然支持有序结果——异步任务的提交顺序与结果返回顺序一致。
import aiohttp
import asyncio
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
TOTAL_REQUESTS = 20
TIMEOUT = 5
async def request_api(session: aiohttp.ClientSession, index: int) -> tuple:
"""异步请求函数"""
url = API_URL.format(index % 10 + 1)
try:
async with session.get(url, timeout=TIMEOUT) as response:
response.raise_for_status()
data = await response.json()
return (index, True, f"响应:{data['title'][:20]}...")
except Exception as e:
return (index, False, f"失败:{str(e)[:30]}")
async def main():
start_time = time.time()
# 1. 创建异步会话(复用连接,提升效率)
async with aiohttp.ClientSession() as session:
# 2. 创建所有异步任务(顺序与提交一致)
tasks = [request_api(session, i) for i in range(TOTAL_REQUESTS)]
# 3. 并发执行任务,按提交顺序获取结果
results = await asyncio.gather(*tasks) # gather 保证结果顺序与任务顺序一致
# 4. 输出结果(已有序)
print("aiohttp 异步并发 - 按请求顺序输出:")
for idx, is_success, msg in results:
print(f"任务[{idx}]:{'✅' if is_success else '❌'} {msg}")
total_cost = round(time.time() - start_time, 3)
print(f"\n总耗时:{total_cost}s")
if __name__ == "__main__":
# 兼容 Windows 系统
if __name__ == "__main__":
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())pip install aiohttp方案 | 技术依赖 | 核心优势 | 适用场景 |
|---|---|---|---|
固定位置存储 | threading + lock | 效率最高、可实时查进度 | 高并发、需跟踪任务状态 |
队列流式处理 | threading + Queue | 流式输出、无需等待所有任务 | 实时展示进度、边请求边处理 |
ThreadPoolExecutor 有序收集 | concurrent.futures | 代码最简单、标准库支持 | 无需实时输出、按顺序处理结果 |
aiohttp 异步并发 | aiohttp + asyncio | 并发效率最高、无线程安全问题 | 高并发接口请求、爬取、压测 |
queue.Queue);aiohttp 异步并发效率高于多线程,且天然有序,是接口请求的最优解;ThreadPoolExecutor 有序收集(方案四),复杂场景用队列或固定位置存储,无需追求复杂逻辑。原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。