root/xcap/xcapdiff.py

Revision 411, 5.4 kB (checked in by Denis Bilenko <denis@ag-projects.com>, 5 weeks ago)

removed unused imports

Line 
"""Track changes of the documents and notify subscribers

Create a Notifier object:

  >>> n = Notifier(xcap_root, publish_xcapdiff)

When a change occurs, call on_change:

  >>> n.on_change(xcap_uri_updated, old_etag, new_etag)

(old_etag being None means the document was just created, new_etag being
None means the document was deleted)

Notifier will call publish_xcapdiff with 2 args: the user's uri and the
xcap-diff document. The number of calls is limited to no more than 1 call
per MIN_WAIT seconds for a given user uri.

"""
19
20from time import time
21from functools import wraps
22from twisted.internet import reactor
23
def xml_xcapdiff(xcap_root, content):
    """Wrap *content* in a complete xcap-diff XML document.

    xcap_root -- value of the ``xcap-root`` attribute. It is XML-escaped
                 here so that ``&``, ``<``, ``>`` or ``"`` in it cannot
                 produce a malformed document.
    content   -- already serialized child elements; inserted verbatim.

    Returns the document as a string, XML declaration included.
    """
    # imported locally so this function does not depend on file-level imports
    from xml.sax.saxutils import escape

    # '%s' % (x,) stringifies exactly as the original interpolation did
    root_attr = escape('%s' % (xcap_root,), {'"': '&quot;'})
    return """<?xml version="1.0" encoding="UTF-8"?>
<xcap-diff xmlns="urn:ietf:params:xml:ns:xcap-diff" xcap-root="%s">
%s
</xcap-diff>
""" % (root_attr, content)
30
def xml_document(sel, old_etag, new_etag):
    """Serialize one ``<document/>`` element of an xcap-diff document.

    sel      -- document selector; formatted with ``%s`` into the ``sel``
                attribute.
    old_etag -- previous etag; the ``previous-etag`` attribute is omitted
                when this is false (None or empty).
    new_etag -- new etag; the ``new-etag`` attribute is omitted when this
                is false (None or empty).

    All attribute values are XML-escaped so that ``&``, ``<``, ``>`` or
    ``"`` in a selector or etag cannot break the enclosing document.
    Returns the element as a string.
    """
    # imported locally so this function does not depend on file-level imports
    from xml.sax.saxutils import escape

    def attribute(name, value):
        # '%s' % (value,) stringifies exactly as the original interpolation did
        return ' %s="%s"' % (name, escape('%s' % (value,), {'"': '&quot;'}))

    new_attr = ''
    if new_etag:
        new_attr = attribute('new-etag', new_etag)
    old_attr = ''
    if old_etag:
        old_attr = attribute('previous-etag', old_etag)
    # attribute order (new-etag, sel, previous-etag) is kept from the original
    return '<document%s%s%s/>' % (new_attr, attribute('sel', sel), old_attr)
41
42
class UserChanges:
    """Pending document changes of a single user.

    Changes are accumulated with add_change() and handed, rendered as one
    xcap-diff document, to the publish_xcapdiff callable. Publishing goes
    through a RateLimit created with MIN_WAIT.
    """

    # minimum number of seconds between two publish calls (passed to RateLimit)
    MIN_WAIT = 5

    def __init__(self, publish_xcapdiff):
        """publish_xcapdiff is called as publish_xcapdiff(user_uri, xcapdiff_document)."""
        # maps document uri to [etag before the first pending change, latest etag]
        self.changes = {}
        self.rate_limit = RateLimit(self.MIN_WAIT)
        self.publish_xcapdiff = publish_xcapdiff

    def add_change(self, uri, old_etag, etag, xcap_root):
        """Record a change of the document at *uri* and request a publish.

        The first old_etag seen for a uri is kept until the changes are
        unloaded; only the newest etag is updated on repeated changes.
        """
        self.changes.setdefault(uri, [old_etag, etag])[1] = etag
        self.rate_limit.callAtLimitedRate(self.publish, uri.user.uri, xcap_root)

    def publish(self, user_uri, xcap_root):
        """Publish the accumulated changes, if there are any."""
        if self.changes:
            self.publish_xcapdiff(user_uri, self.unload_changes(xcap_root))

    def unload_changes(self, xcap_root):
        """Return the xcap-diff document for all pending changes and forget them."""
        # items() instead of iteritems(): the same pairs on Python 2, and
        # it also exists on Python 3
        docs = [xml_document(uri, old_etag, etag)
                for uri, (old_etag, etag) in self.changes.items()]
        result = xml_xcapdiff(xcap_root, '\n'.join(docs))
        self.changes.clear()
        return result

    def __nonzero__(self):
        """True when there are pending changes."""
        # dict has no __nonzero__ method (its truth value comes from
        # __len__), so the former self.changes.__nonzero__() raised
        # AttributeError whenever an instance was truth-tested
        return bool(self.changes)

    # Python 3 spelling of the truth-value hook
    __bool__ = __nonzero__
