| import itertools |
| |
| import Util |
| |
class ShLexer:
    """Lexer for the small 'sh' command-line subset used by this module.

    ``lex()`` yields plain strings for arguments and tuples for operators:
    a 1-tuple such as ``('|',)`` or ``('>>',)`` for a bare operator, and a
    2-tuple such as ``('>', 2)`` for a redirection operator that was
    directly preceded by an all-digit argument (e.g. ``2>``).
    """

    def __init__(self, data, win32Escapes = False):
        """Prepare to lex the string *data*.

        When *win32Escapes* is true, a backslash outside of a quoted string
        is kept as an ordinary character instead of acting as an escape.
        """
        self.data = data
        self.pos = 0
        self.end = len(data)
        self.win32Escapes = win32Escapes

    def eat(self):
        """Consume and return the next character."""
        c = self.data[self.pos]
        self.pos += 1
        return c

    def look(self):
        """Return the next character without consuming it."""
        return self.data[self.pos]

    def maybe_eat(self, c):
        """
        maybe_eat(c) - Consume the character c if it is the next character,
        returning True if a character was consumed. """
        # Check for end of input first so an operator that is the very last
        # character (e.g. 'a |') does not index past the end of the data.
        if self.pos != self.end and self.data[self.pos] == c:
            self.pos += 1
            return True
        return False

    def lex_arg_fast(self, c):
        """Try to lex an argument as a plain whitespace-delimited word.

        *c* is the first character of the argument, which the caller has
        already consumed (it sits at ``self.pos - 1``). Returns the word and
        advances past it, or returns None without moving if the word holds
        any character that needs the slow path.
        """
        # Get the leading whitespace free section.
        chunk = self.data[self.pos - 1:].split(None, 1)[0]

        # If it has special characters, the fast path failed.
        if any(special in chunk for special in "|&<>'\"\\"):
            return None

        self.pos = self.pos - 1 + len(chunk)
        return chunk

    def lex_arg_slow(self, c):
        """Lex an argument one character at a time.

        *c* is the already consumed first character. Returns the argument
        string, or a ``(operator, number)`` tuple when an all-digit argument
        is immediately followed by a redirection operator.
        """
        if c in "'\"":
            arg = self.lex_arg_quoted(c)
        else:
            arg = c
        while self.pos != self.end:
            c = self.look()
            if c.isspace() or c in "|&":
                break
            elif c in '><':
                # This is an annoying case; we treat '2>' as a single token so
                # we don't have to track whitespace tokens.

                # If the parse string isn't an integer, do the usual thing.
                if not arg.isdigit():
                    break

                # Otherwise, lex the operator and convert to a redirection
                # token.
                num = int(arg)
                tok = self.lex_one_token()
                assert isinstance(tok, tuple) and len(tok) == 1
                return (tok[0], num)
            elif c == '"':
                self.eat()
                arg += self.lex_arg_quoted('"')
            elif not self.win32Escapes and c == '\\':
                # Outside of a string, '\\' escapes everything.
                self.eat()
                if self.pos == self.end:
                    Util.warning("escape at end of quoted argument in: %r" %
                                 self.data)
                    return arg
                arg += self.eat()
            else:
                arg += self.eat()
        return arg

    def lex_arg_quoted(self, delim):
        """Lex the body of a quoted string whose opening *delim* was consumed.

        Returns the contents without the quotes. If the input ends before
        the closing quote, a warning is issued and what was read so far is
        returned.
        """
        text = ''
        while self.pos != self.end:
            c = self.eat()
            if c == delim:
                return text
            elif c == '\\' and delim == '"':
                # Inside a '"' quoted string, '\\' only escapes the quote
                # character and backslash, otherwise it is preserved.
                if self.pos == self.end:
                    Util.warning("escape at end of quoted argument in: %r" %
                                 self.data)
                    return text
                c = self.eat()
                if c == '"':
                    text += '"'
                elif c == '\\':
                    text += '\\'
                else:
                    text += '\\' + c
            else:
                text += c
        Util.warning("missing quote character in %r" % self.data)
        return text

    def lex_arg_checked(self, c):
        """Lex an argument with both paths and verify that they agree.

        Returns the slow-path result. Raises ValueError if the fast path
        succeeded but produced a different value or end position.
        """
        pos = self.pos
        res = self.lex_arg_fast(c)
        end = self.pos

        # Rewind so the slow path starts from the same position.
        self.pos = pos
        reference = self.lex_arg_slow(c)
        if res is not None:
            if res != reference:
                raise ValueError("Fast path failure: %r != %r" %
                                 (res, reference))
            if self.pos != end:
                raise ValueError("Fast path failure: %r != %r" %
                                 (self.pos, end))
        return reference

    def lex_arg(self, c):
        """Lex an argument, using the slow path only if the fast one fails."""
        return self.lex_arg_fast(c) or self.lex_arg_slow(c)

    def lex_one_token(self):
        """
        lex_one_token - Lex a single 'sh' token. """

        c = self.eat()
        if c in ';!':
            return (c,)
        if c == '|':
            if self.maybe_eat('|'):
                return ('||',)
            return (c,)
        if c == '&':
            if self.maybe_eat('&'):
                return ('&&',)
            if self.maybe_eat('>'):
                return ('&>',)
            return (c,)
        if c == '>':
            if self.maybe_eat('&'):
                return ('>&',)
            if self.maybe_eat('>'):
                return ('>>',)
            return (c,)
        if c == '<':
            if self.maybe_eat('&'):
                return ('<&',)
            # '<<' is spelled with a second '<'; matching '>' here would
            # report the different '<>' spelling as '<<'.
            if self.maybe_eat('<'):
                return ('<<',)
            return (c,)

        return self.lex_arg(c)

    def lex(self):
        """Generate every token in the input, skipping whitespace between
        tokens."""
        while self.pos != self.end:
            if self.look().isspace():
                self.eat()
            else:
                yield self.lex_one_token()
