Issue #25220: Create libregrtest/runtest_mp.py

Move the code to run tests in multiple processes using threading and subprocess
to a new submodule.

Move also slave_runner() (renamed to run_tests_slave()) and
run_test_in_subprocess() (renamed to run_tests_in_subprocess()) there.
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index 63388c9..b66f045 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -1,5 +1,4 @@
 import faulthandler
-import json
 import os
 import platform
 import random
@@ -9,13 +8,10 @@
 import sysconfig
 import tempfile
 import textwrap
-import traceback
 import unittest
 from test.libregrtest.runtest import (
-    findtests, runtest, run_test_in_subprocess,
-    STDTESTS, NOTTESTS,
-    PASSED, FAILED, ENV_CHANGED, SKIPPED,
-    RESOURCE_DENIED, INTERRUPTED, CHILD_ERROR)
+    findtests, runtest,
+    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED)
 from test.libregrtest.refleak import warm_caches
 from test.libregrtest.cmdline import _parse_args
 from test import support
@@ -39,23 +35,6 @@
 TEMPDIR = os.path.abspath(TEMPDIR)
 
 
-def slave_runner(slaveargs):
-    args, kwargs = json.loads(slaveargs)
-    if kwargs.get('huntrleaks'):
-        unittest.BaseTestSuite._cleanup = False
-    try:
-        result = runtest(*args, **kwargs)
-    except KeyboardInterrupt:
-        result = INTERRUPTED, ''
-    except BaseException as e:
-        traceback.print_exc()
-        result = CHILD_ERROR, str(e)
-    sys.stdout.flush()
-    print()   # Force a newline (just in case)
-    print(json.dumps(result))
-    sys.exit(0)
-
-
 def setup_python():
     # Display the Python traceback on fatal errors (e.g. segfault)
     faulthandler.enable(all_threads=True)
@@ -367,75 +346,6 @@
                     print(count(len(self.bad), 'test'), "failed again:")
                     printlist(self.bad)
 
-    def _run_tests_mp(self):
-        try:
-            from threading import Thread
-        except ImportError:
-            print("Multiprocess option requires thread support")
-            sys.exit(2)
-        from queue import Queue
-
-        debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
-        output = Queue()
-        pending = MultiprocessTests(self.tests)
-
-        def work():
-            # A worker thread.
-            try:
-                while True:
-                    try:
-                        test = next(pending)
-                    except StopIteration:
-                        output.put((None, None, None, None))
-                        return
-                    retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
-                    # Strip last refcount output line if it exists, since it
-                    # comes from the shutdown of the interpreter in the subcommand.
-                    stderr = debug_output_pat.sub("", stderr)
-                    stdout, _, result = stdout.strip().rpartition("\n")
-                    if retcode != 0:
-                        result = (CHILD_ERROR, "Exit code %s" % retcode)
-                        output.put((test, stdout.rstrip(), stderr.rstrip(), result))
-                        return
-                    if not result:
-                        output.put((None, None, None, None))
-                        return
-                    result = json.loads(result)
-                    output.put((test, stdout.rstrip(), stderr.rstrip(), result))
-            except BaseException:
-                output.put((None, None, None, None))
-                raise
-
-        workers = [Thread(target=work) for i in range(self.ns.use_mp)]
-        for worker in workers:
-            worker.start()
-        finished = 0
-        test_index = 1
-        try:
-            while finished < self.ns.use_mp:
-                test, stdout, stderr, result = output.get()
-                if test is None:
-                    finished += 1
-                    continue
-                self.accumulate_result(test, result)
-                self.display_progress(test_index, test)
-                if stdout:
-                    print(stdout)
-                if stderr:
-                    print(stderr, file=sys.stderr)
-                sys.stdout.flush()
-                sys.stderr.flush()
-                if result[0] == INTERRUPTED:
-                    raise KeyboardInterrupt
-                if result[0] == CHILD_ERROR:
-                    raise Exception("Child error on {}: {}".format(test, result[1]))
-                test_index += 1
-        except KeyboardInterrupt:
-            self.interrupted = True
-            pending.interrupted = True
-        for worker in workers:
-            worker.join()
-
     def _run_tests_sequential(self):
         save_modules = sys.modules.keys()
 
@@ -491,7 +401,8 @@
             self.test_count_width = len(self.test_count) - 1
 
         if self.ns.use_mp:
-            self._run_tests_mp()
+            from test.libregrtest.runtest_mp import run_tests_multiprocess
+            run_tests_multiprocess(self)
         else:
             self._run_tests_sequential()
 
@@ -518,7 +429,8 @@
         if self.ns.wait:
             input("Press any key to continue...")
         if self.ns.slaveargs is not None:
-            slave_runner(self.ns.slaveargs)
+            from test.libregrtest.runtest_mp import run_tests_slave
+            run_tests_slave(self.ns.slaveargs)
         self.find_tests(tests)
         self.run_tests()
         self.display_result()
@@ -526,26 +438,6 @@
         sys.exit(len(self.bad) > 0 or self.interrupted)
 
 
-# We do not use a generator so multiple threads can call next().
-class MultiprocessTests(object):
-
-    """A thread-safe iterator over tests for multiprocess mode."""
-
-    def __init__(self, tests):
-        self.interrupted = False
-        self.lock = threading.Lock()
-        self.tests = tests
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        with self.lock:
-            if self.interrupted:
-                raise StopIteration('tests interrupted')
-            return next(self.tests)
-
-
 def replace_stdout():
     """Set stdout encoder error handler to backslashreplace (as stderr error
     handler) to avoid UnicodeEncodeError when printing a traceback"""