blob: 6c122d7b4876e91f201d88d94f5155fd38a2fac5 [file] [log] [blame]
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00001import socket
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00002import telnetlib
3import time
Jack Diederich183028e2009-04-06 02:08:44 +00004import Queue
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00005
Gregory P. Smithe4220a52012-07-16 13:36:01 -07006import unittest
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00007from unittest import TestCase
8from test import test_support
Victor Stinner6a102812010-04-27 23:55:59 +00009threading = test_support.import_module('threading')
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000010
Trent Nelsone41b0062008-04-08 23:47:30 +000011HOST = test_support.HOST
Jack Diederich183028e2009-04-06 02:08:44 +000012EOF_sigil = object()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000013
Jack Diederich183028e2009-04-06 02:08:44 +000014def server(evt, serv, dataq=None):
15 """ Open a tcp server in three steps
16 1) set evt to true to let the parent know we are ready
Jack Diederich3b2312e2009-04-07 20:22:59 +000017 2) [optional] if is not False, write the list of data from dataq.get()
18 to the socket.
Jack Diederich183028e2009-04-06 02:08:44 +000019 """
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000020 serv.listen(5)
Neal Norwitz37184292008-01-26 21:21:59 +000021 evt.set()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000022 try:
23 conn, addr = serv.accept()
Jack Diederich183028e2009-04-06 02:08:44 +000024 if dataq:
25 data = ''
Jack Diederich7f9bb912009-04-07 23:56:57 +000026 new_data = dataq.get(True, 0.5)
27 dataq.task_done()
28 for item in new_data:
29 if item == EOF_sigil:
Jack Diederich3b2312e2009-04-07 20:22:59 +000030 break
Jack Diederich7f9bb912009-04-07 23:56:57 +000031 if type(item) in [int, float]:
32 time.sleep(item)
Jack Diederich3b2312e2009-04-07 20:22:59 +000033 else:
Jack Diederich7f9bb912009-04-07 23:56:57 +000034 data += item
Jack Diederich183028e2009-04-06 02:08:44 +000035 written = conn.send(data)
36 data = data[written:]
Jesus Ceacb65f322011-11-08 16:06:44 +010037 conn.close()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000038 except socket.timeout:
39 pass
40 finally:
41 serv.close()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000042
43class GeneralTests(TestCase):
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000044
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000045 def setUp(self):
46 self.evt = threading.Event()
Trent Nelsone41b0062008-04-08 23:47:30 +000047 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Jesus Ceacb65f322011-11-08 16:06:44 +010048 self.sock.settimeout(60) # Safety net. Look issue 11812
Trent Nelsone41b0062008-04-08 23:47:30 +000049 self.port = test_support.bind_port(self.sock)
Jack Diederich183028e2009-04-06 02:08:44 +000050 self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
Jesus Ceacb65f322011-11-08 16:06:44 +010051 self.thread.setDaemon(True)
Jack Diederich183028e2009-04-06 02:08:44 +000052 self.thread.start()
Neal Norwitz37184292008-01-26 21:21:59 +000053 self.evt.wait()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000054
55 def tearDown(self):
Jack Diederich183028e2009-04-06 02:08:44 +000056 self.thread.join()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000057
58 def testBasic(self):
59 # connects
Trent Nelsone41b0062008-04-08 23:47:30 +000060 telnet = telnetlib.Telnet(HOST, self.port)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000061 telnet.sock.close()
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000062
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000063 def testTimeoutDefault(self):
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000064 self.assertTrue(socket.getdefaulttimeout() is None)
65 socket.setdefaulttimeout(30)
66 try:
Jesus Ceacb65f322011-11-08 16:06:44 +010067 telnet = telnetlib.Telnet(HOST, self.port)
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000068 finally:
69 socket.setdefaulttimeout(None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000070 self.assertEqual(telnet.sock.gettimeout(), 30)
71 telnet.sock.close()
72
73 def testTimeoutNone(self):
74 # None, having other default
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000075 self.assertTrue(socket.getdefaulttimeout() is None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000076 socket.setdefaulttimeout(30)
77 try:
Trent Nelsone41b0062008-04-08 23:47:30 +000078 telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000079 finally:
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000080 socket.setdefaulttimeout(None)
81 self.assertTrue(telnet.sock.gettimeout() is None)
82 telnet.sock.close()
83
84 def testTimeoutValue(self):
Jesus Ceacb65f322011-11-08 16:06:44 +010085 telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000086 self.assertEqual(telnet.sock.gettimeout(), 30)
87 telnet.sock.close()
88
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000089 def testTimeoutOpen(self):
90 telnet = telnetlib.Telnet()
Jesus Ceacb65f322011-11-08 16:06:44 +010091 telnet.open(HOST, self.port, timeout=30)
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000092 self.assertEqual(telnet.sock.gettimeout(), 30)
93 telnet.sock.close()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000094
Ezio Melotti290c6b32013-08-25 23:56:43 +030095 def testGetters(self):
96 # Test telnet getter methods
97 telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
98 t_sock = telnet.sock
99 self.assertEqual(telnet.get_socket(), t_sock)
100 self.assertEqual(telnet.fileno(), t_sock.fileno())
101 telnet.sock.close()
102
Jack Diederich183028e2009-04-06 02:08:44 +0000103def _read_setUp(self):
Jack Diederich183028e2009-04-06 02:08:44 +0000104 self.evt = threading.Event()
105 self.dataq = Queue.Queue()
106 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Charles-François Natali3b818072011-12-19 16:11:04 +0100107 self.sock.settimeout(10)
Jack Diederich183028e2009-04-06 02:08:44 +0000108 self.port = test_support.bind_port(self.sock)
109 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
110 self.thread.start()
111 self.evt.wait()
Jack Diederich183028e2009-04-06 02:08:44 +0000112
113def _read_tearDown(self):
Jack Diederich183028e2009-04-06 02:08:44 +0000114 self.thread.join()
115
Jack Diederich183028e2009-04-06 02:08:44 +0000116class ReadTests(TestCase):
117 setUp = _read_setUp
118 tearDown = _read_tearDown
119
Jack Diederich7f9bb912009-04-07 23:56:57 +0000120 # use a similar approach to testing timeouts as test_timeout.py
121 # these will never pass 100% but make the fuzz big enough that it is rare
122 block_long = 0.6
123 block_short = 0.3
Jack Diederich183028e2009-04-06 02:08:44 +0000124 def test_read_until_A(self):
125 """
126 read_until(expected, [timeout])
127 Read until the expected string has been seen, or a timeout is
128 hit (default is no timeout); may block.
129 """
130 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000131 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000132 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000133 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000134 data = telnet.read_until('match')
135 self.assertEqual(data, ''.join(want[:-2]))
136
137 def test_read_until_B(self):
138 # test the timeout - it does NOT raise socket.timeout
Jack Diederich7f9bb912009-04-07 23:56:57 +0000139 want = ['hello', self.block_long, 'not seen', EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000140 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000141 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000142 self.dataq.join()
Jack Diederich7f9bb912009-04-07 23:56:57 +0000143 data = telnet.read_until('not seen', self.block_short)
Jack Diederich183028e2009-04-06 02:08:44 +0000144 self.assertEqual(data, want[0])
Jack Diederich7f9bb912009-04-07 23:56:57 +0000145 self.assertEqual(telnet.read_all(), 'not seen')
Jack Diederich183028e2009-04-06 02:08:44 +0000146
Gregory P. Smithe0c22202012-07-15 22:16:06 -0700147 def test_read_until_with_poll(self):
148 """Use select.poll() to implement telnet.read_until()."""
149 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
150 self.dataq.put(want)
151 telnet = telnetlib.Telnet(HOST, self.port)
152 if not telnet._has_poll:
153 raise unittest.SkipTest('select.poll() is required')
154 telnet._has_poll = True
155 self.dataq.join()
156 data = telnet.read_until('match')
157 self.assertEqual(data, ''.join(want[:-2]))
158
159 def test_read_until_with_select(self):
160 """Use select.select() to implement telnet.read_until()."""
161 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
162 self.dataq.put(want)
163 telnet = telnetlib.Telnet(HOST, self.port)
164 telnet._has_poll = False
165 self.dataq.join()
166 data = telnet.read_until('match')
167 self.assertEqual(data, ''.join(want[:-2]))
168
Jack Diederich183028e2009-04-06 02:08:44 +0000169 def test_read_all_A(self):
170 """
171 read_all()
172 Read all data until EOF; may block.
173 """
174 want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000175 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000176 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000177 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000178 data = telnet.read_all()
179 self.assertEqual(data, ''.join(want[:-1]))
Jack Diederich183028e2009-04-06 02:08:44 +0000180
Jack Diederich3b2312e2009-04-07 20:22:59 +0000181 def _test_blocking(self, func):
Jack Diederich7f9bb912009-04-07 23:56:57 +0000182 self.dataq.put([self.block_long, EOF_sigil])
Jack Diederich3b2312e2009-04-07 20:22:59 +0000183 self.dataq.join()
184 start = time.time()
185 data = func()
Jack Diederich7f9bb912009-04-07 23:56:57 +0000186 self.assertTrue(self.block_short <= time.time() - start)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000187
Jack Diederich183028e2009-04-06 02:08:44 +0000188 def test_read_all_B(self):
189 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
190
Jack Diederich3b2312e2009-04-07 20:22:59 +0000191 def test_read_all_C(self):
192 self.dataq.put([EOF_sigil])
193 telnet = telnetlib.Telnet(HOST, self.port)
194 self.dataq.join()
195 telnet.read_all()
196 telnet.read_all() # shouldn't raise
197
Jack Diederich183028e2009-04-06 02:08:44 +0000198 def test_read_some_A(self):
199 """
200 read_some()
201 Read at least one byte or EOF; may block.
202 """
203 # test 'at least one byte'
204 want = ['x' * 500, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000205 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000206 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000207 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000208 data = telnet.read_all()
209 self.assertTrue(len(data) >= 1)
210
211 def test_read_some_B(self):
212 # test EOF
Jack Diederich3b2312e2009-04-07 20:22:59 +0000213 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000214 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000215 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000216 self.assertEqual('', telnet.read_some())
217
Jack Diederich3b2312e2009-04-07 20:22:59 +0000218 def test_read_some_C(self):
Jack Diederich183028e2009-04-06 02:08:44 +0000219 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
220
221 def _test_read_any_eager_A(self, func_name):
222 """
223 read_very_eager()
224 Read all data available already queued or on the socket,
225 without blocking.
226 """
Jack Diederich7f9bb912009-04-07 23:56:57 +0000227 want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
228 expects = want[1] + want[2]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000229 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000230 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000231 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000232 func = getattr(telnet, func_name)
Jack Diederich183028e2009-04-06 02:08:44 +0000233 data = ''
234 while True:
235 try:
236 data += func()
237 self.assertTrue(expects.startswith(data))
Jack Diederich183028e2009-04-06 02:08:44 +0000238 except EOFError:
239 break
240 self.assertEqual(expects, data)
241
242 def _test_read_any_eager_B(self, func_name):
243 # test EOF
Jack Diederich3b2312e2009-04-07 20:22:59 +0000244 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000245 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000246 self.dataq.join()
Jack Diederich7f9bb912009-04-07 23:56:57 +0000247 time.sleep(self.block_short)
Jack Diederich183028e2009-04-06 02:08:44 +0000248 func = getattr(telnet, func_name)
249 self.assertRaises(EOFError, func)
250
251 # read_eager and read_very_eager make the same gaurantees
252 # (they behave differently but we only test the gaurantees)
253 def test_read_very_eager_A(self):
254 self._test_read_any_eager_A('read_very_eager')
255 def test_read_very_eager_B(self):
256 self._test_read_any_eager_B('read_very_eager')
257 def test_read_eager_A(self):
258 self._test_read_any_eager_A('read_eager')
259 def test_read_eager_B(self):
260 self._test_read_any_eager_B('read_eager')
261 # NB -- we need to test the IAC block which is mentioned in the docstring
262 # but not in the module docs
263
Jack Diederich183028e2009-04-06 02:08:44 +0000264 def _test_read_any_lazy_B(self, func_name):
Jack Diederich3b2312e2009-04-07 20:22:59 +0000265 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000266 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000267 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000268 func = getattr(telnet, func_name)
Jack Diederich183028e2009-04-06 02:08:44 +0000269 telnet.fill_rawq()
270 self.assertRaises(EOFError, func)
271
Jack Diederich7f9bb912009-04-07 23:56:57 +0000272 def test_read_lazy_A(self):
273 want = ['x' * 100, EOF_sigil]
274 self.dataq.put(want)
275 telnet = telnetlib.Telnet(HOST, self.port)
276 self.dataq.join()
277 time.sleep(self.block_short)
278 self.assertEqual('', telnet.read_lazy())
279 data = ''
280 while True:
281 try:
282 read_data = telnet.read_lazy()
283 data += read_data
284 if not read_data:
285 telnet.fill_rawq()
286 except EOFError:
287 break
288 self.assertTrue(want[0].startswith(data))
289 self.assertEqual(data, want[0])
290
Jack Diederich183028e2009-04-06 02:08:44 +0000291 def test_read_lazy_B(self):
292 self._test_read_any_lazy_B('read_lazy')
293
Jack Diederich7f9bb912009-04-07 23:56:57 +0000294 def test_read_very_lazy_A(self):
295 want = ['x' * 100, EOF_sigil]
296 self.dataq.put(want)
297 telnet = telnetlib.Telnet(HOST, self.port)
298 self.dataq.join()
299 time.sleep(self.block_short)
300 self.assertEqual('', telnet.read_very_lazy())
301 data = ''
302 while True:
303 try:
304 read_data = telnet.read_very_lazy()
305 except EOFError:
306 break
307 data += read_data
308 if not read_data:
309 telnet.fill_rawq()
310 self.assertEqual('', telnet.cookedq)
311 telnet.process_rawq()
312 self.assertTrue(want[0].startswith(data))
313 self.assertEqual(data, want[0])
314
315 def test_read_very_lazy_B(self):
316 self._test_read_any_lazy_B('read_very_lazy')
317
Jack Diederich183028e2009-04-06 02:08:44 +0000318class nego_collector(object):
319 def __init__(self, sb_getter=None):
320 self.seen = ''
321 self.sb_getter = sb_getter
322 self.sb_seen = ''
323
324 def do_nego(self, sock, cmd, opt):
325 self.seen += cmd + opt
326 if cmd == tl.SE and self.sb_getter:
327 sb_data = self.sb_getter()
328 self.sb_seen += sb_data
329
330tl = telnetlib
331class OptionTests(TestCase):
332 setUp = _read_setUp
333 tearDown = _read_tearDown
334 # RFC 854 commands
335 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
336
337 def _test_command(self, data):
338 """ helper for testing IAC + cmd """
339 self.setUp()
Jack Diederich3b2312e2009-04-07 20:22:59 +0000340 self.dataq.put(data)
Jack Diederich183028e2009-04-06 02:08:44 +0000341 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000342 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000343 nego = nego_collector()
344 telnet.set_option_negotiation_callback(nego.do_nego)
Jack Diederich183028e2009-04-06 02:08:44 +0000345 txt = telnet.read_all()
346 cmd = nego.seen
347 self.assertTrue(len(cmd) > 0) # we expect at least one command
Ezio Melottiaa980582010-01-23 23:04:36 +0000348 self.assertIn(cmd[0], self.cmds)
Jack Diederich183028e2009-04-06 02:08:44 +0000349 self.assertEqual(cmd[1], tl.NOOPT)
350 self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
Jack Diederich7f9bb912009-04-07 23:56:57 +0000351 nego.sb_getter = None # break the nego => telnet cycle
Jack Diederich183028e2009-04-06 02:08:44 +0000352 self.tearDown()
353
354 def test_IAC_commands(self):
355 # reset our setup
Jack Diederich3b2312e2009-04-07 20:22:59 +0000356 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000357 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000358 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000359 self.tearDown()
360
361 for cmd in self.cmds:
362 self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
363 self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
364 self._test_command([tl.IAC + cmd, EOF_sigil])
365 # all at once
366 self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
Jack Diederich3b2312e2009-04-07 20:22:59 +0000367 self.assertEqual('', telnet.read_sb_data())
Jack Diederich183028e2009-04-06 02:08:44 +0000368
369 def test_SB_commands(self):
370 # RFC 855, subnegotiations portion
371 send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
372 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
373 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
374 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
375 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
376 EOF_sigil,
377 ]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000378 self.dataq.put(send)
Jack Diederich183028e2009-04-06 02:08:44 +0000379 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000380 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000381 nego = nego_collector(telnet.read_sb_data)
382 telnet.set_option_negotiation_callback(nego.do_nego)
Jack Diederich183028e2009-04-06 02:08:44 +0000383 txt = telnet.read_all()
384 self.assertEqual(txt, '')
385 want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
386 self.assertEqual(nego.sb_seen, want_sb_data)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000387 self.assertEqual('', telnet.read_sb_data())
Jack Diederich7f9bb912009-04-07 23:56:57 +0000388 nego.sb_getter = None # break the nego => telnet cycle
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000389
Gregory P. Smithe0c22202012-07-15 22:16:06 -0700390
391class ExpectTests(TestCase):
392 def setUp(self):
393 self.evt = threading.Event()
394 self.dataq = Queue.Queue()
395 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
396 self.sock.settimeout(10)
397 self.port = test_support.bind_port(self.sock)
398 self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
399 self.dataq))
400 self.thread.start()
401 self.evt.wait()
402
403 def tearDown(self):
404 self.thread.join()
405
406 # use a similar approach to testing timeouts as test_timeout.py
407 # these will never pass 100% but make the fuzz big enough that it is rare
408 block_long = 0.6
409 block_short = 0.3
410 def test_expect_A(self):
411 """
412 expect(expected, [timeout])
413 Read until the expected string has been seen, or a timeout is
414 hit (default is no timeout); may block.
415 """
416 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
417 self.dataq.put(want)
418 telnet = telnetlib.Telnet(HOST, self.port)
419 self.dataq.join()
420 (_,_,data) = telnet.expect(['match'])
421 self.assertEqual(data, ''.join(want[:-2]))
422
423 def test_expect_B(self):
424 # test the timeout - it does NOT raise socket.timeout
425 want = ['hello', self.block_long, 'not seen', EOF_sigil]
426 self.dataq.put(want)
427 telnet = telnetlib.Telnet(HOST, self.port)
428 self.dataq.join()
429 (_,_,data) = telnet.expect(['not seen'], self.block_short)
430 self.assertEqual(data, want[0])
431 self.assertEqual(telnet.read_all(), 'not seen')
432
433 def test_expect_with_poll(self):
434 """Use select.poll() to implement telnet.expect()."""
435 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
436 self.dataq.put(want)
437 telnet = telnetlib.Telnet(HOST, self.port)
438 if not telnet._has_poll:
439 raise unittest.SkipTest('select.poll() is required')
440 telnet._has_poll = True
441 self.dataq.join()
442 (_,_,data) = telnet.expect(['match'])
443 self.assertEqual(data, ''.join(want[:-2]))
444
445 def test_expect_with_select(self):
446 """Use select.select() to implement telnet.expect()."""
447 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
448 self.dataq.put(want)
449 telnet = telnetlib.Telnet(HOST, self.port)
450 telnet._has_poll = False
451 self.dataq.join()
452 (_,_,data) = telnet.expect(['match'])
453 self.assertEqual(data, ''.join(want[:-2]))
454
455
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000456def test_main(verbose=None):
Gregory P. Smithe0c22202012-07-15 22:16:06 -0700457 test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
458 ExpectTests)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000459
460if __name__ == '__main__':
461 test_main()