理想-咸鱼

鱼翔浅底



tornado source code: iostream

Posted on 2018-12-20
Words count in article: 2.8k | Reading time ≈ 15

iostream.py

A utility class to write to and read from a non-blocking socket.

IOStream wraps a socket and makes it non-blocking by registering callbacks.
Callbacks for the individual events are registered through the public interface:

  • _read_callback
  • _write_callback
  • _close_callback
  • _connect_callback

When a socket event fires, the ioloop invokes IOStream._handle_events, which dispatches the event.
While handling an event, if the condition attached to a registered callback is satisfied, the callback is invoked.
All callbacks are therefore run from within IOStream._handle_events.
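The register-then-dispatch pattern described above can be sketched in miniature (hypothetical names, not Tornado's actual API):

```python
# Event masks, mirroring ioloop's READ/WRITE/ERROR bit flags
READ, WRITE, ERROR = 1, 2, 4

class MiniStream:
    """Toy sketch of callback registration and event dispatch."""

    def __init__(self):
        self._callbacks = {}

    def register(self, event, callback):
        self._callbacks[event] = callback

    def _handle_events(self, events):
        # Dispatch: invoke each registered callback whose event bit is set
        fired = []
        for event in (READ, WRITE, ERROR):
            cb = self._callbacks.get(event)
            if events & event and cb is not None:
                cb()
                fired.append(event)
        return fired
```

The real IOStream additionally checks per-callback conditions (delimiter found, byte count reached) before firing.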

contents

  • iostream.py
    • contents
    • example
    • head
    • IOStream.__init__
    • IOStream.connect
    • IOStream.read_until
    • IOStream.read_bytes
    • IOStream.write
    • IOStream.close
    • IOStream._handle_events
    • IOStream._run_callback
    • IOStream._handle_read
    • IOStream._read_from_socket
    • IOStream._read_to_buffer
    • IOStream._read_from_buffer
    • IOStream._handle_connect
    • IOStream._handle_write
    • IOStream._consume
    • IOStream._add_io_state
    • IOStream._read_buffer_size
    • copyright

example

A simple IOStream client example.
It shows that IOStream is essentially a chain of asynchronous callbacks:

  1. Create a socket
  2. Create an IOStream object
  3. Connect to the host, passing send_request as the connection-success callback
  4. Write the page request to the socket and read the head, passing on_headers as the read-success callback
  5. Continue reading the body, passing on_body as the read-success callback
  6. Close the stream and stop the ioloop
from tornado import ioloop
from tornado import iostream
import socket


def send_request():
    stream.write("GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")
    stream.read_until("\r\n\r\n", on_headers)


def on_headers(data):
    headers = {}
    for line in data.split("\r\n"):
        parts = line.split(":")
        if len(parts) == 2:
            headers[parts[0].strip()] = parts[1].strip()
    stream.read_bytes(int(headers["Content-Length"]), on_body)


def on_body(data):
    print data
    stream.close()
    ioloop.IOLoop.instance().stop()


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = iostream.IOStream(s)
stream.connect(("baidu.com", 80), send_request)
ioloop.IOLoop.instance().start()


# html>
# <meta http-equiv="refresh" content="0; url=http://www.baidu.com/">
# </html>

head

from __future__ import with_statement

import collections
import errno
import logging
import socket
import sys

from tornado import ioloop
from tornado import stack_context

try:
    import ssl  # Python 2.6+
except ImportError:
    ssl = None

IOStream.__init__

Wraps the socket object.
The key statement, self.io_loop.add_handler(self.socket.fileno(), self._handle_events, self._state), registers _handle_events with the global ioloop poll.
At this point only the ERROR event type is registered.

_read_buffer: the read buffer

class IOStream(object):

    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        self._connect_callback = None
        self._connecting = False
        self._state = self.io_loop.ERROR
        with stack_context.NullContext():
            self.io_loop.add_handler(
                self.socket.fileno(), self._handle_events, self._state)

IOStream.connect

Connects the socket to a remote address in non-blocking mode:

  1. Call connect on the socket
  2. Register the connection-completed callback
  3. Add the socket WRITE event to the poll
def connect(self, address, callback=None):
    """Connects the socket to a remote address without blocking.

    May only be called if the socket passed to the constructor was
    not previously connected.  The address parameter is in the
    same format as for socket.connect, i.e. a (host, port) tuple.
    If callback is specified, it will be called when the
    connection is completed.

    Note that it is safe to call IOStream.write while the
    connection is pending, in which case the data will be written
    as soon as the connection is ready.  Calling IOStream read
    methods before the socket is connected works on some platforms
    but is non-portable.
    """
    self._connecting = True
    try:
        self.socket.connect(address)
    except socket.error, e:
        # In non-blocking mode connect() always raises an exception
        if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
            raise
    self._connect_callback = stack_context.wrap(callback)
    self._add_io_state(self.io_loop.WRITE)

IOStream.read_until

  1. Register the read-completed callback
  2. Try to satisfy the read from the buffer
  3. Read from the socket into the buffer
  4. Repeat 2 and 3; stop when no more data is available
  5. Add the socket READ event to the poll

If the buffered data already satisfies the condition, the callback is executed immediately and the call returns.
Otherwise the callback is saved; on the next READ event, _handle_events re-checks the condition and invokes it.
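The check-buffer-first step can be sketched as a standalone function (a deliberate simplification: the real code keeps the buffer as a deque of chunks and coalesces them with _merge_prefix):

```python
from collections import deque

def read_from_buffer(buffer, delimiter):
    """If the delimiter is present in the buffered data, consume and
    return everything up to and including it; otherwise return None."""
    data = b"".join(buffer)
    loc = data.find(delimiter)
    if loc == -1:
        return None          # condition not met; caller keeps polling
    end = loc + len(delimiter)
    consumed, rest = data[:end], data[end:]
    buffer.clear()
    if rest:
        buffer.append(rest)  # leftover bytes stay buffered
    return consumed
```

On success the real implementation passes the consumed bytes straight to the saved read callback.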

def read_until(self, delimiter, callback):
    """Call callback when we read the given delimiter."""
    assert not self._read_callback, "Already reading"
    self._read_delimiter = delimiter
    self._read_callback = stack_context.wrap(callback)
    while True:
        # See if we've already got the data from a previous read
        if self._read_from_buffer():
            return
        self._check_closed()
        if self._read_to_buffer() == 0:
            break
    self._add_io_state(self.io_loop.READ)

IOStream.read_bytes

Like read_until, but reads a fixed number of bytes.

def read_bytes(self, num_bytes, callback):
    """Call callback when we read the given number of bytes."""
    assert not self._read_callback, "Already reading"
    if num_bytes == 0:
        callback("")
        return
    self._read_bytes = num_bytes
    self._read_callback = stack_context.wrap(callback)
    while True:
        if self._read_from_buffer():
            return
        self._check_closed()
        if self._read_to_buffer() == 0:
            break
    self._add_io_state(self.io_loop.READ)

IOStream.write

def write(self, data, callback=None):
    """Write the given data to this stream.

    If callback is given, we call it when all of the buffered write
    data has been successfully written to the stream. If there was
    previously buffered write data and an old write callback, that
    callback is simply overwritten with this new callback.
    """
    self._check_closed()
    self._write_buffer.append(data)
    self._add_io_state(self.io_loop.WRITE)
    self._write_callback = stack_context.wrap(callback)

def set_close_callback(self, callback):
    """Call the given callback when the stream is closed."""
    self._close_callback = stack_context.wrap(callback)

IOStream.close

  1. Remove the socket events from the ioloop
  2. Close the socket
  3. Invoke the close callback
def close(self):
    """Close this stream."""
    if self.socket is not None:
        self.io_loop.remove_handler(self.socket.fileno())
        self.socket.close()
        self.socket = None
        if self._close_callback:
            self._run_callback(self._close_callback)

def reading(self):
    """Returns true if we are currently reading from the stream."""
    return self._read_callback is not None

def writing(self):
    """Returns true if we are currently writing to the stream."""
    return bool(self._write_buffer)

def closed(self):
    return self.socket is None

IOStream._handle_events

The core callback.
Any socket event triggers the ioloop to call _handle_events, which then dispatches it.
Note that IOStream does not handle the READ event of a listening socket (incoming connection requests).

As a server, IOStream by default wraps a socket whose connection is already established:

# HTTPServer._handle_events
# connection is an already-accepted connection
stream = iostream.IOStream(connection, io_loop=self.io_loop)

As a client, you must call IOStream.connect yourself; on success, the connect callback is fired while handling the WRITE event.

This part of the implementation is somewhat awkward.

def _handle_events(self, fd, events):
    if not self.socket:
        logging.warning("Got events for closed stream %d", fd)
        return
    try:
        # Read event: run any registered read callback
        if events & self.io_loop.READ:
            self._handle_read()
        if not self.socket:
            return
        # Write event: if a connection was just established,
        # run the connect callback first
        if events & self.io_loop.WRITE:
            if self._connecting:
                self._handle_connect()
            self._handle_write()
        if not self.socket:
            return
        # Error event: close the socket
        if events & self.io_loop.ERROR:
            self.close()
            return
        state = self.io_loop.ERROR
        if self.reading():
            state |= self.io_loop.READ
        if self.writing():
            state |= self.io_loop.WRITE
        if state != self._state:
            self._state = state
            self.io_loop.update_handler(self.socket.fileno(), self._state)
    except:
        logging.error("Uncaught exception, closing connection.",
                      exc_info=True)
        self.close()
        raise

IOStream._run_callback

Executes a callback.

def _run_callback(self, callback, *args, **kwargs):
    try:
        # Use a NullContext to ensure that all StackContexts are run
        # inside our blanket exception handler rather than outside.
        with stack_context.NullContext():
            callback(*args, **kwargs)
    except:
        logging.error("Uncaught exception, closing connection.",
                      exc_info=True)
        # Close the socket on an uncaught exception from a user callback
        # (It would eventually get closed when the socket object is
        # gc'd, but we don't want to rely on gc happening before we
        # run out of file descriptors)
        self.close()
        # Re-raise the exception so that IOLoop.handle_callback_exception
        # can see it and log the error
        raise

IOStream._handle_read

The read handler:

  1. Read data from the socket into the buffer
  2. If there is no data, the socket has been closed
  3. Check whether the read_until / read_bytes condition is met
  4. If so, run the corresponding callback
def _handle_read(self):
    while True:
        try:
            # Read from the socket until we get EWOULDBLOCK or equivalent.
            # SSL sockets do some internal buffering, and if the data is
            # sitting in the SSL object's buffer select() and friends
            # can't see it; the only way to find out if it's there is to
            # try to read it.
            result = self._read_to_buffer()
        except Exception:
            self.close()
            return
        if result == 0:
            break
        else:
            if self._read_from_buffer():
                return

IOStream._read_from_socket

Reads data from the socket.

def _read_from_socket(self):
    """Attempts to read from the socket.

    Returns the data read or None if there is nothing to read.
    May be overridden in subclasses.
    """
    try:
        chunk = self.socket.recv(self.read_chunk_size)
    except socket.error, e:
        if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
            return None
        else:
            raise
    if not chunk:
        self.close()
        return None
    return chunk

IOStream._read_to_buffer

Reads data from the socket and appends it to the read buffer.

def _read_to_buffer(self):
    """Reads from the socket and appends the result to the read buffer.

    Returns the number of bytes read.  Returns 0 if there is nothing
    to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
    error closes the socket and raises an exception.
    """
    try:
        chunk = self._read_from_socket()
    except socket.error, e:
        # ssl.SSLError is a subclass of socket.error
        logging.warning("Read error on %d: %s",
                        self.socket.fileno(), e)
        self.close()
        raise
    if chunk is None:
        return 0
    self._read_buffer.append(chunk)
    if self._read_buffer_size() >= self.max_buffer_size:
        logging.error("Reached maximum read buffer size")
        self.close()
        raise IOError("Reached maximum read buffer size")
    return len(chunk)

IOStream._read_from_buffer

Extracts data from the buffer:
checks whether the completion condition (read_until / read_bytes) is satisfied and, if so, invokes the previously registered callback.
This works by polling the buffer rather than being event-driven itself.

def _read_from_buffer(self):
    """Attempts to complete the currently-pending read from the buffer.

    Returns True if the read was completed.
    """
    if self._read_bytes:
        if self._read_buffer_size() >= self._read_bytes:
            num_bytes = self._read_bytes
            callback = self._read_callback
            self._read_callback = None
            self._read_bytes = None
            self._run_callback(callback, self._consume(num_bytes))
            return True
    elif self._read_delimiter:
        _merge_prefix(self._read_buffer, sys.maxint)
        loc = self._read_buffer[0].find(self._read_delimiter)
        if loc != -1:
            callback = self._read_callback
            delimiter_len = len(self._read_delimiter)
            self._read_callback = None
            self._read_delimiter = None
            self._run_callback(callback,
                               self._consume(loc + delimiter_len))
            return True
    return False

IOStream._handle_connect

Invokes the connection-established callback and clears the connecting flag.

def _handle_connect(self):
    if self._connect_callback is not None:
        callback = self._connect_callback
        self._connect_callback = None
        self._run_callback(callback)
    self._connecting = False

IOStream._handle_write

The write handler:

  1. Take a bounded chunk of data from the write buffer
  2. Send it with socket.send
  3. When all data has been sent and a callback is registered, invoke the write-completed callback
def _handle_write(self):
    while self._write_buffer:
        try:
            if not self._write_buffer_frozen:
                # On windows, socket.send blows up if given a
                # write buffer that's too large, instead of just
                # returning the number of bytes it was able to
                # process.  Therefore we must not call socket.send
                # with more than 128KB at a time.
                _merge_prefix(self._write_buffer, 128 * 1024)
            num_bytes = self.socket.send(self._write_buffer[0])
            self._write_buffer_frozen = False
            _merge_prefix(self._write_buffer, num_bytes)
            self._write_buffer.popleft()
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                # With OpenSSL, after send returns EWOULDBLOCK,
                # the very same string object must be used on the
                # next call to send.  Therefore we suppress
                # merging the write buffer after an EWOULDBLOCK.
                # A cleaner solution would be to set
                # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
                # not yet accessible from python
                # (http://bugs.python.org/issue8240)
                self._write_buffer_frozen = True
                break
            else:
                logging.warning("Write error on %d: %s",
                                self.socket.fileno(), e)
                self.close()
                return
    if not self._write_buffer and self._write_callback:
        callback = self._write_callback
        self._write_callback = None
        self._run_callback(callback)

IOStream._consume

Consumes loc bytes from the read buffer.

def _consume(self, loc):
    _merge_prefix(self._read_buffer, loc)
    return self._read_buffer.popleft()

def _check_closed(self):
    if not self.socket:
        raise IOError("Stream is closed")

IOStream._add_io_state

Adds an event to the socket's registered state.

def _add_io_state(self, state):
    if self.socket is None:
        # connection has been closed, so there can be no future events
        return
    if not self._state & state:
        self._state = self._state | state
        self.io_loop.update_handler(self.socket.fileno(), self._state)

IOStream._read_buffer_size

Returns the number of bytes currently in the read buffer.

    def _read_buffer_size(self):
        return sum(len(chunk) for chunk in self._read_buffer)


class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with
        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)
    before constructing the SSLIOStream.  Unconnected sockets will be
    wrapped when IOStream.connect is finished.
    """
    def __init__(self, *args, **kwargs):
        """Creates an SSLIOStream.

        If a dictionary is provided as keyword argument ssl_options,
        it will be used as additional keyword arguments to ssl.wrap_socket.
        """
        self._ssl_options = kwargs.pop('ssl_options', {})
        super(SSLIOStream, self).__init__(*args, **kwargs)
        self._ssl_accepting = True
        self._handshake_reading = False
        self._handshake_writing = False

    def reading(self):
        return self._handshake_reading or super(SSLIOStream, self).reading()

    def writing(self):
        return self._handshake_writing or super(SSLIOStream, self).writing()

    def _do_ssl_handshake(self):
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError, err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF,
                                 ssl.SSL_ERROR_ZERO_RETURN):
                return self.close()
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                logging.warning("SSL Error on %d: %s", self.socket.fileno(), err)
                return self.close()
            raise
        except socket.error, err:
            if err.args[0] == errno.ECONNABORTED:
                return self.close()
        else:
            self._ssl_accepting = False
            super(SSLIOStream, self)._handle_connect()

    def _handle_read(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_read()

    def _handle_write(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_write()

    def _handle_connect(self):
        self.socket = ssl.wrap_socket(self.socket,
                                      do_handshake_on_connect=False,
                                      **self._ssl_options)
        # Don't call the superclass's _handle_connect (which is responsible
        # for telling the application that the connection is complete)
        # until we've completed the SSL handshake (so certificates are
        # available, etc).

    def _read_from_socket(self):
        try:
            # SSLSocket objects have both a read() and recv() method,
            # while regular sockets only have recv().
            # The recv() method blocks (at least in python 2.6) if it is
            # called when there is nothing to read, so we have to use
            # read() instead.
            chunk = self.socket.read(self.read_chunk_size)
        except ssl.SSLError, e:
            # SSLError is a subclass of socket.error, so this except
            # block must come first.
            if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                return None
            else:
                raise
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk


def _merge_prefix(deque, size):
    """Replace the first entries in a deque of strings with a single
    string of up to size bytes.

    >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
    >>> _merge_prefix(d, 5); print d
    deque(['abcde', 'fghi', 'j'])

    Strings will be split as necessary to reach the desired size.
    >>> _merge_prefix(d, 7); print d
    deque(['abcdefg', 'hi', 'j'])

    >>> _merge_prefix(d, 3); print d
    deque(['abc', 'defg', 'hi', 'j'])

    >>> _merge_prefix(d, 100); print d
    deque(['abcdefghij'])
    """
    prefix = []
    remaining = size
    while deque and remaining > 0:
        chunk = deque.popleft()
        if len(chunk) > remaining:
            deque.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    deque.appendleft(''.join(prefix))


def doctests():
    import doctest
    return doctest.DocTestSuite()

copyright

author:bigfish
copyright: Creative Commons Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)


Sync From: https://github.com/TheBigFish/blog/issues/8

python decorators

Posted on 2018-11-29
Words count in article: 773 | Reading time ≈ 3

python decorators

Decorator basics

The essence of a decorator

@ is syntactic sugar.
When @decorator is used on a function func:

@decorator
def func():
    pass

the interpreter turns it into:

func = decorator(func)

Note that this statement is executed immediately, at definition time.
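A minimal example makes the definition-time execution visible:

```python
calls = []

def decorator(func):
    # this body runs as soon as the `def func` below is evaluated,
    # before func is ever called
    calls.append("decorating " + func.__name__)
    return func

@decorator
def func():
    pass

# calls == ["decorating func"] already, without calling func()
```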

Stacked decorators

@decorator_one
@decorator_two
def func():
    pass

is equivalent to:

func = decorator_one(decorator_two(func))
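The application order (the decorator closest to the function is applied first) can be checked directly:

```python
def decorator_one(func):
    def wrapper():
        return "one(" + func() + ")"
    return wrapper

def decorator_two(func):
    def wrapper():
        return "two(" + func() + ")"
    return wrapper

@decorator_one
@decorator_two
def func():
    return "func"

# func() returns "one(two(func))": decorator_two wraps func first,
# then decorator_one wraps the result
```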

Decorators with arguments

@decorator(arg1, arg2)
def func():
    pass

is equivalent to:

func = decorator(arg1,arg2)(func)

Passing arguments through to the decorated function with *args and **kwargs

def wrapper(func):
    def wrapper_in(*args, **kwargs):
        # args is a tuple, kwargs a dict
        print("%s is running" % func.__name__)
        return func(*args, **kwargs)
    return wrapper_in

@wrapper
def func(parameter1, parameter2, key1=1):
    print("call func with {} {} {}".format(parameter1, parameter2, key1))


func("haha", None, key1=2)

# func is running
# call func with haha None 2

A decorator with arguments

def log(level):
    def decorator(func):
        def wrapper(*args, **kwargs):
            if level == "warn":
                print("%s with warn is running" % func.__name__)
            elif level == "info":
                print("%s with info is running" % func.__name__)
            return func(*args, **kwargs)
        return wrapper

    return decorator


@log("warn")
def foo(*args, **kwargs):
    print("args {}, kwargs{}".format(args, kwargs))

foo(1, 2, a = 3)

# foo with warn is running
# args (1, 2), kwargs{'a': 3}

which is equivalent to:

def foo(*args, **kwargs):
    print("args {}, kwargs{}".format(args, kwargs))

foo = log("warn")(foo)

Method decorators

An instance method is a special function whose first parameter self points to the class instance,
so methods can be decorated the same way.

def decorate(func):
    def wrapper(self):
        return "<p>{0}</p>".format(func(self))
    return wrapper

class Person(object):
    def __init__(self):
        self.name = "John"
        self.family = "Doe"

    @decorate
    def get_fullname(self):
        return self.name+" "+self.family

my_person = Person()
print my_person.get_fullname()

# <p>John Doe</p>

The example above hard-codes the self parameter, which is inflexible.
Passing *args, **kwargs through the wrapper is more general:

def decorate(func):
    def wrapper(*args, **kwargs):
        return "<p>{0}</p>".format(func(*args, **kwargs))
    return wrapper

class Person(object):
    def __init__(self):
        self.name = "John"
        self.family = "Doe"

    @decorate
    def get_fullname(self):
        return self.name+" "+self.family

my_person = Person()

print my_person.get_fullname()

Class decorators

A class that implements __call__ becomes a callable object, so a class can be used as a decorator.

class EntryExit(object):

    def __init__(self, f):
        self.f = f

    def __call__(self):
        print "Entering", self.f.__name__
        self.f()
        print "Exited", self.f.__name__

@EntryExit
def func1():
    print "inside func1()"

@EntryExit
def func2():
    print "inside func2()"

def func3():
    pass

print type(EntryExit(None))
# func1 has become a class instance
print type(func1)
print type(EntryExit)
# func3 is a plain function
print type(func3)
func1()
func2()

# <class '__main__.EntryExit'>
# <class '__main__.EntryExit'>
# <type 'type'>
# <type 'function'>
# Entering func1
# inside func1()
# Exited func1
# Entering func2
# inside func2()
# Exited func2

Class decorator equivalence

@EntryExit
def func1():
    print "inside func1()"

is equivalent to:

def func1():
    print "inside func1()"
# func1 is now an instance of the EntryExit class
func1 = EntryExit(func1)

Decorating a class

register_handles = []


def route(url):
    global register_handles

    def register(handler):
        register_handles.append((".*$", [(url, handler)]))
        return handler

    return register

@route("/index")
class Index():
    def get(self, *args, **kwargs):
        print("hi")

# Index is still the class as originally defined.
# Applying the decorator at class-definition time calls route,
# which registers the class in the global routing table register_handles.
@route("/main")
class Main():
    def get(self, *args, **kwargs):
        print("hi")

print (register_handles)

print(type(Index))

# [('.*$', [('/index', <class __main__.Index at 0x0000000002A49828>)]), ('.*$', [('/main', <class __main__.Main at 0x0000000002FBABE8>)])]
# <type 'classobj'>
@route("/index")
class Index():
    def get(self, *args, **kwargs):
        print("hi")
Index = route("/index")(Index)
# register returns the handler it was given, so Index is still the class object

functools

The decorator implementations above share a problem: the decorated function's attributes (__name__, __doc__, etc.) are replaced by the wrapper's.
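functools.wraps fixes this by copying the wrapped function's metadata onto the wrapper:

```python
import functools

def wrapper(func):
    @functools.wraps(func)  # copies __name__, __doc__, etc. onto wrapper_in
    def wrapper_in(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper_in

@wrapper
def func():
    """original docstring"""

# without @functools.wraps, func.__name__ would be "wrapper_in"
```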


Sync From: https://github.com/TheBigFish/blog/issues/7

A brief analysis of python setup.py

Posted on 2018-11-16
Words count in article: 2.1k | Reading time ≈ 10

A brief analysis of python setup.py

Notes on setuptools.setup() parameters

packages

Processes all pure Python modules found under the packages listed.
This requires a mapping from package names to directories in the setup script.
By default, a directory under the setup script's directory with the same name as the package is treated as the package directory.
With packages = ['foo'], the setup script expects to find foo/__init__.py next to itself. If that file is missing, distutils does not raise an error; it prints a warning and continues with the broken packaging run.

package_dir

Specifies the package-name-to-directory mapping; see packages.

package_dir = {'': 'lib'}

Key: the package name; an empty name stands for the root package (top-level modules not in any package).
Value: the directory, relative to the setup script's location.

packages = ['foo']
package_dir = {'': 'lib'}

states that the package lives in lib/foo/ and that lib/foo/__init__.py exists.

Alternatively, put the contents of the foo package directly in lib instead of in a foo subdirectory:

package_dir = {'foo': 'lib'}

A package: dir mapping in the package_dir dictionary applies to all packages below it, so foo.bar is covered automatically. In this example, packages = ['foo', 'foo.bar'] tells distutils to look for lib/__init__.py and lib/bar/__init__.py.
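Expressed as a plain dict rather than a setup() call (a sketch with assumed package names):

```python
# Hypothetical layout: all packages live under lib/
setup_args = dict(
    packages=['foo', 'foo.bar'],
    package_dir={'': 'lib'},   # root package maps to lib/
)
# distutils then resolves foo     -> lib/foo/__init__.py
#                         foo.bar -> lib/foo/bar/__init__.py
```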

py_modules

For a relatively small distribution you may prefer to list individual modules rather than packages, especially when the project is a single module in the root directory.
This describes two modules, one in the root directory and the other in the pkg package.
The default package/directory mapping means mod1.py and pkg/mod2.py are expected next to the setup script.
As always, the package_dir option can override that mapping.

find_packages

packages=find_packages(exclude=('tests', 'robot_server.scripts')),
exclude takes package names, not paths.

include_package_data

Includes non-Python files that live inside a package.
include_package_data must be used together with MANIFEST.in.

MANIFEST.in:

include myapp/scripts/start.py
recursive-include myapp/static *
setup(
    name='MyApp',               # application name
    version='1.0',              # version
    packages=['myapp'],         # Python packages to include
    include_package_data=True   # honor the MANIFEST.in manifest
)

Note that files included or excluded this way must live inside a package:

setup-demo/
  ├ mydata.data      # data file
  ├ setup.py         # setup script
  ├ MANIFEST.in      # manifest file
  └ myapp/           # source code
      ├ static/      # static file directory
      ├ __init__.py
      ...

Here, `include mydata.data` in MANIFEST.in has no effect, because mydata.data is outside the package.

exclude_package_data

Excludes some files from the package.
{'myapp': ['.gitignore']} excludes all .gitignore files under the myapp package.

data_files

Specifies additional files (for example configuration files):

data_files=[('bitmaps', ['bm/b1.gif', 'bm/b2.gif']),
            ('config', ['cfg/data.cfg']),
            ('/etc/init.d', ['init-script'])]

This determines which files are installed into which directories.
If a directory name is relative (e.g. bitmaps), it is interpreted relative to sys.prefix (e.g. /usr) or sys.exec_prefix.
An absolute directory (e.g. /etc/init.d) is used as-is.
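The resolution rule can be sketched as follows (a simplification: the choice between sys.prefix and sys.exec_prefix, and platform details, are glossed over):

```python
import os
import sys

def install_dir(target, prefix=sys.prefix):
    """Relative targets land under the installation prefix;
    absolute targets are used as-is."""
    return target if os.path.isabs(target) else os.path.join(prefix, target)
```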

cmdclass

Customizes commands by subclassing the command classes under setuptools.command:

class UploadCommand(Command):
    """Support setup.py upload."""
    ...

    def run(self):
        try:
            self.status('Removing previous builds…')
            rmtree(os.path.join(here, 'dist'))
        except OSError:
            pass

        self.status('Building Source and Wheel (universal) distribution…')
        os.system('{0} setup.py sdist bdist_wheel --universal'.format(sys.executable))

        self.status('Uploading the package to PyPI via Twine…')
        os.system('twine upload dist/*')

        self.status('Pushing git tags…')
        os.system('git tag v{0}'.format(about['__version__']))
        os.system('git push --tags')

        sys.exit()

setup(
    ...
    # $ setup.py publish support.
    cmdclass={
        'upload': UploadCommand,
    },
)

With this in place, python setup.py upload builds and uploads the package.

install_requires

A list of the dependencies this package needs at install time.

tests_require

Similar to install_requires, but lists the dependencies needed to run the unit tests.

Installing a package in a virtualenv

Using legit as an example

  • Download the legit source
    git clone https://github.com/kennethreitz/legit.git

  • Create a virtual environment
    virtualenv --no-site-packages venv
    The environment's directory structure is:

    venv/
    ├── bin
    ├── include
    ├── lib
    ├── local
    └── pip-selfcheck.json
  • Build the project
    python3 setup.py sdist bdist_wheel

    .
    ├── AUTHORS
    ├── build
    │   ├── bdist.linux-x86_64
    │   └── lib.linux-x86_64-2.7
    ├── dist
    │   ├── legit-1.0.1-py2.py3-none-any.whl
    │   └── legit-1.0.1.tar.gz

    The installable archives are generated under dist.

  • Enter the virtual environment
    source venv/bin/activate

  • Install the package
    pip install ./dist/legit-1.0.1.tar.gz

    Successfully built legit args clint
    Installing collected packages: appdirs, args, click, clint, colorama, crayons, smmap2, gitdb2, GitPython, six, pyparsing, packaging, legit
    Successfully installed GitPython-2.1.8 appdirs-1.4.3 args-0.1.0 click-6.7 clint-0.5.1 colorama-0.4.0 crayons-0.1.2 gitdb2-2.0.3 legit-1.0.1 packaging-17.1 pyparsing-2.2.0 six-1.11.0 smmap2-2.0.3

Analyzing the installation

legit and its dependencies are installed under venv/lib/python2.7/site-packages/:

legit/venv/lib/python2.7/site-packages$ tree -L 1

.
├── appdirs-1.4.3.dist-info
├── appdirs.py
├── appdirs.pyc
├── args-0.1.0.dist-info
├── args.py
├── args.pyc
├── click
├── click-6.7.dist-info
├── clint
├── clint-0.5.1.dist-info
├── colorama
├── colorama-0.4.0.dist-info
├── crayons-0.1.2.dist-info
├── crayons.py
├── crayons.pyc
├── easy_install.py
├── easy_install.pyc
├── git
├── gitdb
├── gitdb2-2.0.3.dist-info
├── GitPython-2.1.8.dist-info
├── legit
├── legit-1.0.1.dist-info
├── packaging
├── packaging-17.1.dist-info
├── pip
├── pip-18.1.dist-info
├── pkg_resources
├── pyparsing-2.2.0.dist-info
├── pyparsing.py
├── pyparsing.pyc
├── setuptools
├── setuptools-40.6.2.dist-info
├── six-1.11.0.dist-info
├── six.py
├── six.pyc
├── smmap
├── smmap2-2.0.3.dist-info
├── wheel
└── wheel-0.32.2.dist-info

A new executable named legit appears under venv/bin, with the content:

#!/home/turtlebot/learn/python/legit/venv/bin/python

# -*- coding: utf-8 -*-
import re
import sys

from legit.cli import cli

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
    sys.exit(cli())

legit can now be run directly:

>>> legit

Analyzing setup.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
from codecs import open  # To use a consistent encoding

from setuptools import setup  # Always prefer setuptools over distutils

APP_NAME = 'legit'
APP_SCRIPT = './legit_r'
VERSION = '1.0.1'


# Grab requirements.
with open('reqs.txt') as f:
    required = f.readlines()


settings = dict()


# Publish Helper.
if sys.argv[-1] == 'publish':
    os.system('python setup.py sdist bdist_wheel upload')
    sys.exit()


if sys.argv[-1] == 'build_manpage':
    os.system('rst2man.py README.rst > extra/man/legit.1')
    sys.exit()


# Build Helper.
if sys.argv[-1] == 'build':
    import py2exe  # noqa
    sys.argv.append('py2exe')

    settings.update(
        console=[{'script': APP_SCRIPT}],
        zipfile=None,
        options={
            'py2exe': {
                'compressed': 1,
                'optimize': 0,
                'bundle_files': 1}})

settings.update(
    name=APP_NAME,
    version=VERSION,
    description='Git for Humans.',
    long_description=open('README.rst').read(),
    author='Kenneth Reitz',
    author_email='me@kennethreitz.com',
    url='https://github.com/kennethreitz/legit',
    packages=['legit'],
    install_requires=required,
    license='BSD',
    classifiers=[
        'Development Status :: 5 - Production/Stable',
        'Intended Audience :: Developers',
        'Natural Language :: English',
        'License :: OSI Approved :: BSD License',
        'Programming Language :: Python',
        'Programming Language :: Python :: 2',
        'Programming Language :: Python :: 2.7',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.4',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
    ],
    entry_points={
        'console_scripts': [
            'legit = legit.cli:cli',
        ],
    }
)


setup(**settings)
  • packages=['legit'] includes the default set of files under the legit directory
  • install_requires=required lists the third-party libraries that must be installed alongside the package
  • 'console_scripts': ['legit = legit.cli:cli',] generates a console executable named legit that runs the cli() function in legit.cli. An executable legit Python file ends up under bin/, calling the specified function.

A setup.py example, analyzed

kennethreitz/setup.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Note: To use the 'upload' functionality of this file, you must:
#   $ pip install twine

import io
import os
import sys
from shutil import rmtree

from setuptools import find_packages, setup, Command

# Package meta-data.
NAME = 'mypackage'
DESCRIPTION = 'My short description for my project.'
URL = 'https://github.com/me/myproject'
EMAIL = 'me@example.com'
AUTHOR = 'Awesome Soul'
REQUIRES_PYTHON = '>=3.6.0'
VERSION = None

# What packages are required for this module to be executed?
REQUIRED = [
    # 'requests', 'maya', 'records',
]

# What packages are optional?
EXTRAS = {
    # 'fancy feature': ['django'],
}

# The rest you shouldn't have to touch too much :)
# ------------------------------------------------
# Except, perhaps the License and Trove Classifiers!
# If you do change the License, remember to change the Trove Classifier for that!

here = os.path.abspath(os.path.dirname(__file__))

# Import the README and use it as the long-description.
# Note: this will only work if 'README.md' is present in your MANIFEST.in file!
try:
    with io.open(os.path.join(here, 'README.md'), encoding='utf-8') as f:
        long_description = '\n' + f.read()
except FileNotFoundError:
    long_description = DESCRIPTION

# Load the package's __version__.py module as a dictionary.
about = {}
if not VERSION:
    with open(os.path.join(here, NAME, '__version__.py')) as f:
        exec(f.read(), about)
else:
    about['__version__'] = VERSION


class UploadCommand(Command):
    """Support setup.py upload."""

    description = 'Build and publish the package.'
    user_options = []

    @staticmethod
    def status(s):
        """Prints things in bold."""
        print('\033[1m{0}\033[0m'.format(s))

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def run(self):
        try:
            self.status('Removing previous builds…')
            rmtree(os.path.join(here, 'dist'))
        except OSError:
            pass

        self.status('Building Source and Wheel (universal) distribution…')
        os.system('{0} setup.py sdist bdist_wheel --universal'.format(sys.executable))

        self.status('Uploading the package to PyPI via Twine…')
        os.system('twine upload dist/*')

        self.status('Pushing git tags…')
        os.system('git tag v{0}'.format(about['__version__']))
        os.system('git push --tags')

        sys.exit()


# Where the magic happens:
setup(
    name=NAME,
    version=about['__version__'],
    description=DESCRIPTION,
    long_description=long_description,
    long_description_content_type='text/markdown',
    author=AUTHOR,
    author_email=EMAIL,
    python_requires=REQUIRES_PYTHON,
    url=URL,
    packages=find_packages(exclude=('tests',)),
    # If your package is a single module, use this instead of 'packages':
    #   py_modules=['mypackage'],

    # entry_points={
    #     'console_scripts': ['mycli=mymodule:cli'],
    # },
    install_requires=REQUIRED,
    extras_require=EXTRAS,
    include_package_data=True,
    license='MIT',
    classifiers=[
        # Trove classifiers
        # Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers
        'License :: OSI Approved :: MIT License',
        'Programming Language :: Python',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.6',
        'Programming Language :: Python :: Implementation :: CPython',
        'Programming Language :: Python :: Implementation :: PyPy'
    ],
    # $ setup.py publish support.
    cmdclass={
        'upload': UploadCommand,
    },
)

Sync From: https://github.com/TheBigFish/blog/issues/6

Multithreaded programming in Python

Posted on 2018-11-15
Words count in article: 554 | Reading time ≈ 2

Multithreaded programming in Python

Using a target function

import time

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()

Using a class

import time
from threading import Thread

class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)

c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
c.terminate()  # Signal termination
t.join()       # Wait for actual termination (if needed)

Note how the self._running flag is used to make the thread exit.
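The self._running flag above can equally be a threading.Event, which is thread-safe and makes the sleep interruptible. A minimal Python 3 sketch (not from the original post):

```python
import threading

class CountdownTask:
    def __init__(self):
        self._stop_event = threading.Event()

    def terminate(self):
        self._stop_event.set()

    def run(self, n):
        # wait() returns True as soon as the event is set, so the loop
        # exits immediately on terminate() instead of finishing a sleep
        while n > 0 and not self._stop_event.wait(timeout=0.05):
            print('T-minus', n)
            n -= 1

c = CountdownTask()
t = threading.Thread(target=c.run, args=(1000,))
t.start()
c.terminate()   # signal termination
t.join()        # returns promptly because wait() wakes up
```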

Inter-thread communication with a Queue

import Queue
import threading
import time

task_queue = Queue.Queue()


class ThreadTest(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            msg = self.queue.get()
            print(msg)
            time.sleep(0.1)
            self.queue.task_done()


def main():
    start = time.time()
    # populate queue with data
    for i in range(100):
        task_queue.put("message")

    # spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadTest(task_queue)
        t.setDaemon(True)
        t.start()

    # wait on the queue until everything has been processed
    task_queue.join()
    print "Elapsed Time: {}".format(time.time() - start)


if __name__ == "__main__":
    main()

With setDaemon(True), run() never needs to return: all daemon threads are killed when the main thread exits.
If setDaemon is set to False, change run() to:

def run(self):
    while not self.queue.empty():
        msg = self.queue.get()
        print(msg)
        time.sleep(0.1)
        self.queue.task_done()

and join every thread before the main function returns.

Notes

  • Putting an item on a queue does not copy it; inter-thread communication really passes object references. If shared state is a concern, pass only immutable values (integers, strings, tuples) or a deep copy of the object.

    from queue import Queue
    from threading import Thread
    import copy

    # A thread that produces data
    def producer(out_q):
        while True:
            # Produce some data
            ...
            out_q.put(copy.deepcopy(data))

    # A thread that consumes data
    def consumer(in_q):
        while True:
            # Get some data
            data = in_q.get()
            # Process the data
            ...
  • Helper methods such as q.qsize(), q.full(), and q.empty() report the queue's current size and state, but none of them is thread-safe: empty() may report the queue as empty while another thread is already inserting an item.
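Because of that race, consumers should not test empty() before calling get(). A more robust pattern, sketched here with Python 3's queue module (the drain helper is illustrative, not a standard API):

```python
import queue

def drain(q):
    """Consume everything currently in the queue without racing on empty()."""
    items = []
    while True:
        try:
            # get_nowait() either returns an item or raises queue.Empty;
            # unlike empty() + get(), this cannot race with another consumer
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items

q = queue.Queue()
q.put('a')
q.put('b')
print(drain(q))  # → ['a', 'b']
```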

References

  • python3-cookbook Chapter 12 ‘Concurrency-Starting and Stopping Threads’

  • Practical threaded programming with Python


Sync From: https://github.com/TheBigFish/blog/issues/5

thread local in python

Posted on 2018-11-13
Words count in article: 457 | Reading time ≈ 2
  • thread local in python
    • Thread-local variables
    • The main thread has its own thread-local variables
    • Subclassing threading.local
    • A real-world example

thread local in python

See Thread Locals in Python: Mostly easy

Thread-local variables

import threading

mydata = threading.local()
mydata.x = 'hello'

class Worker(threading.Thread):
    def run(self):
        mydata.x = self.name
        print mydata.x

w1, w2 = Worker(), Worker()
w1.start(); w2.start(); w1.join(); w2.join()
Thread-1
Thread-2

Each thread sees only its own value, even though all of them go through the global mydata.

The main thread has its own thread-local variables

import threading

mydata = threading.local()
mydata.x = {}

class Worker(threading.Thread):
    def run(self):
        mydata.x['message'] = self.name
        print mydata.x['message']

w1, w2 = Worker(), Worker()
w1.start(); w2.start(); w1.join(); w2.join()
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "E:/learn/python/test/thread_local.py", line 15, in run
    mydata.x['message'] = self.name
AttributeError: 'thread._local' object has no attribute 'x'

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "E:/learn/python/test/thread_local.py", line 15, in run
    mydata.x['message'] = self.name
AttributeError: 'thread._local' object has no attribute 'x'

Threads w1 and w2 have no x attribute: child threads and the main thread each get their own storage.

Subclassing threading.local

import threading

class MyData(threading.local):
    def __init__(self):
        self.x = {}

mydata = MyData()

class Worker(threading.Thread):
    def run(self):
        mydata.x['message'] = self.name
        print mydata.x['message']

w1, w2 = Worker(), Worker()
w1.start(); w2.start(); w1.join(); w2.join()
Thread-1
Thread-2
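Why this works, which the post doesn't spell out: a threading.local subclass re-runs __init__ the first time each thread touches the instance, so every thread gets its own fresh self.x. A quick Python 3 check (the init_calls list is just instrumentation for illustration):

```python
import threading

init_calls = []

class MyData(threading.local):
    def __init__(self):
        # runs once per thread, on that thread's first access
        init_calls.append(threading.current_thread().name)
        self.x = {}

mydata = MyData()          # __init__ runs in the main thread

def worker():
    mydata.x['message'] = 'hi'   # first access here re-runs __init__

t = threading.Thread(target=worker, name='w1')
t.start(); t.join()

print(init_calls)
```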

A real-world example

bottle 0.4.10

class Request(threading.local):
    """ Represents a single request using thread-local namespace. """

    def bind(self, environ):
        """ Binds the enviroment of the current request to this request handler """
        self._environ = environ
        self._GET = None
        self._POST = None
        self._GETPOST = None
        self._COOKIES = None
        self.path = self._environ.get('PATH_INFO', '/').strip()
        if not self.path.startswith('/'):
            self.path = '/' + self.path

#----------------------
request = Request()
#----------------------


def WSGIHandler(environ, start_response):
    """The bottle WSGI-handler."""
    global request
    global response
    request.bind(environ)
    response.bind()
    try:
        handler, args = match_url(request.path, request.method)
        if not handler:
            raise HTTPError(404, "Not found")
        output = handler(**args)
    except BreakTheBottle, shard:
        output = shard.output

Sync From: https://github.com/TheBigFish/blog/issues/4

Configuring frp to forward http, websocket, and ssh

Posted on 2018-11-07
Words count in article: 181 | Reading time ≈ 1

Configuring frp to forward http, websocket, and ssh

See frp#75

http forwarding without a domain name

frps.ini

[common]
bind_port = 7000

frpc.ini

[common]
server_addr = aaa.bbb.ccc.ddd
server_port = 7000

[tcp_port]
type = tcp
local_ip = 127.0.0.1
local_port = 2333
remote_port = 3333

From the outside, http://aaa.bbb.ccc.ddd:3333 now reaches http://127.0.0.1:2333 on the internal machine.

ssh forwarding

frpc.ini

[common]
server_addr = aaa.bbb.ccc.ddd
server_port = 7000

[ssh]
type = tcp
local_ip = 192.168.0.1
local_port = 22
remote_port = 7022

[tcp_port]
type = tcp
local_ip = 192.168.0.1
local_port = 8888
remote_port = 8888

From the outside, reach the internal machine over ssh with ssh -oPort=7022 user@aaa.bbb.ccc.ddd

From the outside, http://aaa.bbb.ccc.ddd:8888 reaches http://127.0.0.1:8888 on the internal machine.

websocket is reachable via ws://aaa.bbb.ccc.ddd:8888

Running the service

nohup ./frps -c ./frps.ini &


Sync From: https://github.com/TheBigFish/blog/issues/3

JSON serialization and deserialization in Python

Posted on 2018-11-01
Words count in article: 710 | Reading time ≈ 4

JSON serialization and deserialization in Python

  • JSON serialization and deserialization in Python
    • Using namedtuple
    • Using object_hook
    • Getting object attributes
    • Getting nested object attributes

Using namedtuple

Deserializing into a namedtuple

import json
from collections import namedtuple

data = '{"name": "John Smith", "hometown": {"name": "New York", "id": 123}}'

# Parse JSON into an object with attributes corresponding to dict keys.
x = json.loads(data, object_hook=lambda d: namedtuple('X', d.keys())(*d.values()))
print x.name, x.hometown.name, x.hometown.id

Serializing to JSON

json.dumps(x._asdict())

Output

{"hometown": ["New York", 123], "name": "John Smith"}

Wrapped up:

def _json_object_hook(d): return namedtuple('X', d.keys())(*d.values())
def json2obj(data): return json.loads(data, object_hook=_json_object_hook)

x = json2obj(data)

Summary:

Serialization and deserialization are both convenient, but the resulting namedtuple is immutable: its fields cannot be reassigned.
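One partial workaround the post doesn't mention: namedtuple provides _replace(), which returns a new tuple with selected fields changed instead of mutating in place (Python 3 syntax; the Point type is a made-up example):

```python
import json
from collections import namedtuple

Point = namedtuple('Point', ['x', 'y'])   # illustrative type, not from the post
p = Point(x=1, y=2)

# _replace() does not mutate p; it builds a modified copy
p2 = p._replace(x=10)

print(p2)                         # → Point(x=10, y=2)
print(json.dumps(p2._asdict()))   # → {"x": 10, "y": 2}
```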

Using object_hook

Deserializing into an object

import json

class JSONObject:
    def __init__(self, d):
        self.__dict__ = d

data = '{"name": "John Smith", "hometown": {"name": "New York", "id": 123}}'

a = json.loads(data, object_hook=JSONObject)

a.name = "changed"
print a.name
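Serializing such an object back to JSON isn't covered above; a common sketch is to route non-serializable objects through their __dict__ via the default= hook of json.dumps (Python 3 syntax):

```python
import json

class JSONObject:
    def __init__(self, d):
        self.__dict__ = d

data = '{"name": "John Smith", "hometown": {"name": "New York", "id": 123}}'
a = json.loads(data, object_hook=JSONObject)

# default= is consulted for anything json can't encode natively, so
# nested JSONObject instances are serialized through their __dict__
s = json.dumps(a, default=lambda o: o.__dict__)
print(s)
```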

Getting object attributes

  • Using getattr
print getattr(a.hometown, 'id', 321)
# 123
print getattr(a.hometown, 'id1', 321)
# 321
  • Using try
try:
    print a.hometown.id2
except AttributeError as ex:
    print ex
  • Using get
x = data.get('first', {}).get('second', {}).get('third', None)

Getting nested object attributes

def multi_getattr(obj, attr, default=None):
    """
    Get a named attribute from an object; multi_getattr(x, 'a.b.c.d') is
    equivalent to x.a.b.c.d. When a default argument is given, it is
    returned when any attribute in the chain doesn't exist; without
    it, an exception is raised when a missing attribute is encountered.

    """
    attributes = attr.split(".")
    for i in attributes:
        try:
            obj = getattr(obj, i)
        except AttributeError:
            if default is not None:
                return default
            else:
                raise
    return obj

print multi_getattr(a, "hometown.name")
# New York
print multi_getattr(a, "hometown.name1", "abc")
# abc
# coding=utf-8
from __future__ import unicode_literals
import collections
import operator

_default_stub = object()


def deep_get(obj, path, default=_default_stub, separator='.'):
    """Gets arbitrarily nested attribute or item value.

    Args:
        obj: Object to search in.
        path (str, hashable, iterable of hashables): Arbitrarily nested path in obj hierarchy.
        default: Default value. When provided it is returned if the path doesn't exist.
            Otherwise the call raises a LookupError.
        separator: String to split path by.

    Returns:
        Value at path.

    Raises:
        LookupError: If object at path doesn't exist.

    Examples:
        >>> deep_get({'a': 1}, 'a')
        1

        >>> deep_get({'a': 1}, 'b')
        Traceback (most recent call last):
            ...
        LookupError: {u'a': 1} has no element at 'b'

        >>> deep_get(['a', 'b', 'c'], -1)
        u'c'

        >>> deep_get({'a': [{'b': [1, 2, 3]}, 'some string']}, 'a.0.b')
        [1, 2, 3]

        >>> class A(object):
        ...     def __init__(self):
        ...         self.x = self
        ...         self.y = {'a': 10}
        ...
        >>> deep_get(A(), 'x.x.x.x.x.x.y.a')
        10

        >>> deep_get({'a.b': {'c': 1}}, 'a.b.c')
        Traceback (most recent call last):
            ...
        LookupError: {u'a.b': {u'c': 1}} has no element at 'a'

        >>> deep_get({'a.b': {'Привет': 1}}, ['a.b', 'Привет'])
        1

        >>> deep_get({'a.b': {'Привет': 1}}, 'a.b/Привет', separator='/')
        1

    """
    if isinstance(path, basestring):
        attributes = path.split(separator)
    elif isinstance(path, collections.Iterable):
        attributes = path
    else:
        attributes = [path]

    LOOKUPS = [getattr, operator.getitem, lambda obj, i: obj[int(i)]]
    try:
        for i in attributes:
            for lookup in LOOKUPS:
                try:
                    obj = lookup(obj, i)
                    break
                except (TypeError, AttributeError, IndexError, KeyError,
                        UnicodeEncodeError, ValueError):
                    pass
            else:
                msg = "{obj} has no element at '{i}'".format(obj=obj, i=i)
                raise LookupError(msg.encode('utf8'))
    except Exception:
        if _default_stub != default:
            return default
        raise
    return obj

Sync From: https://github.com/TheBigFish/blog/issues/2

tornado: asynchronous context management (StackContext)

Posted on 2018-10-17
Words count in article: 918 | Reading time ≈ 4

tornado: asynchronous context management (StackContext)

First steps

# -*- coding: utf-8 -*-
import tornado.ioloop
import tornado.stack_context

ioloop = tornado.ioloop.IOLoop.instance()

times = 0

def callback():
    print 'run callback'
    raise ValueError('except in callback')


def async_task():
    global times
    times += 1
    print 'run async task {}'.format(times)
    ioloop.add_callback(callback=callback)


def main():
    try:
        async_task()
    except Exception as e:
        print 'main exception {}'.format(e)
    print 'end'

main()
ioloop.start()

The exception is not caught in main:

run async task 1
end
run callback
ERROR:root:Exception in callback <function null_wrapper at 0x7f23ec300488>
Traceback (most recent call last):
  File "~/learn/tornado/tornado/ioloop.py", line 370, in _run_callback

Wrapping the context

functools.partial builds a new callable, so the function the ioloop finally invokes is wrapper(callback), and the exception is caught inside wrapper.

# -*- coding: utf-8 -*-
import tornado.ioloop
import tornado.stack_context
import functools

ioloop = tornado.ioloop.IOLoop.instance()

times = 0

def callback():
    print 'run callback'
    raise ValueError('except in callback')

def wrapper(func):
    try:
        func()
    except Exception as e:
        print 'main exception {}'.format(e)

def async_task():
    global times
    times += 1
    print 'run async task {}'.format(times)
    # use partial to build a new callable;
    # the ioloop ends up invoking wrapper(callback)
    ioloop.add_callback(callback=functools.partial(wrapper, callback))

def main():
    try:
        async_task()
    except Exception as e:
        print 'main exception {}'.format(e)
    print 'end'

main()
ioloop.start()

The exception is caught correctly:

run async task 1
end
run callback
main exception except in callback

An example using tornado stack_context

# -*- coding: utf-8 -*-
import tornado.ioloop
import tornado.stack_context
import contextlib

ioloop = tornado.ioloop.IOLoop.instance()

times = 0

def callback():
    print 'Run callback'
    # the exception raised here is caught inside contextor
    raise ValueError('except in callback')

def async_task():
    global times
    times += 1
    print 'run async task {}'.format(times)
    # add_callback uses the previously saved (StackContext, contextor)
    # pair to build a new StackContext(contextor); when the ioloop fires
    # the callback it effectively runs
    #     with StackContext(contextor):
    #         callback()
    # so callback also executes inside contextor, its exception is caught
    # there, and async_task() (really its callback) runs under contextor
    ioloop.add_callback(callback=callback)

@contextlib.contextmanager
def contextor():
    print 'Enter contextor'
    try:
        yield
    except Exception as e:
        print 'Handler except'
        print 'exception {}'.format(e)
    finally:
        print 'Release'

def main():
    # wrap contextor in a StackContext; async_task() below runs
    # in the contextor() environment
    stack_context = tornado.stack_context.StackContext(contextor)
    with stack_context:
        async_task()
    print 'End'


main()
ioloop.start()

tornado.stack_context.StackContext

tornado.stack_context acts as a context wrapper: it takes a context_factory argument and stores it.
context_factory is a context-manager class with __enter__ and __exit__ methods.

Entering `with stack_context` runs StackContext's own __enter__.
That __enter__ builds a context object from the stored context_factory and calls the object's __enter__ method.
StackContext records the (StackContext, context_factory) pair; when the callback later fires, a fresh StackContext(context_factory) is created to run the callback inside.

class StackContext(object):
    def __init__(self, context_factory):
        self.context_factory = context_factory

    def __enter__(self):
        # push onto the contexts stack
        self.old_contexts = _state.contexts
        # _state.contexts is a tuple of (class, arg) pairs
        _state.contexts = (self.old_contexts +
                           ((StackContext, self.context_factory),))
        try:
            self.context = self.context_factory()
            # enter the context object's environment
            self.context.__enter__()
        except Exception:
            _state.contexts = self.old_contexts
            raise

    def __exit__(self, type, value, traceback):
        try:
            return self.context.__exit__(type, value, traceback)
        finally:
            # pop the contexts stack
            _state.contexts = self.old_contexts

IOLoop.add_callback

def add_callback(self, callback):
    if not self._callbacks:
        self._wake()
    self._callbacks.append(stack_context.wrap(callback))

IOLoop.start

def start(self):
    if self._stopped:
        self._stopped = False
        return
    self._running = True
    while True:
        # Never use an infinite timeout here - it can stall epoll
        poll_timeout = 0.2

        callbacks = self._callbacks
        self._callbacks = []
        for callback in callbacks:
            # invoke the registered callbacks
            self._run_callback(callback)

IOLoop._run_callback

def _run_callback(self, callback):
    try:
        callback()
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        self.handle_callback_exception(callback)

stack_context.wrap

def wrap(fn):

    if fn is None or fn.__class__ is _StackContextWrapper:
        return fn
    # functools.wraps doesn't appear to work on functools.partial objects
    #@functools.wraps(fn)
    def wrapped(callback, contexts, *args, **kwargs):
        if contexts is _state.contexts or not contexts:
            callback(*args, **kwargs)
            return

        # wrap the callback: rebuild StackContext(context_factory) objects
        if not _state.contexts:
            new_contexts = [cls(arg) for (cls, arg) in contexts]

        elif (len(_state.contexts) > len(contexts) or
                any(a[1] is not b[1]
                    for a, b in itertools.izip(_state.contexts, contexts))):
            # contexts have been removed or changed, so start over
            new_contexts = ([NullContext()] +
                            [cls(arg) for (cls, arg) in contexts])
        else:
            new_contexts = [cls(arg)
                            for (cls, arg) in contexts[len(_state.contexts):]]
        if len(new_contexts) > 1:
            with _nested(*new_contexts):
                callback(*args, **kwargs)
        elif new_contexts:
            # enter the StackContext, then invoke the callback
            with new_contexts[0]:
                callback(*args, **kwargs)
        else:
            callback(*args, **kwargs)
    # return a partial with fn and _state.contexts bound
    return _StackContextWrapper(wrapped, fn, _state.contexts)
class _StackContextWrapper(functools.partial):
    pass

Sync From: https://github.com/TheBigFish/blog/issues/1

© 2020 bigfish