iostream.py
A utility class to write to and read from a non-blocking socket.
IOStream 对 socket 进行包装,采用注册回调方式实现非阻塞。
通过接口注册各个事件回调
- _read_callback
- _write_callback
- _close_callback
- _connect_callback
ioloop 中 socket 事件发生后,调用 IOStream._handle_events 方法,对事件进行分发。
对应的事件处理过程中,如果满足注册的回调条件,则调用回调函数
回调函数在 IOStream._handle_events 中被调用
contents
- iostream.py
- contents
- example
- head
- IOStream.__init__
- IOStream.connect
- IOStream.read_until
- IOStream.read_bytes
- IOStream.write
- IOStream.close
- IOStream._handle_events
- IOStream._run_callback
- IOStream._run_callback
- IOStream._read_from_socket
- IOStream._read_to_buffer
- IOStream._read_from_buffer
- IOStream._handle_connect
- IOStream._handle_write
- IOStream._consume
- IOStream._add_io_state
- IOStream._read_buffer_size
- copyright
example
一个简单的 IOStream 客户端示例
由此可见, IOStream 是一个异步回调链
- 创建 socket
- 创建 IOStream 对象
- 连接到主机,传入连接成功后回调函数 send_request
- socket 输出数据请求页面,读取 head, 传入读取 head 成功后回调函数 on_headers
- 继续读取 body, 传入读取 body 成功后回调函数 on_body
- 关闭 stream,关闭 ioloop
1 | from tornado import ioloop |
head
1 | from __future__ import with_statement |
IOStream.__init__
包装 socket 类
关键语句 self.io_loop.add_handler(self.socket.fileno(), self._handle_events, self._state)
将自身的_handle_events 加入到全局 ioloop poll 事件回调
此时只注册了 ERROR 类型事件
_read_buffer: 读缓冲
1 | class IOStream(object): |
IOStream.connect
连接 socket 到远程地址,非阻塞模式
- 连接 socket
- 注册连接完成回调
- poll 增加 socket 写事件
1 | def connect(self, address, callback=None): |
IOStream.read_until
- 注册读完成回调
- 尝试从缓冲中读
- 从 socket 中读到缓冲区
- 重复 2,3, 没有数据则退出
- 将 socket 读事件加入 poll
如果缓存中数据满足条件,则直接执行 callback 并返回,
否则,保存 callback 函数下次 read 事件发生时,_handle_events 处理读事件时,再进行检测及调用
1 | def read_until(self, delimiter, callback): |
IOStream.read_bytes
参考 read_until,读限定字节
1 | def read_bytes(self, num_bytes, callback): |
IOStream.write
1 | def write(self, data, callback=None): |
IOStream.close
- 从 ioloop 移除 socket 事件
- 关闭 socket
- 调用关闭回调
1 | def close(self): |
IOStream._handle_events
核心回调
任何类型的 socket 事件触发 ioloop 回调_handle_events,然后在_handle_events 再进行分发
值得注意的是,IOStream 不处理连接请求的 read 事件
注意
作为服务端,默认代理的是已经建立连接的 socket
1 | # HTTPServer.\_handle_events |
作为客户端,需要手动调用 IOStream.connect,连接成功后,成功回调在 write 事件中处理
这个实现比较别扭
1 | def _handle_events(self, fd, events): |
IOStream._run_callback
执行回调
1 | def _run_callback(self, callback, *args, **kwargs): |
IOStream._run_callback
读回调
- 从 socket 读取数据到缓存
- 无数据, socket 关闭
- 检测是否满足 read_until read_bytes
- 满足则执行对应回调
1 | def _handle_read(self): |
IOStream._read_from_socket
从 socket 读取数据
1 | def _read_from_socket(self): |
IOStream._read_to_buffer
从 socket 读取数据存入缓存
1 | def _read_to_buffer(self): |
IOStream._read_from_buffer
从缓冲中过滤数据
检测是否满足结束条件 (read_until/read_bytes),满足则调用之前注册的回调
采用的是查询方式
1 | def _read_from_buffer(self): |
IOStream._handle_connect
调用连接建立回调,并清除连接中标志
1 | def _handle_connect(self): |
IOStream._handle_write
写事件
- 从缓冲区获取限定范围内数据
- 调用 socket.send 输出数据
- 如果数据发送我且已注册回调,调用发送完成回调
1 | def _handle_write(self): |
IOStream._consume
从读缓存消费 loc 长度的数据
1 | def _consume(self, loc): |
IOStream._add_io_state
增加 socket 事件状态
1 | def _add_io_state(self, state): |
IOStream._read_buffer_size
获取读缓存中已有数据长度
1 | def _read_buffer_size(self): |
copyright
author:bigfish
copyright: 许可协议 知识共享署名 - 非商业性使用 4.0 国际许可协议
Sync From: https://github.com/TheBigFish/blog/issues/8