Fix bpo-30596: Add close() method to multiprocessing.Process (#2010)

* Fix bpo-30596: Add close() method to multiprocessing.Process

* Raise ValueError if close() is called before the Process is finished running

* Add docs

* Add NEWS blurb
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index 6b4a8cb..5265639 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -598,6 +598,16 @@
          acquired a lock or semaphore etc. then terminating it is liable to
          cause other processes to deadlock.
 
+   .. method:: close()
+
+      Close the :class:`Process` object, releasing all resources associated
+      with it.  :exc:`ValueError` is raised if the underlying process
+      is still running.  Once :meth:`close` returns successfully, most
+      other methods and attributes of the :class:`Process` object will
+      raise :exc:`ValueError`.
+
+      .. versionadded:: 3.7
+
    Note that the :meth:`start`, :meth:`join`, :meth:`is_alive`,
    :meth:`terminate` and :attr:`exitcode` methods should only be called by
    the process that created the process object.
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index 7010515..b9f9b9d 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -210,8 +210,12 @@
                             else:
                                 assert os.WIFEXITED(sts)
                                 returncode = os.WEXITSTATUS(sts)
-                            # Write the exit code to the pipe
-                            write_signed(child_w, returncode)
+                            # Send exit code to client process
+                            try:
+                                write_signed(child_w, returncode)
+                            except BrokenPipeError:
+                                # client vanished
+                                pass
                             os.close(child_w)
                         else:
                             # This shouldn't happen really
@@ -241,8 +245,12 @@
                             finally:
                                 os._exit(code)
                         else:
-                            # Send pid to client processes
-                            write_signed(child_w, pid)
+                            # Send pid to client process
+                            try:
+                                write_signed(child_w, pid)
+                            except BrokenPipeError:
+                                # client vanished
+                                pass
                             pid_to_fd[pid] = child_w
                             os.close(child_r)
                             for fd in fds:
diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py
index ca28bf3..5af9d91 100644
--- a/Lib/multiprocessing/popen_fork.py
+++ b/Lib/multiprocessing/popen_fork.py
@@ -17,6 +17,7 @@
         sys.stdout.flush()
         sys.stderr.flush()
         self.returncode = None
+        self.finalizer = None
         self._launch(process_obj)
 
     def duplicate_for_child(self, fd):
@@ -70,5 +71,9 @@
                 os._exit(code)
         else:
             os.close(child_w)
-            util.Finalize(self, os.close, (parent_r,))
+            self.finalizer = util.Finalize(self, os.close, (parent_r,))
             self.sentinel = parent_r
+
+    def close(self):
+        if self.finalizer is not None:
+            self.finalizer()
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
index fa8e574..a51a277 100644
--- a/Lib/multiprocessing/popen_forkserver.py
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -49,7 +49,7 @@
             set_spawning_popen(None)
 
         self.sentinel, w = forkserver.connect_to_new_process(self._fds)
-        util.Finalize(self, os.close, (self.sentinel,))
+        self.finalizer = util.Finalize(self, os.close, (self.sentinel,))
         with open(w, 'wb', closefd=True) as f:
             f.write(buf.getbuffer())
         self.pid = forkserver.read_signed(self.sentinel)
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
index 98f8f0a..3815106 100644
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -62,7 +62,7 @@
                 f.write(fp.getbuffer())
         finally:
             if parent_r is not None:
-                util.Finalize(self, os.close, (parent_r,))
+                self.finalizer = util.Finalize(self, os.close, (parent_r,))
             for fd in (child_r, child_w, parent_w):
                 if fd is not None:
                     os.close(fd)
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
index 6fd588f..ecb86e9 100644
--- a/Lib/multiprocessing/popen_spawn_win32.py
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -56,7 +56,7 @@
             self.returncode = None
             self._handle = hp
             self.sentinel = int(hp)
-            util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
+            self.finalizer = util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
 
             # send information to child
             set_spawning_popen(self)
@@ -96,3 +96,6 @@
             except OSError:
                 if self.wait(timeout=1.0) is None:
                     raise
+
+    def close(self):
+        self.finalizer()
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 37365f2..70bb50d 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -76,6 +76,7 @@
         self._config = _current_process._config.copy()
         self._parent_pid = os.getpid()
         self._popen = None
+        self._closed = False
         self._target = target
         self._args = tuple(args)
         self._kwargs = dict(kwargs)
@@ -85,6 +86,10 @@
             self.daemon = daemon
         _dangling.add(self)
 
+    def _check_closed(self):
+        if self._closed:
+            raise ValueError("process object is closed")
+
     def run(self):
         '''
         Method to be run in sub-process; can be overridden in sub-class
@@ -96,6 +101,7 @@
         '''
         Start child process
         '''
+        self._check_closed()
         assert self._popen is None, 'cannot start a process twice'
         assert self._parent_pid == os.getpid(), \
                'can only start a process object created by current process'
@@ -110,12 +116,14 @@
         '''
         Terminate process; sends SIGTERM signal or uses TerminateProcess()
         '''
+        self._check_closed()
         self._popen.terminate()
 
     def join(self, timeout=None):
         '''
         Wait until child process terminates
         '''
+        self._check_closed()
         assert self._parent_pid == os.getpid(), 'can only join a child process'
         assert self._popen is not None, 'can only join a started process'
         res = self._popen.wait(timeout)
@@ -126,6 +134,7 @@
         '''
         Return whether process is alive
         '''
+        self._check_closed()
         if self is _current_process:
             return True
         assert self._parent_pid == os.getpid(), 'can only test a child process'
@@ -134,6 +143,23 @@
         self._popen.poll()
         return self._popen.returncode is None
 
+    def close(self):
+        '''
+        Close the Process object.
+
+        This method releases resources held by the Process object.  It is
+        an error to call this method if the child process is still running.
+        '''
+        if self._popen is not None:
+            if self._popen.poll() is None:
+                raise ValueError("Cannot close a process while it is still running. "
+                                 "You should first call join() or terminate().")
+            self._popen.close()
+            self._popen = None
+            del self._sentinel
+            _children.discard(self)
+        self._closed = True
+
     @property
     def name(self):
         return self._name
@@ -174,6 +200,7 @@
         '''
         Return exit code of process or `None` if it has yet to stop
         '''
+        self._check_closed()
         if self._popen is None:
             return self._popen
         return self._popen.poll()
@@ -183,6 +210,7 @@
         '''
         Return identifier (PID) of process or `None` if it has yet to start
         '''
+        self._check_closed()
         if self is _current_process:
             return os.getpid()
         else:
@@ -196,6 +224,7 @@
         Return a file descriptor (Unix) or handle (Windows) suitable for
         waiting for process termination.
         '''
+        self._check_closed()
         try:
             return self._sentinel
         except AttributeError:
@@ -204,6 +233,8 @@
     def __repr__(self):
         if self is _current_process:
             status = 'started'
+        elif self._closed:
+            status = 'closed'
         elif self._parent_pid != os.getpid():
             status = 'unknown'
         elif self._popen is None:
@@ -295,6 +326,7 @@
         self._name = 'MainProcess'
         self._parent_pid = None
         self._popen = None
+        self._closed = False
         self._config = {'authkey': AuthenticationString(os.urandom(32)),
                         'semprefix': '/mp'}
         # Note that some versions of FreeBSD only allow named
@@ -307,6 +339,9 @@
         # Everything in self._config will be inherited by descendant
         # processes.
 
+    def close(self):
+        pass
+
 
 _current_process = _MainProcess()
 _process_counter = itertools.count(1)
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 7148ea4..d4a461d 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -403,6 +403,42 @@
         p.join()
         self.assertTrue(wait_for_handle(sentinel, timeout=1))
 
+    @classmethod
+    def _test_close(cls, rc=0, q=None):
+        if q is not None:
+            q.get()
+        sys.exit(rc)
+
+    def test_close(self):
+        if self.TYPE == "threads":
+            self.skipTest('test not appropriate for {}'.format(self.TYPE))
+        q = self.Queue()
+        p = self.Process(target=self._test_close, kwargs={'q': q})
+        p.daemon = True
+        p.start()
+        self.assertEqual(p.is_alive(), True)
+        # Child is still alive, cannot close
+        with self.assertRaises(ValueError):
+            p.close()
+
+        q.put(None)
+        p.join()
+        self.assertEqual(p.is_alive(), False)
+        self.assertEqual(p.exitcode, 0)
+        p.close()
+        with self.assertRaises(ValueError):
+            p.is_alive()
+        with self.assertRaises(ValueError):
+            p.join()
+        with self.assertRaises(ValueError):
+            p.terminate()
+        p.close()
+
+        wr = weakref.ref(p)
+        del p
+        gc.collect()
+        self.assertIs(wr(), None)
+
     def test_many_processes(self):
         if self.TYPE == 'threads':
             self.skipTest('test not appropriate for {}'.format(self.TYPE))
diff --git a/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst b/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst
new file mode 100644
index 0000000..6b9e9a1
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst
@@ -0,0 +1 @@
+Add a ``close()`` method to ``multiprocessing.Process``.