bpo-32604: Swap threads only if the interpreter is different. (gh-5783)

The CPython runtime assumes that there is a one-to-one relationship (for a given interpreter) between PyThreadState and OS threads. Sending and receiving on a channel in the same interpreter was causing crashes because of this (specifically due to a check in PyThreadState_Swap()). The solution is to not switch threads if the interpreter is the same.
(cherry picked from commit f53d9f2778a87bdd48eb9030f782a4ebf9e7622f)

Co-authored-by: Eric Snow <ericsnowcurrently@gmail.com>
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index d280270..397d359 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -3,6 +3,7 @@
 import pickle
 from textwrap import dedent, indent
 import threading
+import time
 import unittest
 
 from test import support
@@ -1147,6 +1148,54 @@
 
         self.assertEqual(obj, b'spam')
 
+    def test_send_recv_different_threads(self):
+        cid = interpreters.channel_create()
+
+        def f():
+            while True:
+                try:
+                    obj = interpreters.channel_recv(cid)
+                    break
+                except interpreters.ChannelEmptyError:
+                    time.sleep(0.1)
+            interpreters.channel_send(cid, obj)
+        t = threading.Thread(target=f)
+        t.start()
+
+        interpreters.channel_send(cid, b'spam')
+        t.join()
+        obj = interpreters.channel_recv(cid)
+
+        self.assertEqual(obj, b'spam')
+
+    def test_send_recv_different_interpreters_and_threads(self):
+        cid = interpreters.channel_create()
+        id1 = interpreters.create()
+        out = None
+
+        def f():
+            nonlocal out
+            out = _run_output(id1, dedent(f"""
+                import time
+                import _xxsubinterpreters as _interpreters
+                while True:
+                    try:
+                        obj = _interpreters.channel_recv({int(cid)})
+                        break
+                    except _interpreters.ChannelEmptyError:
+                        time.sleep(0.1)
+                assert(obj == b'spam')
+                _interpreters.channel_send({int(cid)}, b'eggs')
+                """))
+        t = threading.Thread(target=f)
+        t.start()
+
+        interpreters.channel_send(cid, b'spam')
+        t.join()
+        obj = interpreters.channel_recv(cid)
+
+        self.assertEqual(obj, b'eggs')
+
     def test_send_not_found(self):
         with self.assertRaises(interpreters.ChannelNotFoundError):
             interpreters.channel_send(10, b'spam')