| |
| ### |
| |
class Command:
    """A single command: its argument list and its list of redirections."""

    def __init__(self, args, redirects):
        # Store copies so the node owns its own lists.
        self.args = list(args)
        self.redirects = list(redirects)

    def __repr__(self):
        return 'Command(%r, %r)' % (self.args, self.redirects)

    def __eq__(self, other):
        """Two commands are equal when their args and redirects are equal."""
        # Python 3 never calls __cmp__, so equality must be spelled out here
        # or '==' would silently fall back to object identity.
        if not isinstance(other, Command):
            return NotImplemented
        return ((self.args, self.redirects) ==
                (other.args, other.redirects))

    def __ne__(self, other):
        # Python 2 does not derive '!=' from __eq__.
        return not self == other

    def __cmp__(self, other):
        """Three-way comparison, kept for Python 2 callers."""
        if not isinstance(other, Command):
            return -1

        mine = (self.args, self.redirects)
        theirs = (other.args, other.redirects)
        # Equivalent to cmp(mine, theirs) without needing the Python 2 only
        # builtin.
        return (mine > theirs) - (mine < theirs)
| |
class Pipeline:
    """A list of commands joined by '|', with an optional '!' negation flag."""

    def __init__(self, commands, negate):
        self.commands = commands
        self.negate = negate

    def __repr__(self):
        return 'Pipeline(%r, %r)' % (self.commands, self.negate)

    def __eq__(self, other):
        """Two pipelines are equal when commands and negate flag are equal."""
        # Python 3 never calls __cmp__, so equality must be spelled out here
        # or '==' would silently fall back to object identity.
        if not isinstance(other, Pipeline):
            return NotImplemented
        return ((self.commands, self.negate) ==
                (other.commands, other.negate))

    def __ne__(self, other):
        # Python 2 does not derive '!=' from __eq__.
        return not self == other

    def __cmp__(self, other):
        """Three-way comparison, kept for Python 2 callers."""
        if not isinstance(other, Pipeline):
            return -1

        mine = (self.commands, self.negate)
        theirs = (other.commands, other.negate)
        # Equivalent to cmp(mine, theirs) without needing the Python 2 only
        # builtin.
        return (mine > theirs) - (mine < theirs)
