# Source blob: 348da574bec3a14ee12402996bae4937553f7d2a
import os
import signal
import stat
import subprocess
import sys
import tempfile
import time
import unittest

from test import test_support
9
# True when running on native Windows; selects the platform-specific tests.
mswindows = (sys.platform == "win32")

#
# Depends on the following external programs: Python
#

# Code fragment spliced into child scripts that emit raw "\r"/"\n" bytes.
# On Windows it switches the child's stdout to binary mode; elsewhere it is
# empty and the child script is unchanged.
if mswindows:
    SETBINARY = 'import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY);'
else:
    SETBINARY = ''
20
class ProcessTestCase(unittest.TestCase):
    """Tests for subprocess.Popen, subprocess.call and list2cmdline.

    Most tests spawn the running interpreter (``sys.executable``) as the
    child process.  Tests that only make sense on one platform are defined
    conditionally on the module-level ``mswindows`` flag.
    """

    def mkstemp(self):
        """wrapper for mkstemp, calling mktemp if mkstemp is not available

        Returns an ``(fd, path)`` pair, like ``tempfile.mkstemp()``.
        """
        if hasattr(tempfile, "mkstemp"):
            return tempfile.mkstemp()
        # Fallback path: O_EXCL makes the open fail instead of silently
        # reusing a file that appeared after mktemp() chose the name.
        fname = tempfile.mktemp()
        flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
        return os.open(fname, flags), fname

    def _drain(self, p, pipe):
        """Read *pipe* to EOF, reap child *p*, and return the data read."""
        data = pipe.read()
        # EOF means the child closed its end; wait() collects its exit
        # status so the process does not linger unreaped.
        p.wait()
        return data

    def _newline_writer_args(self):
        """Return argv for a child that writes mixed line endings to stdout."""
        return [sys.executable, "-c",
                'import sys,os;' + SETBINARY +
                'sys.stdout.write("line1\\n");'
                'sys.stdout.flush();'
                'sys.stdout.write("line2\\r");'
                'sys.stdout.flush();'
                'sys.stdout.write("line3\\r\\n");'
                'sys.stdout.flush();'
                'sys.stdout.write("line4\\r");'
                'sys.stdout.flush();'
                'sys.stdout.write("\\nline5");'
                'sys.stdout.flush();'
                'sys.stdout.write("\\nline6");']

    def _check_newline_output(self, stdout):
        """Assert *stdout* is what the mixed-line-ending child should yield."""
        if hasattr(open, 'newlines'):
            # Interpreter with universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
        else:
            # Interpreter without universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")

    #
    # Generic tests
    #
    def test_call_seq(self):
        """call() function with sequence argument"""
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        self.assertEqual(rc, 47)

    def test_call_kwargs(self):
        """call() function with keyword args"""
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        # The child exits with the truth value of the comparison, so a
        # return code of 1 means it saw FRUIT=banana.
        rc = subprocess.call([sys.executable, "-c",
                              'import sys, os;'
                              'sys.exit(os.getenv("FRUIT")=="banana")'],
                             env=newenv)
        self.assertEqual(rc, 1)

    def test_stdin_none(self):
        """.stdin is None when not redirected"""
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stdin, None)

    def test_stdout_none(self):
        """.stdout is None when not redirected"""
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stdout, None)

    def test_stderr_none(self):
        """.stderr is None when not redirected"""
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stderr, None)

    def test_executable(self):
        """executable"""
        # argv[0] names a program that does not exist; the executable=
        # argument is what actually gets run.
        p = subprocess.Popen(["somethingyoudonthave",
                              "-c", "import sys; sys.exit(47)"],
                             executable=sys.executable)
        p.wait()
        self.assertEqual(p.returncode, 47)

    def test_stdin_pipe(self):
        """stdin redirection"""
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=subprocess.PIPE)
        p.stdin.write("pear")
        p.stdin.close()
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_filedes(self):
        """stdin is set to open file descriptor"""
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        os.write(d, "pear")
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=d)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_fileobj(self):
        """stdin is set to open file object"""
        tf = tempfile.TemporaryFile()
        tf.write("pear")
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=tf)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdout_pipe(self):
        """stdout redirection"""
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=subprocess.PIPE)
        self.assertEqual(self._drain(p, p.stdout), "orange")

    def test_stdout_filedes(self):
        """stdout is set to open file descriptor"""
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")

    def test_stdout_fileobj(self):
        """stdout is set to open file object"""
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")

    def test_stderr_pipe(self):
        """stderr redirection"""
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=subprocess.PIPE)
        self.assertEqual(self._drain(p, p.stderr), "strawberry")

    def test_stderr_filedes(self):
        """stderr is set to open file descriptor"""
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "strawberry")

    def test_stderr_fileobj(self):
        """stderr is set to open file object"""
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(tf.read(), "strawberry")

    def test_stdout_stderr_pipe(self):
        """capture stdout and stderr to the same pipe"""
        # The child flushes stdout before writing stderr so the two writes
        # reach the shared pipe in a fixed order.
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        self.assertEqual(self._drain(p, p.stdout), "appleorange")

    def test_stdout_stderr_file(self):
        """capture stdout and stderr to the same open file"""
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                             stdout=tf,
                             stderr=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(tf.read(), "appleorange")

    def test_cwd(self):
        """cwd"""
        tmpdir = os.getenv("TEMP", "/tmp")
        tmpdir = os.path.realpath(tmpdir)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getcwd())'],
                             stdout=subprocess.PIPE,
                             cwd=tmpdir)
        # Compare case-normalized paths so that a difference only in letter
        # case (possible on case-insensitive filesystems) is not a failure.
        normcase = os.path.normcase
        self.assertEqual(normcase(self._drain(p, p.stdout)),
                         normcase(tmpdir))

    def test_env(self):
        """env"""
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange"
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getenv("FRUIT"))'],
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.assertEqual(self._drain(p, p.stdout), "orange")

    def test_communicate(self):
        """communicate()"""
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stderr.write("pineapple");'
                              'sys.stdout.write(sys.stdin.read())'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        (stdout, stderr) = p.communicate("banana")
        self.assertEqual(stdout, "banana")
        self.assertEqual(stderr, "pineapple")

    def test_communicate_returns(self):
        """communicate() should return None if no redirection is active"""
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertEqual(stderr, None)

    def test_communicate_pipe_buf(self):
        """communicate() with writes larger than pipe_buf"""
        # This test will probably deadlock rather than fail, if
        # communicate() does not work properly.
        x, y = os.pipe()
        try:
            if mswindows:
                pipe_buf = 512
            else:
                pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        finally:
            # The pipe is only needed to query its buffer size.
            os.close(x)
            os.close(y)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(sys.stdin.read(47));'
                              'sys.stderr.write("xyz"*%d);'
                              'sys.stdout.write(sys.stdin.read())' % pipe_buf],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        string_to_write = "abc" * pipe_buf
        (stdout, stderr) = p.communicate(string_to_write)
        self.assertEqual(stdout, string_to_write)
        self.assertEqual(stderr, "xyz" * pipe_buf)

    def test_writes_before_communicate(self):
        """stdin.write before communicate()"""
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(sys.stdin.read())'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        p.stdin.write("banana")
        (stdout, stderr) = p.communicate("split")
        self.assertEqual(stdout, "bananasplit")
        self.assertEqual(stderr, "")

    def test_universal_newlines(self):
        """universal newlines"""
        p = subprocess.Popen(self._newline_writer_args(),
                             stdout=subprocess.PIPE,
                             universal_newlines=1)
        self._check_newline_output(self._drain(p, p.stdout))

    def test_universal_newlines_communicate(self):
        """universal newlines through communicate()"""
        p = subprocess.Popen(self._newline_writer_args(),
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=1)
        (stdout, stderr) = p.communicate()
        self._check_newline_output(stdout)

    def test_no_leaking(self):
        """Make sure we leak no resources"""
        # NOTE(review): 1026 iterations is presumably chosen to exceed a
        # typical 1024 open-descriptor limit -- confirm before changing it.
        for i in range(1026):
            p = subprocess.Popen([sys.executable, "-c",
                                  "import sys;sys.stdout.write(sys.stdin.read())"],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            data = p.communicate("lime")[0]
            self.assertEqual(data, "lime")

    def test_list2cmdline(self):
        """list2cmdline"""
        # (argument list, expected command line) pairs.
        cases = [
            (['a b c', 'd', 'e'], '"a b c" d e'),
            (['ab"c', '\\', 'd'], 'ab\\"c \\ d'),
            (['a\\\\\\b', 'de fg', 'h'], 'a\\\\\\b "de fg" h'),
            (['a\\"b', 'c', 'd'], 'a\\\\\\"b c d'),
            (['a\\\\b c', 'd', 'e'], '"a\\\\b c" d e'),
            (['a\\\\b\\ c', 'd', 'e'], '"a\\\\b\\ c" d e'),
        ]
        for args, expected in cases:
            self.assertEqual(subprocess.list2cmdline(args), expected)

    def test_poll(self):
        """poll"""
        p = subprocess.Popen([sys.executable,
                              "-c", "import time; time.sleep(4)"])
        # poll() returns None while the child is still running.
        while p.poll() is None:
            sys.stdout.write(".")
            sys.stdout.flush()
            time.sleep(0.5)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.poll(), 0)

    def test_wait(self):
        """wait"""
        p = subprocess.Popen([sys.executable,
                              "-c", "import time; time.sleep(2)"])
        self.assertEqual(p.wait(), 0)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.wait(), 0)

    #
    # POSIX tests
    #
    if not mswindows:
        def _make_exit47_script(self):
            """Create an executable /bin/sh script that exits with status 47.

            Returns the script's path; the caller must remove the file.
            """
            f, fname = self.mkstemp()
            try:
                os.write(f, "#!/bin/sh\n")
                os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n"
                            % sys.executable)
            finally:
                os.close(f)
            # Owner read/write/execute only (octal 700).
            os.chmod(fname, stat.S_IRWXU)
            return fname

        def test_exceptions(self):
            """caught & re-raised exceptions"""
            try:
                subprocess.Popen([sys.executable, "-c", ""],
                                 cwd="/this/path/does/not/exist")
            except OSError:
                # Fetch the exception through sys.exc_info() so the clause
                # parses regardless of which "except ... e" spelling the
                # interpreter accepts.
                e = sys.exc_info()[1]
                # The attribute child_traceback should contain "os.chdir"
                # somewhere.
                self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
            else:
                self.fail("Expected OSError")

        def test_run_abort(self):
            """returncode handles signal termination"""
            p = subprocess.Popen([sys.executable,
                                  "-c", "import os; os.abort()"])
            p.wait()
            # A child killed by a signal reports the negated signal number.
            self.assertEqual(-p.returncode, signal.SIGABRT)

        def test_preexec(self):
            """preexec function"""
            p = subprocess.Popen([sys.executable, "-c",
                                  'import sys,os;'
                                  'sys.stdout.write(os.getenv("FRUIT"))'],
                                 stdout=subprocess.PIPE,
                                 preexec_fn=lambda: os.putenv("FRUIT", "apple"))
            self.assertEqual(self._drain(p, p.stdout), "apple")

        def test_close_fds(self):
            """close_fds"""
            # Make sure we have some fds open
            extra_fds = os.pipe()
            try:
                p = subprocess.Popen([sys.executable, "-c",
                                      'import sys,os;'
                                      'sys.stdout.write(str(os.dup(0)))'],
                                     stdout=subprocess.PIPE, close_fds=1)
                output = self._drain(p, p.stdout)
            finally:
                # The extra descriptors only had to exist while the child
                # was started; release them so the test itself leaks nothing.
                for fd in extra_fds:
                    os.close(fd)
            # When all fds are closed, the next free fd should be 3.
            self.assertEqual(output, "3")

        def test_args_string(self):
            """args is a string"""
            fname = self._make_exit47_script()
            try:
                p = subprocess.Popen(fname)
                p.wait()
            finally:
                os.remove(fname)
            self.assertEqual(p.returncode, 47)

        def test_invalid_args(self):
            """invalid arguments should raise ValueError"""
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              startupinfo=47)
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              creationflags=47)

        def test_shell_sequence(self):
            """Run command through the shell (sequence)"""
            newenv = os.environ.copy()
            newenv["FRUIT"] = "apple"
            p = subprocess.Popen(["echo $FRUIT"], shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            self.assertEqual(self._drain(p, p.stdout).strip(), "apple")

        def test_shell_string(self):
            """Run command through the shell (string)"""
            newenv = os.environ.copy()
            newenv["FRUIT"] = "apple"
            p = subprocess.Popen("echo $FRUIT", shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            self.assertEqual(self._drain(p, p.stdout).strip(), "apple")

        def test_call_string(self):
            """call() function with string argument on UNIX"""
            fname = self._make_exit47_script()
            try:
                rc = subprocess.call(fname)
            finally:
                os.remove(fname)
            self.assertEqual(rc, 47)

    #
    # Windows tests
    #
    if mswindows:
        def test_startupinfo(self):
            """startupinfo argument"""
            # We use hardcoded constants, because we do not want to
            # depend on win32all.
            STARTF_USESHOWWINDOW = 1
            SW_MAXIMIZE = 3
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags = STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = SW_MAXIMIZE
            # Since Python is a console process, it won't be affected
            # by wShowWindow, but the argument should be silently
            # ignored
            subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
                            startupinfo=startupinfo)

        def test_creationflags(self):
            """creationflags argument"""
            CREATE_NEW_CONSOLE = 16
            subprocess.call(sys.executable +
                            ' -c "import time; time.sleep(2)"',
                            creationflags=CREATE_NEW_CONSOLE)

        def test_invalid_args(self):
            """invalid arguments should raise ValueError"""
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              preexec_fn=lambda: 1)
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              close_fds=True)

        def test_shell_sequence(self):
            """Run command through the shell (sequence)"""
            newenv = os.environ.copy()
            newenv["FRUIT"] = "physalis"
            p = subprocess.Popen(["set"], shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            self.assertNotEqual(self._drain(p, p.stdout).find("physalis"), -1)

        def test_shell_string(self):
            """Run command through the shell (string)"""
            newenv = os.environ.copy()
            newenv["FRUIT"] = "physalis"
            p = subprocess.Popen("set", shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            self.assertNotEqual(self._drain(p, p.stdout).find("physalis"), -1)

        def test_call_string(self):
            """call() function with string argument on Windows"""
            rc = subprocess.call(sys.executable +
                                 ' -c "import sys; sys.exit(47)"')
            self.assertEqual(rc, 47)
506
507
508
def test_main():
    """Run every test in ProcessTestCase via test_support.run_unittest."""
    test_support.run_unittest(ProcessTestCase)
511
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()
514