前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python进程间通信和进程池

Python进程间通信和进程池

作者头像
Python碎片公众号
发布2021-02-26 14:51:25
8310
发布2021-02-26 14:51:25
举报
文章被收录于专栏:Python碎片公众号的专栏

Python实现多进程是通过multiprocessing模块来实现的。

参考:Python使用multiprocessing实现多进程

在使用多进程时,有时候在多个进程之间需要传递数据。

一、使用Queue实现进程间通信

可以使用multiprocessing模块的Queue实现多个进程之间的数据传递。Queue本身是一个消息列队程序。

代码语言:javascript
复制
from multiprocessing import Process, Queue
import time
 
 
def put_card(queue):
    """往队列中添加数据"""
    for card in ['A', 'K', 'Q', 'J', '10']:
        print('Put {} to queue...'.format(card))
        queue.put(card)
        time.sleep(1)
 
 
def get_card(queue):
    """从队列中取出数据"""
    while True:
        if not queue.empty():
            card = queue.get(True)
            print('Get {} from queue.'.format(card))
            time.sleep(1)
        else:
            break
 
 
if __name__ == "__main__":
    q = Queue()
    pp = Process(target=put_card, args=(q,))
    pg = Process(target=get_card, args=(q,))
    pp.start()
 
    pg.start()
    pg.join()
    print(pg.is_alive())

运行结果:

代码语言:javascript
复制
Put A to queue...
Get A from queue.
Put K to queue...
Get K from queue.
Put Q to queue...
Get Q from queue.
Put J to queue...
Get J from queue.
Put 10 to queue...
Get 10 from queue.
False

上面的代码中,在父进程中创建两个子进程,一个往Queue里添加数据,一个从Queue里读取数据。

创建了一个Queue对象q,创建了两个Process对象pp和pg,两个子进程分别执行put_card函数和get_card函数,pp进程往q队列中添加数据,pg进程从q队列中获取数据。这样,就实现了两个进程之间的数据传递,即队列间的通信。

注意,pp进程需要在pg进程之前执行,需要先添加后获取,顺序不能错,否则不能实现数据的传递。

二、Queue语法结构和常用方法

Queue([maxsize])

maxsize:指定队列的长度,即队列中消息的最大数量

初始化Queue对象时,若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);

Queue的常用方法:

1.qsize():返回当前队列包含的消息数量,即当前队列中有多少条数据

2.empty():如果队列为空,返回True,反之False

3.full():如果队列满了,返回True,反之False

4.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True

如果block使用默认值,且没有设置timeout(单位秒),列队为空,此时程序将被阻塞(停在读取状态),直到从列队读到消息为止。如果设置了timeout,列队为空,则会等待timeout秒,若还没读取到任何消息,抛出"Queue.Empty"异常。

如果block值为False,消息如果为空,则会立刻抛出 "Queue.Empty"异常。

5.get_nowait():相当于Queue.get(False)

6.Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True

如果block使用默认值,且没有设置timeout(单位秒),列队已满,此时程序将被阻塞(停在写入状态),直到列队腾出空间为止,将数据写入。如果设置了timeout,列队已满,则会等待timeout秒,若还没空间,抛出"Queue.Full"异常。

如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常。

7.Queue.put_nowait(item):相当于Queue.put(item, False)

三、使用Pool实现进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process创建进程,但如果是上百甚至上千个进程,一个一个的创建工作量巨大,且容易出错,此时就可以用到multiprocessing模块提供的Pool方法。

代码语言:javascript
复制
from multiprocessing import Pool
import os
import time
 
 
def task(num):
    print("Sub process {} start, process id is {}".format(num, os.getpid()))
    time.sleep(1)
    print("Sub process {} end".format(num))
 
 
if __name__ == '__main__':
 
    po = Pool(3)
    for i in range(10):
        po.apply_async(task, (i + 1,))
 
    po.close()
    po.join()

运行结果:

代码语言:javascript
复制
Sub process 1 start, process id is 14348
Sub process 2 start, process id is 11676
Sub process 3 start, process id is 8096
Sub process 1 end
Sub process 4 start, process id is 14348
Sub process 2 end
Sub process 5 start, process id is 11676
Sub process 3 end
Sub process 6 start, process id is 8096
Sub process 4 end
Sub process 7 start, process id is 14348
Sub process 5 end
Sub process 8 start, process id is 11676
Sub process 6 end
Sub process 9 start, process id is 8096
Sub process 7 end
Sub process 10 start, process id is 14348
Sub process 8 end
Sub process 9 end
Sub process 10 end

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

在上面的代码中,我们指定进程池的最大进程数量为3,我们需要创建的进程数量是10个,当进程数不到三个时,直接创建。因为我们设置的是每个进程运行时间一样,所以第一个进程结束后才会去创建第四个,第二个结束后才会去创建第五个,并且,进程4的id与进程1的相同,进程5的id与进程2的相同,以此类推。

apply_async中的第一个参数是进程要执行的函数的引用,这是一个必传的位置参数,第二个参数是执行函数所需要的参数,是一个元组。

进程池中创建的进程,一旦创建就会自动执行,不需要使用start()方法来手动开始。

进程池使用完后需要使用close()方法关闭进程池。

主进程需要使用join()阻塞,保证所有子进程都执行完。

四、Pool常用方法

Pool([maxsize])

maxsize:指定进程池的大小,即进程池中进程的最大数量

如果不设置数字,会自动根据系统的CPU核数来创建进程数量。这个数量要设置适合,如果太大,会占用太多系统资源,且创建进程池的时间会很慢。如果是负数,则代码报错。

Pool常用方法:

1.apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程)

func:子进程需要执行的函数,传入一个函数的引用,这里是位置参数

args:传递给func的参数,以元组的方式传递

kwds:传递给func的关键字参数列表,以字典的方式传递

2.close():关闭Pool,使其不再接受新的任务

3.terminate():不管任务是否完成,立即终止

4.join():主进程阻塞,等所有待子进程的退出, 必须在close或terminate之后使用

五、进程池中的Queue

如果要使用Pool创建进程,需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则程序会直接终止。

代码语言:javascript
复制
from multiprocessing import Pool, Manager
import time
 
 
def put_card(queue):
    """往队列中添加数据"""
    for card in ['A', 'K', 'Q', 'J', '10']:
        print('Put {} to queue...'.format(card))
        queue.put(card)
        time.sleep(1)
 
 
def get_card(queue):
    """从队列中取出数据"""
    while True:
        if not queue.empty():
            card = queue.get(True)
            print('Get {} from queue.'.format(card))
            time.sleep(1)
        else:
            break
 
 
if __name__ == "__main__":
    # q = Queue() 程序会直接终止
    q = Manager().Queue()
    p = Pool()
    p.apply_async(put_card, args=(q,))
    p.apply_async(get_card, args=(q,))
 
    p.close()
    p.join()

运行结果:

代码语言:javascript
复制
Put A to queue...
Get A from queue.
Put K to queue...
Get K from queue.
Put Q to queue...
Get Q from queue.
Put J to queue...
Get J from queue.
Put 10 to queue...
Get 10 from queue.

上面的代码中,进程是通过Pool创建的,不是通过Process创建的,这时候如果需要进行进程间的通信,则要使用 Manager().Queue()。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python 碎片 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档