Issue #18808: Thread.join() now waits for the underlying thread state to be destroyed before returning.
This prevents unpredictable aborts in Py_EndInterpreter() when some non-daemon threads are still running.
diff --git a/Include/pystate.h b/Include/pystate.h
index e41fe4c..ddc6892 100644
--- a/Include/pystate.h
+++ b/Include/pystate.h
@@ -118,6 +118,32 @@
     int trash_delete_nesting;
     PyObject *trash_delete_later;
 
+    /* Called when a thread state is deleted normally, but not when it
+     * is destroyed after fork().
+     * Pain:  to prevent rare but fatal shutdown errors (issue 18808),
+     * Thread.join() must wait for the join'ed thread's tstate to be unlinked
+     * from the tstate chain.  That happens at the end of a thread's life,
+     * in pystate.c.
+     * The obvious way doesn't quite work:  create a lock which the tstate
+     * unlinking code releases, and have Thread.join() wait to acquire that
+     * lock.  The problem is that we _are_ at the end of the thread's life:
+     * if the thread holds the last reference to the lock, decref'ing the
+     * lock will delete the lock, and that may trigger arbitrary Python code
+     * if there's a weakref, with a callback, to the lock.  But by this time
+     * _PyThreadState_Current is already NULL, so only the simplest of C code
+     * can be allowed to run (in particular it must not be possible to
+     * release the GIL).
+     * So instead of holding the lock directly, the tstate holds a weakref to
+     * the lock:  that's the value of on_delete_data below.  Decref'ing a
+     * weakref is harmless.
+     * on_delete points to _threadmodule.c's static release_sentinel() function.
+     * After the tstate is unlinked, release_sentinel is called with the
+     * weakref-to-lock (on_delete_data) argument, and release_sentinel releases
+     * the indirectly held lock.
+     */
+    void (*on_delete)(void *);
+    void *on_delete_data;
+
     /* XXX signal handlers should also be here */
 
 } PyThreadState;
diff --git a/Lib/_dummy_thread.py b/Lib/_dummy_thread.py
index 13b1f26..b67cfb9 100644
--- a/Lib/_dummy_thread.py
+++ b/Lib/_dummy_thread.py
@@ -81,6 +81,10 @@
         raise error("setting thread stack size not supported")
     return 0
 
