Merged revisions 73708,73738 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r73708 | jesse.noller | 2009-06-30 13:11:52 -0400 (Tue, 30 Jun 2009) | 1 line
Resolves issues 5155, 5313, 5331 - bad file descriptor error with processes in processes
........
r73738 | r.david.murray | 2009-06-30 22:49:10 -0400 (Tue, 30 Jun 2009) | 2 lines
Make punctuation prettier and break up run-on sentence.
........
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index 41a53c6..a94571b 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -2087,6 +2087,38 @@
for i in range(10):
Process(target=f, args=(lock,)).start()
+Beware replacing sys.stdin with a "file like object"
+
+ :mod:`multiprocessing` originally unconditionally called::
+
+ os.close(sys.stdin.fileno())
+
+ in the :meth:`multiprocessing.Process._bootstrap` method --- this resulted
+ in issues with processes-in-processes. This has been changed to::
+
+ sys.stdin.close()
+ sys.stdin = open(os.devnull)
+
+ Which solves the fundamental issue of processes colliding with each other
+ resulting in a bad file descriptor error, but introduces a potential danger
+ to applications which replace :func:`sys.stdin` with a "file-like object"
+ with output buffering. This danger is that if multiple processes call
+ :func:`close()` on this file-like object, it could result in the same
+ data being flushed to the object multiple times, resulting in corruption.
+
+ If you write a file-like object and implement your own caching, you can
+ make it fork-safe by storing the pid whenever you append to the cache,
+ and discarding the cache when the pid changes. For example::
+
+ @property
+ def cache(self):
+ pid = os.getpid()
+ if pid != self._pid:
+ self._pid = pid
+ self._cache = []
+ return self._cache
+
+ For more information, see :issue:`5155`, :issue:`5313` and :issue:`5331`
Windows
~~~~~~~
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index b034317..0b04e36 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -220,7 +220,8 @@
self._children = set()
self._counter = itertools.count(1)
try:
- os.close(sys.stdin.fileno())
+ sys.stdin.close()
+ sys.stdin = open(os.devnull)
except (OSError, ValueError):
pass
_current_process = self
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 29323a5..58647ed 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -17,6 +17,7 @@
import socket
import random
import logging
+from StringIO import StringIO
# Work around broken sem_open implementations
@@ -1829,7 +1830,73 @@
multiprocessing.connection.answer_challenge,
_FakeConnection(), b'abc')
-testcases_other = [OtherTest, TestInvalidHandle]
+#
+# Issue 5155, 5313, 5331: Test process in processes
+# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
+#
+
+def _ThisSubProcess(q):
+ try:
+ item = q.get(block=False)
+ except Queue.Empty:
+ pass
+
+def _TestProcess(q):
+ queue = multiprocessing.Queue()
+ subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
+ subProc.start()
+ subProc.join()
+
+def _afunc(x):
+ return x*x
+
+def pool_in_process():
+ pool = multiprocessing.Pool(processes=4)
+ x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
+
+class _file_like(object):
+ def __init__(self, delegate):
+ self._delegate = delegate
+ self._pid = None
+
+ @property
+ def cache(self):
+ pid = os.getpid()
+ # There are no race conditions since fork keeps only the running thread
+ if pid != self._pid:
+ self._pid = pid
+ self._cache = []
+ return self._cache
+
+ def write(self, data):
+ self.cache.append(data)
+
+ def flush(self):
+ self._delegate.write(''.join(self.cache))
+ self._cache = []
+
+class TestStdinBadfiledescriptor(unittest.TestCase):
+
+ def test_queue_in_process(self):
+ queue = multiprocessing.Queue()
+ proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
+ proc.start()
+ proc.join()
+
+ def test_pool_in_process(self):
+ p = multiprocessing.Process(target=pool_in_process)
+ p.start()
+ p.join()
+
+ def test_flushing(self):
+ sio = StringIO()
+ flike = _file_like(sio)
+ flike.write('foo')
+ proc = multiprocessing.Process(target=lambda: flike.flush())
+ flike.flush()
+ assert sio.getvalue() == 'foo'
+
+testcases_other = [OtherTest, TestInvalidHandle, TestStdinBadfiledescriptor]
#
#
diff --git a/Misc/ACKS b/Misc/ACKS
index b35cc47..b4b07cf 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -45,6 +45,7 @@
Ulf Bartelt
Nick Bastin
Jeff Bauer
+Mike Bayer
Michael R Bax
Anthony Baxter
Samuel L. Bayer
@@ -178,6 +179,7 @@
Dean Draayer
John DuBois
Paul Dubois
+Graham Dumpleton
Quinn Dunkan
Robin Dunn
Luke Dunstan
@@ -536,6 +538,7 @@
Santiago Peresón
Mark Perrego
Trevor Perrin
+Gabriel de Perthuis
Tim Peters
Benjamin Peterson
Chris Petrilli
diff --git a/Misc/NEWS b/Misc/NEWS
index 078c30d..e0cb38d 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -65,6 +65,10 @@
Library
-------
+- Issues #5155, 5313, 5331: multiprocessing.Process._bootstrap was
+ unconditionally calling "os.close(sys.stdin.fileno())" resulting in file
+ descriptor errors
+
- Issue #6415: Fixed warnings.warn sagfault on bad formatted string.
- Issue #6344: Fixed a crash of mmap.read() when passed a negative argument.