70
71
class Notifier:
    """Dispatch document changes to per-user UserChanges objects."""

    def __init__(self, xcap_root, publish_xcapdiff):
        """
        xcap_root        -- passed on with every change; ends up as the
                            xcap-root of the generated xcap-diff documents
        publish_xcapdiff -- callable handed to every UserChanges created here
        """
        self.publish_xcapdiff = publish_xcapdiff
        self.xcap_root = xcap_root

        # maps uri.user to UserChanges
        self.users_changes = {}

    def on_change(self, uri, old_etag, new_etag):
        """Register that the document at *uri* changed from old_etag to new_etag."""
        user = uri.user
        # Create the UserChanges only for a user not seen before; the former
        # setdefault(user, UserChanges(...)) built and discarded a new
        # UserChanges on every single change.
        # Compare with None rather than truth-testing: an existing entry with
        # no pending changes must still be reused.
        changes = self.users_changes.get(user)
        if changes is None:
            changes = self.users_changes[user] = UserChanges(self.publish_xcapdiff)
        changes.add_change(uri, old_etag, new_etag, self.xcap_root)
84
85
class RateLimit(object):
    """Limit how often a function is run through callAtLimitedRate."""

    def __init__(self, min_wait):
        # minimum number of seconds between calls
        self.min_wait = min_wait

        # time() of the last call
        self.last_call = 0

        # DelayedCall object of scheduled call
        self.delayed_call = None

    def callAtLimitedRate(self, f, *args, **kwargs):
        """Run f(*args, **kwargs) now if the last run finished at least
        self.min_wait seconds ago, otherwise schedule it for the moment the
        wait is over. Do nothing while a scheduled call is still pending.
        The return value of f is not passed back to the caller.

        >>> rate = RateLimit(1)

        >>> def f(a, start = time()):
        ...     print "%d %s" % (time()-start, a)
        ...     return 'return value is lost!'

        >>> rate.callAtLimitedRate(f, 'a')
        0 a
        >>> rate.callAtLimitedRate(f, 'b') # scheduled for 1 second later
        >>> rate.callAtLimitedRate(f, 'c') # ignored as there's already call in progress
        >>> _ = reactor.callLater(1.5, rate.callAtLimitedRate, f, 'd')
        >>> _ = reactor.callLater(2.1, reactor_stop)
        >>> reactor_run()
        1 b
        2 d
        """
        elapsed = time() - self.last_call

        pending = self.delayed_call
        if pending and not pending.called and not pending.cancelled:
            # a call is already scheduled and has not fired yet
            return

        def run_and_stamp():
            # remember when f finished, even if it raised
            try:
                return f(*args, **kwargs)
            finally:
                self.last_call = time()

        # callMaybeLater runs the function right away when the remaining
        # wait is not positive, and schedules it otherwise
        self.delayed_call = callMaybeLater(self.min_wait - elapsed,
                                           wraps(f)(run_and_stamp))
130
131
class RateLimitedFun(RateLimit):
    """A RateLimit bound to one function; calling the instance runs that
    function through callAtLimitedRate."""

    def __init__(self, min_wait, function):
        # function to run at a limited rate
        self.function = function
        RateLimit.__init__(self, min_wait)

    def __call__(self, *args, **kwargs):
        target = self.function
        return self.callAtLimitedRate(target, *args, **kwargs)
140
141
def limit_rate(min_wait):
    """Decorator factory: run the decorated function through a RateLimit
    created with min_wait. The decorated function always returns None.

    >>> @limit_rate(1)
    ... def f(a, start = time()):
    ...     print "%d %s" % (time()-start, a)
    ...     return 'return value is lost!'
    >>> f('a')
    0 a
    >>> f('b') # scheduled for 1 second later
    >>> f('c') # ignored as there's already call in progress
    >>> _ = reactor.callLater(1.5, f, 'd')
    >>> _ = reactor.callLater(2.1, reactor_stop)
    >>> reactor_run()
    1 b
    2 d
    """
    # one RateLimit per limit_rate() call, shared by everything it decorates
    limiter = RateLimit(min_wait)

    def decorate(func):
        def rate_limited(*args, **kwargs):
            limiter.callAtLimitedRate(func, *args, **kwargs)
        return wraps(func)(rate_limited)

    return decorate
168
169
def callMaybeLater(seconds, f, *args, **kw):
    """Execute f(*args, **kw) right away and return None when seconds is
    zero or negative; otherwise schedule it with reactor.callLater and
    return what callLater returns."""
    if seconds <= 0:
        f(*args, **kw)
        return None
    return reactor.callLater(seconds, f, *args, **kw)
176
if __name__ == '__main__':
    # Helpers referenced by the doctests in this module.

    def reactor_run(first_time=[True]):
        # The mutable default is deliberate: it keeps, across calls, whether
        # the reactor has been started before.
        if not first_time[0]:
            # later runs: re-enter the main loop directly
            # NOTE(review): presumably because reactor.run() cannot be
            # called a second time -- confirm against the Twisted version in use
            reactor.running = True
            reactor.mainLoop()
            return
        reactor.run()
        first_time[0] = False

    def reactor_stop():
        # clears the flag that reactor_run sets before entering mainLoop
        reactor.running = False

    import doctest
    doctest.testmod()
Note: See TracBrowser for help on using the browser.