多线程与多进程大约是后端工程师面试最常被问的几个问题之一了,网上也有不少资料对多线程与多进程进行了详细的介绍,这里,我们就不多做赘述了。
总之,灵活地使用多线程以及多进程可以大幅地提升程序的运行效率,尤其是针对爬虫或者线上模型调用等场景。因此,在我们的实际工作中,多线程与多进程无疑是一个非常常用的工具。
因此,下面,我们就针对python中的多线程以及多进程的实现进行简单的一个介绍。
多线程的定义这里不再过多赘述,他是从单一进程中分出多个线程,并行地执行某一批量任务。
由于本质上来说,多线程还是在单一进程的基础上的,因此它所拥有的资源并不会因此而增多,故并不适合执行cpu计算密集型的任务,对于这样的任务,多线程反而可能会拖累运行速度。
反之,对于cpu计算并不密集反而需要大量调用外部服务的任务,多线程就会是一个非常有效的提高资源利用的方法。
下面,我们来看一下python中多线程的基本用法。
python中多线程的实现事实上是非常简单的,只要简单地调用python内置的threading库即可快速地实现。
最简单的多线程的实现可以由以下几个部分构成:
但是,需要注意的是,线程的创建仅仅是发送了一个开始信号,与主线程是分离的,因此,主线程在发送了启动命令之后可以直接开始后续的代码执行,而同时线程就会去申请资源然后进行作业执行。
这样会带来一个问题就是正常情况下主线程无法知道各个线程的运行状况,而有些操作事实上是需要确认线程已经执行完成之后才能执行的。因此,大多数情况下,我们需要额外加入一个合并操作将子线程合并到主线程当中,此时主线程将会等到子线程运行完毕之后才会开始后续的代码执行。
给出一个最为基础的多线程使用样例如下:
import threading
def job():
print("hello world!")
return
def main():
thread = threading.Thread(target=job)
thread.start()
thread.join()
return
main()
此外,如果我们需要给线程的任务额外输入参数时,我们可以使用下述方式进行参数传递。
import threading
def job(message):
print(message)
return
def main():
thread = [threading.Thread(target=job, args=("hello world!", ))
thread.start()
thread.join()
return
main()
如前所述,线程是独立于主线程的,即使我们可以通过join
方法将其合并到主线程当中,事实上,我们也无法从中获取任何的返回结果。
要想从线程中获取运行的结果,我们需要通过某种方式将其写入到某个可供公共访问的存储空间当中。
另一方面,当使用线程实现生产者消费者模式时,同样我们需要将数据存储到一个公用的存储空间当中。
一种比较野的路子就是使用全局变量,但是这种方式并不够优雅,而且在高并发的情况下不排除可能会存在什么隐患,更为优雅的方式是使用python内建库中的队列方式进行实现。
下面,我们给出一个典型的例子如下:
import threading
from queue import Queue
def job(q_in, q_out):
while not q_in.empty():
try:
n = q_in.get()
q_out.put(n**2)
except:
break
return
def main():
q_in = Queue()
q_out = Queue()
[q_in.put(i) for i in range(10000)]
thread_list = [threading.Thread(target=job, args=(q_in, q_out)) for i in range(5)]
[thread.start() for thread in thread_list]
[thread.join() for thread in thread_list]
ans = []
while not q_out.empty():
ans.append(q_out.get())
return ans
main()
但是,需要注意的是,由于线程是并行穿插运行的,因此,事实上我们无法保证输入的queue与输出的queue之间顺序的一一对应关系。如果对于保序有需求,我们需要通过一些其他的手段对其进行额外的保证。
最后,我们来看一下多线程中线程锁的应用方法。
如前所述,多线程常用的场景是使用多个并发的线程来执行同一个任务,从而提升代码的执行效率。但是,如果当所有的线程都需要对同一个参数进行读写时,由于线程间本身是没有通讯机制的,因此可能会导致读写冲突发生。
例如:
import threading
from time import sleep
def job():
for i in range(5):
print(i)
sleep(1)
return
def main():
n = 1
thread_list = [threading.Thread(target=job) for _ in range(5)]
[thread.start() for thread in thread_list]
[thread.join() for thread in thread_list]
return
main()
在上述代码中,由于所有的线程都共享同一片读写缓存,因此,相互的打印可能会出现串行现象,并非我们期待的现象。
故,我们需要引入锁来对资源进行保护。
我们给出线程锁的基本用法如下:
import threading
from time import sleep
def job(lock):
for i in range(10):
lock.acquire()
print(i)
lock.release()
sleep(1)
return
def main():
n = 1
lock = threading.Lock()
thread_list = [threading.Thread(target=job, args=(lock, )) for _ in range(10)]
[thread.start() for thread in thread_list]
[thread.join() for thread in thread_list]
return
main()
与多线程不同,多进程则是直接在进程方面进行并行,由于每一个进程都会去抢占一份资源,而非像是多线程一样共享一份资源,因此,对于cpu计算密集型的任务,多进程可以有效地提升执行效率,反之,多线程则往往无法提升执行效率甚至会拉低执行效率。
在python中,多进程与多线程的实现是极其相似的,只需要将内置的多线程threading替换为多进程multiprocessing库即可。
给出最基础的多进程实现代码范例如下:
import multiprocessing
def job():
print("hello world!")
return
def main():
process = multiprocessing.Process(target=job)
process.start()
process.join()
return
main()
同样的,当需要传入参数是,我们只需要通过args参数将参数进行传递即可。
import multiprocessing
def job(n):
print(sum([i**2 for i in range(1, n+1)]))
return
def main():
process = multiprocessing.Process(target=job, args=(10, ))
process.start()
process.join()
return
main()
现在,我们来考察多进程中队列的使用。
多进程的队列使用方法和多线程也基本一致,唯一的差别在于多线程可以混用自身的Queue类以及queue库中的Queue类,但是多进程由于不共享进程资源,因此绝对不能够混用上述两种Queue类,必须要使用多进程库中自身实现的Queue类。
给出代码实例如下:
import multiprocessing
from multiprocessing import Queue
from time import sleep
def job(q_in, q_out):
while not q_in.empty():
try:
n = q_in.get()
except:
break
q_out.put(n ** 2)
return
def main():
q_in = Queue()
q_out = Queue()
for i in range(1, 11):
q_in.put(i)
process_list = [multiprocessing.Process(target=job, args=(q_in, q_out)) for i in range(5)]
[process.start() for process in process_list]
[process.join() for process in process_list]
while not q_out.empty():
print(q_out.get())
return
main()
如果错误地使用了一般queue库中的Queue类实现,则会发现上述代码根本打印不出东西,原因在于不同进程间的Queue类是不通用的,主进程的q_out中事实上一直没有被写入元素。
现在,我们来考察一下多进程中锁的使用方法。
与队列相似,多进程中锁的使用方法和多线程几乎一模一样,只需要将threading库中的Lock类替换为multiprocessing库中的Lock类即可。
给出代码实例如下:
import multiprocessing
from multiprocessing import Lock
def job(lock):
for i in range(5):
lock.acquire()
print(i)
lock.release()
return
def main():
lock = Lock()
process_list = [multiprocessing.Process(target=job, args=(lock, )) for i in range(5)]
[process.start() for process in process_list]
[process.join() for process in process_list]
return
main()
进程池是多进程程序中独有的一种概念。他的作用是自动化的对多进程所需的资源进行分配并进行进程启动。
其用法相当于先定义后启动,但是其使用逻辑为:
其代码范例如下:
import multiprocessing
def job(x):
return x**2
# 实现方式一
def multi_process_pool_test_v1():
pool = multiprocessing.Pool(processes = 2) # 分配两个核的资源
ans = pool.map(job, range(10))
print(ans)
# 实现方式二
def multi_process_pool_test_v2():
pool = multiprocessing.Pool(processes = 2) # 分配两个核的资源
multi_p = [pool.apply_async(job, (i,)) for i in range(10)]
ans = [it.get() for it in multi_p]
print(ans)
multi_process_pool_test_v1()
multi_process_pool_test_v2()