Issue #15646: Prevent the equivalent of a fork bomb when using multiprocessing
on Windows without the "if __name__ == '__main__'" idiom.
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 3fca8b1..55980ff 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -336,7 +336,7 @@
'''
Returns prefix of command line used for spawning a child process
'''
- if process.current_process()._identity==() and is_forking(sys.argv):
+ if getattr(process.current_process(), '_inheriting', False):
raise RuntimeError('''
Attempt to start a new process before the current process
has finished its bootstrapping phase.
diff --git a/Lib/test/mp_fork_bomb.py b/Lib/test/mp_fork_bomb.py
new file mode 100644
index 0000000..72cea25
--- /dev/null
+++ b/Lib/test/mp_fork_bomb.py
@@ -0,0 +1,16 @@
+import multiprocessing
+
+def foo(conn):
+ conn.send("123")
+
+# Because "if __name__ == '__main__'" is missing, this will not work
+# correctly on Windows. However, we should get a RuntimeError rather
+# than the Windows equivalent of a fork bomb.
+
+r, w = multiprocessing.Pipe(False)
+p = multiprocessing.Process(target=foo, args=(w,))
+p.start()
+w.close()
+print(r.recv())
+r.close()
+p.join()
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index bc2c048..1587057 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -16,6 +16,7 @@
import random
import logging
import errno
+import test.script_helper
from test import test_support
from StringIO import StringIO
_multiprocessing = test_support.import_module('_multiprocessing')
@@ -2349,8 +2350,28 @@
finally:
socket.setdefaulttimeout(old_timeout)
+#
+# Test what happens with no "if __name__ == '__main__'"
+#
+
+class TestNoForkBomb(unittest.TestCase):
+ def test_noforkbomb(self):
+ name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
+ if WIN32:
+ rc, out, err = test.script_helper.assert_python_failure(name)
+ self.assertEqual('', out.decode('ascii'))
+ self.assertIn('RuntimeError', err.decode('ascii'))
+ else:
+ rc, out, err = test.script_helper.assert_python_ok(name)
+ self.assertEqual('123', out.decode('ascii').rstrip())
+ self.assertEqual('', err.decode('ascii'))
+
+#
+#
+#
+
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
- TestStdinBadfiledescriptor, TestTimeouts]
+ TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb]
#
#