首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >并行处理百万个文件的解析和追加

并行处理百万个文件的解析和追加

原创
作者头像
华科云商小徐
发布于 2024-07-08 02:44:38
发布于 2024-07-08 02:44:38
25200
代码可运行
举报
文章被收录于专栏:小徐学爬虫小徐学爬虫
运行总次数:0
代码可运行

处理和解析大量文件,尤其是百万级别的文件,是一个复杂且资源密集的任务。为实现高效并行处理,可以使用Python中的多种并行和并发编程工具,比如multiprocessingconcurrent.futures模块以及分布式计算框架如DaskApache Spark。这里主要介绍如何使用concurrent.futures模块来并行处理和追加文件。

问题背景

在数据处理的过程中,经常会遇到需要对大量文件进行解析和追加的情况。如果使用单进程进行处理,则会花费大量的时间。为了提高处理效率,可以采用并行处理的方式,即同时使用多个进程来处理不同的文件。 在 Python 中,可以使用 multiprocessing 模块来实现并行处理。该模块提供了 ProcessQueuePool 等类,可以用于创建进程、共享数据和管理进程池。

解决方案

1、使用 multiprocessing.Pool

multiprocessing.Pool 是一个进程池,可以自动管理进程的数量和分配任务。使用 Pool 进行并行处理的步骤如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Pool
​
def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return
​
def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # using pool to parallelize the process
    pool = Pool(processes=4)
    pool.apply_async(worker, [task_queue])
    pool.close()
    pool.join()
    data_file.close()
    return

2、使用 multiprocessing.Queue

