blob: 7ca0557800b6d8d8c0065dfc1c4a81ed97fabce7 [file] [log] [blame]
Hai Shia089d212020-07-06 17:15:08 +08001from test.support import verbose, reap_children
2from test.support.import_helper import import_module
R. David Murrayeb3615d2009-04-22 02:24:39 +00003
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004# Skip these tests if termios is not available
R. David Murrayeb3615d2009-04-22 02:24:39 +00005import_module('termios')
6
Guido van Rossum360e4b82007-05-14 22:51:27 +00007import errno
R. David Murrayeb3615d2009-04-22 02:24:39 +00008import pty
Guido van Rossumd8faa362007-04-27 19:54:29 +00009import os
10import sys
Gregory P. Smith05f59532012-02-16 00:29:12 -080011import select
Guido van Rossumd8faa362007-04-27 19:54:29 +000012import signal
Gregory P. Smith05f59532012-02-16 00:29:12 -080013import socket
Cornelius Diekmanne6f62f62017-10-02 11:39:55 +020014import io # readline
Guido van Rossumd8faa362007-04-27 19:54:29 +000015import unittest
Fred Drake4c136ee2000-06-30 23:22:35 +000016
# Canned payloads that PtyTest.test_basic writes to the slave side of the
# pty and expects to read back (after newline normalization) from the master.
TEST_STRING_1 = b"I wish to buy a fish license.\n"
TEST_STRING_2 = b"For my pet fish, Eric.\n"
Fred Drake4c136ee2000-06-30 23:22:35 +000019
# The implementation of debug() is picked once, at import time, from the
# test.support "verbose" flag.
if not verbose:
    def debug(msg):
        """Discard *msg*: the test run is not verbose."""
else:
    def debug(msg):
        """Print *msg* as a progress note for a verbose test run."""
        print(msg)
26
Guido van Rossumd8faa362007-04-27 19:54:29 +000027
# Note that os.read() is nondeterministic so we need to be very careful
# to make the test suite deterministic.  A normal call to os.read() may
# give us less than expected.
#
# Beware, on my Linux system, if I put 'foo\n' into a terminal fd, I get
# back 'foo\r\n' at the other end.  The behavior depends on the termios
# setting.  The newline translation may be OS-specific.  To make the
# test suite deterministic and OS-independent, the functions _readline
# and normalize_output can be used.
37
def normalize_output(data):
    """Return *data* with a platform-specific line ending mapped to b'\\n'.

    Some operating systems translate newlines written to a terminal.
    Rather than finding the right termios settings for each platform, the
    known translations are undone here.  Only data that *ends* with one of
    the known terminators is rewritten (every occurrence of that terminator
    is then replaced); anything else is returned unchanged so that other
    differences such as extra whitespace or trailing garbage still fail
    the comparison made by the caller.
    """
    # Longest terminator first: OSF/1 (Tru64) apparently turns \n into
    # \r\r\n, and such data would also match the plain \r\n check.
    for terminator in (b'\r\r\n', b'\r\n'):
        if data.endswith(terminator):
            return data.replace(terminator, b'\n')
    return data
