root/xcap/uri.py

Revision 451, 16.9 kB (checked in by Denis Bilenko <denis@ag-projects.com>, 5 weeks ago)

bug fixed: namespace bindings in the request weren't parsed correctly and resulted in 400

Line 
1"""XCAP URI module
2
3http://tools.ietf.org/html/rfc4825#section-6
4
The algorithm to decode the URI is as follows:
6
7 * First, percent-decode the whole URI (urllib.unquote)
8 * Split document selector from node selector (str.split('~~'))
9 * Then use xpath_tokenizer from lxml to parse the whole node selector
10   and extract individual steps
11
12Although after doing percent-decoding first, we cannot use s.split('/'),
13using lexer from lxml alleviates that fact a bit and produces good results.
14
15A potential problem can arise with URIs that contain [percent-encoded] double quotes.
16Here's an example:
17
18/resource-lists/list[@name="friends"]/external[@anchor="/list[@name=%22mkting%22]"]
19
20which would be converted to
21
22/resource-lists/list[@name="friends"]/external[@anchor="/list[@name="mkting"]"]
23
24and that would confuse the parser.
25
26I'm not sure if it's legal to have such URIs, but if it is this module has to be fixed.
27Meanwhile, the safe approach is to use &quot;
28
29/resource-lists/list[@name="friends"]/external[@anchor="/list[@name=&quot;mkting&quot;]"]
30
31"""
32
33import re
34from urllib import unquote
35
36from copy import copy
37from xml.sax.saxutils import quoteattr
38from lxml import _elementpath as ElementPath
39
40from application import log
41
42
# Prefix under which the default namespace is registered when a node selector
# is rewritten into an XPath expression (see NodeSelector.replace_default_prefix
# and NodeSelector.get_ns_bindings).
XPATH_DEFAULT_PREFIX = 'default' # should be more random
44
class Error(ValueError):
    """Root of the exception hierarchy raised by this module."""
47
class NodeParsingError(Error):
    """The node selector could not be parsed; carries HTTP status 400."""

    http_error = 400
50
class DocumentSelectorError(Error):
    """The document selector is malformed; carries HTTP status 404."""

    http_error = 404
53
54
class XCAPUser(object):
    """XCAP ID: a username together with the domain it belongs to.

    Two users compare equal when their ``sip:`` URIs are equal, and hash
    accordingly. An instance is false when either the username or the
    domain is missing or empty.
    """

    def __init__(self, username, domain):
        self.username = username
        self.domain = domain

    @property
    def uri(self):
        """The user formatted as a ``sip:username@domain`` URI."""
        return 'sip:%s@%s' % (self.username, self.domain)

    def __eq__(self, other):
        return isinstance(other, XCAPUser) and self.uri == other.uri

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Must agree with __eq__, otherwise equal users land in different
        # dict/set buckets (and Python 3 would make the class unhashable).
        return hash(self.uri)

    def __nonzero__(self):
        return bool(self.username) and bool(self.domain)

    # Python 3 looks up __bool__ instead of __nonzero__.
    __bool__ = __nonzero__

    def __str__(self):
        return "%s@%s" % (self.username, self.domain)

    def __repr__(self):
        return 'XCAPUser(%r, %r)' % (self.username, self.domain)

    @classmethod
    def parse(cls, user_id, default_domain=None):
        """Build a user from ``[sip:]username[@domain]``.

        An optional leading ``sip:`` is dropped. When *user_id* contains
        no ``@``, *default_domain* is used as the domain.
        """
        if user_id.startswith("sip:"):
            user_id = user_id[4:]
        _split = user_id.split('@', 1)
        username = _split[0]
        if len(_split) == 2:
            domain = _split[1]
        else:
            domain = default_domain
        return cls(username, domain)
92
93# XXX currently equivalent but differently encoded URIs won't be considered equal.
def unquote_attr_value(s):
    """Return *s* without its surrounding pair of matching quotes.

    *s* must be at least two characters long and start and end with the
    same quote character (single or double); otherwise NodeParsingError
    is raised. Entities such as &quot; inside the value are left as-is.
    """
    if len(s) < 2 or s[0] != s[-1] or s[0] not in '"\'':
        raise NodeParsingError
    # what about &quot; and friends?
    return s[1:-1]
