可能由于公众号内markdown可能会出现排版错误,可以在有道云查看:http://note.youdao.com/noteshare?id=4d32e4861ed17ef6ce51c1bf8215ed91&sub=4FD7718719614C3BA2C1648BAB33E918
线程使用threading模块 p = threading.Thread(target=func,args=(),kwargs={})
p.start()
示例:
import threadingimport time
def a():
print("线程开始运行")
time.sleep(5)
print("线程结束")
if __name__ == '__main__':
p = threading.Thread(target=a)
p.start()
print("主线程")
上述代码结果:
线程开始运行 主线程 线程结束
前面两行是立刻产生,最后一行输出束必须等待5秒后输出
虽然,由于python中GIL锁的存在,一个进程在同一时刻最多只能运行一个线程。
简介: Global Interpreter Lock,可以称之为全局解释器锁 ,GIL只是在主流版本的python中存在(也就是Cpython),而其他语言实现的python中(如:Jython),多半是没有这个限制的
产生原因: python中GIL锁是一个非常霸道的存在,为了实现不同线程对于共享资源的互斥,最便捷的实现方法就是加一把大锁(每次只有一个线程能够得到解释执行)。
优缺点: 初看上去,这样的保护机制粒度太大了(其实刚开始的时候电脑都是单核的,所以不存在问题)。由于多核电脑的出现,这时候GIL才显示出了它的劣势,GIL保护了共享资源,但是也使得多核cpu无法发挥最大的效率(哪怕在多个核上都开了线程,但是最终也只有一个线程得到了解释器)。
改进: 也有过尝试对它进行改进,GIL的保护机制粒度太大,实际上只需要对共享资源进行保护即可。于是,也有过这样的改进,但是实际使用过程中,这种方式的效率却比GIL的效率低很多,由于粒度小,频繁的加锁和解锁,造成了性能的低下。
进程切换: 在GIL中,面有一个非常好的设置,那就是当一个线程进入阻塞状态的时候,会释放当前的线程去执行其他的线程。(这个具体的实现还不是很清楚,之后再补充)。另外一个,就是当进程执行完一定数量的机器码,将强行将线程挂起,释放GIL,然后有底层操作系统去决定执行哪一个线程。
参考资料(一些值得了解的文章):
Python的GIL是什么鬼,多线程性能究竟如何
有时候我们希望主线程等待子线程结束之后在结束,那么我们需要使用join()
方法
import threadingimport time
def a():
print("线程开始")
time.sleep(5)
print("线程结束")
if __name__ == '__main__':
p = threading.Thread(target=a)
p.start()
p.join()
print("主进程")
输出和上面的一样
线程开始 线程结束 主进程
但是先后顺序并不一样,第一行先输出,然后等待5秒,子线程结束,然后继续主线程运行。当然,正确的方法不能像我们这样些,直接在start后面写上join,这就和单个线程没有区别了。
正确的写法:
import threadingimport time
def a():
print("线程开始")
time.sleep(5)
print("线程结束")
if __name__ == '__main__':
thread_list = []
for i in range(5):
p = threading.Thread(target=a)
thread_list.append(p)
p.start()
for i in thread_list:
i.join()
print("主进程")
进程模块的API封装和线程基本上是一样的。
import multiprocessingimport time
def a():
print("进程开始")
time.sleep(5)
print("进程结束")
if __name__ == '__main__':
p = multiprocessing.Process(target=a)
p.start()
print("主进程")
结果也是差不多。
守护进程(线程) 设置deamon这个属性为True,那么会进入守护模式,也就是主进程结束之后,子进程或者是线程也会随之结束
p.daemon = True #默认这个属性为False
但是需要了解一下进程里面特有的东西:
p.terminate() 使进程提前终结。线程是没法直接终结的。这是故意这样设计的,正确终结的线程的方式应该是给线程传递一个信号,让线程自己在内部进行终结。
为什么线程需要上锁,因为线程可以被抢断,我们为了保证其中一些操作的原子性,我们需要上锁。下面用一个小栗子说明。
import threadinga = 0 #一个共享变量(全局变量)n = 100000 #我们希望循环的次数def add(): global a
for i in range(n):
a += 1def sub():
global a
for i in range(n):
a -= 1
if __name__ == '__main__':
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=sub) # 分别开启两个线程,并等待线程结束
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
上面的代码很简单,只是开启了两个线程,而每个线程里面做的事情也很简单,一个是对a进行n次的自加,一个是对a进行n次的自减。最后输出这个a的值,
上面的代码,输出为:0(当然,你运行起来也有很极小的概率不为0)
下面我们改动一行代码,把n = 100000
改成n = 1000000
,也就是多加了一个0
最后输出为:我也不知道(各种奇怪的数字,正的负的都有,有极小的概率为0)
为什么会出现这样的情况:
因为增量赋值并不是一个原子操作。我们至少可以将a+=1
拆分为两个步骤:
我们知道,线程是可以被抢断的,那么当进行两个操作的时候,如果正好在第一步和第二步之间进行了抢断的话,相当于进行了自加的操作,但是却没有将值赋给a,也就是无效了,当循环次数太少,正好在两个步骤中间进行切换的概率就很小了,所以当我们将循环次数不断加大时,就越有可能发生抢断。
解决方案:线程锁 我们首先初始化一个锁对象,然后再自加和自减的前后分别加上一行代码。如下:
import threadingfrom threading import Lock
lock = Lock()# 初始化一个锁对象
a = 0 #一个共享变量
n = 1000000 #我们希望循环的次数
def add():
global a
for i in range(n):
lock.acquire() # 获取锁,如果的不到,就阻塞,阻塞的时候线程会自动挂起进行切换
a += 1
lock.release() # 释放锁def sub(): global a
for i in range(n):
lock.acquire() a -= 1
lock.release()
if __name__ == '__main__':
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=sub) # 分别开启两个线程,并等待线程结束
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
频繁的加锁和解锁非常影响性能,所以尽量少的使用锁,我在测试的时候发现没有加锁的运行时间大概是加锁的十分之一
进程的使用和线程差不多,但是线程是可以直接进行使用全局变量进行通信的,而进程却不行。于是为了让进程进行通信,我们必须另外开启一个进程,来保存进程之间共同的数据。
import multiprocessingfrom multiprocessing import Managerl = Manager.list()
Manager这个模块提供了很多数据类型,使用的时候就和python里面的数据类型一样使用,只是他们是可以在进程之间共享的。
在看以下代码之前,希望你有基本的队列的知识。
由于频繁的创建和销毁线程也是一种非常大的消耗,而且我们很多时候都是重复的销毁和创建,于是我们便想出了使用线程池的方法。一开始就创建三个线程,然后线程等待执行,然后直到最后线程池被销毁(程序终结)。
import threadingimport queueimport timeclass Thread_Pool(threading.Thread):
''' 一个线程池对象
'''
def __init__(self):
''' 初始化三个线程
'''
super().__init__()
self.queue = queue.Queue()
for i in range(3):
t = threading.Thread(target=self.work, name='thread_{}'.format(i)).start() def work(self):
''' 提供一个查询线程的方法
'''
while True:
func = self.queue.get()
func()
self.queue.task_done() # 将计数器减1
def apply_async(self, func):
''' 提供一个增加线程的方法
'''
self.queue.put(func) def join(self):
'''用于阻塞线程池,当队列中的计数器不为0就阻塞'''
self.queue.join()
线程池的思路和步骤
self.work
,这是我们封装的一个接口,如果队列里面有事务(也就是函数),那么get方法可以取出func来执行,否则进入阻塞状态,直到可以取出。apply_async
,将事务(函数)添加到队列中,可以让work
取出,放入线程中运行join
其实如果你不知道队列里面的方法的话,你只需要知道这个阻塞和线程的join阻塞几乎功能相同。线程池的使用: 我们加上以下代码
def func():
print("start")
time.sleep(3)
print("end")
if __name__ == '__main__':
pool = Thread_Pool()
for i in range(10):
pool.apply_async(func)
运行结果:
startstartstartendendstartendstartstartendstartendendstartstartendstartendendend
当你运行一遍之后就会发现,不管我们加多少个事务,每次都有三个线程在运行。
进程池的话与这个类似,不再重复写了。
其实大多数情况我们都不需要去写线程池和进程池,因为和multiprocessing有自带的线程池和进程池。将上面的代码重写,如下:
from multiprocessing import Poolimport time
def func():
print("start")
time.sleep(3)
print("end")
if __name__ == '__main__':
pool = Pool()
for i in range(10):
pool.apply_async(func=func)
pool.close()
pool.join()
apply_async
来传入事务(函数)close
方法,之后我们再就不能向进程池中添加事务了join
的目的是阻塞主进程,等待子进程执行结束,否则你根本看不到输出,因为主进程结束,子进程自动被终结了。记住:join必须再close之后。线程池的话,只需要将 from multiprocessing import Pool
改成:
from multiprocessing.dummy import Pool