blob: e40c1fe42180f499a7eae5cef5ac52309b01a461 [file] [log] [blame]
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00001import socket
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00002import telnetlib
3import time
Jack Diederich183028e2009-04-06 02:08:44 +00004import Queue
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00005
6from unittest import TestCase
7from test import test_support
# threading is obtained through test_support.import_module() rather than a
# plain import statement.
threading = test_support.import_module('threading')

# Host name the test clients connect to.
HOST = test_support.HOST
# Unique marker placed at the end of a data script handed to server().
EOF_sigil = object()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000012
def server(evt, serv, dataq=None):
    """Run a one-shot TCP server for the telnetlib tests.

    The server works in three steps:
      1) set evt to let the parent know we are ready to accept
      2) [optional] if dataq is given, take one list of items from
         dataq.get() and write it to the accepted connection
      3) set evt again to let the parent know we're done

    Items of the list are handled as follows: EOF_sigil stops the writing,
    an int or float is a delay in seconds, anything else is sent to the
    client.

    evt   -- event object used to signal the parent (set twice, see above)
    serv  -- bound server socket; listen() is called here and the socket is
             always closed before returning
    dataq -- optional queue holding a single list of items to send

    A socket.timeout while accepting or sending is swallowed; any other
    exception (for instance the queue get timing out) propagates after the
    sockets have been closed and evt has been set.
    """
    serv.listen(5)
    evt.set()
    conn = None
    try:
        conn, addr = serv.accept()
        if dataq is not None:
            new_data = dataq.get(True, 0.5)
            dataq.task_done()
            for item in new_data:
                if item is EOF_sigil:
                    break
                if isinstance(item, (int, float)):
                    time.sleep(item)
                else:
                    # sendall() so a partial write can never leave unsent
                    # bytes behind when the script ends
                    conn.sendall(item)
    except socket.timeout:
        pass
    finally:
        try:
            # close the client connection on every exit path, not only on
            # success, so a timeout does not leak the socket
            if conn is not None:
                conn.close()
        finally:
            serv.close()
            evt.set()
44
class GeneralTests(TestCase):
    """Connection and timeout tests against a server that accepts one client.

    setUp starts server() in a thread on a freshly bound port and waits
    until it is listening; tearDown waits for the server to finish.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(3)
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, self.sock))
        self.thread.start()
        # first evt.set() from the server means "listening"; clear it so
        # tearDown can wait for the second one ("done")
        self.evt.wait()
        self.evt.clear()
        time.sleep(.1)

    def tearDown(self):
        self.evt.wait()
        self.thread.join()

    def testBasic(self):
        # connects
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet.sock.close()

    def testTimeoutDefault(self):
        # no explicit timeout: the socket module default is used
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port)
        finally:
            socket.setdefaulttimeout(None)
        # close the client socket even if the assertion below fails
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.addCleanup(telnet.sock.close)
        self.assertIsNone(telnet.sock.gettimeout())

    def testTimeoutValue(self):
        # explicit timeout passed to the constructor
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutOpen(self):
        # explicit timeout passed to open()
        telnet = telnetlib.Telnet()
        telnet.open(HOST, self.port, timeout=30)
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000098
def _read_setUp(self):
    """Shared setUp: start server() with a data queue and wait for it.

    Stores evt, dataq, sock, port and thread on the test instance.  When
    this returns the server thread has signalled that it is listening and
    the event has been cleared again for tearDown to wait on.
    """
    self.evt = signal = threading.Event()
    self.dataq = script_queue = Queue.Queue()
    self.sock = listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.settimeout(3)
    self.port = test_support.bind_port(listener)
    self.thread = worker = threading.Thread(
        target=server,
        args=(signal, listener, script_queue),
    )
    worker.start()
    # the server sets the event once it is listening
    signal.wait()
    signal.clear()
    time.sleep(.1)
110
def _read_tearDown(self):
    """Shared tearDown: wait for the server's event, then join its thread."""
    finished, worker = self.evt, self.thread
    finished.wait()
    worker.join()