99
def xpath_tokenizer(p):
    """Split a node selector into a flat list of Op and Tag tokens.

    Tokenizing is delegated to lxml's ElementPath tokenizer. The token
    that directly follows an '=' is treated as a quoted attribute value
    and has its surrounding quotes removed.

    >>> xpath_tokenizer('resource-lists')
    ['resource-lists']

    >>> xpath_tokenizer('list[@name="friends"]')
    ['list', '[', '@', 'name', '=', 'friends', ']']

    A value that itself contains double quotes cannot be tokenized properly:
    >>> uri_ugly = 'external[@anchor="http://xcap.example.org/resource-lists/users/sip:a@example.org/index/~~/resource-lists/list[@name="mkting"]"]'
    >>> len(xpath_tokenizer(uri_ugly)) # expected 7
    10

    Writing the inner double quotes as &quot; avoids the problem:
    >>> uri_nice = 'external[@anchor="http://xcap.example.org/resource-lists/users/sip:a@example.org/index/~~/resource-lists/list[@name=&quot;mkting&quot;]"]'
    >>> len(xpath_tokenizer(uri_nice)) # expected 7
    7
    """
    tokens = []
    after_equals = False
    for op, tag in ElementPath.xpath_tokenizer(p):
        # Exactly one of op/tag is used per token; op wins when non-empty.
        if op:
            text, wrapper = op, Op
        else:
            text, wrapper = tag, Tag
        if after_equals:
            text = unquote_attr_value(text)
        token = wrapper(text)
        tokens.append(token)
        after_equals = (token == '=')
    return tokens
132
class Op(str):
    """String token built from the tokenizer's `op` slot; ``tag`` is False."""

    tag = False
135
class Tag(str):
    """String token built from the tokenizer's `tag` slot; ``tag`` is True."""

    tag = True
138
139
class TerminalSelector(object):
    """Common base of the selectors that may only end a node selector."""
142
143
class AttributeSelector(TerminalSelector):
    """Terminal selector addressing an attribute, written ``@name``."""

    def __init__(self, attribute):
        # Attribute name, stored without the leading '@'.
        self.attribute = attribute

    def __repr__(self):
        return 'AttributeSelector(%r)' % self.attribute

    def __str__(self):
        return ''.join(('@', self.attribute))
154
155
class NamespaceSelector(TerminalSelector):
    """Terminal selector addressing the namespace bindings (``namespace::*``)."""

    def __repr__(self):
        return 'NamespaceSelector()'

    def __str__(self):
        return "namespace::*"
163
164
class Str(str):
    """A str whose repr is wrapped in the name of its (sub)class."""

    def __repr__(self):
        class_name = type(self).__name__
        plain_repr = str.__repr__(self)
        return '%s(%s)' % (class_name, plain_repr)
168
def parse_qname(qname, defnamespace, namespaces):
    """Resolve a qualified name to a ``(namespace, local_name)`` pair.

    '*' is returned unchanged. A name of the form ``prefix:local`` is
    resolved through the *namespaces* mapping (an unknown prefix raises
    KeyError). Any other name is paired with *defnamespace* as it is.
    """
    if qname == '*':
        return qname
    pieces = qname.split(':')
    if len(pieces) != 2:
        # No colon (or more than one): fall back to the default namespace.
        return (defnamespace, qname)
    prefix, local = pieces
    return (namespaces[prefix], local)
178
179
class Step(object):
    """One step of an element selector.

    Holds the element name plus an optional position and an optional
    attribute test (attribute name and value).
    """

    def __init__(self, name, position=None, att_name=None, att_value=None):
        self.name = name
        self.position = position
        self.att_name = att_name
        self.att_value = att_value

    def __repr__(self):
        fields = [self.name, self.position, self.att_name, self.att_value]
        # Trailing None fields are left out of the representation.
        while fields and fields[-1] is None:
            fields.pop()
        return 'Step(%s)' % ', '.join([repr(field) for field in fields])