multiprocessing.Queue 是一个队列,可以用于在进程之间共享数据。使用 Queue 进行并行处理的步骤如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Process, Queue
​
def worker(task_queue, data_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_queue.put(data)
    return
​
def main():
    task_queue = Queue()
    data_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # spawn 4 worker processes
    for i in range(4):
        proc = Process(target=worker, args=[task_queue, data_queue])
        proc.start()
​
    # collect data from the data_queue and write to file
    while not data_queue.empty():
        data = data_queue.get()
        data_file.write(repr(data)+'\n')
​
    # wait for all worker processes to finish
    for proc in [proc for proc in [proc] if proc.is_alive()]:
        proc.join()
​
    data_file.close()
    return

代码例子

以下是一个使用 multiprocessing.Pool 实现并行处理的代码例子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Pool
​
def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return
​
def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # using pool to parallelize the process
    pool = Pool(processes=4)
    pool.apply_async(worker, [task_queue])
    pool.close()
    pool.join()
    data_file.close()
    returnif __name__ == '__main__':
    main()

以上代码中,worker() 函数是工作进程的函数,它从任务队列中获取文件,解析文件并将其追加到输出文件中。main() 函数是主进程的函数,它创建任务队列,将文件放入任务队列,然后创建进程池并启动工作进程。最后,主进程等待所有工作进程完成,然后关闭输出文件。

Dask可以自动管理并行任务,并提供更强大的分布式计算能力。通过合理的并行和分布式处理,可以显著提高处理百万级文件的效率。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Python进程池Pool初始化详解
在Python中,进程池(Pool)是multiprocessing模块提供的一种并发执行任务的机制。它通过创建一组工作进程,可以高效地处理大量任务,特别适合CPU密集型操作。
用户11638464
2025/08/06
1180
Masscan+Nmap实现快速端口存活和服务探测
想法源自:Nmap配合Masscan实现高效率扫描资产, 使用下来,感觉有点问题,恰好自己目前有这方面的需求,就顺手改下,也分享给大家。
FB客服
2021/01/22
2.9K0
Python进程池Pool初始化详解 - 从入门到精通
在Python中,进程池(Pool)是multiprocessing模块提供的一种并发执行任务的机制。它通过创建一组工作进程,可以高效地处理大量任务,特别适合CPU密集型操作。
用户11750067
2025/08/06
890
8.0 Python 使用进程与线程
python 进程与线程是并发编程的两种常见方式。进程是操作系统中的一个基本概念,表示程序在操作系统中的一次执行过程,拥有独立的地址空间、资源、优先级等属性。线程是进程中的一条执行路径,可以看做是轻量级的进程,与同一个进程中的其他线程共享相同的地址空间和资源。
王 瑞
2023/10/11
3410
Python多进程编程
阅读目录 1. Process 2. Lock 3. Semaphore 4. Event 5. Queue 6. Pipe 7. Pool 序. multiprocessing python 中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进 程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支
小小科
2018/05/02
1.2K0
Python多进程编程
[源码解析] 并行分布式任务队列 Celery 之 多进程架构和模型
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。因为 Celery 通过多进程来提高执行效率,所以本文将带领大家初步了解 Celery 之 多进程架构和模型。
罗西的思考
2021/04/16
1.5K0
[源码解析] 并行分布式任务队列 Celery 之 多进程架构和模型
Python学习—pyhton中的进程
进程: 进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据、进程控制块(pcb)三部分组成。 (1)我们编写的程序用来描述进程要完成哪些功能以及如何完成; (2)数据则是程序在执行过程中所需要使用的资源; (3)进程控制块用来记录进程的所有信息。系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
py3study
2020/01/06
6370
Python 标准类库-并发执行之multiprocessing-基于进程的并行
multiprocessing是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。
授客
2023/07/10
9810
Python进程学习
Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
py3study
2020/01/08
5710
【从零学习python 】85.Python进程池的并行计算技术应用
当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态生成多个进程,但如果是上百甚至上千个目标,手动创建进程的工作量巨大,此时就可以使用 multiprocessing 模块提供的 Pool 方法。
全栈若城
2024/02/29
1750
【Python爬虫实战】多进程结合 BeautifulSoup 与 Scrapy 构建爬虫项目
在大数据时代,爬虫技术是获取和处理网络数据的利器。面对需要处理大量网页的爬取任务,如何提升效率成为了一个重要的问题。Python 的多进程技术结合 BeautifulSoup 和 Scrapy,可以在保证解析能力的同时,大大提高并发抓取的效率。这篇文章将详细介绍如何利用多进程模块进行爬虫、结合 JoinableQueue 管理任务,以及在更复杂的场景中使用 BeautifulSoup 和 Scrapy,打造功能强大的爬虫项目。
易辰君
2024/11/07
3900
python进程回顾
pro = multiprocessing.Process(target=入口, args=(), kwargs={})
小闫同学啊
2019/07/18
6590
Python:线程、进程与协程(6)——
    上篇博文介绍了multiprocessing模块的内存共享(点击此处可以参看),下面讲进程池。有些情况下,所要完成的工作可以上篇博文介绍了multiprocessing模块的内存共享,下面讲进程池。有些情况下,所要完成的工作可以分解并独立地分布到多个工作进程,对于这种简单的情况,可以用Pool类来管理固定数目的工作进程。作业的返回值会收集并作为一个列表返回。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
py3study
2020/01/06
1.6K0
第十五章 Python多进程与多线程
multiprocessing是多进程模块,多进程提供了任务并发性,能充分利用多核处理器。避免了GIL(全局解释锁)对资源的影响。
py3study
2020/01/09
8000
python网络-多进程(21)
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。---来自百度百科
Se7eN_HOU
2019/09/11
5430
python网络-多进程(21)
多线程、协程和多进程并发编程(续写)
python中的多线程其实并不是真正的多线程,如果想要充分地使⽤多核CPU的资源,在python中
软件架构师Michael
2023/12/23
3951
Python并行——速度++++++++
这份代码是大家实际中经常使用的,通过循环来实现从多个wrfout文件中提取变量T2并单独保存输出为nc文件。
自学气象人
2023/06/21
3100
Python并行——速度++++++++
并行执行(二)、multiprocessing
multiprocessing包是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()、join([timeout])、run()、start()、terminate()等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。这个模块表示像线程一样管理进程,这个是multiprocessing的核心,它与threading很相似,对多核CPU的利用率会比threading好的多。
狼啸风云
2022/07/27
6120
并行执行(二)、multiprocessing
Python进程间通信
可以使用 multiprocessing 模块的 Queue 实现多进程之间的数据传递,Queue本身是一个消息队列程序,首先用一个小实例来演示一下Queue的工作原理:
ZackSock
2021/04/13
6860
[源码解析] PyTorch 分布式(2) --- 数据加载之DataLoader
为了更好的介绍参数服务器Paracel的数据加载,我们临时插入两篇PyTorch的数据加载,主要是从分布式的角度进行切入。本文只算是开胃甜点,后续会有专门系列分析PyTorch分布式。
罗西的思考
2021/08/23
1.9K0
[源码解析] PyTorch 分布式(2) --- 数据加载之DataLoader
相关推荐
Python进程池Pool初始化详解
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档