bpo-36719: Fix regrtest MultiprocessThread (GH-13301)

MultiprocessThread.kill() now closes stdout and stderr to prevent
popen.communicate() to hang.
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index ced7f86..4217847 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -21,6 +21,9 @@
 # Display the running tests if nothing happened last N seconds
 PROGRESS_UPDATE = 30.0   # seconds
 
+# Time to wait until a worker completes: should be immediate
+JOIN_TIMEOUT = 30.0   # seconds
+
 
 def must_stop(result, ns):
     if result.result == INTERRUPTED:
@@ -91,6 +94,10 @@
 MultiprocessResult = collections.namedtuple('MultiprocessResult',
     'result stdout stderr error_msg')
 
+class ExitThread(Exception):
+    pass
+
+
 class MultiprocessThread(threading.Thread):
     def __init__(self, pending, output, ns):
         super().__init__()
@@ -100,13 +107,31 @@
         self.current_test_name = None
         self.start_time = None
         self._popen = None
+        self._killed = False
+
+    def __repr__(self):
+        info = ['MultiprocessThread']
+        test = self.current_test_name
+        if self.is_alive():
+            info.append('alive')
+        if test:
+            info.append(f'test={test}')
+        popen = self._popen
+        if popen:
+            info.append(f'pid={popen.pid}')
+        return '<%s>' % ' '.join(info)
 
     def kill(self):
+        self._killed = True
+
         popen = self._popen
         if popen is None:
             return
-        print("Kill regrtest worker process %s" % popen.pid)
         popen.kill()
+        # stdout and stderr must be closed to ensure that communicate()
+        # does not hang
+        popen.stdout.close()
+        popen.stderr.close()
 
     def _runtest(self, test_name):
         try:
@@ -117,7 +142,21 @@
             popen = self._popen
             with popen:
                 try:
-                    stdout, stderr = popen.communicate()
+                    if self._killed:
+                        # If kill() has been called before self._popen is set,
+                        # self._popen is still running. Call again kill()
+                        # to ensure that the process is killed.
+                        self.kill()
+                        raise ExitThread
+
+                    try:
+                        stdout, stderr = popen.communicate()
+                    except OSError:
+                        if self._killed:
+                            # kill() has been called: communicate() fails
+                            # on reading closed stdout/stderr
+                            raise ExitThread
+                        raise
                 except:
                     self.kill()
                     popen.wait()
@@ -154,7 +193,7 @@
         return MultiprocessResult(result, stdout, stderr, err_msg)
 
     def run(self):
-        while True:
+        while not self._killed:
             try:
                 try:
                     test_name = next(self.pending)
@@ -166,6 +205,8 @@
 
                 if must_stop(mp_result.result, self.ns):
                     break
+            except ExitThread:
+                break
             except BaseException:
                 self.output.put((True, traceback.format_exc()))
                 break
@@ -205,10 +246,20 @@
             worker.start()
 
     def wait_workers(self):
+        start_time = time.monotonic()
         for worker in self.workers:
             worker.kill()
         for worker in self.workers:
-            worker.join()
+            while True:
+                worker.join(1.0)
+                if not worker.is_alive():
+                    break
+                dt = time.monotonic() - start_time
+                print("Wait for regrtest worker %r for %.1f sec" % (worker, dt))
+                if dt > JOIN_TIMEOUT:
+                    print("Warning -- failed to join a regrtest worker %s"
+                          % worker)
+                    break
 
     def _get_result(self):
         if not any(worker.is_alive() for worker in self.workers):