
核心价值:从「基础概念」→「GIL 核心机制」→「多线程 / 多进程实践」→「协程底层原理」→「生产级选型」全链路覆盖,解决CPU 密集型 / IO 密集型任务选型、GIL 困惑、异步代码编写三大核心问题。
维度 | 同步编程 | 异步编程 |
|---|---|---|
执行方式 | 任务顺序执行,前一个任务完成后再执行下一个 | 任务并行发起,无需等待前一个任务完成,通过回调 / 事件循环接收结果 |
阻塞特性 | 任务执行时会阻塞当前线程 | 任务执行时不会阻塞当前线程 |
适用场景 | 逻辑简单、依赖顺序的任务 | IO 密集型任务(网络请求、文件读写) |
实现复杂度 | 低,顺序执行易调试 | 高,需事件循环 / 协程调度 |
代码示例 | result = requests.get(url) | result = await aiohttp.get(url) |
代码验证:同步 VS 异步的执行流程
# 同步代码:顺序执行,耗时≈3s
import time
import requests
def sync_task():
urls = [f"https://httpbin.org/get?delay=1" for _ in range(3)]
start = time.time()
for url in urls:
requests.get(url) # 阻塞等待
print(f"同步耗时:{time.time()-start:.2f}s") # ≈3s
sync_task()
# 异步代码:并行执行,耗时≈1s
import asyncio
import aiohttp
async def async_task():
urls = [f"https://httpbin.org/get?delay=1" for _ in range(3)]
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
await asyncio.gather(*tasks) # 并行发起
print(f"异步耗时:{time.time()-start:.2f}s") # ≈1s
asyncio.run(async_task())维度 | 并发 | 并行 |
|---|---|---|
核心定义 | 单核 CPU 通过时间分片交替执行多个任务,表面并行 | 多核 CPU 同时执行多个任务,真正并行 |
硬件依赖 | 无需多核,单核即可 | 必须多核 CPU |
调度方式 | 操作系统时间片调度 | CPU 物理核心并行 |
典型场景 | 高并发 web 服务 | 科学计算、视频渲染等 CPU 密集型任务 |
Python 实现 | 多线程、协程 | 多进程 |
维度 | 阻塞 | 非阻塞 |
|---|---|---|
执行状态 | 任务执行时,线程 / 进程被挂起,无法执行其他任务 | 任务执行时,立即返回状态,线程 / 进程可执行其他任务 |
结果获取 | 直接返回最终结果 | 返回临时结果 / 状态,需轮询或回调获取最终结果 |
系统资源 | 利用率低 | 利用率高 |
Python 实现 | time.sleep()、requests.get() | select/poll/epoll、asyncio |
GIL(Global Interpreter Lock) 是 Python 解释器(CPython)的全局解释器锁,同一时间仅允许一个线程执行 Python 字节码,无论 CPU 核心数量多少。
任务类型 | GIL 影响 | 实际性能 |
|---|---|---|
IO 密集型 | 影响小,因为 IO 等待时线程会释放 GIL | 优于单线程,接近并行 |
CPU 密集型 | 影响大,因为线程需竞争 GIL,切换开销大 | 甚至不如单线程 |
代码验证:GIL 对 CPU 密集型任务的影响
# 单线程CPU密集型任务
import time
def cpu_bound(n):
return sum(i*i for i in range(n))
start = time.time()
result1 = cpu_bound(10**7)
result2 = cpu_bound(10**7)
print(f"单线程耗时:{time.time()-start:.2f}s") # ≈1.2s
# 多线程CPU密集型任务
import threading
start = time.time()
t1 = threading.Thread(target=cpu_bound, args=(10**7,))
t2 = threading.Thread(target=cpu_bound, args=(10**7,))
t1.start() # 启动线程
t2.start()
t1.join() # 阻塞主线程,等待t1完成
t2.join() # 等待t2完成
print(f"多线程耗时:{time.time()-start:.2f}s") # ≈1.1s(几乎没有提升,GIL竞争导致)Python 的threading模块提供了多线程的核心 API,核心类是Thread。
方法 | 功能 |
|---|---|
__init__(target, args, kwargs) | 初始化线程,指定目标函数、位置参数、关键字参数 |
start() | 启动线程,使其进入就绪状态,由操作系统调度执行(注意:不能重复调用) |
join(timeout=None) | 阻塞当前线程,直到被 join 的线程执行完毕;timeout为超时时间 |
is_alive() | 返回线程是否在运行 |
set_name()/get_name() | 设置 / 获取线程名称 |
daemon | 线程是否为守护线程(主线程结束时,守护线程自动终止) |
start () 的执行流程:
start()后,线程进入就绪状态,等待操作系统调度;target函数;join () 的执行流程:
thread.join()后,进入阻塞状态;代码验证:start 和 join 的作用
import threading
import time
def task():
time.sleep(1)
print(f"线程{threading.current_thread().name}执行完成")
# 测试1:仅start,不join
print("测试1:仅start,不join")
t1 = threading.Thread(target=task, name="Thread-1")
t1.start()
print("主线程执行完成") # 主线程会先于子线程结束,子线程仍继续执行
time.sleep(2) # 等待子线程完成
# 测试2:start+join
print("\n测试2:start+join")
t2 = threading.Thread(target=task, name="Thread-2")
t2.start()
t2.join() # 主线程等待t2完成
print("主线程执行完成") # 主线程在子线程完成后执行由于多线程共享全局内存,会出现线程安全问题。threading模块提供了多种锁机制:
Lock:互斥锁,同一时间仅允许一个线程获取RLock:递归锁,允许同一线程多次获取Condition:条件变量,用于线程间通信Semaphore:信号量,限制同时运行的线程数Event:事件,用于线程间通知代码验证:线程安全问题与 Lock 的解决
# 线程不安全示例:全局变量累加
total = 0
def increment():
global total
for _ in range(10**5):
total += 1 # 非原子操作,会被中断
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"线程不安全的结果:{total}") # 预期200000,实际≈150000(结果随机)
# 线程安全示例:用Lock
total = 0
lock = threading.Lock()
def increment_safe():
global total
with lock: # 自动获取/释放锁
for _ in range(10**5):
total += 1
t1 = threading.Thread(target=increment_safe)
t2 = threading.Thread(target=increment_safe)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"线程安全的结果:{total}") # 200000(正确)多进程通过创建多个独立的 Python 解释器实例规避 GIL,每个进程拥有自己的内存空间和 GIL,真正实现多核并行。
multiprocessing模块的 API 与threading类似,核心类是Process,支持:
方法 | 功能 | 与 Thread 的区别 |
|---|---|---|
__init__(target, args, kwargs) | 初始化进程 | 进程的内存空间独立 |
start() | 启动进程 | 会创建新的 Python 解释器 |
join(timeout=None) | 等待进程完成 | 进程间通信需特殊机制 |
is_alive() | 检查进程是否运行 | 进程的创建销毁开销大 |
daemon | 守护进程 | 主线程结束时,守护进程自动终止 |
代码验证:多进程解决 CPU 密集型任务
import multiprocessing
import time
def cpu_bound(n):
return sum(i*i for i in range(n))
# 多进程CPU密集型任务
start = time.time()
p1 = multiprocessing.Process(target=cpu_bound, args=(10**7,))
p2 = multiprocessing.Process(target=cpu_bound, args=(10**7,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"多进程耗时:{time.time()-start:.2f}s") # ≈0.6s(几乎是单线程的1/2,多核并行)由于进程的内存空间独立,需通过以下机制通信:
Queue:基于管道的消息队列,线程 / 进程安全Pipe:双向管道,适合两个进程通信Manager:共享内存,支持字典、列表等复杂类型代码验证:用 Queue 实现进程间通信
import multiprocessing
def producer(q):
for item in [1, 2, 3, 4, 5]:
q.put(item) # 放入队列
print(f"生产者放入:{item}")
def consumer(q):
while True:
item = q.get() # 从队列取出,阻塞直到有数据
if item is None: # 停止信号
break
print(f"消费者取出:{item}")
# 创建队列
q = multiprocessing.Queue()
# 创建进程
p_producer = multiprocessing.Process(target=producer, args=(q,))
p_consumer = multiprocessing.Process(target=consumer, args=(q,))
# 启动进程
p_producer.start()
p_consumer.start()
# 等待生产者完成
p_producer.join()
# 发送停止信号
q.put(None)
# 等待消费者完成
p_consumer.join()对于大量短期进程,用Pool可以避免频繁创建销毁进程的开销。Pool的核心方法:
apply():同步执行,阻塞apply_async():异步执行,非阻塞map():批量执行,自动分块close():关闭池,不再接受新任务join():等待所有任务完成代码验证:用 Pool 处理大量任务
import multiprocessing
import time
def task(n):
return f"任务{n}完成"
# 创建进程池(默认数量为CPU核心数)
with multiprocessing.Pool() as pool:
# 批量执行10个任务
results = pool.map(task, range(10))
for result in results:
print(result)阶段 | 实现方式 | 特点 |
|---|---|---|
Python 2.5 | yield/yield from | 基于生成器,手动调度 |
Python 3.3 | asyncio模块 | 引入事件循环,自动调度 |
Python 3.5 | async def/await | 语法糖,更简洁的异步编程 |
协程是用户态的轻量级线程,由程序员手动调度,无需操作系统干预,切换开销极小(≈纳秒级)。
核心机制:
select/poll/epoll实现 IO 等待时的协程切换。组件 | 功能 |
|---|---|
async def | 定义协程函数 |
await | 等待协程 / 可等待对象(如 Task、Future) |
asyncio.run() | 运行协程的入口 |
asyncio.gather() | 并行执行多个协程 |
asyncio.Task() | 将协程包装为 Task 对象,立即调度 |
asyncio.sleep() | 非阻塞睡眠(替代 time.sleep ()) |
代码验证:async/await 的基本使用
import asyncio
import time
async def coro1():
await asyncio.sleep(1)
return "Coro1完成"
async def coro2():
await asyncio.sleep(2)
return "Coro2完成"
async def main():
start = time.time()
# 并行执行
result1, result2 = await asyncio.gather(coro1(), coro2())
print(f"协程1结果:{result1}")
print(f"协程2结果:{result2}")
print(f"总耗时:{time.time()-start:.2f}s") # ≈2s(并行)
asyncio.run(main())维度 | 协程 | 多线程 | 多进程 |
|---|---|---|---|
切换开销 | 极小(用户态) | 中等(内核态) | 极大(进程创建) |
内存占用 | 极低(≈几 KB) | 低(≈几 MB) | 高(≈几十 MB) |
GIL 影响 | 无,单线程内调度 | 有,CPU 密集型受影响 | 无,多核并行 |
适用场景 | IO 密集型任务 | IO 密集型任务 | CPU 密集型 / IO 密集型任务 |
实现难度 | 中等,需理解事件循环 | 低,API 简单 | 中等,需进程间通信 |
GIL 的核心逻辑:
CPU 核心数 | 任务类型 | 多线程性能提升 | 多进程性能提升 |
|---|---|---|---|
1 | CPU 密集型 | 0 | 0 |
2 | CPU 密集型 | ≈1.1x | ≈2x |
4 | CPU 密集型 | ≈1.2x | ≈4x |
任意 | IO 密集型 | ≈Nx(N 为任务数) | ≈Nx |
方案 | 适用场景 | 复杂度 |
|---|---|---|
多进程 | CPU 密集型任务 | 低 |
C 扩展 | 性能敏感的 CPU 密集型任务 | 高 |
PyPy 解释器 | 无需修改代码,默认无 GIL | 低 |
异步协程 | IO 密集型任务 | 中等 |
任务类型 | 推荐方案 | 理由 |
|---|---|---|
IO 密集型(高并发) | asyncio 协程 | 低内存、低开销、高并发 |
IO 密集型(低并发) | 多线程 | API 简单、无需异步改写 |
CPU 密集型 | 多进程 | 规避 GIL、多核并行 |
混合任务 | 多进程 + asyncio | 进程处理 CPU 密集型,协程处理 IO 密集型 |
asyncio.run()的debug=True参数;Python 的并发编程体系涵盖了同步 / 异步、线程 / 进程 / 协程等多种方式,核心在于根据任务类型选择合适的方案: