| import argparse |
| import sys |
| import time |
| import token |
| import tokenize |
| import traceback |
| |
| from abc import abstractmethod |
| from typing import Any, Callable, cast, Dict, Optional, Tuple, Type, TypeVar |
| |
| from pegen.tokenizer import exact_token_types |
| from pegen.tokenizer import Mark |
| from pegen.tokenizer import Tokenizer |
| |
| T = TypeVar("T") |
| P = TypeVar("P", bound="Parser") |
| F = TypeVar("F", bound=Callable[..., Any]) |
| |
| |
| def logger(method: F) -> F: |
| """For non-memoized functions that we want to be logged. |
| |
| (In practice this is only non-leader left-recursive functions.) |
| """ |
| method_name = method.__name__ |
| |
| def logger_wrapper(self: P, *args: object) -> T: |
| if not self._verbose: |
| return method(self, *args) |
| argsr = ",".join(repr(arg) for arg in args) |
| fill = " " * self._level |
| print(f"{fill}{method_name}({argsr}) .... (looking at {self.showpeek()})") |
| self._level += 1 |
| tree = method(self, *args) |
| self._level -= 1 |
| print(f"{fill}... {method_name}({argsr}) --> {tree!s:.200}") |
| return tree |
| |
| logger_wrapper.__wrapped__ = method # type: ignore |
| return cast(F, logger_wrapper) |
| |
| |
| def memoize(method: F) -> F: |
| """Memoize a symbol method.""" |
| method_name = method.__name__ |
| |
| def memoize_wrapper(self: P, *args: object) -> T: |
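        # Packrat memoization: the cache key combines the input
        # position, the rule name, and the call arguments, so each
        # rule is attempted at most once per position.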
| mark = self.mark() |
| key = mark, method_name, args |
| # Fast path: cache hit, and not verbose. |
| if key in self._cache and not self._verbose: |
| tree, endmark = self._cache[key] |
| self.reset(endmark) |
| return tree |
| # Slow path: no cache hit, or verbose. |
| verbose = self._verbose |
| argsr = ",".join(repr(arg) for arg in args) |
| fill = " " * self._level |
| if key not in self._cache: |
| if verbose: |
| print(f"{fill}{method_name}({argsr}) ... (looking at {self.showpeek()})") |
| self._level += 1 |
| tree = method(self, *args) |
| self._level -= 1 |
| if verbose: |
| print(f"{fill}... {method_name}({argsr}) -> {tree!s:.200}") |
| endmark = self.mark() |
| self._cache[key] = tree, endmark |
| else: |
| tree, endmark = self._cache[key] |
| if verbose: |
| print(f"{fill}{method_name}({argsr}) -> {tree!s:.200}") |
| self.reset(endmark) |
| return tree |
| |
| memoize_wrapper.__wrapped__ = method # type: ignore |
| return cast(F, memoize_wrapper) |
| |
| |
| def memoize_left_rec(method: Callable[[P], Optional[T]]) -> Callable[[P], Optional[T]]: |
| """Memoize a left-recursive symbol method.""" |
| method_name = method.__name__ |
| |
| def memoize_left_rec_wrapper(self: P) -> Optional[T]: |
| mark = self.mark() |
| key = mark, method_name, () |
| # Fast path: cache hit, and not verbose. |
| if key in self._cache and not self._verbose: |
| tree, endmark = self._cache[key] |
| self.reset(endmark) |
| return tree |
| # Slow path: no cache hit, or verbose. |
| verbose = self._verbose |
| fill = " " * self._level |
| if key not in self._cache: |
| if verbose: |
| print(f"{fill}{method_name} ... (looking at {self.showpeek()})") |
| self._level += 1 |
| |
            # For left-recursive rules we manipulate the cache and
            # loop until the rule shows no progress, then pick the
            # previous result.  For an explanation of why this works,
            # see https://github.com/PhilippeSigaud/Pegged/wiki/Left-Recursion.
            # (We use the memoization cache instead of a static
            # variable; this is perhaps similar to the technique of
            # Warth et al., http://web.cs.ucla.edu/~todd/research/pub.php?id=pepm08.)
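
            # A hypothetical illustration (this rule is not part of the
            # module): for `expr: expr '+' term | term` parsing "1+2+3",
            # the seeded failure makes the first pass fall back to
            # `term` and match "1"; the second pass, seeing "1" in the
            # cache, matches "1+2"; the third matches "1+2+3"; the
            # fourth makes no progress, so the third result is kept.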
| |
| # Prime the cache with a failure. |
| self._cache[key] = None, mark |
| lastresult, lastmark = None, mark |
| depth = 0 |
| if verbose: |
| print(f"{fill}Recursive {method_name} at {mark} depth {depth}") |
| |
| while True: |
| self.reset(mark) |
| result = method(self) |
| endmark = self.mark() |
| depth += 1 |
| if verbose: |
| print( |
| f"{fill}Recursive {method_name} at {mark} depth {depth}: {result!s:.200} to {endmark}" |
| ) |
| if not result: |
| if verbose: |
| print(f"{fill}Fail with {lastresult!s:.200} to {lastmark}") |
| break |
| if endmark <= lastmark: |
| if verbose: |
| print(f"{fill}Bailing with {lastresult!s:.200} to {lastmark}") |
| break |
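                # Seed growing: record this iteration's result so the
                # next recursive invocation of the rule sees it.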
| self._cache[key] = lastresult, lastmark = result, endmark |
| |
| self.reset(lastmark) |
| tree = lastresult |
| |
| self._level -= 1 |
| if verbose: |
| print(f"{fill}{method_name}() -> {tree!s:.200} [cached]") |
| if tree: |
| endmark = self.mark() |
| else: |
| endmark = mark |
| self.reset(endmark) |
| self._cache[key] = tree, endmark |
| else: |
| tree, endmark = self._cache[key] |
| if verbose: |
| print(f"{fill}{method_name}() -> {tree!s:.200} [fresh]") |
| if tree: |
| self.reset(endmark) |
| return tree |
| |
| memoize_left_rec_wrapper.__wrapped__ = method # type: ignore |
| return memoize_left_rec_wrapper |
| |
| |
| class Parser: |
| """Parsing base class.""" |
| |
| def __init__(self, tokenizer: Tokenizer, *, verbose: bool = False): |
| self._tokenizer = tokenizer |
| self._verbose = verbose |
| self._level = 0 |
| self._cache: Dict[Tuple[Mark, str, Tuple[Any, ...]], Tuple[Any, Mark]] = {} |
| # Pass through common tokenizer methods. |
| # TODO: Rename to _mark and _reset. |
| self.mark = self._tokenizer.mark |
| self.reset = self._tokenizer.reset |
| |
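    # Generated subclasses implement start() as the grammar's entry rule.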
| @abstractmethod |
| def start(self) -> Any: |
| pass |
| |
| def showpeek(self) -> str: |
| tok = self._tokenizer.peek() |
| return f"{tok.start[0]}.{tok.start[1]}: {token.tok_name[tok.type]}:{tok.string!r}" |
| |
| @memoize |
| def name(self) -> Optional[tokenize.TokenInfo]: |
| tok = self._tokenizer.peek() |
| if tok.type == token.NAME: |
| return self._tokenizer.getnext() |
| return None |
| |
| @memoize |
| def number(self) -> Optional[tokenize.TokenInfo]: |
| tok = self._tokenizer.peek() |
| if tok.type == token.NUMBER: |
| return self._tokenizer.getnext() |
| return None |
| |
| @memoize |
| def string(self) -> Optional[tokenize.TokenInfo]: |
| tok = self._tokenizer.peek() |
| if tok.type == token.STRING: |
| return self._tokenizer.getnext() |
| return None |
| |
| @memoize |
| def op(self) -> Optional[tokenize.TokenInfo]: |
| tok = self._tokenizer.peek() |
| if tok.type == token.OP: |
| return self._tokenizer.getnext() |
| return None |
| |
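    # Match either a literal token string (e.g. "+") or a token type
    # name from the stdlib token module (e.g. "NAME", "NEWLINE").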
| @memoize |
| def expect(self, type: str) -> Optional[tokenize.TokenInfo]: |
| tok = self._tokenizer.peek() |
| if tok.string == type: |
| return self._tokenizer.getnext() |
| if type in exact_token_types: |
| if tok.type == exact_token_types[type]: |
| return self._tokenizer.getnext() |
| if type in token.__dict__: |
| if tok.type == token.__dict__[type]: |
| return self._tokenizer.getnext() |
| return None |
| |
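    # Lookaheads run a parsing function, then restore the input
    # position, so they test what follows without consuming tokens.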
| def positive_lookahead(self, func: Callable[..., T], *args: object) -> T: |
| mark = self.mark() |
| ok = func(*args) |
| self.reset(mark) |
| return ok |
| |
| def negative_lookahead(self, func: Callable[..., object], *args: object) -> bool: |
| mark = self.mark() |
| ok = func(*args) |
| self.reset(mark) |
| return not ok |
| |
| def make_syntax_error(self, filename: str = "<unknown>") -> SyntaxError: |
| tok = self._tokenizer.diagnose() |
| return SyntaxError( |
| "pegen parse failure", (filename, tok.start[0], 1 + tok.start[1], tok.line) |
| ) |
| |
| |
| def simple_parser_main(parser_class: Type[Parser]) -> None: |
| argparser = argparse.ArgumentParser() |
| argparser.add_argument( |
| "-v", |
| "--verbose", |
| action="count", |
| default=0, |
| help="Print timing stats; repeat for more debug output", |
| ) |
| argparser.add_argument( |
| "-q", "--quiet", action="store_true", help="Don't print the parsed program" |
| ) |
| argparser.add_argument("filename", help="Input file ('-' to use stdin)") |
| |
| args = argparser.parse_args() |
| verbose = args.verbose |
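    # -v prints timing stats only; -vv adds parser tracing; -vvv traces
    # the tokenizer instead; -vvvv traces both.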
| verbose_tokenizer = verbose >= 3 |
| verbose_parser = verbose == 2 or verbose >= 4 |
| |
| t0 = time.time() |
| |
| filename = args.filename |
    if filename in ("", "-"):
| filename = "<stdin>" |
| file = sys.stdin |
| else: |
| file = open(args.filename) |
| try: |
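        # Drive the stdlib tokenizer; pegen's Tokenizer buffers tokens
        # so the parser can backtrack with mark()/reset().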
| tokengen = tokenize.generate_tokens(file.readline) |
| tokenizer = Tokenizer(tokengen, verbose=verbose_tokenizer) |
| parser = parser_class(tokenizer, verbose=verbose_parser) |
| tree = parser.start() |
| try: |
| if file.isatty(): |
| endpos = 0 |
| else: |
| endpos = file.tell() |
| except IOError: |
| endpos = 0 |
| finally: |
| if file is not sys.stdin: |
| file.close() |
| |
| t1 = time.time() |
| |
| if not tree: |
| err = parser.make_syntax_error(filename) |
| traceback.print_exception(err.__class__, err, None) |
| sys.exit(1) |
| |
| if not args.quiet: |
| print(tree) |
| |
| if verbose: |
| dt = t1 - t0 |
| diag = tokenizer.diagnose() |
| nlines = diag.end[0] |
| if diag.type == token.ENDMARKER: |
| nlines -= 1 |
| print(f"Total time: {dt:.3f} sec; {nlines} lines", end="") |
| if endpos: |
| print(f" ({endpos} bytes)", end="") |
| if dt: |
| print(f"; {nlines / dt:.0f} lines/sec") |
| else: |
| print() |
| print("Caches sizes:") |
| print(f" token array : {len(tokenizer._tokens):10}") |
| print(f" cache : {len(parser._cache):10}") |
| ## print_memstats() |
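

# Example entry point (illustration only, using the hypothetical ToyParser
# above; a generated parser module would pass its own parser class):
if __name__ == "__main__":
    simple_parser_main(ToyParser)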