57
Cornelius Diekmanne6f62f62017-10-02 11:39:55 +020058def _readline(fd):
59 """Read one line. May block forever if no newline is read."""
60 reader = io.FileIO(fd, mode='rb', closefd=False)
61 return reader.readline()
62
63
Guido van Rossumd8faa362007-04-27 19:54:29 +000064
# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
# because pty code is not too portable.
class PtyTest(unittest.TestCase):
    """Smoke tests for pty.master_open()/pty.slave_open() and pty.fork().

    Every file descriptor obtained from the pty module is registered with
    addCleanup() as soon as it exists, so a failed assertion or an
    unexpected exception does not leak descriptors.
    """

    def setUp(self):
        """Install the SIGALRM/SIGHUP handlers and arm a 10 second alarm."""
        old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
        self.addCleanup(signal.signal, signal.SIGALRM, old_alarm)

        old_sighup = signal.signal(signal.SIGHUP, self.handle_sighup)
        self.addCleanup(signal.signal, signal.SIGHUP, old_sighup)

        # isatty() and close() can hang on some platforms. Set an alarm
        # before running the test to make sure we don't hang forever.
        # Cleanups run in reverse order of registration, so descriptors
        # registered by the tests are closed while the alarm is still armed
        # and the SIGHUP handler is still installed.
        self.addCleanup(signal.alarm, 0)
        signal.alarm(10)

    def handle_sig(self, sig, frame):
        """SIGALRM handler: fail the running test instead of hanging."""
        self.fail("isatty hung")

    @staticmethod
    def handle_sighup(signum, frame):
        """SIGHUP handler that deliberately does nothing."""
        # bpo-38547: if the process is the session leader, os.close(master_fd)
        # of "master_fd, slave_name = pty.master_open()" raises SIGHUP
        # signal: just ignore the signal.
        pass

    def test_basic(self):
        """Open a master/slave pair and read back what the slave writes."""
        try:
            debug("Calling master_open()")
            master_fd, slave_name = pty.master_open()
            # closing master_fd can raise a SIGHUP if the process is
            # the session leader: we installed a SIGHUP signal handler
            # to ignore this signal.
            self.addCleanup(os.close, master_fd)
            debug("Got master_fd '%d', slave_name '%s'" %
                  (master_fd, slave_name))
            debug("Calling slave_open(%r)" % (slave_name,))
            slave_fd = pty.slave_open(slave_name)
            # Registered after master_fd, hence closed before it.
            self.addCleanup(os.close, slave_fd)
            debug("Got slave_fd '%d'" % slave_fd)
        except OSError:
            # " An optional feature could not be imported " ... ?
            raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")

        self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')

        # Solaris requires reading the fd before anything is returned.
        # My guess is that since we open and close the slave fd
        # in master_open(), we need to read the EOF.

        # Ensure the fd is non-blocking in case there's nothing to read.
        blocking = os.get_blocking(master_fd)
        try:
            os.set_blocking(master_fd, False)
            try:
                s1 = os.read(master_fd, 1024)
                self.assertEqual(b'', s1)
            except OSError as e:
                # EAGAIN only means that there was nothing to read.
                if e.errno != errno.EAGAIN:
                    raise
        finally:
            # Restore the original flags.
            os.set_blocking(master_fd, blocking)

        debug("Writing to slave_fd")
        os.write(slave_fd, TEST_STRING_1)
        s1 = _readline(master_fd)
        self.assertEqual(TEST_STRING_1, normalize_output(s1))

        debug("Writing chunked output")
        os.write(slave_fd, TEST_STRING_2[:5])
        os.write(slave_fd, TEST_STRING_2[5:])
        s2 = _readline(master_fd)
        self.assertEqual(TEST_STRING_2, normalize_output(s2))

    def test_fork(self):
        """Check that the child of pty.fork() has a tty and leads a session.

        The child reports its findings through its exit code:
        1 = unexpected exception in os.setsid(), 2 = os.setsid() succeeded
        (the child was not a session leader), 3 = stdout is not a tty,
        4 = everything is fine.
        """
        debug("calling pty.fork()")
        pid, master_fd = pty.fork()
        if pid == pty.CHILD:
            # stdout should be connected to a tty.
            if not os.isatty(1):
                debug("Child's fd 1 is not a tty?!")
                os._exit(3)

            # After pty.fork(), the child should already be a session leader.
            # (on those systems that have that concept.)
            debug("In child, calling os.setsid()")
            try:
                os.setsid()
            except OSError:
                # Good, we already were session leader
                debug("Good: OSError was raised.")
            except AttributeError:
                # Have pty, but not setsid()?
                debug("No setsid() available?")
            except BaseException:
                # We don't want this error to propagate, escaping the call to
                # os._exit() and causing very peculiar behavior in the calling
                # regrtest.py !
                # Note: could add traceback printing here.
                debug("An unexpected error was raised.")
                os._exit(1)
            else:
                debug("os.setsid() succeeded! (bad!)")
                os._exit(2)
            os._exit(4)
        else:
            # Only the parent owns a usable master_fd; close it even if one
            # of the checks below fails.
            self.addCleanup(os.close, master_fd)

            debug("Waiting for child (%d) to finish." % pid)
            # In verbose mode, we have to consume the debug output from the
            # child or the child will block, causing this test to hang in the
            # parent's waitpid() call.  The child blocks after a
            # platform-dependent amount of data is written to its fd.  On
            # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
            # X even the small writes in the child above will block it.  Also
            # on Linux, the read() will raise an OSError (input/output error)
            # when it tries to read past the end of the buffer but the child's
            # already exited, so catch and discard those exceptions.  It's not
            # worth checking for EIO.
            while True:
                try:
                    data = os.read(master_fd, 80)
                except OSError:
                    break
                if not data:
                    break
                sys.stdout.write(str(data.replace(b'\r\n', b'\n'),
                                     encoding='ascii'))

            (pid, status) = os.waitpid(pid, 0)
            res = os.waitstatus_to_exitcode(status)
            debug("Child (%d) exited with code %d (status %d)." % (pid, res, status))
            if res == 1:
                self.fail("Child raised an unexpected exception in os.setsid()")
            elif res == 2:
                self.fail("pty.fork() failed to make child a session leader.")
            elif res == 3:
                self.fail("Child spawned by pty.fork() did not have a tty as stdout")
            elif res != 4:
                self.fail("pty.fork() failed for unknown reasons.")

        # pty.fork() passed.
