首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >并行处理百万个文件的解析和追加

并行处理百万个文件的解析和追加

原创
作者头像
华科云商小徐
发布于 2024-07-08 02:44:38
发布于 2024-07-08 02:44:38
21100
代码可运行
举报
文章被收录于专栏:小徐学爬虫小徐学爬虫
运行总次数:0
代码可运行

处理和解析大量文件,尤其是百万级别的文件,是一个复杂且资源密集的任务。为实现高效并行处理,可以使用Python中的多种并行和并发编程工具,比如multiprocessingconcurrent.futures模块以及分布式计算框架如DaskApache Spark。这里主要介绍如何使用concurrent.futures模块来并行处理和追加文件。

问题背景

在数据处理的过程中,经常会遇到需要对大量文件进行解析和追加的情况。如果使用单进程进行处理,则会花费大量的时间。为了提高处理效率,可以采用并行处理的方式,即同时使用多个进程来处理不同的文件。 在 Python 中,可以使用 multiprocessing 模块来实现并行处理。该模块提供了 ProcessQueuePool 等类,可以用于创建进程、共享数据和管理进程池。

解决方案

1、使用 multiprocessing.Pool

multiprocessing.Pool 是一个进程池,可以自动管理进程的数量和分配任务。使用 Pool 进行并行处理的步骤如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Pool
​
def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return
​
def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # using pool to parallelize the process
    pool = Pool(processes=4)
    pool.apply_async(worker, [task_queue])
    pool.close()
    pool.join()
    data_file.close()
    return

2、使用 multiprocessing.Queue

