Issue #8713: Support alternative start methods in multiprocessing on Unix.

See http://hg.python.org/sandbox/sbt#spawn
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index b42613f..fd75839 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -21,6 +21,8 @@
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
     'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
     'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+    'set_executable', 'set_start_method', 'get_start_method',
+    'get_all_start_methods', 'set_forkserver_preload'
     ]
 
 #
@@ -30,8 +32,14 @@
 import os
 import sys
 
-from multiprocessing.process import Process, current_process, active_children
-from multiprocessing.util import SUBDEBUG, SUBWARNING
+from .process import Process, current_process, active_children
+
+#
+# XXX These should not really be documented or public.
+#
+
+SUBDEBUG = 5
+SUBWARNING = 25
 
 #
 # Alias for main module -- will be reset by bootstrapping child processes
@@ -56,8 +64,6 @@
 class AuthenticationError(ProcessError):
     pass
 
-import _multiprocessing
-
 #
 # Definitions not depending on native semaphores
 #
@@ -69,7 +75,7 @@
     The managers methods such as `Lock()`, `Condition()` and `Queue()`
     can be used to create shared objects.
     '''
-    from multiprocessing.managers import SyncManager
+    from .managers import SyncManager
     m = SyncManager()
     m.start()
     return m
@@ -78,7 +84,7 @@
     '''
     Returns two connection object connected by a pipe
     '''
-    from multiprocessing.connection import Pipe
+    from .connection import Pipe
     return Pipe(duplex)
 
 def cpu_count():
@@ -97,21 +103,21 @@
     If so then run code specified by commandline and exit.
     '''
     if sys.platform == 'win32' and getattr(sys, 'frozen', False):
-        from multiprocessing.forking import freeze_support
+        from .spawn import freeze_support
         freeze_support()
 
 def get_logger():
     '''
     Return package logger -- if it does not already exist then it is created
     '''
-    from multiprocessing.util import get_logger
+    from .util import get_logger
     return get_logger()
 
 def log_to_stderr(level=None):
     '''
     Turn on logging and add a handler which prints to stderr
     '''
-    from multiprocessing.util import log_to_stderr
+    from .util import log_to_stderr
     return log_to_stderr(level)
 
 def allow_connection_pickling():
