blob: 962301ae1499eb5fbfc6e895c087129210ef29da [file] [log] [blame]
"""
Basic subprocess implementation for POSIX which only uses os functions. It only
implements the features required by setup.py to build C extension modules when
subprocess is unavailable. setup.py is not used on Windows.
"""
6import os
7
8
# distutils.spawn, used by distutils.command.build_ext,
# calls subprocess.Popen().wait()
class Popen:
    """Minimal stand-in for subprocess.Popen built on os.fork() and os.exec*().

    Unlike subprocess.Popen, the constructor does not start the command:
    the child process is only created by the first call to wait().
    """

    def __init__(self, cmd, env=None):
        """Record the command to run.

        cmd is the argument sequence; cmd[0] is passed as the program path
        to os.execv()/os.execve(), which do not search PATH.
        env is an optional environment mapping for the child; when it is
        None the child inherits the current environment.
        """
        self._cmd = cmd
        self._env = env
        # Exit code of the child; stays None until wait() has completed.
        self.returncode = None

    def wait(self):
        """Run the command, wait for it to finish and return its exit code.

        The result is stored in self.returncode: the exit status for a
        normal exit, or the negated signal number when the child was
        terminated or stopped by a signal. If the program cannot be
        executed, the child exits with status 1.

        Calling wait() again returns the stored code without running the
        command a second time.

        Raises Exception if the wait status cannot be interpreted.
        """
        if self.returncode is not None:
            # The command already ran: do not fork and execute it again.
            return self.returncode

        pid = os.fork()
        if pid == 0:
            # Child process
            try:
                if self._env is not None:
                    os.execve(self._cmd[0], self._cmd, self._env)
                else:
                    os.execv(self._cmd[0], self._cmd)
            finally:
                # exec*() only comes back by raising; never let the child
                # fall through into the parent's code.
                os._exit(1)

        # Parent process
        _, status = os.waitpid(pid, 0)
        if os.WIFSIGNALED(status):
            self.returncode = -os.WTERMSIG(status)
        elif os.WIFEXITED(status):
            self.returncode = os.WEXITSTATUS(status)
        elif os.WIFSTOPPED(status):
            self.returncode = -os.WSTOPSIG(status)
        else:
            raise Exception(f"unknown child process exit status: {status!r}")

        return self.returncode
41
42
def _check_cmd(cmd):
    """Return True if cmd only contains characters from a small whitelist.

    cmd may be a single string or a tuple/list of strings. Every string
    must be non-empty and match the regex [a-zA-Z0-9./-]+, which rules out
    spaces and any other character outside that set. Any other type, for
    cmd itself or for one of its items, yields False.
    """
    # Build the whitelist by hand: letters, digits and "./-".
    allowed = set("./-")
    for low, high in ("az", "AZ", "09"):
        allowed.update(map(chr, range(ord(low), ord(high) + 1)))

    if isinstance(cmd, str):
        words = [cmd]
    elif isinstance(cmd, (tuple, list)):
        words = cmd
    else:
        return False

    for word in words:
        # Reject non-strings and the empty string.
        if not isinstance(word, str) or not word:
            return False
        if any(char not in allowed for char in word):
            return False

    return True
70
71
# _aix_support, used by distutils.util, calls subprocess.check_output()
def check_output(cmd, **kwargs):
    """Run cmd through os.system() and return what it wrote to stdout.

    cmd is a string or a sequence of strings and must be accepted by
    _check_cmd(). The output is captured by redirecting stdout to the file
    "check_output.tmp" in the current directory; that file is removed
    afterwards. The captured output is returned as bytes, or b'' if the
    file does not exist.

    Raises NotImplementedError if any keyword argument is given, and
    ValueError if the command is rejected by _check_cmd() or if
    os.system() returns a non-zero status.
    """
    if kwargs:
        raise NotImplementedError(repr(kwargs))

    if not _check_cmd(cmd):
        raise ValueError(f"unsupported command: {cmd!r}")

    tmp_filename = "check_output.tmp"
    words = [cmd] if isinstance(cmd, str) else cmd
    cmd = " ".join(words)
    cmd = f"{cmd} >{tmp_filename}"

    try:
        # system() spawns a shell, which performs the redirection
        status = os.system(cmd)
        if status:
            raise ValueError(f"Command {cmd!r} failed with status {status!r}")

        try:
            fp = open(tmp_filename, "rb")
        except FileNotFoundError:
            return b''
        with fp:
            return fp.read()
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.unlink(tmp_filename)
        except OSError:
            pass
102 return stdout