前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >asyncio模块

asyncio模块

作者头像
星哥玩云
发布2022-09-08 13:39:38
6090
发布2022-09-08 13:39:38
举报
文章被收录于专栏:开源部署

一、概述

  • asyncio模块 是python3.4版本引入的标准库,直接内置了对异步IO的操作
  • 编程模式 是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO
  • 说明 到目前为止实现协程的不仅仅只有asyncio,tornado和gevent都实现了类似功能
  • 关键字的说明

关键字

说明

event_loop

消息循环,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数

coroutine

协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用

task

任务,一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态

future

代表将来执行或没有执行的任务的结果,它和task上没有本质上的区别

async/await

python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口

二、asyncio基本使用

定义一个协程

代码语言:javascript
复制
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> time

<span class="hljs-comment"># 通过async关键字定义了一个协程,协程是不能直接运行的,需要将协程放到消息循环中</span>
<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(x)</span>:</span>
    print(<span class="hljs-string">"waiting:%d"</span>%x)
    time.sleep(x)
    print(<span class="hljs-string">"结束run"</span>)

<span class="hljs-comment">#得到一个协程对象</span>
coroutine = run(<span class="hljs-number">2</span>)

<span class="hljs-comment"># 创建一个消息循环</span>
<span class="hljs-comment"># 注意:真实是在<a href="https://www.xgss.net/tag/asyncio%e6%a8%a1%e5%9d%97" title="查看“asyncio模块”所有文章" target="_blank">asyncio模块</a>中获取一个引用</span>
loop = asyncio.get_event_loop()

<span class="hljs-comment">#将协程对象加入到消息循环</span>
loop.run_until_complete(coroutine)

创建一个任务

代码语言:javascript
复制
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> time

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(x)</span>:</span>
    print(<span class="hljs-string">"waiting:%d"</span>%x)
    time.sleep(x)
    print(<span class="hljs-string">"结束run"</span>)

coroutine = run(<span class="hljs-number">2</span>)
<span class="hljs-comment">#创建任务</span>
task = asyncio.ensure_future(coroutine)

loop = asyncio.get_event_loop()
<span class="hljs-comment"># 协程对象加入到消息循环中,协程对象不能直接运行的,在注册消息循环时run_until_complete方法将加入的协程对象包装成一个任务,task对象时Future类的子类对象,保存协程运行后的状态,用于未来获取协程的结果</span>
<span class="hljs-comment"># loop.run_until_complete(coroutine)</span>

<span class="hljs-comment"># 将任务加入到消息循环</span>
loop.run_until_complete(task)

绑定回调

回调:不需要手动调用,触发某种条件才会调用

代码语言:javascript
复制
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
    <span class="hljs-comment"># 向百度要数据,网络IO</span>
    asyncio.sleep(<span class="hljs-number">5</span>)
    data = <span class="hljs-string">"'%s'的数据"</span>%(url)
    print(<span class="hljs-string">"给你数据"</span>)
    <span class="hljs-keyword">return</span> data

<span class="hljs-comment"># 定义一个回调函数(不需要手动调用,触发某种条件才会调用)</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
    print(<span class="hljs-string">"call_back:"</span>, future.result())

coroutine = run(<span class="hljs-string">"百度"</span>)
<span class="hljs-comment"># 创建一个任务对象</span>
task = asyncio.ensure_future(coroutine)

<span class="hljs-comment"># 给任务添加回调,在任务结束后调用回调函数</span>
task.add_done_callback(call_back)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)

print(<span class="hljs-string">"-------main------"</span>)
<span class="hljs-keyword">while</span> <span class="hljs-number">1</span>:
    time.sleep(<span class="hljs-number">2</span>)

阻塞和await

async可以定义协程,使用await可以针对耗时操作进行挂起,就与生成器的yield一样,函数交出控制权。协程遇到await,消息循环会挂起该协程,执行别的协程,直到其他协程也会挂起或者执行完毕,在进行下一次执行

代码语言:javascript
复制
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
    <span class="hljs-comment"># 向百度要数据,网络IO</span>
    <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">5</span>)
    data = <span class="hljs-string">"'%s'的数据"</span>%(url)
    print(<span class="hljs-string">"给你数据"</span>)
    <span class="hljs-keyword">return</span> data

