"""Miscellaneous utility functions useful for dealing with ESIS streams."""
| |
import re
import xml.dom.pulldom
import xml.sax
import xml.sax.handler
import xml.sax.xmlreader
| |
| |
# Matches a non-empty run of characters that contains no backslash, i.e. a
# stretch of literal data between two escape sequences.
_data_match = re.compile(r"[^\\][^\\]*").match

try:
    _unichr = unichr  # Python 2
except NameError:
    _unichr = chr  # Python 3: chr() already returns a Unicode character


def decode(s):
    r"""Return *s* with its backslash escape sequences expanded.

    Recognized escapes:

    * ``\\``   -- a single backslash
    * ``\n``   -- a newline character
    * ``\%N;`` -- the character whose code point is the decimal number N

    Raises ValueError for any other escape, for a backslash that ends the
    string, and for a ``\%`` escape that lacks its ``;`` terminator or a
    valid decimal number.
    """
    parts = []
    while s:
        m = _data_match(s)
        if m:
            # Literal run: copy it through unchanged.
            parts.append(m.group())
            s = s[m.end():]
            continue
        # s starts with a backslash here.  Slicing (rather than indexing)
        # yields "" for a lone trailing backslash, so that case reaches the
        # ValueError below instead of raising IndexError.
        escape = s[1:2]
        if escape == "\\":
            parts.append("\\")
            s = s[2:]
        elif escape == "n":
            parts.append("\n")
            s = s[2:]
        elif escape == "%":
            n, s = s[2:].split(";", 1)
            parts.append(_unichr(int(n)))
        else:
            raise ValueError("can't handle " + repr(s))
    return "".join(parts)
| |
| |
# Translation table used by encode(): maps each supported input character
# to its output form.  Characters without an entry cannot be encoded.
_charmap = {}
for c in range(128):
    _charmap[chr(c)] = chr(c)
    # Decoding a single Latin-1 byte gives the Unicode character
    # U+0080..U+00FF on both Python 2 and Python 3 (no unichr() needed).
    _charmap[bytearray((c + 128,)).decode("latin-1")] = chr(c + 128)
_charmap["\n"] = r"\n"
_charmap["\\"] = r"\\"
del c

_null_join = ''.join


def encode(s):
    r"""Return *s* with newlines and backslashes escaped.

    A newline becomes the two characters ``\n`` and a backslash becomes
    ``\\``; every other character with a code point below 256 is passed
    through via the ``_charmap`` table.

    Raises ValueError (a subclass of the Exception raised historically)
    if *s* contains a character that has no entry in the table.
    """
    # Materialize the lookup once so the error message can show it; on
    # Python 3 a bare map() object would print as an opaque iterator.
    mapped = [_charmap.get(ch) for ch in s]
    try:
        return _null_join(mapped)
    except TypeError:
        # A None in `mapped` marks a character missing from _charmap.
        raise ValueError("could not encode %r: %r" % (s, mapped))
