我已经搜索了大约10个线程的多进程查找,但似乎没有一个完全适合我的用例。下面是我想要并行化的一般概念。
class foo():
def boo():
filename = 'path to the data file'
with reader(filename) as fileReader:
for id, feature in fileReader:
boo2(id, feature)
def boo2(id, feature):
*process feature then save the output to a folder*在这里,我想将对boo2()的调用并行化,其中fileReader是一个迭代器(来自pykaldi的sequentialMatrixReader ),其中包含数万行id和feature,其中id是一个字符串,每个feature是一个矩阵(数百行x数十列)。boo2将计算一个较小的矩阵,并将结果保存到基于id的文件夹中。每个对boo2的调用都是相互独立的,所以我想将其并行化。
据我所知,我不能使用multiprocessing.Pool,因为boo2是一个类函数,而且由于它的复杂性,我不能把它从类中提取出来。
我不知道如何使用multiprocessing.Process,因为内核的数量比迭代器的行数少得多,而且一旦我完成了start()和join()进程,我也不确定如何对boo2的新调用进行排队(我已经尝试将fileReader拆分为n批,并为每批设置一个进程,但是我更喜欢在一行而不是多批中对调用进行排队)
我还研究了pathos模块,因为它在类函数方面没有问题。然而,从示例用例中,最符合我的需求的是:
pathos.threading.ThreadPoolpool.imap(boo2, [feature for feature in fileReader])但由于fileReader太大,我无法在内存中容纳[feature for feature in fileReader]。
任何和所有的帮助都是感激的。谢谢。
发布于 2019-10-23 10:07:16
由于类成员的原因,您将无法使用multiprocessing,因此需要一个单独的函数--这一点您是对的。
关于线程的使用,我建议你不要使用简单的理解[feature for feature in fileReader],而是根据可用的CPU线程分批读取fileReader的特性,然后运行线程,等待完成,然后读取下一批,等等。
类似于:
def make_next_batch( fileReader ) :
batch = []
for feature in fileReader :
if len(batch) == BATCH_SIZE :
yield batch
batch = []
batch.append( feature )
if len(batch) :
yield batch然后,您必须同时只在内存中保留BATCH_SIZE功能。
https://stackoverflow.com/questions/58514373
复制相似问题