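A recent Python project of mine used both gevent and multiprocessing, and implementing graceful shutdown ran into some unexpected problems.
Here is a simplified version of the code. It starts 4 processes, each running two coroutines, and registers callbacks for SIGINT and other signals to shut down gracefully: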
import signal
import time
import multiprocessing
import gevent
from gevent import monkey
monkey.patch_all()  # NOQA


class WorkerManager():
    def __init__(self):
        self.is_running = multiprocessing.Value('b', True)

    def job(self):
        while self.is_running.value:
            print("job")
            time.sleep(3)

    def run(self):
        for sig in [signal.SIGINT, signal.SIGUSR1, signal.SIGTERM]:
            signal.signal(sig, signal.SIG_IGN)
        jobs = [gevent.spawn(self.job) for _ in range(2)]
        gevent.joinall(jobs)

    def start(self):
        self.workers = [multiprocessing.Process(
            target=self.run) for _ in range(4)]
        for worker in self.workers:
            worker.start()
        signal.signal(signal.SIGINT, self.graceful_exit)

    def graceful_exit(self, sig, frame):
        self.shutdown()

    def shutdown(self):
        if not self.is_running.value:
            return
        self.is_running.value = False
        for worker in self.workers:
            worker.join()


worker_manager = WorkerManager()
worker_manager.start()
Running the code above and pressing ctrl+c raises the following error:
gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback
The relevant call stack:
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File ".../venv/lib/python3.7/site-packages/gevent/os.py", line 380, in waitpid
    get_hub().wait(new_watcher)
  File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
  File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
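The top of the trace shows a child-process join reaching os.waitpid through popen_fork's poll. That path can be observed with the standard library alone. The sketch below temporarily wraps os.waitpid, much as a monkey patch would, and records the calls made during a join. The names record_waitpid_during_join and recording_waitpid are made up for this illustration, and the "fork" start method is an assumption (POSIX only).

```python
import multiprocessing
import os


def _noop():
    """Child process body: exit immediately."""


def record_waitpid_during_join():
    """Return (child_pid, pids passed to os.waitpid while joining)."""
    seen = []
    original = os.waitpid

    def recording_waitpid(pid, options):
        # Record the call, then delegate to the real os.waitpid.
        seen.append(pid)
        return original(pid, options)

    os.waitpid = recording_waitpid  # crude stand-in for a monkey patch
    try:
        ctx = multiprocessing.get_context("fork")  # assumption: POSIX
        child = ctx.Process(target=_noop)
        child.start()
        child.join()  # ends up in popen_fork's poll -> os.waitpid
    finally:
        os.waitpid = original  # always restore the real function
    return child.pid, seen
```

Because multiprocessing looks up os.waitpid at call time, whatever is installed under that name is what join ends up calling, which is how gevent's version gets involved.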
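Back to our code: we call gevent's monkey.patch_all() and also use multiprocessing. The failing call stack shows that the problem occurs when joining a child process. The join function, which lives in the multiprocessing library, calls os.waitpid, and after patching that resolves to gevent's implementation of os. Because this is a blocking operation, it fails in switch_out. Why does it fail? Here is the relevant code in gevent: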
class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
    # Subclasses must define:
    # - self.loop
    # This class defines loop in its .pxd for Cython. This lets us avoid
    # circular dependencies with the hub.

    def switch(self):
        switch_out = getattr(getcurrent(), 'switch_out', None)  # pylint:disable=undefined-variable
        if switch_out is not None:
            switch_out()
        return _greenlet_switch(self)  # pylint:disable=undefined-variable

    def switch_out(self):
        raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
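When our program is interrupted by the signal, the main process has no other greenlets and nothing else running, so the greenlet that is running is the hub itself, which runs in a single thread. When switch is called, it looks up the greenlet that was running (the getcurrent() call), and that turns out to be the hub itself.
Normally switch_out is invoked when switching from an ordinary greenlet into the hub. Here we are already in the hub and there is nowhere else to switch to, hence the BlockingSwitchOutError.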
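The check can be mimicked with a toy model in plain Python. This is only a sketch of the control flow described above, not gevent's actual implementation: ToyGreenlet, ToyHub, switch_to_hub and try_block are hypothetical names, and the exception class is a local stand-in for gevent's.

```python
class BlockingSwitchOutError(Exception):
    """Local stand-in for gevent.hub.BlockingSwitchOutError (toy model)."""


class ToyGreenlet:
    """An ordinary greenlet in this model: it has no switch_out hook."""


class ToyHub:
    """The hub in this model: switching out of it is refused."""

    def switch_out(self):
        raise BlockingSwitchOutError(
            'Impossible to call blocking function in the event loop callback')


def switch_to_hub(current):
    """Mimic switch(): run the current greenlet's switch_out hook, if any."""
    switch_out = getattr(current, 'switch_out', None)
    if switch_out is not None:
        switch_out()
    return 'switched to hub'


def try_block(current):
    """Attempt a blocking call while `current` is the running greenlet."""
    try:
        return switch_to_hub(current)
    except BlockingSwitchOutError as exc:
        return 'error: %s' % exc
```

In this model, try_block(ToyGreenlet()) switches normally, while try_block(ToyHub()) reports the error, which matches the situation where the signal handler runs while the hub is the current greenlet.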
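One fix is to have the signal handler spawn a greenlet that runs shutdown, instead of calling shutdown directly:
    def graceful_exit(self, sig, frame):
        gevent.spawn(self.shutdown)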
However, if nothing else is running in the main process, it may exit without waiting for shutdown to finish.
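P.S. At first I accidentally wrote self.shutdown(), with the trailing (). That calls shutdown immediately, which is the same as not changing anything, so it raised the same error.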
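The difference between passing a function and calling it can be shown without gevent. In this minimal sketch, toy_spawn is a hypothetical stand-in that only queues what it is given. It is not gevent.spawn, and it does not model what gevent does with the argument afterwards.

```python
log = []
pending = []  # callables a toy scheduler would run later


def shutdown():
    log.append("shutdown ran")


def toy_spawn(func):
    """Hypothetical stand-in for a spawn call: queue func, do not run it."""
    pending.append(func)


# Correct: pass the function itself; nothing runs yet.
toy_spawn(shutdown)
log_after_deferred = list(log)

# Mistake: shutdown() runs right here, in the caller's context, and only
# its return value (None) is handed to toy_spawn.
toy_spawn(shutdown())
log_after_eager = list(log)
```

After the first call the log is still empty. After the second, shutdown has already run in the caller, which in the signal-handler case means it runs exactly where it did before the change.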
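Another option is to leave os unpatched, so that os.waitpid stays the blocking standard-library version:
monkey.patch_all(os=False)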
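Alternatively, the handler can be registered through gevent.signal_handler, which runs it in a new greenlet when the signal arrives:
gevent.signal_handler(signal.SIGINT, self.graceful_exit, signal.SIGINT, None)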
This approach likewise may not wait for shutdown to finish.
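To keep the main process from exiting right away, a busy loop can be added at the end of the script:
while True:
    pass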
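Instead, the joining can be moved into a separate join method that the main process calls at the end:
    def join(self):
        # Wait for every worker process, then release its resources.
        for worker in self.workers:
            worker.join()
            worker.close()

    def shutdown(self):
        if not self.is_running.value:
            return
        self.is_running.value = False
...
worker_manager.join()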
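The same division of labor can be sketched with the standard library only: the handler merely clears the shared flag, and the main flow does the joining. This is a simplified illustration under stated assumptions rather than the project's code. It leaves gevent out entirely, uses the "fork" start method (POSIX only), and simulates ctrl+c by sending SIGINT to itself. The names worker and run_demo are made up for this example.

```python
import multiprocessing
import os
import signal
import time


def worker(is_running):
    # Children ignore SIGINT; they stop only when the shared flag is cleared.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    while is_running.value:
        time.sleep(0.05)


def run_demo(num_workers=2):
    """Start workers, simulate ctrl+c, join them, return their exit codes."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX
    is_running = ctx.Value('b', True)
    workers = [ctx.Process(target=worker, args=(is_running,))
               for _ in range(num_workers)]
    for w in workers:
        w.start()

    def graceful_exit(sig, frame):
        # The handler only flips the flag; it never joins or blocks.
        is_running.value = False

    previous = signal.signal(signal.SIGINT, graceful_exit)
    try:
        os.kill(os.getpid(), signal.SIGINT)  # simulate ctrl+c in the parent
        for w in workers:
            w.join()  # blocking wait happens in the main flow
        return [w.exitcode for w in workers]
    finally:
        signal.signal(signal.SIGINT, previous)  # restore the old handler
```

Keeping the blocking wait out of the handler means the handler can run in any context, while the main flow stays alive until every worker has exited.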