194
195
def step2str(step, namespace2prefix = {}):
    """Render *step* as XPath text, mapping namespaces to prefixes.

    *namespace2prefix* maps a namespace to the prefix to emit for it; a
    false prefix means the name is written without one. The position and
    the attribute test are appended as predicates when present.
    """
    try:
        ns, local = step.name
    except ValueError:
        # Name is not a (namespace, name) pair, e.g. the '*' wildcard.
        text = step.name
    else:
        prefix = namespace2prefix[ns]
        text = local if not prefix else prefix + ':' + local

    if step.position is not None:
        text += '[%s]' % step.position

    if step.att_name is None:
        return text

    att_ns, att_local = step.att_name
    att_prefix = None
    if att_ns:
        att_prefix = namespace2prefix[att_ns]
    quoted = quoteattr(step.att_value)
    if att_prefix:
        text += '[@%s:%s=%s]' % (att_prefix, att_local, quoted)
    else:
        text += '[@%s=%s]' % (att_local, quoted)
    return text
222
223
def read_element_tag(lst, index, namespace, namespaces):
    """Read an element name from the token list *lst* starting at *index*.

    Returns ``(name, next_index)`` where name is '*' for the wildcard or a
    ``(namespace, local_name)`` pair. An unprefixed name is put in
    *namespace*; a ``prefix:name`` form is resolved through *namespaces*.

    Raises NodeParsingError when there is no token left, when the prefixed
    form is malformed, or when the prefix has no binding in *namespaces*.
    """
    if index >= len(lst):
        raise NodeParsingError
    if lst[index] == '*':
        return '*', index+1
    if get(lst, index+1) == ':':
        prefix = lst[index]
        local = get(lst, index+2)
        if not prefix.tag:
            raise NodeParsingError
        if not local or not local.tag:
            raise NodeParsingError
        try:
            uri = namespaces[prefix]
        except KeyError:
            # An unbound prefix is an error in the request, not an internal
            # failure, so report it like any other unparsable selector.
            raise NodeParsingError
        return (uri, local), index+3
    return (namespace, lst[index]), index+1
237
def read_position(lst, index):
    """Read an optional positional predicate ``[N]`` at *index*.

    Returns ``(position, next_index)``; position is None and the index is
    unchanged when the tokens at *index* are not of the form '[' X ']'.

    Raises NodeParsingError when X is not an integer.
    """
    if get(lst, index) == '[' and get(lst, index+2) == ']':
        try:
            position = int(lst[index+1])
        except ValueError:
            # e.g. "[abc]": malformed selector rather than an internal error.
            # NodeParsingError is itself a ValueError, so callers that caught
            # the bare ValueError keep working.
            raise NodeParsingError
        return position, index+3
    return None, index
242
# XML attributes don't belong to the same namespace as the containing tag?
# That is what I get in startElement/attrs.items - (None, 'tag') -
# and lxml's xpath works in a similar way:
# doc.xpath('/default:rls-services/default:service[@uri="sip:mybuddies@example.com"]',
#           namespaces = {'default':"urn:ietf:params:xml:ns:rls-services"})
# works, while
# doc.xpath('/default:rls-services/default:service[@default:uri="sip:mybuddies@example.com"]',
#           namespaces = {'default':"urn:ietf:params:xml:ns:rls-services"})
# does not.
# That's why the _namespace parameter is ignored and None is supplied for
# attributes written without a prefix.
def read_att_test(lst, index, _namespace, namespaces):
    """Read an optional attribute test ``[@name="value"]`` at *index*.

    Returns ``(att_name, att_value, next_index)``. att_name is a
    ``(namespace, local_name)`` pair: the namespace is None for an
    unprefixed attribute (*_namespace* is deliberately not applied) and is
    looked up in *namespaces* for the ``[@prefix:name="value"]`` form.
    When no attribute test starts at *index* the result is
    ``(None, None, index)``.

    Raises NodeParsingError when the attribute prefix has no binding.
    """
    if get(lst, index) != '[' or get(lst, index+1) != '@':
        return None, None, index
    if get(lst, index+3) == '=' and get(lst, index+5) == ']':
        return (None, lst[index+2]), lst[index+4], index+6
    if get(lst, index+3) == ':' and get(lst, index+5) == '=' and get(lst, index+7) == ']':
        try:
            uri = namespaces[lst[index+2]]
        except KeyError:
            # An unbound prefix is an error in the request, not an internal
            # failure, so report it like any other unparsable selector.
            raise NodeParsingError
        return (uri, lst[index+4]), lst[index+6], index+8
    return None, None, index