114
class ReadTests(TestCase):
    """Exercise the Telnet.read_* methods against a scripted server.

    Each test queues one list of items on self.dataq; the server thread
    started by setUp writes them to the client connection (numbers in the
    list are delays in seconds, EOF_sigil ends the script).
    """
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.block_long, 'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), 'not seen')

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, ''.join(want[:-1]))

    def _test_blocking(self, func):
        # helper: the server sleeps for block_long before reaching the end
        # of its script, so func() must block for at least block_short
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        self.assertLessEqual(self.block_short, time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        telnet.read_all()
        telnet.read_all()  # shouldn't raise

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = ['x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        # must call read_some() here: read_all() would not exercise the
        # method this test is named after
        data = telnet.read_some()
        self.assertGreaterEqual(len(data), 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.assertEqual('', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        # keep polling until the connection reports EOF; whatever has been
        # collected so far must always be a prefix of the expected data
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')
    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        # test EOF: after fill_rawq() has seen the closed connection the
        # lazy readers must raise EOFError
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # nothing has been read from the socket yet
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # nothing has been read from the socket yet
        self.assertEqual('', telnet.read_very_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                # the test drives both the raw read and the cooking step
                # by hand, checking that nothing was cooked in between
                telnet.fill_rawq()
                self.assertEqual('', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')
295
class nego_collector(object):
    """Option negotiation callback that records what it is called with.

    seen accumulates cmd + opt from every call to do_nego.  When an
    sb_getter callable is supplied, its return value is appended to
    sb_seen each time the command is tl.SE.
    """

    def __init__(self, sb_getter=None):
        self.seen, self.sb_seen = '', ''
        self.sb_getter = sb_getter

    def do_nego(self, sock, cmd, opt):
        """Record one negotiation callback; sock is not used."""
        self.seen = self.seen + cmd + opt
        if cmd != tl.SE or not self.sb_getter:
            return
        self.sb_seen += self.sb_getter()
307
# Short alias for the telnetlib module, used for its protocol constants.
tl = telnetlib
class OptionTests(TestCase):
    """Tests for IAC command callbacks and SB ... SE subnegotiation data."""
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        # every call runs against its own freshly started server
        self.setUp()
        self.dataq.put(data)
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        collector = nego_collector()
        client.set_option_negotiation_callback(collector.do_nego)
        text = client.read_all()
        seen = collector.seen
        # we expect at least one command
        self.assertTrue(len(seen) > 0)
        self.assertIn(seen[0], self.cmds)
        self.assertEqual(seen[1], tl.NOOPT)
        sent = ''.join(data[:-1])
        self.assertEqual(len(sent), len(text + seen))
        collector.sb_getter = None  # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.tearDown()

        for cmd in self.cmds:
            # command embedded in a long and in a short run of text ...
            for pad in (100, 10):
                self._test_command(
                    ['x' * pad, tl.IAC + cmd, 'y' * pad, EOF_sigil])
            # ... and on its own
            self._test_command([tl.IAC + cmd, EOF_sigil])
        # all at once
        everything = [tl.IAC + cmd for cmd in self.cmds]
        everything.append(EOF_sigil)
        self._test_command(everything)
        self.assertEqual('', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        sb_open = tl.IAC + tl.SB
        sb_close = tl.IAC + tl.SE
        escaped_iac = tl.IAC + tl.IAC
        payloads = [
            '',
            escaped_iac,
            escaped_iac + 'aa',
            'bb' + escaped_iac,
            'cc' + escaped_iac + 'dd',
        ]
        send = [sb_open + payload + sb_close for payload in payloads]
        send.append(EOF_sigil)
        self.dataq.put(send)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector(telnet.read_sb_data)
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        self.assertEqual(txt, '')
        want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
        self.assertEqual(nego.sb_seen, want_sb_data)
        self.assertEqual('', telnet.read_sb_data())
        nego.sb_getter = None  # break the nego => telnet cycle
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000367
def test_main(verbose=None):
    """Run every test case class of this module; verbose is not used."""
    cases = (GeneralTests, ReadTests, OptionTests)
    test_support.run_unittest(*cases)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000370
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()