Loading [MathJax]/jax/input/TeX/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >并行处理百万个文件的解析和追加

并行处理百万个文件的解析和追加

原创
作者头像
华科云商小徐
发布于 2024-07-08 02:44:38
发布于 2024-07-08 02:44:38
21100
代码可运行
举报
文章被收录于专栏:小徐学爬虫小徐学爬虫
运行总次数:0
代码可运行

处理和解析大量文件,尤其是百万级别的文件,是一个复杂且资源密集的任务。为实现高效并行处理,可以使用Python中的多种并行和并发编程工具,比如multiprocessingconcurrent.futures模块以及分布式计算框架如DaskApache Spark。这里主要介绍如何使用concurrent.futures模块来并行处理和追加文件。

问题背景

在数据处理的过程中,经常会遇到需要对大量文件进行解析和追加的情况。如果使用单进程进行处理,则会花费大量的时间。为了提高处理效率,可以采用并行处理的方式,即同时使用多个进程来处理不同的文件。 在 Python 中,可以使用 multiprocessing 模块来实现并行处理。该模块提供了 ProcessQueuePool 等类,可以用于创建进程、共享数据和管理进程池。

解决方案

1、使用 multiprocessing.Pool

multiprocessing.Pool 是一个进程池,可以自动管理进程的数量和分配任务。使用 Pool 进行并行处理的步骤如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Pool
​
def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return
​
def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # using pool to parallelize the process
    pool = Pool(processes=4)
    pool.apply_async(worker, [task_queue])
    pool.close()
    pool.join()
    data_file.close()
    return

2、使用 multiprocessing.Queue