+def _set_sentinel():
+    """Dummy implementation of _thread._set_sentinel()."""
+    return LockType()
+
 class LockType(object):
     """Class implementing dummy implementation of _thread.LockType.
 
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 971a635..79b10ed 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -539,6 +539,40 @@
         self.assertEqual(err, b"")
         self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
 
+    def test_tstate_lock(self):
+        # Test an implementation detail of Thread objects.
+        started = _thread.allocate_lock()
+        finish = _thread.allocate_lock()
+        started.acquire()
+        finish.acquire()
+        def f():
+            started.release()
+            finish.acquire()
+            time.sleep(0.01)
+        # The tstate lock is None until the thread is started
+        t = threading.Thread(target=f)
+        self.assertIs(t._tstate_lock, None)
+        t.start()
+        started.acquire()
+        self.assertTrue(t.is_alive())
+        # The tstate lock can't be acquired when the thread is running
+        # (or suspended).
+        tstate_lock = t._tstate_lock
+        self.assertFalse(tstate_lock.acquire(timeout=0), False)
+        finish.release()
+        # When the thread ends, the state_lock can be successfully
+        # acquired.
+        self.assertTrue(tstate_lock.acquire(timeout=5), False)
+        # But is_alive() is still True:  we hold _tstate_lock now, which
+        # prevents is_alive() from knowing the thread's end-of-life C code
+        # is done.
+        self.assertTrue(t.is_alive())
+        # Let is_alive() find out the C code is done.
+        tstate_lock.release()
+        self.assertFalse(t.is_alive())
+        # And verify the thread disposed of _tstate_lock.
+        self.assertTrue(t._tstate_lock is None)
+
 
 class ThreadJoinOnShutdown(BaseTestCase):
 
@@ -669,7 +703,7 @@
             # someone else tries to fix this test case by acquiring this lock
             # before forking instead of resetting it, the test case will
             # deadlock when it shouldn't.
-            condition = w._block
+            condition = w._stopped._cond
             orig_acquire = condition.acquire
             call_count_lock = threading.Lock()
             call_count = 0
@@ -733,7 +767,7 @@
             # causes the worker to fork.  At this point, the problematic waiter
             # lock has been acquired once by the waiter and has been put onto
             # the waiters list.
-            condition = w._block
+            condition = w._stopped._cond
             orig_release_save = condition._release_save
             def my_release_save():
                 global start_fork
@@ -867,6 +901,38 @@
         # The thread was joined properly.
         self.assertEqual(os.read(r, 1), b"x")
 
+    def test_threads_join_2(self):
+        # Same as above, but a delay gets introduced after the thread's
+        # Python code returned but before the thread state is deleted.
+        # To achieve this, we register a thread-local object which sleeps
+        # a bit when deallocated.
+        r, w = os.pipe()
+        self.addCleanup(os.close, r)
+        self.addCleanup(os.close, w)
+        code = r"""if 1:
+            import os
+            import threading
+            import time
+
+            class Sleeper:
+                def __del__(self):
+                    time.sleep(0.05)
+
+            tls = threading.local()
+
+            def f():
+                # Sleep a bit so that the thread is still running when
+                # Py_EndInterpreter is called.
+                time.sleep(0.05)
+                tls.x = Sleeper()
+                os.write(%d, b"x")
+            threading.Thread(target=f).start()
+            """ % (w,)
+        ret = _testcapi.run_in_subinterp(code)
+        self.assertEqual(ret, 0)
+        # The thread was joined properly.
+        self.assertEqual(os.read(r, 1), b"x")
+
     def test_daemon_threads_fatal_error(self):
         subinterp_code = r"""if 1:
             import os
diff --git a/Lib/threading.py b/Lib/threading.py
index b6d19d5..d49be0b 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -33,6 +33,7 @@
 # Rename some stuff so "from threading import *" is safe
 _start_new_thread = _thread.start_new_thread
 _allocate_lock = _thread.allocate_lock
+_set_sentinel = _thread._set_sentinel
 get_ident = _thread.get_ident
 ThreadError = _thread.error
 try:
@@ -548,28 +549,33 @@
         else:
             self._daemonic = current_thread().daemon
         self._ident = None
+        self._tstate_lock = None
         self._started = Event()
-        self._stopped = False
-        self._block = Condition(Lock())
+        self._stopped = Event()
         self._initialized = True
         # sys.stderr is not stored in the class like
         # sys.exc_info since it can be changed between instances
         self._stderr = _sys.stderr
         _dangling.add(self)
 
-    def _reset_internal_locks(self):
+    def _reset_internal_locks(self, is_alive):
         # private!  Called by _after_fork() to reset our internal locks as
         # they may be in an invalid state leading to a deadlock or crash.
-        if hasattr(self, '_block'):  # DummyThread deletes _block
-            self._block.__init__()
         self._started._reset_internal_locks()
+        self._stopped._reset_internal_locks()
+        if is_alive:
+            self._set_tstate_lock()
+        else:
+            # The thread isn't alive after fork: it doesn't have a tstate
+            # anymore.
+            self._tstate_lock = None
 
     def __repr__(self):
         assert self._initialized, "Thread.__init__() was not called"
         status = "initial"
         if self._started.is_set():
             status = "started"
-        if self._stopped:
+        if self._stopped.is_set():
             status = "stopped"
         if self._daemonic:
             status += " daemon"
@@ -625,9 +631,18 @@
     def _set_ident(self):
         self._ident = get_ident()
 
