blob: 662c7151cf08e25f58608c184d195a267dbcae70 [file] [log] [blame]
Guido van Rossumd8faa362007-04-27 19:54:29 +00001import socket
2import threading
3import telnetlib
4import time
Jack Diederich1c8f38c2009-04-10 05:33:26 +00005import queue
Georg Brandle6700e92009-08-13 08:39:33 +00006import sys
7import io
Guido van Rossumd8faa362007-04-27 19:54:29 +00008
9from unittest import TestCase
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
Guido van Rossumd8faa362007-04-27 19:54:29 +000011
# Host name handed to telnetlib.Telnet() by the tests in this file.
HOST = support.HOST
# Unique marker that ends a data script; server() stops sending when it
# reaches this item.
EOF_sigil = object()


def server(evt, serv, dataq=None):
    """ Open a tcp server in three steps
        1) set evt to true to let the parent know we are ready
        2) [optional] if dataq is given, write the list of data from
           dataq.get() to the socket.
        3) set evt to true to let the parent know we're done

    Items of the list fetched from dataq are handled in order: an int or
    float is a number of seconds to sleep, EOF_sigil stops processing, and
    anything else is sent to the client as-is.

    The accepted connection and the listening socket are always closed
    before the second evt.set(), so the client reliably sees EOF.
    """
    serv.listen(5)
    evt.set()
    conn = None
    try:
        conn, _ = serv.accept()
        if dataq:
            # Wait briefly for the test to queue its script.
            new_data = dataq.get(True, 0.5)
            dataq.task_done()
            for item in new_data:
                if item is EOF_sigil:
                    break
                if isinstance(item, (int, float)):
                    time.sleep(item)
                else:
                    # sendall() so a short write cannot drop trailing bytes.
                    conn.sendall(item)
    except socket.timeout:
        pass
    finally:
        if conn is not None:
            conn.close()
        serv.close()
        evt.set()
