Issue #14310: inter-process socket duplication for windows
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 17c5486..6da423a 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -26,6 +26,10 @@
     import fcntl
 except ImportError:
     fcntl = False
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = False
 
 HOST = support.HOST
 MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return
@@ -4643,6 +4647,106 @@
         socket.setdefaulttimeout(t)
 
 
+@unittest.skipUnless(os.name == "nt", "Windows specific")
+@unittest.skipUnless(multiprocessing, "need multiprocessing")
+class TestSocketSharing(SocketTCPTest):
+    # This must be classmethod and not staticmethod or multiprocessing
+    # won't be able to bootstrap it.
+    @classmethod
+    def remoteProcessServer(cls, q):
+        # Recreate socket from shared data
+        sdata = q.get()
+        message = q.get()
+
+        s = socket.fromshare(sdata)
+        s2, c = s.accept()
+
+        # Send the message
+        s2.sendall(message)
+        s2.close()
+        s.close()
+
+    def testShare(self):
+        # Transfer the listening server socket to another process
+        # and service it from there.
+
+        # Create process:
+        q = multiprocessing.Queue()
+        p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
+        p.start()
+
+        # Get the shared socket data
+        data = self.serv.share(p.pid)
+
+        # Pass the shared socket to the other process
+        addr = self.serv.getsockname()
+        self.serv.close()
+        q.put(data)
+
+        # The data that the server will send us
+        message = b"slapmahfro"
+        q.put(message)
+
+        # Connect
+        s = socket.create_connection(addr)
+        #  listen for the data
+        m = []
+        while True:
+            data = s.recv(100)
+            if not data:
+                break
+            m.append(data)
+        s.close()
+        received = b"".join(m)
+        self.assertEqual(received, message)
+        p.join()
+
+    def testShareLength(self):
+        data = self.serv.share(os.getpid())
+        self.assertRaises(ValueError, socket.fromshare, data[:-1])
+        self.assertRaises(ValueError, socket.fromshare, data+b"foo")
+
+    def compareSockets(self, org, other):
+        # socket sharing is expected to work only for blocking socket
+        # since the internal python timout value isn't transfered.
+        self.assertEqual(org.gettimeout(), None)
+        self.assertEqual(org.gettimeout(), other.gettimeout())
+
+        self.assertEqual(org.family, other.family)
+        self.assertEqual(org.type, other.type)
+        # If the user specified "0" for proto, then
+        # internally windows will have picked the correct value.
+        # Python introspection on the socket however will still return
+        # 0.  For the shared socket, the python value is recreated
+        # from the actual value, so it may not compare correctly.
+        if org.proto != 0:
+            self.assertEqual(org.proto, other.proto)
+
+    def testShareLocal(self):
+        data = self.serv.share(os.getpid())
+        s = socket.fromshare(data)
+        try:
+            self.compareSockets(self.serv, s)
+        finally:
+            s.close()
+
+    def testTypes(self):
+        families = [socket.AF_INET, socket.AF_INET6]
+        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
+        for f in families:
+            for t in types:
+                source = socket.socket(f, t)
+                try:
+                    data = source.share(os.getpid())
+                    shared = socket.fromshare(data)
+                    try:
+                        self.compareSockets(source, shared)
+                    finally:
+                        shared.close()
+                finally:
+                    source.close()
+
+
 def test_main():
     tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
              TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, UDPTimeoutTest ]
@@ -4699,6 +4803,7 @@
         # These are slow when setitimer() is not available
         InterruptedRecvTimeoutTest,
         InterruptedSendTimeoutTest,
+        TestSocketSharing,
     ])
 
     thread_info = support.threading_setup()