<span class="hljs-comment"># 定义一个回调函数(不需要手动调用,触发某种条件才会调用)</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
    print(<span class="hljs-string">"call_back:"</span>, future.result())

coroutine = run(<span class="hljs-string">"百度"</span>)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)

print(<span class="hljs-string">"-------main------"</span>)

三、多任务

同步

同时请求"百度", “阿里”, “腾讯”, "新浪"四个网站,假设响应时长均为2秒

代码语言:javascript
复制
<span class="hljs-keyword">import</span> time

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
    <span class="hljs-comment"># 向百度要数据,网络IO</span>
    time.sleep(<span class="hljs-number">2</span>)
    data = <span class="hljs-string">"'%s'的数据"</span>%(url)
    <span class="hljs-keyword">return</span> data

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    t1 = time.time()
    <span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
        print(run(url))
    t2 = time.time()
    print(<span class="hljs-string">"总耗时:%.2f"</span>%(t2-t1))

异步

同时请求"百度", “阿里”, “腾讯”, "新浪"四个网站,假设响应时长均为2秒

代码语言:javascript
复制
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
    <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">2</span>)
    data = <span class="hljs-string">"'%s'的数据"</span>%(url)
    <span class="hljs-keyword">return</span> data

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
    print(<span class="hljs-string">"call_back:"</span>, future.result())

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    loop = asyncio.get_event_loop()
    tasks = []
    t1 = time.time()
    
    <span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
        coroutine = run(url)
        task = asyncio.ensure_future(coroutine)
        task.add_done_callback(call_back)
        tasks.append(task)
        
    <span class="hljs-comment">#同时添加4个异步任务</span>
    loop.run_until_complete(asyncio.gather(*tasks))
    
    t2 = time.time()
    print(<span class="hljs-string">"总耗时:%.2f"</span> % (t2 - t1))

四、协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来

代码语言:javascript
复制
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
    <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">2</span>)
    data = <span class="hljs-string">"'%s'的数据"</span>%(url)
    <span class="hljs-keyword">return</span> data

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
    print(<span class="hljs-string">"call_back:"</span>, future.result())

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">main</span><span class="hljs-params">()</span>:</span>
    tasks = []
    <span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
        coroutine = run(url)
        task = asyncio.ensure_future(coroutine)
        <span class="hljs-comment"># task.add_done_callback(call_back)</span>
        tasks.append(task)

    <span class="hljs-comment"># #1、可以没有回调函数</span>
    <span class="hljs-comment"># dones, pendings = await asyncio.wait(tasks)</span>
    <span class="hljs-comment"># #处理数据,类似回调,建议使用回调</span>
    <span class="hljs-comment"># for t in dones:</span>
    <span class="hljs-comment">#     print("数据:%s"%(t.result()))</span>

    <span class="hljs-comment"># #2、可以没有回调函数</span>
    <span class="hljs-comment"># results = await asyncio.gather(*tasks)</span>
    <span class="hljs-comment"># # 处理数据,类似回调,建议使用回调</span>
    <span class="hljs-comment"># for result in results:</span>
    <span class="hljs-comment">#     print("数据:%s"%(result))</span>


    <span class="hljs-comment"># 3、</span>
    <span class="hljs-comment"># return await asyncio.wait(tasks)</span>


    <span class="hljs-comment"># 4、</span>
    <span class="hljs-comment"># return await asyncio.gather(*tasks)</span>


    <span class="hljs-comment"># 5、</span>
    <span class="hljs-comment"># for t in asyncio.as_completed(tasks):</span>
    <span class="hljs-comment">#     await t</span>

    <span class="hljs-comment"># 6、</span>
    <span class="hljs-keyword">for</span> t <span class="hljs-keyword">in</span> asyncio.as_completed(tasks):
        <span class="hljs-comment">#可以没有回调</span>
        result = <span class="hljs-keyword">await</span> t
        print(<span class="hljs-string">"数据:%s"</span>%(result))


