应该尽量避免进程间状态共享,但需求在那,所以还是得研究,官方推荐了两种方式:
Value
or Array
)之前说过 Queue
:在 Process
之间使用没问题,用到 Pool
,就使用 Manager().xxx
, Value
和 Array
,就不太一样了:
看看源码:(Manager里面的Array和Process共享的Array不是一个概念,而且也没有同步机制)
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/managers.pyclass Value(object): def __init__(self, typecode, value, lock=True): self._typecode = typecode self._value = value def get(self): return self._value def set(self, value): self._value = value def __repr__(self): return '%s(%r, %r)' % (type(self).__name__, self._typecode, self._value) value = property(get, set) # 给value设置get和set方法(和value的属性装饰器一样效果)def Array(typecode, sequence, lock=True): return array.array(typecode, sequence)
以 Process
为例看看怎么用:
from multiprocessing import Process, Value, Arraydef proc_test1(value, array): print("子进程1", value.value) array[0] = 10 print("子进程1", array[:])def proc_test2(value, array): print("子进程2", value.value) array[1] = 10 print("子进程2", array[:])def main(): try: value = Value("d", 3.14) # d 类型,相当于C里面的double array = Array("i", range(10)) # i 类型,相当于C里面的int print(type(value)) print(type(array)) p1 = Process(target=proc_test1, args=(value, array)) p2 = Process(target=proc_test2, args=(value, array)) p1.start() p2.start() p1.join() p2.join() print("父进程", value.value) # 获取值 print("父进程", array[:]) # 获取值 except Exception as ex: print(ex) else: print("No Except")if __name__ == '__main__': main()
输出:( Value
和 Array
是 进程|线程
安全的)
<class 'multiprocessing.sharedctypes.Synchronized'><class 'multiprocessing.sharedctypes.SynchronizedArray'>子进程1 3.14子进程1 [10, 1, 2, 3, 4, 5, 6, 7, 8, 9]子进程2 3.14子进程2 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]父进程 3.14父进程 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]No Except
类型方面的对应关系:
typecode_to_type = { 'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong, 'f': ctypes.c_float, 'd': ctypes.c_double}
这两个类型其实是 ctypes
类型,更多的类型可以去 multiprocessing.sharedctypes
查看,来张图:
回头解决 GIL
的时候会用到 C
系列或者 Go
系列的共享库(讲线程的时候会说)
关于进程安全的补充说明:对于原子性操作就不用说,铁定安全,但注意一下 i+=1
并不是原子性操作:
from multiprocessing import Process, Valuedef proc_test1(value): for i in range(1000): value.value += 1def main(): value = Value("i", 0) p_list = [Process(target=proc_test1, args=(value, )) for i in range(5)] # 批量启动 for i in p_list: i.start() # 批量资源回收 for i in p_list: i.join() print(value.value)if __name__ == '__main__': main()
输出:(理论上应该是:5×1000=5000)
2153
稍微改一下才行:(进程安全:只是提供了安全的方法,并不是什么都不用你操心了)
# 通用方法def proc_test1(value): for i in range(1000): if value.acquire(): value.value += 1 value.release()# 官方案例:(Lock可以使用with托管)def proc_test1(value): for i in range(1000): with value.get_lock(): value.value += 1# 更多可以查看:`sharedctypes.SynchronizedBase` 源码
输出:(关于锁这块,后面讲线程的时候会详说,看看就好【语法的确比C#麻烦点】)
5000
看看源码:(之前探讨如何优雅的杀死子进程,其中就有一种方法使用了 Value
)
def Value(typecode_or_type, *args, lock=True, ctx=None): '''返回Value的同步包装器''' obj = RawValue(typecode_or_type, *args) if lock is False: return obj # 默认支持Lock if lock in (True, None): ctx = ctx or get_context() # 获取上下文 lock = ctx.RLock() # 获取递归锁 if not hasattr(lock, 'acquire'): raise AttributeError("%r has no method 'acquire'" % lock) # 一系列处理 return synchronized(obj, lock, ctx=ctx)def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None): '''返回RawArray的同步包装器''' obj = RawArray(typecode_or_type, size_or_initializer) if lock is False: return obj # 默认是支持Lock的 if lock in (True, None): ctx = ctx or get_context() # 获取上下文 lock = ctx.RLock() # 递归锁属性 # 查看是否有acquire属性 if not hasattr(lock, 'acquire'): raise AttributeError("%r has no method 'acquire'" % lock) return synchronized(obj, lock, ctx=ctx)
扩展部分可以查看这篇文章:http://blog.51cto.com/11026142/1874807
Manager
)官方文档:https://docs.python.org/3/library/multiprocessing.html#managers
有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象
通过返回的经理 Manager()
将支持类型 list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue
举个简单例子(后面还会再说):(本质其实就是 多个进程通过代理,共同操作服务端内容
)
from multiprocessing import Pool, Managerdef test1(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse()def test2(d, l): print(d) print(l)def main(): with Manager() as manager: dict_test = manager.dict() list_test = manager.list(range(10)) pool = Pool() pool.apply_async(test1, args=(dict_test, list_test)) pool.apply_async(test2, args=(dict_test, list_test)) pool.close() pool.join()if __name__ == '__main__': main()
输出:
{1: '1', '2': 2, 0.25: None}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存慢(毕竟有了 “中介”
)
同步问题依然需要注意一下,举个例子体会一下:
from multiprocessing import Manager, Process, Lockdef test(dict1, lock): for i in range(100): with lock: # 你可以把这句话注释掉,然后就知道为什么加了 dict1["year"] += 1def main(): with Manager() as m: lock = Lock() dict1 = m.dict({"year": 2000}) p_list = [Process(target=test, args=(dict1, lock)) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(dict1)if __name__ == '__main__': main()
扩展补充:
multiprocessing.Lock
是一个进程安全对象,因此您可以将其直接传递给子进程并在所有进程中安全地使用它。Manager
Unix/Linux
系统下,用 fork
调用还行,在 Windows
下创建进程开销巨大。Manager这块官方文档很详细,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers
WinServer
的可以参考这篇 or 这篇埋坑记(Manager一般都是部署在Linux的,Win的客户端不影响)
还记得之前的:无法将multiprocessing.Queue对象传递给Pool方法吗?其实一般都是这两种方式解决的:
multiprocessing.Queue()
这将使 Queue
实例在所有子进程中全局共享再看一下Pool的 __init__
方法:
# processes:进程数# initializer,initargs 初始化进行的操作# maxtaskperchild:每个进程执行task的最大数目# contex:上下文对象def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None):
第一种方法不够轻量级,在讲案例前,稍微说下第二种方法:(也算把上面留下的悬念解了)
import osimport timefrom multiprocessing import Pool, Queuedef error_callback(msg): print(msg)def pro_test1(): print("[子进程1]PPID=%d,PID=%d" % (os.getppid(), os.getpid())) q.put("[子进程1]小明,今晚撸串不?") # 设置一个简版的重试机制(三次重试) for i in range(3): if not q.empty(): print(q.get()) break else: time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6sdef pro_test2(): print("[子进程2]PPID=%d,PID=%d" % (os.getppid(), os.getpid())) print(q.get()) time.sleep(4) # 模拟一下网络延迟 q.put("[子进程2]不去,我今天约了妹子")def init(queue): global q q = queuedef main(): print("[父进程]PPID=%d,PID=%d" % (os.getppid(), os.getpid())) queue = Queue() p = Pool(initializer=init, initargs=(queue, )) p.apply_async(pro_test1, error_callback=error_callback) p.apply_async(pro_test2, error_callback=error_callback) p.close() p.join()if __name__ == '__main__': main()
输出:(就是在初始化Pool的时候,传了初始化执行的方法并传了参数: alizer=init,initargs=(queue,))
)
[父进程]PPID=13157,PID=24864[子进程1]PPID=24864,PID=24865[子进程2]PPID=24864,PID=24866[子进程1]小明,今晚撸串不?[子进程2]不去,我今天约了妹子real 0m6.105suser 0m0.071ssys 0m0.042s
Win下亦通用(win下没有 os.getgid
)
有了 1.6
的基础,咱们来个例子练练:
BaseManager
的缩略图:
服务器端代码:
from multiprocessing import Queuefrom multiprocessing.managers import BaseManagerdef main(): # 用来身份验证的 key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92" get_zhang_queue = Queue() # 小张消息队列 get_ming_queue = Queue() # 小明消息队列 # 把Queue注册到网络上, callable参数关联了Queue对象 BaseManager.register("get_zhang_queue", callable=lambda: get_zhang_queue) BaseManager.register("get_ming_queue", callable=lambda: get_ming_queue) # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥 manager = BaseManager(address=("192.168.36.235", 5438), authkey=key) # 运行serve manager.get_server().serve_forever()if __name__ == '__main__': main()
客户端代码1:
from multiprocessing.managers import BaseManagerdef main(): """客户端1""" key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92" # 注册对应方法的名字(从网络上获取Queue) BaseManager.register("get_ming_queue") BaseManager.register("get_zhang_queue") # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥 m = BaseManager(address=("192.168.36.235", 5438), authkey=key) # 连接到服务器 m.connect() q1 = m.get_zhang_queue() # 在自己队列里面留言 q1.put("[小张]小明,老大明天是不是去外地办事啊?") q2 = m.get_ming_queue() # 获取小明说的话 print(q2.get())if __name__ == '__main__': main()
客户端代码2:
from multiprocessing.managers import BaseManagerdef main(): """客户端2""" key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92" # 注册对应方法的名字(从网络上获取Queue) BaseManager.register("get_ming_queue") BaseManager.register("get_zhang_queue") # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥 m = BaseManager(address=("192.168.36.235", 5438), authkey=key) # 连接到服务器 m.connect() q1 = m.get_zhang_queue() # 获取小张说的话 print(q1.get()) q2 = m.get_ming_queue() # 在自己队列里面留言 q2.put("[小明]这几天咱们终于可以不加班了(>_<)")if __name__ == '__main__': main()
输出图示:
服务器运行在Linux的测试:
其实还有一部分内容没说,明天得出去办点事,先到这吧,后面找机会继续带一下
参考文章:
进程共享的探讨:python-sharing-a-lock-between-processes
多进程锁的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error
JoinableQueue扩展:https://www.cnblogs.com/smallmars/p/7093603.html
Python多进程编程:https://www.cnblogs.com/kaituorensheng/p/4445418.html
有深度但需要辩证看的两篇文章:
跨进程对象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue
关于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。