这个比较有意思,看个案例:
from multiprocessing import Process, Pipe
def test(w):
w.send("[子进程]老爸,老妈回来记得喊我一下~")
msg = w.recv()
print(msg)
def main():
r, w = Pipe()
p1 = Process(target=test, args=(w, ))
p1.start()
msg = r.recv()
print(msg)
r.send("[父进程]滚犊子,赶紧写作业,不然我得跪方便面!")
p1.join()
if __name__ == '__main__':
main()
结果:
老爸,老妈回来记得喊我一下~
滚犊子,赶紧写作业,不然我得跪方便面!
按照道理应该子进程自己写完自己读了,和上次讲得不一样啊?不急,先看看源码:
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def Pipe(self, duplex=True):
'''返回由管道连接的两个连接对象'''
from .connection import Pipe
return Pipe(duplex)
看看 connection.Pipe
方法的定义部分,是不是双向通信就看你是否设置 duplex=True
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/connection.py
if sys.platform != 'win32':
def Pipe(duplex=True):
'''返回管道两端的一对连接对象'''
if duplex:
# 双工内部其实是socket系列(下次讲)
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
c1 = Connection(s1.detach())
c2 = Connection(s2.detach())
else:
# 这部分就是我们上次讲的pipe管道
fd1, fd2 = os.pipe()
c1 = Connection(fd1, writable=False)
c2 = Connection(fd2, readable=False)
return c1, c2
else:
def Pipe(duplex=True):
# win平台的一系列处理
......
c1 = PipeConnection(h1, writable=duplex)
c2 = PipeConnection(h2, readable=duplex)
return c1, c2
通过源码知道了,原来双工是通过socket搞的啊~
再看个和原来一样效果的案例:(不用关来关去的了,方便!)
from multiprocessing import Process, Pipe
def test(w):
# 只能写
w.send("[子进程]老爸,咱们完了,老妈一直在门口~")
def main():
r, w = Pipe(duplex=False)
p1 = Process(target=test, args=(w, ))
p1.start() # 你把这个放在join前面就直接死锁了
msg = r.recv() # 只能读
print(msg)
p1.join()
if __name__ == '__main__':
main()
输出:(可以思考下为什么 start换个位置就死锁
,提示: 阻塞读写
)
[子进程]老爸,咱们完了,老妈一直在门口~
再举个 Pool
的例子,咱们就进入今天的重点了:
from multiprocessing import Pipe, Pool
def proc_test1(conn):
conn.send("[小明]小张,今天哥们要见一女孩,你陪我呗,我24h等你回复哦~")
msg = conn.recv()
print(msg)
def proc_test2(conn):
msg = conn.recv()
print(msg)
conn.send("[小张]不去,万一被我帅气的外表迷倒就坑了~")
def main():
conn1, conn2 = Pipe()
p = Pool()
p.apply_async(proc_test1, (conn1, ))
p.apply_async(proc_test2, (conn2, ))
p.close() # 关闭池,不再接收新任务
p.join() # 等待回收,必须先关才能join,不然会异常
if __name__ == '__main__':
main()
输出:
[小明]小张,今天哥们要见一女孩,你陪我呗,我24h等你回复哦~
[小张]不去,万一被我帅气的外表迷倒就坑了~
看看源码就理解了:看看Pool的join是啥情况?看源码:
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/pool.py
def join(self):
util.debug('joining pool')
if self._state == RUN:
# 没关闭就join,这边就会抛出一个异常
raise ValueError("Pool is still running")
elif self._state not in (CLOSE, TERMINATE):
raise ValueError("In unknown state")
self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
for p in self._pool:
p.join() # 循环join回收
在pool的 __init__
的方法中,这几个属性:
self._processes = processes # 指定的进程数
self._pool = [] # 列表
self._repopulate_pool() # 给列表append内容的方法
将池进程的数量增加到指定的数量,join的时候会使用这个列表
def _repopulate_pool(self):
# 指定进程数-当前进程数,差几个补几个
for i in range(self._processes - len(self._pool)):
w = self.Process(target=worker,
args=(self._inqueue, self._outqueue,
self._initializer,
self._initargs, self._maxtasksperchild,
self._wrap_exception)
)
self._pool.append(w) # 重点来了
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True # pool退出后,通过pool创建的进程都会退出
w.start()
util.debug('added worker')
一步步的设局,从底层的的 pipe()
-> os.pipe
-> PIPE
,现在终于到 Queue
了,心酸啊,明知道上面两个项目
里面基本上不会用,但为了你们能看懂源码,说了这么久 %>_<%
其实以后当我们从 Queue
说到 MQ
和 RPC
之后,现在
讲得这些进程间通信( IPC
)也基本上不会用了,但本质你得清楚,我尽量多分析点源码,这样你们以后看开源项目压力会很小
欢迎批评指正~
from multiprocessing import Process, Queue
def test(q):
q.put("[子进程]老爸,我出去嗨了")
print(q.get())
def main():
q = Queue()
p = Process(target=test, args=(q, ))
p.start()
msg = q.get()
print(msg)
q.put("[父进程]去吧比卡丘~")
p.join()
if __name__ == '__main__':
main()
输出:( get
和 put
默认是阻塞等待的)
[子进程]老爸,我出去嗨了
[父进程]去吧比卡丘~
先看看 Queue
的初始化方法:(不指定大小就是最大队列数)
# 队列类型,使用PIPE,缓存,线程
class Queue(object):
# ctx = multiprocessing.get_context("xxx")
# 上下文总共3种:spawn、fork、forkserver(扩展部分会提一下)
def __init__(self, maxsize=0, *, ctx):
# 默认使用最大容量
if maxsize <= 0:
from .synchronize import SEM_VALUE_MAX as maxsize
self._maxsize = maxsize # 指定队列大小
# 创建了一个PIPE匿名管道(单向)
self._reader, self._writer = connection.Pipe(duplex=False)
# `multiprocessing/synchronize.py > Lock`
self._rlock = ctx.Lock() # 进程锁(读)【非递归】
self._opid = os.getpid() # 获取PID
if sys.platform == 'win32':
self._wlock = None
else:
self._wlock = ctx.Lock() # 进程锁(写)【非递归】
# Semaphore信号量通常用于保护容量有限的资源
# 控制信号量,超了就异常
self._sem = ctx.BoundedSemaphore(maxsize)
# 不忽略PIPE管道破裂的错误
self._ignore_epipe = False
# 线程相关操作
self._after_fork()
# 向`_afterfork_registry`字典中注册
if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)
关于 get
和 put
是阻塞的问题,看下源码探探究竟:
q.get()
:收消息
def get(self, block=True, timeout=None):
# 默认情况是阻塞(lock加锁)
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release() # 信号量+1
else:
if block:
deadline = time.monotonic() + timeout
# 超时抛异常
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
# 不管有没有内容都去读,超时就抛异常
if not self._poll(timeout):
raise Empty
elif not self._poll():
raise Empty
# 接收字节数据作为字节对象
res = self._recv_bytes()
self._sem.release() # 信号量+1
finally:
# 释放锁
self._rlock.release()
# 释放锁后,重新序列化数据
return _ForkingPickler.loads(res)
queue.put()
:发消息
def put(self, obj, block=True, timeout=None):
# 如果Queue已经关闭就抛异常
assert not self._closed, "Queue {0!r} has been closed".format(self)
# 记录信号量的锁
if not self._sem.acquire(block, timeout):
raise Full # 超过数量,抛个异常
# 条件变量允许一个或多个线程等待,直到另一个线程通知它们
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()
非阻塞 get_nowait
和 put_nowait
本质其实也是调用了 get
和 put
方法:
def get_nowait(self):
return self.get(False)
def put_nowait(self, obj):
return self.put(obj, False)
说这么多不如来个例子看看:
from multiprocessing import Queue
def main():
q = Queue(3) # 只能 put 3条消息
q.put([1, 2, 3, 4]) # put一个List类型的消息
q.put({"a": 1, "b": 2}) # put一个Dict类型的消息
q.put({1, 2, 3, 4}) # put一个Set类型的消息
try:
# 不加timeout,就一直阻塞,等消息队列有空位才能发出去
q.put("再加条消息呗", timeout=2)
# Full(Exception)是空实现,你可以直接用Exception
except Exception:
print("消息队列已满,队列数%s,当前存在%s条消息" % (q._maxsize, q.qsize()))
try:
# 非阻塞,不能put就抛异常
q.put_nowait("再加条消息呗") # 相当于q.put(obj,False)
except Exception:
print("消息队列已满,队列数%s,当前存在%s条消息" % (q._maxsize, q.qsize()))
while not q.empty():
print("队列数:%s,当前存在%s条消息 内容%s" % (q._maxsize, q.qsize(), q.get_nowait()))
print("队列数:%s,当前存在:%s条消息" % (q._maxsize, q.qsize()))
if __name__ == '__main__':
main()
输出:
消息队列已满,队列数3,当前存在3条消息
消息队列已满,队列数3,当前存在3条消息
队列数:3,当前存在3条消息 内容[1, 2, 3, 4]
队列数:3,当前存在2条消息 内容{'a': 1, 'b': 2}
队列数:3,当前存在1条消息 内容{1, 2, 3, 4}
队列数:3,当前存在:0条消息
补充说明一下:
q._maxsize
队列数(尽量不用 _
开头的属性和方法)q.qsize()
查看当前队列中存在几条消息q.full()
查看是否满了q.empty()
查看是否为空再看个简单点的子进程间通信:(铺垫demo)
import os
import time
from multiprocessing import Process, Queue
def pro_test1(q):
print("[子进程1]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
q.put("[子进程1]小明,今晚撸串不?")
# 设置一个简版的重试机制(三次重试)
for i in range(3):
if not q.empty():
print(q.get())
break
else:
time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6s
def pro_test2(q):
print("[子进程2]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
print(q.get())
time.sleep(4) # 模拟一下网络延迟
q.put("[子进程2]不去,我今天约了妹子")
def main():
queue = Queue()
p1 = Process(target=pro_test1, args=(queue, ))
p2 = Process(target=pro_test2, args=(queue, ))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main()
输出:( time python35.queue2.py
)
[子进程1]PPID=15220,PID=15221,GID=1000
[子进程2]PPID=15220,PID=15222,GID=1000
[子进程1]小明,今晚撸串不?
[子进程2]不去,我今天约了妹子
real 0m6.087s
user 0m0.053s
sys 0m0.035s
多进程基本上都是用 pool
,可用上面说的 Queue
方法怎么报错了?
import os
import time
from multiprocessing import Pool, Queue
def error_callback(msg):
print(msg)
def pro_test1(q):
print("[子进程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
q.put("[子进程1]小明,今晚撸串不?")
# 设置一个简版的重试机制(三次重试)
for i in range(3):
if not q.empty():
print(q.get())
break
else:
time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6s
def pro_test2(q):
print("[子进程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
print(q.get())
time.sleep(4) # 模拟一下网络延迟
q.put("[子进程2]不去,我今天约了妹子")
def main():
print("[父进程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
queue = Queue()
p = Pool()
p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
p.close()
p.join()
if __name__ == '__main__':
main()
输出:(队列对象不能在父进程与子进程间通信)
[父进程]PPID=4223,PID=32170,GID=1000
Queue objects should only be shared between processes through inheritance
Queue objects should only be shared between processes through inheritance
real 0m0.183s
user 0m0.083s
sys 0m0.012s
下面会详说,先看一下正确方式:(队列换了一下,其他都一样 Manager().Queue()
)
import os
import time
from multiprocessing import Pool, Manager
def error_callback(msg):
print(msg)
def pro_test1(q):
print("[子进程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
q.put("[子进程1]小明,今晚撸串不?")
# 设置一个简版的重试机制(三次重试)
for i in range(3):
if not q.empty():
print(q.get())
break
else:
time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6s
def pro_test2(q):
print("[子进程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
print(q.get())
time.sleep(4) # 模拟一下网络延迟
q.put("[子进程2]不去,我今天约了妹子")
def main():
print("[父进程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
os.getgid()))
queue = Manager().Queue()
p = Pool()
p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
p.close()
p.join()
if __name__ == '__main__':
main()
输出:
[父进程]PPID=4223,PID=31329,GID=1000
[子进程1]PPID=31329,PID=31335,GID=1000
[子进程2]PPID=31329,PID=31336,GID=1000
[子进程1]小明,今晚撸串不?
[子进程2]不去,我今天约了妹子
real 0m6.134s
user 0m0.133s
sys 0m0.035s
官方参考:https://docs.python.org/3/library/multiprocessing.html
这块官方文档很详细,贴下官方的2个案例:
通过 multiprocessing.set_start_method(xxx)
来设置启动的上下文类型
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn') # 不要过多使用
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
输出:( set_start_method
不要过多使用)
hello
real 0m0.407s
user 0m0.134s
sys 0m0.012s
如果你把设置启动上下文注释掉:(消耗的总时间少了很多)
real 0m0.072s
user 0m0.057s
sys 0m0.016s
也可以通过 multiprocessing.get_context(xxx)
获取指定类型的上下文
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
输出:( get_context
在Python源码里用的比较多,so=>也建议大家这么用)
hello
real 0m0.169s
user 0m0.146s
sys 0m0.024s
从结果来看,总耗时也少了很多
说下日记相关的事情:
先看下 multiprocessing
里面的日记记录:
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def log_to_stderr(self, level=None):
'''打开日志记录并添加一个打印到stderr的处理程序'''
from .util import log_to_stderr
return log_to_stderr(level)
更多 Loging
模块内容可以看官方文档:https://docs.python.org/3/library/logging.html
这个是内部代码,看看即可:
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/util.py
def log_to_stderr(level=None):
'''打开日志记录并添加一个打印到stderr的处理程序'''
# 全局变量默认是False
global _log_to_stderr
import logging
# 日记记录转换成文本
formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
# 一个处理程序类,它将已适当格式化的日志记录写入流
handler = logging.StreamHandler() # 此类不会关闭流,因为用到了sys.stdout|sys.stderr
# 设置格式:'[%(levelname)s/%(processName)s] %(message)s'
handler.setFormatter(formatter)
# 返回`multiprocessing`专用的记录器
logger = get_logger()
# 添加处理程序
logger.addHandler(handler)
if level:
# 设置日记级别
logger.setLevel(level)
# 现在log是输出到stderr的
_log_to_stderr = True
return _logger
Logging
之前也有提过,可以看看:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.装饰器传参的扩展(可传可不传)
来个案例:
import logging
from multiprocessing import Process, log_to_stderr
def test():
print("test")
def start_log():
# 把日记输出定向到sys.stderr中
logger = log_to_stderr()
# 设置日记记录级别
# 敏感程度:DEBUG、INFO、WARN、ERROR、CRITICAL
print(logging.WARN == logging.WARNING) # 这两个是一样的
level = logging.INFO
logger.setLevel(level) # 设置日记级别(一般都是WARN)
# 自定义输出
# def log(self, level, msg, *args, **kwargs):
logger.log(level, "我是通用格式") # 通用,下面的内部也是调用的这个
logger.info("info 测试")
logger.warning("warning 测试")
logger.error("error 测试")
def main():
start_log()
# 做的操作都会被记录下来
p = Process(target=test)
p.start()
p.join()
if __name__ == '__main__':
main()
输出:
True
[INFO/MainProcess] 我是通用格式
[INFO/MainProcess] info 测试
[WARNING/MainProcess] warning 测试
[ERROR/MainProcess] error 测试
[INFO/Process-1] child process calling self.run()
test
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
之前忘记说了~现在快结尾了,补充一下进程5态:(来个草图)