
Python Multiprocessing for Data Processing

Author: Michael阿明 | Published: 2022-10-24

Table of Contents

  • 1. multiprocessing.Process
  • 2. multiprocessing.Pool.starmap
  • 3. multiprocessing.Pool.starmap_async

1. multiprocessing.Process

# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 15:45
# @Author : Michael
# @File : multiprocess_demo.py
# @desc :
import multiprocessing
import os
import time


class compute_process(multiprocessing.Process):  # worker process that does the computation
    def __init__(self, input_datafile, output_datafile):
        super(compute_process, self).__init__()
        self.input_datafile = input_datafile
        self.output_datafile = output_datafile

    def compute(self):
        with open(self.output_datafile, 'w') as fout:
            with open(self.input_datafile, 'r') as fin:
                for line in fin:
                    fout.write(f'{line.strip()} out\n')

    def run(self):
        self.compute()
        print(f'finish compute process with {self.input_datafile}')


if __name__ == '__main__':
    num_workers = 4
    print('start compute')
    input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    processes = [compute_process(i, o) for i, o in zip(input_datafiles, output_datafiles)]
    [p.start() for p in processes]
    # [p.join() for p in processes]  # wait for the child processes to finish before the main process continues
    for o in output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print(f'congratulations finish')

Without join, the main process immediately runs the subsequent code without waiting for the child processes.

Output:

start compute
not exists ./test/out1.txt
not exists ./test/out2.txt
not exists ./test/out3.txt
not exists ./test/out4.txt
congratulations finish
finish compute process with ./test/1.txt
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
finish compute process with ./test/4.txt

With join enabled (uncomment the join line above), the main process waits for all child processes to finish before continuing:

start compute
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
finish compute process with ./test/1.txt
finish compute process with ./test/4.txt
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish
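As a side note, here is a minimal sketch (my addition, not from the original article) of timing how long the join blocks; it is meant to replace the start/join lines inside the if __name__ == '__main__': block and reuses the processes list and the time module the demo already imports:

    start_ts = time.time()
    [p.start() for p in processes]
    [p.join() for p in processes]  # the main process blocks here until every child has exited
    print(f'all child processes finished after {time.time() - start_ts:.2f} s')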

Running more processes consumes proportionally more resources, so you can limit how many processes are started at a time according to the resources available.
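For example, a small sketch (my assumption, not from the article) that derives num_workers from the machine's core count instead of hard-coding it:

import os

# leave one core for the main process; fall back to 4 workers if the count is unavailable
num_workers = max(1, (os.cpu_count() or 4) - 1)

The article's batched version below then starts at most num_workers child processes per iteration of the loop.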

# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 15:45
# @Author : Michael
# @File : multiprocess_demo.py
# @desc :
import multiprocessing
import os
import time
import math


class compute_process(multiprocessing.Process):
    def __init__(self, input_datafile, output_datafile):
        super(compute_process, self).__init__()
        self.input_datafile = input_datafile
        self.output_datafile = output_datafile

    def compute(self):
        with open(self.output_datafile, 'w') as fout:
            with open(self.input_datafile, 'r') as fin:
                for line in fin:
                    fout.write(f'{line.strip()} out\n')

    def run(self):
        self.compute()
        print(f'finish compute process with {self.input_datafile}')


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    for idx in range(math.ceil(len(all_input_datafiles) / num_workers)):
        input_datafiles = all_input_datafiles[idx * num_workers: (idx + 1) * num_workers]
        output_datafiles = all_output_datafiles[idx * num_workers: (idx + 1) * num_workers]
        processes = [compute_process(i, o) for i, o in zip(input_datafiles, output_datafiles)]
        print(f'idx: {idx}, len of sub processes: {len(processes)}')
        [p.start() for p in processes]
        [p.join() for p in processes]  # wait for this batch of child processes to finish before starting the next batch
    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print(f'congratulations finish')

Output:

start compute
idx: 0, len of sub processes: 3
finish compute process with ./test/1.txt
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
idx: 1, len of sub processes: 1
finish compute process with ./test/4.txt
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

However, the for loop above has a drawback: the next batch cannot start until the slowest child process of the current batch has finished. The Pool approach in the next section avoids this barrier by handing a new task to a worker as soon as one becomes free.

2. multiprocessing.Pool.starmap

To make the difference visible, I replaced 1.txt with a file of over a million lines, so it takes much longer to process than the others.
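To reproduce this locally, a hypothetical helper (not part of the original article; the line count is arbitrary) to generate such a file could be:

# write an arbitrarily large test file so 1.txt takes noticeably longer to process
with open('./test/1.txt', 'w') as f:
    for i in range(2_000_000):
        f.write(f'line {i}\n')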

# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 18:13
# @Author : Michael
# @File : multiprocess_pool_demo.py
# @desc :
import multiprocessing
import os

def compute_func(input_datafile, output_datafile, sql):
    with open(output_datafile, 'w') as fout:
        with open(input_datafile, 'r') as fin:
            for line in fin:
                fout.write(f'{line.strip()} out\n')
    print(f'finish compute process with {input_datafile} and sql {sql}')
    return output_datafile


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    sql = ['sql1', 'sql2', 'sql3', 'sql4']

    pool = multiprocessing.Pool(processes=num_workers)

    outputs = pool.starmap(compute_func, [(i, o, s) for i, o, s in zip(all_input_datafiles, all_output_datafiles, sql)])
    # the second argument is a list of argument tuples; each tuple is unpacked into compute_func

    # like map, starmap blocks until every task has finished
    print('outputs', outputs)

    print('pool finish')
    # pool.close()

    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print(f'congratulations finish')

Output:

start compute
finish compute process with ./test/2.txt and sql sql2
finish compute process with ./test/3.txt and sql sql3
finish compute process with ./test/4.txt and sql sql4
finish compute process with ./test/1.txt and sql sql1
outputs ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
pool finish
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

You can see that 1.txt (the large file) is the last one to finish, even though it was submitted first; the pool keeps the other workers busy on the remaining files in the meantime.
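If you would rather handle each result as soon as it is ready instead of waiting for starmap to return everything, one alternative (a sketch of my own, not from the original article) is pool.imap_unordered, which yields results in completion order; it expects a single-argument function, so the argument tuple is unpacked in a small top-level wrapper:

def compute_wrapper(args):
    # unpack the (input_datafile, output_datafile, sql) tuple for compute_func
    return compute_func(*args)

with multiprocessing.Pool(processes=num_workers) as pool:
    for out in pool.imap_unordered(compute_wrapper,
                                   zip(all_input_datafiles, all_output_datafiles, sql)):
        print('ready:', out)  # printed as each task finishes, fastest files first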

3. multiprocessing.Pool.starmap_async

# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 18:13
# @Author : Michael
# @File : multiprocess_pool_demo.py
# @desc :
import multiprocessing
import os

def compute_func(input_datafile, output_datafile, sql):
    with open(output_datafile, 'w') as fout:
        with open(input_datafile, 'r') as fin:
            for line in fin:
                fout.write(f'{line.strip()} out\n')
    print(f'finish compute process with {input_datafile} and sql {sql}')
    return output_datafile


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    sql = ['sql1', 'sql2', 'sql3', 'sql4']

    pool = multiprocessing.Pool(processes=num_workers)

    outputs_async = pool.starmap_async(compute_func, [(i, o, s) for i, o, s in zip(all_input_datafiles, all_output_datafiles, sql)])
    # starmap_async returns immediately with a MapResult object instead of blocking
    print('outputs_async', outputs_async)
    outputs_async = [o for o in outputs_async.get()]  # get() blocks until all tasks have finished
    print('outputs_async', outputs_async)


    print('pool finish')
    # pool.close()
    # pool.join()  # with the async version, if you never call get(), close() + join() is needed here; otherwise the main process runs on without waiting

    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print(f'congratulations finish')

Output:

start compute
outputs_async <multiprocessing.pool.MapResult object at 0x000002D15FC86BE0>
finish compute process with ./test/2.txt and sql sql2
finish compute process with ./test/3.txt and sql sql3
finish compute process with ./test/4.txt and sql sql4
finish compute process with ./test/1.txt and sql sql1
outputs_async ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
pool finish
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

If you delete the get() line and there is also no pool.join() below to wait on the pool, the main process does not wait for the workers, and in the end no output files are produced.
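When the return values are not needed, a minimal sketch (assuming the same compute_func and argument lists as above) of waiting with close() and join() instead of get():

pool = multiprocessing.Pool(processes=num_workers)
pool.starmap_async(compute_func,
                   [(i, o, s) for i, o, s in zip(all_input_datafiles, all_output_datafiles, sql)])
pool.close()  # no further tasks will be submitted to the pool
pool.join()   # block until every submitted task has finished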