226
Gregory P. Smith05f59532012-02-16 00:29:12 -0800227
class SmallPtyTests(unittest.TestCase):
    """These tests don't spawn children or hang.

    pty._copy() is driven through pipes and a socket pair that stand in for
    stdin, stdout and the pty master, with pty.select replaced by a scripted
    mock.
    """

    def setUp(self):
        """Remember the pty module globals that the tests overwrite."""
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # A list of file descriptors to close.
        self.files = []  # A list of objects with a close() method.
        # Scripted expectations/answers consumed by _mock_select().
        self.select_rfds_lengths = []
        self.select_rfds_results = []

    def tearDown(self):
        """Restore the pty module globals and close everything still tracked."""
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        # Closing is best effort: a failing test may leave things in any
        # state, and cleanup must not mask the real failure.
        for file in self.files:
            try:
                file.close()
            except OSError:
                pass
        for fd in self.fds:
            try:
                os.close(fd)
            except OSError:
                pass

    def _pipe(self):
        """Create a pipe whose two descriptors are closed by tearDown()."""
        pipe_fds = os.pipe()
        self.fds.extend(pipe_fds)
        return pipe_fds

    def _socketpair(self):
        """Create a socket pair whose two sockets are closed by tearDown()."""
        socketpair = socket.socketpair()
        self.files.extend(socketpair)
        return socketpair

    def _close_fd(self, fd):
        """Close the tracked descriptor *fd* now and stop tracking it.

        Without the removal tearDown() would close the same descriptor
        number a second time; if the number had been reused in the meantime
        an unrelated descriptor would be closed.
        """
        os.close(fd)
        self.fds.remove(fd)

    def _mock_select(self, rfds, wfds, xfds):
        """Scripted replacement for pty.select.

        Checks that *rfds* has the next expected length and returns the next
        scripted readable list together with empty write/exception lists.
        """
        # This will raise IndexError when no more expected calls exist.
        self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
        return self.select_rfds_results.pop(0), [], []

    def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')

    def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        # Closing the peers makes reads on the other ends return EOF.  The
        # socket object is closed through its own close() method, so the
        # later close() in tearDown() acts on the same object.  The raw pipe
        # descriptor is untracked so that tearDown() does not close its
        # number again.
        socketpair[1].close()
        self._close_fd(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
320
321
def tearDownModule():
    """unittest module-level teardown hook: call test.support.reap_children()."""
    # test_fork() forks a child through pty.fork(); presumably this collects
    # any child process that was not waited for -- see test.support.
    reap_children()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000324
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()