| """Tests for sys.audit and sys.addaudithook |
| """ |
| |
| import os |
| import subprocess |
| import sys |
| import unittest |
| from test import support |
| |
| if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"): |
| raise unittest.SkipTest("test only relevant when sys.audit is available") |
| |
| |
| class TestHook: |
| """Used in standard hook tests to collect any logged events. |
| |
| Should be used in a with block to ensure that it has no impact |
| after the test completes. Audit hooks cannot be removed, so the |
| best we can do for the test run is disable it by calling close(). |
| """ |
| |
| def __init__(self, raise_on_events=None, exc_type=RuntimeError): |
| self.raise_on_events = raise_on_events or () |
| self.exc_type = exc_type |
| self.seen = [] |
| self.closed = False |
| |
| def __enter__(self, *a): |
| sys.addaudithook(self) |
| return self |
| |
| def __exit__(self, *a): |
| self.close() |
| |
| def close(self): |
| self.closed = True |
| |
| @property |
| def seen_events(self): |
| return [i[0] for i in self.seen] |
| |
| def __call__(self, event, args): |
| if self.closed: |
| return |
| self.seen.append((event, args)) |
| if event in self.raise_on_events: |
| raise self.exc_type("saw event " + event) |
| |
| |
| class TestFinalizeHook: |
| """Used in the test_finalize_hooks function to ensure that hooks |
| are correctly cleaned up, that they are notified about the cleanup, |
| and are unable to prevent it. |
| """ |
| |
| def __init__(self): |
| print("Created", id(self), file=sys.stderr, flush=True) |
| |
| def __call__(self, event, args): |
| # Avoid recursion when we call id() below |
| if event == "builtins.id": |
| return |
| |
| print(event, id(self), file=sys.stderr, flush=True) |
| |
| if event == "cpython._PySys_ClearAuditHooks": |
| raise RuntimeError("Should be ignored") |
| elif event == "cpython.PyInterpreterState_Clear": |
| raise RuntimeError("Should be ignored") |
| |
| |
| def run_finalize_test(): |
| """Called by test_finalize_hooks in a subprocess.""" |
| sys.addaudithook(TestFinalizeHook()) |
| |
| |
| class AuditTest(unittest.TestCase): |
| def test_basic(self): |
| with TestHook() as hook: |
| sys.audit("test_event", 1, 2, 3) |
| self.assertEqual(hook.seen[0][0], "test_event") |
| self.assertEqual(hook.seen[0][1], (1, 2, 3)) |
| |
| def test_block_add_hook(self): |
| # Raising an exception should prevent a new hook from being added, |
| # but will not propagate out. |
| with TestHook(raise_on_events="sys.addaudithook") as hook1: |
| with TestHook() as hook2: |
| sys.audit("test_event") |
| self.assertIn("test_event", hook1.seen_events) |
| self.assertNotIn("test_event", hook2.seen_events) |
| |
| def test_block_add_hook_baseexception(self): |
| # Raising BaseException will propagate out when adding a hook |
| with self.assertRaises(BaseException): |
| with TestHook( |
| raise_on_events="sys.addaudithook", exc_type=BaseException |
| ) as hook1: |
| # Adding this next hook should raise BaseException |
| with TestHook() as hook2: |
| pass |
| |
| def test_finalize_hooks(self): |
| events = [] |
| with subprocess.Popen( |
| [ |
| sys.executable, |
| "-c", |
| "import test.test_audit; test.test_audit.run_finalize_test()", |
| ], |
| encoding="utf-8", |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| ) as p: |
| p.wait() |
| for line in p.stderr: |
| events.append(line.strip().partition(" ")) |
| firstId = events[0][2] |
| self.assertSequenceEqual( |
| [ |
| ("Created", " ", firstId), |
| ("cpython._PySys_ClearAuditHooks", " ", firstId), |
| ], |
| events, |
| ) |
| |
| def test_pickle(self): |
| pickle = support.import_module("pickle") |
| |
| class PicklePrint: |
| def __reduce_ex__(self, p): |
| return str, ("Pwned!",) |
| |
| payload_1 = pickle.dumps(PicklePrint()) |
| payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3)) |
| |
| # Before we add the hook, ensure our malicious pickle loads |
| self.assertEqual("Pwned!", pickle.loads(payload_1)) |
| |
| with TestHook(raise_on_events="pickle.find_class") as hook: |
| with self.assertRaises(RuntimeError): |
| # With the hook enabled, loading globals is not allowed |
| pickle.loads(payload_1) |
| # pickles with no globals are okay |
| pickle.loads(payload_2) |
| |
| def test_monkeypatch(self): |
| class A: |
| pass |
| |
| class B: |
| pass |
| |
| class C(A): |
| pass |
| |
| a = A() |
| |
| with TestHook() as hook: |
| # Catch name changes |
| C.__name__ = "X" |
| # Catch type changes |
| C.__bases__ = (B,) |
| # Ensure bypassing __setattr__ is still caught |
| type.__dict__["__bases__"].__set__(C, (B,)) |
| # Catch attribute replacement |
| C.__init__ = B.__init__ |
| # Catch attribute addition |
| C.new_attr = 123 |
| # Catch class changes |
| a.__class__ = B |
| |
| actual = [(a[0], a[1]) for e, a in hook.seen if e == "object.__setattr__"] |
| self.assertSequenceEqual( |
| [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")], |
| actual, |
| ) |
| |
| def test_open(self): |
| # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open() |
| try: |
| import ssl |
| |
| load_dh_params = ssl.create_default_context().load_dh_params |
| except ImportError: |
| load_dh_params = None |
| |
| # Try a range of "open" functions. |
| # All of them should fail |
| with TestHook(raise_on_events={"open"}) as hook: |
| for fn, *args in [ |
| (open, support.TESTFN, "r"), |
| (open, sys.executable, "rb"), |
| (open, 3, "wb"), |
| (open, support.TESTFN, "w", -1, None, None, None, False, lambda *a: 1), |
| (load_dh_params, support.TESTFN), |
| ]: |
| if not fn: |
| continue |
| self.assertRaises(RuntimeError, fn, *args) |
| |
| actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]] |
| actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]] |
| self.assertSequenceEqual( |
| [ |
| i |
| for i in [ |
| (support.TESTFN, "r"), |
| (sys.executable, "r"), |
| (3, "w"), |
| (support.TESTFN, "w"), |
| (support.TESTFN, "rb") if load_dh_params else None, |
| ] |
| if i is not None |
| ], |
| actual_mode, |
| ) |
| self.assertSequenceEqual([], actual_flag) |
| |
| def test_cantrace(self): |
| traced = [] |
| |
| def trace(frame, event, *args): |
| if frame.f_code == TestHook.__call__.__code__: |
| traced.append(event) |
| |
| old = sys.settrace(trace) |
| try: |
| with TestHook() as hook: |
| # No traced call |
| eval("1") |
| |
| # No traced call |
| hook.__cantrace__ = False |
| eval("2") |
| |
| # One traced call |
| hook.__cantrace__ = True |
| eval("3") |
| |
| # Two traced calls (writing to private member, eval) |
| hook.__cantrace__ = 1 |
| eval("4") |
| |
| # One traced call (writing to private member) |
| hook.__cantrace__ = 0 |
| finally: |
| sys.settrace(old) |
| |
| self.assertSequenceEqual(["call"] * 4, traced) |
| |
| |
| if __name__ == "__main__": |
| if len(sys.argv) >= 2 and sys.argv[1] == "spython_test": |
| # Doesn't matter what we add - it will be blocked |
| sys.addaudithook(None) |
| |
| sys.exit(0) |
| |
| unittest.main() |