+    def _set_tstate_lock(self):
+        """
+        Set a lock object which will be released by the interpreter when
+        the underlying thread state (see pystate.h) gets deleted.
+        """
+        self._tstate_lock = _set_sentinel()
+        self._tstate_lock.acquire()
+
     def _bootstrap_inner(self):
         try:
             self._set_ident()
+            self._set_tstate_lock()
             self._started.set()
             with _active_limbo_lock:
                 _active[self._ident] = self
@@ -691,10 +706,7 @@
                     pass
 
     def _stop(self):
-        self._block.acquire()
-        self._stopped = True
-        self._block.notify_all()
-        self._block.release()
+        self._stopped.set()
 
     def _delete(self):
         "Remove current thread from the dict of currently running threads."
@@ -738,21 +750,29 @@
             raise RuntimeError("cannot join thread before it is started")
         if self is current_thread():
             raise RuntimeError("cannot join current thread")
+        if not self.is_alive():
+            return
+        self._stopped.wait(timeout)
+        if self._stopped.is_set():
+            self._wait_for_tstate_lock(timeout is None)
 
-        self._block.acquire()
-        try:
-            if timeout is None:
-                while not self._stopped:
-                    self._block.wait()
-            else:
-                deadline = _time() + timeout
-                while not self._stopped:
-                    delay = deadline - _time()
-                    if delay <= 0:
-                        break
-                    self._block.wait(delay)
-        finally:
-            self._block.release()
+    def _wait_for_tstate_lock(self, block):
+        # Issue #18808: wait for the thread state to be gone.
+        # When self._stopped is set, the Python part of the thread is done,
+        # but the thread's tstate has not yet been destroyed.  The C code
+        # releases self._tstate_lock when the C part of the thread is done
+        # (the code at the end of the thread's life to remove all knowledge
+        # of the thread from the C data structures).
+        # This method waits to acquire _tstate_lock if `block` is True, or
+        # sees whether it can be acquired immediately if `block` is False.
+        # If it does acquire the lock, the C code is done, and _tstate_lock
+        # is set to None.
+        lock = self._tstate_lock
+        if lock is None:
+            return  # already determined that the C code is done
+        if lock.acquire(block):
+            lock.release()
+            self._tstate_lock = None
 
     @property
     def name(self):
@@ -771,7 +791,14 @@
 
     def is_alive(self):
         assert self._initialized, "Thread.__init__() not called"
-        return self._started.is_set() and not self._stopped
+        if not self._started.is_set():
+            return False
+        if not self._stopped.is_set():
+            return True
+        # The Python part of the thread is done, but the C part may still be
+        # waiting to run.
+        self._wait_for_tstate_lock(False)
+        return self._tstate_lock is not None
 
     isAlive = is_alive
 
@@ -854,11 +881,6 @@
     def __init__(self):
         Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
 
-        # Thread._block consumes an OS-level locking primitive, which
-        # can never be used by a _DummyThread.  Since a _DummyThread
-        # instance is immortal, that's bad, so release this resource.
-        del self._block
-
         self._started.set()
         self._set_ident()
         with _active_limbo_lock:
@@ -952,15 +974,16 @@
         for thread in _enumerate():
             # Any lock/condition variable may be currently locked or in an
             # invalid state, so we reinitialize them.
-            thread._reset_internal_locks()
             if thread is current:
                 # There is only one active thread. We reset the ident to
                 # its new value since it can have changed.
+                thread._reset_internal_locks(True)
                 ident = get_ident()
                 thread._ident = ident
                 new_active[ident] = thread
             else:
                 # All the others are already stopped.
+                thread._reset_internal_locks(False)
                 thread._stop()
 
         _limbo.clear()
diff --git a/Misc/NEWS b/Misc/NEWS
index 757ac89..d375a3a 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -56,6 +56,10 @@
 Library
 -------
 