| |
| |
class ESISReader(xml.sax.xmlreader.XMLReader):
    """SAX Reader which reads from an ESIS stream.

    No verification of the document structure is performed by the
    reader; a general verifier could be used as the target
    ContentHandler instance.

    Input is supplied incrementally with feed() and finished with
    close().  parse() is disabled and always raises RuntimeError.

    The same attributes object is passed to every startElement() call
    and is cleared as soon as the call returns, so a handler that needs
    the attributes later must copy them.
    """
    _decl_handler = None
    _lexical_handler = None

    # Identifiers recorded by the 'p' and 's' tokens; they are passed to
    # notationDecl() by the next 'N' token and then reset.
    _public_id = None
    _system_id = None

    _buffer = ""     # unterminated tail of the data given to feed()
    _is_empty = 0    # set by an 'e' token, consumed by the next '(' token
    _lineno = 0      # number of complete lines processed by feed()
    _started = 0     # true once feed() has sent startDocument()

    def __init__(self, contentHandler=None, errorHandler=None):
        """Create a reader, optionally installing the given handlers."""
        xml.sax.xmlreader.XMLReader.__init__(self)
        self._attrs = {}
        # Shares self._attrs, so 'A' tokens show up in it directly.
        self._attributes = Attributes(self._attrs)
        self._locator = Locator()
        self._empties = {}
        if contentHandler:
            self.setContentHandler(contentHandler)
        if errorHandler:
            self.setErrorHandler(errorHandler)

    def get_empties(self):
        """Return a list of the element names that followed an 'e' token."""
        return list(self._empties)

    #
    #  XMLReader interface
    #

    def parse(self, source):
        """Disabled; always raises RuntimeError.

        Use feed() and close() to supply input instead.
        """
        raise RuntimeError(
            "ESISReader.parse() is disabled; use feed() and close()")
        # The code below is unreachable.  It is retained as the reference
        # implementation should parse() ever be re-enabled.
        self._locator._public_id = source.getPublicId()
        self._locator._system_id = source.getSystemId()
        fp = source.getByteStream()
        handler = self.getContentHandler()
        if handler:
            handler.startDocument()
        lineno = 0
        while 1:
            token, data = self._get_token(fp)
            if token is None:
                break
            lineno = lineno + 1
            self._locator._lineno = lineno
            self._handle_token(token, data)
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()

    def feed(self, data):
        """Process every complete line in *data*.

        Text after the last newline is buffered and prepended to the data
        given to the next call.  The first call sends startDocument() to
        the content handler.  An empty line is reported through the error
        handler's error() method.
        """
        if not self._started:
            handler = self.getContentHandler()
            if handler:
                handler.startDocument()
            self._started = 1
        lines = (self._buffer + data).split("\n")
        # split() always returns at least one item; the last one is the
        # (possibly empty) unterminated tail.  Store it before dispatching
        # so the buffer stays a string even if a handler raises.
        self._buffer = lines.pop()
        for line in lines:
            self._lineno = self._lineno + 1
            self._locator._lineno = self._lineno
            if line:
                self._handle_token(line[0], line[1:])
            else:
                e = xml.sax.SAXParseException(
                    "ESIS input line contains no token type mark",
                    None, self._locator)
                self.getErrorHandler().error(e)

    def close(self):
        """Send endDocument() to the content handler and drop the buffer."""
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()
        self._buffer = ""

    def _get_token(self, fp):
        """Read one line from *fp* and return it as ``(token, data)``.

        Returns ``(None, None)`` at end of input, and also after an I/O
        error or an empty line has been reported to the error handler, so
        the result can always be unpacked into two names.
        """
        try:
            line = fp.readline()
        except IOError as err:
            e = xml.sax.SAXException("I/O error reading input stream", err)
            self.getErrorHandler().fatalError(e)
            return None, None
        if not line:
            return None, None
        if line[-1] == "\n":
            line = line[:-1]
        if not line:
            e = xml.sax.SAXParseException(
                "ESIS input line contains no token type mark",
                None, self._locator)
            self.getErrorHandler().error(e)
            return None, None
        return line[0], line[1:]

    def _handle_token(self, token, data):
        """Dispatch one ESIS line, given its type mark and remaining text."""
        handler = self.getContentHandler()
        if token == '-':
            if data and handler:
                handler.characters(decode(data))
        elif token == ')':
            if handler:
                handler.endElement(decode(data))
        elif token == '(':
            if self._is_empty:
                self._empties[data] = 1
                self._is_empty = 0
            if handler:
                handler.startElement(data, self._attributes)
            # Attributes belong to this element only; start afresh.
            self._attrs.clear()
        elif token == 'A':
            name, value = data.split(' ', 1)
            if value != "IMPLIED":
                attr_type, value = value.split(' ', 1)
                self._attrs[name] = (decode(value), attr_type)
        elif token == '&':
            # entity reference in SAX?
            pass
        elif token == '?':
            if handler:
                if ' ' in data:
                    # split(None, 1) yields fewer than two fields when the
                    # only spaces are leading or trailing; fill in "".
                    fields = data.split(None, 1)
                    target = fields[0] if fields else ""
                    data = fields[1] if len(fields) > 1 else ""
                else:
                    target, data = data, ""
                handler.processingInstruction(target, decode(data))
        elif token == 'N':
            handler = self.getDTDHandler()
            if handler:
                handler.notationDecl(data, self._public_id, self._system_id)
            self._public_id = None
            self._system_id = None
        elif token == 'p':
            self._public_id = decode(data)
        elif token == 's':
            self._system_id = decode(data)
        elif token == 'e':
            self._is_empty = 1
        elif token == 'C':
            pass
        else:
            e = xml.sax.SAXParseException(
                "unknown ESIS token in event stream",
                None, self._locator)
            self.getErrorHandler().error(e)

    def setContentHandler(self, handler):
        """Install *handler*, moving the document locator over to it."""
        old = self.getContentHandler()
        if old:
            old.setDocumentLocator(None)
        if handler:
            handler.setDocumentLocator(self._locator)
        xml.sax.xmlreader.XMLReader.setContentHandler(self, handler)

    def getProperty(self, property):
        """Return the lexical or declaration handler.

        Raises SAXNotRecognizedException for any other property.
        """
        if property == xml.sax.handler.property_lexical_handler:
            return self._lexical_handler

        elif property == xml.sax.handler.property_declaration_handler:
            return self._decl_handler

        else:
            raise xml.sax.SAXNotRecognizedException(
                "unknown property %r" % (property,))

    def setProperty(self, property, value):
        """Set the lexical or declaration handler to *value*.

        The document locator is withdrawn from the previous handler and
        given to the new one.  Raises SAXNotRecognizedException for any
        other property.
        """
        if property == xml.sax.handler.property_lexical_handler:
            if self._lexical_handler:
                self._lexical_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._lexical_handler = value

        elif property == xml.sax.handler.property_declaration_handler:
            if self._decl_handler:
                self._decl_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._decl_handler = value

        else:
            # The exception constructor requires a message argument.
            raise xml.sax.SAXNotRecognizedException(
                "unknown property %r" % (property,))

    def getFeature(self, feature):
        """Return 1 for the namespaces feature; defer to the base class
        for every other feature."""
        if feature == xml.sax.handler.feature_namespaces:
            return 1
        else:
            return xml.sax.xmlreader.XMLReader.getFeature(self, feature)

    def setFeature(self, feature, enabled):
        """Accept and ignore the namespaces feature; defer to the base
        class for every other feature."""
        if feature == xml.sax.handler.feature_namespaces:
            pass
        else:
            xml.sax.xmlreader.XMLReader.setFeature(self, feature, enabled)
