| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 1 | import socket | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 2 | import select | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 3 | import telnetlib | 
 | 4 | import time | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 5 | import contextlib | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 6 |  | 
| Gregory P. Smith | 5bcd005 | 2012-07-16 13:34:50 -0700 | [diff] [blame] | 7 | import unittest | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 8 | from unittest import TestCase | 
| Benjamin Peterson | ee8712c | 2008-05-20 21:35:26 +0000 | [diff] [blame] | 9 | from test import support | 
| Victor Stinner | 45df820 | 2010-04-28 22:31:17 +0000 | [diff] [blame] | 10 | threading = support.import_module('threading') | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 11 |  | 
| Benjamin Peterson | ee8712c | 2008-05-20 21:35:26 +0000 | [diff] [blame] | 12 | HOST = support.HOST | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 13 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 14 | def server(evt, serv): | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 15 |     serv.listen(5) | 
| Christian Heimes | af98da1 | 2008-01-27 15:18:18 +0000 | [diff] [blame] | 16 |     evt.set() | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 17 |     try: | 
 | 18 |         conn, addr = serv.accept() | 
| Jesus Cea | bc91b46 | 2011-11-08 16:24:43 +0100 | [diff] [blame] | 19 |         conn.close() | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 20 |     except socket.timeout: | 
 | 21 |         pass | 
 | 22 |     finally: | 
 | 23 |         serv.close() | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 24 |  | 
 | 25 | class GeneralTests(TestCase): | 
 | 26 |  | 
 | 27 |     def setUp(self): | 
 | 28 |         self.evt = threading.Event() | 
| Christian Heimes | 5e69685 | 2008-04-09 08:37:03 +0000 | [diff] [blame] | 29 |         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
| Jesus Cea | bc91b46 | 2011-11-08 16:24:43 +0100 | [diff] [blame] | 30 |         self.sock.settimeout(60)  # Safety net. Look issue 11812 | 
| Benjamin Peterson | ee8712c | 2008-05-20 21:35:26 +0000 | [diff] [blame] | 31 |         self.port = support.bind_port(self.sock) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 32 |         self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) | 
| Jesus Cea | bc91b46 | 2011-11-08 16:24:43 +0100 | [diff] [blame] | 33 |         self.thread.setDaemon(True) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 34 |         self.thread.start() | 
| Christian Heimes | af98da1 | 2008-01-27 15:18:18 +0000 | [diff] [blame] | 35 |         self.evt.wait() | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 36 |  | 
 | 37 |     def tearDown(self): | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 38 |         self.thread.join() | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 39 |  | 
 | 40 |     def testBasic(self): | 
 | 41 |         # connects | 
| Christian Heimes | 5e69685 | 2008-04-09 08:37:03 +0000 | [diff] [blame] | 42 |         telnet = telnetlib.Telnet(HOST, self.port) | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 43 |         telnet.sock.close() | 
 | 44 |  | 
 | 45 |     def testTimeoutDefault(self): | 
| Georg Brandl | f78e02b | 2008-06-10 17:40:04 +0000 | [diff] [blame] | 46 |         self.assertTrue(socket.getdefaulttimeout() is None) | 
 | 47 |         socket.setdefaulttimeout(30) | 
 | 48 |         try: | 
| Jesus Cea | bc91b46 | 2011-11-08 16:24:43 +0100 | [diff] [blame] | 49 |             telnet = telnetlib.Telnet(HOST, self.port) | 
| Georg Brandl | f78e02b | 2008-06-10 17:40:04 +0000 | [diff] [blame] | 50 |         finally: | 
 | 51 |             socket.setdefaulttimeout(None) | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 52 |         self.assertEqual(telnet.sock.gettimeout(), 30) | 
 | 53 |         telnet.sock.close() | 
 | 54 |  | 
 | 55 |     def testTimeoutNone(self): | 
 | 56 |         # None, having other default | 
| Georg Brandl | f78e02b | 2008-06-10 17:40:04 +0000 | [diff] [blame] | 57 |         self.assertTrue(socket.getdefaulttimeout() is None) | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 58 |         socket.setdefaulttimeout(30) | 
 | 59 |         try: | 
| Christian Heimes | 5e69685 | 2008-04-09 08:37:03 +0000 | [diff] [blame] | 60 |             telnet = telnetlib.Telnet(HOST, self.port, timeout=None) | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 61 |         finally: | 
| Georg Brandl | f78e02b | 2008-06-10 17:40:04 +0000 | [diff] [blame] | 62 |             socket.setdefaulttimeout(None) | 
 | 63 |         self.assertTrue(telnet.sock.gettimeout() is None) | 
 | 64 |         telnet.sock.close() | 
 | 65 |  | 
 | 66 |     def testTimeoutValue(self): | 
