| import pprint |
| import sys |
| import unittest |
| |
| from test import test_support |
| |
| |
class HookWatcher:
    """Log hook events as ``(frame number, event, ident(frame))`` tuples.

    Frames are numbered in the order in which they are first seen, so a
    log can be compared with an expected list without holding on to the
    frame objects themselves.
    """

    def __init__(self):
        self.frames = []
        self.events = []

    def callback(self, frame, event, arg):
        """Hook entry point: log every event; ``arg`` is not used."""
        self.add_event(event, frame)

    def add_event(self, event, frame=None):
        """Append ``event`` to the log, attributed to ``frame``.

        When ``frame`` is omitted, the frame of the caller is used.
        """
        if frame is None:
            frame = sys._getframe(1)

        # Reuse the number of a frame seen before; otherwise give it the
        # next free number.  ident() is the only Python-level function
        # called from here, and get_events() filters out log entries
        # attributed to add_event() and ident().
        try:
            position = self.frames.index(frame)
        except ValueError:
            self.frames.append(frame)
            position = len(self.frames) - 1

        self.events.append((position, event, ident(frame)))

    def get_events(self):
        """Return the log without the watcher's own bookkeeping entries.

        Entries whose ident equals that of add_event() or of ident() are
        left out.  As a side effect the list of frames is discarded.
        """
        hidden = [ident(self.add_event.im_func), ident(ident)]
        self.frames = None

        visible = []
        for entry in self.events:
            if entry[2] not in hidden:
                visible.append(entry)
        return visible
| |
| |
class ProfileSimulator(HookWatcher):
    """HookWatcher that routes events to per-event handlers.

    'call' and 'return' events are logged and mirrored on ``self.stack``;
    an 'exception' event fails the owning test case.
    """

    def __init__(self, testcase):
        HookWatcher.__init__(self)
        self.testcase = testcase
        self.stack = []

    def callback(self, frame, event, arg):
        # Callback registered with sys.setprofile()/sys.settrace().
        # An event without an entry in ``dispatch`` raises KeyError.
        handler = self.dispatch[event]
        handler(self, frame)

    def trace_call(self, frame):
        """Log a 'call' event and push the frame on the stack."""
        self.add_event('call', frame)
        self.stack.append(frame)

    def trace_return(self, frame):
        """Log a 'return' event and pop the top of the stack."""
        self.add_event('return', frame)
        self.stack.pop()

    def trace_exception(self, frame):
        """Fail the test case: exception events are not expected here."""
        self.testcase.fail(
            "the profiler should never receive exception events")

    # Maps event names to the plain functions defined above; callback()
    # passes ``self`` explicitly when it invokes them.
    dispatch = {}
    dispatch['call'] = trace_call
    dispatch['exception'] = trace_exception
    dispatch['return'] = trace_return
| |
| |
class TestCaseBase(unittest.TestCase):
    """Common base for the hook tests; subclasses provide new_watcher()."""

    def check_events(self, callable, expected):
        """Fail unless profiling ``callable`` yields exactly ``expected``.

        The events are captured with a fresh watcher from new_watcher();
        on a mismatch both lists are pretty-printed in the failure message.
        """
        watcher = self.new_watcher()
        events = capture_events(callable, watcher)
        if events == expected:
            return
        report = "Expected events:\n%s\nReceived events:\n%s" % (
            pprint.pformat(expected), pprint.pformat(events))
        self.fail(report)
