前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >13 | Tornado源码分析:BaseIOStream 对象(下)

13 | Tornado源码分析:BaseIOStream 对象(下)

作者头像
python编程从入门到实践
发布2020-09-24 16:03:09
7000
发布2020-09-24 16:03:09
举报
文章被收录于专栏:python编程军火库

hello 大家好 上期我们已经介绍了 tornado.iostream 模块,也整理了核心代码,不知大家是否理解其中的运作原理,本期我们对这部分的源码进行批注并进行总结。 # -*- encoding: utf-8 -*- # !/usr/bin/python """ @File : __init__.py.py @Time : 2020/09/13 15:24 @Author : haishiniu @Software: PyCharm """ import numbers import socket import sys import errno from tornado import ioloop, stack_context from tornado.concurrent import TracebackFuture from tornado.iostream import UnsatisfiableReadError, StreamBufferFullError from tornado.log import app_log, gen_log from tornado.util import errno_from_exception class BaseIOStream(object): def __init__(self, io_loop=None, max_buffer_size=None, read_chunk_size=None, max_write_buffer_size=None): self.io_loop = io_loop or ioloop.IOLoop.current() self.max_buffer_size = max_buffer_size or 104857600 # 每次<fd>.read调用最多读取的字节数 self.read_chunk_size = min(read_chunk_size or 65536,self.max_buffer_size // 2) # 读缓冲区:读缓冲区中的数据分为已经被消费 + 尚未被消费的。 self._read_buffer = bytearray() # 读指针指向第一个尚未被消费的字节。随着缓冲区中的数据被消费,读指针会右移。 # 当读指针大于缓冲区大小时,缓冲区会向右收缩,释放空间。 self._read_buffer_pos = 0 # 读缓冲区的大小(特指未被消费的那部分缓冲区的大小) self._read_buffer_size = 0 # read_bytes()方法的第一个参数 self._read_bytes = None # read callback 当读操作完成之后,会调用该回调函数 self._read_callback = None # read future 当读操作完成时,会将数据或异常信息填充到该对象中; self._read_future = None # 关注的事件 self._state = None # 异步的读取指定数量的字节。 # 如果指定了callback,那么当读取到指定数量的数据之后,会使用数据作为第一个参数调用这个回调函数; # 如果没有指定callback,则返回一个Future对象。 # 本次我们只解析 streaming_callback、partial为 默认值的情况。 def read_bytes(self, num_bytes, callback=None, streaming_callback=None, partial=False): future = self._set_read_callback(callback) assert isinstance(num_bytes, numbers.Integral) self._read_bytes = num_bytes self._read_partial = partial self._streaming_callback = stack_context.wrap(streaming_callback) try: self._try_inline_read() except: if future is not None: future.add_done_callback(lambda f: f.exception()) raise return future # 如果callback为None,则返回一个Future对象,当读操作完成时,会将数据或异常信息填充到该对象中; # 否则,将其设置为read callback,当读操作完成之后,会调用该回调函数。 def _set_read_callback(self, callback): # 如果 read callback 和 read future 都不为None,说明已经有一个读操作正在执行,抛出错误。 assert self._read_callback is None, "Already reading" assert self._read_future is None, "Already reading" if callback is not None: self._read_callback = stack_context.wrap(callback) else: self._read_future = TracebackFuture() return self._read_future # 尝试从读缓冲区,完成当前的读操作。 # 如果读操作能够被满足,则在下一次IOLoop迭代时,执行 read callback;否则,将文件描述符添加到 IOLoop上,并且关注其上的读操作。 def _try_inline_read(self): self._run_streaming_callback() # 1,尝试从读缓冲区完成当前挂起的读操作: pos = self._find_read_pos() if pos is not None: # 1.1,如果可以完成,那么从 read buffer 读取数据之后,使用数据调用 read callback,或 填充read future self._read_from_buffer(pos) return self._check_closed() try: # 2.尝试从fd上读取能够完成当前读操作的数据 pos = self._read_to_buffer_loop() except Exception: self._maybe_run_close_callback() raise # 2.1 如果能够从fd上读取到能完成当前读操作的数据,那么从read buffer读取数据 if pos is not None: self._read_from_buffer(pos) return # 3,否则将fd添加到IOLoop,并关注其上的读操作 if self.closed(): self._maybe_run_close_callback() else: self._add_io_state(ioloop.IOLoop.READ) # 尝试从读缓冲区中找到满足 # 当前挂起的读请求的位置,如果当前读请求能够被满足,则返回这个位置,否则返回None。比如,当前的读请求是read_bytes(num_bytes, partial=False),那么当读缓冲区中有 # 大于等于 num_bytes 个字节时,则返回num_bytes 否则,返回None def _find_read_pos(self): if (self._read_bytes is not None and (self._read_buffer_size >= self._read_bytes or (self._read_partial and self._read_buffer_size > 0))): num_bytes = min(self._read_bytes, self._read_buffer_size) return num_bytes return None # 从读缓冲区中,完成当前挂起的读请求。 def _read_from_buffer(self, pos): self._read_bytes = self._read_delimiter = self._read_regex = None self._read_partial = False self._run_read_callback(pos, False) def _run_read_callback(self, size, streaming): if 1: pass else: callback = self._read_callback self._read_callback = self._streaming_callback = None if self._read_future is not None: assert callback is None # 如果没有设置read callback,则将数据保存到read future future = self._read_future self._read_future = None future.set_result(self._consume(size)) if callback is not None: assert (self._read_future is None) or streaming # 如果设置了read callback,那么则在下一次IOLoop迭代时,调度它 self._run_callback(callback, self._consume(size)) else: self._maybe_add_error_listener() def _run_callback(self, callback, *args): def wrapper(): self._pending_callbacks -= 1 try: return callback(*args) except Exception: raise finally: self._maybe_add_error_listener() with stack_context.NullContext(): self._pending_callbacks += 1 self.io_loop.add_callback(wrapper) # 该方法用于检测:一个没有活跃读、写请求的连接是否被关闭,为此,必须监听读事件。然而,在连接刚建立的时候,执行这个操作是无用的, # 因为这种情况下,我们会立刻进行读写。在IOStream中,很多地方都插入了这个检查,当连接空闲时,那么就监听其上的读事件。 def _maybe_add_error_listener(self): if self._pending_callbacks != 0: return if self._state is None or self._state == ioloop.IOLoop.ERROR: if self.closed(): self._maybe_run_close_callback() elif (self._read_buffer_size == 0 and self._close_callback is not None): self._add_io_state(ioloop.IOLoop.READ) # 从 read buffer上读取loc个字节,并返回。 def _consume(self, loc): if loc == 0: return b"" assert loc <= self._read_buffer_size # 这里用到了memoryview,memoryview为支持buffer protocol的对象,提供了基于字节的访问接口。 # 使用memoryview不会发生内存拷贝。 b = (memoryview(self._read_buffer) [self._read_buffer_pos:self._read_buffer_pos + loc] ).tobytes() # 移动读指针 和 修改缓冲区大小 self._read_buffer_pos += loc self._read_buffer_size -= loc # 当 读指针 大于 缓冲区大小 的时候,会对缓冲区进行收缩: # 1,删除已经被消费内容的缓冲区 # 2,将读指针归零 if self._read_buffer_pos > self._read_buffer_size: del self._read_buffer[:self._read_buffer_pos] self._read_buffer_pos = 0 return b # 向事件处理函数添加`state` def _add_io_state(self, state): if self.closed(): return if self._state is None: self._state = ioloop.IOLoop.ERROR | state with stack_context.NullContext(): self.io_loop.add_handler( self.fileno(), self._handle_events, self._state) elif not self._state & state: self._state = self._state | state self.io_loop.update_handler(self.fileno(), self._state) # 尝试从 fd上 读取期望数量的字节, # 如果读取到的数据,能够满足当前的读操作,则返回位置;否则返回None。 def _read_to_buffer_loop(self): try: if self._read_bytes is not None: target_bytes = self._read_bytes else: target_bytes = 0 next_find_pos = 0 self._pending_callbacks += 1 while not self.closed(): # 如果fd的输入缓冲区为空,则退出 if self._read_to_buffer() == 0: break self._run_streaming_callback() if (target_bytes is not None and self._read_buffer_size >= target_bytes): break if self._read_buffer_size >= next_find_pos: pos = self._find_read_pos() if pos is not None: return pos next_find_pos = self._read_buffer_size * 2 return self._find_read_pos() finally: self._pending_callbacks -= 1 def _handle_events(self, fd, events): if self.closed(): gen_log.warning("Got events for closed stream %s", fd) return try: if events & self.io_loop.READ: self._handle_read() # 在处理完读写事件之后,会更改关注的事件。 # 如果当前没有读操作,那么则不再关注读事件,否则关注; # 如果当前没有写操作,那么则不再关注写事件,否则关注; # 如果当前连接空闲(也就是没有读写操作,并且读缓冲区为空), # 则关注读操作(目的是检测连接是否断开),否则不关注; # 最后,更新关注的事件 state = self.io_loop.ERROR if self.reading(): state |= self.io_loop.READ if self.writing(): state |= self.io_loop.WRITE if state == self.io_loop.ERROR and self._read_buffer_size == 0: state |= self.io_loop.READ if state != self._state: assert self._state is not None, \ "shouldn't happen: _handle_events without self._state" self._state = state self.io_loop.update_handler(self.fileno(), self._state) except UnsatisfiableReadError as e: gen_log.info("Unsatisfiable read, closing connection: %s" % e) self.close(exc_info=True) except Exception: gen_log.error("Uncaught exception, closing connection.", exc_info=True) self.close(exc_info=True) raise # 从fd上读取数据,并追加到读缓冲区的末尾。 # 该方法返回实际读取的自节数。如果没有读到任何数据,则返回0。 # 出错时,关闭fd,并抛出异常。 def _read_to_buffer(self): while True: try: chunk = self.read_from_fd() except (socket.error, IOError, OSError) as e: if errno_from_exception(e) == errno.EINTR: continue if self._is_connreset(e): self.close(exc_info=True) return self.close(exc_info=True) raise break if chunk is None: return 0 self._read_buffer += chunk self._read_buffer_size += len(chunk) if self._read_buffer_size > self.max_buffer_size: gen_log.error("Reached maximum read buffer size") self.close() raise StreamBufferFullError("Reached maximum read buffer size") return len(chunk)

好的,以上就是本期分享的源码批注,我们再来简单总结一下:

read_bytes(num_bytes)的大致执行流程是:

1. 如果read buffer有num_bytes个字节,goto 步骤4

2. 从fd的缓冲区读数据到read buffer,一直到:

读到了期望数量的数据,goto 步骤4

或者,fd的缓冲区被读空

3. 将fd 注册到IOLoop,并关注其上的读事件,每次IOLoop迭代的时候,如果fd上有读事件发生,则执行步骤2

4. 从read buffer上消费num_bytes个字节,

若设置了 read callback ,则在下一次IOLoop迭代的时候,执行callback

否则,将数据填充到 read future

好的,本期我们给出这部分核心代码的批注,希望以上内容对你有所帮助,感谢大家的支持,谢谢!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-09-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 python编程从入门到实践 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档