| Jesus Cea | bc91b46 | 2011-11-08 16:24:43 +0100 | [diff] [blame] | 67 |         telnet = telnetlib.Telnet(HOST, self.port, timeout=30) | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 68 |         self.assertEqual(telnet.sock.gettimeout(), 30) | 
 | 69 |         telnet.sock.close() | 
 | 70 |  | 
| Georg Brandl | f78e02b | 2008-06-10 17:40:04 +0000 | [diff] [blame] | 71 |     def testTimeoutOpen(self): | 
 | 72 |         telnet = telnetlib.Telnet() | 
| Jesus Cea | bc91b46 | 2011-11-08 16:24:43 +0100 | [diff] [blame] | 73 |         telnet.open(HOST, self.port, timeout=30) | 
| Georg Brandl | f78e02b | 2008-06-10 17:40:04 +0000 | [diff] [blame] | 74 |         self.assertEqual(telnet.sock.gettimeout(), 30) | 
 | 75 |         telnet.sock.close() | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 76 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 77 | class SocketStub(object): | 
 | 78 |     ''' a socket proxy that re-defines sendall() ''' | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 79 |     def __init__(self, reads=()): | 
 | 80 |         self.reads = list(reads)  # Intentionally make a copy. | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 81 |         self.writes = [] | 
 | 82 |         self.block = False | 
 | 83 |     def sendall(self, data): | 
 | 84 |         self.writes.append(data) | 
 | 85 |     def recv(self, size): | 
 | 86 |         out = b'' | 
 | 87 |         while self.reads and len(out) < size: | 
 | 88 |             out += self.reads.pop(0) | 
 | 89 |         if len(out) > size: | 
 | 90 |             self.reads.insert(0, out[size:]) | 
 | 91 |             out = out[:size] | 
 | 92 |         return out | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 93 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 94 | class TelnetAlike(telnetlib.Telnet): | 
 | 95 |     def fileno(self): | 
 | 96 |         raise NotImplementedError() | 
 | 97 |     def close(self): pass | 
 | 98 |     def sock_avail(self): | 
 | 99 |         return (not self.sock.block) | 
 | 100 |     def msg(self, msg, *args): | 
 | 101 |         with support.captured_stdout() as out: | 
 | 102 |             telnetlib.Telnet.msg(self, msg, *args) | 
 | 103 |         self._messages += out.getvalue() | 
 | 104 |         return | 
 | 105 |  | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 106 | def mock_select(*s_args): | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 107 |     block = False | 
 | 108 |     for l in s_args: | 
 | 109 |         for fob in l: | 
 | 110 |             if isinstance(fob, TelnetAlike): | 
 | 111 |                 block = fob.sock.block | 
 | 112 |     if block: | 
 | 113 |         return [[], [], []] | 
 | 114 |     else: | 
 | 115 |         return s_args | 
 | 116 |  | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 117 | class MockPoller(object): | 
 | 118 |     test_case = None  # Set during TestCase setUp. | 
 | 119 |  | 
 | 120 |     def __init__(self): | 
 | 121 |         self._file_objs = [] | 
 | 122 |  | 
 | 123 |     def register(self, fd, eventmask): | 
 | 124 |         self.test_case.assertTrue(hasattr(fd, 'fileno'), fd) | 
 | 125 |         self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI) | 
 | 126 |         self._file_objs.append(fd) | 
 | 127 |  | 
 | 128 |     def poll(self, timeout=None): | 
 | 129 |         block = False | 
 | 130 |         for fob in self._file_objs: | 
 | 131 |             if isinstance(fob, TelnetAlike): | 
 | 132 |                 block = fob.sock.block | 
 | 133 |         if block: | 
 | 134 |             return [] | 
 | 135 |         else: | 
 | 136 |             return zip(self._file_objs, [select.POLLIN]*len(self._file_objs)) | 
 | 137 |  | 
 | 138 |     def unregister(self, fd): | 
 | 139 |         self._file_objs.remove(fd) | 
 | 140 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 141 | @contextlib.contextmanager | 
 | 142 | def test_socket(reads): | 
 | 143 |     def new_conn(*ignored): | 
 | 144 |         return SocketStub(reads) | 
 | 145 |     try: | 
 | 146 |         old_conn = socket.create_connection | 
 | 147 |         socket.create_connection = new_conn | 
 | 148 |         yield None | 
 | 149 |     finally: | 
 | 150 |         socket.create_connection = old_conn | 
 | 151 |     return | 
 | 152 |  | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 153 | def test_telnet(reads=(), cls=TelnetAlike, use_poll=None): | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 154 |     ''' return a telnetlib.Telnet object that uses a SocketStub with | 
 | 155 |         reads queued up to be read ''' | 
 | 156 |     for x in reads: | 
 | 157 |         assert type(x) is bytes, x | 
 | 158 |     with test_socket(reads): | 
 | 159 |         telnet = cls('dummy', 0) | 
 | 160 |         telnet._messages = '' # debuglevel output | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 161 |         if use_poll is not None: | 
 | 162 |             if use_poll and not telnet._has_poll: | 
 | 163 |                 raise unittest.SkipTest('select.poll() required.') | 
 | 164 |             telnet._has_poll = use_poll | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 165 |     return telnet | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 166 |  | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 167 |  | 
 | 168 | class ExpectAndReadTestCase(TestCase): | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 169 |     def setUp(self): | 
 | 170 |         self.old_select = select.select | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 171 |         select.select = mock_select | 