@@ -120,7 +126,7 @@
     '''
     # This is undocumented.  In previous versions of multiprocessing
     # its only effect was to make socket objects inheritable on Windows.
-    import multiprocessing.connection
+    from . import connection
 
 #
 # Definitions depending on native semaphores
@@ -130,120 +136,151 @@
     '''
     Returns a non-recursive lock object
     '''
-    from multiprocessing.synchronize import Lock
+    from .synchronize import Lock
     return Lock()
 
 def RLock():
     '''
     Returns a recursive lock object
     '''
-    from multiprocessing.synchronize import RLock
+    from .synchronize import RLock
     return RLock()
 
 def Condition(lock=None):
     '''
     Returns a condition object
     '''
-    from multiprocessing.synchronize import Condition
+    from .synchronize import Condition
     return Condition(lock)
 
 def Semaphore(value=1):
     '''
     Returns a semaphore object
     '''
-    from multiprocessing.synchronize import Semaphore
+    from .synchronize import Semaphore
     return Semaphore(value)
 
 def BoundedSemaphore(value=1):
     '''
     Returns a bounded semaphore object
     '''
-    from multiprocessing.synchronize import BoundedSemaphore
+    from .synchronize import BoundedSemaphore
     return BoundedSemaphore(value)
 
 def Event():
     '''
     Returns an event object
     '''
-    from multiprocessing.synchronize import Event
+    from .synchronize import Event
     return Event()
 
 def Barrier(parties, action=None, timeout=None):
     '''
     Returns a barrier object
     '''
-    from multiprocessing.synchronize import Barrier
+    from .synchronize import Barrier
     return Barrier(parties, action, timeout)
 
 def Queue(maxsize=0):
     '''
     Returns a queue object
     '''
-    from multiprocessing.queues import Queue
+    from .queues import Queue
     return Queue(maxsize)
 
 def JoinableQueue(maxsize=0):
     '''
     Returns a queue object
     '''
-    from multiprocessing.queues import JoinableQueue
+    from .queues import JoinableQueue
     return JoinableQueue(maxsize)
 
 def SimpleQueue():
     '''
     Returns a queue object
     '''
-    from multiprocessing.queues import SimpleQueue
+    from .queues import SimpleQueue
     return SimpleQueue()
 
 def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
     '''
     Returns a process pool object
     '''
-    from multiprocessing.pool import Pool
+    from .pool import Pool
     return Pool(processes, initializer, initargs, maxtasksperchild)
 
 def RawValue(typecode_or_type, *args):
     '''
     Returns a shared object
     '''
-    from multiprocessing.sharedctypes import RawValue
+    from .sharedctypes import RawValue
     return RawValue(typecode_or_type, *args)
 
 def RawArray(typecode_or_type, size_or_initializer):
     '''
     Returns a shared array
     '''
-    from multiprocessing.sharedctypes import RawArray
+    from .sharedctypes import RawArray
     return RawArray(typecode_or_type, size_or_initializer)
 
 def Value(typecode_or_type, *args, lock=True):
     '''
     Returns a synchronized shared object
     '''
-    from multiprocessing.sharedctypes import Value
+    from .sharedctypes import Value
     return Value(typecode_or_type, *args, lock=lock)
 
 def Array(typecode_or_type, size_or_initializer, *, lock=True):
     '''
     Returns a synchronized shared array
     '''
-    from multiprocessing.sharedctypes import Array
+    from .sharedctypes import Array
     return Array(typecode_or_type, size_or_initializer, lock=lock)
 
 #
 #
 #
 
-if sys.platform == 'win32':
+def set_executable(executable):
+    '''
+    Sets the path to a python.exe or pythonw.exe binary used to run
+    child processes instead of sys.executable when using the 'spawn'
+    start method.  Useful for people embedding Python.
+    '''
+    from .spawn import set_executable
+    set_executable(executable)
 
-    def set_executable(executable):
-        '''
-        Sets the path to a python.exe or pythonw.exe binary used to run
-        child processes on Windows instead of sys.executable.
-        Useful for people embedding Python.
-        '''
-        from multiprocessing.forking import set_executable
-        set_executable(executable)
+def set_start_method(method):
+    '''
+    Set method for starting processes: 'fork', 'spawn' or 'forkserver'.
+    '''
+    from .popen import set_start_method
+    set_start_method(method)
 
-    __all__ += ['set_executable']
+def get_start_method():
+    '''
+    Get method for starting processes: 'fork', 'spawn' or 'forkserver'.
+    '''
+    from .popen import get_start_method
+    return get_start_method()
+
+def get_all_start_methods():
+    '''
+    Get list of available start methods, default first.
+    '''
+    from .popen import get_all_start_methods
+    return get_all_start_methods()
+
+def set_forkserver_preload(module_names):
+    '''
+    Set list of module names to try to load in the forkserver process
+    when it is started.  Properly chosen this can significantly reduce
+    the cost of starting a new process using the forkserver method.
+    The default list is ['__main__'].
+    '''
+    try:
+        from .forkserver import set_forkserver_preload
+    except ImportError:
+        pass
+    else:
+        set_forkserver_preload(module_names)
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 1093d9f..443fa7e 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -21,9 +21,13 @@
 import itertools
 
 import _multiprocessing
-from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import ForkingPickler
+
+from . import reduction
+from . import util
+
+from . import AuthenticationError, BufferTooShort
+from .reduction import ForkingPickler
+
 try:
     import _winapi
     from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
@@ -71,7 +75,7 @@
     if family == 'AF_INET':
         return ('localhost', 0)
     elif family == 'AF_UNIX':
-        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
+        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
     elif family == 'AF_PIPE':
         return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                                (os.getpid(), next(_mmap_counter)))
@@ -505,7 +509,7 @@
             c1 = Connection(s1.detach())
             c2 = Connection(s2.detach())
         else:
-            fd1, fd2 = os.pipe()
+            fd1, fd2 = util.pipe()
             c1 = Connection(fd1, writable=False)
             c2 = Connection(fd2, readable=False)
 
@@ -577,7 +581,7 @@
         self._last_accepted = None
 
         if family == 'AF_UNIX':
-            self._unlink = Finalize(
+            self._unlink = util.Finalize(
                 self, os.unlink, args=(address,), exitpriority=0
                 )
         else:
@@ -625,8 +629,8 @@
             self._handle_queue = [self._new_handle(first=True)]
 
             self._last_accepted = None
-            sub_debug('listener created with address=%r', self._address)
-            self.close = Finalize(
+            util.sub_debug('listener created with address=%r', self._address)
+            self.close = util.Finalize(
                 self, PipeListener._finalize_pipe_listener,
                 args=(self._handle_queue, self._address), exitpriority=0
                 )
@@ -668,7 +672,7 @@
 
         @staticmethod
         def _finalize_pipe_listener(queue, address):
-            sub_debug('closing listener with address=%r', address)
+            util.sub_debug('closing listener with address=%r', address)
             for handle in queue:
                 _winapi.CloseHandle(handle)
 
@@ -919,15 +923,32 @@
 #
 
 if sys.platform == 'win32':
-    from . import reduction
-    ForkingPickler.register(socket.socket, reduction.reduce_socket)
-    ForkingPickler.register(Connection, reduction.reduce_connection)
-    ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+    def reduce_connection(conn):
+        handle = conn.fileno()
+        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+            from . import resource_sharer
+            ds = resource_sharer.DupSocket(s)
+            return rebuild_connection, (ds, conn.readable, conn.writable)
+    def rebuild_connection(ds, readable, writable):
+        sock = ds.detach()
+        return Connection(sock.detach(), readable, writable)
+    reduction.register(Connection, reduce_connection)
+
+    def reduce_pipe_connection(conn):
+        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+        dh = reduction.DupHandle(conn.fileno(), access)
+        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+    def rebuild_pipe_connection(dh, readable, writable):
+        handle = dh.detach()
+        return PipeConnection(handle, readable, writable)
+    reduction.register(PipeConnection, reduce_pipe_connection)
+
 else:
-    try:
-        from . import reduction
-    except ImportError:
-        pass
-    else:
-        ForkingPickler.register(socket.socket, reduction.reduce_socket)
-        ForkingPickler.register(Connection, reduction.reduce_connection)
+    def reduce_connection(conn):
+        df = reduction.DupFd(conn.fileno())
+        return rebuild_connection, (df, conn.readable, conn.writable)
+    def rebuild_connection(df, readable, writable):
+        fd = df.detach()
+        return Connection(fd, readable, writable)
+    reduction.register(Connection, reduce_connection)
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index 20ae957..97f7af7 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -22,7 +22,7 @@
 import weakref
 import array
 
-from multiprocessing.dummy.connection import Pipe
+from .connection import Pipe
 from threading import Lock, RLock, Semaphore, BoundedSemaphore
 from threading import Event, Condition, Barrier
 from queue import Queue
@@ -113,7 +113,7 @@
     pass
 
 def Pool(processes=None, initializer=None, initargs=()):
-    from multiprocessing.pool import ThreadPool
+    from ..pool import ThreadPool
     return ThreadPool(processes, initializer, initargs)
 
 JoinableQueue = Queue
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
deleted file mode 100644
index 39cfd8d..0000000
--- a/Lib/multiprocessing/forking.py
+++ /dev/null
@@ -1,477 +0,0 @@
-#
-# Module for starting a process object using os.fork() or CreateProcess()
-#
-# multiprocessing/forking.py
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# Licensed to PSF under a Contributor Agreement.
-#
-
-import io
-import os
-import pickle
-import sys
-import signal
-import errno
-
-from multiprocessing import util, process
-
-__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
-
-#
-# Check that the current thread is spawning a child process
-#
-
-def assert_spawning(self):
-    if not Popen.thread_is_spawning():
-        raise RuntimeError(
-            '%s objects should only be shared between processes'
-            ' through inheritance' % type(self).__name__
-            )
-
-#
-# Try making some callable types picklable
-#
-
-from pickle import Pickler
-from copyreg import dispatch_table
-
-class ForkingPickler(Pickler):
-    _extra_reducers = {}
-    def __init__(self, *args):
-        Pickler.__init__(self, *args)
-        self.dispatch_table = dispatch_table.copy()
-        self.dispatch_table.update(self._extra_reducers)
-    @classmethod
-    def register(cls, type, reduce):
-        cls._extra_reducers[type] = reduce
-
-    @staticmethod
-    def dumps(obj):
-        buf = io.BytesIO()
-        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
-        return buf.getbuffer()
-
-    loads = pickle.loads
-
-
-def _reduce_method(m):
-    if m.__self__ is None:
-        return getattr, (m.__class__, m.__func__.__name__)
-    else:
-        return getattr, (m.__self__, m.__func__.__name__)
-class _C:
-    def f(self):
-        pass
-ForkingPickler.register(type(_C().f), _reduce_method)
-
-
-def _reduce_method_descriptor(m):
-    return getattr, (m.__objclass__, m.__name__)
-ForkingPickler.register(type(list.append), _reduce_method_descriptor)
-ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
-
-try:
-    from functools import partial
-except ImportError:
-    pass
-else:
-    def _reduce_partial(p):
-        return _rebuild_partial, (p.func, p.args, p.keywords or {})
-    def _rebuild_partial(func, args, keywords):
-        return partial(func, *args, **keywords)
-    ForkingPickler.register(partial, _reduce_partial)
-
-#
-# Unix
-#
-
-if sys.platform != 'win32':
-    duplicate = os.dup
-    close = os.close
-
-    #
-    # We define a Popen class similar to the one from subprocess, but
-    # whose constructor takes a process object as its argument.
-    #
-
-    class Popen(object):
-
-        def __init__(self, process_obj):
-            sys.stdout.flush()
-            sys.stderr.flush()
-            self.returncode = None
-
-            r, w = os.pipe()
-            self.sentinel = r
-
-            self.pid = os.fork()
-            if self.pid == 0:
-                os.close(r)
-                if 'random' in sys.modules:
-                    import random
-                    random.seed()
-                code = process_obj._bootstrap()
-                os._exit(code)
-
-            # `w` will be closed when the child exits, at which point `r`
-            # will become ready for reading (using e.g. select()).
-            os.close(w)
-            util.Finalize(self, os.close, (r,))
-
-        def poll(self, flag=os.WNOHANG):
-            if self.returncode is None:
-                while True:
-                    try:
-                        pid, sts = os.waitpid(self.pid, flag)
-                    except OSError as e:
-                        if e.errno == errno.EINTR:
-                            continue
-                        # Child process not yet created. See #1731717
-                        # e.errno == errno.ECHILD == 10
-                        return None
-                    else:
-                        break
-                if pid == self.pid:
-                    if os.WIFSIGNALED(sts):
-                        self.returncode = -os.WTERMSIG(sts)
-                    else:
-                        assert os.WIFEXITED(sts)
-                        self.returncode = os.WEXITSTATUS(sts)
-            return self.returncode
-
-        def wait(self, timeout=None):
-            if self.returncode is None:
-                if timeout is not None:
-                    from .connection import wait
-                    if not wait([self.sentinel], timeout):
-                        return None
-                # This shouldn't block if wait() returned successfully.
-                return self.poll(os.WNOHANG if timeout == 0.0 else 0)
-            return self.returncode
-
-        def terminate(self):
-            if self.returncode is None:
-                try:
-                    os.kill(self.pid, signal.SIGTERM)
-                except OSError:
-                    if self.wait(timeout=0.1) is None:
-                        raise
-
-        @staticmethod
-        def thread_is_spawning():
-            return False
-
-#
-# Windows
-#
-
-else:
-    import _thread
-    import msvcrt
-    import _winapi
-
-    from pickle import load, HIGHEST_PROTOCOL
-
-    def dump(obj, file, protocol=None):
-        ForkingPickler(file, protocol).dump(obj)
-
-    #
-    #
-    #
-
-    TERMINATE = 0x10000
-    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
-    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
-
-    close = _winapi.CloseHandle
-
-    #
-    # _python_exe is the assumed path to the python executable.
-    # People embedding Python want to modify it.
-    #
-
-    if WINSERVICE:
-        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
-    else:
-        _python_exe = sys.executable
-
-    def set_executable(exe):
-        global _python_exe
-        _python_exe = exe
-
-    #
-    #
-    #
-
-    def duplicate(handle, target_process=None, inheritable=False):
-        if target_process is None:
-            target_process = _winapi.GetCurrentProcess()
-        return _winapi.DuplicateHandle(
-            _winapi.GetCurrentProcess(), handle, target_process,
-            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
-            )
-
-    #
-    # We define a Popen class similar to the one from subprocess, but
-    # whose constructor takes a process object as its argument.
-    #
-
-    class Popen(object):
-        '''
-        Start a subprocess to run the code of a process object
-        '''
-        _tls = _thread._local()
-
-        def __init__(self, process_obj):
-            cmd = ' '.join('"%s"' % x for x in get_command_line())
-            prep_data = get_preparation_data(process_obj._name)
-
-            # create pipe for communication with child
-            rfd, wfd = os.pipe()
-
-            # get handle for read end of the pipe and make it inheritable
-            rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
-            os.close(rfd)
-
-            with open(wfd, 'wb', closefd=True) as to_child:
-                # start process
-                try:
-                    hp, ht, pid, tid = _winapi.CreateProcess(
-                        _python_exe, cmd + (' %s' % rhandle),
-                        None, None, 1, 0, None, None, None
-                        )
-                    _winapi.CloseHandle(ht)
-                finally:
-                    close(rhandle)
-
-                # set attributes of self
-                self.pid = pid
-                self.returncode = None
-                self._handle = hp
-                self.sentinel = int(hp)
-                util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
-
-                # send information to child
-                Popen._tls.process_handle = int(hp)
-                try:
-                    dump(prep_data, to_child, HIGHEST_PROTOCOL)
-                    dump(process_obj, to_child, HIGHEST_PROTOCOL)
-                finally:
-                    del Popen._tls.process_handle
-
-        @staticmethod
-        def thread_is_spawning():
-            return getattr(Popen._tls, 'process_handle', None) is not None
-
-        @staticmethod
-        def duplicate_for_child(handle):
-            return duplicate(handle, Popen._tls.process_handle)
-
-        def wait(self, timeout=None):
-            if self.returncode is None:
-                if timeout is None:
-                    msecs = _winapi.INFINITE
-                else:
-                    msecs = max(0, int(timeout * 1000 + 0.5))
-
-                res = _winapi.WaitForSingleObject(int(self._handle), msecs)
-                if res == _winapi.WAIT_OBJECT_0:
-                    code = _winapi.GetExitCodeProcess(self._handle)
-                    if code == TERMINATE:
-                        code = -signal.SIGTERM
-                    self.returncode = code
-
-            return self.returncode
-
-        def poll(self):
-            return self.wait(timeout=0)
-
-        def terminate(self):
-            if self.returncode is None:
-                try:
-                    _winapi.TerminateProcess(int(self._handle), TERMINATE)
-                except OSError:
-                    if self.wait(timeout=1.0) is None:
-                        raise
-
-    #
-    #
-    #
-
-    def is_forking(argv):
-        '''
-        Return whether commandline indicates we are forking
-        '''
-        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
-            assert len(argv) == 3
-            return True
-        else:
-            return False
-
-
-    def freeze_support():
-        '''
-        Run code for process object if this in not the main process
-        '''
-        if is_forking(sys.argv):
-            main()
-            sys.exit()
-
-
-    def get_command_line():
-        '''
-        Returns prefix of command line used for spawning a child process
-        '''
-        if getattr(process.current_process(), '_inheriting', False):
-            raise RuntimeError('''
-            Attempt to start a new process before the current process
-            has finished its bootstrapping phase.
-
-            This probably means that you are on Windows and you have
-            forgotten to use the proper idiom in the main module:
-
-                if __name__ == '__main__':
-                    freeze_support()
-                    ...
-
-            The "freeze_support()" line can be omitted if the program
-            is not going to be frozen to produce a Windows executable.''')
-
-        if getattr(sys, 'frozen', False):
-            return [sys.executable, '--multiprocessing-fork']
-        else:
-            prog = 'from multiprocessing.forking import main; main()'
-            opts = util._args_from_interpreter_flags()
-            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
-
-
-    def main():
-        '''
-        Run code specifed by data received over pipe
-        '''
-        assert is_forking(sys.argv)
-
-        handle = int(sys.argv[-1])
-        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
-        from_parent = os.fdopen(fd, 'rb')
-
-        process.current_process()._inheriting = True
-        preparation_data = load(from_parent)
-        prepare(preparation_data)
-        self = load(from_parent)
-        process.current_process()._inheriting = False
-
-        from_parent.close()
-
-        exitcode = self._bootstrap()
-        sys.exit(exitcode)
-
-
-    def get_preparation_data(name):
-        '''
-        Return info about parent needed by child to unpickle process object
-        '''
-        from .util import _logger, _log_to_stderr
-
-        d = dict(
-            name=name,
-            sys_path=sys.path,
-            sys_argv=sys.argv,
-            log_to_stderr=_log_to_stderr,
-            orig_dir=process.ORIGINAL_DIR,
-            authkey=process.current_process().authkey,
-            )
-
-        if _logger is not None:
-            d['log_level'] = _logger.getEffectiveLevel()
-
-        if not WINEXE and not WINSERVICE:
-            main_path = getattr(sys.modules['__main__'], '__file__', None)
-            if not main_path and sys.argv[0] not in ('', '-c'):
-                main_path = sys.argv[0]
-            if main_path is not None:
-                if not os.path.isabs(main_path) and \
-                                          process.ORIGINAL_DIR is not None:
-                    main_path = os.path.join(process.ORIGINAL_DIR, main_path)
-                d['main_path'] = os.path.normpath(main_path)
-
-        return d
-
-#
-# Prepare current process
-#
-
-old_main_modules = []
-
-def prepare(data):
-    '''
-    Try to get current process ready to unpickle process object
-    '''
-    old_main_modules.append(sys.modules['__main__'])
-
-    if 'name' in data:
-        process.current_process().name = data['name']
-
-    if 'authkey' in data:
-        process.current_process()._authkey = data['authkey']
-
-    if 'log_to_stderr' in data and data['log_to_stderr']:
-        util.log_to_stderr()
-
-    if 'log_level' in data:
-        util.get_logger().setLevel(data['log_level'])
-
-    if 'sys_path' in data:
-        sys.path = data['sys_path']
-
-    if 'sys_argv' in data:
-        sys.argv = data['sys_argv']
-
-    if 'dir' in data:
-        os.chdir(data['dir'])
-
-    if 'orig_dir' in data:
-        process.ORIGINAL_DIR = data['orig_dir']
-
-    if 'main_path' in data:
-        # XXX (ncoghlan): The following code makes several bogus
-        # assumptions regarding the relationship between __file__
-        # and a module's real name. See PEP 302 and issue #10845
-        main_path = data['main_path']
-        main_name = os.path.splitext(os.path.basename(main_path))[0]
-        if main_name == '__init__':
-            main_name = os.path.basename(os.path.dirname(main_path))
-
-        if main_name == '__main__':
-            main_module = sys.modules['__main__']
-            main_module.__file__ = main_path
-        elif main_name != 'ipython':
-            # Main modules not actually called __main__.py may
-            # contain additional code that should still be executed
-            import importlib
-            import types
-
-            if main_path is None:
-                dirs = None
-            elif os.path.basename(main_path).startswith('__init__.py'):
-                dirs = [os.path.dirname(os.path.dirname(main_path))]
-            else:
-                dirs = [os.path.dirname(main_path)]
-
-            assert main_name not in sys.modules, main_name
-            sys.modules.pop('__mp_main__', None)
-            # We should not try to load __main__
-            # since that would execute 'if __name__ == "__main__"'
-            # clauses, potentially causing a psuedo fork bomb.
-            loader = importlib.find_loader(main_name, path=dirs)
-            main_module = types.ModuleType(main_name)
-            try:
-                loader.init_module_attrs(main_module)
-            except AttributeError:  # init_module_attrs is optional
-                pass
-            main_module.__name__ = '__mp_main__'
-            code = loader.get_code(main_name)
-            exec(code, main_module.__dict__)
-
-            sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
new file mode 100644
index 0000000..628808e
--- /dev/null
+++ b/Lib/multiprocessing/forkserver.py
@@ -0,0 +1,238 @@
+import errno
+import os
+import select
+import signal
+import socket
+import struct
+import sys
+import threading
+
+from . import connection
+from . import process
+from . import reduction
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
+           'set_forkserver_preload']
+
+#
+#
+#
+
+MAXFDS_TO_SEND = 256
+UNSIGNED_STRUCT = struct.Struct('Q')     # large enough for pid_t
+
+_inherited_fds = None
+_lock = threading.Lock()
+_preload_modules = ['__main__']
+
+
+#
+# Public function
+#
+
+def set_forkserver_preload(modules_names):
+    '''Set the list of module names the forkserver process tries to import.'''
+    global _preload_modules
+    _preload_modules = modules_names    # only read when the server is started
+
+
+def get_inherited_fds():
+    '''Return list of fds inherited from parent process.
+
+    This returns None if the current process was not started by fork server.
+    '''
+    return _inherited_fds    # assigned in _serve_one(); None until then
+
+
+def connect_to_new_process(fds):
+    '''Ask the forkserver to start a new child process.
+
+    Returns a pair of fds (status_r, data_w).  The pid of the child, and
+    eventually its returncode, can be read from status_r; the caller is
+    expected to write the pickled preparation and process data to data_w.
+    Raises ValueError if there are too many fds to send.
+    '''
+    if len(fds) + 3 >= MAXFDS_TO_SEND:
+        raise ValueError('too many fds')
+    address, alive_w = process.current_process()._config['forkserver_info']
+    with socket.socket(socket.AF_UNIX) as client:
+        client.connect(address)
+        status_r, status_w = util.pipe()
+        data_r, data_w = util.pipe()
+        # the first three fds are fixed; the caller's fds follow them
+        to_send = [data_r, status_w, alive_w] + list(fds)
+        try:
+            reduction.sendfds(client, to_send)
+        except:
+            os.close(status_r)    # nobody will use our ends of the pipes
+            os.close(data_w)
+            raise
+        finally:
+            os.close(data_r)      # the child's ends are not needed here
+            os.close(status_w)
+        return status_r, data_w
+
+
+def ensure_running():
+    '''Make sure that a fork server is running.
+
+    This can be called from any process.  A child process normally reuses
+    the forkserver started by its parent, so this usually does nothing.
+    Raises TypeError if the preload module names are not all strings.
+    '''
+    with _lock:
+        config = process.current_process()._config
+        if config.get('forkserver_info') is not None:
+            return
+        if not all(type(mod) is str for mod in _preload_modules):
+            raise TypeError('preload module names must be strings')
+        tracker_fd = config['semaphore_tracker_fd']
+        cmd = ('from multiprocessing.forkserver import main; ' +
+               'main(%d, %d, %r, **%r)')
+        prep = spawn.get_preparation_data('ignore') if _preload_modules else {}
+        data = {k: prep[k] for k in ('main_path', 'sys_path') if k in prep}
+        with socket.socket(socket.AF_UNIX) as listener:
+            address = connection.arbitrary_address('AF_UNIX')
+            listener.bind(address)
+            os.chmod(address, 0o600)
+            listener.listen(100)
+            # all client processes own the write end of the "alive" pipe;
+            # when they all terminate the read end becomes ready.
+            alive_r, alive_w = os.pipe()
+            try:
+                fds_to_pass = [listener.fileno(), alive_r, tracker_fd]
+                cmd %= (listener.fileno(), alive_r, _preload_modules, data)
+                exe = spawn.get_executable()
+                args = [exe] + util._args_from_interpreter_flags()
+                args += ['-c', cmd]
+                util.spawnv_passfds(exe, args, fds_to_pass)
+            except:
+                os.close(alive_w)    # nobody will ever use it
+                raise
+            finally:
+                os.close(alive_r)    # only the server needs the read end
+            config['forkserver_info'] = (address, alive_w)  # after success
+
+
+def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
+    '''Run the forkserver: fork a child for each connection on listener_fd.'''
+    if preload:    # NOTE(review): sys_path is accepted but not used below
+        if '__main__' in preload and main_path is not None:
+            process.current_process()._inheriting = True
+            try:
+                spawn.import_main_path(main_path)
+            finally:
+                del process.current_process()._inheriting
+        for modname in preload:
+            try:
+                __import__(modname)
+            except ImportError:
+                pass    # preloading is best effort
+
+    # close sys.stdin
+    if sys.stdin is not None:
+        try:
+            sys.stdin.close()
+            sys.stdin = open(os.devnull)
+        except (OSError, ValueError):
+            pass
+
+    # ignoring SIGCHLD means no need to reap zombie processes
+    handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
+    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+        readers = [listener, alive_r]
+
+        while True:
+            try:
+                rfds, wfds, xfds = select.select(readers, [], [])
+
+                if alive_r in rfds:
+                    # EOF because no more client processes left
+                    assert os.read(alive_r, 1) == b''  # NOTE: skipped under -O
+                    raise SystemExit
+
+                assert listener in rfds
+                with listener.accept()[0] as s:
+                    code = 1    # exit status of the child; never changed below
+                    if os.fork() == 0:
+                        try:
+                            _serve_one(s, listener, alive_r, handler)
+                        except Exception:
+                            sys.excepthook(*sys.exc_info())
+                            sys.stderr.flush()
+                        finally:
+                            os._exit(code)
+
+            except InterruptedError:
+                pass    # interrupted by a signal: go round the loop again
+            except OSError as e:
+                if e.errno != errno.ECONNABORTED:  # aborted accepts are ignored
+                    raise
+
+#
+# Code to bootstrap new process
+#
+
+def _serve_one(s, listener, alive_r, handler):
+    global _inherited_fds
+    # Runs in a freshly forked child of the forkserver (see main()).
+    # close unnecessary stuff and reset SIGCHLD handler
+    listener.close()
+    os.close(alive_r)
+    signal.signal(signal.SIGCHLD, handler)
+
+    # receive fds from parent process
+    fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
+    s.close()
+    assert len(fds) <= MAXFDS_TO_SEND
+    child_r, child_w, alive_w, *_inherited_fds = fds  # order set by sender
+
+    # send pid to client processes
+    write_unsigned(child_w, os.getpid())
+
+    # reseed random number generator
+    if 'random' in sys.modules:
+        import random
+        random.seed()
+
+    # run process object received over pipe
+    code = spawn._main(child_r)
+
+    # write the exit code to the pipe
+    write_unsigned(child_w, code)
+
+#
+# Read and write unsigned numbers
+#
+
+def read_unsigned(fd):
+    '''Read one UNSIGNED_STRUCT-packed number from fd and return it.'''
+    wanted = UNSIGNED_STRUCT.size
+    received = b''
+    # a read may return fewer bytes than asked for, so keep reading
+    while len(received) < wanted:
+        try:
+            chunk = os.read(fd, wanted - len(received))
+        except InterruptedError:
+            continue    # interrupted by a signal: just retry the read
+        if not chunk:
+            raise EOFError('unexpected EOF')
+        received += chunk
+    (value,) = UNSIGNED_STRUCT.unpack(received)
+    return value
+
+def write_unsigned(fd, n):
+    '''Write the number n to fd, packed with UNSIGNED_STRUCT.'''
+    pending = UNSIGNED_STRUCT.pack(n)
+    # os.write() may accept only part of the data, so loop until all is sent
+    while pending:
+        try:
+            sent = os.write(fd, pending)
+        except InterruptedError:
+            continue    # interrupted by a signal: just retry the write
+        # a write of zero bytes would make this loop spin forever
+        if sent == 0:
+            raise RuntimeError('should not get here')
+        pending = pending[sent:]
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index e63fdb8..b95f90f 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -8,15 +8,17 @@
 #
 
 import bisect
+import itertools
 import mmap
 import os
 import sys
+import tempfile
 import threading
-import itertools
-
 import _multiprocessing
-from multiprocessing.util import Finalize, info
-from multiprocessing.forking import assert_spawning
+
+from . import popen
+from . import reduction
+from . import util
 
 __all__ = ['BufferWrapper']
 
@@ -30,17 +32,25 @@
 
     class Arena(object):
 
-        _counter = itertools.count()
+        _rand = tempfile._RandomNameSequence()
 
         def __init__(self, size):
             self.size = size
-            self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
-            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
-            assert _winapi.GetLastError() == 0, 'tagname already in use'
+            for i in range(100):
+                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
+                buf = mmap.mmap(-1, size, tagname=name)
+                if _winapi.GetLastError() == 0:
+                    break
+                # We have reopened a preexisting mmap.
+                buf.close()
+            else:
+                raise FileExistsError('Cannot find name for new mmap')
+            self.name = name
+            self.buffer = buf
             self._state = (self.size, self.name)
 
         def __getstate__(self):
-            assert_spawning(self)
+            popen.assert_spawning(self)
             return self._state
 
         def __setstate__(self, state):
@@ -52,10 +62,28 @@
 
     class Arena(object):
 
-        def __init__(self, size):
-            self.buffer = mmap.mmap(-1, size)
+        def __init__(self, size, fd=-1):
             self.size = size
-            self.name = None
+            self.fd = fd
+            if fd == -1:
+                self.fd, name = tempfile.mkstemp(
+                     prefix='pym-%d-'%os.getpid(), dir=util.get_temp_dir())
+                os.unlink(name)
+                util.Finalize(self, os.close, (self.fd,))
+                with open(self.fd, 'wb', closefd=False) as f:
+                    f.write(b'\0'*size)
+            self.buffer = mmap.mmap(self.fd, self.size)
+
+    def reduce_arena(a):
+        if a.fd == -1:
+            raise ValueError('Arena is unpicklable because '
+                             'forking was enabled when it was created')
+        return rebuild_arena, (a.size, reduction.DupFd(a.fd))
+
+    def rebuild_arena(size, dupfd):
+        return Arena(size, dupfd.detach())
+
+    reduction.register(Arena, reduce_arena)
 
 #
 # Class allowing allocation of chunks of memory from arenas
@@ -90,7 +118,7 @@
         if i == len(self._lengths):
             length = self._roundup(max(self._size, size), mmap.PAGESIZE)
             self._size *= 2
-            info('allocating a new mmap of length %d', length)
+            util.info('allocating a new mmap of length %d', length)
             arena = Arena(length)
             self._arenas.append(arena)
             return (arena, 0, length)
@@ -216,7 +244,7 @@
         assert 0 <= size < sys.maxsize
         block = BufferWrapper._heap.malloc(size)
         self._state = (block, size)
-        Finalize(self, BufferWrapper._heap.free, args=(block,))
+        util.Finalize(self, BufferWrapper._heap.free, args=(block,))
 
     def create_memoryview(self):
         (arena, start, stop), size = self._state
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 36cd650..f580e9e 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -19,11 +19,15 @@
 import array
 import queue
 
-from traceback import format_exc
-from multiprocessing import Process, current_process, active_children, Pool, util, connection
-from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import Popen, ForkingPickler
 from time import time as _time
+from traceback import format_exc
+
+from . import connection
+from . import pool
+from . import process
+from . import popen
+from . import reduction
+from . import util
 
 #
 # Register some things for pickling
@@ -31,16 +35,14 @@
 
 def reduce_array(a):
     return array.array, (a.typecode, a.tobytes())
-ForkingPickler.register(array.array, reduce_array)
+reduction.register(array.array, reduce_array)
 
 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
 if view_types[0] is not list:       # only needed in Py3.0
     def rebuild_as_list(obj):
         return list, (list(obj),)
     for view_type in view_types:
-        ForkingPickler.register(view_type, rebuild_as_list)
-        import copyreg
-        copyreg.pickle(view_type, rebuild_as_list)
+        reduction.register(view_type, rebuild_as_list)
 
 #
 # Type for identifying shared objects
@@ -130,7 +132,7 @@
     def __init__(self, registry, address, authkey, serializer):
         assert isinstance(authkey, bytes)
         self.registry = registry
-        self.authkey = AuthenticationString(authkey)
+        self.authkey = process.AuthenticationString(authkey)
         Listener, Client = listener_client[serializer]
 
         # do authentication later
@@ -146,7 +148,7 @@
         Run the server forever
         '''
         self.stop_event = threading.Event()
