Flask Source Code Analysis (8): Signals

Posted by xiezg247 on July 2, 2017

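I. Meaning

What is a signal? Signals help you decouple applications by sending notifications when actions occur elsewhere in the core framework or in Flask extensions. In short, signals allow certain senders to notify subscribers that something happened. (From the Flask documentation: Signals.)

II. Signals

Flask keeps most of its signal logic in signals.py. Let's look at the code first: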

# -*- coding: utf-8 -*-
signals_available = False
try:
    from blinker import Namespace
    signals_available = True
except ImportError:
    class Namespace(object):
        def signal(self, name, doc=None):
            return _FakeSignal(name, doc)

    class _FakeSignal(object):

        def __init__(self, name, doc=None):
            self.name = name
            self.__doc__ = doc
        def _fail(self, *args, **kwargs):
            raise RuntimeError('signalling support is unavailable '
                               'because the blinker library is '
                               'not installed.')
        send = lambda *a, **kw: None
        connect = disconnect = has_receivers_for = receivers_for = \
            temporarily_connected_to = connected_to = _fail
        del _fail

_signals = Namespace()


template_rendered = _signals.signal('template-rendered')
before_render_template = _signals.signal('before-render-template')
request_started = _signals.signal('request-started')
request_finished = _signals.signal('request-finished')
request_tearing_down = _signals.signal('request-tearing-down')
got_request_exception = _signals.signal('got-request-exception')
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')
appcontext_pushed = _signals.signal('appcontext-pushed')
appcontext_popped = _signals.signal('appcontext-popped')
message_flashed = _signals.signal('message-flashed')

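Clearly, Flask's signals depend on the blinker library. If blinker is not installed, Flask falls back to a fake signal class: its send() silently does nothing, while connect() and the other subscription methods raise RuntimeError. Flask provides a set of core signals: template_rendered, before_render_template, request_started, request_finished, request_tearing_down, got_request_exception, appcontext_tearing_down, appcontext_pushed, appcontext_popped and message_flashed. All of them are created when signals.py is imported, that is, as soon as the program starts.

III. Sending and subscribing to signals

Let's look at a simple program and use it to study how Flask creates, sends and subscribes to signals.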

# -*- coding: utf-8 -*-

from flask import Flask, request_started
app = Flask(__name__)

def log_request(sender, **extra):
    # sender is the Flask application that emitted the signal
    print('request start test')
    sender.logger.debug('Request context is set up')

# Subscribe once at import time. request_started is sent before the view
# runs, so connecting inside the view would miss the first request.
request_started.connect(log_request, app)

@app.route('/')
def hello_world():
    return 'Hello World!'

if __name__ == '__main__':
    app.run()

Run the following command to trigger the signal:

curl http://127.0.0.1:5000/

Output:

request start test
127.0.0.1 - - [17/Aug/2017 00:08:54] "GET / HTTP/1.1" 200 -

Starting again from the wsgi_app() entry point, full_dispatch_request() contains the following line:

request_started.send(self)

finalize_request() contains the following line:

request_finished.send(self, response=response)

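So Flask sends a request-started signal when a request comes in, and a request-finished signal once the response has been finalized. Back in the small program, the signal is subscribed to like this: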

request_started.connect(log_request, app)
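To make the ordering concrete, here is a minimal sketch that runs without Flask or blinker. SimpleSignal and TinyApp are hypothetical stand-ins written for this illustration; only the two send() calls mirror the lines quoted from full_dispatch_request() and finalize_request(), and the real methods do considerably more work.

```python
class SimpleSignal(object):
    """Tiny stand-in for a blinker signal (illustration only)."""

    def __init__(self, name):
        self.name = name
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)
        return receiver

    def send(self, sender, **kwargs):
        # Call every receiver and collect (receiver, return value) pairs.
        return [(r, r(sender, **kwargs)) for r in self._receivers]


request_started = SimpleSignal('request-started')
request_finished = SimpleSignal('request-finished')


class TinyApp(object):
    """Hypothetical application object, heavily simplified."""

    def __init__(self, view):
        self.view = view

    def full_dispatch_request(self):
        request_started.send(self)  # emitted before the view runs
        response = self.view()
        return self.finalize_request(response)

    def finalize_request(self, response):
        request_finished.send(self, response=response)  # emitted afterwards
        return response


events = []
request_started.connect(lambda sender, **extra: events.append('started'))
request_finished.connect(
    lambda sender, response, **extra: events.append('finished: ' + response))


def hello_world():
    events.append('view')
    return 'Hello World!'


app = TinyApp(hello_world)
body = app.full_dispatch_request()
# events is now ['started', 'view', 'finished: Hello World!']
```

Because the started signal fires before the view function is called, a receiver has to be connected before the request arrives in order to see it.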

IV. Implementation

As shown above, Flask's signal mechanism depends on the blinker library. Let's take a brief look at how this library works.

xiezhigang@ flask_site_packages 16$ tree -I "*.pyc" blinker
blinker
├── base.py
├── __init__.py
├── _saferef.py
└── _utilities.py

0 directories, 4 files
xiezhigang@ flask_site_packages 17$ cloc blinker
       9 text files.
       9 unique files.                              
       1 file ignored.

http://cloc.sourceforge.net v 1.60  T=0.03 s (262.7 files/s, 38250.6 lines/s)
-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Python                           4            179            283            412
XML                              4              0              0            291
-------------------------------------------------------------------------------
SUM:                             8            179            283            703
-------------------------------------------------------------------------------

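The blinker library consists of only 4 Python files with just over 400 lines of effective code, so it is a very small library. Let's see how blinker's Namespace class is defined.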

class NamedSignal(Signal):
    """A named generic notification emitter."""

    def __init__(self, name, doc=None):
        Signal.__init__(self, doc)

        #: The name of this signal.
        self.name = name

    def __repr__(self):
        base = Signal.__repr__(self)
        return "%s; %r>" % (base[:-1], self.name)
        
class Namespace(dict):
    """A mapping of signal names to signals."""

    def signal(self, name, doc=None):
        """Return the :class:`NamedSignal` *name*, creating it if required.

        Repeated calls to this function will return the same signal object.

        """
        try:
            return self[name]
        except KeyError:
            return self.setdefault(name, NamedSignal(name, doc))
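The docstring's promise that repeated calls return the same signal object can be checked with a short standalone sketch. The Namespace class is copied from the listing above, while NamedSignal is replaced by a simplified stand-in that only stores its name, so this is an illustration rather than blinker itself.

```python
class NamedSignal(object):
    """Simplified stand-in for blinker's NamedSignal (stores the name only)."""

    def __init__(self, name, doc=None):
        self.name = name
        self.__doc__ = doc


class Namespace(dict):
    """A mapping of signal names to signals, as in the listing above."""

    def signal(self, name, doc=None):
        try:
            return self[name]  # already registered: reuse it
        except KeyError:
            # First request for this name: create, store and return it.
            return self.setdefault(name, NamedSignal(name, doc))


signals = Namespace()
first = signals.signal('request-started')
second = signals.signal('request-started')
other = signals.signal('request-finished')

same_object = first is second       # True: one signal per name
registered = sorted(signals)        # the names act as dictionary keys
```

This is why importing request_started from different modules always yields the same object: every lookup goes through the single module-level _signals namespace.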

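The Namespace class inherits from dict. If name already exists, signal() returns the value stored under that key, which is a NamedSignal instance (NamedSignal is a subclass of Signal); otherwise it creates the signal, stores it under name and returns it. Next, let's see which methods the Signal class defines.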

ANY = symbol('ANY')
ANY.__doc__ = 'Token for "any sender".'
ANY_ID = 0


class Signal(object):
    ANY = ANY

    @lazy_property
    def receiver_connected(self):
        return Signal(doc="Emitted after a receiver connects.")

    @lazy_property
    def receiver_disconnected(self):
        return Signal(doc="Emitted after a receiver disconnects.")

    def __init__(self, doc=None):
        if doc:
            self.__doc__ = doc
        self.receivers = {}
        self._by_receiver = defaultdict(set)
        self._by_sender = defaultdict(set)
        self._weak_senders = {}

    def connect(self, receiver, sender=ANY, weak=True):
        receiver_id = hashable_identity(receiver)
        if weak:
            receiver_ref = reference(receiver, self._cleanup_receiver)
            receiver_ref.receiver_id = receiver_id
        else:
            receiver_ref = receiver
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)

        self.receivers.setdefault(receiver_id, receiver_ref)
        self._by_sender[sender_id].add(receiver_id)
        self._by_receiver[receiver_id].add(sender_id)
        del receiver_ref

        if sender is not ANY and sender_id not in self._weak_senders:
            # wire together a cleanup for weakref-able senders
            try:
                sender_ref = reference(sender, self._cleanup_sender)
                sender_ref.sender_id = sender_id
            except TypeError:
                pass
            else:
                self._weak_senders.setdefault(sender_id, sender_ref)
                del sender_ref

        if ('receiver_connected' in self.__dict__ and
            self.receiver_connected.receivers):
            try:
                self.receiver_connected.send(self,
                                             receiver=receiver,
                                             sender=sender,
                                             weak=weak)
            except:
                self.disconnect(receiver, sender)
                raise
        if receiver_connected.receivers and self is not receiver_connected:
            try:
                receiver_connected.send(self,
                                        receiver_arg=receiver,
                                        sender_arg=sender,
                                        weak_arg=weak)
            except:
                self.disconnect(receiver, sender)
                raise
        return receiver

    def connect_via(self, sender, weak=False):
        def decorator(fn):
            self.connect(fn, sender, weak)
            return fn
        return decorator

    @contextmanager
    def connected_to(self, receiver, sender=ANY):
        self.connect(receiver, sender=sender, weak=False)
        try:
            yield None
        except:
            self.disconnect(receiver)
            raise
        else:
            self.disconnect(receiver)

    def temporarily_connected_to(self, receiver, sender=ANY):
        warn("temporarily_connected_to is deprecated; "
             "use connected_to instead.",
             DeprecationWarning)
        return self.connected_to(receiver, sender)

    def send(self, *sender, **kwargs):
        if len(sender) == 0:
            sender = None
        elif len(sender) > 1:
            raise TypeError('send() accepts only one positional argument, '
                            '%s given' % len(sender))
        else:
            sender = sender[0]
        if not self.receivers:
            return []
        else:
            return [(receiver, receiver(sender, **kwargs))
                    for receiver in self.receivers_for(sender)]

    def has_receivers_for(self, sender):
        if not self.receivers:
            return False
        if self._by_sender[ANY_ID]:
            return True
        if sender is ANY:
            return False
        return hashable_identity(sender) in self._by_sender

    def receivers_for(self, sender):
        if self.receivers:
            sender_id = hashable_identity(sender)
            if sender_id in self._by_sender:
                ids = (self._by_sender[ANY_ID] |
                       self._by_sender[sender_id])
            else:
                ids = self._by_sender[ANY_ID].copy()
            for receiver_id in ids:
                receiver = self.receivers.get(receiver_id)
                if receiver is None:
                    continue
                if isinstance(receiver, WeakTypes):
                    strong = receiver()
                    if strong is None:
                        self._disconnect(receiver_id, ANY_ID)
                        continue
                    receiver = strong
                yield receiver

    def disconnect(self, receiver, sender=ANY):
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)
        receiver_id = hashable_identity(receiver)
        self._disconnect(receiver_id, sender_id)

        if ('receiver_disconnected' in self.__dict__ and
            self.receiver_disconnected.receivers):
            self.receiver_disconnected.send(self,
                                            receiver=receiver,
                                            sender=sender)

    def _disconnect(self, receiver_id, sender_id):
        if sender_id == ANY_ID:
            if self._by_receiver.pop(receiver_id, False):
                for bucket in self._by_sender.values():
                    bucket.discard(receiver_id)
            self.receivers.pop(receiver_id, None)
        else:
            self._by_sender[sender_id].discard(receiver_id)
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_receiver(self, receiver_ref):
        self._disconnect(receiver_ref.receiver_id, ANY_ID)

    def _cleanup_sender(self, sender_ref):
        sender_id = sender_ref.sender_id
        assert sender_id != ANY_ID
        self._weak_senders.pop(sender_id, None)
        for receiver_id in self._by_sender.pop(sender_id, ()):
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_bookkeeping(self):
        for mapping in (self._by_sender, self._by_receiver):
            for _id, bucket in list(mapping.items()):
                if not bucket:
                    mapping.pop(_id, None)

    def _clear_state(self):
        self._weak_senders.clear()
        self.receivers.clear()
        self._by_sender.clear()
        self._by_receiver.clear()

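The Signal class implements a signal emitter, with methods for creating a signal, sending it, subscribing to it, disconnecting receivers and clearing internal state.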
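Two ideas in the listing are easier to see in isolation: receivers are grouped into per-sender buckets (with a special bucket for "any sender"), and receivers are held through weak references so that a garbage-collected receiver is dropped automatically. The sketch below is a heavily simplified stand-in, not blinker's code: MiniSignal, _forget and the App class are hypothetical names, id() replaces hashable_identity(), and sender=None plays the role of the ANY token.

```python
import gc
import weakref
from collections import defaultdict

ANY_ID = 0  # bucket key for receivers that accept any sender


class MiniSignal(object):
    """Simplified stand-in for blinker's Signal (illustration only)."""

    def __init__(self):
        self.receivers = {}                 # receiver_id -> weak reference
        self._by_sender = defaultdict(set)  # sender_id -> set of receiver_ids

    def connect(self, receiver, sender=None):
        receiver_id = id(receiver)
        sender_id = ANY_ID if sender is None else id(sender)
        # The weakref callback removes the entry once the receiver is collected.
        ref = weakref.ref(receiver, lambda _ref: self._forget(receiver_id))
        self.receivers.setdefault(receiver_id, ref)
        self._by_sender[sender_id].add(receiver_id)
        return receiver

    def _forget(self, receiver_id):
        self.receivers.pop(receiver_id, None)
        for bucket in self._by_sender.values():
            bucket.discard(receiver_id)

    def send(self, sender, **kwargs):
        # Receivers for "any sender" plus receivers bound to this sender.
        ids = self._by_sender[ANY_ID] | self._by_sender[id(sender)]
        results = []
        for receiver_id in ids:
            ref = self.receivers.get(receiver_id)
            receiver = ref() if ref is not None else None
            if receiver is not None:
                results.append((receiver, receiver(sender, **kwargs)))
        return results


class App(object):
    """Hypothetical sender type."""


app_a, app_b = App(), App()
calls = []


def on_any(sender, **extra):
    calls.append(('any', sender))


def on_a(sender, **extra):
    calls.append(('a-only', sender))


sig = MiniSignal()
sig.connect(on_any)
sig.connect(on_a, sender=app_a)

sig.send(app_a)  # both receivers run
sig.send(app_b)  # only on_any runs

del on_a         # drop the last strong reference to the receiver
gc.collect()
remaining = len(sig.receivers)  # only on_any is still registered
```

The weak reference explains a common pitfall: a receiver defined as a local function or lambda and connected with the default weak=True can disappear as soon as nothing else refers to it.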