| Gregory P. Smith | 954d46b | 2012-07-16 15:48:30 -0700 | [diff] [blame] | 172 |         self.old_poll = False | 
| Gregory P. Smith | ac14aa5 | 2012-07-16 13:38:45 -0700 | [diff] [blame] | 173 |         if hasattr(select, 'poll'): | 
 | 174 |             self.old_poll = select.poll | 
 | 175 |             select.poll = MockPoller | 
 | 176 |             MockPoller.test_case = self | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 177 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 178 |     def tearDown(self): | 
| Gregory P. Smith | 954d46b | 2012-07-16 15:48:30 -0700 | [diff] [blame] | 179 |         if self.old_poll: | 
| Gregory P. Smith | ac14aa5 | 2012-07-16 13:38:45 -0700 | [diff] [blame] | 180 |             MockPoller.test_case = None | 
 | 181 |             select.poll = self.old_poll | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 182 |         select.select = self.old_select | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 183 |  | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 184 |  | 
 | 185 | class ReadTests(ExpectAndReadTestCase): | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 186 |     def test_read_until(self): | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 187 |         """ | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 188 |         read_until(expected, timeout=None) | 
 | 189 |         test the blocking version of read_util | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 190 |         """ | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 191 |         want = [b'xxxmatchyyy'] | 
 | 192 |         telnet = test_telnet(want) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 193 |         data = telnet.read_until(b'match') | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 194 |         self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads)) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 195 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 196 |         reads = [b'x' * 50, b'match', b'y' * 50] | 
 | 197 |         expect = b''.join(reads[:-1]) | 
 | 198 |         telnet = test_telnet(reads) | 
 | 199 |         data = telnet.read_until(b'match') | 
 | 200 |         self.assertEqual(data, expect) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 201 |  | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 202 |     def test_read_until_with_poll(self): | 
 | 203 |         """Use select.poll() to implement telnet.read_until().""" | 
 | 204 |         want = [b'x' * 10, b'match', b'y' * 10] | 
 | 205 |         telnet = test_telnet(want, use_poll=True) | 
 | 206 |         select.select = lambda *_: self.fail('unexpected select() call.') | 
 | 207 |         data = telnet.read_until(b'match') | 
 | 208 |         self.assertEqual(data, b''.join(want[:-1])) | 
 | 209 |  | 
 | 210 |     def test_read_until_with_select(self): | 
 | 211 |         """Use select.select() to implement telnet.read_until().""" | 
 | 212 |         want = [b'x' * 10, b'match', b'y' * 10] | 
 | 213 |         telnet = test_telnet(want, use_poll=False) | 