| |
class Seq:
    """Two parsed nodes joined by one of the operators ';', '&', '||', '&&'."""

    def __init__(self, lhs, op, rhs):
        assert op in (';', '&', '||', '&&')
        self.op = op
        self.lhs = lhs
        self.rhs = rhs

    def __repr__(self):
        return 'Seq(%r, %r, %r)' % (self.lhs, self.op, self.rhs)

    def __eq__(self, other):
        """Two sequences are equal when lhs, op and rhs are all equal."""
        # Python 3 never calls __cmp__, so equality must be spelled out here
        # or '==' would silently fall back to object identity.
        if not isinstance(other, Seq):
            return NotImplemented
        return ((self.lhs, self.op, self.rhs) ==
                (other.lhs, other.op, other.rhs))

    def __ne__(self, other):
        # Python 2 does not derive '!=' from __eq__.
        return not self == other

    def __cmp__(self, other):
        """Three-way comparison, kept for Python 2 callers."""
        if not isinstance(other, Seq):
            return -1

        mine = (self.lhs, self.op, self.rhs)
        theirs = (other.lhs, other.op, other.rhs)
        # Equivalent to cmp(mine, theirs) without needing the Python 2 only
        # builtin.
        return (mine > theirs) - (mine < theirs)
| |
class ShParser:
    """Parser that turns an 'sh' command line into Command, Pipeline and Seq
    nodes, reading its tokens from ShLexer."""

    def __init__(self, data, win32Escapes = False):
        """Set up a parser over the string *data*; *win32Escapes* is passed
        through to the lexer."""
        self.data = data
        self.tokens = ShLexer(data, win32Escapes = win32Escapes).lex()

    def lex(self):
        """Consume and return the next token, or None when none are left."""
        # The next() builtin works on Python 2.6+ and Python 3, unlike the
        # Python 2 only iterator.next() method.
        return next(self.tokens, None)

    def look(self):
        """Return the next token without consuming it (None at the end)."""
        token = self.lex()
        if token is not None:
            # Push the token back in front of the remaining stream.
            self.tokens = itertools.chain([token], self.tokens)
        return token

    def parse_command(self):
        """Parse one command with its arguments and redirections.

        Returns a Command. Raises ValueError if no command word is
        available, if the command starts with an operator token, or if a
        redirection operator has no target.
        """
        tok = self.lex()
        # Note this truthiness test also rejects an empty-string token.
        if not tok:
            raise ValueError("empty command!")
        if isinstance(tok, tuple):
            raise ValueError("syntax error near unexpected token %r" % tok[0])

        args = [tok]
        redirects = []
        while 1:
            tok = self.look()

            # EOF?
            if tok is None:
                break

            # If this is an argument, just add it to the current command.
            if isinstance(tok, str):
                args.append(self.lex())
                continue

            # Otherwise see if it is a terminator.
            assert isinstance(tok, tuple)
            if tok[0] in ('|',';','&','||','&&'):
                break

            # Otherwise it must be a redirection.
            op = self.lex()
            arg = self.lex()
            if not arg:
                raise ValueError("syntax error near token %r" % op[0])
            redirects.append((op, arg))

        return Command(args, redirects)

    def parse_pipeline(self):
        """Parse an optional leading '!' followed by commands joined by '|',
        returning a Pipeline."""
        negate = False
        if self.look() == ('!',):
            self.lex()
            negate = True

        commands = [self.parse_command()]
        while self.look() == ('|',):
            self.lex()
            commands.append(self.parse_command())
        return Pipeline(commands, negate)

    def parse(self):
        """Parse the whole input.

        Returns a single Pipeline, or left-nested Seq nodes when pipelines
        are joined by operators. Raises ValueError if an operator is not
        followed by anything.
        """
        lhs = self.parse_pipeline()

        while self.look():
            operator = self.lex()
            assert isinstance(operator, tuple) and len(operator) == 1

            if not self.look():
                raise ValueError(
                    "missing argument to operator %r" % operator[0])

            # FIXME: Operator precedence!!
            lhs = Seq(lhs, operator[0], self.parse_pipeline())

        return lhs
