bpo-40094: Add os.waitstatus_to_exitcode() (GH-19201)
Add os.waitstatus_to_exitcode() function to convert a wait status to an
exit code.
Suggest waitstatus_to_exitcode() usage in the documentation when
appropriate.
Use waitstatus_to_exitcode() in:
* multiprocessing, os, subprocess and _bootsubprocess modules;
* test.support.wait_process();
* setup.py: run_command();
* and many tests.
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 7272d47..1f792d8 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -3442,18 +3442,11 @@
sleep = min(sleep * 2, max_sleep)
time.sleep(sleep)
-
- if os.WIFEXITED(status):
- exitcode2 = os.WEXITSTATUS(status)
- elif os.WIFSIGNALED(status):
- exitcode2 = -os.WTERMSIG(status)
- else:
- raise ValueError(f"invalid wait status: {status!r}")
else:
# Windows implementation
pid2, status = os.waitpid(pid, 0)
- exitcode2 = (status >> 8)
+ exitcode2 = os.waitstatus_to_exitcode(status)
if exitcode2 != exitcode:
raise AssertionError(f"process {pid} exited with code {exitcode2}, "
f"but exit code {exitcode} is expected")
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index be85616..142cfea 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -2794,6 +2794,35 @@
pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
support.wait_process(pid, exitcode=0)
+ def test_waitstatus_to_exitcode(self):
+ exitcode = 23
+ filename = support.TESTFN
+ self.addCleanup(support.unlink, filename)
+
+ with open(filename, "w") as fp:
+ print(f'import sys; sys.exit({exitcode})', file=fp)
+ fp.flush()
+ args = [sys.executable, filename]
+ pid = os.spawnv(os.P_NOWAIT, args[0], args)
+
+ pid2, status = os.waitpid(pid, 0)
+ self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
+ self.assertEqual(pid2, pid)
+
+ # signal.SIGKILL is not available on Windows, so the test is skipped there
+ @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
+ def test_waitstatus_to_exitcode_kill(self):
+ signum = signal.SIGKILL
+ args = [sys.executable, '-c',
+ f'import time; time.sleep({support.LONG_TIMEOUT})']
+ pid = os.spawnv(os.P_NOWAIT, args[0], args)
+
+ os.kill(pid, signum)
+
+ pid2, status = os.waitpid(pid, 0)
+ self.assertEqual(os.waitstatus_to_exitcode(status), -signum)
+ self.assertEqual(pid2, pid)
+
class SpawnTests(unittest.TestCase):
def create_args(self, *, with_env=False, use_bytes=False):
diff --git a/Lib/test/test_popen.py b/Lib/test/test_popen.py
index da01a87..ab1bc77 100644
--- a/Lib/test/test_popen.py
+++ b/Lib/test/test_popen.py
@@ -44,10 +44,11 @@
def test_return_code(self):
self.assertEqual(os.popen("exit 0").close(), None)
+ status = os.popen("exit 42").close()
if os.name == 'nt':
- self.assertEqual(os.popen("exit 42").close(), 42)
+ self.assertEqual(status, 42)
else:
- self.assertEqual(os.popen("exit 42").close(), 42 << 8)
+ self.assertEqual(os.waitstatus_to_exitcode(status), 42)
def test_contextmanager(self):
with os.popen("echo hello") as f:
diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py
index ce85f57..aa5c687 100644
--- a/Lib/test/test_pty.py
+++ b/Lib/test/test_pty.py
@@ -200,8 +200,8 @@
## raise TestFailed("Unexpected output from child: %r" % line)
(pid, status) = os.waitpid(pid, 0)
- res = status >> 8
- debug("Child (%d) exited with status %d (%d)." % (pid, res, status))
+ res = os.waitstatus_to_exitcode(status)
+ debug("Child (%d) exited with code %d (status %d)." % (pid, res, status))
if res == 1:
self.fail("Child raised an unexpected exception in os.setsid()")
elif res == 2:
diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py
index 6e06049..aa166ba 100644
--- a/Lib/test/test_wait3.py
+++ b/Lib/test/test_wait3.py
@@ -30,8 +30,7 @@
time.sleep(0.1)
self.assertEqual(spid, cpid)
- self.assertEqual(status, exitcode << 8,
- "cause = %d, exit = %d" % (status&0xff, status>>8))
+ self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
self.assertTrue(rusage)
def test_wait3_rusage_initialized(self):
diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py
index 6c7ebcb..f8d5e13 100644
--- a/Lib/test/test_wait4.py
+++ b/Lib/test/test_wait4.py
@@ -29,8 +29,7 @@
break
time.sleep(0.1)
self.assertEqual(spid, cpid)
- self.assertEqual(status, exitcode << 8,
- "cause = %d, exit = %d" % (status&0xff, status>>8))
+ self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
self.assertTrue(rusage)
def tearDownModule():