| 1 | """Track changes of the documents and notify subscribers |
|---|
| 2 | |
|---|
| 3 | Create a Notifier object: |
|---|
| 4 | |
|---|
| 5 | >>> n = Notifier(xcap_root, publish_xcapdiff) |
|---|
| 6 | |
|---|
| 7 | When a change occurs, call on_change |
|---|
| 8 | |
|---|
| 9 | >>> n.on_change(xcap_uri_updated, old_etag, new_etag) |
|---|
| 10 | |
|---|
| 11 | (old_etag being None means the document was just created, new_etag being |
|---|
| 12 | None means the document was deleted) |
|---|
| 13 | |
|---|
| 14 | Notifier will call publish_xcapdiff with 2 args: user's uri and xcap-diff document. |
|---|
| 15 | Number of calls is limited to no more than 1 call per MIN_WAIT seconds for |
|---|
| 16 | a given user uri. |
|---|
| 17 | |
|---|
| 18 | """ |
|---|
| 19 | |
|---|
| 20 | from time import time |
|---|
| 21 | from functools import wraps |
|---|
| 22 | from twisted.internet import reactor |
|---|
| 23 | |
|---|
def xml_xcapdiff(xcap_root, content):
    """Wrap content in a complete xcap-diff XML document.

    xcap_root is placed into the xcap-root attribute of the <xcap-diff>
    element and content becomes its body. Both values are inserted
    verbatim; nothing is escaped here.
    """
    template = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<xcap-diff xmlns="urn:ietf:params:xml:ns:xcap-diff" xcap-root="%s">\n'
        '%s\n'
        '</xcap-diff>\n'
    )
    return template % (xcap_root, content)
|---|
| 30 | |
|---|
def xml_document(sel, old_etag, new_etag):
    """Return the <document/> element of an xcap-diff for one changed document.

    sel      -- selector of the document; written to the sel attribute
    old_etag -- previous etag; a false value (None, '') omits previous-etag
    new_etag -- new etag; a false value (None, '') omits new-etag

    All three values end up inside double-quoted XML attributes, so the
    characters &, <, > and " are escaped to keep the element well-formed.
    Values without those characters are emitted unchanged.
    """
    from xml.sax.saxutils import escape

    def attr_text(value):
        # '%s' formatting converts non-string values (e.g. uri objects) the
        # same way the plain string interpolation did before escaping was added
        return escape('%s' % (value,), {'"': '&quot;'})

    previous = ' previous-etag="%s"' % attr_text(old_etag) if old_etag else ''
    current = ' new-etag="%s"' % attr_text(new_etag) if new_etag else ''
    return '<document%s sel="%s"%s/>' % (current, attr_text(sel), previous)
|---|
| 41 | |
|---|
| 42 | |
|---|
class UserChanges:
    """Unpublished document changes, delivered through a rate limiter.

    Changes are accumulated in self.changes and handed to publish_xcapdiff
    as a single xcap-diff document, at most once per MIN_WAIT seconds.
    """

    # minimum number of seconds between two publish calls
    MIN_WAIT = 5

    def __init__(self, publish_xcapdiff):
        # maps document uri to [old_etag, etag]
        self.changes = {}
        self.rate_limit = RateLimit(self.MIN_WAIT)
        # callable invoked as publish_xcapdiff(user_uri, xcap_diff_document)
        self.publish_xcapdiff = publish_xcapdiff

    def add_change(self, uri, old_etag, etag, xcap_root):
        """Record a change of the document at uri and request a publish."""
        # setdefault keeps the old_etag of the first unpublished change of
        # this uri; only the newest etag is overwritten
        self.changes.setdefault(uri, [old_etag, etag])[1] = etag
        self.rate_limit.callAtLimitedRate(self.publish, uri.user.uri, xcap_root)

    def publish(self, user_uri, xcap_root):
        """Send all pending changes to publish_xcapdiff, if there are any."""
        if self.changes:
            self.publish_xcapdiff(user_uri, self.unload_changes(xcap_root))

    def unload_changes(self, xcap_root):
        """Return the pending changes as an xcap-diff document and forget them."""
        docs = [xml_document(uri, old_etag, etag)
                for uri, (old_etag, etag) in self.changes.items()]
        result = xml_xcapdiff(xcap_root, '\n'.join(docs))
        self.changes.clear()
        return result

    def __nonzero__(self):
        """True when there are unpublished changes."""
        # dict has no __nonzero__ method; bool() is the portable truth test
        return bool(self.changes)

    # Python 3 spelling of the truth-value hook
    __bool__ = __nonzero__
|---|
| 70 | |
|---|
| 71 | |
|---|
class Notifier:
    """Route document changes to a per-user UserChanges collector.

    xcap_root        -- passed along with every change to UserChanges.add_change
    publish_xcapdiff -- callable handed to each UserChanges that is created
    """

    def __init__(self, xcap_root, publish_xcapdiff):
        self.publish_xcapdiff = publish_xcapdiff
        self.xcap_root = xcap_root

        # maps uri.user to the UserChanges of that user
        self.users_changes = {}

    def on_change(self, uri, old_etag, new_etag):
        """Record that the document at uri went from old_etag to new_etag."""
        user = uri.user
        try:
            changes = self.users_changes[user]
        except KeyError:
            # build a UserChanges (and its rate limiter) only the first time
            # a user is seen, not on every change
            changes = self.users_changes[user] = UserChanges(self.publish_xcapdiff)
        changes.add_change(uri, old_etag, new_etag, self.xcap_root)
