blob: 742bc4b75190462fd428ebb67f1f576c2fb845b2 [file] [log] [blame]
Guido van Rossumd8faa362007-04-27 19:54:29 +00001import socket
2import threading
3import telnetlib
4import time
Jack Diederich1c8f38c2009-04-10 05:33:26 +00005import queue
Jack Diederich36596a32009-07-26 22:23:04 +00006import sys
7import io
Guido van Rossumd8faa362007-04-27 19:54:29 +00008
9from unittest import TestCase
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
Guido van Rossumd8faa362007-04-27 19:54:29 +000011
# Host name used by the tests when connecting to the local test server;
# taken from test.support.
HOST = support.HOST
# Unique sentinel placed in a data list handed to server(); server() stops
# sending as soon as it meets this object.
EOF_sigil = object()
Guido van Rossumd8faa362007-04-27 19:54:29 +000014
def server(evt, serv, dataq=None):
    """Run a one-shot TCP server on the already bound socket *serv*.

    The server works in three steps:
      1) start listening and set *evt* to let the parent know we are ready
      2) accept a single connection and, if *dataq* is given, take one list
         from dataq.get() and replay it on the connection: numbers are
         delays (in seconds) to sleep, EOF_sigil ends the replay, anything
         else is sent as data
      3) close both sockets and set *evt* again to let the parent know
         we're done

    A socket.timeout raised along the way is deliberately ignored; the
    cleanup in step 3 always runs.
    """
    serv.listen(5)
    evt.set()
    conn = None
    try:
        conn, addr = serv.accept()
        if dataq:
            new_data = dataq.get(True, 0.5)
            dataq.task_done()
            for item in new_data:
                if item is EOF_sigil:
                    break
                if isinstance(item, (int, float)):
                    time.sleep(item)
                else:
                    # sendall() keeps writing after a short send, so no
                    # bytes are left behind when the sentinel is reached.
                    conn.sendall(item)
    except socket.timeout:
        pass
    finally:
        # Close the accepted connection explicitly so the client sees EOF
        # without relying on garbage collection of the socket object.
        if conn is not None:
            conn.close()
        serv.close()
        evt.set()
