关键字 | 说明 |
---|---|
event_loop | 消息循环,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数 |
coroutine | 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用 |
task | 任务,一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态 |
future | 代表将来执行或没有执行的任务的结果,它和task上没有本质上的区别 |
async/await | python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口 |
定义一个协程
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> time
<span class="hljs-comment"># 通过async关键字定义了一个协程,协程是不能直接运行的,需要将协程放到消息循环中</span>
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(x)</span>:</span>
print(<span class="hljs-string">"waiting:%d"</span>%x)
time.sleep(x)
print(<span class="hljs-string">"结束run"</span>)
<span class="hljs-comment">#得到一个协程对象</span>
coroutine = run(<span class="hljs-number">2</span>)
<span class="hljs-comment"># 创建一个消息循环</span>
<span class="hljs-comment"># 注意:真实是在<a href="https://www.xgss.net/tag/asyncio%e6%a8%a1%e5%9d%97" title="查看“asyncio模块”所有文章" target="_blank">asyncio模块</a>中获取一个引用</span>
loop = asyncio.get_event_loop()
<span class="hljs-comment">#将协程对象加入到消息循环</span>
loop.run_until_complete(coroutine)
创建一个任务
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(x)</span>:</span>
print(<span class="hljs-string">"waiting:%d"</span>%x)
time.sleep(x)
print(<span class="hljs-string">"结束run"</span>)
coroutine = run(<span class="hljs-number">2</span>)
<span class="hljs-comment">#创建任务</span>
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
<span class="hljs-comment"># 协程对象加入到消息循环中,协程对象不能直接运行的,在注册消息循环时run_until_complete方法将加入的协程对象包装成一个任务,task对象时Future类的子类对象,保存协程运行后的状态,用于未来获取协程的结果</span>
<span class="hljs-comment"># loop.run_until_complete(coroutine)</span>
<span class="hljs-comment"># 将任务加入到消息循环</span>
loop.run_until_complete(task)
绑定回调
回调:不需要手动调用,触发某种条件才会调用
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
<span class="hljs-comment"># 向百度要数据,网络IO</span>
asyncio.sleep(<span class="hljs-number">5</span>)
data = <span class="hljs-string">"'%s'的数据"</span>%(url)
print(<span class="hljs-string">"给你数据"</span>)
<span class="hljs-keyword">return</span> data
<span class="hljs-comment"># 定义一个回调函数(不需要手动调用,触发某种条件才会调用)</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
print(<span class="hljs-string">"call_back:"</span>, future.result())
coroutine = run(<span class="hljs-string">"百度"</span>)
<span class="hljs-comment"># 创建一个任务对象</span>
task = asyncio.ensure_future(coroutine)
<span class="hljs-comment"># 给任务添加回调,在任务结束后调用回调函数</span>
task.add_done_callback(call_back)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(<span class="hljs-string">"-------main------"</span>)
<span class="hljs-keyword">while</span> <span class="hljs-number">1</span>:
time.sleep(<span class="hljs-number">2</span>)
阻塞和await
async可以定义协程,使用await可以针对耗时操作进行挂起,就与生成器的yield一样,函数交出控制权。协程遇到await,消息循环会挂起该协程,执行别的协程,直到其他协程也会挂起或者执行完毕,在进行下一次执行
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
<span class="hljs-comment"># 向百度要数据,网络IO</span>
<span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">5</span>)
data = <span class="hljs-string">"'%s'的数据"</span>%(url)
print(<span class="hljs-string">"给你数据"</span>)
<span class="hljs-keyword">return</span> data
<span class="hljs-comment"># 定义一个回调函数(不需要手动调用,触发某种条件才会调用)</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
print(<span class="hljs-string">"call_back:"</span>, future.result())
coroutine = run(<span class="hljs-string">"百度"</span>)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(<span class="hljs-string">"-------main------"</span>)
同步
同时请求"百度", “阿里”, “腾讯”, "新浪"四个网站,假设响应时长均为2秒
<span class="hljs-keyword">import</span> time
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
<span class="hljs-comment"># 向百度要数据,网络IO</span>
time.sleep(<span class="hljs-number">2</span>)
data = <span class="hljs-string">"'%s'的数据"</span>%(url)
<span class="hljs-keyword">return</span> data
<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
t1 = time.time()
<span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
print(run(url))
t2 = time.time()
print(<span class="hljs-string">"总耗时:%.2f"</span>%(t2-t1))
异步
同时请求"百度", “阿里”, “腾讯”, "新浪"四个网站,假设响应时长均为2秒
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
<span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">2</span>)
data = <span class="hljs-string">"'%s'的数据"</span>%(url)
<span class="hljs-keyword">return</span> data
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
print(<span class="hljs-string">"call_back:"</span>, future.result())
<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
loop = asyncio.get_event_loop()
tasks = []
t1 = time.time()
<span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
<span class="hljs-comment">#同时添加4个异步任务</span>
loop.run_until_complete(asyncio.gather(*tasks))
t2 = time.time()
print(<span class="hljs-string">"总耗时:%.2f"</span> % (t2 - t1))
使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
<span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">2</span>)
data = <span class="hljs-string">"'%s'的数据"</span>%(url)
<span class="hljs-keyword">return</span> data
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
print(<span class="hljs-string">"call_back:"</span>, future.result())
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">main</span><span class="hljs-params">()</span>:</span>
tasks = []
<span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
<span class="hljs-comment"># task.add_done_callback(call_back)</span>
tasks.append(task)
<span class="hljs-comment"># #1、可以没有回调函数</span>
<span class="hljs-comment"># dones, pendings = await asyncio.wait(tasks)</span>
<span class="hljs-comment"># #处理数据,类似回调,建议使用回调</span>
<span class="hljs-comment"># for t in dones:</span>
<span class="hljs-comment"># print("数据:%s"%(t.result()))</span>
<span class="hljs-comment"># #2、可以没有回调函数</span>
<span class="hljs-comment"># results = await asyncio.gather(*tasks)</span>
<span class="hljs-comment"># # 处理数据,类似回调,建议使用回调</span>
<span class="hljs-comment"># for result in results:</span>
<span class="hljs-comment"># print("数据:%s"%(result))</span>
<span class="hljs-comment"># 3、</span>
<span class="hljs-comment"># return await asyncio.wait(tasks)</span>
<span class="hljs-comment"># 4、</span>
<span class="hljs-comment"># return await asyncio.gather(*tasks)</span>
<span class="hljs-comment"># 5、</span>
<span class="hljs-comment"># for t in asyncio.as_completed(tasks):</span>
<span class="hljs-comment"># await t</span>
<span class="hljs-comment"># 6、</span>
<span class="hljs-keyword">for</span> t <span class="hljs-keyword">in</span> asyncio.as_completed(tasks):
<span class="hljs-comment">#可以没有回调</span>
result = <span class="hljs-keyword">await</span> t
print(<span class="hljs-string">"数据:%s"</span>%(result))
<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
loop = asyncio.get_event_loop()
t1 = time.time()
<span class="hljs-comment">#1、</span>
<span class="hljs-comment"># loop.run_until_complete(main())</span>
<span class="hljs-comment">#2、</span>
<span class="hljs-comment"># loop.run_until_complete(main())</span>
<span class="hljs-comment"># # 3、</span>
<span class="hljs-comment"># dones, pendings = loop.run_until_complete(main())</span>
<span class="hljs-comment"># #处理数据,类似回调,建议使用回调</span>
<span class="hljs-comment"># for t in dones:</span>
<span class="hljs-comment"># print("数据:%s"%(t.result()))</span>
<span class="hljs-comment"># 4、</span>
<span class="hljs-comment"># results = loop.run_until_complete(main())</span>
<span class="hljs-comment"># for result in results:</span>
<span class="hljs-comment"># print("数据:%s"%(result))</span>
<span class="hljs-comment"># 5、</span>
<span class="hljs-comment"># loop.run_until_complete(main())</span>
<span class="hljs-comment"># 6、</span>
loop.run_until_complete(main())
t2 = time.time()
print(<span class="hljs-string">"总耗时:%.2f"</span> % (t2 - t1))
整理协程嵌套
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
<span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">2</span>)
data = <span class="hljs-string">"'%s'的数据"</span>%(url)
<span class="hljs-keyword">return</span> data
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
print(<span class="hljs-string">"call_back:"</span>, future.result())
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">main</span><span class="hljs-params">()</span>:</span>
tasks = []
<span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
<span class="hljs-keyword">await</span> asyncio.wait(tasks)
<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
loop = asyncio.get_event_loop()
t1 = time.time()
loop.run_until_complete(main())
t2 = time.time()
print(<span class="hljs-string">"总耗时:%.2f"</span> % (t2 - t1))
很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> threading
<span class="hljs-keyword">import</span> time
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
time.sleep(<span class="hljs-number">2</span>)
data = <span class="hljs-string">"'%s'的数据"</span>%(url)
print(<span class="hljs-string">"结束请求"</span>)
<span class="hljs-keyword">return</span> data
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">start_loop</span><span class="hljs-params">(loop)</span>:</span>
<span class="hljs-comment"># 启动消息循环</span>
asyncio.set_event_loop(loop)
loop.run_forever()
<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
<span class="hljs-comment"># 创建消息循环(类似死循环)</span>
<span class="hljs-comment"># 注意:此时消息循环没有启动</span>
loop = asyncio.get_event_loop()
threading.Thread(target=start_loop, args=(loop,)).start()
t1 = time.time()
<span class="hljs-comment"># 给消息循环添加任务</span>
<span class="hljs-comment"># loop.run_until_complete(create_tasks())</span>
loop.call_soon_threadsafe(run, <span class="hljs-string">"百度"</span>)
loop.call_soon_threadsafe(run, <span class="hljs-string">"腾讯"</span>)
t2 = time.time()
print(<span class="hljs-string">"总耗时:%.2f"</span> % (t2 - t1))
使用到协程嵌套与消息循环在另一个线程中启动相关联
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> threading
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
<span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">2</span>)
data = <span class="hljs-string">"'%s'的数据"</span>%(url)
print(<span class="hljs-string">"结束请求"</span>)
<span class="hljs-keyword">return</span> data
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
print(<span class="hljs-string">"call_back:"</span>, future.result())
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">create_tasks</span><span class="hljs-params">()</span>:</span>
tasks = []
<span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
<span class="hljs-keyword">await</span> asyncio.wait(tasks)
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">start_loop</span><span class="hljs-params">(loop)</span>:</span>
<span class="hljs-comment"># 启动消息循环</span>
asyncio.set_event_loop(loop)
loop.run_forever()
<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
<span class="hljs-comment"># 创建消息循环(类似死循环)</span>
<span class="hljs-comment"># 注意:此时消息循环没有启动</span>
loop = asyncio.get_event_loop()
threading.Thread(target=start_loop, args=(loop,)).start()
<span class="hljs-comment">#给消息循环添加任务</span>
asyncio.run_coroutine_threadsafe(create_tasks(), loop)
<span class="hljs-comment"># asyncio.run_coroutine_threadsafe(run("百度"), loop)</span>
<span class="hljs-comment"># asyncio.run_coroutine_threadsafe(run("腾讯"), loop)</span>
<span class="hljs-comment"># asyncio.run_coroutine_threadsafe(run("阿里"), loop)</span>
<span class="hljs-comment"># asyncio.run_coroutine_threadsafe(run("新浪"), loop)</span>
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> threading
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
print(<span class="hljs-string">"开始加载'%s'页面……"</span> % (url))
<span class="hljs-comment"># 发起链接,耗时IO</span>
connet = asyncio.open_connection(url, <span class="hljs-number">80</span>)
reader, writer = <span class="hljs-keyword">await</span> connet
<span class="hljs-comment"># 链接成功</span>
<span class="hljs-comment"># 发起请求,耗时IO</span>
header = <span class="hljs-string">"GET / HTTP/1.0\r\nHost: %s\r\n\r\n"</span>%(url)
writer.write(header.encode(<span class="hljs-string">"utf-8"</span>))
<span class="hljs-keyword">await</span> writer.drain()
<span class="hljs-comment">#接收数据</span>
<span class="hljs-keyword">with</span> open(url + <span class="hljs-string">".html"</span>, <span class="hljs-string">"wb"</span>) <span class="hljs-keyword">as</span> fp:
<span class="hljs-keyword">while</span> <span class="hljs-keyword">True</span>:
line = <span class="hljs-keyword">await</span> reader.readline()
<span class="hljs-keyword">if</span> line == <span class="hljs-string">b"\r\n"</span>:
<span class="hljs-keyword">break</span>
<span class="hljs-keyword">else</span>:
fp.write(line)
fp.flush()
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">create_tasks</span><span class="hljs-params">()</span>:</span>
tasks = []
<span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"www.baidu.com"</span>, <span class="hljs-string">"www.zutuanxue.com"</span>, <span class="hljs-string">"www.sina.com.cn"</span>]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
tasks.append(task)
<span class="hljs-keyword">await</span> asyncio.wait(tasks)
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">start_loop</span><span class="hljs-params">(loop)</span>:</span>
asyncio.set_event_loop(loop)
loop.run_forever()
<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
loop = asyncio.get_event_loop()
threading.Thread(target=start_loop, args=(loop,)).start()
asyncio.run_coroutine_threadsafe(create_tasks(), loop)