44
class GeneralTests(TestCase):
    """Connection and timeout tests against a server that sends no data."""

    def setUp(self):
        # Bind a listening socket and run server() on it in a helper thread.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.evt = threading.Event()
        self.sock = listener
        listener.settimeout(3)
        self.port = support.bind_port(listener)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, listener))
        self.thread.start()
        # First evt.set() from the server means "listening"; re-arm the
        # event so tearDown can wait for the "done" signal.
        self.evt.wait()
        self.evt.clear()
        time.sleep(.1)

    def tearDown(self):
        # Wait for the server's final evt.set(), then reap the thread.
        self.evt.wait()
        self.thread.join()

    def testBasic(self):
        # connects
        client = telnetlib.Telnet(HOST, self.port)
        client.sock.close()

    def testTimeoutDefault(self):
        # With no explicit timeout the global socket default is used.
        initial = socket.getdefaulttimeout()
        self.assertTrue(initial is None)
        socket.setdefaulttimeout(30)
        try:
            client = telnetlib.Telnet("localhost", self.port)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.sock.close()

    def testTimeoutNone(self):
        # None, having other default
        initial = socket.getdefaulttimeout()
        self.assertTrue(initial is None)
        socket.setdefaulttimeout(30)
        try:
            client = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        effective = client.sock.gettimeout()
        self.assertTrue(effective is None)
        client.sock.close()

    def testTimeoutValue(self):
        # An explicit constructor timeout ends up on the socket.
        client = telnetlib.Telnet("localhost", self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.sock.close()

    def testTimeoutOpen(self):
        # An explicit open() timeout ends up on the socket.
        client = telnetlib.Telnet()
        client.open("localhost", self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.sock.close()
Guido van Rossumd8faa362007-04-27 19:54:29 +000098
def _read_setUp(self):
    """Shared setUp: start server() in a thread with a data queue attached.

    Sets self.evt, self.dataq, self.sock, self.port and self.thread, and
    returns once the server has signalled that it is listening.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.evt = threading.Event()
    self.dataq = queue.Queue()
    self.sock = listener
    listener.settimeout(3)
    self.port = support.bind_port(listener)
    worker = threading.Thread(target=server,
                              args=(self.evt, listener, self.dataq))
    self.thread = worker
    worker.start()
    # The first evt.set() from the server means "listening"; clear it so the
    # matching tearDown can wait for the second one.
    self.evt.wait()
    self.evt.clear()
    time.sleep(.1)
110
111def _read_tearDown(self):
112 self.evt.wait()
113 self.thread.join()
114
class ReadTests(TestCase):
    """Exercise the Telnet.read_*() methods against the scripted server.

    Each test puts a list on self.dataq made of byte strings to send,
    numbers (seconds for the server to sleep) and a terminating EOF_sigil;
    the server thread started by setUp plays that list back.
    """
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = [b'x' * 10, b'match', b'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until(b'match')
        # Everything up to and including the match, nothing after it.
        self.assertEqual(data, b''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = [b'hello', self.block_long, b'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until(b'not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), b'not seen')

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = [b'x' * 500, b'y' * 500, b'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, b''.join(want[:-1]))

    def _test_blocking(self, func):
        # The server sleeps for block_long before closing, so a blocking
        # read must take at least block_short to return.
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        self.assertTrue(self.block_short <= time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        telnet.read_all()
        telnet.read_all() # shouldn't raise

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = [b'x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        # read_some() is the method under test here, not read_all().
        data = telnet.read_some()
        self.assertTrue(len(data) >= 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.assertEqual(b'', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, b'x' * 100, b'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = b''
        # Poll until EOF; whatever has arrived so far must be a prefix of
        # the expected payload.
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')
    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        # EOF case: once the raw queue has seen EOF the lazy reads raise.
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = [b'x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # Nothing has been pulled off the socket yet.
        self.assertEqual(b'', telnet.read_lazy())
        data = b''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = [b'x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual(b'', telnet.read_very_lazy())
        data = b''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                # The test has to fill and process the raw queue itself
                # before read_very_lazy() has anything to return.
                telnet.fill_rawq()
                self.assertEqual(b'', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')
295
class nego_collector(object):
    """Option-negotiation callback target that records what it is given.

    seen accumulates cmd + opt for every callback; sb_seen accumulates the
    value of sb_getter() each time the command is tl.SE and a getter is set.
    """

    def __init__(self, sb_getter=None):
        self.sb_getter = sb_getter
        self.seen = b''
        self.sb_seen = b''

    def do_nego(self, sock, cmd, opt):
        self.seen += cmd + opt
        if cmd != tl.SE or not self.sb_getter:
            return
        self.sb_seen += self.sb_getter()
307
# Short alias for telnetlib used by the option/negotiation tests.
tl = telnetlib


class TelnetDebuglevel(tl.Telnet):
    ''' Telnet-alike that captures messages written to stdout when
        debuglevel > 0
    '''
    # Text captured so far; starts as this class-level empty string and is
    # rebound on the instance by the first msg() call.
    _messages = ''

    def msg(self, msg, *args):
        """Run Telnet.msg() with stdout redirected and keep what it printed.

        sys.stdout is restored even if Telnet.msg() raises, so a failing
        format string cannot leave the process printing into a discarded
        buffer.
        """
        orig_stdout = sys.stdout
        sys.stdout = fake_stdout = io.StringIO()
        try:
            tl.Telnet.msg(self, msg, *args)
        finally:
            sys.stdout = orig_stdout
        self._messages += fake_stdout.getvalue()
322
class OptionTests(TestCase):
    """Tests for IAC commands, subnegotiation data and debug messages."""
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ helper: send `data`, then check the first negotiated command """
        # Each call runs against its own freshly started server.
        self.setUp()
        self.dataq.put(data)
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        collector = nego_collector()
        client.set_option_negotiation_callback(collector.do_nego)
        text = client.read_all()
        seen = collector.seen
        self.assertTrue(len(seen) > 0) # we expect at least one command
        first_cmd, first_opt = seen[:1], seen[1:2]
        self.assertTrue(first_cmd in self.cmds)
        self.assertEqual(first_opt, tl.NOOPT)
        sent_length = len(b''.join(data[:-1]))
        self.assertEqual(sent_length, len(text + seen))
        collector.sb_getter = None # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.tearDown()

        for cmd in self.cmds:
            # bare command, then the command embedded in long and short text
            payloads = (
                [tl.IAC, cmd, EOF_sigil],
                [b'x' * 100, tl.IAC, cmd, b'y' * 100, EOF_sigil],
                [b'x' * 10, tl.IAC, cmd, b'y' * 10, EOF_sigil],
            )
            for payload in payloads:
                self._test_command(payload)
        # all at once
        combined = [tl.IAC + cmd for cmd in self.cmds]
        combined.append(EOF_sigil)
        self._test_command(combined)
        self.assertEqual(b'', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        sb_open = tl.IAC + tl.SB
        sb_close = tl.IAC + tl.SE
        escaped_iac = tl.IAC + tl.IAC
        send = [
            sb_open + sb_close,
            sb_open + escaped_iac + sb_close,
            sb_open + escaped_iac + b'aa' + sb_close,
            sb_open + b'bb' + escaped_iac + sb_close,
            sb_open + b'cc' + escaped_iac + b'dd' + sb_close,
            EOF_sigil,
        ]
        self.dataq.put(send)
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        collector = nego_collector(client.read_sb_data)
        client.set_option_negotiation_callback(collector.do_nego)
        text = client.read_all()
        self.assertEqual(text, b'')
        want_sb_data = b''.join(
            [tl.IAC, tl.IAC, b'aabb', tl.IAC, b'cc', tl.IAC, b'dd'])
        self.assertEqual(collector.sb_seen, want_sb_data)
        self.assertEqual(b'', client.read_sb_data())
        collector.sb_getter = None # break the nego => telnet cycle

    def _test_debuglevel(self, data, expected_msg):
        """ helper: send `data` and look for `expected_msg` in the debug log """
        self.setUp()
        self.dataq.put(data)
        client = TelnetDebuglevel(HOST, self.port)
        client.set_debuglevel(1)
        self.dataq.join()
        client.read_all()
        self.assertTrue(expected_msg in client._messages,
                        msg=(client._messages, expected_msg))
        self.tearDown()

    def test_debuglevel(self):
        # test all the various places that self.msg(...) is called
        cases = [
            # Telnet.fill_rawq
            (b'a', ": recv b''\n"),
            # Telnet.process_rawq
            (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
            (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
            (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
            (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
            (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
            # Telnet.write
            # XXX, untested
        ]
        for payload, expected in cases:
            self._test_debuglevel([payload, EOF_sigil], expected)
411
def test_main(verbose=None):
    """Run the three TestCase classes of this module via support.run_unittest.

    The verbose argument is accepted but not used.
    """
    cases = (GeneralTests, ReadTests, OptionTests)
    support.run_unittest(*cases)


if __name__ == '__main__':
    test_main()