| Gregory P. Smith | 954d46b | 2012-07-16 15:48:30 -0700 | [diff] [blame] | 214 |         if self.old_poll: | 
 | 215 |             select.poll = lambda *_: self.fail('unexpected poll() call.') | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 216 |         data = telnet.read_until(b'match') | 
 | 217 |         self.assertEqual(data, b''.join(want[:-1])) | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 218 |  | 
 | 219 |     def test_read_all(self): | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 220 |         """ | 
 | 221 |         read_all() | 
 | 222 |           Read all data until EOF; may block. | 
 | 223 |         """ | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 224 |         reads = [b'x' * 500, b'y' * 500, b'z' * 500] | 
 | 225 |         expect = b''.join(reads) | 
 | 226 |         telnet = test_telnet(reads) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 227 |         data = telnet.read_all() | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 228 |         self.assertEqual(data, expect) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 229 |         return | 
 | 230 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 231 |     def test_read_some(self): | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 232 |         """ | 
 | 233 |         read_some() | 
 | 234 |           Read at least one byte or EOF; may block. | 
 | 235 |         """ | 
 | 236 |         # test 'at least one byte' | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 237 |         telnet = test_telnet([b'x' * 500]) | 
 | 238 |         data = telnet.read_some() | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 239 |         self.assertTrue(len(data) >= 1) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 240 |         # test EOF | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 241 |         telnet = test_telnet() | 
 | 242 |         data = telnet.read_some() | 
 | 243 |         self.assertEqual(b'', data) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 244 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 245 |     def _read_eager(self, func_name): | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 246 |         """ | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 247 |         read_*_eager() | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 248 |           Read all data available already queued or on the socket, | 
 | 249 |           without blocking. | 
 | 250 |         """ | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 251 |         want = b'x' * 100 | 
 | 252 |         telnet = test_telnet([want]) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 253 |         func = getattr(telnet, func_name) | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 254 |         telnet.sock.block = True | 
 | 255 |         self.assertEqual(b'', func()) | 
 | 256 |         telnet.sock.block = False | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 257 |         data = b'' | 
 | 258 |         while True: | 
 | 259 |             try: | 
 | 260 |                 data += func() | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 261 |             except EOFError: | 
 | 262 |                 break | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 263 |         self.assertEqual(data, want) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 264 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 265 |     def test_read_eager(self): | 
 | 266 |         # read_eager and read_very_eager make the same gaurantees | 
 | 267 |         # (they behave differently but we only test the gaurantees) | 
 | 268 |         self._read_eager('read_eager') | 
 | 269 |         self._read_eager('read_very_eager') | 
 | 270 |         # NB -- we need to test the IAC block which is mentioned in the | 
 | 271 |         # docstring but not in the module docs | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 272 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 273 |     def read_very_lazy(self): | 
 | 274 |         want = b'x' * 100 | 
 | 275 |         telnet = test_telnet([want]) | 
 | 276 |         self.assertEqual(b'', telnet.read_very_lazy()) | 
 | 277 |         while telnet.sock.reads: | 
 | 278 |             telnet.fill_rawq() | 
 | 279 |         data = telnet.read_very_lazy() | 
 | 280 |         self.assertEqual(want, data) | 
 | 281 |         self.assertRaises(EOFError, telnet.read_very_lazy) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 282 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 283 |     def test_read_lazy(self): | 
 | 284 |         want = b'x' * 100 | 
 | 285 |         telnet = test_telnet([want]) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 286 |         self.assertEqual(b'', telnet.read_lazy()) | 
 | 287 |         data = b'' | 
 | 288 |         while True: | 
 | 289 |             try: | 
 | 290 |                 read_data = telnet.read_lazy() | 
 | 291 |                 data += read_data | 
 | 292 |                 if not read_data: | 
 | 293 |                     telnet.fill_rawq() | 
 | 294 |             except EOFError: | 
 | 295 |                 break | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 296 |             self.assertTrue(want.startswith(data)) | 
 | 297 |         self.assertEqual(data, want) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 298 |  | 
 | 299 | class nego_collector(object): | 
 | 300 |     def __init__(self, sb_getter=None): | 
 | 301 |         self.seen = b'' | 
 | 302 |         self.sb_getter = sb_getter | 
 | 303 |         self.sb_seen = b'' | 
 | 304 |  | 
 | 305 |     def do_nego(self, sock, cmd, opt): | 
 | 306 |         self.seen += cmd + opt | 
 | 307 |         if cmd == tl.SE and self.sb_getter: | 
 | 308 |             sb_data = self.sb_getter() | 
 | 309 |             self.sb_seen += sb_data | 
 | 310 |  | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 311 | tl = telnetlib | 
| Jack Diederich | f31f7bb | 2009-09-03 20:37:58 +0000 | [diff] [blame] | 312 |  | 
 | 313 | class WriteTests(TestCase): | 
 | 314 |     '''The only thing that write does is replace each tl.IAC for | 
 | 315 |     tl.IAC+tl.IAC''' | 