| |
| |
class Attributes(xml.sax.xmlreader.AttributesImpl):
    """Attribute collection whose entries carry both a value and a type.

    The base class stores ``{name: value}``; here the mapping handed to
    the constructor has the form ``{name: (value, type)}``, so every
    accessor that touches the stored entry is overridden to pick the
    right half of the pair.
    """
    # self._attrs has the form {name: (value, type)}

    def getType(self, name):
        """Return the type recorded for attribute *name*."""
        return self._attrs[name][1]

    def getValue(self, name):
        """Return the value of attribute *name*."""
        return self._attrs[name][0]

    def getValueByQName(self, name):
        """Return the value of attribute *name*, looked up like getValue()."""
        return self._attrs[name][0]

    def __getitem__(self, name):
        return self._attrs[name][0]

    def get(self, name, default=None):
        """Return the value of *name*, or *default* if it is not present."""
        if name in self._attrs:
            return self._attrs[name][0]
        return default

    def items(self):
        """Return a list of ``(name, value)`` pairs."""
        return [(name, value)
                for name, (value, _type) in self._attrs.items()]

    def values(self):
        """Return a list of the attribute values."""
        return [value for value, _type in self._attrs.values()]
| |
| |
class Locator(xml.sax.xmlreader.Locator):
    """Locator backed by plain attributes that the reader assigns directly.

    The attributes start out as -1 (line number) and None (identifiers).
    """
    _public_id = None
    _system_id = None
    _lineno = -1

    def getPublicId(self):
        """Return the stored public identifier."""
        return self._public_id

    def getSystemId(self):
        """Return the stored system identifier."""
        return self._system_id

    def getLineNumber(self):
        """Return the stored line number."""
        return self._lineno
| |
| |
def parse(stream_or_string, parser=None):
    """Return a pulldom event stream for an ESIS document.

    *stream_or_string* is either a file name, which is opened for
    reading, or an already-open stream, which is used as given.  *parser*
    is the reader handed to the event stream; when it is omitted (or
    false) a new ESISReader is created.
    """
    # isinstance() also accepts subclasses of the string types, which an
    # exact type() comparison would wrongly treat as streams.
    if isinstance(stream_or_string, (type(""), type(u""))):
        stream = open(stream_or_string)
    else:
        stream = stream_or_string
    if not parser:
        parser = ESISReader()
    # The third argument is the event stream's buffer size.
    return xml.dom.pulldom.DOMEventStream(stream, parser, (2 ** 14) - 20)