bpo-41818: Updated tests for the standard pty library (GH-22962)

diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py
index 7ca0557..7de5688 100644
--- a/Lib/test/test_pty.py
+++ b/Lib/test/test_pty.py
@@ -14,9 +14,22 @@
 import io # readline
 import unittest
 
+import struct
+import tty
+import fcntl
+import platform
+import warnings
+
 TEST_STRING_1 = b"I wish to buy a fish license.\n"
 TEST_STRING_2 = b"For my pet fish, Eric.\n"
 
+try:
+    _TIOCGWINSZ = tty.TIOCGWINSZ  # ioctl request used by _get_term_winsz()
+    _TIOCSWINSZ = tty.TIOCSWINSZ  # ioctl request used by _set_term_winsz()
+    _HAVE_WINSZ = True
+except AttributeError:
+    _HAVE_WINSZ = False  # tty module lacks the ioctl constants
+
 if verbose:
     def debug(msg):
         print(msg)
@@ -60,6 +73,27 @@ def _readline(fd):
     reader = io.FileIO(fd, mode='rb', closefd=False)
     return reader.readline()
 
+def expectedFailureIfStdinIsTTY(fun):
+    # Probe stdin via tcgetattr() rather than isatty().
+    try:
+        tty.tcgetattr(pty.STDIN_FILENO)
+    except tty.error:
+        # Not a tty (or a bad/closed fd): leave the test undecorated.
+        return fun
+    return unittest.expectedFailure(fun)
+
+def expectedFailureOnBSD(fun):
+    """Mark *fun* as an expected failure on *BSD and Darwin systems."""
+    system = platform.system()
+    bsd_like = system == "Darwin" or system.endswith("BSD")
+    return unittest.expectedFailure(fun) if bsd_like else fun
+
+def _get_term_winsz(fd):
+    # Hand the ioctl a zeroed "HHHH" buffer; the filled-in bytes come back.
+    return fcntl.ioctl(fd, _TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0))
+
+def _set_term_winsz(fd, winsz):
+    fcntl.ioctl(fd, _TIOCSWINSZ, winsz)  # winsz: bytes packed as "HHHH"
 
 
 # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
@@ -78,6 +112,20 @@ def setUp(self):
         self.addCleanup(signal.alarm, 0)
         signal.alarm(10)
 
+        # Save original stdin window size
+        self.stdin_rows = None
+        self.stdin_cols = None
+        if _HAVE_WINSZ:
+            try:
+                stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
+                self.stdin_rows = stdin_dim.lines
+                self.stdin_cols = stdin_dim.columns
+                old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
+                                              self.stdin_cols, 0, 0)
+                self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
+            except OSError:
+                pass
+
     def handle_sig(self, sig, frame):
         self.fail("isatty hung")
 
@@ -86,26 +134,65 @@ def handle_sighup(signum, frame):
         # bpo-38547: if the process is the session leader, os.close(master_fd)
         # of "master_fd, slave_name = pty.master_open()" raises SIGHUP
         # signal: just ignore the signal.
+        #
+        # NOTE: the above comment is from an older version of the test;
+        # master_open() is not being used anymore.
         pass
 
-    def test_basic(self):
+    @expectedFailureIfStdinIsTTY
+    def test_openpty(self):
         try:
-            debug("Calling master_open()")
-            master_fd, slave_name = pty.master_open()
-            debug("Got master_fd '%d', slave_name '%s'" %
-                  (master_fd, slave_name))
-            debug("Calling slave_open(%r)" % (slave_name,))
-            slave_fd = pty.slave_open(slave_name)
-            debug("Got slave_fd '%d'" % slave_fd)
+            mode = tty.tcgetattr(pty.STDIN_FILENO)
+        except tty.error:
+            # not a tty or bad/closed fd
+            debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
+            mode = None
+
+        new_stdin_winsz = None
+        if self.stdin_rows != None and self.stdin_cols != None:
+            try:
+                debug("Setting pty.STDIN_FILENO window size")
+                # Set number of columns and rows to be the
+                # floors of 1/5 of respective original values
+                target_stdin_winsz = struct.pack("HHHH", self.stdin_rows//5,
+                                                 self.stdin_cols//5, 0, 0)
+                _set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)
+
+                # Were we able to set the window size
+                # of pty.STDIN_FILENO successfully?
+                new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
+                self.assertEqual(new_stdin_winsz, target_stdin_winsz,
+                                 "pty.STDIN_FILENO window size unchanged")
+            except OSError:
+                warnings.warn("Failed to set pty.STDIN_FILENO window size")
+                pass
+
+        try:
+            debug("Calling pty.openpty()")
+            try:
+                master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
+            except TypeError:
+                master_fd, slave_fd = pty.openpty()
+            debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
         except OSError:
             # " An optional feature could not be imported " ... ?
             raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")
 
-        self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
+        self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty")
+
+        if mode:
+            self.assertEqual(tty.tcgetattr(slave_fd), mode,
+                             "openpty() failed to set slave termios")
+        if new_stdin_winsz:
+            self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
+                             "openpty() failed to set slave window size")
 
         # Solaris requires reading the fd before anything is returned.
         # My guess is that since we open and close the slave fd
         # in master_open(), we need to read the EOF.
+        #
+        # NOTE: the above comment is from an older version of the test;
+        # master_open() is not being used anymore.
 
         # Ensure the fd is non-blocking in case there's nothing to read.
         blocking = os.get_blocking(master_fd)
@@ -222,8 +309,20 @@ def test_fork(self):
 
         os.close(master_fd)
 
-        # pty.fork() passed.
+    @expectedFailureOnBSD
+    def test_master_read(self):
+        debug("Calling pty.openpty()")
+        master_fd, slave_fd = pty.openpty()
+        debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
 
+        # Registered up front so master_fd is closed even when the
+        # assertion below fails (the expected outcome on BSD/Darwin).
+        self.addCleanup(os.close, master_fd)
+        debug("Closing slave_fd")
+        os.close(slave_fd)
+        debug("Reading from master_fd")
+        with self.assertRaises(OSError):
+            os.read(master_fd, 1)
 
 class SmallPtyTests(unittest.TestCase):
     """These tests don't spawn children or hang."""
@@ -262,8 +361,9 @@ def _socketpair(self):
         self.files.extend(socketpair)
         return socketpair
 
-    def _mock_select(self, rfds, wfds, xfds):
+    def _mock_select(self, rfds, wfds, xfds, timeout=0):
         # This will raise IndexError when no more expected calls exist.
+        # The timeout argument is accepted but deliberately ignored.
         self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
         return self.select_rfds_results.pop(0), [], []