260
def get(lst, index, default=None):
    """Return ``lst[index]``, or *default* when that lookup fails."""
    try:
        item = lst[index]
    except LookupError:
        item = default
    return item
266
def read_step(lst, index, namespace, namespaces):
    """Read one step of the node selector from *lst* starting at *index*.

    Returns ``(step, next_index)`` where step is an AttributeSelector for
    ``@name``, a NamespaceSelector for ``namespace::*``, or otherwise a
    Step built from the element name and its optional position and
    attribute test.

    Raises NodeParsingError when '@' is not followed by a name, or when
    the element name cannot be read.
    """
    head = get(lst, index)
    if head == '@':
        attribute = get(lst, index+1)
        if attribute is None:
            # Selector ends right after '@': malformed input, not an
            # internal IndexError.
            raise NodeParsingError
        return AttributeSelector(attribute), index+2
    if head == 'namespace' and get(lst, index+1) == '::' and get(lst, index+2) == '*':
        return NamespaceSelector(), index+3
    tag, index = read_element_tag(lst, index, namespace, namespaces)
    position, index = read_position(lst, index)
    att_name, att_value, index = read_att_test(lst, index, namespace, namespaces)
    return Step(tag, position, att_name, att_value), index
277
def read_slash(lst, index):
    """Require a '/' token at *index* and return the index following it.

    Raises NodeParsingError when the token is anything else or missing.
    """
    if get(lst, index) != '/':
        raise NodeParsingError
    return index + 1
282
def read_node_selector(lst, namespace, namespaces):
    """Parse the token list *lst* into its selectors.

    Returns ``(element_selector, terminal_selector)``: an ElementSelector
    holding the element steps, and the terminal selector that ended the
    path, or None when there was none. A leading '/' is optional.

    Raises NodeParsingError when a terminal selector is followed by more
    tokens or when steps are not separated by '/'.
    """
    total = len(lst)
    position = 0
    if get(lst, 0) == '/':
        position = read_slash(lst, position)

    element_steps = []
    terminal = None
    while True:
        step, position = read_step(lst, position, namespace, namespaces)
        if isinstance(step, TerminalSelector):
            # A terminal selector must be the very last thing in the path.
            if position != total:
                raise NodeParsingError
            terminal = step
            break
        element_steps.append(step)
        if position == total:
            break
        position = read_slash(lst, position)

    return ElementSelector(element_steps, namespace, namespaces), terminal