44
class GeneralTests(TestCase):
    """Connection and timeout tests for telnetlib.Telnet.

    Every test talks to a throw-away server thread (see server()) that
    accepts a single connection and sends nothing.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(3)
        self.port = support.bind_port(self.sock)
        self.thread = threading.Thread(target=server, args=(self.evt, self.sock))
        self.thread.start()
        # First evt.set() means "server is listening"; clear it so that
        # tearDown can wait for the second one ("server is done").
        self.evt.wait()
        self.evt.clear()
        time.sleep(.1)

    def tearDown(self):
        self.evt.wait()
        self.thread.join()

    def testBasic(self):
        # connects
        telnet = telnetlib.Telnet(HOST, self.port)
        # Registered cleanups run even when an assertion fails, so the
        # client socket can no longer leak.
        self.addCleanup(telnet.close)

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet("localhost", self.port)
        finally:
            # always restore the process-wide default
            socket.setdefaulttimeout(None)
        self.addCleanup(telnet.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.addCleanup(telnet.close)
        self.assertIsNone(telnet.sock.gettimeout())

    def testTimeoutValue(self):
        telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
        self.addCleanup(telnet.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutOpen(self):
        telnet = telnetlib.Telnet()
        telnet.open("localhost", self.port, timeout=30)
        self.addCleanup(telnet.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)
Guido van Rossumd8faa362007-04-27 19:54:29 +000098
def _read_setUp(self):
    """Shared setUp: start a server() thread fed from a fresh data queue.

    Sets self.evt, self.dataq, self.sock, self.port and self.thread, then
    waits until the server thread has signalled that it is listening.
    """
    self.evt = threading.Event()
    self.dataq = queue.Queue()

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock = listener
    listener.settimeout(3)
    self.port = support.bind_port(listener)

    worker = threading.Thread(
        target=server,
        args=(self.evt, listener, self.dataq),
    )
    self.thread = worker
    worker.start()

    # wait for the "ready" signal, then re-arm the event for "done"
    self.evt.wait()
    self.evt.clear()
    time.sleep(.1)
110
def _read_tearDown(self):
    """Shared tearDown: wait for the event to be set, then join the thread."""
    finished = self.evt
    finished.wait()
    worker = self.thread
    worker.join()
114
class ReadTests(TestCase):
    """Tests for the Telnet.read_*() methods.

    Each test queues a list of bytes, delays (numbers) and a final
    EOF_sigil on self.dataq; the server thread started by setUp replays
    that list to the client that connects.
    """
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def _connect(self):
        """Return a Telnet client connected to the test server.

        The client is closed by a registered cleanup, so it does not leak
        when a test fails half-way through.
        """
        telnet = telnetlib.Telnet(HOST, self.port)
        self.addCleanup(telnet.close)
        return telnet

    def test_read_until_A(self):
        # read_until(expected, [timeout])
        #   Read until the expected string has been seen, or a timeout is
        #   hit (default is no timeout); may block.
        want = [b'x' * 10, b'match', b'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        data = telnet.read_until(b'match')
        self.assertEqual(data, b''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = [b'hello', self.block_long, b'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        data = telnet.read_until(b'not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), b'not seen')

    def test_read_all_A(self):
        # read_all()
        #   Read all data until EOF; may block.
        want = [b'x' * 500, b'y' * 500, b'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, b''.join(want[:-1]))

    def _test_blocking(self, func):
        """Check that func() blocks for at least block_short seconds while
        the server sleeps block_long before finishing."""
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        self.assertLessEqual(self.block_short, time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(self._connect().read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        telnet.read_all()
        telnet.read_all() # shouldn't raise

    def test_read_some_A(self):
        # read_some()
        #   Read at least one byte or EOF; may block.
        # test 'at least one byte'
        want = [b'x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        # This must call read_some(), the method under test (the test
        # previously called read_all() by mistake).
        data = telnet.read_some()
        self.assertGreaterEqual(len(data), 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        self.assertEqual(b'', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(self._connect().read_some)

    def _test_read_any_eager_A(self, func_name):
        # read_very_eager()
        #   Read all data available already queued or on the socket,
        #   without blocking.
        want = [self.block_long, b'x' * 100, b'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = b''
        # Poll until the method reports EOF; whatever has arrived so far
        # must always be a prefix of the expected payload.
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')
    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        func = getattr(telnet, func_name)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = [b'x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        time.sleep(self.block_short)
        # nothing has been pulled off the socket yet
        self.assertEqual(b'', telnet.read_lazy())
        data = b''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = [b'x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual(b'', telnet.read_very_lazy())
        data = b''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                # read_very_lazy() returned nothing: fill the raw queue and
                # cook it by hand before polling again.
                telnet.fill_rawq()
                self.assertEqual(b'', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')
295
class nego_collector(object):
    """Option-negotiation callback that records everything it is given.

    seen     -- concatenation of cmd + opt of every callback invocation
    sb_seen  -- concatenation of what sb_getter() returned on each tl.SE
    """

    def __init__(self, sb_getter=None):
        self.sb_getter = sb_getter
        self.seen = self.sb_seen = b''

    def do_nego(self, sock, cmd, opt):
        self.seen = self.seen + (cmd + opt)
        if cmd != tl.SE or not self.sb_getter:
            return
        # end of a subnegotiation: collect its payload
        self.sb_seen = self.sb_seen + self.sb_getter()
307
class SocketProxy(object):
    ''' a socket proxy that re-defines sendall() '''

    def __init__(self, real_sock):
        # bytes handed to sendall() so far, kept for later inspection
        self._raw_sent = b''
        self.socket = real_sock

    def __getattr__(self, k):
        # only reached for names not found on the proxy itself
        wrapped = self.socket
        return getattr(wrapped, k)

    def sendall(self, data):
        # record first, then forward to the wrapped socket
        self._raw_sent = self._raw_sent + data
        self.socket.sendall(data)
318
class TelnetSockSendall(telnetlib.Telnet):
    """Telnet client whose socket is wrapped in a SocketProxy after open()."""

    def open(self, *args, **opts):
        super().open(*args, **opts)
        proxied = SocketProxy(self.sock)
        self.sock = proxied
323
class WriteTests(TestCase):
    '''The only thing that write does is replace each tl.IAC for
    tl.IAC+tl.IAC'''
    setUp = _read_setUp
    tearDown = _read_tearDown

    def _test_write(self, data):
        """Write *data* and compare what reached sendall() with the
        IAC-doubled form of *data*."""
        self.telnet.sock._raw_sent = b''
        self.telnet.write(data)
        escaped = data.replace(tl.IAC, tl.IAC + tl.IAC)
        self.assertEqual(escaped, self.telnet.sock._raw_sent)

    def test_write(self):
        client = self.telnet = TelnetSockSendall()
        samples = (
            b'data sample without IAC',
            b'data sample with' + tl.IAC + b' one IAC',
            b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
            tl.IAC,
            b'',
        )
        client.open(HOST, self.port)
        for payload in samples:
            self.dataq.put([b''])
            self._test_write(payload)
        client.close()
349
# Short alias for the telnetlib module.  It is bound here, ahead of the
# classes below that read tl.* while their class bodies are executed.
tl = telnetlib
Jack Diederich36596a32009-07-26 22:23:04 +0000351
class TelnetDebuglevel(tl.Telnet):
    ''' Telnet-alike that captures messages written to stdout when
        debuglevel > 0

        Whatever the base class msg() prints is appended to the
        _messages string of the instance instead of reaching the real
        sys.stdout.
    '''
    _messages = ''

    def msg(self, msg, *args):
        orig_stdout = sys.stdout
        sys.stdout = fake_stdout = io.StringIO()
        try:
            tl.Telnet.msg(self, msg, *args)
        finally:
            # Restore the real stdout even if the base msg() raises, so a
            # failure here cannot silence the rest of the test run.
            sys.stdout = orig_stdout
        self._messages += fake_stdout.getvalue()
364
class OptionTests(TestCase):
    """Tests for IAC command handling, subnegotiation and debug messages.

    Several tests need a fresh server per sub-case.  They first use up the
    server started by the framework's setUp() (see _retire_server) and
    then call setUp()/tearDown() by hand around every sub-case, so no
    server thread is left behind unjoined.
    """
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _retire_server(self):
        """Connect to the current server, let it finish and join it.

        Returns the (still usable) Telnet client; it is closed by a
        registered cleanup.  After this call self.setUp() may be invoked
        again without orphaning a listening server thread.
        """
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.addCleanup(telnet.close)
        self.dataq.join()
        self.tearDown()
        return telnet

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        self.setUp()
        self.dataq.put(data)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector()
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        cmd = nego.seen
        self.assertTrue(len(cmd) > 0) # we expect at least one command
        self.assertIn(cmd[:1], self.cmds)
        self.assertEqual(cmd[1:2], tl.NOOPT)
        self.assertEqual(len(b''.join(data[:-1])), len(txt + cmd))
        nego.sb_getter = None # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        telnet = self._retire_server()

        for cmd in self.cmds:
            self._test_command([tl.IAC, cmd, EOF_sigil])
            self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100, EOF_sigil])
            self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10, EOF_sigil])
        # all at once
        self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
        self.assertEqual(b'', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
                tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
                EOF_sigil,
               ]
        self.dataq.put(send)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.addCleanup(telnet.close)
        self.dataq.join()
        nego = nego_collector(telnet.read_sb_data)
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        self.assertEqual(txt, b'')
        want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd'
        self.assertEqual(nego.sb_seen, want_sb_data)
        self.assertEqual(b'', telnet.read_sb_data())
        nego.sb_getter = None # break the nego => telnet cycle

    def _test_debuglevel(self, data, expected_msg):
        """ helper for testing debuglevel messages """
        self.setUp()
        self.dataq.put(data + [EOF_sigil])
        telnet = TelnetDebuglevel(HOST, self.port)
        telnet.set_debuglevel(1)
        self.dataq.join()
        telnet.read_all()
        self.assertIn(expected_msg, telnet._messages)
        telnet.close()
        self.tearDown()

    def test_debuglevel_reads(self):
        # test all the various places that self.msg(...) is called
        given_a_expect_b = [
            # Telnet.fill_rawq
            (b'a', ": recv b''\n"),
            # Telnet.process_rawq
            (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
            (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
            (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
            (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
            (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
           ]
        # _test_debuglevel() starts its own server for every case, so the
        # one started by the framework has to be used up and joined first.
        self._retire_server()
        for a, b in given_a_expect_b:
            # _test_debuglevel() appends the EOF sentinel itself
            self._test_debuglevel([a], b)

    def test_debuglevel_write(self):
        # The server started by the framework's setUp() is used directly;
        # starting a second one here would leave the first unjoined.
        telnet = TelnetDebuglevel(HOST, self.port)
        telnet.set_debuglevel(1)
        self.dataq.put([b'', EOF_sigil])
        self.dataq.join()
        telnet.write(b'xxx')
        expected = "send b'xxx'\n"
        self.assertIn(expected, telnet._messages)
        telnet.close()
465
def test_main(verbose=None):
    """Run all TestCase classes of this module via support.run_unittest()."""
    test_classes = (GeneralTests, ReadTests, WriteTests, OptionTests)
    support.run_unittest(*test_classes)

if __name__ == '__main__':
    test_main()