multiprocessing.Queue 是一个队列,可以用于在进程之间共享数据。使用 Queue 进行并行处理的步骤如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Process, Queue
​
def worker(task_queue, data_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_queue.put(data)
    return
​
def main():
    task_queue = Queue()
    data_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # spawn 4 worker processes
    for i in range(4):
        proc = Process(target=worker, args=[task_queue, data_queue])
        proc.start()
​
    # collect data from the data_queue and write to file
    while not data_queue.empty():
        data = data_queue.get()
        data_file.write(repr(data)+'\n')
​
    # wait for all worker processes to finish
    for proc in [proc for proc in [proc] if proc.is_alive()]:
        proc.join()
​
    data_file.close()
    return

代码例子

以下是一个使用 multiprocessing.Pool 实现并行处理的代码例子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from multiprocessing import Pool
​
def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return
​
def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop
​
    # using pool to parallelize the process
    pool = Pool(processes=4)
    pool.apply_async(worker, [task_queue])
    pool.close()
    pool.join()
    data_file.close()
    returnif __name__ == '__main__':
    main()

以上代码中,worker() 函数是工作进程的函数,它从任务队列中获取文件,解析文件并将其追加到输出文件中。main() 函数是主进程的函数,它创建任务队列,将文件放入任务队列,然后创建进程池并启动工作进程。最后,主进程等待所有工作进程完成,然后关闭输出文件。

Dask可以自动管理并行任务,并提供更强大的分布式计算能力。通过合理的并行和分布式处理,可以显著提高处理百万级文件的效率。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
接口测试平台代码实现40:修改bug
我们的这个系列已经进行了长达12章成品预览和40章纯开发章节,但是基本还没做过完全一点的测试修复bug章节,每次新开发的功能也仅仅停留在单元/函数层面上的自测。 
我去热饭
2022/05/19
4260
接口测试平台代码实现40:修改bug
接口测试平台代码实现33:接口调试
首先我们要做一个简单的弹层,和上节课的备注弹层一样的做法,这里就给大家快速贴源码了用到的技术都是之前学过的。注意一点,打开弹层后,一定要在弹层的某个地方放入接口id,以便我们之后调试保存时,发出的请求中可以知道当前用户打开的是哪个接口。
我去热饭
2022/05/19
1.1K0
接口测试平台代码实现33:接口调试
接口测试平台代码实现39:接口数据全部保存
我们之前的章节已经解决了各种接口的数据的提取问题,本节的任务就是把这些传给后端,然后保存成功。
我去热饭
2022/05/19
6560
接口测试平台代码实现39:接口数据全部保存
接口测试平台代码实现49:自动异常测试-2
本节开始之前先感谢有同学反馈并主动过修复的一个bug:就是在某些接口的返回值中,中文会显示乱码的问题
我去热饭
2022/05/19
4930
接口测试平台代码实现49:自动异常测试-2
接口测试平台代码实现38: 请求体保存-end
本节课我们处理完剩余请求体,内容较多。因为是直播撸码,所以难免会有一些不完美的地方。大家自行优化。
我去热饭
2022/05/19
4370
接口测试平台代码实现38: 请求体保存-end
接口测试平台代码实现88: 全局请求头-3
本节我们就一起来研究,在接口库调试页面,用例库步骤详情页 这俩个地方如何加入这些项目公共请求头吧。
我去热饭
2022/05/19
4060
接口测试平台代码实现88: 全局请求头-3
接口测试平台代码实现32:接口列表备注功能
让我们制作一个简单的备注输入框和保持/取消按钮,然后用户点击备注按钮就会显示这个输入框+保持/取消按钮。 保持和取消都会让输入框消失,但是保存功能多了一个像后台发送请求的过程,把备注内容给后台保存起来。
我去热饭
2022/05/19
6020
接口测试平台代码实现32:接口列表备注功能
接口测试平台代码实现107:登录态接口-3
首先先改正一下 榜一大哥提出的bug,就是当选择完公共请求头后取消选择,然后再请求就会报错的问题:
我去热饭
2022/05/19
5220
接口测试平台代码实现107:登录态接口-3
接口测试平台代码实现37:接口请求的保存和取消
本节主要来讲一下,使用者在打开接口调试面板后,点击保存按钮,就会成功保存好,并且再次打开后,能显示出来呢?可能大家会觉得本节课没什么难度,就是简单的保存而已,但是实际上,本节是非常复杂的。因为接口的不同编码格式,我们保存起来的基本只有一个大字符串。要如何存储和展示是需要进行特殊设计的,类似 加密和解密,压缩和解压缩。
我去热饭
2022/05/19
5430
接口测试平台代码实现37:接口请求的保存和取消
接口测试平台代码实现46:接口调试用户异常操作处理
经过45节的学习,很多同学已经达到了一个不错的水准,可以自主找到一些问题并修复和提出改进方法,我很欣慰。
我去热饭
2022/05/19
8810
接口测试平台代码实现46:接口调试用户异常操作处理
接口测试平台代码实现31:接口列表继续开发
在调试和删除 俩个按钮中间 加入 备注/复制/异常值测试 三个按钮 ,按钮顺序尽量按照使用习惯 和频率 来设计。
我去热饭
2022/05/19
4630
接口测试平台代码实现31:接口列表继续开发
接口测试平台代码实现41:调试窗口显示接口内容
接口调试层打开后,目前并没有显示这个接口的所有数据。本节就一句一句的把这些数据显示出来吧。
我去热饭
2022/05/19
4920
接口测试平台代码实现41:调试窗口显示接口内容
接口测试平台代码实现149:加密算法的增删改查
我们目前要做的加密算法是一个实体,或者说一个表达式。那么它就自然而然的要考虑下面几点:
我去热饭
2022/05/19
3360
接口测试平台代码实现149:加密算法的增删改查
接口测试平台代码实现35:请求体
http://down.htmleaf.com/1801/201801271505.zip
我去热饭
2022/05/19
7970
接口测试平台代码实现35:请求体
接口测试平台代码实现106:登录态接口-2
大部分内容我们仍然可以借鉴普通接口调试层的内容,所以打开P_apis.html,找到那个登陆态的div,开始进行添加内部控件,但是也不能全都照搬过来,有一些就不需要,比如接口名字。而因为这个接口一个项目基本也就设置一次,所以优化便利性这种事的优先级不高,精力还是留给那些使用频率高的功能上吧
我去热饭
2022/05/19
1K0
接口测试平台代码实现106:登录态接口-2
接口测试平台代码实现48: 自动异常测试-1
本功能属于番外,不是必须的。长度大概3章节。难度不高,也算给大家休息一下。
我去热饭
2022/05/19
5510
接口测试平台代码实现48: 自动异常测试-1
接口测试平台代码实现114:登录态接口-10
这位博主在上一节课,成功实现了登陆状态嵌入到接口库的功能,本节会继续嵌入到用例库中。
我去热饭
2022/05/19
3160
接口测试平台代码实现114:登录态接口-10
接口测试平台代码实现111:登录态接口-7
我们先在打开项目的P_apis.html,找到调试弹层,先找个位置给它加上这个登陆态的开关:
我去热饭
2022/05/19
2510
接口测试平台代码实现111:登录态接口-7
接口测试平台代码实现104:GraphQL-4
本节开始我要一步搞定用例库相关的设置,首先打开P_cases.html,找到步骤详情页的请求体类型部分,然后添加相关dom:
我去热饭
2022/05/19
2350
接口测试平台代码实现104:GraphQL-4
接口测试平台代码实现58-首页重构6
我们记得,我们都是用a标签超链接 来显示一个一个的请求记录。那么他们点击的href属性,我们指定的是触发一个叫home_log_show()的js函数,传入这个记录的id,当然这个函数还没有写。
我去热饭
2022/05/19
6140
接口测试平台代码实现58-首页重构6
推荐阅读
相关推荐
接口测试平台代码实现40:修改bug
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验