Fixes Issue #14635: telnetlib will use poll() rather than select() when possible
to avoid failing due to the select() file descriptor limit.
Contributed by Akintayo Holder under the Google contributor agreement.
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py
index 5a179f0..c66f49b 100644
--- a/Lib/test/test_telnetlib.py
+++ b/Lib/test/test_telnetlib.py
@@ -135,6 +135,28 @@
self.assertEqual(data, want[0])
self.assertEqual(telnet.read_all(), 'not seen')
+ def test_read_until_with_poll(self):
+ """Use select.poll() to implement telnet.read_until()."""
+ want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
+ self.dataq.put(want)
+ telnet = telnetlib.Telnet(HOST, self.port)
+ if not telnet._has_poll:
+ raise unittest.SkipTest('select.poll() is required')
+ telnet._has_poll = True
+ self.dataq.join()
+ data = telnet.read_until('match')
+ self.assertEqual(data, ''.join(want[:-2]))
+
+ def test_read_until_with_select(self):
+ """Use select.select() to implement telnet.read_until()."""
+ want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
+ self.dataq.put(want)
+ telnet = telnetlib.Telnet(HOST, self.port)
+ telnet._has_poll = False
+ self.dataq.join()
+ data = telnet.read_until('match')
+ self.assertEqual(data, ''.join(want[:-2]))
+
def test_read_all_A(self):
"""
read_all()
@@ -357,8 +379,75 @@
self.assertEqual('', telnet.read_sb_data())
nego.sb_getter = None # break the nego => telnet cycle
+
+class ExpectTests(TestCase):
+ def setUp(self):
+ self.evt = threading.Event()
+ self.dataq = Queue.Queue()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(10)
+ self.port = test_support.bind_port(self.sock)
+ self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
+ self.dataq))
+ self.thread.start()
+ self.evt.wait()
+
+ def tearDown(self):
+ self.thread.join()
+
+ # Use a similar approach to testing timeouts as test_timeout.py.
+ # Timing tests can never be 100% reliable, but the gap between block_long and block_short is wide enough that spurious failures are rare.
+ block_long = 0.6
+ block_short = 0.3
+ def test_expect_A(self):
+ """
+ expect(expected, [timeout])
+ Read until the expected string has been seen, or a timeout is
+ hit (default is no timeout); may block.
+ """
+ want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
+ self.dataq.put(want)
+ telnet = telnetlib.Telnet(HOST, self.port)
+ self.dataq.join()
+ (_,_,data) = telnet.expect(['match'])
+ self.assertEqual(data, ''.join(want[:-2]))
+
+ def test_expect_B(self):
+ # test the timeout - it does NOT raise socket.timeout
+ want = ['hello', self.block_long, 'not seen', EOF_sigil]
+ self.dataq.put(want)
+ telnet = telnetlib.Telnet(HOST, self.port)
+ self.dataq.join()
+ (_,_,data) = telnet.expect(['not seen'], self.block_short)
+ self.assertEqual(data, want[0])
+ self.assertEqual(telnet.read_all(), 'not seen')
+
+ def test_expect_with_poll(self):
+ """Use select.poll() to implement telnet.expect()."""
+ want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
+ self.dataq.put(want)
+ telnet = telnetlib.Telnet(HOST, self.port)
+ if not telnet._has_poll:
+ raise unittest.SkipTest('select.poll() is required')
+ telnet._has_poll = True
+ self.dataq.join()
+ (_,_,data) = telnet.expect(['match'])
+ self.assertEqual(data, ''.join(want[:-2]))
+
+ def test_expect_with_select(self):
+ """Use select.select() to implement telnet.expect()."""
+ want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
+ self.dataq.put(want)
+ telnet = telnetlib.Telnet(HOST, self.port)
+ telnet._has_poll = False
+ self.dataq.join()
+ (_,_,data) = telnet.expect(['match'])
+ self.assertEqual(data, ''.join(want[:-2]))
+
+
def test_main(verbose=None):
- test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
+ test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
+ ExpectTests)
if __name__ == '__main__':
test_main()