bpo-36842: Implement PEP 578 (GH-12613)

Adds sys.audit, sys.addaudithook, io.open_code, and associated C APIs.
diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py
index 9a6585a..8493114 100644
--- a/Lib/test/libregrtest/setup.py
+++ b/Lib/test/libregrtest/setup.py
@@ -107,6 +107,12 @@
 
     support.use_resources = ns.use_resources
 
+    if hasattr(sys, 'addaudithook'):
+        # Add an auditing hook for all tests to ensure PySys_Audit is tested
+        def _test_audit_hook(name, args):
+            pass
+        sys.addaudithook(_test_audit_hook)
+
 
 def replace_stdout():
     """Set stdout encoder error handler to backslashreplace (as stderr error
diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py
new file mode 100644
index 0000000..5b33d97
--- /dev/null
+++ b/Lib/test/test_audit.py
@@ -0,0 +1,260 @@
+"""Tests for sys.audit and sys.addaudithook
+"""
+
+import os
+import subprocess
+import sys
+import unittest
+from test import support
+
+if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
+    raise unittest.SkipTest("test only relevant when sys.audit is available")
+
+
+class TestHook:
+    """Used in standard hook tests to collect any logged events.
+
+    Should be used in a with block to ensure that it has no impact
+    after the test completes. Audit hooks cannot be removed, so the
+    best we can do for the test run is disable it by calling close().
+    """
+
+    def __init__(self, raise_on_events=None, exc_type=RuntimeError):
+        self.raise_on_events = raise_on_events or ()
+        self.exc_type = exc_type
+        self.seen = []
+        self.closed = False
+
+    def __enter__(self, *a):
+        sys.addaudithook(self)
+        return self
+
+    def __exit__(self, *a):
+        self.close()
+
+    def close(self):
+        self.closed = True
+
+    @property
+    def seen_events(self):
+        return [i[0] for i in self.seen]
+
+    def __call__(self, event, args):
+        if self.closed:
+            return
+        self.seen.append((event, args))
+        if event in self.raise_on_events:
+            raise self.exc_type("saw event " + event)
+
+
+class TestFinalizeHook:
+    """Used in the test_finalize_hooks function to ensure that hooks
+    are correctly cleaned up, that they are notified about the cleanup,
+    and are unable to prevent it.
+    """
+
+    def __init__(self):
+        print("Created", id(self), file=sys.stderr, flush=True)
+
+    def __call__(self, event, args):
+        # Avoid recursion when we call id() below
+        if event == "builtins.id":
+            return
+
+        print(event, id(self), file=sys.stderr, flush=True)
+
+        if event == "cpython._PySys_ClearAuditHooks":
+            raise RuntimeError("Should be ignored")
+        elif event == "cpython.PyInterpreterState_Clear":
+            raise RuntimeError("Should be ignored")
+
+
+def run_finalize_test():
+    """Called by test_finalize_hooks in a subprocess."""
+    sys.addaudithook(TestFinalizeHook())
+
+
+class AuditTest(unittest.TestCase):
+    def test_basic(self):
+        with TestHook() as hook:
+            sys.audit("test_event", 1, 2, 3)
+            self.assertEqual(hook.seen[0][0], "test_event")
+            self.assertEqual(hook.seen[0][1], (1, 2, 3))
+
+    def test_block_add_hook(self):
+        # Raising an exception should prevent a new hook from being added,
+        # but will not propagate out.
+        with TestHook(raise_on_events="sys.addaudithook") as hook1:
+            with TestHook() as hook2:
+                sys.audit("test_event")
+                self.assertIn("test_event", hook1.seen_events)
+                self.assertNotIn("test_event", hook2.seen_events)
+
+    def test_block_add_hook_baseexception(self):
+        # Raising BaseException will propagate out when adding a hook
+        with self.assertRaises(BaseException):
+            with TestHook(
+                raise_on_events="sys.addaudithook", exc_type=BaseException
+            ) as hook1:
+                # Adding this next hook should raise BaseException
+                with TestHook() as hook2:
+                    pass
+
+    def test_finalize_hooks(self):
+        events = []
+        with subprocess.Popen(
+            [
+                sys.executable,
+                "-c",
+                "import test.test_audit; test.test_audit.run_finalize_test()",
+            ],
+            encoding="utf-8",
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        ) as p:
+            p.wait()
+            for line in p.stderr:
+                events.append(line.strip().partition(" "))
+        firstId = events[0][2]
+        self.assertSequenceEqual(
+            [
+                ("Created", " ", firstId),
+                ("cpython._PySys_ClearAuditHooks", " ", firstId),
+            ],
+            events,
+        )
+
+    def test_pickle(self):
+        pickle = support.import_module("pickle")
+
+        class PicklePrint:
+            def __reduce_ex__(self, p):
+                return str, ("Pwned!",)
+
+        payload_1 = pickle.dumps(PicklePrint())
+        payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3))
+
+        # Before we add the hook, ensure our malicious pickle loads
+        self.assertEqual("Pwned!", pickle.loads(payload_1))
+
+        with TestHook(raise_on_events="pickle.find_class") as hook:
+            with self.assertRaises(RuntimeError):
+                # With the hook enabled, loading globals is not allowed
+                pickle.loads(payload_1)
+            # pickles with no globals are okay
+            pickle.loads(payload_2)
+
+    def test_monkeypatch(self):
+        class A:
+            pass
+
+        class B:
+            pass
+
+        class C(A):
+            pass
+
+        a = A()
+
+        with TestHook() as hook:
+            # Catch name changes
+            C.__name__ = "X"
+            # Catch type changes
+            C.__bases__ = (B,)
+            # Ensure bypassing __setattr__ is still caught
+            type.__dict__["__bases__"].__set__(C, (B,))
+            # Catch attribute replacement
+            C.__init__ = B.__init__
+            # Catch attribute addition
+            C.new_attr = 123
+            # Catch class changes
+            a.__class__ = B
+
+        actual = [(a[0], a[1]) for e, a in hook.seen if e == "object.__setattr__"]
+        self.assertSequenceEqual(
+            [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")],
+            actual,
+        )
+
+    def test_open(self):
+        # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open()
+        try:
+            import ssl
+
+            load_dh_params = ssl.create_default_context().load_dh_params
+        except ImportError:
+            load_dh_params = None
+
+        # Try a range of "open" functions.
+        # All of them should fail
+        with TestHook(raise_on_events={"open"}) as hook:
+            for fn, *args in [
+                (open, support.TESTFN, "r"),
+                (open, sys.executable, "rb"),
+                (open, 3, "wb"),
+                (open, support.TESTFN, "w", -1, None, None, None, False, lambda *a: 1),
+                (load_dh_params, support.TESTFN),
+            ]:
+                if not fn:
+                    continue
+                self.assertRaises(RuntimeError, fn, *args)
+
+        actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]]
+        actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]]
+        self.assertSequenceEqual(
+            [
+                i
+                for i in [
+                    (support.TESTFN, "r"),
+                    (sys.executable, "r"),
+                    (3, "w"),
+                    (support.TESTFN, "w"),
+                    (support.TESTFN, "rb") if load_dh_params else None,
+                ]
+                if i is not None
+            ],
+            actual_mode,
+        )
+        self.assertSequenceEqual([], actual_flag)
+
+    def test_cantrace(self):
+        traced = []
+
+        def trace(frame, event, *args):
+            if frame.f_code == TestHook.__call__.__code__:
+                traced.append(event)
+
+        old = sys.settrace(trace)
+        try:
+            with TestHook() as hook:
+                # No traced call
+                eval("1")
+
+                # No traced call
+                hook.__cantrace__ = False
+                eval("2")
+
+                # One traced call
+                hook.__cantrace__ = True
+                eval("3")
+
+                # Two traced calls (writing to private member, eval)
+                hook.__cantrace__ = 1
+                eval("4")
+
+                # One traced call (writing to private member)
+                hook.__cantrace__ = 0
+        finally:
+            sys.settrace(old)
+
+        self.assertSequenceEqual(["call"] * 4, traced)
+
+
+if __name__ == "__main__":
+    if len(sys.argv) >= 2 and sys.argv[1] == "spython_test":
+        # Doesn't matter what we add - it will be blocked
+        sys.addaudithook(None)
+
+        sys.exit(0)
+
+    unittest.main()
diff --git a/Lib/test/test_embed.py b/Lib/test/test_embed.py
index 32aabe3..87e90f7 100644
--- a/Lib/test/test_embed.py
+++ b/Lib/test/test_embed.py
@@ -927,5 +927,16 @@
                           api=API_PYTHON)
 
 
+class AuditingTests(EmbeddingTestsMixin, unittest.TestCase):
+    def test_open_code_hook(self):
+        self.run_embedded_interpreter("test_open_code_hook")
+
+    def test_audit(self):
+        self.run_embedded_interpreter("test_audit")
+
+    def test_audit_subinterpreter(self):
+        self.run_embedded_interpreter("test_audit_subinterpreter")
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/Lib/test/test_fileio.py b/Lib/test/test_fileio.py
index 57a0265..26e4500 100644
--- a/Lib/test/test_fileio.py
+++ b/Lib/test/test_fileio.py
@@ -565,6 +565,7 @@
         self.assertRaises(MyException, MyFileIO, fd)
         os.close(fd)  # should not raise OSError(EBADF)
 
+
 class COtherFileTests(OtherFileTests, unittest.TestCase):
     FileIO = _io.FileIO
     modulename = '_io'
@@ -576,10 +577,32 @@
         self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
         self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
 
+    def test_open_code(self):
+        # Check that the default behaviour of open_code matches
+        # open("rb")
+        with self.FileIO(__file__, "rb") as f:
+            expected = f.read()
+        with _io.open_code(__file__) as f:
+            actual = f.read()
+        self.assertEqual(expected, actual)
+
+
 class PyOtherFileTests(OtherFileTests, unittest.TestCase):
     FileIO = _pyio.FileIO
     modulename = '_pyio'
 
+    def test_open_code(self):
+        # Check that the default behaviour of open_code matches
+        # open("rb")
+        with self.FileIO(__file__, "rb") as f:
+            expected = f.read()
+        with check_warnings(quiet=True) as w:
+            # Always test _open_code_with_warning
+            with _pyio._open_code_with_warning(__file__) as f:
+                actual = f.read()
+            self.assertEqual(expected, actual)
+            self.assertNotEqual(w.warnings, [])
+
 
 def test_main():
     # Historically, these tests have been sloppy about removing TESTFN.
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 2c3bf89..6f22b35 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -3861,7 +3861,7 @@
         for name in self.io.__all__:
             obj = getattr(self.io, name, None)
             self.assertIsNotNone(obj, name)
-            if name == "open":
+            if name in ("open", "open_code"):
                 continue
             elif "error" in name.lower() or name == "UnsupportedOperation":
                 self.assertTrue(issubclass(obj, Exception), name)