-        current_process()._manager_server = self
+        process.current_process()._manager_server = self
         try:
             accepter = threading.Thread(target=self.accepter)
             accepter.daemon = True
@@ -438,9 +440,9 @@
 
     def __init__(self, address=None, authkey=None, serializer='pickle'):
         if authkey is None:
-            authkey = current_process().authkey
+            authkey = process.current_process().authkey
         self._address = address     # XXX not final address if eg ('', 0)
-        self._authkey = AuthenticationString(authkey)
+        self._authkey = process.AuthenticationString(authkey)
         self._state = State()
         self._state.value = State.INITIAL
         self._serializer = serializer
@@ -476,7 +478,7 @@
         reader, writer = connection.Pipe(duplex=False)
 
         # spawn process which runs a server
-        self._process = Process(
+        self._process = process.Process(
             target=type(self)._run_server,
             args=(self._registry, self._address, self._authkey,
                   self._serializer, writer, initializer, initargs),
@@ -691,11 +693,11 @@
         self._Client = listener_client[serializer][1]
 
         if authkey is not None:
-            self._authkey = AuthenticationString(authkey)
+            self._authkey = process.AuthenticationString(authkey)
         elif self._manager is not None:
             self._authkey = self._manager._authkey
         else:
-            self._authkey = current_process().authkey
+            self._authkey = process.current_process().authkey
 
         if incref:
             self._incref()
@@ -704,7 +706,7 @@
 
     def _connect(self):
         util.debug('making connection to manager')
-        name = current_process().name
+        name = process.current_process().name
         if threading.current_thread().name != 'MainThread':
             name += '|' + threading.current_thread().name
         conn = self._Client(self._token.address, authkey=self._authkey)
@@ -798,7 +800,7 @@
 
     def __reduce__(self):
         kwds = {}
-        if Popen.thread_is_spawning():
+        if popen.get_spawning_popen() is not None:
             kwds['authkey'] = self._authkey
 
         if getattr(self, '_isauto', False):
@@ -835,14 +837,14 @@
 
     If possible the shared object is returned, or otherwise a proxy for it.
     '''
-    server = getattr(current_process(), '_manager_server', None)
+    server = getattr(process.current_process(), '_manager_server', None)
 
     if server and server.address == token.address:
         return server.id_to_obj[token.id][0]
     else:
         incref = (
             kwds.pop('incref', True) and
-            not getattr(current_process(), '_inheriting', False)
+            not getattr(process.current_process(), '_inheriting', False)
             )
         return func(token, serializer, incref=incref, **kwds)
 
@@ -889,7 +891,7 @@
     if authkey is None and manager is not None:
         authkey = manager._authkey
     if authkey is None:
-        authkey = current_process().authkey
+        authkey = process.current_process().authkey
 
     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
@@ -1109,7 +1111,7 @@
                      AcquirerProxy)
 SyncManager.register('Condition', threading.Condition, ConditionProxy)
 SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
-SyncManager.register('Pool', Pool, PoolProxy)
+SyncManager.register('Pool', pool.Pool, PoolProxy)
 SyncManager.register('list', list, ListProxy)
 SyncManager.register('dict', dict, DictProxy)
 SyncManager.register('Value', Value, ValueProxy)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 8082ad6..1cecd09 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -7,7 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
-__all__ = ['Pool']
+__all__ = ['Pool', 'ThreadPool']
 
 #
 # Imports
@@ -21,8 +21,10 @@
 import time
 import traceback
 
-from multiprocessing import Process, TimeoutError
-from multiprocessing.util import Finalize, debug
+# If threading is available then ThreadPool should be provided.  Therefore
+# we avoid top-level imports which are liable to fail on some systems.
+from . import util
+from . import Process, cpu_count, TimeoutError, SimpleQueue
 
 #
 # Constants representing the state of a pool
@@ -104,11 +106,11 @@
         try:
             task = get()
         except (EOFError, OSError):
-            debug('worker got EOFError or OSError -- exiting')
+            util.debug('worker got EOFError or OSError -- exiting')
             break
 
         if task is None:
-            debug('worker got sentinel -- exiting')
+            util.debug('worker got sentinel -- exiting')
             break
 
         job, i, func, args, kwds = task
@@ -121,11 +123,11 @@
             put((job, i, result))
         except Exception as e:
             wrapped = MaybeEncodingError(e, result[1])
-            debug("Possible encoding error while sending result: %s" % (
+            util.debug("Possible encoding error while sending result: %s" % (
                 wrapped))
             put((job, i, (False, wrapped)))
         completed += 1
-    debug('worker exiting after %d tasks' % completed)
+    util.debug('worker exiting after %d tasks' % completed)
 
 #
 # Class representing a process pool
@@ -184,7 +186,7 @@
         self._result_handler._state = RUN
         self._result_handler.start()
 
-        self._terminate = Finalize(
+        self._terminate = util.Finalize(
             self, self._terminate_pool,
             args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                   self._worker_handler, self._task_handler,
@@ -201,7 +203,7 @@
             worker = self._pool[i]
             if worker.exitcode is not None:
                 # worker exited
-                debug('cleaning up worker %d' % i)
+                util.debug('cleaning up worker %d' % i)
                 worker.join()
                 cleaned = True
                 del self._pool[i]
@@ -221,7 +223,7 @@
             w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
             w.start()
-            debug('added worker')
+            util.debug('added worker')
 
     def _maintain_pool(self):
         """Clean up any exited workers and start replacements for them.
@@ -230,7 +232,6 @@
             self._repopulate_pool()
 
     def _setup_queues(self):
-        from .queues import SimpleQueue
         self._inqueue = SimpleQueue()
         self._outqueue = SimpleQueue()
         self._quick_put = self._inqueue._writer.send
@@ -358,7 +359,7 @@
             time.sleep(0.1)
         # send sentinel to stop workers
         pool._taskqueue.put(None)
-        debug('worker handler exiting')
+        util.debug('worker handler exiting')
 
     @staticmethod
     def _handle_tasks(taskqueue, put, outqueue, pool):
@@ -368,36 +369,36 @@
             i = -1
             for i, task in enumerate(taskseq):
                 if thread._state:
-                    debug('task handler found thread._state != RUN')
+                    util.debug('task handler found thread._state != RUN')
                     break
                 try:
                     put(task)
                 except OSError:
-                    debug('could not put task on queue')
+                    util.debug('could not put task on queue')
                     break
             else:
                 if set_length:
-                    debug('doing set_length()')
+                    util.debug('doing set_length()')
                     set_length(i+1)
                 continue
             break
         else:
-            debug('task handler got sentinel')
+            util.debug('task handler got sentinel')
 
 
         try:
             # tell result handler to finish when cache is empty
-            debug('task handler sending sentinel to result handler')
+            util.debug('task handler sending sentinel to result handler')
             outqueue.put(None)
 
             # tell workers there is no more work
-            debug('task handler sending sentinel to workers')
+            util.debug('task handler sending sentinel to workers')
             for p in pool:
                 put(None)
         except OSError:
-            debug('task handler got OSError when sending sentinels')
+            util.debug('task handler got OSError when sending sentinels')
 
-        debug('task handler exiting')
+        util.debug('task handler exiting')
 
     @staticmethod
     def _handle_results(outqueue, get, cache):
@@ -407,16 +408,16 @@
             try:
                 task = get()
             except (OSError, EOFError):
-                debug('result handler got EOFError/OSError -- exiting')
+                util.debug('result handler got EOFError/OSError -- exiting')
                 return
 
             if thread._state:
                 assert thread._state == TERMINATE
-                debug('result handler found thread._state=TERMINATE')
+                util.debug('result handler found thread._state=TERMINATE')
                 break
 
             if task is None:
-                debug('result handler got sentinel')
+                util.debug('result handler got sentinel')
                 break
 
             job, i, obj = task
@@ -429,11 +430,11 @@
             try:
                 task = get()
             except (OSError, EOFError):
-                debug('result handler got EOFError/OSError -- exiting')
+                util.debug('result handler got EOFError/OSError -- exiting')
                 return
 
             if task is None:
-                debug('result handler ignoring extra sentinel')
+                util.debug('result handler ignoring extra sentinel')
                 continue
             job, i, obj = task
             try:
@@ -442,7 +443,7 @@
                 pass
 
         if hasattr(outqueue, '_reader'):
-            debug('ensuring that outqueue is not full')
+            util.debug('ensuring that outqueue is not full')
             # If we don't make room available in outqueue then
             # attempts to add the sentinel (None) to outqueue may
             # block.  There is guaranteed to be no more than 2 sentinels.
@@ -454,7 +455,7 @@
             except (OSError, EOFError):
                 pass
 
-        debug('result handler exiting: len(cache)=%s, thread._state=%s',
+        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
               len(cache), thread._state)
 
     @staticmethod
@@ -472,19 +473,19 @@
               )
 
     def close(self):
-        debug('closing pool')
+        util.debug('closing pool')
         if self._state == RUN:
             self._state = CLOSE
             self._worker_handler._state = CLOSE
 
     def terminate(self):
-        debug('terminating pool')
+        util.debug('terminating pool')
         self._state = TERMINATE
         self._worker_handler._state = TERMINATE
         self._terminate()
 
     def join(self):
-        debug('joining pool')
+        util.debug('joining pool')
         assert self._state in (CLOSE, TERMINATE)
         self._worker_handler.join()
         self._task_handler.join()
@@ -495,7 +496,7 @@
     @staticmethod
     def _help_stuff_finish(inqueue, task_handler, size):
         # task_handler may be blocked trying to put items on inqueue
-        debug('removing tasks from inqueue until task handler finished')
+        util.debug('removing tasks from inqueue until task handler finished')
         inqueue._rlock.acquire()
         while task_handler.is_alive() and inqueue._reader.poll():
             inqueue._reader.recv()
@@ -505,12 +506,12 @@
     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                         worker_handler, task_handler, result_handler, cache):
         # this is guaranteed to only be called once
-        debug('finalizing pool')
+        util.debug('finalizing pool')
 
         worker_handler._state = TERMINATE
         task_handler._state = TERMINATE
 
-        debug('helping task handler/workers to finish')
+        util.debug('helping task handler/workers to finish')
         cls._help_stuff_finish(inqueue, task_handler, len(pool))
 
         assert result_handler.is_alive() or len(cache) == 0
@@ -520,31 +521,31 @@
 
         # We must wait for the worker handler to exit before terminating
         # workers because we don't want workers to be restarted behind our back.
-        debug('joining worker handler')
+        util.debug('joining worker handler')
         if threading.current_thread() is not worker_handler:
             worker_handler.join()
 
         # Terminate workers which haven't already finished.
         if pool and hasattr(pool[0], 'terminate'):
-            debug('terminating workers')
+            util.debug('terminating workers')
             for p in pool:
                 if p.exitcode is None:
                     p.terminate()
 
-        debug('joining task handler')
+        util.debug('joining task handler')
         if threading.current_thread() is not task_handler:
             task_handler.join()
 
-        debug('joining result handler')
+        util.debug('joining result handler')
         if threading.current_thread() is not result_handler:
             result_handler.join()
 
         if pool and hasattr(pool[0], 'terminate'):
-            debug('joining pool workers')
+            util.debug('joining pool workers')
             for p in pool:
                 if p.is_alive():
                     # worker has not yet exited
-                    debug('cleaning up worker %d' % p.pid)
+                    util.debug('cleaning up worker %d' % p.pid)
                     p.join()
 
     def __enter__(self):
@@ -730,7 +731,10 @@
 
 class ThreadPool(Pool):
 
-    from .dummy import Process
+    @staticmethod
+    def Process(*args, **kwds):
+        from .dummy import Process
+        return Process(*args, **kwds)
 
     def __init__(self, processes=None, initializer=None, initargs=()):
         Pool.__init__(self, processes, initializer, initargs)
diff --git a/Lib/multiprocessing/popen.py b/Lib/multiprocessing/popen.py
new file mode 100644
index 0000000..b0c80d5
--- /dev/null
+++ b/Lib/multiprocessing/popen.py
@@ -0,0 +1,78 @@
+import sys
+import threading
+
+__all__ = ['Popen', 'get_spawning_popen', 'set_spawning_popen',
+           'assert_spawning']
+
+#
+# Check that the current thread is spawning a child process
+#
+
+_tls = threading.local()
+
+def get_spawning_popen():
+    return getattr(_tls, 'spawning_popen', None)  # per-thread; None if unset
+
+def set_spawning_popen(popen):
+    _tls.spawning_popen = popen    # stored for the calling thread only
+
+def assert_spawning(obj):
+    '''Raise RuntimeError unless this thread is spawning a child process.'''
+    if get_spawning_popen() is not None:
+        return
+    raise RuntimeError('%s objects should only be shared between processes'
+                       ' through inheritance' % type(obj).__name__)
+
+#
+#
+#
+
+_Popen = None
+
+def Popen(process_obj):
+    if _Popen is None:         # no start method has been selected yet
+        set_start_method()     # fall back to the platform default
+    return _Popen(process_obj)
+
+def get_start_method():
+    if _Popen is None:         # no start method has been selected yet
+        set_start_method()     # fall back to the platform default
+    return _Popen.method       # name string defined on the Popen class
+
+def set_start_method(meth=None, *, start_helpers=True):
+    global _Popen
+    try:
+        modname = _method_to_module[meth]    # KeyError: unknown method name
+        __import__(modname)                  # ImportError: not usable here
+    except (KeyError, ImportError):
+        raise ValueError('could not use start method %r' % meth)
+    module = sys.modules[modname]
+    if start_helpers:
+        module.Popen.ensure_helpers_running()
+    _Popen = module.Popen    # used by Popen() and get_start_method()
+
+
+if sys.platform == 'win32':
+    # Maps a start method name to the module that provides its Popen class.
+    _method_to_module = {
+        None: 'multiprocessing.popen_spawn_win32',      # the default
+        'spawn': 'multiprocessing.popen_spawn_win32',
+        }
+
+    def get_all_start_methods():
+        return ['spawn']
+
+else:
+    _method_to_module = {
+        None: 'multiprocessing.popen_fork',             # the default
+        'fork': 'multiprocessing.popen_fork',
+        'spawn': 'multiprocessing.popen_spawn_posix',
+        'forkserver': 'multiprocessing.popen_forkserver',
+        }
+
+    def get_all_start_methods():
+        from . import reduction
+        if reduction.HAVE_SEND_HANDLE:    # popen_forkserver requires this
+            return ['fork', 'spawn', 'forkserver']
+        else:
+            return ['fork', 'spawn']
diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py
new file mode 100644
index 0000000..fd25ddc
--- /dev/null
+++ b/Lib/multiprocessing/popen_fork.py
@@ -0,0 +1,87 @@
+import os
+import sys
+import signal
+import errno
+
+from . import util
+
+__all__ = ['Popen']
+
+#
+# Start child process using fork
+#
+
+class Popen(object):
+    method = 'fork'    # name reported by popen.get_start_method()
+
+    def __init__(self, process_obj):
+        for stream in (sys.stdout, sys.stderr):
+            try:
+                stream.flush()
+            except (AttributeError, ValueError):
+                pass    # stream is None or already closed: nothing to flush
+        self.returncode = None
+        self._launch(process_obj)
+
+    def duplicate_for_child(self, fd):
+        return fd    # returned unchanged for this start method
+
+    def poll(self, flag=os.WNOHANG):
+        if self.returncode is None:
+            while True:
+                try:
+                    pid, sts = os.waitpid(self.pid, flag)
+                    break
+                except OSError as e:
+                    if e.errno == errno.EINTR:
+                        continue    # interrupted by a signal: retry
+                    # ECHILD: child process not yet created. See #1731717
+                    return None
+            if pid == self.pid:
+                if os.WIFSIGNALED(sts):
+                    self.returncode = -os.WTERMSIG(sts)    # killed by signal
+                else:
+                    assert os.WIFEXITED(sts)
+                    self.returncode = os.WEXITSTATUS(sts)
+        return self.returncode    # still None if no exit status was collected
+
+    def wait(self, timeout=None):
+        if self.returncode is None:
+            if timeout is not None:
+                from .connection import wait
+                if not wait([self.sentinel], timeout):
+                    return None
+            # This shouldn't block if wait() returned successfully.
+            return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+        return self.returncode
+
+    def terminate(self):
+        if self.returncode is None:
+            try:
+                os.kill(self.pid, signal.SIGTERM)
+            except ProcessLookupError:
+                pass    # the process is already gone
+            except OSError:
+                if self.wait(timeout=0.1) is None:
+                    raise
+
+    def _launch(self, process_obj):
+        code = 1    # exit status used if _bootstrap() does not return
+        parent_r, child_w = util.pipe()
+        self.pid = os.fork()
+        if self.pid == 0:
+            try:
+                os.close(parent_r)
+                if 'random' in sys.modules:
+                    import random
+                    random.seed()
+                code = process_obj._bootstrap()
+            finally:
+                os._exit(code)    # the child never runs the code below
+        os.close(child_w)
+        util.Finalize(self, os.close, (parent_r,))
+        self.sentinel = parent_r    # read end; the child holds the write end
+
+    @staticmethod
+    def ensure_helpers_running():
+        '''Do nothing: this start method uses no helper processes.'''
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
new file mode 100644
index 0000000..f1c4b57
--- /dev/null
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -0,0 +1,75 @@
+import io
+import os
+
+from . import reduction
+if not reduction.HAVE_SEND_HANDLE:
+    raise ImportError('No support for sending fds between processes')
+from . import forkserver
+from . import popen
+from . import popen_fork
+from . import spawn
+from . import util
+
+
+__all__ = ['Popen']
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+    def __init__(self, ind):
+        self.ind = ind  # index as returned by Popen.duplicate_for_child()
+    def detach(self):
+        return forkserver.get_inherited_fds()[self.ind]  # index -> fd
+
+#
+# Start child process using a server process
+#
+
+class Popen(popen_fork.Popen):
+    method = 'forkserver'
+    DupFd = _DupFd  # wrapper class looked up by reduction.DupFd()
+
+    def __init__(self, process_obj):
+        self._fds = []  # fds to hand over, in duplicate_for_child() order
+        super().__init__(process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return len(self._fds) - 1  # position of fd within self._fds
+
+    def _launch(self, process_obj):
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        buf = io.BytesIO()
+        popen.set_spawning_popen(self)  # read back by reduction.DupFd()
+        try:
+            reduction.dump(prep_data, buf)
+            reduction.dump(process_obj, buf)
+        finally:
+            popen.set_spawning_popen(None)  # always reset, even on error
+
+        self.sentinel, w = forkserver.connect_to_new_process(self._fds)
+        util.Finalize(self, os.close, (self.sentinel,))
+        with open(w, 'wb', closefd=True) as f:  # w is closed on exit
+            f.write(buf.getbuffer())  # prep data followed by the process
+        self.pid = forkserver.read_unsigned(self.sentinel)
+
+    def poll(self, flag=os.WNOHANG):
+        if self.returncode is None:
+            from .connection import wait
+            timeout = 0 if flag == os.WNOHANG else None  # None: no timeout
+            if not wait([self.sentinel], timeout):
+                return None  # sentinel not ready yet
+            try:
+                self.returncode = forkserver.read_unsigned(self.sentinel)
+            except (OSError, EOFError):
+                # The process ended abnormally perhaps because of a signal
+                self.returncode = 255
+        return self.returncode
+
+    @staticmethod
+    def ensure_helpers_running():
+        from . import semaphore_tracker
+        semaphore_tracker.ensure_running()
+        forkserver.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
new file mode 100644
index 0000000..de262aa
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -0,0 +1,75 @@
+import fcntl
+import io
+import os
+
+from . import popen
+from . import popen_fork
+from . import reduction
+from . import spawn
+from . import util
+
+from . import current_process
+
+__all__ = ['Popen']
+
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+    def __init__(self, fd):
+        self.fd = fd  # value as returned by Popen.duplicate_for_child()
+    def detach(self):
+        return self.fd  # handed back unchanged
+
+#
+# Start child process using a fresh interpreter
+#
+
+class Popen(popen_fork.Popen):
+    method = 'spawn'
+    DupFd = _DupFd  # wrapper class looked up by reduction.DupFd()
+
+    def __init__(self, process_obj):
+        self._fds = []  # fds given to util.spawnv_passfds() in _launch()
+        super().__init__(process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return fd  # the fd is only recorded; its number is kept as is
+
+    def _launch(self, process_obj):
+        tracker_fd = current_process()._config['semaphore_tracker_fd']
+        self._fds.append(tracker_fd)
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        fp = io.BytesIO()
+        popen.set_spawning_popen(self)  # read back by reduction.DupFd()
+        try:
+            reduction.dump(prep_data, fp)
+            reduction.dump(process_obj, fp)
+        finally:
+            popen.set_spawning_popen(None)  # always reset, even on error
+
+        parent_r = child_w = child_r = parent_w = None
+        try:
+            parent_r, child_w = util.pipe()
+            child_r, parent_w = util.pipe()
+            cmd = spawn.get_command_line() + [str(child_r)]  # fd as last arg
+            self._fds.extend([child_r, child_w])
+            self.pid = util.spawnv_passfds(spawn.get_executable(),
+                                           cmd, self._fds)
+            self.sentinel = parent_r
+            with open(parent_w, 'wb', closefd=False) as f:  # closed below
+                f.write(fp.getbuffer())  # prep data followed by the process
+        finally:
+            if parent_r is not None:
+                util.Finalize(self, os.close, (parent_r,))
+            for fd in (child_r, child_w, parent_w):  # skip pipes never made
+                if fd is not None:
+                    os.close(fd)
+
+    @staticmethod
+    def ensure_helpers_running():
+        from . import semaphore_tracker
+        semaphore_tracker.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
new file mode 100644
index 0000000..7e0c4b3
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -0,0 +1,102 @@
+import os
+import msvcrt
+import signal
+import sys
+import _winapi
+
+from . import spawn
+from . import popen
+from . import reduction
+from . import util
+
+__all__ = ['Popen']
+
+#
+#
+#
+
+TERMINATE = 0x10000  # exit code passed to TerminateProcess() below
+WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+#
+# We define a Popen class similar to the one from subprocess, but
+# whose constructor takes a process object as its argument.
+#
+
+class Popen(object):
+    '''
+    Start a subprocess to run the code of a process object
+    '''
+    method = 'spawn'
+
+    def __init__(self, process_obj):
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
+
+        # read end of pipe will be "stolen" by the child process
+        # -- see spawn_main() in spawn.py.
+        rhandle, whandle = _winapi.CreatePipe(None, 0)
+        wfd = msvcrt.open_osfhandle(whandle, 0)  # wrap handle as a C fd
+        cmd += ' {} {}'.format(os.getpid(), rhandle)  # parent pid, read end
+
+        with open(wfd, 'wb', closefd=True) as to_child:
+            # start process
+            try:
+                hp, ht, pid, tid = _winapi.CreateProcess(
+                    spawn.get_executable(), cmd,
+                    None, None, False, 0, None, None, None)
+                _winapi.CloseHandle(ht)  # thread handle is not kept
+            except:
+                _winapi.CloseHandle(rhandle)  # launch failed: drop read end
+                raise
+
+            # set attributes of self
+            self.pid = pid
+            self.returncode = None
+            self._handle = hp
+            self.sentinel = int(hp)  # the process handle as a plain int
+            util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
+
+            # send information to child
+            popen.set_spawning_popen(self)
+            try:
+                reduction.dump(prep_data, to_child)
+                reduction.dump(process_obj, to_child)
+            finally:
+                popen.set_spawning_popen(None)  # always reset, even on error
+
+    def duplicate_for_child(self, handle):
+        assert self is popen.get_spawning_popen()
+        return reduction.duplicate(handle, self.sentinel)  # target: child
+
+    def wait(self, timeout=None):
+        if self.returncode is None:
+            if timeout is None:
+                msecs = _winapi.INFINITE
+            else:
+                msecs = max(0, int(timeout * 1000 + 0.5))  # s -> rounded ms
+
+            res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+            if res == _winapi.WAIT_OBJECT_0:
+                code = _winapi.GetExitCodeProcess(self._handle)
+                if code == TERMINATE:  # the code terminate() passes
+                    code = -signal.SIGTERM
+                self.returncode = code
+
+        return self.returncode  # still None if the wait timed out
+
+    def poll(self):
+        return self.wait(timeout=0)  # zero timeout: do not block
+
+    def terminate(self):
+        if self.returncode is None:
+            try:
+                _winapi.TerminateProcess(int(self._handle), TERMINATE)
+            except OSError:
+                if self.wait(timeout=1.0) is None:
+                    raise  # still no exit code after 1.0s: propagate
+
+    @staticmethod
+    def ensure_helpers_running():  # nothing to start for this Popen
+        pass
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 893507b..c1cb36f 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -43,7 +43,7 @@
     Return list of process objects corresponding to live child processes
     '''
     _cleanup()
-    return list(_current_process._children)
+    return list(_children)
 
 #
 #
@@ -51,9 +51,9 @@
 
 def _cleanup():
     # check for processes which have finished
-    for p in list(_current_process._children):
+    for p in list(_children):
         if p._popen.poll() is not None:
-            _current_process._children.discard(p)
+            _children.discard(p)
 
 #
 # The `Process` class
@@ -63,21 +63,16 @@
     '''
     Process objects represent activity that is run in a separate process
 
-    The class is analagous to `threading.Thread`
+    The class is analogous to `threading.Thread`
     '''
     _Popen = None
 
     def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
                  *, daemon=None):
         assert group is None, 'group argument must be None for now'
-        count = next(_current_process._counter)
+        count = next(_process_counter)
         self._identity = _current_process._identity + (count,)
-        self._authkey = _current_process._authkey
-        if daemon is not None:
-            self._daemonic = daemon
-        else:
-            self._daemonic = _current_process._daemonic
-        self._tempdir = _current_process._tempdir
+        self._config = _current_process._config.copy()
         self._parent_pid = os.getpid()
         self._popen = None
         self._target = target
@@ -85,6 +80,8 @@
         self._kwargs = dict(kwargs)
         self._name = name or type(self).__name__ + '-' + \
                      ':'.join(str(i) for i in self._identity)
+        if daemon is not None:
+            self.daemon = daemon
         _dangling.add(self)
 
     def run(self):
@@ -101,16 +98,16 @@
         assert self._popen is None, 'cannot start a process twice'
         assert self._parent_pid == os.getpid(), \
                'can only start a process object created by current process'
-        assert not _current_process._daemonic, \
+        assert not _current_process._config.get('daemon'), \
                'daemonic processes are not allowed to have children'
         _cleanup()
         if self._Popen is not None:
             Popen = self._Popen
         else:
-            from .forking import Popen
+            from .popen import Popen
         self._popen = Popen(self)
         self._sentinel = self._popen.sentinel
-        _current_process._children.add(self)
+        _children.add(self)
 
     def terminate(self):
         '''
@@ -126,7 +123,7 @@
         assert self._popen is not None, 'can only join a started process'
         res = self._popen.wait(timeout)
         if res is not None:
-            _current_process._children.discard(self)
+            _children.discard(self)
 
     def is_alive(self):
         '''
@@ -154,7 +151,7 @@
         '''
         Return whether process is a daemon
         '''
-        return self._daemonic
+        return self._config.get('daemon', False)
 
     @daemon.setter
     def daemon(self, daemonic):
@@ -162,18 +159,18 @@
         Set whether process is a daemon
         '''
         assert self._popen is None, 'process has already started'
-        self._daemonic = daemonic
+        self._config['daemon'] = daemonic
 
     @property
     def authkey(self):
-        return self._authkey
+        return self._config['authkey']
 
     @authkey.setter
     def authkey(self, authkey):
         '''
         Set authorization key of process
         '''
-        self._authkey = AuthenticationString(authkey)
+        self._config['authkey'] = AuthenticationString(authkey)
 
     @property
     def exitcode(self):
@@ -227,17 +224,17 @@
                 status = 'stopped[%s]' % _exitcode_to_name.get(status, status)
 
         return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
-                                   status, self._daemonic and ' daemon' or '')
+                                   status, self.daemon and ' daemon' or '')
 
     ##
 
     def _bootstrap(self):
         from . import util
-        global _current_process
+        global _current_process, _process_counter, _children
 
         try:
-            self._children = set()
-            self._counter = itertools.count(1)
+            _process_counter = itertools.count(1)
+            _children = set()
             if sys.stdin is not None:
                 try:
                     sys.stdin.close()
@@ -285,8 +282,8 @@
 
 class AuthenticationString(bytes):
     def __reduce__(self):
-        from .forking import Popen
-        if not Popen.thread_is_spawning():
+        from .popen import get_spawning_popen
+        if get_spawning_popen() is None:
             raise TypeError(
                 'Pickling an AuthenticationString object is '
                 'disallowed for security reasons'
@@ -301,16 +298,19 @@
 
     def __init__(self):
         self._identity = ()
-        self._daemonic = False
         self._name = 'MainProcess'
         self._parent_pid = None
         self._popen = None
-        self._counter = itertools.count(1)
-        self._children = set()
-        self._authkey = AuthenticationString(os.urandom(32))
-        self._tempdir = None
+        self._config = {'authkey': AuthenticationString(os.urandom(32)),
+                        'semprefix': 'mp'}
+        # Note that some versions of FreeBSD only allow named
+        # semaphores to have names of up to 14 characters.  Therefore
+        # we choose a short prefix.
+
 
 _current_process = _MainProcess()
+_process_counter = itertools.count(1)
+_children = set()
 del _MainProcess
 
 #
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index ec188ee..10e40a5 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -18,11 +18,15 @@
 import errno
 
 from queue import Empty, Full
+
 import _multiprocessing
-from multiprocessing.connection import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning, ForkingPickler
+
+from . import connection
+from . import popen
+from . import synchronize
+
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+from .reduction import ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -34,14 +38,14 @@
         if maxsize <= 0:
             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
         self._maxsize = maxsize
-        self._reader, self._writer = Pipe(duplex=False)
-        self._rlock = Lock()
+        self._reader, self._writer = connection.Pipe(duplex=False)
+        self._rlock = synchronize.Lock()
         self._opid = os.getpid()
         if sys.platform == 'win32':
             self._wlock = None
         else:
-            self._wlock = Lock()
-        self._sem = BoundedSemaphore(maxsize)
+            self._wlock = synchronize.Lock()
+        self._sem = synchronize.BoundedSemaphore(maxsize)
         # For use by concurrent.futures
         self._ignore_epipe = False
 
@@ -51,7 +55,7 @@
             register_after_fork(self, Queue._after_fork)
 
     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                 self._rlock, self._wlock, self._sem, self._opid)
 
@@ -208,8 +212,6 @@
     @staticmethod
     def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
-        from .util import is_exiting
-
         nacquire = notempty.acquire
         nrelease = notempty.release
         nwait = notempty.wait
@@ -279,8 +281,8 @@
 
     def __init__(self, maxsize=0):
         Queue.__init__(self, maxsize)
-        self._unfinished_tasks = Semaphore(0)
-        self._cond = Condition()
+        self._unfinished_tasks = synchronize.Semaphore(0)
+        self._cond = synchronize.Condition()
 
     def __getstate__(self):
         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -331,19 +333,19 @@
 class SimpleQueue(object):
 
     def __init__(self):
-        self._reader, self._writer = Pipe(duplex=False)
-        self._rlock = Lock()
+        self._reader, self._writer = connection.Pipe(duplex=False)
+        self._rlock = synchronize.Lock()
         self._poll = self._reader.poll
         if sys.platform == 'win32':
             self._wlock = None
         else:
-            self._wlock = Lock()
+            self._wlock = synchronize.Lock()
 
     def empty(self):
         return not self._poll()
 
     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._reader, self._writer, self._rlock, self._wlock)
 
     def __setstate__(self, state):
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 656fa8f..5bbbcf4 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -1,6 +1,5 @@
 #
-# Module to allow connection and socket objects to be transferred
-# between processes
+# Module which deals with pickling of objects.
 #
 # multiprocessing/reduction.py
 #
@@ -8,27 +7,57 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
-__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
-
+import copyreg
+import functools
+import io
 import os
-import sys
+import pickle
 import socket
-import threading
-import struct
-import signal
+import sys
 
-from multiprocessing import current_process
-from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.util import is_exiting, sub_warning
+from . import popen
+from . import util
 
+__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
+
+
+HAVE_SEND_HANDLE = (sys.platform == 'win32' or
+                    (hasattr(socket, 'CMSG_LEN') and
+                     hasattr(socket, 'SCM_RIGHTS') and
+                     hasattr(socket.socket, 'sendmsg')))
 
 #
-#
+# Pickler subclass
 #
 
-if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
-                                   hasattr(socket, 'SCM_RIGHTS'))):
-    raise ImportError('pickling of connections not supported')
+class ForkingPickler(pickle.Pickler):
+    '''Pickler subclass used by multiprocessing.'''
+    _extra_reducers = {}  # type -> reduce function, filled by register()
+    _copyreg_dispatch_table = copyreg.dispatch_table
+
+    def __init__(self, *args):
+        super().__init__(*args)
+        self.dispatch_table = self._copyreg_dispatch_table.copy()
+        self.dispatch_table.update(self._extra_reducers)  # extras override
+
+    @classmethod
+    def register(cls, type, reduce):
+        '''Register a reduce function for a type.'''
+        cls._extra_reducers[type] = reduce
+
+    @classmethod
+    def dumps(cls, obj, protocol=None):
+        buf = io.BytesIO()
+        cls(buf, protocol).dump(obj)
+        return buf.getbuffer()  # a memoryview over buf, not a bytes copy
+
+    loads = pickle.loads
+
+register = ForkingPickler.register
+
+def dump(obj, file, protocol=None):
+    '''Replacement for pickle.dump() using ForkingPickler.'''
+    ForkingPickler(file, protocol).dump(obj)  # applies register()ed reducers
 
 #
 # Platform specific definitions
@@ -36,20 +65,44 @@
 
 if sys.platform == 'win32':
     # Windows
-    __all__ += ['reduce_pipe_connection']
+    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
     import _winapi
 
+    def duplicate(handle, target_process=None, inheritable=False):
+        '''Duplicate a handle.  (target_process is a handle not a pid!)'''
+        if target_process is None:
+            target_process = _winapi.GetCurrentProcess()  # default: ourselves
+        return _winapi.DuplicateHandle(
+            _winapi.GetCurrentProcess(), handle, target_process,
+            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
+
+    def steal_handle(source_pid, handle):
+        '''Steal a handle from process identified by source_pid.'''
+        source_process_handle = _winapi.OpenProcess(
+            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
+        try:  # DUPLICATE_CLOSE_SOURCE also closes the source's handle
+            return _winapi.DuplicateHandle(
+                source_process_handle, handle,
+                _winapi.GetCurrentProcess(), 0, False,
+                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
+        finally:
+            _winapi.CloseHandle(source_process_handle)  # runs on error too
+
     def send_handle(conn, handle, destination_pid):
+        '''Send a handle over a local connection.'''
         dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
         conn.send(dh)
 
     def recv_handle(conn):
+        '''Receive a handle over a local connection.'''
         return conn.recv().detach()
 
     class DupHandle(object):
+        '''Picklable wrapper for a handle.'''
         def __init__(self, handle, access, pid=None):
-            # duplicate handle for process with given pid
             if pid is None:
+                # We just duplicate the handle in the current process and
+                # let the receiving process steal the handle.
                 pid = os.getpid()
             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
             try:
@@ -62,9 +115,12 @@
             self._pid = pid
 
         def detach(self):
+            '''Get the handle.  This should only be called once.'''
             # retrieve handle from process which currently owns it
             if self._pid == os.getpid():
+                # The handle has already been duplicated for this process.
                 return self._handle
+            # We must steal the handle from the process whose pid is self._pid.
             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                        self._pid)
             try:
@@ -74,207 +130,112 @@
             finally:
                 _winapi.CloseHandle(proc)
 
-    class DupSocket(object):
-        def __init__(self, sock):
-            new_sock = sock.dup()
-            def send(conn, pid):
-                share = new_sock.share(pid)
-                conn.send_bytes(share)
-            self._id = resource_sharer.register(send, new_sock.close)
-
-        def detach(self):
-            conn = resource_sharer.get_connection(self._id)
-            try:
-                share = conn.recv_bytes()
-                return socket.fromshare(share)
-            finally:
-                conn.close()
-
-    def reduce_socket(s):
-        return rebuild_socket, (DupSocket(s),)
-
-    def rebuild_socket(ds):
-        return ds.detach()
-
-    def reduce_connection(conn):
-        handle = conn.fileno()
-        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
-            ds = DupSocket(s)
-            return rebuild_connection, (ds, conn.readable, conn.writable)
-
-    def rebuild_connection(ds, readable, writable):
-        from .connection import Connection
-        sock = ds.detach()
-        return Connection(sock.detach(), readable, writable)
-
-    def reduce_pipe_connection(conn):
-        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
-                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
-        dh = DupHandle(conn.fileno(), access)
-        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
-
-    def rebuild_pipe_connection(dh, readable, writable):
-        from .connection import PipeConnection
-        handle = dh.detach()
-        return PipeConnection(handle, readable, writable)
-
 else:
     # Unix
+    __all__ += ['DupFd', 'sendfds', 'recvfds']
+    import array
 
     # On MacOSX we should acknowledge receipt of fds -- see Issue14669
     ACKNOWLEDGE = sys.platform == 'darwin'
 
-    def send_handle(conn, handle, destination_pid):
-        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
-            s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
-                                struct.pack("@i", handle))])
-        if ACKNOWLEDGE and conn.recv_bytes() != b'ACK':
+    def sendfds(sock, fds):
+        '''Send an array of fds over an AF_UNIX socket.'''
+        fds = array.array('i', fds)
+        msg = bytes([len(fds) % 256])
+        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+        if ACKNOWLEDGE and sock.recv(1) != b'A':
             raise RuntimeError('did not receive acknowledgement of fd')
 
-    def recv_handle(conn):
-        size = struct.calcsize("@i")
+    def recvfds(sock, size):
+        '''Receive an array of fds over an AF_UNIX socket.'''
+        a = array.array('i')
+        bytes_size = a.itemsize * size  # bytes needed for `size` C ints
+        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
+        if not msg and not ancdata:
+            raise EOFError  # nothing at all was received
+        try:
+            if ACKNOWLEDGE:
+                sock.send(b'A')  # the byte sendfds() waits for
+            if len(ancdata) != 1:
+                raise RuntimeError('received %d items of ancdata' %
+                                   len(ancdata))
+            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+            if (cmsg_level == socket.SOL_SOCKET and
+                cmsg_type == socket.SCM_RIGHTS):
+                if len(cmsg_data) % a.itemsize != 0:
+                    raise ValueError  # not a whole number of ints
+                a.frombytes(cmsg_data)
+                assert len(a) % 256 == msg[0]  # msg[0]: fd count mod 256
+                return list(a)
+        except (ValueError, IndexError):
+            pass  # fall through to the RuntimeError below
+        raise RuntimeError('Invalid data received')
+
+    def send_handle(conn, handle, destination_pid):
+        '''Send a handle over a local connection.'''
         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
-            msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
-            try:
-                if ACKNOWLEDGE:
-                    conn.send_bytes(b'ACK')
-                cmsg_level, cmsg_type, cmsg_data = ancdata[0]
-                if (cmsg_level == socket.SOL_SOCKET and
-                    cmsg_type == socket.SCM_RIGHTS):
-                    return struct.unpack("@i", cmsg_data[:size])[0]
-            except (ValueError, IndexError, struct.error):
-                pass
-            raise RuntimeError('Invalid data received')
+            sendfds(s, [handle])
 
-    class DupFd(object):
-        def __init__(self, fd):
-            new_fd = os.dup(fd)
-            def send(conn, pid):
-                send_handle(conn, new_fd, pid)
-            def close():
-                os.close(new_fd)
-            self._id = resource_sharer.register(send, close)
+    def recv_handle(conn):
+        '''Receive a handle over a local connection.'''
+        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+            return recvfds(s, 1)[0]  # ask for one fd and return it
 
-        def detach(self):
-            conn = resource_sharer.get_connection(self._id)
-            try:
-                return recv_handle(conn)
-            finally:
-                conn.close()
+    def DupFd(fd):
+        '''Return a wrapper for an fd.'''
+        popen_obj = popen.get_spawning_popen()
+        if popen_obj is not None:  # a Popen is currently pickling
+            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
+        elif HAVE_SEND_HANDLE:  # otherwise go through resource_sharer
+            from . import resource_sharer
+            return resource_sharer.DupFd(fd)
+        else:
+            raise ValueError('SCM_RIGHTS appears not to be available')
 
-    def reduce_socket(s):
+#
+# Try making some callable types picklable
+#
+
+def _reduce_method(m):  # rebuild a method as getattr(owner, name)
+    if m.__self__ is None:
+        return getattr, (m.__class__, m.__func__.__name__)
+    else:
+        return getattr, (m.__self__, m.__func__.__name__)
+class _C:  # exists only so type(_C().f) yields the bound-method type
+    def f(self):
+        pass
+register(type(_C().f), _reduce_method)
+
+
+def _reduce_method_descriptor(m):  # rebuild as getattr(owner class, name)
+    return getattr, (m.__objclass__, m.__name__)
+register(type(list.append), _reduce_method_descriptor)
+register(type(int.__add__), _reduce_method_descriptor)
+
+
+def _reduce_partial(p):
+    return _rebuild_partial, (p.func, p.args, p.keywords or {})
+def _rebuild_partial(func, args, keywords):  # inverse of _reduce_partial
+    return functools.partial(func, *args, **keywords)
+register(functools.partial, _reduce_partial)
+
+#
+# Make sockets picklable
+#
+
+if sys.platform == 'win32':
+    def _reduce_socket(s):
+        from .resource_sharer import DupSocket
+        return _rebuild_socket, (DupSocket(s),)
+    def _rebuild_socket(ds):
+        return ds.detach()
+    register(socket.socket, _reduce_socket)
+
+else:
+    def _reduce_socket(s):
         df = DupFd(s.fileno())
-        return rebuild_socket, (df, s.family, s.type, s.proto)
-
-    def rebuild_socket(df, family, type, proto):
+        return _rebuild_socket, (df, s.family, s.type, s.proto)
+    def _rebuild_socket(df, family, type, proto):
         fd = df.detach()
-        s = socket.fromfd(fd, family, type, proto)
-        os.close(fd)
-        return s
-
-    def reduce_connection(conn):
-        df = DupFd(conn.fileno())
-        return rebuild_connection, (df, conn.readable, conn.writable)
-
-    def rebuild_connection(df, readable, writable):
-        from .connection import Connection
-        fd = df.detach()
-        return Connection(fd, readable, writable)
-
-#
-# Server which shares registered resources with clients
-#
-
-class ResourceSharer(object):
-    def __init__(self):
-        self._key = 0
-        self._cache = {}
-        self._old_locks = []
-        self._lock = threading.Lock()
-        self._listener = None
-        self._address = None
-        self._thread = None
-        register_after_fork(self, ResourceSharer._afterfork)
-
-    def register(self, send, close):
-        with self._lock:
-            if self._address is None:
-                self._start()
-            self._key += 1
-            self._cache[self._key] = (send, close)
-            return (self._address, self._key)
-
-    @staticmethod
-    def get_connection(ident):
-        from .connection import Client
-        address, key = ident
-        c = Client(address, authkey=current_process().authkey)
-        c.send((key, os.getpid()))
-        return c
-
-    def stop(self, timeout=None):
-        from .connection import Client
-        with self._lock:
-            if self._address is not None:
-                c = Client(self._address, authkey=current_process().authkey)
-                c.send(None)
-                c.close()
-                self._thread.join(timeout)
-                if self._thread.is_alive():
-                    sub_warn('ResourceSharer thread did not stop when asked')
-                self._listener.close()
-                self._thread = None
-                self._address = None
-                self._listener = None
-                for key, (send, close) in self._cache.items():
-                    close()
-                self._cache.clear()
-
-    def _afterfork(self):
-        for key, (send, close) in self._cache.items():
-            close()
-        self._cache.clear()
-        # If self._lock was locked at the time of the fork, it may be broken
-        # -- see issue 6721.  Replace it without letting it be gc'ed.
-        self._old_locks.append(self._lock)
-        self._lock = threading.Lock()
-        if self._listener is not None:
-            self._listener.close()
-        self._listener = None
-        self._address = None
-        self._thread = None
-
-    def _start(self):
-        from .connection import Listener
-        assert self._listener is None
-        debug('starting listener and thread for sending handles')
-        self._listener = Listener(authkey=current_process().authkey)
-        self._address = self._listener.address
-        t = threading.Thread(target=self._serve)
-        t.daemon = True
-        t.start()
-        self._thread = t
-
-    def _serve(self):
-        if hasattr(signal, 'pthread_sigmask'):
-            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
-        while 1:
-            try:
-                conn = self._listener.accept()
-                msg = conn.recv()
-                if msg is None:
-                    break
-                key, destination_pid = msg
-                send, close = self._cache.pop(key)
-                send(conn, destination_pid)
-                close()
-                conn.close()
-            except:
-                if not is_exiting():
-                    import traceback
-                    sub_warning(
-                        'thread for sharing handles raised exception :\n' +
-                        '-'*79 + '\n' + traceback.format_exc() + '-'*79
-                        )
-
-resource_sharer = ResourceSharer()
+        return socket.socket(family, type, proto, fileno=fd)
+    register(socket.socket, _reduce_socket)
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
new file mode 100644
index 0000000..5e46fc6
--- /dev/null
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -0,0 +1,158 @@
+#
+# We use a background thread for sharing fds on Unix, and for sharing sockets on
+# Windows.
+#
+# A client which wants to pickle a resource registers it with the resource
+# sharer and gets an identifier in return.  The unpickling process connects
+# to the resource sharer, sends the identifier and its pid, and then receives
+# the resource.
+#
+
+import os
+import signal
+import socket
+import sys
+import threading
+
+from . import process
+from . import reduction
+from . import util
+
+__all__ = ['stop']
+
+
+if sys.platform == 'win32':
+    __all__ += ['DupSocket']
+
+    class DupSocket(object):
+        '''Picklable wrapper for a socket.'''
+        def __init__(self, sock):
+            new_sock = sock.dup()
+            def send(conn, pid):
+                share = new_sock.share(pid)
+                conn.send_bytes(share)
+            self._id = _resource_sharer.register(send, new_sock.close)
+
+        def detach(self):
+            '''Get the socket.  This should only be called once.'''
+            with _resource_sharer.get_connection(self._id) as conn:
+                share = conn.recv_bytes()
+                return socket.fromshare(share)
+
+else:
+    __all__ += ['DupFd']
+
+    class DupFd(object):
+        '''Wrapper for fd which can be used at any time.'''
+        def __init__(self, fd):
+            new_fd = os.dup(fd)
+            def send(conn, pid):
+                reduction.send_handle(conn, new_fd, pid)
+            def close():
+                os.close(new_fd)
+            self._id = _resource_sharer.register(send, close)
+
+        def detach(self):
+            '''Get the fd.  This should only be called once.'''
+            with _resource_sharer.get_connection(self._id) as conn:
+                return reduction.recv_handle(conn)
+
+
+class _ResourceSharer(object):
+    '''Manager for resources using a background thread.'''
+    def __init__(self):
+        self._key = 0
+        self._cache = {}
+        self._old_locks = []
+        self._lock = threading.Lock()
+        self._listener = None
+        self._address = None
+        self._thread = None
+        util.register_after_fork(self, _ResourceSharer._afterfork)
+
+    def register(self, send, close):
+        '''Register resource, returning an identifier.'''
+        with self._lock:
+            if self._address is None:
+                self._start()
+            self._key += 1
+            self._cache[self._key] = (send, close)
+            return (self._address, self._key)
+
+    @staticmethod
+    def get_connection(ident):
+        '''Return connection from which to receive identified resource.'''
+        from .connection import Client
+        address, key = ident
+        c = Client(address, authkey=process.current_process().authkey)
+        c.send((key, os.getpid()))
+        return c
+
+    def stop(self, timeout=None):
+        '''Stop the background thread and clear registered resources.'''
+        from .connection import Client
+        with self._lock:
+            if self._address is not None:
+                c = Client(self._address,
+                           authkey=process.current_process().authkey)
+                c.send(None)
+                c.close()
+                self._thread.join(timeout)
+                if self._thread.is_alive():
+                    util.sub_warning('_ResourceSharer thread did '
+                                     'not stop when asked')
+                self._listener.close()
+                self._thread = None
+                self._address = None
+                self._listener = None
+                for key, (send, close) in self._cache.items():
+                    close()
+                self._cache.clear()
+
+    def _afterfork(self):
+        for key, (send, close) in self._cache.items():
+            close()
+        self._cache.clear()
+        # If self._lock was locked at the time of the fork, it may be broken
+        # -- see issue 6721.  Replace it without letting it be gc'ed.
+        self._old_locks.append(self._lock)
+        self._lock = threading.Lock()
+        if self._listener is not None:
+            self._listener.close()
+        self._listener = None
+        self._address = None
+        self._thread = None
+
+    def _start(self):
+        from .connection import Listener
+        assert self._listener is None
+        util.debug('starting listener and thread for sending handles')
+        self._listener = Listener(authkey=process.current_process().authkey)
+        self._address = self._listener.address
+        t = threading.Thread(target=self._serve)
+        t.daemon = True
+        t.start()
+        self._thread = t
+
+    def _serve(self):
+        if hasattr(signal, 'pthread_sigmask'):
+            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
+        while 1:
+            try:
+                with self._listener.accept() as conn:
+                    msg = conn.recv()
+                    if msg is None:
+                        break
+                    key, destination_pid = msg
+                    send, close = self._cache.pop(key)
+                    try:
+                        send(conn, destination_pid)
+                    finally:
+                        close()
+            except:
+                if not util.is_exiting():
+                    sys.excepthook(*sys.exc_info())
+
+
+_resource_sharer = _ResourceSharer()
+stop = _resource_sharer.stop
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
new file mode 100644
index 0000000..4a2d636
--- /dev/null
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -0,0 +1,135 @@
+#
+# On Unix we run a server process which keeps track of unlinked
+# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# pipe.  Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining semaphore names.
+#
+# This is important because the system only supports a limited number
+# of named semaphores, and they will not be automatically removed till
+# the next reboot.  Without this semaphore tracker process, "killall
+# python" would probably leave unlinked semaphores.
+#
+
+import errno
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+
+from . import spawn
+from . import util
+from . import current_process
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+
+_lock = threading.Lock()
+
+
+def ensure_running():
+    '''Make sure that semaphore tracker process is running.
+
+    This can be run from any process.  Usually a child process will use
+    the semaphore tracker created by its parent.'''
+    with _lock:
+        config = current_process()._config
+        if config.get('semaphore_tracker_fd') is not None:
+            return
+        fds_to_pass = []
+        try:
+            fds_to_pass.append(sys.stderr.fileno())
+        except Exception:
+            pass
+        cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
+        r, semaphore_tracker_fd = util.pipe()
+        try:
+            fds_to_pass.append(r)
+            # process will outlive us, so no need to wait on pid
+            exe = spawn.get_executable()
+            args = [exe] + util._args_from_interpreter_flags()
+            args += ['-c', cmd % r]
+            util.spawnv_passfds(exe, args, fds_to_pass)
+        except:
+            os.close(semaphore_tracker_fd)
+            raise
+        else:
+            config['semaphore_tracker_fd'] = semaphore_tracker_fd
+        finally:
+            os.close(r)
+
+
+def register(name):
+    '''Register name of semaphore with semaphore tracker.'''
+    _send('REGISTER', name)
+
+
+def unregister(name):
+    '''Unregister name of semaphore with semaphore tracker.'''
+    _send('UNREGISTER', name)
+
+
+def _send(cmd, name):
+    '''Send one "CMD:name" line to the semaphore tracker over its pipe.'''
+    msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+    if len(msg) > 512:
+        # POSIX only guarantees atomic pipe writes up to PIPE_BUF (>= 512)
+        raise ValueError('name too long')
+    fd = current_process()._config['semaphore_tracker_fd']
+    nbytes = os.write(fd, msg)
+    assert nbytes == len(msg)
+
+
+def main(fd):
+    '''Run semaphore tracker.'''
+    # protect the process from ^C and "killall python" etc
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+    signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+    for f in (sys.stdin, sys.stdout):
+        try:
+            f.close()
+        except Exception:
+            pass
+
+    cache = set()
+    try:
+        # keep track of registered/unregistered semaphores
+        with open(fd, 'rb') as f:
+            for line in f:
+                try:
+                    cmd, name = line.strip().split(b':')
+                    if cmd == b'REGISTER':
+                        cache.add(name)
+                    elif cmd == b'UNREGISTER':
+                        cache.remove(name)
+                    else:
+                        raise RuntimeError('unrecognized command %r' % cmd)
+                except Exception:
+                    try:
+                        sys.excepthook(*sys.exc_info())
+                    except:
+                        pass
+    finally:
+        # all processes have terminated; cleanup any remaining semaphores
+        if cache:
+            try:
+                warnings.warn('semaphore_tracker: There appear to be %d '
+                              'leaked semaphores to clean up at shutdown' %
+                              len(cache))
+            except Exception:
+                pass
+        for name in cache:
+            # For some reason the process which created and registered this
+            # semaphore has failed to unregister it. Presumably it has died.
+            # We therefore unlink it.
+            try:
+                name = name.decode('ascii')
+                try:
+                    _multiprocessing.sem_unlink(name)
+                except Exception as e:
+                    warnings.warn('semaphore_tracker: %r: %s' % (name, e))
+            finally:
+                pass
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index a358ed4..a97dadf 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -10,8 +10,11 @@
 import ctypes
 import weakref
 
-from multiprocessing import heap, RLock
-from multiprocessing.forking import assert_spawning, ForkingPickler
+from . import heap
+
+from .synchronize import RLock
+from .reduction import ForkingPickler
+from .popen import assert_spawning
 
 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
 
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
new file mode 100644
index 0000000..83561db
--- /dev/null
+++ b/Lib/multiprocessing/spawn.py
@@ -0,0 +1,258 @@
+#
+# Code used to start processes when using the spawn or forkserver
+# start methods.
+#
+# multiprocessing/spawn.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import os
+import pickle
+import sys
+
+from . import process
+from . import util
+from . import popen
+
+__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
+           'get_preparation_data', 'get_command_line', 'import_main_path']
+
+#
+# _python_exe is the assumed path to the python executable.
+# People embedding Python want to modify it.
+#
+
+if sys.platform != 'win32':
+    WINEXE = False
+    WINSERVICE = False
+else:
+    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+if WINSERVICE:
+    _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
+else:
+    _python_exe = sys.executable
+
+def set_executable(exe):
+    global _python_exe
+    _python_exe = exe
+
+def get_executable():
+    return _python_exe
+
+#
+#
+#
+
+def is_forking(argv):
+    '''
+    Return whether commandline indicates we are a spawned child process.
+
+    True only when argv has at least two items and argv[1] is exactly
+    the marker '--multiprocessing-fork'.
+    '''
+    return len(argv) >= 2 and argv[1] == '--multiprocessing-fork'
+
+
+def freeze_support():
+    '''
+    Run code for process object if this is not the main process
+    '''
+    if is_forking(sys.argv):
+        spawn_main()
+        sys.exit()  # normally not reached: spawn_main() exits itself
+
+
+def get_command_line():
+    '''
+    Returns prefix of command line used for spawning a child process
+    '''
+    if getattr(sys, 'frozen', False):
+        return [sys.executable, '--multiprocessing-fork']
+    else:
+        prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'
+        opts = util._args_from_interpreter_flags()
+        return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
+
+
+def spawn_main():
+    '''
+    Run code specified by data received over pipe
+    '''
+    assert is_forking(sys.argv)
+    handle = int(sys.argv[-1])
+    if sys.platform == 'win32':
+        import msvcrt
+        from .reduction import steal_handle
+        pid = int(sys.argv[-2])
+        new_handle = steal_handle(pid, handle)
+        fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
+    else:
+        fd = handle
+    exitcode = _main(fd)
+    sys.exit(exitcode)
+
+
+def _main(fd):
+    with os.fdopen(fd, 'rb', closefd=True) as from_parent:
+        process.current_process()._inheriting = True
+        try:
+            preparation_data = pickle.load(from_parent)
+            prepare(preparation_data)
+            self = pickle.load(from_parent)
+        finally:
+            del process.current_process()._inheriting
+    return self._bootstrap()
+
+
+def _check_not_importing_main():
+    if getattr(process.current_process(), '_inheriting', False):
+        raise RuntimeError('''
+        An attempt has been made to start a new process before the
+        current process has finished its bootstrapping phase.
+
+        This probably means that you are not using fork to start your
+        child processes and you have forgotten to use the proper idiom
+        in the main module:
+
+            if __name__ == '__main__':
+                freeze_support()
+                ...
+
+        The "freeze_support()" line can be omitted if the program
+        is not going to be frozen to produce an executable.''')
+
+
+def get_preparation_data(name):
+    '''
+    Return info about parent needed by child to unpickle process object
+    '''
+    _check_not_importing_main()
+    d = dict(
+        log_to_stderr=util._log_to_stderr,
+        authkey=process.current_process().authkey,
+        )
+
+    if util._logger is not None:
+        d['log_level'] = util._logger.getEffectiveLevel()
+
+    sys_path=sys.path.copy()
+    try:
+        i = sys_path.index('')
+    except ValueError:
+        pass
+    else:
+        sys_path[i] = process.ORIGINAL_DIR
+
+    d.update(
+        name=name,
+        sys_path=sys_path,
+        sys_argv=sys.argv,
+        orig_dir=process.ORIGINAL_DIR,
+        dir=os.getcwd(),
+        start_method=popen.get_start_method(),
+        )
+
+    if sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
+        main_path = getattr(sys.modules['__main__'], '__file__', None)
+        if not main_path and sys.argv[0] not in ('', '-c'):
+            main_path = sys.argv[0]
+        if main_path is not None:
+            if (not os.path.isabs(main_path) and
+                        process.ORIGINAL_DIR is not None):
+                main_path = os.path.join(process.ORIGINAL_DIR, main_path)
+            d['main_path'] = os.path.normpath(main_path)
+
+    return d
+
+#
+# Prepare current process
+#
+
+old_main_modules = []
+
+def prepare(data):
+    '''
+    Try to get current process ready to unpickle process object
+    '''
+    if 'name' in data:
+        process.current_process().name = data['name']
+
+    if 'authkey' in data:
+        process.current_process().authkey = data['authkey']
+
+    if 'log_to_stderr' in data and data['log_to_stderr']:
+        util.log_to_stderr()
+
+    if 'log_level' in data:
+        util.get_logger().setLevel(data['log_level'])
+
+    if 'sys_path' in data:
+        sys.path = data['sys_path']
+
+    if 'sys_argv' in data:
+        sys.argv = data['sys_argv']
+
+    if 'dir' in data:
+        os.chdir(data['dir'])
+
+    if 'orig_dir' in data:
+        process.ORIGINAL_DIR = data['orig_dir']
+
+    if 'start_method' in data:
+        popen.set_start_method(data['start_method'], start_helpers=False)
+
+    if 'main_path' in data:
+        import_main_path(data['main_path'])
+
+
+def import_main_path(main_path):
+    '''
+    Set sys.modules['__main__'] to module at main_path
+    '''
+    # XXX (ncoghlan): The following code makes several bogus
+    # assumptions regarding the relationship between __file__
+    # and a module's real name. See PEP 302 and issue #10845
+    if getattr(sys.modules['__main__'], '__file__', None) == main_path:
+        return
+
+    main_name = os.path.splitext(os.path.basename(main_path))[0]
+    if main_name == '__init__':
+        main_name = os.path.basename(os.path.dirname(main_path))
+
+    if main_name == '__main__':
+        main_module = sys.modules['__main__']
+        main_module.__file__ = main_path
+    elif main_name != 'ipython':
+        # Main modules not actually called __main__.py may
+        # contain additional code that should still be executed
+        import importlib
+        import types
+
+        if main_path is None:
+            dirs = None
+        elif os.path.basename(main_path).startswith('__init__.py'):
+            dirs = [os.path.dirname(os.path.dirname(main_path))]
+        else:
+            dirs = [os.path.dirname(main_path)]
+
+        assert main_name not in sys.modules, main_name
+        sys.modules.pop('__mp_main__', None)
+        # We should not try to load __main__
+        # since that would execute 'if __name__ == "__main__"'
+        # clauses, potentially causing a pseudo fork bomb.
+        loader = importlib.find_loader(main_name, path=dirs)
+        main_module = types.ModuleType(main_name)
+        try:
+            loader.init_module_attrs(main_module)
+        except AttributeError:  # init_module_attrs is optional
+            pass
+        main_module.__name__ = '__mp_main__'
+        code = loader.get_code(main_name)
+        exec(code, main_module.__dict__)
+
+        old_main_modules.append(sys.modules['__main__'])
+        sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 0faca78..09736ef 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -11,20 +11,24 @@
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
     ]
 
+import os
 import threading
 import sys
-
+import itertools
+import tempfile
 import _multiprocessing
-from multiprocessing.process import current_process
-from multiprocessing.util import register_after_fork, debug
-from multiprocessing.forking import assert_spawning, Popen
+
 from time import time as _time
 
+from . import popen
+from . import process
+from . import util
+
 # Try to import the mp.synchronize module cleanly, if it fails
 # raise ImportError for platforms lacking a working sem_open implementation.
 # See issue 3770
 try:
-    from _multiprocessing import SemLock
+    from _multiprocessing import SemLock, sem_unlink
 except (ImportError):
     raise ImportError("This platform lacks a functioning sem_open" +
                       " implementation, therefore, the required" +
@@ -44,15 +48,45 @@
 
 class SemLock(object):
 
+    _rand = tempfile._RandomNameSequence()
+
     def __init__(self, kind, value, maxvalue):
-        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
-        debug('created semlock with handle %s' % sl.handle)
+        unlink_immediately = (sys.platform == 'win32' or
+                              popen.get_start_method() == 'fork')
+        for i in range(100):
+            try:
+                sl = self._semlock = _multiprocessing.SemLock(
+                    kind, value, maxvalue, self._make_name(),
+                    unlink_immediately)
+            except FileExistsError:
+                pass
+            else:
+                break
+        else:
+            raise FileExistsError('cannot find name for semaphore')
+
+        util.debug('created semlock with handle %s' % sl.handle)
         self._make_methods()
 
         if sys.platform != 'win32':
             def _after_fork(obj):
                 obj._semlock._after_fork()
-            register_after_fork(self, _after_fork)
+            util.register_after_fork(self, _after_fork)
+
+        if self._semlock.name is not None:
+            # We only get here if we are on Unix with forking
+            # disabled.  When the object is garbage collected or the
+            # process shuts down we unlink the semaphore name
+            from .semaphore_tracker import register
+            register(self._semlock.name)
+            util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
+                          exitpriority=0)
+
+    @staticmethod
+    def _cleanup(name):
+        from .semaphore_tracker import unregister
+        sem_unlink(name)
+        unregister(name)
 
     def _make_methods(self):
         self.acquire = self._semlock.acquire
@@ -65,15 +99,24 @@
         return self._semlock.__exit__(*args)
 
     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         sl = self._semlock
-        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
+        if sys.platform == 'win32':
+            h = popen.get_spawning_popen().duplicate_for_child(sl.handle)
+        else:
+            h = sl.handle
+        return (h, sl.kind, sl.maxvalue, sl.name)
 
     def __setstate__(self, state):
         self._semlock = _multiprocessing.SemLock._rebuild(*state)
-        debug('recreated blocker with handle %r' % state[0])
+        util.debug('recreated blocker with handle %r' % state[0])
         self._make_methods()
 
+    @staticmethod
+    def _make_name():
+        return '/%s-%s' % (process.current_process()._config['semprefix'],
+                           next(SemLock._rand))
+
 #
 # Semaphore
 #
@@ -122,7 +165,7 @@
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().name
+                name = process.current_process().name
                 if threading.current_thread().name != 'MainThread':
                     name += '|' + threading.current_thread().name
             elif self._semlock._get_value() == 1:
@@ -147,7 +190,7 @@
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().name
+                name = process.current_process().name
                 if threading.current_thread().name != 'MainThread':
                     name += '|' + threading.current_thread().name
                 count = self._semlock._count()
@@ -175,7 +218,7 @@
         self._make_methods()
 
     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._lock, self._sleeping_count,
                 self._woken_count, self._wait_semaphore)
 
@@ -342,7 +385,7 @@
 
     def __init__(self, parties, action=None, timeout=None):
         import struct
-        from multiprocessing.heap import BufferWrapper
+        from .heap import BufferWrapper
         wrapper = BufferWrapper(struct.calcsize('i') * 2)
         cond = Condition()
         self.__setstate__((parties, action, timeout, cond, wrapper))
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index f5862b4..d9e4799 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -17,13 +17,13 @@
                         # cleanup function before multiprocessing does
 from subprocess import _args_from_interpreter_flags
 
-from multiprocessing.process import current_process, active_children
+from . import process
 
 __all__ = [
     'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
     'log_to_stderr', 'get_temp_dir', 'register_after_fork',
     'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
-    'SUBDEBUG', 'SUBWARNING',
+    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
     ]
 
 #
@@ -71,8 +71,6 @@
 
             _logger = logging.getLogger(LOGGER_NAME)
             _logger.propagate = 0
-            logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
-            logging.addLevelName(SUBWARNING, 'SUBWARNING')
 
             # XXX multiprocessing should cleanup before logging
             if hasattr(atexit, 'unregister'):
@@ -111,13 +109,14 @@
 
 def get_temp_dir():
     # get name of a temp directory which will be automatically cleaned up
-    if current_process()._tempdir is None:
+    tempdir = process.current_process()._config.get('tempdir')
+    if tempdir is None:
         import shutil, tempfile
         tempdir = tempfile.mkdtemp(prefix='pymp-')
         info('created temp directory %s', tempdir)
         Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
-        current_process()._tempdir = tempdir
-    return current_process()._tempdir
+        process.current_process()._config['tempdir'] = tempdir
+    return tempdir
 
 #
 # Support for reinitialization of objects when bootstrapping a child process
@@ -273,8 +272,8 @@
 _exiting = False
 
 def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
-                   active_children=active_children,
-                   current_process=current_process):
+                   active_children=process.active_children,
+                   current_process=process.current_process):
     # We hold on to references to functions in the arglist due to the
     # situation described below, where this function is called after this
     # module's globals are destroyed.
@@ -303,7 +302,7 @@
             # #9207.
 
             for p in active_children():
-                if p._daemonic:
+                if p.daemon:
                     info('calling terminate() for daemon %s', p.name)
                     p._popen.terminate()
 
@@ -335,3 +334,54 @@
         register_after_fork(self, lambda obj : obj.__dict__.clear())
     def __reduce__(self):
         return type(self), ()
+
+#
+# Close fds except those specified
+#
+
+try:
+    MAXFD = os.sysconf("SC_OPEN_MAX")
+except Exception:
+    MAXFD = 256
+
+def close_all_fds_except(fds):
+    '''Close all fds below MAXFD except those listed in fds.'''
+    keep = sorted(list(fds) + [-1, MAXFD])
+    assert keep[-1] == MAXFD, 'fd too large'
+    for low, high in zip(keep, keep[1:]):
+        os.closerange(low + 1, high)
+
+#
+# Start a program with only specified fds kept open
+#
+
+def spawnv_passfds(path, args, passfds):
+    import _posixsubprocess, fcntl
+    passfds = sorted(passfds)
+    tmp = []
+    # temporarily unset CLOEXEC on passed fds
+    for fd in passfds:
+        flag = fcntl.fcntl(fd, fcntl.F_GETFD)
+        if flag & fcntl.FD_CLOEXEC:
+            fcntl.fcntl(fd, fcntl.F_SETFD, flag & ~fcntl.FD_CLOEXEC)
+            tmp.append((fd, flag))
+    errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe()
+    try:
+        return _posixsubprocess.fork_exec(
+            args, [os.fsencode(path)], True, passfds, None, None,
+            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
+            False, False, None)
+    finally:
+        os.close(errpipe_read)
+        os.close(errpipe_write)
+        # reset CLOEXEC where necessary
+        for fd, flag in tmp:
+            fcntl.fcntl(fd, fcntl.F_SETFD, flag)
+
+#
+# Return pipe with CLOEXEC set on fds
+#
+
+def pipe():
+    import _posixsubprocess
+    return _posixsubprocess.cloexec_pipe()
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
similarity index 92%
rename from Lib/test/test_multiprocessing.py
rename to Lib/test/_test_multiprocessing.py
index 2a6381d..f777edc 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -43,7 +43,7 @@
 
 try:
     from multiprocessing import reduction
-    HAS_REDUCTION = True
+    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
 except ImportError:
     HAS_REDUCTION = False
 
@@ -99,6 +99,9 @@
 except:
     MAXFD = 256
 
+# To speed up tests when using the forkserver, we can preload these:
+PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
+
 #
 # Some tests require ctypes
 #
@@ -330,7 +333,6 @@
 
     @classmethod
     def _test_recursion(cls, wconn, id):
-        from multiprocessing import forking
         wconn.send(id)
         if len(id) < 2:
             for i in range(2):
@@ -378,7 +380,7 @@
         self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
         event.set()
         p.join()
-        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
+        self.assertTrue(wait_for_handle(sentinel, timeout=1))
 
 #
 #
@@ -2493,7 +2495,7 @@
 
     @classmethod
     def tearDownClass(cls):
-        from multiprocessing.reduction import resource_sharer
+        from multiprocessing import resource_sharer
         resource_sharer.stop(timeout=5)
 
     @classmethod
@@ -2807,30 +2809,40 @@
 # Test that from ... import * works for each module
 #
 
-class _TestImportStar(BaseTestCase):
+class _TestImportStar(unittest.TestCase):
 
-    ALLOWED_TYPES = ('processes',)
+    def get_module_names(self):
+        import glob
+        folder = os.path.dirname(multiprocessing.__file__)
+        pattern = os.path.join(folder, '*.py')
+        files = glob.glob(pattern)
+        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
+        modules = ['multiprocessing.' + m for m in modules]
+        modules.remove('multiprocessing.__init__')
+        modules.append('multiprocessing')
+        return modules
 
     def test_import(self):
-        modules = [
-            'multiprocessing', 'multiprocessing.connection',
-            'multiprocessing.heap', 'multiprocessing.managers',
-            'multiprocessing.pool', 'multiprocessing.process',
-            'multiprocessing.synchronize', 'multiprocessing.util'
-            ]
+        modules = self.get_module_names()
+        if sys.platform == 'win32':
+            modules.remove('multiprocessing.popen_fork')
+            modules.remove('multiprocessing.popen_forkserver')
+            modules.remove('multiprocessing.popen_spawn_posix')
+        else:
+            modules.remove('multiprocessing.popen_spawn_win32')
+            if not HAS_REDUCTION:
+                modules.remove('multiprocessing.popen_forkserver')
 
-        if HAS_REDUCTION:
-            modules.append('multiprocessing.reduction')
-
-        if c_int is not None:
+        if c_int is None:
             # This module requires _ctypes
-            modules.append('multiprocessing.sharedctypes')
+            modules.remove('multiprocessing.sharedctypes')
 
         for name in modules:
             __import__(name)
             mod = sys.modules[name]
+            self.assertTrue(hasattr(mod, '__all__'), name)
 
-            for attr in getattr(mod, '__all__', ()):
+            for attr in mod.__all__:
                 self.assertTrue(
                     hasattr(mod, attr),
                     '%r does not have attribute %r' % (mod, attr)
@@ -2953,131 +2965,6 @@
         self.assertRaises((ValueError, OSError),
                           multiprocessing.connection.Connection, -1)
 
-#
-# Functions used to create test cases from the base ones in this module
-#
-
-def create_test_cases(Mixin, type):
-    result = {}
-    glob = globals()
-    Type = type.capitalize()
-    ALL_TYPES = {'processes', 'threads', 'manager'}
-
-    for name in list(glob.keys()):
-        if name.startswith('_Test'):
-            base = glob[name]
-            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
-            if type in base.ALLOWED_TYPES:
-                newname = 'With' + Type + name[1:]
-                class Temp(base, Mixin, unittest.TestCase):
-                    pass
-                result[newname] = Temp
-                Temp.__name__ = Temp.__qualname__ = newname
-                Temp.__module__ = Mixin.__module__
-    return result
-
-#
-# Create test cases
-#
-
-class ProcessesMixin(object):
-    TYPE = 'processes'
-    Process = multiprocessing.Process
-    connection = multiprocessing.connection
-    current_process = staticmethod(multiprocessing.current_process)
-    active_children = staticmethod(multiprocessing.active_children)
-    Pool = staticmethod(multiprocessing.Pool)
-    Pipe = staticmethod(multiprocessing.Pipe)
-    Queue = staticmethod(multiprocessing.Queue)
-    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
-    Lock = staticmethod(multiprocessing.Lock)
-    RLock = staticmethod(multiprocessing.RLock)
-    Semaphore = staticmethod(multiprocessing.Semaphore)
-    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
-    Condition = staticmethod(multiprocessing.Condition)
-    Event = staticmethod(multiprocessing.Event)
-    Barrier = staticmethod(multiprocessing.Barrier)
-    Value = staticmethod(multiprocessing.Value)
-    Array = staticmethod(multiprocessing.Array)
-    RawValue = staticmethod(multiprocessing.RawValue)
-    RawArray = staticmethod(multiprocessing.RawArray)
-
-testcases_processes = create_test_cases(ProcessesMixin, type='processes')
-globals().update(testcases_processes)
-
-
-class ManagerMixin(object):
-    TYPE = 'manager'
-    Process = multiprocessing.Process
-    Queue = property(operator.attrgetter('manager.Queue'))
-    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
-    Lock = property(operator.attrgetter('manager.Lock'))
-    RLock = property(operator.attrgetter('manager.RLock'))
-    Semaphore = property(operator.attrgetter('manager.Semaphore'))
-    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
-    Condition = property(operator.attrgetter('manager.Condition'))
-    Event = property(operator.attrgetter('manager.Event'))
-    Barrier = property(operator.attrgetter('manager.Barrier'))
-    Value = property(operator.attrgetter('manager.Value'))
-    Array = property(operator.attrgetter('manager.Array'))
-    list = property(operator.attrgetter('manager.list'))
-    dict = property(operator.attrgetter('manager.dict'))
-    Namespace = property(operator.attrgetter('manager.Namespace'))
-
-    @classmethod
-    def Pool(cls, *args, **kwds):
-        return cls.manager.Pool(*args, **kwds)
-
-    @classmethod
-    def setUpClass(cls):
-        cls.manager = multiprocessing.Manager()
-
-    @classmethod
-    def tearDownClass(cls):
-        # only the manager process should be returned by active_children()
-        # but this can take a bit on slow machines, so wait a few seconds
-        # if there are other children too (see #17395)
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1 and t < 5:
-            time.sleep(t)
-            t *= 2
-        gc.collect()                       # do garbage collection
-        if cls.manager._number_of_objects() != 0:
-            # This is not really an error since some tests do not
-            # ensure that all processes which hold a reference to a
-            # managed object have been joined.
-            print('Shared objects which still exist at manager shutdown:')
-            print(cls.manager._debug_info())
-        cls.manager.shutdown()
-        cls.manager.join()
-        cls.manager = None
-
-testcases_manager = create_test_cases(ManagerMixin, type='manager')
-globals().update(testcases_manager)
-
-
-class ThreadsMixin(object):
-    TYPE = 'threads'
-    Process = multiprocessing.dummy.Process
-    connection = multiprocessing.dummy.connection
-    current_process = staticmethod(multiprocessing.dummy.current_process)
-    active_children = staticmethod(multiprocessing.dummy.active_children)
-    Pool = staticmethod(multiprocessing.Pool)
-    Pipe = staticmethod(multiprocessing.dummy.Pipe)
-    Queue = staticmethod(multiprocessing.dummy.Queue)
-    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
-    Lock = staticmethod(multiprocessing.dummy.Lock)
-    RLock = staticmethod(multiprocessing.dummy.RLock)
-    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
-    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
-    Condition = staticmethod(multiprocessing.dummy.Condition)
-    Event = staticmethod(multiprocessing.dummy.Event)
-    Barrier = staticmethod(multiprocessing.dummy.Barrier)
-    Value = staticmethod(multiprocessing.dummy.Value)
-    Array = staticmethod(multiprocessing.dummy.Array)
-
-testcases_threads = create_test_cases(ThreadsMixin, type='threads')
-globals().update(testcases_threads)
 
 
 class OtherTest(unittest.TestCase):
@@ -3427,7 +3314,7 @@
     def test_flags(self):
         import json, subprocess
         # start child process using unusual flags
-        prog = ('from test.test_multiprocessing import TestFlags; ' +
+        prog = ('from test._test_multiprocessing import TestFlags; ' +
                 'TestFlags.run_in_child()')
         data = subprocess.check_output(
             [sys.executable, '-E', '-S', '-O', '-c', prog])
@@ -3474,13 +3361,14 @@
 
 class TestNoForkBomb(unittest.TestCase):
     def test_noforkbomb(self):
+        sm = multiprocessing.get_start_method()
         name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
-        if WIN32:
-            rc, out, err = test.script_helper.assert_python_failure(name)
+        if sm != 'fork':
+            rc, out, err = test.script_helper.assert_python_failure(name, sm)
             self.assertEqual('', out.decode('ascii'))
             self.assertIn('RuntimeError', err.decode('ascii'))
         else:
-            rc, out, err = test.script_helper.assert_python_ok(name)
+            rc, out, err = test.script_helper.assert_python_ok(name, sm)
             self.assertEqual('123', out.decode('ascii').rstrip())
             self.assertEqual('', err.decode('ascii'))
 
@@ -3514,6 +3402,72 @@
         self.assertLessEqual(new_size, old_size)
 
 #
+# Check that non-forked child processes do not inherit unneeded fds/handles
+#
+
+class TestCloseFds(unittest.TestCase):
+
+    def get_high_socket_fd(self):
+        if WIN32:
+            # The child process will not have any socket handles, so
+            # calling socket.fromfd() should produce WSAENOTSOCK even
+            # if there is a handle of the same number.
+            return socket.socket().detach()
+        else:
+            # We want to produce a socket with an fd high enough that a
+            # freshly created child process will not have any fds as high.
+            fd = socket.socket().detach()
+            to_close = []
+            while fd < 50:
+                to_close.append(fd)
+                fd = os.dup(fd)
+            for x in to_close:
+                os.close(x)
+            return fd
+
+    def close(self, fd):
+        if WIN32:
+            socket.socket(fileno=fd).close()
+        else:
+            os.close(fd)
+
+    @classmethod
+    def _test_closefds(cls, conn, fd):
+        try:
+            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+        except Exception as e:
+            conn.send(e)
+        else:
+            s.close()
+            conn.send(None)
+
+    def test_closefd(self):
+        # Only a forked child should be able to use the parent's socket fd.
+        if not HAS_REDUCTION:
+            raise unittest.SkipTest('requires fd pickling')
+        reader, writer = multiprocessing.Pipe()
+        fd = self.get_high_socket_fd()
+        try:
+            p = multiprocessing.Process(target=self._test_closefds,
+                                        args=(writer, fd))
+            p.start()
+            writer.close()
+            e = reader.recv()
+            p.join(timeout=5)
+        finally:
+            self.close(fd)
+            writer.close()
+            reader.close()
+
+        if multiprocessing.get_start_method() == 'fork':
+            self.assertIs(e, None)
+        else:
+            WSAENOTSOCK = 10038     # .winerror exists only on Windows
+            self.assertIsInstance(e, OSError)
+            self.assertTrue(e.errno == errno.EBADF or
+                            getattr(e, 'winerror', None) == WSAENOTSOCK, e)
+
+#
 # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
 #
 
@@ -3557,10 +3511,10 @@
         def handler(signum, frame):
             pass
         signal.signal(signal.SIGUSR1, handler)
-        l = multiprocessing.connection.Listener()
-        conn.send(l.address)
-        a = l.accept()
-        a.send('welcome')
+        with multiprocessing.connection.Listener() as l:
+            conn.send(l.address)
+            a = l.accept()
+            a.send('welcome')
 
     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     def test_ignore_listener(self):
@@ -3581,26 +3535,221 @@
         finally:
             conn.close()
 
-#
-#
-#
-
-def setUpModule():
-    if sys.platform.startswith("linux"):
+class TestStartMethod(unittest.TestCase):
+    def test_set_get(self):
+        multiprocessing.set_forkserver_preload(PRELOAD)
+        count = 0
+        old_method = multiprocessing.get_start_method()
         try:
-            lock = multiprocessing.RLock()
-        except OSError:
-            raise unittest.SkipTest("OSError raises on RLock creation, "
-                                    "see issue 3111!")
-    check_enough_semaphores()
-    util.get_temp_dir()     # creates temp directory for use by all processes
-    multiprocessing.get_logger().setLevel(LOG_LEVEL)
+            for method in ('fork', 'spawn', 'forkserver'):
+                try:
+                    multiprocessing.set_start_method(method)
+                except ValueError:
+                    continue
+                self.assertEqual(multiprocessing.get_start_method(), method)
+                count += 1
+        finally:
+            multiprocessing.set_start_method(old_method)
+        self.assertGreaterEqual(count, 1)
+
+    def test_get_all(self):
+        methods = multiprocessing.get_all_start_methods()
+        if sys.platform == 'win32':
+            self.assertEqual(methods, ['spawn'])
+        else:
+            self.assertTrue(methods == ['fork', 'spawn'] or
+                            methods == ['fork', 'spawn', 'forkserver'])
+
+#
+# Check that killing process does not leak named semaphores
+#
+
+@unittest.skipIf(sys.platform == "win32",
+                 "test semantics don't make sense on Windows")
+class TestSemaphoreTracker(unittest.TestCase):
+    def test_semaphore_tracker(self):
+        import subprocess
+        cmd = '''if 1:
+            import multiprocessing as mp, time, os
+            mp.set_start_method("spawn")
+            lock1 = mp.Lock()
+            lock2 = mp.Lock()
+            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
+            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
+            time.sleep(10)
+        '''
+        print("\nTestSemaphoreTracker will output warnings a bit like:\n"
+              "    ... There appear to be 2 leaked semaphores"
+                  " to clean up at shutdown\n"
+              "    ... '/mp-03jgqz': [Errno 2] No such file or directory",
+              file=sys.stderr)
+        r, w = os.pipe()
+        p = subprocess.Popen([sys.executable,
+                             #'-W', 'ignore:semaphore_tracker',
+                             '-c', cmd % (w, w)],
+                             pass_fds=[w])
+        os.close(w)
+        with open(r, 'rb', closefd=True) as f:
+            name1 = f.readline().rstrip().decode('ascii')
+            name2 = f.readline().rstrip().decode('ascii')
+        _multiprocessing.sem_unlink(name1)
+        p.terminate()
+        p.wait()
+        time.sleep(1.0)
+        with self.assertRaises(OSError) as ctx:
+            _multiprocessing.sem_unlink(name2)
+        # docs say it should be ENOENT, but OSX seems to give EINVAL
+        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
+
+#
+# Mixins
+#
+
+class ProcessesMixin(object):
+    TYPE = 'processes'
+    Process = multiprocessing.Process
+    connection = multiprocessing.connection
+    current_process = staticmethod(multiprocessing.current_process)
+    active_children = staticmethod(multiprocessing.active_children)
+    Pool = staticmethod(multiprocessing.Pool)
+    Pipe = staticmethod(multiprocessing.Pipe)
+    Queue = staticmethod(multiprocessing.Queue)
+    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
+    Lock = staticmethod(multiprocessing.Lock)
+    RLock = staticmethod(multiprocessing.RLock)
+    Semaphore = staticmethod(multiprocessing.Semaphore)
+    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
+    Condition = staticmethod(multiprocessing.Condition)
+    Event = staticmethod(multiprocessing.Event)
+    Barrier = staticmethod(multiprocessing.Barrier)
+    Value = staticmethod(multiprocessing.Value)
+    Array = staticmethod(multiprocessing.Array)
+    RawValue = staticmethod(multiprocessing.RawValue)
+    RawArray = staticmethod(multiprocessing.RawArray)
 
 
-def tearDownModule():
-    # pause a bit so we don't get warning about dangling threads/processes
-    time.sleep(0.5)
+class ManagerMixin(object):
+    TYPE = 'manager'
+    Process = multiprocessing.Process
+    Queue = property(operator.attrgetter('manager.Queue'))
+    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
+    Lock = property(operator.attrgetter('manager.Lock'))
+    RLock = property(operator.attrgetter('manager.RLock'))
+    Semaphore = property(operator.attrgetter('manager.Semaphore'))
+    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
+    Condition = property(operator.attrgetter('manager.Condition'))
+    Event = property(operator.attrgetter('manager.Event'))
+    Barrier = property(operator.attrgetter('manager.Barrier'))
+    Value = property(operator.attrgetter('manager.Value'))
+    Array = property(operator.attrgetter('manager.Array'))
+    list = property(operator.attrgetter('manager.list'))
+    dict = property(operator.attrgetter('manager.dict'))
+    Namespace = property(operator.attrgetter('manager.Namespace'))
+
+    @classmethod
+    def Pool(cls, *args, **kwds):
+        return cls.manager.Pool(*args, **kwds)
+
+    @classmethod
+    def setUpClass(cls):
+        cls.manager = multiprocessing.Manager()
+
+    @classmethod
+    def tearDownClass(cls):
+        # only the manager process should be returned by active_children()
+        # but this can take a bit on slow machines, so wait a few seconds
+        # if there are other children too (see #17395)
+        t = 0.01
+        while len(multiprocessing.active_children()) > 1 and t < 5:
+            time.sleep(t)
+            t *= 2
+        gc.collect()                       # do garbage collection
+        if cls.manager._number_of_objects() != 0:
+            # This is not really an error since some tests do not
+            # ensure that all processes which hold a reference to a
+            # managed object have been joined.
+            print('Shared objects which still exist at manager shutdown:')
+            print(cls.manager._debug_info())
+        cls.manager.shutdown()
+        cls.manager.join()
+        cls.manager = None
 
 
-if __name__ == '__main__':
-    unittest.main()
+class ThreadsMixin(object):
+    TYPE = 'threads'
+    Process = multiprocessing.dummy.Process
+    connection = multiprocessing.dummy.connection
+    current_process = staticmethod(multiprocessing.dummy.current_process)
+    active_children = staticmethod(multiprocessing.dummy.active_children)
+    Pool = staticmethod(multiprocessing.Pool)
+    Pipe = staticmethod(multiprocessing.dummy.Pipe)
+    Queue = staticmethod(multiprocessing.dummy.Queue)
+    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
+    Lock = staticmethod(multiprocessing.dummy.Lock)
+    RLock = staticmethod(multiprocessing.dummy.RLock)
+    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
+    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
+    Condition = staticmethod(multiprocessing.dummy.Condition)
+    Event = staticmethod(multiprocessing.dummy.Event)
+    Barrier = staticmethod(multiprocessing.dummy.Barrier)
+    Value = staticmethod(multiprocessing.dummy.Value)
+    Array = staticmethod(multiprocessing.dummy.Array)
+
+#
+# Functions used to create test cases from the base ones in this module
+#
+
+def install_tests_in_module_dict(remote_globs, start_method):
+    __module__ = remote_globs['__name__']
+    local_globs = globals()
+    ALL_TYPES = {'processes', 'threads', 'manager'}
+
+    for name, base in local_globs.items():
+        if not isinstance(base, type):
+            continue
+        if issubclass(base, BaseTestCase):
+            if base is BaseTestCase:
+                continue
+            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
+            for type_ in base.ALLOWED_TYPES:
+                newname = 'With' + type_.capitalize() + name[1:]
+                Mixin = local_globs[type_.capitalize() + 'Mixin']
+                class Temp(base, Mixin, unittest.TestCase):
+                    pass
+                Temp.__name__ = Temp.__qualname__ = newname
+                Temp.__module__ = __module__
+                remote_globs[newname] = Temp
+        elif issubclass(base, unittest.TestCase):
+            class Temp(base, object):
+                pass
+            Temp.__name__ = Temp.__qualname__ = name
+            Temp.__module__ = __module__
+            remote_globs[name] = Temp
+
+    def setUpModule():
+        multiprocessing.set_forkserver_preload(PRELOAD)
+        remote_globs['old_start_method'] = multiprocessing.get_start_method()
+        try:
+            multiprocessing.set_start_method(start_method)
+        except ValueError:
+            raise unittest.SkipTest(start_method +
+                                    ' start method not supported')
+        print('Using start method %r' % multiprocessing.get_start_method())
+
+        if sys.platform.startswith("linux"):
+            try:
+                lock = multiprocessing.RLock()
+            except OSError:
+                raise unittest.SkipTest("OSError raises on RLock creation, "
+                                        "see issue 3111!")
+        check_enough_semaphores()
+        util.get_temp_dir()     # creates temp directory
+        multiprocessing.get_logger().setLevel(LOG_LEVEL)
+
+    def tearDownModule():
+        multiprocessing.set_start_method(remote_globs['old_start_method'])
+        # pause a bit so we don't get warning about dangling threads/processes
+        time.sleep(0.5)
+
+    remote_globs['setUpModule'] = setUpModule
+    remote_globs['tearDownModule'] = tearDownModule
diff --git a/Lib/test/mp_fork_bomb.py b/Lib/test/mp_fork_bomb.py
index 908afe3..017e010 100644
--- a/Lib/test/mp_fork_bomb.py
+++ b/Lib/test/mp_fork_bomb.py
@@ -7,6 +7,11 @@
 # correctly on Windows.  However, we should get a RuntimeError rather
 # than the Windows equivalent of a fork bomb.
 
+if len(sys.argv) > 1:
+    multiprocessing.set_start_method(sys.argv[1])
+else:
+    multiprocessing.set_start_method('spawn')
+
 p = multiprocessing.Process(target=foo)
 p.start()
 p.join()
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index c8bbcb2..b9945d7 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -149,7 +149,7 @@
 except ImportError:
     threading = None
 try:
-    import multiprocessing.process
+    import _multiprocessing, multiprocessing.process
 except ImportError:
     multiprocessing = None
 
diff --git a/Lib/test/test_multiprocessing_fork.py b/Lib/test/test_multiprocessing_fork.py
new file mode 100644
index 0000000..2bf4e75
--- /dev/null
+++ b/Lib/test/test_multiprocessing_fork.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'fork')
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Lib/test/test_multiprocessing_forkserver.py b/Lib/test/test_multiprocessing_forkserver.py
new file mode 100644
index 0000000..193a04a
--- /dev/null
+++ b/Lib/test/test_multiprocessing_forkserver.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'forkserver')
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Lib/test/test_multiprocessing_spawn.py b/Lib/test/test_multiprocessing_spawn.py
new file mode 100644
index 0000000..334ae9e
--- /dev/null
+++ b/Lib/test/test_multiprocessing_spawn.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'spawn')
+
+if __name__ == '__main__':
+    unittest.main()