接着上面继续拓展,补充说说获取函数返回值。 上面是通过成功后的回调函数来获取返回值
,这次说说自带的方法:
import time
from multiprocessing import Pool
def test(x):
"""开平方"""
time.sleep(1)
return x * x
def main():
pool = Pool()
task = pool.apply_async(test, (10, ))
print(task)
try:
task.get(timeout=1) # raises multiprocessing.TimeoutError
except Exception:
print("超时了~")
if __name__ == '__main__':
main()
输出:( apply_async
返回一个 ApplyResult
类,里面有个get方法可以获取返回值)
<multiprocessing.pool.ApplyResult object at 0x7fbc354f50b8>
超时了~
再举个例子,顺便把 Pool
里面的 map
和 imap
方法搞个案例(类比jq)
import time
from multiprocessing import Pool
def test(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool:
task = pool.apply_async(test, (10, ))
print(task.get(timeout=1))
obj_list = pool.map(test, range(10))
print(obj_list)
# 返回一个可迭代类的实例对象
obj_iter = pool.imap(test, range(10))
print(obj_iter)
next(obj_iter)
for i in obj_iter:
print(i, end=" ")
输出:
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
<multiprocessing.pool.IMapIterator object at 0x7ff7f9734198>
1 4 9 16 25 36 49 64 81
微微看一眼源码:(基础忘了可以查看==> 点我 )
class IMapIterator(object):
def __init__(self, cache):
self._cond = threading.Condition(threading.Lock())
self._job = next(job_counter)
self._cache = cache
self._items = collections.deque()
self._index = 0
self._length = None
self._unsorted = {}
cache[self._job] = self
def __iter__(self):
return self # 返回一个迭代器
# 实现next方法
def next(self, timeout=None):
with self._cond:
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
raise StopIteration from None
self._cond.wait(timeout)
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
raise StopIteration from None
raise TimeoutError from None
success, value = item
if success:
return value
raise value
......
扩展:优雅杀死子进程的探讨 https://segmentfault.com/q/1010000005077517
官方文档:https://docs.python.org/3/library/subprocess.html
还记得之前李代桃僵的 execlxxx
系列吗?
这不, subprocess
就是它的一层封装,当然了要强大的多,先看个例子:(以 os.execlp
的例子为引)
import subprocess
def main():
# os.execlp("ls", "ls", "-al") # 执行Path环境变量可以搜索到的命令
result = subprocess.run(["ls", "-al"])
print(result)
if __name__ == '__main__':
main()
输出
总用量 44
drwxrwxr-x 2 dnt dnt 4096 8月 7 17:32 .
drwxrwxr-x 4 dnt dnt 4096 8月 6 08:01 ..
-rw-rw-r-- 1 dnt dnt 151 8月 3 10:49 0.assert.py
-rw-rw-r-- 1 dnt dnt 723 8月 5 18:00 1.process2.py
-rw-rw-r-- 1 dnt dnt 501 8月 3 10:20 1.process.py
-rw-rw-r-- 1 dnt dnt 1286 8月 6 08:16 2.pool1.py
-rw-rw-r-- 1 dnt dnt 340 8月 7 16:38 2.pool2.py
-rw-rw-r-- 1 dnt dnt 481 8月 7 16:50 2.pool3.py
-rw-rw-r-- 1 dnt dnt 652 8月 5 17:01 2.pool.py
-rw-rw-r-- 1 dnt dnt 191 8月 7 17:33 3.subprocess.py
CompletedProcess(args=['ls', '-al'], returncode=0)
现在看下官方的文档描述来理解一下:
r"""
具有可访问I / O流的子进程
Subprocesses with accessible I/O streams
此模块允许您生成进程,连接到它们输入/输出/错误管道,并获取其返回代码。
This module allows you to spawn processes, connect to their
input/output/error pipes, and obtain their return codes.
完整文档可以查看:https://docs.python.org/3/library/subprocess.html
For a complete description of this module see the Python documentation.
Main API
========
run(...): 运行命令,等待它完成,然后返回`CompletedProcess`实例。
Runs a command, waits for it to complete,
then returns a CompletedProcess instance.
Popen(...): 用于在新进程中灵活执行命令的类
A class for flexibly executing a command in a new process
Constants(常量)
---------
DEVNULL: 特殊值,表示应该使用`os.devnull`
Special value that indicates that os.devnull should be used
PIPE: 表示应创建`PIPE`管道的特殊值
Special value that indicates a pipe should be created
STDOUT: 特殊值,表示`stderr`应该转到`stdout`
Special value that indicates that stderr should go to stdout
Older API(尽量不用,说不定以后就淘汰了)
=========
call(...): 运行命令,等待它完成,然后返回返回码。
Runs a command, waits for it to complete, then returns the return code.
check_call(...): Same as call() but raises CalledProcessError()
if return code is not 0(返回值不是0就引发异常)
check_output(...): 与check_call()相同,但返回`stdout`的内容,而不是返回代码
Same as check_call but returns the contents of stdout instead of a return code
getoutput(...): 在shell中运行命令,等待它完成,然后返回输出
Runs a command in the shell, waits for it to complete,then returns the output
getstatusoutput(...): 在shell中运行命令,等待它完成,然后返回一个(exitcode,output)元组
Runs a command in the shell, waits for it to complete,
then returns a (exitcode, output) tuple
"""
其实看看源码很有意思:(内部其实就是调用的 os.popen
【进程先导篇讲进程守护的时候用过】)
def run(*popenargs, input=None, capture_output=False,
timeout=None, check=False, **kwargs):
if input is not None:
if 'stdin' in kwargs:
raise ValueError('stdin和输入参数可能都不会被使用。')
kwargs['stdin'] = PIPE
if capture_output:
if ('stdout' in kwargs) or ('stderr' in kwargs):
raise ValueError('不能和capture_outpu一起使用stdout 或 stderr')
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
with Popen(*popenargs, **kwargs) as process:
try:
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired:
process.kill()
stdout, stderr = process.communicate()
raise TimeoutExpired(
process.args, timeout, output=stdout, stderr=stderr)
except: # 包括KeyboardInterrupt的通信处理。
process.kill()
# 不用使用process.wait(),.__ exit__为我们做了这件事。
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(
retcode, process.args, output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
返回值类型: CompletedProcess
# https://github.com/lotapp/cpython3/blob/master/Lib/subprocess.py
class CompletedProcess(object):
def __init__(self, args, returncode, stdout=None, stderr=None):
self.args = args
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
def __repr__(self):
"""对象按指定的格式显示"""
args = [
'args={!r}'.format(self.args),
'returncode={!r}'.format(self.returncode)
]
if self.stdout is not None:
args.append('stdout={!r}'.format(self.stdout))
if self.stderr is not None:
args.append('stderr={!r}'.format(self.stderr))
return "{}({})".format(type(self).__name__, ', '.join(args))
def check_returncode(self):
"""如果退出代码非零,则引发CalledProcessError"""
if self.returncode:
raise CalledProcessError(self.returncode, self.args, self.stdout,
self.stderr)
再来个案例体会一下方便之处:
import subprocess
def main():
result = subprocess.run(["ping", "www.baidu.com"])
print(result.stdout)
if __name__ == '__main__':
main()
图示:
再来个强大的案例(交互的程序都可以,比如 ftp
, nslookup
等等): popen1.communicate
import subprocess
def main():
process = subprocess.Popen(
["ipython3"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
# 对pstree进行交互
out, err = process.communicate(input=b'print("hello")', timeout=3)
print("Out:%s\nErr:%s" % (out.decode(), err.decode()))
except TimeoutError:
# 如果超时到期,则子进程不会被终止,需要自己处理一下
process.kill()
out, err = process.communicate()
print("Out:%s\nErr:%s" % (out.decode(), err.decode()))
if __name__ == '__main__':
main()
输出:
IPython 6.4.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: hello
In [2]: Do you really want to exit ([y]/n)?
Err:
注意点:如果超时到期,则子进程不会被终止,需要自己处理一下(官方提醒)
这个等会说进程间通信还会说,所以简单举个例子,老规矩拿 ps aux|grep bash
说事:
import subprocess
def main():
# ps aux | grep bash
# 进程1获取结果
p1 = subprocess.Popen(["ps", "-aux"], stdout=subprocess.PIPE)
# 得到进程1的结果再进行筛选
p2 = subprocess.Popen(["grep", "bash"], stdin=p1.stdout, stdout=subprocess.PIPE)
# 关闭写段(结果已经获取到进程2中了,防止干扰显示)
p1.stdout.close()
# 与流程交互:将数据发送到stdin并关闭它。
msg_tuple = p2.communicate()
# 输出结果
print(msg_tuple[0].decode())
if __name__ == '__main__':
main()
输出:(以前案例:进程间通信~PIPE匿名管道)
dnt 2470 0.0 0.1 24612 5236 pts/0 Ss 06:01 0:00 bash
dnt 2512 0.0 0.1 24744 5760 pts/1 Ss 06:02 0:00 bash
dnt 20784 0.0 0.1 24692 5588 pts/2 Ss+ 06:21 0:00 /bin/bash
dnt 22377 0.0 0.0 16180 1052 pts/1 S+ 06:30 0:00 grep bash