先写几个问号来概况下今天准备说的内容:(谜底自己解开,文中都有)
Ctrl+C
终止进程的本质吗?你知道 Kill-9pid
的真正含义吗?僵尸进程
和 孤儿进程
的悲催生产史吗? 孤儿找干爹
, 僵尸送往生
想知道不?李代桃僵
吗? ps aux|grep xxx
的背后到底隐藏了什么?密密私语
等着你来查看哦~关于帮助文档的说明:
man
查看,eg: man2pipe
help
查看,eg: help(os.pipe)
正式讲课之前,先说些基本概念,难理解的用程序跑跑然后再理解:如有错误欢迎批评指正
并发 :一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。
并行 :当一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。(在同一个时间段内,两个或多个程序执行,有时间上的重叠)
通俗的举个例子:
小明、小潘、小张、小康去食堂打饭,4个小伙子Coding了3天,饿爆了,现在需要1分钟内让他们都吃上饭,不然就有可怕的事情发生。
按照正常的流程,1分钟可能只够他们一个人打饭,这不行啊,于是有了几种处理方法:
并发:快快快,一人先吃一口,轮着来,一直喂到你们都饱了(只有一个食堂打饭的窗口)(单核CPU)
并行:
对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开两个浏览器就启动了两个浏览器进程。
有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。
由于每个进程至少要干一件事,所以,一个进程至少有一个线程。像Word这种复杂的进程可以有多个线程,多个线程可以同时执行,多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。
通俗讲:线程是最小的执行单元,而进程由至少一个线程组成。 如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间
PS:进程5态下次正式讲程序的时候会说,然后就是==> 程序实战不会像今天这样繁琐的,Code很简单,但是不懂这些基本概念往后会吃很多亏,逆天遇到太多坑了,所以避免大家入坑,简单说说这些概念和一些偏底层的东西~看不懂没事,有个印象即可,以后遇到问题至少知道从哪个方向去解决
示例代码:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux
示例代码:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/base
(linux/unix)操作系统提供了一个 fork()
系统调用。普通的函数调用,调用一次,返回一次,但是 fork()
一次调用,两次返回。
因为操作系统自动把父进程复制了一份,分别在父进程和子进程内返回。为了便于区分,操作系统是这样做的:子进程永远返回0,而父进程返回子进程的ID
查看下帮助文档:
import oshelp(os.fork)Help on built-in function fork in module posix:fork() Fork a child process. Return 0 to child process and PID of child to parent process.
我们来跑个程序验证一下:(PID返回值如果小于0一般都是出错了)
import osdef main(): print("准备测试~PID:%d" % os.getpid()) pid = os.fork() if pid == 0: print("子进程:PID:%d,PPID:%d" % (os.getpid(), os.getppid())) elif pid > 0: print("父进程:PID:%d,PPID:%d" % (os.getpid(), os.getppid()))if __name__ == '__main__': main()
结果:
准备测试~PID:11247父进程:PID:11247,PPID:11229子进程:PID:11248,PPID:11247
可以查看下这个进程是啥:
这个指令如果还不熟悉,Linux基础得好好复习下了:https://www.cnblogs.com/dunitian/p/4822807.html,简单分析下吧:a是查看所有(可以联想ls -a),u是显示详细信息,x是把不依赖终端的进程也显示出来(终端可以理解为:人与机器交互的那些)
技巧:指令学习可以递增式学习: ps
, ps a
ps au
ps aux
ps ajx
现在验证一下复制一份是什么意思:(代码原封不动,只是在最下面添加了一行输出)
import osdef main(): print("准备测试~PID:%d" % os.getpid()) pid = os.fork() # 子进程被父进程fork出来后,在fork处往下执行 if pid == 0: print("子进程:PID:%d,PPID:%d" % (os.getpid(), os.getppid())) elif pid > 0: print("父进程:PID:%d,PPID:%d" % (os.getpid(), os.getppid())) print("PID:%d,我是卖报的小行家,大风大雨都不怕" % os.getpid())if __name__ == '__main__': main()
输出:(父子进程的执行顺序是系统调度决定的)
准备测试~PID:13081父进程:PID:13081,PPID:9388PID:13081,我是卖报的小行家,大风大雨都不怕子进程:PID:13083,PPID:13081PID:13083,我是卖报的小行家,大风大雨都不怕
的确是Copy了一份,Code都一样(玩过逆向的应该知道,这份Code其实就放在了 .text
(代码段)里面)
子进程被父进程fork出来后,在fork处往下执行(Code和父进程一样),这时候他们就为了抢CPU各自为战了
最后验证一下:各个进程地址空间中数据是完全独立的(有血缘关系的则是:读时共享,写时复制,比如父子进程等)
import osdef main(): num = 100 pid = os.fork() # 子进程 if pid == 0: num += 10 elif pid > 0: num += 20 print("PID:%d,PPID:%d,Num=%d" % (os.getpid(), os.getppid(), num))if __name__ == '__main__': main()
输出:(进程间通信下一节课会系统的讲,今天只谈Linux和概念)
PID:6369,PPID:6332,Num=120PID:6376,PPID:6369,Num=110
扩展:(简单了解下即可)
init
or systemd
)先看看Linux启动的图示:(图片来自网络)
查看一下init进程
CentOS进行了优化管理~ systemd
其实程序开机启动方式也可以知道区别了: systemctl start mongodb.service
and sudo/etc/init.d/ssh start
Win系列的0号进程:
第5点的说明:(以远程CentOS服务器为例) pstree -ps
systemd(1)─┬─NetworkManager(646)─┬─{NetworkManager}(682) │ └─{NetworkManager}(684) ├─agetty(1470) ├─auditd(600)───{auditd}(601) ├─crond(637) ├─dbus-daemon(629)───{dbus-daemon}(634) ├─firewalld(645)───{firewalld}(774) ├─lvmetad(483) ├─master(1059)─┬─pickup(52930) │ └─qmgr(1061) ├─polkitd(627)─┬─{polkitd}(636) │ ├─{polkitd}(639) │ ├─{polkitd}(640) │ ├─{polkitd}(642) │ └─{polkitd}(643) ├─rsyslogd(953)─┬─{rsyslogd}(960) │ └─{rsyslogd}(961) ├─sshd(950)───sshd(50312)───sshd(50325)───bash(50326)───pstree(54258) ├─systemd-journal(462) ├─systemd-logind(626) ├─systemd-udevd(492) └─tuned(951)─┬─{tuned}(1005) ├─{tuned}(1006) ├─{tuned}(1007) └─{tuned}(1048)
再看一个例子:
[dnt@localhost ~]$ pstree dnt -pssshd(50325)───bash(50326)───pstree(54471)[dnt@localhost ~]$ pstree 50325 -pssystemd(1)───sshd(950)───sshd(50312)───sshd(50325)───bash(50326)───pstree(54489)
其实你可以在虚拟机试试干死1号进程,就到了登录页面了【现在大部分系统都不让你这么干了】 kill -9 1
-bash: kill: (1) - 不允许的操作
示例代码:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/base
先看看定义:
孤儿进程 :一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
僵尸进程 :一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。
通俗讲就是:
孤儿进程:你爸在你之前死了,你成了孤儿,然后你被进程1收养,你死后的事宜你干爹帮你解决
僵尸进程:你挂了,你爸忙着干其他事情没有帮你安葬,你变成了孤魂野鬼,你的怨念一直长存世间
举个例子看看:
import osimport timedef main(): pid = os.fork() if pid == 0: print("子进程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid())) time.sleep(1) # 睡1s elif pid > 0: print("父进程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid())) print("pid=%d,over" % os.getpid())if __name__ == '__main__': main()
输出:
import osimport timedef main(): pid = os.fork() if pid == 0: print("子进程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid())) elif pid > 0: print("父进程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid())) while 1: print("父亲我忙着呢,没时间管你个小屁孩") time.sleep(1) print("pid=%d,over" % os.getpid())if __name__ == '__main__': main()
输出+测试:
其实僵尸进程的危害真的很大,这也就是为什么有些人为了追求效率过度调用底层,不考虑自己实际情况最后发现还不如用自托管的效率高
僵尸进程是杀不死的,必须杀死父类才能彻底解决它们,下面说说怎么让父进程为子进程‘收尸’
讲解之前先简单分析一下上面的Linux指令(防止有人不太清楚)
kill-9pid
==> 以前逆天说过,是无条件杀死进程,其实这种说法不准确,应该是发信号给某个进程
-9指的就是信号道里面的 SIGKILL
(信号终止),你写成 kill-SIGKILL pid
也一样
-9只是系统给的一种简化写法(好像记得1~31信号,各个Linux中都差不多,其他的有点不一样)
dnt@MZY-PC:~/桌面/work/PC/python/Thread/Linux kill -l 1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP 6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR111) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU 25) SIGXFSZ26) SIGVTALRM 27) SIGPROF 28) SIGWINCH 29) SIGIO 30) SIGPWR31) SIGSYS 34) SIGRTMIN 35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+338) SIGRTMIN+4 39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+843) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+1348) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-1253) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-758) SIGRTMAX-6 59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-263) SIGRTMAX-1 64) SIGRTMAX
一般搜索进程中的某个程序一般都是用这个: ps-aux|grep xxx
( |
其实就是管道,用于有血缘关系进程间通信,等会讲)
如果安装了 pstree
就更方便了: pstree13570-ps
(Ubuntu自带,CentOS要装下 yum install psmisc
)
systemd(1)───systemd(1160)───gnome-terminal-(21604)───bash(8169)───python3(13570)───python3(13571)
扩展:我们平时 Ctrl+C
其实就是给 2) SIGINT
发一个信号
代码实例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/wait
步入正题:
Python的Wait和C系列的稍有不同,这边重点说说Python:
help(os.wait)Help on built-in function wait in module posix:wait() Wait for completion of a child process. Returns a tuple of information about the child process: (pid, status)
os.wait()
返回一个元组,第一个是进程id,第二个是状态,正常退出是0,被九号信号干死就返回9
来个案例:
import osimport timedef main(): pid = os.fork() if pid == 0: print("子进程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid())) elif pid > 0: print("父进程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid())) wpid, status = os.wait() print(wpid) print(status) print("pid=%d,over" % os.getpid())if __name__ == '__main__': main()
输出:
父进程:Pid=22322,PPID=10139子进程:Pid=22323,PPID=22322pid=22323,over223230pid=22322,over
演示一下被9号信号干死的情况:
import osimport timedef main(): pid = os.fork() if pid == 0: print("子进程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid())) while 1: print("孩子老卵,就是不听话") time.sleep(1) elif pid > 0: print("父进程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid())) wpid, status = os.wait() # 调用一次只能回收一次,想都回收,就来个while循环,-1则退出 print(wpid) print(status) if status == 0: print("正常退出") elif status == 9: print("被信号9干死了") print("pid=%d,over" % os.getpid())if __name__ == '__main__': main()
输出:
扩展:(回收所有子进程,status返回-1代表没有子进程了,Python里面没有子进程会触发异常)
import osimport timedef main(): i = 0 while i < 3: pid = os.fork() # 防止产生孙子进程(可以自己思索下) if pid == 0: break i += 1 if i == 0: print("i=%d,子进程:Pid=%d,PPID=%d" % (i, os.getpid(), os.getppid())) time.sleep(1) elif i == 1: print("i=%d,子进程:Pid=%d,PPID=%d" % (i, os.getpid(), os.getppid())) time.sleep(1) elif i == 2: print("i=%d,子进程:Pid=%d,PPID=%d" % (i, os.getpid(), os.getppid())) time.sleep(3) while 1: print("(PID=%d)我又老卵了,怎么滴~" % os.getpid()) time.sleep(3) elif i==3: # 循环结束后,父进程才会退出,这时候i=3 print("i=%d,父进程:Pid=%d,PPID=%d" % (i, os.getpid(), os.getppid())) while True: print("等待回收子进程") try: wpid, status = os.wait() print(wpid) print(status) if status == 0: print("正常退出") elif status == 9: print("被信号9干死了") except OSError as ex: print(ex) break print("pid=%d,over,ppid=%d" % (os.getpid(), os.getppid()))if __name__ == '__main__': main()
演示:看最后一句输出,父进程扫尾工作做完就over了
代码实例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/waitpid
上面说的 wait
方法是阻塞进程的一种方式, waitpid
可以设置不阻塞进程
help(os.waitpid)Help on built-in function waitpid in module posix:waitpid(pid, options, /) Wait for completion of a given child process. Returns a tuple of information regarding the child process: (pid, status) The options argument is ignored on Windows.
等待进程id为pid的进程结束,返回一个tuple,包括进程的进程ID和退出信息(和os.wait()一样),参数options会影响该函数的行为。在默认情况下,options的值为0。
官方原话是这样的:(英语好的可以看看我有没有翻译错)
If pid is greater than 0, waitpid() requests status information for that specific process.If pid is 0, the request is for the status of any child in the process group of the current process. If pid is -1, the request pertains to any child of the current process. If pid is less than -1, status is requested for any process in the process group -pid (the absolute value of pid).
options:(宏)
os.WNOHANG - 如果没有子进程退出,则不阻塞waitpid()调用os.WCONTINUED - 如果子进程从stop状态变为继续执行,则返回进程自前一次报告以来的信息。os.WUNTRACED - 如果子进程被停止过而且其状态信息还没有报告过,则报告子进程的信息。
补充:
用法和 wait
差不多,就是多了一个不阻塞线程的方法:
import osimport timedef main(): pid = os.fork() if pid == 0: print("[子进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid())) time.sleep(2) elif pid > 0: print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid())) while True: try: wpid, status = os.waitpid(0, os.WNOHANG) if wpid > 0: print("回收子进程wpid:%d,状态status:%d" % (wpid, status)) except OSError: print("没有子进程了") break print("父进程忙着挣钱养家呢~") time.sleep(3) print("[over]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))if __name__ == '__main__': main()
输出:
[父进程]PID:1371,PPID:29604[子进程]PID:1372,PPID:1371父进程忙着挣钱养家呢~[over]PID:1372,PPID:1371回收子进程wpid:1372,状态status:0父进程忙着挣钱养家呢~没有子进程了[over]PID:1371,PPID:29604
代码实例:https://github.com/lotapp/BaseCode/blob/master/python/5.concurrent/Linux/wait3.py
help(os.wait3)Help on built-in function wait3 in module posix:wait3(options) Wait for completion of a child process. Returns a tuple of information about the child process: (pid, status, rusage)
help(os.wait4)Help on built-in function wait4 in module posix:wait4(pid, options) Wait for completion of a specific child process. Returns a tuple of information about the child process: (pid, status, rusage)
这个是Python扩展的方法,用法和 wait、waitpid
差不多,我就不一个个的举例子了,以 wait3
为例
import osimport timedef main(): pid = os.fork() if pid == 0: print("[子进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid())) time.sleep(2) elif pid > 0: print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid())) while True: try: wpid, status, rusage = os.wait3(os.WNOHANG) if wpid > 0: print("回收子进程wpid:%d,状态status:%d\n详细信息:%s" % (wpid, status, rusage)) except OSError: print("没有子进程了") break print("父进程忙着挣钱养家呢~") time.sleep(3) print("[over]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))if __name__ == '__main__': main()
输出
[父进程]PID:2638,PPID:29604[子进程]PID:2639,PPID:2638父进程忙着挣钱养家呢~[over]PID:2639,PPID:2638回收子进程wpid:2639,状态status:0详细信息:resource.struct_rusage(ru_utime=0.0052179999999999995, ru_stime=0.0052179999999999995, ru_maxrss=7032, ru_ixrss=0, ru_idrss=0, ru_isrss=0, ru_minflt=869, ru_majflt=0, ru_nswap=0, ru_inblock=0, ru_oublock=0, ru_msgsnd=0, ru_msgrcv=0, ru_nsignals=0, ru_nvcsw=2, ru_nivcsw=0)父进程忙着挣钱养家呢~没有子进程了[over]PID:2638,PPID:29604
代码实例:https://github.com/lotapp/BaseCode/blob/master/python/5.concurrent/Linux/execl.py
之前有说fork后,相当于copy了一份,.text里面放的是代码段,如果想要调用另一个程序,可以使用 execlxxx
,他会把.text里面的代码替换掉
help(os.execl)Help on function execl in module os:execl(file, *args) execl(file, *args) Execute the executable file with argument list args, replacing the current process.
help(os.execlp)Help on function execlp in module os:execlp(file, *args) execlp(file, *args) Execute the executable file (which is searched for along PATH) with argument list args, replacing the current process.
来看个例子, os.execl("绝对路径","参数或者指令")
or os.execlp("Path中包含的命令","参数或者指令")
提示:查看命令路径:eg: which ls
import osdef main(): pid = os.fork() if pid == 0: # 第二个参数不能为None,,第一个路径为绝对路径 eg:os.execl("/bin/ls"," ") os.execl("/bin/ls", "ls", "-al") # os.execlp("ls", "ls", "-al") # 执行Path环境变量可以搜索到的命令 print("exec函数族会替换代码,我是不会被执行的,除非上面的出问题了") print("-" * 10) # 父进程执行一次,子进程不会执行if __name__ == '__main__': main()
注意输出信息: os.execlp("ls","ls","-al")
----------总用量 28drwxrwxr-x 6 dnt dnt 4096 7月 26 05:23 .drwxrwxr-x 9 dnt dnt 4096 7月 24 20:55 ..drwxr-xr-x 2 dnt dnt 4096 7月 19 14:47 .ipynb_checkpointsdrwxrwxr-x 6 dnt dnt 4096 7月 26 06:27 Linux-rw-rw-r-- 1 dnt dnt 93 7月 26 05:49 temp.pydrwxrwxr-x 2 dnt dnt 4096 7月 24 15:29 .vscodedrwxrwxr-x 2 dnt dnt 4096 7月 25 12:18 进程
代码实例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/进程通信/1.file
讲管道之前,先说个简单的:通过文件进行通信
来一个简单读写的案例先适应下文件操作:
!ls# 这种写法类似于Net的 using 托管with open("test.txt", "w") as f: f.write("从前有座山,山上有座庙,庙里有个老和尚和一个小和尚。有一天,老和尚对小和尚说:")with open("test.txt", "r") as f: data = f.read() print(data)!ls
并发编程~进程先导篇.ipynb从前有座山,山上有座庙,庙里有个老和尚和一个小和尚。有一天,老和尚对小和尚说:test.txt 并发编程~进程先导篇.ipynb
来个简单的案例:
import osimport timedef main(): pid = os.fork() if pid > 0: print("父进程(pid=%d)开始写入:" % os.getpid()) with open(str(pid), "w") as f: f.write("[父进程写入]从前有座山,山上有座庙,庙里有个老和尚和一个小和尚。有一天,老和尚对小和尚说:\n") time.sleep(2) print("父进程(pid=%d)开始读取:" % os.getpid()) with open(str(pid), "r") as f: print(f.read()) wpid, status = os.wait() # 收尸 print("pid=%d已经回收,status:%d" % (wpid, status)) elif pid == 0: print("子进程(pid=%d)开始读取:" % os.getpid()) with open(str(os.getpid()), "r") as f: print(f.read()) print("子进程(pid=%d)开始追加:" % os.getpid()) with open(str(os.getpid()), "a") as f: # 追加 f.write("[子进程追加]从前有座山,山上有座庙,庙里有个老和尚和一个小和尚。有一天,老和尚对小和尚说:\n") print("\n进程(pid=%d)完蛋了" % os.getpid())if __name__ == '__main__': main()
图示:
代码实例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/进程通信/2.Queue
from multiprocessing import Queuehelp(Queue)Help on method Queue in module multiprocessing.context:Queue(maxsize=0) method of multiprocessing.context.DefaultContext instance Returns a queue object
实例化对象帮助文档:
from multiprocessing import Queueq = Queue(2)help(q)Help on Queue in module multiprocessing.queues object:class Queue(builtins.object) | Methods defined here: | | __getstate__(self) | | __init__(self, maxsize=0, *, ctx) | Initialize self. See help(type(self)) for accurate signature. | | __setstate__(self, state) | | cancel_join_thread(self) | | close(self) | | empty(self) | | full(self) | | get(self, block=True, timeout=None) | | get_nowait(self) | | join_thread(self) | | put(self, obj, block=True, timeout=None) | | put_nowait(self, obj) | | qsize(self) | | ---------------------------------------------------------------------- | Data descriptors defined here: | | __dict__ | dictionary for instance variables (if defined) | | __weakref__ | list of weak references to the object (if defined)
详细内容(如:非阻塞、池中应用等)下次讲代码的时候会详说,简单看个例子:
import osfrom multiprocessing import Queuedef main(): q = Queue(1) # 创建一个容量为1的队列(只能put接受1条,get取出后才可以放) pid = os.fork() if pid == 0: print("[子进程]:pid:%d,ppid:%d" % (os.getpid(), os.getppid())) q.put("父亲大人,我可以出去玩吗?") output = q.get() print("[子进程]收到父亲大人回复:%s" % output) elif pid > 0: print("[父进程]:pid:%d,ppid:%d" % (os.getppid(), os.getppid())) output = q.get() # 儿子每天出去都会说,等待ing print("[父进程]收到儿子的话:%s" % output) q.put("准了") wpid, status = os.wait() print("[父进程]帮pid:%d收尸,状态:%d" % (wpid, status)) print("[OVER]:pid:%d,ppid:%d" % (os.getpid(), os.getppid()))if __name__ == '__main__': main()
输出:
[父进程]:pid:12403,ppid:12403[子进程]:pid:744,ppid:743[父进程]收到儿子的话:父亲大人,我可以出去玩吗?[子进程]收到父亲大人回复:准了[OVER]:pid:744,ppid:743[父进程]帮pid:744收尸,状态:0[OVER]:pid:743,ppid:12403
代码实例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/进程通信/3.pipe
知识普及:
如果终端的概念还不清楚可以看之前的文章:https://www.cnblogs.com/dunitian/p/6658273.html
help(os.pipe)Help on built-in function pipe in module posix:pipe() Create a pipe. Returns a tuple of two file descriptors: (read_fd, write_fd)
匿名管道的模式其实我们平时都在用,只是不知道而已,比如: ps aux|grep"python"
这个 |
就是匿名管道
本质:内核的缓冲区,不占用磁盘空间(可以看成伪文件)【默认4k,相差不大的情况下系统会自动微调大小】
我们来看一下: ulimit-a
Ubuntu 18.04
core file size (blocks, -c) 0data seg size (kbytes, -d) unlimitedscheduling priority (-e) 0file size (blocks, -f) unlimitedpending signals (-i) 14894max locked memory (kbytes, -l) 16384max memory size (kbytes, -m) unlimitedopen files (-n) 1024pipe size (512 bytes, -p) 8POSIX message queues (bytes, -q) 819200real-time priority (-r) 0stack size (kbytes, -s) 8192cpu time (seconds, -t) unlimitedmax user processes (-u) 14894virtual memory (kbytes, -v) unlimitedfile locks (-x) unlimited
CentOS 7.5
core file size (blocks, -c) 0data seg size (kbytes, -d) unlimitedscheduling priority (-e) 0file size (blocks, -f) unlimitedpending signals (-i) 3543max locked memory (kbytes, -l) 64max memory size (kbytes, -m) unlimitedopen files (-n) 1024pipe size (512 bytes, -p) 8POSIX message queues (bytes, -q) 819200real-time priority (-r) 0stack size (kbytes, -s) 8192cpu time (seconds, -t) unlimitedmax user processes (-u) 3543virtual memory (kbytes, -v) unlimitedfile locks (-x) unlimited
原理:算法实现的环形队列(队列:先进先出)
特点:
4的意思是这样的(网上找个图,然后改造一下)
验证一下3:
为什么是3开始呢?查看一下源码:(https://github.com/python/cpython/blob/v3.7.0/Lib/pty.py)
STDIN_FILENO = 0 # 看这:文件描述符输入(读端)STDOUT_FILENO = 1 # 看这:文件描述符输出(写端)STDERR_FILENO = 2 # 已经被占用了0~2了,自然从3开始# 下面的不用你会,上面Code看完,我们的目的就达到了,下面看看即可def fork(): """fork() -> (pid, master_fd) Fork分叉后让子进程成为控制终端的会话领导者""" try: pid, fd = os.forkpty() # 设置会话领导 except (AttributeError, OSError): pass else: # 没有错误执行 if pid == CHILD: os.setsid() return pid, fd master_fd, slave_fd = openpty() pid = os.fork() if pid == CHILD: # 建立一个新的会话 os.setsid() os.close(master_fd) # 把子进程里面的 slave_fd 重定向到 stdin/stdout/stderr os.dup2(slave_fd, STDIN_FILENO) os.dup2(slave_fd, STDOUT_FILENO) os.dup2(slave_fd, STDERR_FILENO) if (slave_fd > STDERR_FILENO): os.close (slave_fd) # 显式打开tty,使它成为一个tty控制 tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR) os.close(tmp_fd) else: os.close(slave_fd) # Parent and child process. return pid, master_fd
画个大纲图理解一下:(读的时候关闭写,写的时候关闭读)
结合单向传输理解一下:(父子只能一个人写,另一个人只能读)
简单概况上图:子进程只读,父进程只写 or 子进程只写,父进程只读 (如果想要相互读写通信~两根管道走起)
简单分析一下 ps aux|grep python
,本来ps aux是准备在终端中输出的,现在写入内核缓冲区了,grep从内核缓冲区里面读取,把符合条件的输出到终端
终端文件描述获取:
import syssys.stdin.fileno() # STDIN_FILENO = 0:文件描述符输入(读端)sys.stdout.fileno() # STDOUT_FILENO = 1:看这:文件描述符输出(写端)
我们用程序实现一个同样效果的:( grep
有颜色,其实就是加了 --color=auto
)
import osimport sysdef main(): # 创建内核缓存区(伪文件) read_fd, write_fd = os.pipe() print("read_fd:%s\nwrite_fd:%s" % (read_fd, write_fd)) pid = os.fork() if pid > 0: print("[父进程]pid=%d,ppid=%d" % (os.getpid(), os.getppid())) # 写或者读,则需要关闭另一端(防止自己写自己读) os.close(read_fd) # dup2(oldfd,newfd) 把写端数据重定向到文件描述符输出端 os.dup2(write_fd, sys.stdout.fileno()) # STDOUT_FILENO==1 (文件描述符输出,写端) # 僵桃李代 os.execlp("ps", "ps", "aux") elif pid == 0: print("[子进程]pid=%d,ppid=%d" % (os.getpid(), os.getppid())) # 子进程现在需要读,关闭写段 os.close(write_fd) # dup2(oldfd,newfd) 把读端数据重定向到文件描述符输入端 os.dup2(read_fd, sys.stdin.fileno()) # STDOUT_FILENO == 0 (文件描述符输入,读端) # 僵桃李代 (默认是从终端读,重定向后从内核缓冲区读) os.execlp("grep", "grep", "python", "--color=auto")if __name__ == '__main__': main()
输出:(用到的函数: os.pipe()
and os.dup2(oldfd,newfd)
)
PS:在C系列里面如果你该关闭的fd没关,会资源浪费,python好像做了处理,没能够问题复现,所以还是建议父子一方只读,一方只写
概念再理解:fork了两个子进程,则文件描述符被复制了2份,大家文件描述符的3、4都指向了 pipe管道
的 read_fd
和 write_fd
来张图理解一下,哪些fd被close了(如果让子进程之间通信,父进程因为不读不写,所以读写都得关闭)
代码演示:(这次注释很全)
import osimport sysimport timedef main(): read_fd, write_fd = os.pipe() # 可以思考为啥在上面创建管道(提示.text代码段都一样) i = 0 while i < 2: pid = os.fork() # 防止子进程生猴子 if pid == 0: break i += 1 # 子进程1 if i == 0: print("[子进程%d]pid=%d,ppid=%d" % (i, os.getpid(), os.getppid())) # 准备重定向到写端,所以先关了读端 os.close(read_fd) os.dup2(write_fd, sys.stdout.fileno()) # STDOUT_FILENO == 1 (文件描述符输出,写端) # 僵桃李代 os.execlp("ps", "ps", "-aux") # 僵桃李代后,.text代码段都被替换了,自然不会执行 print("我是不会执行的,不信你看呗") elif i == 1: print("[子进程%d]pid=%d,ppid=%d" % (i, os.getpid(), os.getppid())) # 准备重定向到读端,所以先关了写端 os.close(write_fd) os.dup2(read_fd, sys.stdin.fileno()) # STDIN_FILENO == 0 (文件描述符输入,读端) # 僵桃李代 ”bash“是查找关键词,你写你想找的字符串即可 os.execlp("grep", "grep", "bash", "--color=auto") # 僵桃李代后,.text代码段都被替换了,自然不会执行 print("我是不会执行的,不信你看呗") elif i == 2: print("[父进程]pid=%d,ppid=%d" % (os.getpid(), os.getppid())) # 我不写不读 os.close(read_fd) os.close(write_fd) # 为了大家熟练掌握wait系列,这次用waitpid while (True): info = () try: info = os.waitpid(-1, os.WNOHANG) # 非阻塞的方式回收所有子进程 except OSError: break # waitpid返回-1的时候,Python会抛出异常 if info[0] > 0: print("父进程收尸成功:pid=%d,ppid=%d,状态status:%d" % (os.getpid(), os.getppid(), info[1])) print("父进程做其他事情...") time.sleep(0.005) # 休息 0.005s print("[父进程-遗言]pid=%d,ppid=%d" % (os.getpid(), os.getppid()))if __name__ == '__main__': main()
结果:
[父进程]pid=18678,ppid=27202[子进程0]pid=18679,ppid=18678[子进程1]pid=18680,ppid=18678父进程做其他事情...父进程做其他事情...父进程做其他事情...父进程做其他事情...dnt 4622 0.0 0.1 24880 5688 pts/2 Ss 05:28 0:00 bash父进程做其他事情...dnt 15419 0.0 0.1 25152 5884 pts/0 Ss+ 06:29 0:00 /bin/bashdnt 18680 0.0 0.0 16184 1044 pts/4 S+ 13:25 0:00 grep bash --color=autodnt 27202 0.0 0.1 25012 6052 pts/4 Ss 08:25 0:00 bash父进程收尸成功:pid=18678,ppid=27202,状态status:0父进程做其他事情...父进程收尸成功:pid=18678,ppid=27202,状态status:0父进程做其他事情...[父进程-遗言]pid=18678,ppid=27202
# 说管道读写之前,先复习个知识点:bts = "尴尬".encode()b_str = bts.decode()print(bts)print(b_str)
b'\xe5\xb0\xb4\xe5\xb0\xac'尴尬
上面知识点忘了可以复习一下:https://www.cnblogs.com/dotnetcrazy/p/9278573.html#1.2.字符串和编码
用到的函数:(这个就不需要使用 dup2
来重定向到终端了【有血缘关系的进程之间通信,并不依赖于终端显示】)
os.write(fd,str)
写入字符串到文件描述符 fd中. 返回实际写入的字符串长度
os.read(fd,n)
从文件描述符 fd 中读取最多 n 个字节,返回包含读取字节的字符串
如果文件描述符fd对应文件已达到结尾, 返回一个空字符串
举个父子间通信的例子(比C系列简单太多)【下次讲的通用Code会更简单】
import osdef close_fd(*fd_tuple_args): """关闭fd,fd_tuple_args是可变参数""" for item in fd_tuple_args: os.close(item[0]) os.close(item[1])def main(): # 管道是单向的,相互读写,那就创建两个管道 fd_tuple1 = os.pipe() # 进程1写,进程2读 fd_tuple2 = os.pipe() # 进程2写,进程1读 i = 0 while (i < 2): pid = os.fork() if pid == 0: break i += 1 # 子进程1 if i == 0: print("[子进程]pid:%d,ppid:%d" % (os.getpid(), os.getppid())) os.close(fd_tuple1[0]) # 进程1写,则关闭下读端 msg_str = "进程1说:兄弟,今天撸串吗?" os.write(fd_tuple1[1], msg_str.encode()) # 把字符串xxx转换成bytes # 不读的我关闭掉: os.close(fd_tuple2[1]) # 进程2写,我不需要写,关闭写端 bts = os.read(fd_tuple2[0], 1024) print("[子进程1]", bts.decode()) exit(0) # 退出后就不执行下面代码块语句了 # 子进程2 elif i == 1: print("[子进程2]pid:%d,ppid:%d" % (os.getpid(), os.getppid())) os.close(fd_tuple1[1]) # 进程2读,则关闭下写端 bts = os.read(fd_tuple1[0], 1024) print("[子进程2]", bts.decode()) # 不读的我关闭掉: os.close(fd_tuple2[0]) # 进程2写,关闭读端 msg_str = "进程2说:可以可以~" os.write(fd_tuple2[1], msg_str.encode()) # 把字符串xxx转换成bytes exit() # 不加参数默认是None # 父进程 elif i == 2: print("[父进程]pid:%d,ppid:%d" % (os.getpid(), os.getppid())) # 父进程不读不写,就看看 close_fd(fd_tuple1, fd_tuple2) # 收尸ing while True: try: wpid, status = os.wait() print("[父进程~收尸]子进程PID:%d 的状态status:%d" % (wpid, status)) except OSError: break # 子进程都exit()退出了,不会执行这句话了 print("[父进程遗言]pid:%d,ppid:%d" % (os.getpid(), os.getppid()))if __name__ == '__main__': main()
输出结果:
[父进程]pid:12002,ppid:27202[子进程2]pid:12004,ppid:12002[子进程]pid:12003,ppid:12002[子进程2] 进程1说:兄弟,今天撸串吗?[子进程1] 进程2说:可以可以~[父进程~收尸]子进程PID:12003 的状态status:0[父进程~收尸]子进程PID:12004 的状态status:0[父进程遗言]pid:12002,ppid:27202
队列的 get
, put
方法默认也是阻塞的,如果想非阻塞可以调用 get_nowait
和 put_nowait
来变成非阻塞,那pipe管道呢?
C系列一般使用 fcntl
来实现,Python进行了封装,我们可以通过 os.pipe2(os.O_NONBLOCK)
来设置非阻塞管道
help(os.pipe2)Help on built-in function pipe2 in module posix:pipe2(flags, /) Create a pipe with flags set atomically. Returns a tuple of two file descriptors: (read_fd, write_fd) flags can be constructed by ORing together one or more of these values: O_NONBLOCK, O_CLOEXEC.
举个例子:
import osimport timedef main(): r_fd, w_fd = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC) pid = os.fork() if pid == 0: print("子进程:pid=%d,ppid=%d" % (os.getpid(), os.getppid())) time.sleep(0.5) # 和父进程进行通信 os.close(r_fd) os.write(w_fd, "老爸,我出去玩了~".encode()) exit(0) # 子进程退出 elif pid > 0: print("父进程:pid=%d,ppid=%d" % (os.getpid(), os.getppid())) # 读儿子的留言 os.close(w_fd) b_msg = b"" while True: try: b_msg = os.read(r_fd, 1) # 没有数据就出错(一般都是等待一会,也可以和信号联合使用) except OSError: print("儿子怎么没有留言呢?") print("父进程:做其他事情...") if len(b_msg) > 0: break time.sleep(0.1) # 继续读剩下的消息 b_msg += os.read(r_fd, 1024) print("儿子留言:", b_msg.decode()) wpid, status = os.wait() print("帮儿子做扫尾工作:pid=%d,status=%d" % (wpid, status)) print("父进程遗言:pid=%d,status=%d" % (os.getpid(), os.getppid()))if __name__ == '__main__': main()
输出:
父进程:pid=31430,ppid=27202子进程:pid=31431,ppid=31430儿子怎么没有留言呢?父进程:做其他事情...儿子怎么没有留言呢?父进程:做其他事情...儿子怎么没有留言呢?父进程:做其他事情...儿子怎么没有留言呢?父进程:做其他事情...儿子怎么没有留言呢?父进程:做其他事情...父进程:做其他事情...儿子留言: 老爸,我出去玩了~帮儿子做扫尾工作:pid=31431,status=0父进程遗言:pid=31430,status=27202
扩展:
代码实例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/进程通信/4.fifo
p
,大小为 0
的管道文件(伪文件,大小始终为0)对2的验证: 其实你用 ll
来查看,就是文件类型为 p
的文件(大小始终为0)
Linux底层提供了 mkfifo
函数,Python创建使用 os.mkfifo()
画个图来看3:
知识普及:
help(os.open)Help on built-in function open in module posix:open(path, flags, mode=511, *, dir_fd=None) Open a file for low level IO. Returns a file descriptor (integer). If dir_fd is not None, it should be a file descriptor open to a directory, and path should be relative; path will then be relative to that directory. dir_fd may not be implemented on your platform. If it is unavailable, using it will raise a NotImplementedError.
flags
-- 该参数可以是以下选项,多个使用 |
隔开:
很多人直接使用了Open方法 open(fifo_path,"r")
和 open(fifo_path,"w")
貌似也是可以的,但是不推荐
我们使用官方推荐的方法:
fifo操作非常简单,和文件IO操作几乎一样,看个无血缘关系进程通信的例子:
进程1源码:r_fifo.py
import osdef main(): file_name = "fifo_file" if not os.path.exists(file_name): os.mkfifo(file_name) fd = os.open(file_name, os.O_RDONLY) # 只读(阻塞) while True: b_msg = os.read(fd, 1024) if len(b_msg) > 0: print(b_msg.decode())if __name__ == '__main__': main()
进程2源码:w_fifo.py
import osimport timedef main(): file_name = "fifo_file" if not os.path.exists(file_name): os.mkfifo(file_name) fd = os.open(file_name, os.O_WRONLY) # 只写 while True: time.sleep(1) # 模拟一下实际生产环境下的 读快写慢 try: os.write(fd, "我是说话有魔性,喝水会长胖的小明同学".encode()) # 写入bytes except BrokenPipeError: print("如果读端全部关闭,管道破裂,进程自动被终止") breakif __name__ == '__main__': main()
做个读端的测试:
读写双测:(fifo文件大小始终为0,只是伪文件而已)
扩展一下,如果你通过终端读写呢?(同上)
再来个读写的案例
3.rw_fifo1.py
import osdef main(): file_name = "fifo_temp" if not os.path.exists(file_name): os.mkfifo(file_name) fd = os.open(file_name, os.O_RDWR) # 你输入os.O_rw就会有这个选项了,不用硬记 msg = os.read(fd, 1024).decode() # 阻塞的方式,不用担心 print("[进程2]%s" % msg) os.write(fd, "小明啊,你忘记你长几斤肉了?".encode())if __name__ == '__main__': main()
rw_fifo2.py
import osimport timedef main(): file_name = "fifo_temp" if not os.path.exists(file_name): os.mkfifo(file_name) fd = os.open(file_name, os.O_RDWR) # 你输入os.O_rw就会有这个选项了,不用硬记 os.write(fd, "小潘,撸串去不?".encode()) time.sleep(3) # 防止自己写的被自己读了 msg = os.read(fd, 1024).decode() # 阻塞的方式,不用担心 print("[进程1]]%s" % msg)if __name__ == '__main__': main()
来个父子间通信:(代码比较简单,和上面差不多,看看即可)
import osdef main(): file_name = "fifo_test" if not os.path.exists(file_name): os.mkfifo(file_name) fd = os.open(file_name, os.O_RDWR) # 读写方式打开文件描述符 (O_RDONLY | O_WRONLY) pid = os.fork() if pid == 0: print("子进程:PID:%d,PPID:%d" % (os.getpid(), os.getppid())) os.write(fd, "子进程说:老爸,我想出去玩".encode()) # 写 msg = os.read(fd, 1024).decode() # 读 print("[子进程]%s" % msg) elif pid > 0: print("父进程:PID:%d,PPID:%d" % (os.getpid(), os.getppid())) msg = os.read(fd, 1024).decode() # 阻塞方式,不用担心 print("[父进程]%s" % msg) os.write(fd, "父进程说:去吧乖儿子".encode()) # 给子进程收尸 wpid, status = os.wait() print("父进程收尸:子进程PID=%d,PPID=%d" % (wpid, status)) print("进程遗言:PID=%d,PPID=%d" % (os.getpid(), os.getppid())) # 剩下的代码段if __name__ == '__main__': main()
输出:
父进程:PID:21498,PPID:20943子进程:PID:21499,PPID:21498[父进程]子进程说:老爸,我想出去玩[子进程]父进程说:去吧乖儿子进程遗言:PID=21499,PPID=21498父进程收尸:子进程PID=21499,PPID=0进程遗言:PID=21498,PPID=20943
进程间通信~MMAP内存映射(常用)
进程间通信~Signal信号
进程守护
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。