| Jack Diederich | f31f7bb | 2009-09-03 20:37:58 +0000 | [diff] [blame] | 316 |  | 
 | 317 |     def test_write(self): | 
| Jack Diederich | f31f7bb | 2009-09-03 20:37:58 +0000 | [diff] [blame] | 318 |         data_sample = [b'data sample without IAC', | 
 | 319 |                        b'data sample with' + tl.IAC + b' one IAC', | 
 | 320 |                        b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC, | 
 | 321 |                        tl.IAC, | 
 | 322 |                        b''] | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 323 |         for data in data_sample: | 
 | 324 |             telnet = test_telnet() | 
 | 325 |             telnet.write(data) | 
 | 326 |             written = b''.join(telnet.sock.writes) | 
 | 327 |             self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written) | 
| Jack Diederich | 36596a3 | 2009-07-26 22:23:04 +0000 | [diff] [blame] | 328 |  | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 329 | class OptionTests(TestCase): | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 330 |     # RFC 854 commands | 
 | 331 |     cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] | 
 | 332 |  | 
 | 333 |     def _test_command(self, data): | 
 | 334 |         """ helper for testing IAC + cmd """ | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 335 |         telnet = test_telnet(data) | 
 | 336 |         data_len = len(b''.join(data)) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 337 |         nego = nego_collector() | 
 | 338 |         telnet.set_option_negotiation_callback(nego.do_nego) | 
 | 339 |         txt = telnet.read_all() | 
 | 340 |         cmd = nego.seen | 
 | 341 |         self.assertTrue(len(cmd) > 0) # we expect at least one command | 
| Benjamin Peterson | 577473f | 2010-01-19 00:09:57 +0000 | [diff] [blame] | 342 |         self.assertIn(cmd[:1], self.cmds) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 343 |         self.assertEqual(cmd[1:2], tl.NOOPT) | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 344 |         self.assertEqual(data_len, len(txt + cmd)) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 345 |         nego.sb_getter = None # break the nego => telnet cycle | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 346 |  | 
 | 347 |     def test_IAC_commands(self): | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 348 |         for cmd in self.cmds: | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 349 |             self._test_command([tl.IAC, cmd]) | 
 | 350 |             self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100]) | 
 | 351 |             self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10]) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 352 |         # all at once | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 353 |         self._test_command([tl.IAC + cmd for (cmd) in self.cmds]) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 354 |  | 
 | 355 |     def test_SB_commands(self): | 
 | 356 |         # RFC 855, subnegotiations portion | 
 | 357 |         send = [tl.IAC + tl.SB + tl.IAC + tl.SE, | 
 | 358 |                 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE, | 
 | 359 |                 tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, | 
 | 360 |                 tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, | 
 | 361 |                 tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 362 |                ] | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 363 |         telnet = test_telnet(send) | 
| Jack Diederich | 1c8f38c | 2009-04-10 05:33:26 +0000 | [diff] [blame] | 364 |         nego = nego_collector(telnet.read_sb_data) | 
 | 365 |         telnet.set_option_negotiation_callback(nego.do_nego) | 
 | 366 |         txt = telnet.read_all() | 
 | 367 |         self.assertEqual(txt, b'') | 
 | 368 |         want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd' | 
 | 369 |         self.assertEqual(nego.sb_seen, want_sb_data) | 
 | 370 |         self.assertEqual(b'', telnet.read_sb_data()) | 
 | 371 |         nego.sb_getter = None # break the nego => telnet cycle | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 372 |  | 