+- Issue #18808: Thread.join() now waits for the underlying thread state to
+  be destroyed before returning.  This prevents unpredictable aborts in
+  Py_EndInterpreter() when some non-daemon threads are still running.
+
 - Issue #18458: Prevent crashes with newer versions of libedit.  Its readline
   emulation has changed from 0-based indexing to 1-based like gnu readline.
 
diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c
index cbb2901..d83d117 100644
--- a/Modules/_threadmodule.c
+++ b/Modules/_threadmodule.c
@@ -1172,6 +1172,66 @@
 This function is meant for internal and specialized purposes only.\n\
 In most applications `threading.enumerate()` should be used instead.");
 
+static void
+release_sentinel(void *wr)
+{
+    /* Tricky: this function is called when the current thread state
+       is being deleted.  Therefore, only simple C code can safely
+       execute here. */
+    PyObject *obj = PyWeakref_GET_OBJECT(wr);
+    lockobject *lock;
+    if (obj != Py_None) {
+        assert(Py_TYPE(obj) == &Locktype);
+        lock = (lockobject *) obj;
+        if (lock->locked) {
+            PyThread_release_lock(lock->lock_lock);
+            lock->locked = 0;
+        }
+    }
+    /* Deallocating a weakref with a NULL callback only calls
+       PyObject_GC_Del(), which can't call any Python code. */
+    Py_DECREF(wr);
+}
+
+static PyObject *
+thread__set_sentinel(PyObject *self)
+{
+    PyObject *wr;
+    PyThreadState *tstate = PyThreadState_Get();
+    lockobject *lock;
+
+    if (tstate->on_delete_data != NULL) {
+        /* We must support the re-creation of the lock from a
+           fork()ed child. */
+        assert(tstate->on_delete == &release_sentinel);
+        wr = (PyObject *) tstate->on_delete_data;
+        tstate->on_delete = NULL;
+        tstate->on_delete_data = NULL;
+        Py_DECREF(wr);
+    }
+    lock = newlockobject();
+    if (lock == NULL)
+        return NULL;
+    /* The lock is owned by whoever called _set_sentinel(), but the weakref
+       hangs to the thread state. */
+    wr = PyWeakref_NewRef((PyObject *) lock, NULL);
+    if (wr == NULL) {
+        Py_DECREF(lock);
+        return NULL;
+    }
+    tstate->on_delete_data = (void *) wr;
+    tstate->on_delete = &release_sentinel;
+    return (PyObject *) lock;
+}
+
+PyDoc_STRVAR(_set_sentinel_doc,
+"_set_sentinel() -> lock\n\
+\n\
+Set a sentinel lock that will be released when the current thread\n\
+state is finalized (after it is untied from the interpreter).\n\
+\n\
+This is a private API for the threading module.");
+
 static PyObject *
 thread_stack_size(PyObject *self, PyObject *args)
 {
@@ -1247,6 +1307,8 @@
      METH_NOARGS, _count_doc},
     {"stack_size",              (PyCFunction)thread_stack_size,
      METH_VARARGS, stack_size_doc},
+    {"_set_sentinel",           (PyCFunction)thread__set_sentinel,
+     METH_NOARGS, _set_sentinel_doc},
     {NULL,                      NULL}           /* sentinel */
 };
 
diff --git a/Python/pystate.c b/Python/pystate.c
index 924b6a2..ecd00ce 100644
--- a/Python/pystate.c
+++ b/Python/pystate.c
@@ -208,6 +208,8 @@
 
         tstate->trash_delete_nesting = 0;
         tstate->trash_delete_later = NULL;
+        tstate->on_delete = NULL;
+        tstate->on_delete_data = NULL;
 
         if (init)
             _PyThreadState_Init(tstate);
@@ -390,6 +392,9 @@
     if (tstate->next)
         tstate->next->prev = tstate->prev;
     HEAD_UNLOCK();
+    if (tstate->on_delete != NULL) {
+        tstate->on_delete(tstate->on_delete_data);
+    }
     PyMem_RawFree(tstate);
 }