| |
| ### |
| |
| import unittest |
| |
class TestShLexer(unittest.TestCase):
    """Checks on the token lists produced by ShLexer."""

    def lex(self, str, *args, **kwargs):
        """Run ShLexer over *str* and collect all of its tokens in a list."""
        lexer = ShLexer(str, *args, **kwargs)
        return list(lexer.lex())

    def test_basic(self):
        # Operators split arguments even without surrounding whitespace.
        expected = ['a', ('|',), 'b', ('>',), 'c', ('&',), 'd',
                    ('<',), 'e']
        self.assertEqual(self.lex('a|b>c&d<e'), expected)

    def test_redirection_tokens(self):
        # A number only becomes part of the redirection when it is a whole
        # argument on its own.
        cases = [
            ('a2>c', ['a2', ('>',), 'c']),
            ('a 2>c', ['a', ('>',2), 'c']),
        ]
        for source, expected in cases:
            self.assertEqual(self.lex(source), expected)

    def test_quoting(self):
        cases = [
            (""" 'a' """, ['a']),
            (""" "hello\\"world" """, ['hello"world']),
            (""" "hello\\'world" """, ["hello\\'world"]),
            (""" "hello\\\\world" """, ["hello\\world"]),
            (""" he"llo wo"rld """, ["hello world"]),
            (""" a\\ b a\\\\b """, ["a b", "a\\b"]),
            (""" "" "" """, ["", ""]),
        ]
        for source, expected in cases:
            self.assertEqual(self.lex(source), expected)

        # With win32 escapes the backslash is kept as a literal character.
        tokens = self.lex(""" a\\ b """, win32Escapes = True)
        self.assertEqual(tokens, ['a\\', 'b'])
| |
class TestShParse(unittest.TestCase):
    """Checks on the trees produced by ShParser.parse()."""

    def parse(self, str):
        """Parse *str* with a default ShParser and return the result."""
        parser = ShParser(str)
        return parser.parse()

    def test_basic(self):
        cases = [
            ('echo hello', ['echo', 'hello']),
            ('echo ""', ['echo', '']),
        ]
        for source, args in cases:
            expected = Pipeline([Command(args, [])], False)
            self.assertEqual(self.parse(source), expected)

    def test_redirection(self):
        one = Command(['echo', 'hello'], [(('>',), 'c')])
        self.assertEqual(self.parse('echo hello > c'),
                         Pipeline([one], False))

        two = Command(['echo', 'hello'], [(('>',), 'c'),
                                          (('>>',), 'd')])
        self.assertEqual(self.parse('echo hello > c >> d'),
                         Pipeline([two], False))

    def test_pipeline(self):
        cmd_a = Command(['a'], [])
        cmd_b = Command(['b'], [])
        cmd_c = Command(['c'], [])

        self.assertEqual(self.parse('a | b'),
                         Pipeline([cmd_a, cmd_b], False))

        self.assertEqual(self.parse('a | b | c'),
                         Pipeline([cmd_a, cmd_b, cmd_c], False))

        self.assertEqual(self.parse('! a'),
                         Pipeline([cmd_a], True))

    def test_list(self):
        pipe_a = Pipeline([Command(['a'], [])], False)
        pipe_b = Pipeline([Command(['b'], [])], False)
        pipe_c = Pipeline([Command(['c'], [])], False)

        # Each list operator joins two pipelines into a Seq.
        cases = [
            ('a ; b', ';'),
            ('a & b', '&'),
            ('a && b', '&&'),
            ('a || b', '||'),
        ]
        for source, op in cases:
            self.assertEqual(self.parse(source), Seq(pipe_a, op, pipe_b))

        # Chained operators nest to the left.
        self.assertEqual(self.parse('a && b || c'),
                         Seq(Seq(pipe_a, '&&', pipe_b), '||', pipe_c))
| |
# Run the unit tests in this file when it is executed as a script.
if __name__ == '__main__':
    unittest.main()