| |
| |
class ProfileHookTestCase(TestCaseBase):
    # Checks the events logged by a plain HookWatcher.  Every test hands a
    # small function to check_events() together with the expected list of
    # (frame number, event, ident) tuples.
    #
    # The test methods carry comments rather than docstrings so that the
    # descriptions unittest reports for them stay as they were.

    def new_watcher(self):
        return HookWatcher()

    def test_simple(self):
        # A function that does nothing: one call, one return.
        def f(p):
            pass
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_exception(self):
        # An uncaught exception still shows up as a plain return.
        def f(p):
            1/0
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_caught_exception(self):
        # An exception handled inside the function adds no events.
        def f(p):
            try:
                1/0
            except:
                pass
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_caught_nested_exception(self):
        # Same function body and expectation as test_caught_exception.
        def f(p):
            try:
                1/0
            except:
                pass
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_nested_exception(self):
        def f(p):
            1/0
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            # This isn't what I expected:
            # (0, 'exception', protect_ident),
            # I expected this again:
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_exception_in_except_clause(self):
        # g() calls f() twice, the second time from its except clause;
        # each call to f() is logged with a frame number of its own.
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            except:
                try:
                    f(p)
                except:
                    pass
        f_id = ident(f)
        g_id = ident(g)
        expected = [
            (1, 'call', g_id),
            (2, 'call', f_id),
            (2, 'return', f_id),
            (3, 'call', f_id),
            (3, 'return', f_id),
            (1, 'return', g_id),
        ]
        self.check_events(g, expected)

    def test_exception_propogation(self):
        # The finally clause of g() logs a marker event of its own between
        # the return of f() and the return of g().
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            finally:
                p.add_event("falling through")
        f_id = ident(f)
        g_id = ident(g)
        expected = [
            (1, 'call', g_id),
            (2, 'call', f_id),
            (2, 'return', f_id),
            (1, 'falling through', g_id),
            (1, 'return', g_id),
        ]
        self.check_events(g, expected)

    def test_raise_twice(self):
        # A second exception raised from the except clause.
        def f(p):
            try:
                1/0
            except:
                1/0
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_raise_reraise(self):
        # The except clause re-raises the exception it caught.
        def f(p):
            try:
                1/0
            except:
                raise
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_raise(self):
        # An explicit raise statement.
        def f(p):
            raise Exception()
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_distant_exception(self):
        # The exception is raised five calls deep and caught by none of
        # the functions in the chain j -> i -> h -> g -> f.
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_id = ident(f)
        g_id = ident(g)
        h_id = ident(h)
        i_id = ident(i)
        j_id = ident(j)
        expected = [
            (1, 'call', j_id),
            (2, 'call', i_id),
            (3, 'call', h_id),
            (4, 'call', g_id),
            (5, 'call', f_id),
            (5, 'return', f_id),
            (4, 'return', g_id),
            (3, 'return', h_id),
            (2, 'return', i_id),
            (1, 'return', j_id),
        ]
        self.check_events(j, expected)

    def test_generator(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_id = ident(f)
        g_id = ident(g)
        expected = [
            (1, 'call', g_id),
            # the iterator is resumed twice to produce its two values
            (2, 'call', f_id),
            (2, 'return', f_id),
            (2, 'call', f_id),
            (2, 'return', f_id),
            # and once more; this time it signals end-of-iteration
            # without actually raising an exception
            (2, 'call', f_id),
            (2, 'return', f_id),
            (1, 'return', g_id),
        ]
        self.check_events(g, expected)

    def test_stop_iteration(self):
        def f():
            for i in range(2):
                yield i
            raise StopIteration
        def g(p):
            for i in f():
                pass
        f_id = ident(f)
        g_id = ident(g)
        expected = [
            (1, 'call', g_id),
            # the iterator is resumed twice to produce its two values
            (2, 'call', f_id),
            (2, 'return', f_id),
            (2, 'call', f_id),
            (2, 'return', f_id),
            # and once more to hit the raise:
            (2, 'call', f_id),
            (2, 'return', f_id),
            (1, 'return', g_id),
        ]
        self.check_events(g, expected)
| |
| |
class ProfileSimulatorTestCase(TestCaseBase):
    # Checks the events logged when a ProfileSimulator is the watcher.
    #
    # The test methods carry comments rather than docstrings so that the
    # descriptions unittest reports for them stay as they were.

    def new_watcher(self):
        return ProfileSimulator(self)

    def test_simple(self):
        # A function that does nothing: one call, one return.
        def f(p):
            pass
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_basic_exception(self):
        # An uncaught exception still shows up as a plain return.
        def f(p):
            1/0
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_caught_exception(self):
        # An exception handled inside the function adds no events.
        def f(p):
            try:
                1/0
            except:
                pass
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_distant_exception(self):
        # The exception is raised five calls deep and caught by none of
        # the functions in the chain j -> i -> h -> g -> f.
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_id = ident(f)
        g_id = ident(g)
        h_id = ident(h)
        i_id = ident(i)
        j_id = ident(j)
        expected = [
            (1, 'call', j_id),
            (2, 'call', i_id),
            (3, 'call', h_id),
            (4, 'call', g_id),
            (5, 'call', f_id),
            (5, 'return', f_id),
            (4, 'return', g_id),
            (3, 'return', h_id),
            (2, 'return', i_id),
            (1, 'return', j_id),
        ]
        self.check_events(j, expected)
| |
| |
def ident(function):
    """Return ``(co_firstlineno, co_name)`` for a frame or a function.

    An object with an ``f_code`` attribute is treated as a frame.  Anything
    else is treated as function-like: its code object is read from
    ``__code__`` when that attribute exists, and from the legacy
    ``func_code`` attribute otherwise, so interpreters that provide only
    one of the two spellings are both supported.

    Raises AttributeError when the object exposes none of these attributes.
    """
    # NOTE(review): HookWatcher.add_event() calls this function, possibly
    # while a profile hook is installed, and get_events() only filters
    # entries attributed to add_event() and ident().  Keep this body to
    # built-in operations; do not call other Python-level helpers here.
    if hasattr(function, "f_code"):
        code = function.f_code
    elif hasattr(function, "__code__"):
        code = function.__code__
    else:
        code = function.func_code
    return code.co_firstlineno, code.co_name
| |
| |
def protect(f, p):
    """Call ``f(p)`` and discard any exception it raises."""
    try:
        f(p)
    except:
        # Deliberately bare: whatever f() raises is swallowed here.
        pass
| |
# (first line number, name) pair that ident() reports for protect().  In the
# code visible here it is only mentioned in a commented-out expectation in
# ProfileHookTestCase.test_nested_exception.
protect_ident = ident(protect)
| |
| |
def capture_events(callable, p=None):
    """Run ``protect(callable, p)`` under a profile hook; return the events.

    ``p`` is the watcher whose ``callback`` is installed with
    sys.setprofile(); a new HookWatcher is created when it is omitted.
    The first and the last event reported by the watcher's get_events()
    are dropped from the result.

    Raises test_support.TestFailed if sys.setprofile() accepts being
    called without an argument.
    """
    # sys.setprofile() must reject a call that passes no argument.
    try:
        sys.setprofile()
    except TypeError:
        rejected = True
    else:
        rejected = False
    if not rejected:
        raise test_support.TestFailed(
            'sys.setprofile() did not raise TypeError')

    watcher = p
    if watcher is None:
        watcher = HookWatcher()

    # Nothing but the protect() call may run between installing and
    # removing the hook; whatever runs here may end up in the event log.
    sys.setprofile(watcher.callback)
    protect(callable, watcher)
    sys.setprofile(None)

    recorded = watcher.get_events()
    return recorded[1:-1]
| |
| |
def show_events(callable):
    """Pretty-print the events captured while profiling ``callable``."""
    import pprint
    events = capture_events(callable)
    pprint.pprint(events)
| |
| |
def test_main():
    """Run both test case classes through test_support.run_unittest()."""
    cases = (ProfileHookTestCase, ProfileSimulatorTestCase)
    test_support.run_unittest(*cases)
| |
| |
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    test_main()