|---|
| 84 | |
|---|
| 85 | |
|---|
class RateLimit(object):
    """Keep calls made through callAtLimitedRate at least min_wait seconds apart."""

    def __init__(self, min_wait):
        # DelayedCall object of scheduled call
        self.delayed_call = None

        # time() of the last call
        self.last_call = 0

        # minimum number of seconds between calls
        self.min_wait = min_wait

    def callAtLimitedRate(self, f, *args, **kwargs):
        """Run f(*args, **kwargs) now, later, or not at all.

        If the previous call finished at least self.min_wait seconds ago, f is
        executed immediately. If it finished more recently, f is scheduled for
        the moment the waiting period ends. If a scheduled call is still
        pending, this request is dropped. The return value of f is not passed
        back to the caller.

        >>> rate = RateLimit(1)

        >>> def f(a, start = time()):
        ...     print "%d %s" % (time()-start, a)
        ...     return 'return value is lost!'

        >>> rate.callAtLimitedRate(f, 'a')
        0 a
        >>> rate.callAtLimitedRate(f, 'b') # scheduled for 1 second later
        >>> rate.callAtLimitedRate(f, 'c') # ignored as there's already call in progress
        >>> _ = reactor.callLater(1.5, rate.callAtLimitedRate, f, 'd')
        >>> _ = reactor.callLater(2.1, reactor_stop)
        >>> reactor_run()
        1 b
        2 d
        """
        elapsed = time() - self.last_call
        pending = self.delayed_call
        if pending and not (pending.called or pending.cancelled):
            # a scheduled call has neither run nor been cancelled yet
            return

        @wraps(f)
        def run_and_stamp():
            try:
                return f(*args, **kwargs)
            finally:
                # remember when the call finished, even if f raised
                self.last_call = time()

        self.delayed_call = callMaybeLater(self.min_wait - elapsed, run_and_stamp)
|---|
| 130 | |
|---|
| 131 | |
|---|
class RateLimitedFun(RateLimit):
    """A RateLimit bound to one function; calling the object calls the function
    through callAtLimitedRate."""

    def __init__(self, min_wait, function):
        # the function that every __call__ is forwarded to
        self.function = function
        RateLimit.__init__(self, min_wait)

    def __call__(self, *args, **kwargs):
        target = self.function
        return self.callAtLimitedRate(target, *args, **kwargs)
|---|
| 140 | |
|---|
| 141 | |
|---|
def limit_rate(min_wait):
    """Return a decorator that runs the decorated function through a RateLimit.

    Calls to the decorated function are passed to
    RateLimit(min_wait).callAtLimitedRate. Every decorated function gets
    its own RateLimit, so one function's pending call never causes calls
    to another function to be dropped.

    resulting value for the function will become None

    >>> @limit_rate(1)
    ... def f(a, start = time()):
    ...     print "%d %s" % (time()-start, a)
    ...     return 'return value is lost!'
    >>> f('a')
    0 a
    >>> f('b') # scheduled for 1 second later
    >>> f('c') # ignored as there's already call in progress
    >>> _ = reactor.callLater(1.5, f, 'd')
    >>> _ = reactor.callLater(2.1, reactor_stop)
    >>> reactor_run()
    1 b
    2 d
    """

    def decorate(f):
        # created here rather than in limit_rate() so that reusing one
        # decorator object for several functions does not share limiter state
        rate = RateLimit(min_wait)

        @wraps(f)
        def wrapped(*args, **kwargs):
            rate.callAtLimitedRate(f, *args, **kwargs)
        return wrapped
    return decorate
|---|
| 168 | |
|---|
| 169 | |
|---|
def callMaybeLater(seconds, f, *args, **kw):
    """Call f(*args, **kw) right away or hand it to reactor.callLater.

    When seconds is zero or negative, f is executed immediately and None is
    returned (the result of f is discarded). Otherwise the call is scheduled
    with reactor.callLater and whatever callLater returns is passed back.
    """
    if seconds <= 0:
        f(*args, **kw)
        return None
    return reactor.callLater(seconds, f, *args, **kw)
|---|
| 176 | |
|---|
if __name__=='__main__':
    # Doctest harness: reactor_run and reactor_stop are the helpers that the
    # doctests in this module call to run the reactor for a while and stop it.
    def reactor_run(first_time = [True]):
        # first_time is a mutable default on purpose: the list persists
        # between calls and records whether reactor.run() was already used.
        if first_time[0]:
            reactor.run()
            first_time[0] = False
        else:
            # NOTE(review): later runs set the reactor's `running` flag and
            # enter mainLoop() directly instead of calling run() again --
            # presumably because the reactor cannot be started twice; confirm
            # against the twisted version in use.
            reactor.running = True
            reactor.mainLoop()

    def reactor_stop():
        # Clears the flag that reactor_run() sets.
        # NOTE(review): presumably this makes mainLoop()/run() return while
        # leaving the reactor reusable -- confirm.
        reactor.running = False

    import doctest
    doctest.testmod()
|---|