hello 大家好 上期我们已经介绍了 tornado.iostream 模块,也整理了核心代码,不知大家是否理解其中的运作原理,本期我们对这部分的源码进行批注并进行总结。
# -*- encoding: utf-8 -*-
# !/usr/bin/python
"""
@File : __init__.py.py
@Time : 2020/09/13 15:24
@Author : haishiniu
@Software: PyCharm
"""
import numbers
import socket
import sys
import errno
from tornado import ioloop, stack_context
from tornado.concurrent import TracebackFuture
from tornado.iostream import UnsatisfiableReadError, StreamBufferFullError
from tornado.log import app_log, gen_log
from tornado.util import errno_from_exception
class
BaseIOStream(object):
def
__init__(self, io_loop=None, max_buffer_size=None,
read_chunk_size=None, max_write_buffer_size=None):
self.io_loop = io_loop or ioloop.IOLoop.current()
self.max_buffer_size = max_buffer_size or
104857600
# 每次<fd>.read调用最多读取的字节数
self.read_chunk_size = min(read_chunk_size or
65536,self.max_buffer_size // 2)
# 读缓冲区:读缓冲区中的数据分为已经被消费 + 尚未被消费的。
self._read_buffer = bytearray()
# 读指针指向第一个尚未被消费的字节。随着缓冲区中的数据被消费,读指针会右移。
# 当读指针大于缓冲区大小时,缓冲区会向右收缩,释放空间。
self._read_buffer_pos = 0
# 读缓冲区的大小(特指未被消费的那部分缓冲区的大小)
self._read_buffer_size = 0
# read_bytes()方法的第一个参数
self._read_bytes = None
# read callback 当读操作完成之后,会调用该回调函数
self._read_callback = None
# read future 当读操作完成时,会将数据或异常信息填充到该对象中;
self._read_future = None
# 关注的事件
self._state = None
# 异步的读取指定数量的字节。
# 如果指定了callback,那么当读取到指定数量的数据之后,会使用数据作为第一个参数调用这个回调函数;
# 如果没有指定callback,则返回一个Future对象。
# 本次我们只解析 streaming_callback、partial为 默认值的情况。
def
read_bytes(self, num_bytes, callback=None, streaming_callback=None,
partial=False):
future = self._set_read_callback(callback)
assert isinstance(num_bytes, numbers.Integral)
self._read_bytes = num_bytes
self._read_partial = partial
self._streaming_callback = stack_context.wrap(streaming_callback)
try:
self._try_inline_read()
except:
if future is
not
None:
future.add_done_callback(lambda f: f.exception())
raise
return future
# 如果callback为None,则返回一个Future对象,当读操作完成时,会将数据或异常信息填充到该对象中;
# 否则,将其设置为read callback,当读操作完成之后,会调用该回调函数。
def
_set_read_callback(self, callback):
# 如果 read callback 和 read future 都不为None,说明已经有一个读操作正在执行,抛出错误。
assert self._read_callback is
None, "Already reading"
assert self._read_future is
None, "Already reading"
if callback is
not
None:
self._read_callback = stack_context.wrap(callback)
else:
self._read_future = TracebackFuture()
return self._read_future
# 尝试从读缓冲区,完成当前的读操作。
# 如果读操作能够被满足,则在下一次IOLoop迭代时,执行 read callback;否则,将文件描述符添加到 IOLoop上,并且关注其上的读操作。
def
_try_inline_read(self):
self._run_streaming_callback()
# 1,尝试从读缓冲区完成当前挂起的读操作:
pos = self._find_read_pos()
if pos is
not
None:
# 1.1,如果可以完成,那么从 read buffer 读取数据之后,使用数据调用 read callback,或 填充read future
self._read_from_buffer(pos)
return
self._check_closed()
try:
# 2.尝试从fd上读取能够完成当前读操作的数据
pos = self._read_to_buffer_loop()
except Exception:
self._maybe_run_close_callback()
raise
# 2.1 如果能够从fd上读取到能完成当前读操作的数据,那么从read buffer读取数据
if pos is
not
None:
self._read_from_buffer(pos)
return
# 3,否则将fd添加到IOLoop,并关注其上的读操作
if self.closed():
self._maybe_run_close_callback()
else:
self._add_io_state(ioloop.IOLoop.READ)
# 尝试从读缓冲区中找到满足
# 当前挂起的读请求的位置,如果当前读请求能够被满足,则返回这个位置,否则返回None。比如,当前的读请求是read_bytes(num_bytes, partial=False),那么当读缓冲区中有
# 大于等于 num_bytes 个字节时,则返回num_bytes 否则,返回None
def
_find_read_pos(self):
if (self._read_bytes is
not
None
and (self._read_buffer_size >= self._read_bytes or (self._read_partial and self._read_buffer_size > 0))):
num_bytes = min(self._read_bytes, self._read_buffer_size)
return num_bytes
return
None
# 从读缓冲区中,完成当前挂起的读请求。
def
_read_from_buffer(self, pos):
self._read_bytes = self._read_delimiter = self._read_regex = None
self._read_partial = False
self._run_read_callback(pos, False)
def
_run_read_callback(self, size, streaming):
if
1:
pass
else:
callback = self._read_callback
self._read_callback = self._streaming_callback = None
if self._read_future is
not
None:
assert callback is
None
# 如果没有设置read callback,则将数据保存到read future
future = self._read_future
self._read_future = None
future.set_result(self._consume(size))
if callback is
not
None:
assert (self._read_future is
None) or streaming
# 如果设置了read callback,那么则在下一次IOLoop迭代时,调度它
self._run_callback(callback, self._consume(size))
else:
self._maybe_add_error_listener()
def
_run_callback(self, callback, *args):
def
wrapper():
self._pending_callbacks -= 1
try:
return callback(*args)
except Exception:
raise
finally:
self._maybe_add_error_listener()
with stack_context.NullContext():
self._pending_callbacks += 1
self.io_loop.add_callback(wrapper)
# 该方法用于检测:一个没有活跃读、写请求的连接是否被关闭,为此,必须监听读事件。然而,在连接刚建立的时候,执行这个操作是无用的,
# 因为这种情况下,我们会立刻进行读写。在IOStream中,很多地方都插入了这个检查,当连接空闲时,那么就监听其上的读事件。
def
_maybe_add_error_listener(self):
if self._pending_callbacks != 0:
return
if self._state is
None
or self._state == ioloop.IOLoop.ERROR:
if self.closed():
self._maybe_run_close_callback()
elif (self._read_buffer_size == 0
and
self._close_callback is
not
None):
self._add_io_state(ioloop.IOLoop.READ)
# 从 read buffer上读取loc个字节,并返回。
def
_consume(self, loc):
if loc == 0:
return
b""
assert loc <= self._read_buffer_size
# 这里用到了memoryview,memoryview为支持buffer protocol的对象,提供了基于字节的访问接口。
# 使用memoryview不会发生内存拷贝。
b = (memoryview(self._read_buffer)
[self._read_buffer_pos:self._read_buffer_pos + loc]
).tobytes()
# 移动读指针 和 修改缓冲区大小
self._read_buffer_pos += loc
self._read_buffer_size -= loc
# 当 读指针 大于 缓冲区大小 的时候,会对缓冲区进行收缩:
# 1,删除已经被消费内容的缓冲区
# 2,将读指针归零
if self._read_buffer_pos > self._read_buffer_size:
del self._read_buffer[:self._read_buffer_pos]
self._read_buffer_pos = 0
return b
# 向事件处理函数添加`state`
def
_add_io_state(self, state):
if self.closed():
return
if self._state is
None:
self._state = ioloop.IOLoop.ERROR | state
with stack_context.NullContext():
self.io_loop.add_handler(
self.fileno(), self._handle_events, self._state)
elif
not self._state & state:
self._state = self._state | state
self.io_loop.update_handler(self.fileno(), self._state)
# 尝试从 fd上 读取期望数量的字节,
# 如果读取到的数据,能够满足当前的读操作,则返回位置;否则返回None。
def
_read_to_buffer_loop(self):
try:
if self._read_bytes is
not
None:
target_bytes = self._read_bytes
else:
target_bytes = 0
next_find_pos = 0
self._pending_callbacks += 1
while
not self.closed():
# 如果fd的输入缓冲区为空,则退出
if self._read_to_buffer() == 0:
break
self._run_streaming_callback()
if (target_bytes is
not
None
and
self._read_buffer_size >= target_bytes):
break
if self._read_buffer_size >= next_find_pos:
pos = self._find_read_pos()
if pos is
not
None:
return pos
next_find_pos = self._read_buffer_size * 2
return self._find_read_pos()
finally:
self._pending_callbacks -= 1
def
_handle_events(self, fd, events):
if self.closed():
gen_log.warning("Got events for closed stream %s", fd)
return
try:
if events & self.io_loop.READ:
self._handle_read()
# 在处理完读写事件之后,会更改关注的事件。
# 如果当前没有读操作,那么则不再关注读事件,否则关注;
# 如果当前没有写操作,那么则不再关注写事件,否则关注;
# 如果当前连接空闲(也就是没有读写操作,并且读缓冲区为空),
# 则关注读操作(目的是检测连接是否断开),否则不关注;
# 最后,更新关注的事件
state = self.io_loop.ERROR
if self.reading():
state |= self.io_loop.READ
if self.writing():
state |= self.io_loop.WRITE
if state == self.io_loop.ERROR and self._read_buffer_size == 0:
state |= self.io_loop.READ
if state != self._state:
assert self._state is
not
None, \
"shouldn't happen: _handle_events without self._state"
self._state = state
self.io_loop.update_handler(self.fileno(), self._state)
except UnsatisfiableReadError as e:
gen_log.info("Unsatisfiable read, closing connection: %s" % e)
self.close(exc_info=True)
except Exception:
gen_log.error("Uncaught exception, closing connection.",
exc_info=True)
self.close(exc_info=True)
raise
# 从fd上读取数据,并追加到读缓冲区的末尾。
# 该方法返回实际读取的自节数。如果没有读到任何数据,则返回0。
# 出错时,关闭fd,并抛出异常。
def
_read_to_buffer(self):
while
True:
try:
chunk = self.read_from_fd()
except (socket.error, IOError, OSError) as e:
if errno_from_exception(e) == errno.EINTR:
continue
if self._is_connreset(e):
self.close(exc_info=True)
return
self.close(exc_info=True)
raise
break
if chunk is
None:
return
0
self._read_buffer += chunk
self._read_buffer_size += len(chunk)
if self._read_buffer_size > self.max_buffer_size:
gen_log.error("Reached maximum read buffer size")
self.close()
raise StreamBufferFullError("Reached maximum read buffer size")
return len(chunk)
好的,以上就是本期分享的源码批注,我们再来简单总结一下:
read_bytes(num_bytes)的大致执行流程是:
1. 如果read buffer有num_bytes个字节,goto 步骤4
2. 从fd的缓冲区读数据到read buffer,一直到:
读到了期望数量的数据,goto 步骤4
或者,fd的缓冲区被读空
3. 将fd 注册到IOLoop,并关注其上的读事件,每次IOLoop迭代的时候,如果fd上有读事件发生,则执行步骤2
4. 从read buffer上消费num_bytes个字节,
若设置了 read callback ,则在下一次IOLoop迭代的时候,执行callback
否则,将数据填充到 read future
好的,本期我们给出这部分核心代码的批注,希望以上内容对你有所帮助,感谢大家的支持,谢谢!
本文分享自 python编程从入门到实践 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!