首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >通过带有迭代器的复杂类函数多处理大型数据集

通过带有迭代器的复杂类函数多处理大型数据集
EN

Stack Overflow用户
提问于 2019-10-23 09:52:17
回答 1查看 37关注 0票数 0

我已经搜索了大约10个线程的多进程查找,但似乎没有一个完全适合我的用例。下面是我想要并行化的一般概念。

代码语言:javascript
复制
class foo():
    def boo():
        filename = 'path to the data file'
        with reader(filename) as fileReader:
            for id, feature in fileReader:
                 boo2(id, feature)
    def boo2(id, feature):
        *process feature then save the output to a folder*

在这里,我想将对boo2()的调用并行化,其中fileReader是一个迭代器(来自pykaldi的sequentialMatrixReader ),其中包含数万行idfeature,其中id是一个字符串,每个feature是一个矩阵(数百行x数十列)。boo2将计算一个较小的矩阵,并将结果保存到基于id的文件夹中。每个对boo2的调用都是相互独立的,所以我想将其并行化。

据我所知,我不能使用multiprocessing.Pool,因为boo2是一个类函数,而且由于它的复杂性,我不能把它从类中提取出来。

我不知道如何使用multiprocessing.Process,因为内核的数量比迭代器的行数少得多,而且一旦我完成了start()join()进程,我也不确定如何对boo2的新调用进行排队(我已经尝试将fileReader拆分为n批,并为每批设置一个进程,但是我更喜欢在一行而不是多批中对调用进行排队)

我还研究了pathos模块,因为它在类函数方面没有问题。然而,从示例用例中,最符合我的需求的是:

代码语言:javascript
复制
pathos.threading.ThreadPoolpool.imap(boo2, [feature for feature in fileReader])

但由于fileReader太大,我无法在内存中容纳[feature for feature in fileReader]

任何和所有的帮助都是感激的。谢谢。

EN

回答 1

Stack Overflow用户

发布于 2019-10-23 10:07:16

由于类成员的原因,您将无法使用multiprocessing,因此需要一个单独的函数--这一点您是对的。

关于线程的使用,我建议你不要使用简单的理解[feature for feature in fileReader],而是根据可用的CPU线程分批读取fileReader的特性,然后运行线程,等待完成,然后读取下一批,等等。

类似于:

代码语言:javascript
复制
def make_next_batch( fileReader ) :
    batch = []
    for feature in fileReader :
        if len(batch) == BATCH_SIZE :
            yield batch
            batch = []
        batch.append( feature )
    if len(batch) :
        yield batch

然后,您必须同时只在内存中保留BATCH_SIZE功能。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58514373

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档