301
302def parse_node_selector(s, namespace=None, namespaces=None):
303    """
304    >>> parse_node_selector('/resource-lists', None, {})
305    ([Step((None, 'resource-lists'))], None)
306    >>> parse_node_selector('/resource-lists/list[1]/entry[@uri="sip:bob@example.com"]', None, {})
307    ([Step((None, 'resource-lists')), Step((None, 'list'), 1), Step((None, 'entry'), None, (None, 'uri'), 'sip:bob@example.com')], None)
308    >>> parse_node_selector('/*/list[1][@name="friends"]/@name')
309    ([Step('*'), Step((None, 'list'), 1, (None, 'name'), 'friends')], AttributeSelector('name'))
310    >>> parse_node_selector('/*[10][@att="val"]/namespace::*')
311    ([Step('*', 10, (None, 'att'), 'val')], NamespaceSelector())
312    >>> x = parse_node_selector('/resource-lists/list[@name="friends"]/external[@anchor="http://xcap.example.org/resource-lists/users/sip:a@example.org/index/~~/resource-lists/list%5b@name=%22mkting%22%5d"]')
313    """
314    if namespaces is None:
315        namespaces = {}
316    try:
317        tokens = xpath_tokenizer(s)
318        element_selector, terminal_selector = read_node_selector(tokens, namespace, namespaces)
319        element_selector._original_string = s
320        return element_selector, terminal_selector
321    except NodeParsingError, ex:
322        ex.args = ('Failed to parse node: %r' % s,)
323        raise
324    except:
325        log.error('internal error in parse_node_selector(%r)' % s)
326        raise
327
328
class ElementSelector(list):
    """List of Step objects selecting an element.

    Also carries the default namespace and the prefix-to-namespace
    bindings that were in effect when the steps were parsed.
    """

    def __init__(self, lst, namespace, namespaces):
        list.__init__(self, lst)
        self.namespace = namespace
        self.namespaces = namespaces

    def replace_default_prefix(self, namespace2prefix):
        "fix string representation so it'll work with lxml xpath"
        # step2str copes with both (namespace, name) steps and the '*'
        # wildcard, so every step is rendered through it. Using str(step)
        # for wildcards would emit the Step repr instead of XPath.
        return '/' + '/'.join([step2str(step, namespace2prefix) for step in self])

    # Captures the name of the first tag in an XML fragment. The name ends
    # at any whitespace, '>' or '/'.
    xml_tag = re.compile(r'\s*<([^\s>/]+)')

    def fix_star(self, element_body):
        """Replace a trailing '*' step by the tag name found in *element_body*.

        Applies only when the last step is '*' without a position and
        *element_body* starts with an XML tag; the result is then a new
        selector and this one is left untouched. Otherwise self is returned.

        >>> elem_selector = parse_node_selector('/watcherinfo/watcher-list/*[@id="8ajksjda7s"]', None, {})[0]
        >>> elem_selector.fix_star('<watcher/>')[-1].name[1]
        'watcher'
        >>> elem_selector[-1].name
        '*'
        """
        if self and self[-1].name == '*' and self[-1].position is None:
            m = self.xml_tag.match(element_body)
            if m:
                (name, ) = m.groups()
                result = copy(self)
                # copy() is shallow: duplicate the last step as well so that
                # renaming it does not modify the step held by self.
                last = copy(self[-1])
                last.name = parse_qname(name, self.namespace, self.namespaces)
                result[-1] = last
                return result
        return self
364
365
class NodeSelector(object):
    """Node selector of an XCAP URI: the path plus optional xmlns() bindings.

    The part before the first '?' is parsed into ``element_selector`` and
    ``terminal_selector``; the part after it (if any) supplies the
    prefix-to-namespace bindings kept in ``ns_bindings``.
    """

    XMLNS_REGEXP = re.compile(r"xmlns\((?P<nsdata>.*?)\)")

    def __init__(self, selector, namespace=None):
        self._original_string = selector
        sections = selector.split('?', 1)

        if len(sections) == 2: ## a query component is present
            self.ns_bindings = self._parse_query(sections[1])
        else:
            self.ns_bindings = {}

        self.element_selector, self.terminal_selector = parse_node_selector(sections[0], namespace, self.ns_bindings)

    def __str__(self):
        return self._original_string

    ## http://www.w3.org/TR/2003/REC-xptr-xmlns-20030325/
    def _parse_query(self, query):
        """Return a dictionary of namespace bindings defined by the xmlns() XPointer
           expressions from the given query."""
        ns_bindings = {}
        for m in self.XMLNS_REGEXP.findall(query):
            if '=' not in m:
                log.error("Ignoring invalid XPointer XMLNS expression: %r" % m)
                continue
            # Split on the first '=' only: the namespace URI itself may
            # legitimately contain '=' characters.
            prefix, ns = m.split('=', 1)
            ns_bindings[prefix] = ns
        return ns_bindings

    def replace_default_prefix(self, defprefix=XPATH_DEFAULT_PREFIX, append_terminal = True):
        """Return the selector as XPath text with explicit prefixes.

        Names in the default namespace are written with *defprefix*; other
        namespaces use the prefix bound to them in the request. The
        terminal selector is appended unless *append_terminal* is false.
        """
        # Invert prefix -> namespace into namespace -> prefix. items() is
        # required: iterating the dict itself yields only the keys.
        namespace2prefix = dict([(ns, prefix) for (prefix, ns) in self.ns_bindings.items()])
        namespace2prefix[self.element_selector.namespace] = defprefix
        res = self.element_selector.replace_default_prefix(namespace2prefix)
        if append_terminal and self.terminal_selector:
            res += '/' + str(self.terminal_selector)
        return res

    def get_ns_bindings(self, default_ns):
        """Return a copy of the bindings with *default_ns* bound to XPATH_DEFAULT_PREFIX."""
        ns_bindings = self.ns_bindings.copy()
        ns_bindings[XPATH_DEFAULT_PREFIX] = default_ns
        return ns_bindings
