bpo-40089: Add _at_fork_reinit() method to locks (GH-19195)

Add a private _at_fork_reinit() method to _thread.Lock,
_thread.RLock, threading.RLock and threading.Condition classes:
reinitialize the lock after fork in the child process; reset the lock
to the unlocked state.

Rename also the private _reset_internal_locks() method of
threading.Event to _at_fork_reinit().

* Add _PyThread_at_fork_reinit() private function. It is excluded
  from the limited C API.
* threading.Thread._reset_internal_locks() now calls
  _at_fork_reinit() on self._tstate_lock rather than creating a new
  Python lock object.
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
index cd1155d..b397525 100644
--- a/Lib/test/lock_tests.py
+++ b/Lib/test/lock_tests.py
@@ -2,6 +2,7 @@
 Various tests for synchronization primitives.
 """
 
+import os
 import sys
 import time
 from _thread import start_new_thread, TIMEOUT_MAX
@@ -12,6 +13,11 @@
 from test import support
 
 
+requires_fork = unittest.skipUnless(hasattr(os, 'fork'),
+                                    "platform doesn't support fork "
+                                     "(no _at_fork_reinit method)")
+
+
 def _wait():
     # A crude wait/yield function not relying on synchronization primitives.
     time.sleep(0.01)
@@ -265,6 +271,25 @@
         self.assertFalse(lock.locked())
         self.assertTrue(lock.acquire(blocking=False))
 
+    @requires_fork
+    def test_at_fork_reinit(self):
+        def use_lock(lock):
+            # make sure that the lock still works normally
+            # after _at_fork_reinit()
+            lock.acquire()
+            lock.release()
+
+        # unlocked
+        lock = self.locktype()
+        lock._at_fork_reinit()
+        use_lock(lock)
+
+        # locked: _at_fork_reinit() resets the lock to the unlocked state
+        lock2 = self.locktype()
+        lock2.acquire()
+        lock2._at_fork_reinit()
+        use_lock(lock2)
+
 
 class RLockTests(BaseLockTests):
     """
@@ -417,12 +442,13 @@
         b.wait_for_finished()
         self.assertEqual(results, [True] * N)
 
-    def test_reset_internal_locks(self):
+    @requires_fork
+    def test_at_fork_reinit(self):
         # ensure that condition is still using a Lock after reset
         evt = self.eventtype()
         with evt._cond:
             self.assertFalse(evt._cond.acquire(False))
-        evt._reset_internal_locks()
+        evt._at_fork_reinit()
         with evt._cond:
             self.assertFalse(evt._cond.acquire(False))
 
diff --git a/Lib/threading.py b/Lib/threading.py
index 6b25e7a..5424db3 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -123,6 +123,11 @@
             hex(id(self))
         )
 
+    def _at_fork_reinit(self):
+        self._block._at_fork_reinit()
+        self._owner = None
+        self._count = 0
+
     def acquire(self, blocking=True, timeout=-1):
         """Acquire a lock, blocking or non-blocking.
 
@@ -245,6 +250,10 @@
             pass
         self._waiters = _deque()
 
+    def _at_fork_reinit(self):
+        self._lock._at_fork_reinit()
+        self._waiters.clear()
+
     def __enter__(self):
         return self._lock.__enter__()
 
@@ -514,9 +523,9 @@
         self._cond = Condition(Lock())
         self._flag = False
 
-    def _reset_internal_locks(self):
-        # private!  called by Thread._reset_internal_locks by _after_fork()
-        self._cond.__init__(Lock())
+    def _at_fork_reinit(self):
+        # Private method called by Thread._reset_internal_locks()
+        self._cond._at_fork_reinit()
 
     def is_set(self):
         """Return true if and only if the internal flag is true."""
@@ -816,9 +825,10 @@
     def _reset_internal_locks(self, is_alive):
         # private!  Called by _after_fork() to reset our internal locks as
         # they may be in an invalid state leading to a deadlock or crash.
-        self._started._reset_internal_locks()
+        self._started._at_fork_reinit()
         if is_alive:
-            self._set_tstate_lock()
+            self._tstate_lock._at_fork_reinit()
+            self._tstate_lock.acquire()
         else:
             # The thread isn't alive after fork: it doesn't have a tstate
             # anymore.