<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    loop = asyncio.get_event_loop()

    t1 = time.time()
    <span class="hljs-comment">#1、</span>
    <span class="hljs-comment"># loop.run_until_complete(main())</span>

    <span class="hljs-comment">#2、</span>
    <span class="hljs-comment"># loop.run_until_complete(main())</span>

    <span class="hljs-comment"># # 3、</span>
    <span class="hljs-comment"># dones, pendings = loop.run_until_complete(main())</span>
    <span class="hljs-comment"># #处理数据,类似回调,建议使用回调</span>
    <span class="hljs-comment"># for t in dones:</span>
    <span class="hljs-comment">#     print("数据:%s"%(t.result()))</span>

    <span class="hljs-comment"># 4、</span>
    <span class="hljs-comment"># results = loop.run_until_complete(main())</span>
    <span class="hljs-comment"># for result in results:</span>
    <span class="hljs-comment">#     print("数据:%s"%(result))</span>

    <span class="hljs-comment"># 5、</span>
    <span class="hljs-comment"># loop.run_until_complete(main())</span>

    <span class="hljs-comment"># 6、</span>
    loop.run_until_complete(main())

    t2 = time.time()
    print(<span class="hljs-string">"总耗时:%.2f"</span> % (t2 - t1))

整理协程嵌套

代码语言:javascript
复制
<span class="hljs-keyword">import</span> time
<span class="hljs-keyword">import</span> asyncio

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
    <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">2</span>)
    data = <span class="hljs-string">"'%s'的数据"</span>%(url)
    <span class="hljs-keyword">return</span> data

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
    print(<span class="hljs-string">"call_back:"</span>, future.result())

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">main</span><span class="hljs-params">()</span>:</span>
    tasks = []
    <span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
        coroutine = run(url)
        task = asyncio.ensure_future(coroutine)
        task.add_done_callback(call_back)
        tasks.append(task)

    <span class="hljs-keyword">await</span> asyncio.wait(tasks)

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    loop = asyncio.get_event_loop()
    t1 = time.time()
    loop.run_until_complete(main())
    t2 = time.time()
    print(<span class="hljs-string">"总耗时:%.2f"</span> % (t2 - t1))

五、消息循环在另一个线程中启动

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block

代码语言:javascript
复制
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> threading
<span class="hljs-keyword">import</span> time

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
    time.sleep(<span class="hljs-number">2</span>)
    data = <span class="hljs-string">"'%s'的数据"</span>%(url)
    print(<span class="hljs-string">"结束请求"</span>)
    <span class="hljs-keyword">return</span> data

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">start_loop</span><span class="hljs-params">(loop)</span>:</span>
    <span class="hljs-comment"># 启动消息循环</span>
    asyncio.set_event_loop(loop)
    loop.run_forever()

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    <span class="hljs-comment"># 创建消息循环(类似死循环)</span>
    <span class="hljs-comment"># 注意:此时消息循环没有启动</span>
    loop = asyncio.get_event_loop()
    threading.Thread(target=start_loop, args=(loop,)).start()
    
    t1 = time.time()
    <span class="hljs-comment"># 给消息循环添加任务</span>
    <span class="hljs-comment"># loop.run_until_complete(create_tasks())</span>
    loop.call_soon_threadsafe(run, <span class="hljs-string">"百度"</span>)
    loop.call_soon_threadsafe(run, <span class="hljs-string">"腾讯"</span>)
    t2 = time.time()
    print(<span class="hljs-string">"总耗时:%.2f"</span> % (t2 - t1))

六、asyncio终极使用

使用到协程嵌套与消息循环在另一个线程中启动相关联

代码语言:javascript
复制
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> threading

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始向'%s'要数据……"</span>%(url))
    <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">2</span>)
    data = <span class="hljs-string">"'%s'的数据"</span>%(url)
    print(<span class="hljs-string">"结束请求"</span>)
    <span class="hljs-keyword">return</span> data

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">call_back</span><span class="hljs-params">(future)</span>:</span>
    print(<span class="hljs-string">"call_back:"</span>, future.result())

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">create_tasks</span><span class="hljs-params">()</span>:</span>
    tasks = []
    <span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"百度"</span>, <span class="hljs-string">"阿里"</span>, <span class="hljs-string">"腾讯"</span>, <span class="hljs-string">"新浪"</span>]:
        coroutine = run(url)
        task = asyncio.ensure_future(coroutine)
        task.add_done_callback(call_back)
        tasks.append(task)
    <span class="hljs-keyword">await</span> asyncio.wait(tasks)