411
class DocumentSelector(Str):
    """Document selector split into application_id, context, user_id and
       document_path.

    The context must be 'users' or 'global'; user_id is None for 'global'.
    Raises DocumentSelectorError when a required part is missing.

    >>> x = DocumentSelector('/resource-lists/users/sip:joe@example.com/index')
    >>> x.application_id, x.context, x.user_id, x.document_path
    ('resource-lists', 'users', 'sip:joe@example.com', 'index')

    >>> x = DocumentSelector('/rls-services/global/index')
    >>> x.application_id, x.context, x.user_id, x.document_path
    ('rls-services', 'global', None, 'index')
    """

    def __init__(self, selector):
        # Ignore one leading and one trailing slash.
        if selector.startswith('/'):
            selector = selector[1:]
        if selector.endswith('/'):
            selector = selector[:-1]
        if not selector:
            raise DocumentSelectorError("Document selector does not contain auid")

        parts = selector.split('/')
        if len(parts) < 2:
            raise DocumentSelectorError("Document selector does not contain context: %r" % selector)
        self.application_id, self.context = parts[0], parts[1]
        if self.context not in ("users", "global"):
            raise DocumentSelectorError("Document selector context must be either 'users' or 'global', not %r: %r" % \
                                        (self.context, selector))

        self.user_id = None
        if self.context == "users":
            # The 'users' tree has one extra segment: the user id.
            if len(parts) < 3:
                raise DocumentSelectorError('Document selector does not contain user id: %r' % selector)
            self.user_id = parts[2]
            path_parts = parts[3:]
        else:
            path_parts = parts[2:]

        if not path_parts:
            raise DocumentSelectorError("Document selector does not contain document's path: %r" % selector)
        self.document_path = '/'.join(path_parts)
451
452
class XCAPUri(object):
    """XCAP URI made of the XCAP root, a document selector and an optional
    node selector.

    >>> uri = XCAPUri('https://xcap.sipthor.net/xcap-root@ag-projects.com',
    ... '/resource-lists/users/sip:denis@umts.ro/properties-resource-list.xml/~~/resource-lists/list%5b@name=%22Default%22%5d/entry%5b@uri=%22sip%3adenis%40umts.ro%22%5d', {})

    >>> uri.user
    XCAPUser('denis', 'umts.ro')

    >>> uri.node_selector.element_selector
    [Step((None, 'resource-lists')), Step((None, 'list'), None, (None, 'name'), 'Default'), Step((None, 'entry'), None, (None, 'uri'), 'sip:denis@umts.ro')]

    """

    def __init__(self, xcap_root, resource_selector, namespaces):
        "namespaces maps application id to default namespace"
        self.xcap_root = xcap_root
        selector = unquote(resource_selector)

        # convention to get the realm if it's not contained in the user ID section
        # of the document selector (bad eyebeam)
        realm = None
        if selector.startswith("@"):
            slash_at = selector.find("/")
            realm = selector[1:slash_at]
            selector = selector[slash_at:]
        self.resource_selector = selector

        # '~~' separates the document selector from the node selector.
        parts = selector.split('~~', 1)
        self.doc_selector = DocumentSelector(parts[0])
        self.application_id = self.doc_selector.application_id
        if len(parts) < 2:
            self.node_selector = None
        else:
            self.node_selector = NodeSelector(parts[1], namespaces.get(self.application_id))

        user_id = self.doc_selector.user_id
        if user_id:
            self.user = XCAPUser.parse(user_id, realm)
        else:
            self.user = XCAPUser(None, realm)

    def __str__(self):
        return self.xcap_root + self.resource_selector
496
# When executed as a script, run the doctests embedded in this module's
# docstrings.
if __name__=='__main__':
    import doctest
    doctest.testmod()
Note: See TracBrowser for help on using the browser.