multiprocessing.Queue 是一个队列,可以用于在进程之间共享数据。使用 Queue 进行并行处理的步骤如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Process, Queue
​
def worker(task_queue, data_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_queue.put(data)
    return
​
def main():
    task_queue = Queue()
    data_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # spawn 4 worker processes
    for i in range(4):
        proc = Process(target=worker, args=[task_queue, data_queue])
        proc.start()
​
    # collect data from the data_queue and write to file
    while not data_queue.empty():
        data = data_queue.get()
        data_file.write(repr(data)+'\n')
​
    # wait for all worker processes to finish
    for proc in [proc for proc in [proc] if proc.is_alive()]:
        proc.join()
​
    data_file.close()
    return

代码例子

以下是一个使用 multiprocessing.Pool 实现并行处理的代码例子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Pool
​
def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return
​
def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # using pool to parallelize the process
    pool = Pool(processes=4)
    pool.apply_async(worker, [task_queue])
    pool.close()
    pool.join()
    data_file.close()
    returnif __name__ == '__main__':
    main()

以上代码中,worker() 函数是工作进程的函数,它从任务队列中获取文件,解析文件并将其追加到输出文件中。main() 函数是主进程的函数,它创建任务队列,将文件放入任务队列,然后创建进程池并启动工作进程。最后,主进程等待所有工作进程完成,然后关闭输出文件。

Dask可以自动管理并行任务,并提供更强大的分布式计算能力。通过合理的并行和分布式处理,可以显著提高处理百万级文件的效率。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Python多进程编程
阅读目录 1. Process 2. Lock 3. Semaphore 4. Event 5. Queue 6. Pipe 7. Pool 序. multiprocessing python 中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进 程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支
小小科
2018/05/02
1.2K0
Python多进程编程
Python 标准类库-并发执行之multiprocessing-基于进程的并行
multiprocessing是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。
授客
2023/07/10
9340
Python进程学习
Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
py3study
2020/01/08
5580
[源码解析] 并行分布式任务队列 Celery 之 多进程架构和模型
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。因为 Celery 通过多进程来提高执行效率,所以本文将带领大家初步了解 Celery 之 多进程架构和模型。
罗西的思考
2021/04/16
1.4K0
[源码解析] 并行分布式任务队列 Celery 之 多进程架构和模型
Python:线程、进程与协程(6)——
    上篇博文介绍了multiprocessing模块的内存共享(点击此处可以参看),下面讲进程池。有些情况下,所要完成的工作可以上篇博文介绍了multiprocessing模块的内存共享,下面讲进程池。有些情况下,所要完成的工作可以分解并独立地分布到多个工作进程,对于这种简单的情况,可以用Pool类来管理固定数目的工作进程。作业的返回值会收集并作为一个列表返回。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
py3study
2020/01/06
1.6K0
【Python基础编程】全面解析进程、进程通信与生产者-消费者模式
上篇文章主要了解python的线程,如何创建线程,如何通过线程实现生产者-消费者模式以及线程池的使用等等,接下来这篇文章讲解python的进程,有问题欢迎一起探讨。
易辰君
2024/11/07
1260
Python 线程&进程与协程
线程是进程的执行单元,对于大多数程序来说,可能只有一个主线程,但是为了能够提高效率,有些程序会采用多线程,在系统中所有的线程看起来都是同时执行的,例如,现在的多线程网络下载程序中,就使用了这种线程并发的特性,程序将欲下载的文件分成多个部分,然后同时进行下载,从而加快速度.虽然线程并不是一个容易掌握和使用的概念,但是如果运用得当,还是可以获得很不错的性能的.
王 瑞
2022/12/28
8190
【从零学习python 】85.Python进程池的并行计算技术应用
当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态生成多个进程,但如果是上百甚至上千个目标,手动创建进程的工作量巨大,此时就可以使用 multiprocessing 模块提供的 Pool 方法。
全栈若城
2024/02/29
1590
Python进程间通信
可以使用 multiprocessing 模块的 Queue 实现多进程之间的数据传递,Queue本身是一个消息队列程序,首先用一个小实例来演示一下Queue的工作原理:
ZackSock
2021/04/13
6670
Python学习—pyhton中的进程
进程: 进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据、进程控制块(pcb)三部分组成。 (1)我们编写的程序用来描述进程要完成哪些功能以及如何完成; (2)数据则是程序在执行过程中所需要使用的资源; (3)进程控制块用来记录进程的所有信息。系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
py3study
2020/01/06
6130
python进程回顾
pro = multiprocessing.Process(target=入口, args=(), kwargs={})
小闫同学啊
2019/07/18
6490
【Python爬虫实战】多进程结合 BeautifulSoup 与 Scrapy 构建爬虫项目
在大数据时代,爬虫技术是获取和处理网络数据的利器。面对需要处理大量网页的爬取任务,如何提升效率成为了一个重要的问题。Python 的多进程技术结合 BeautifulSoup 和 Scrapy,可以在保证解析能力的同时,大大提高并发抓取的效率。这篇文章将详细介绍如何利用多进程模块进行爬虫、结合 JoinableQueue 管理任务,以及在更复杂的场景中使用 BeautifulSoup 和 Scrapy,打造功能强大的爬虫项目。
易辰君
2024/11/07
3010
多线程、协程和多进程并发编程(续写)
python中的多线程其实并不是真正的多线程,如果想要充分地使⽤多核CPU的资源,在python中
软件架构师Michael
2023/12/23
3801
Python多进程编程:基础、应用与优化策略
在了解multiprocessing模块之前,我们先来了解一下进程的基本概念。进程是计算机中运行的程序的实例,它拥有独立的内存空间和系统资源。相比于多线程,多进程更容易实现并行处理,因为每个进程都有自己的解释器和全局解释器锁(GIL)。
一键难忘
2024/03/15
4270
浅谈 multiprocessing
一前言 使用python进行并发处理多台机器/多个实例的时候,我们可以使用threading ,但是由于著名的GIL存在,实际上threading 并未提供真正有效的并发处理,要充分利用到多核CPU,我们需要使用多进程。Python提供了非常好用的多进程包--multiprocessing。multiprocessing 可以利用multiprocessing.Process对象来创建一个进程,该Process对象与Threading对象的用法基本相同,具有相同的方法(官方原话:"The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue类用于进程之间的通信。话不多说 show me the code! 二使用 2.1 初识异同
用户1278550
2018/08/08
4560
Python并行——速度++++++++
这份代码是大家实际中经常使用的,通过循环来实现从多个wrfout文件中提取变量T2并单独保存输出为nc文件。
自学气象人
2023/06/21
2930
Python并行——速度++++++++
python网络-多进程(21)
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。---来自百度百科
Se7eN_HOU
2019/09/11
5350
python网络-多进程(21)
Python自动化开发学习10
上次讲了由于GIL锁的存在,Python的多线程是假的,用的还是CPU的单核。Python的多线程只是利用了CPU的上下文切换,上下分切换也是占用CPU的。那么什么时候用多行程? Python的多线程,适合IO密集型的任务,不适合CPU密集型的任务。 IO操作不占用CPU,比如socket这种网络编程的情景。 计算占用CPU,所以大量计算的情景下多线程反而更慢,额外消耗了CPU切换上下文的计算。
py3study
2020/01/08
1.1K0
并行执行(二)、multiprocessing
multiprocessing包是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()、join([timeout])、run()、start()、terminate()等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。这个模块表示像线程一样管理进程,这个是multiprocessing的核心,它与threading很相似,对多核CPU的利用率会比threading好的多。
狼啸风云
2022/07/27
5880
并行执行(二)、multiprocessing
Python 的并发编程
这篇文章将讲解 Python 并发编程的基本操作。并发和并行是对孪生兄弟,概念经常混淆。并发是指能够多任务处理,并行则是是能够同时多任务处理。Erlang 之父 Joe Armstrong 有一张非常有趣的图说明这两个概念:
CS实验室
2021/03/22
5550
Python 的并发编程
相关推荐
Python多进程编程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档