<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">start_loop</span><span class="hljs-params">(loop)</span>:</span>
    <span class="hljs-comment"># 启动消息循环</span>
    asyncio.set_event_loop(loop)
    loop.run_forever()

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    <span class="hljs-comment"># 创建消息循环(类似死循环)</span>
    <span class="hljs-comment"># 注意:此时消息循环没有启动</span>
    loop = asyncio.get_event_loop()
    threading.Thread(target=start_loop, args=(loop,)).start()

    <span class="hljs-comment">#给消息循环添加任务</span>
    asyncio.run_coroutine_threadsafe(create_tasks(), loop)

    <span class="hljs-comment"># asyncio.run_coroutine_threadsafe(run("百度"), loop)</span>
    <span class="hljs-comment"># asyncio.run_coroutine_threadsafe(run("腾讯"), loop)</span>
    <span class="hljs-comment"># asyncio.run_coroutine_threadsafe(run("阿里"), loop)</span>
    <span class="hljs-comment"># asyncio.run_coroutine_threadsafe(run("新浪"), loop)</span>

七、获取网页信息

代码语言:javascript
复制
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">import</span> threading

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">run</span><span class="hljs-params">(url)</span>:</span>
    print(<span class="hljs-string">"开始加载'%s'页面……"</span> % (url))

    <span class="hljs-comment"># 发起链接,耗时IO</span>
    connet = asyncio.open_connection(url, <span class="hljs-number">80</span>)
    reader, writer = <span class="hljs-keyword">await</span> connet

    <span class="hljs-comment"># 链接成功</span>
    <span class="hljs-comment"># 发起请求,耗时IO</span>
    header = <span class="hljs-string">"GET / HTTP/1.0\r\nHost: %s\r\n\r\n"</span>%(url)
    writer.write(header.encode(<span class="hljs-string">"utf-8"</span>))
    <span class="hljs-keyword">await</span> writer.drain()

    <span class="hljs-comment">#接收数据</span>
    <span class="hljs-keyword">with</span> open(url + <span class="hljs-string">".html"</span>, <span class="hljs-string">"wb"</span>) <span class="hljs-keyword">as</span> fp:
        <span class="hljs-keyword">while</span> <span class="hljs-keyword">True</span>:
            line = <span class="hljs-keyword">await</span> reader.readline()
            <span class="hljs-keyword">if</span> line == <span class="hljs-string">b"\r\n"</span>:
                <span class="hljs-keyword">break</span>
            <span class="hljs-keyword">else</span>:
                fp.write(line)
                fp.flush()

<span class="hljs-keyword">async</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">create_tasks</span><span class="hljs-params">()</span>:</span>
    tasks = []
    <span class="hljs-keyword">for</span> url <span class="hljs-keyword">in</span> [<span class="hljs-string">"www.baidu.com"</span>, <span class="hljs-string">"www.zutuanxue.com"</span>, <span class="hljs-string">"www.sina.com.cn"</span>]:
        coroutine = run(url)
        task = asyncio.ensure_future(coroutine)
        tasks.append(task)
    <span class="hljs-keyword">await</span> asyncio.wait(tasks)


<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">start_loop</span><span class="hljs-params">(loop)</span>:</span>
    asyncio.set_event_loop(loop)
    loop.run_forever()

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    loop = asyncio.get_event_loop()
    threading.Thread(target=start_loop, args=(loop,)).start()
    asyncio.run_coroutine_threadsafe(create_tasks(), loop)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
  • 二、asyncio基本使用
  • 三、多任务
  • 四、协程嵌套
  • 五、消息循环在另一个线程中启动
  • 六、asyncio终极使用
  • 七、获取网页信息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档