| Jack Diederich | f31f7bb | 2009-09-03 20:37:58 +0000 | [diff] [blame] | 373 |     def test_debuglevel_reads(self): | 
| Jack Diederich | 36596a3 | 2009-07-26 22:23:04 +0000 | [diff] [blame] | 374 |         # test all the various places that self.msg(...) is called | 
 | 375 |         given_a_expect_b = [ | 
 | 376 |             # Telnet.fill_rawq | 
 | 377 |             (b'a', ": recv b''\n"), | 
 | 378 |             # Telnet.process_rawq | 
 | 379 |             (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"), | 
 | 380 |             (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"), | 
 | 381 |             (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"), | 
 | 382 |             (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"), | 
 | 383 |             (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), | 
| Jack Diederich | 36596a3 | 2009-07-26 22:23:04 +0000 | [diff] [blame] | 384 |            ] | 
 | 385 |         for a, b in given_a_expect_b: | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 386 |             telnet = test_telnet([a]) | 
 | 387 |             telnet.set_debuglevel(1) | 
 | 388 |             txt = telnet.read_all() | 
| Benjamin Peterson | 577473f | 2010-01-19 00:09:57 +0000 | [diff] [blame] | 389 |             self.assertIn(b, telnet._messages) | 
| Jack Diederich | 36596a3 | 2009-07-26 22:23:04 +0000 | [diff] [blame] | 390 |         return | 
 | 391 |  | 
| Jack Diederich | f31f7bb | 2009-09-03 20:37:58 +0000 | [diff] [blame] | 392 |     def test_debuglevel_write(self): | 
| Jack Diederich | 1766b9d | 2009-11-06 17:20:42 +0000 | [diff] [blame] | 393 |         telnet = test_telnet() | 
| Jack Diederich | f31f7bb | 2009-09-03 20:37:58 +0000 | [diff] [blame] | 394 |         telnet.set_debuglevel(1) | 
| Jack Diederich | f31f7bb | 2009-09-03 20:37:58 +0000 | [diff] [blame] | 395 |         telnet.write(b'xxx') | 
 | 396 |         expected = "send b'xxx'\n" | 
| Benjamin Peterson | 577473f | 2010-01-19 00:09:57 +0000 | [diff] [blame] | 397 |         self.assertIn(expected, telnet._messages) | 
| Jack Diederich | f31f7bb | 2009-09-03 20:37:58 +0000 | [diff] [blame] | 398 |  | 
| R. David Murray | 32ef70c | 2010-12-14 14:16:20 +0000 | [diff] [blame] | 399 |     def test_debug_accepts_str_port(self): | 
 | 400 |         # Issue 10695 | 
 | 401 |         with test_socket([]): | 
 | 402 |             telnet = TelnetAlike('dummy', '0') | 
 | 403 |             telnet._messages = '' | 
 | 404 |         telnet.set_debuglevel(1) | 
 | 405 |         telnet.msg('test') | 
 | 406 |         self.assertRegex(telnet._messages, r'0.*test') | 
 | 407 |  | 
 | 408 |  | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 409 | class ExpectTests(ExpectAndReadTestCase): | 
 | 410 |     def test_expect(self): | 
 | 411 |         """ | 
 | 412 |         expect(expected, [timeout]) | 
 | 413 |           Read until the expected string has been seen, or a timeout is | 
 | 414 |           hit (default is no timeout); may block. | 
 | 415 |         """ | 
 | 416 |         want = [b'x' * 10, b'match', b'y' * 10] | 
 | 417 |         telnet = test_telnet(want) | 
 | 418 |         (_,_,data) = telnet.expect([b'match']) | 
 | 419 |         self.assertEqual(data, b''.join(want[:-1])) | 
 | 420 |  | 
 | 421 |     def test_expect_with_poll(self): | 
 | 422 |         """Use select.poll() to implement telnet.expect().""" | 
 | 423 |         want = [b'x' * 10, b'match', b'y' * 10] | 
 | 424 |         telnet = test_telnet(want, use_poll=True) | 
 | 425 |         select.select = lambda *_: self.fail('unexpected select() call.') | 
 | 426 |         (_,_,data) = telnet.expect([b'match']) | 
 | 427 |         self.assertEqual(data, b''.join(want[:-1])) | 
 | 428 |  | 
 | 429 |     def test_expect_with_select(self): | 
 | 430 |         """Use select.select() to implement telnet.expect().""" | 
 | 431 |         want = [b'x' * 10, b'match', b'y' * 10] | 
 | 432 |         telnet = test_telnet(want, use_poll=False) | 
| Gregory P. Smith | 954d46b | 2012-07-16 15:48:30 -0700 | [diff] [blame] | 433 |         if self.old_poll: | 
 | 434 |             select.poll = lambda *_: self.fail('unexpected poll() call.') | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 435 |         (_,_,data) = telnet.expect([b'match']) | 
 | 436 |         self.assertEqual(data, b''.join(want[:-1])) | 
 | 437 |  | 
 | 438 |  | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 439 | def test_main(verbose=None): | 
| Gregory P. Smith | dad5711 | 2012-07-15 23:42:26 -0700 | [diff] [blame] | 440 |     support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests, | 
 | 441 |                          ExpectTests) | 
| Guido van Rossum | d8faa36 | 2007-04-27 19:54:29 +0000 | [diff] [blame] | 442 |  | 
 | 443 | if __name__ == '__main__': | 
 | 444 |     test_main() |