blob: 321e052fa9dafe42b97df37443478b6049af4787 [file] [log] [blame]
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00001import socket
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00002import telnetlib
3import time
Jack Diederich183028e2009-04-06 02:08:44 +00004import Queue
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00005
6from unittest import TestCase
7from test import test_support
# The threading module is obtained through test_support.import_module
# rather than a plain import.
# NOTE(review): presumably this skips the whole test module when threads
# are unavailable -- confirm against test_support.import_module.
threading = test_support.import_module('threading')

# Host name the client side connects to, shared via test_support.
HOST = test_support.HOST
# Unique marker object: server() stops sending as soon as it meets this
# item in the list it pulled from the data queue.
EOF_sigil = object()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000012
def server(evt, serv, dataq=None):
    """ Open a tcp server in three steps
        1) set evt to true to let the parent know we are ready
        2) [optional] if dataq is given, write the list of data from
           dataq.get() to the socket.  String items are sent as-is,
           int/float items make the server sleep that many seconds, and
           EOF_sigil ends the transmission.
        3) set evt to true to let the parent know we're done

        evt   -- event object signalled once when listening and once on exit
        serv  -- already bound server socket; it is always closed on exit
        dataq -- optional queue holding a single list of items to send
    """
    serv.listen(5)
    evt.set()
    # Stays None when accept() fails (e.g. times out), so the cleanup in
    # the finally clause cannot raise on an unbound name and skip the
    # final evt.set() the parent is waiting for.
    conn = None
    try:
        conn, addr = serv.accept()
        if dataq:
            data = ''
            new_data = dataq.get(True, 0.5)
            # Mark the item as consumed right away: callers use
            # dataq.join() to learn that the server picked up its script.
            dataq.task_done()
            for item in new_data:
                if item is EOF_sigil:
                    break
                if isinstance(item, (int, float)):
                    time.sleep(item)
                else:
                    data += item
                # send() may accept only part of the buffer; whatever is
                # left over is retried on the next iteration.
                written = conn.send(data)
                data = data[written:]
    except socket.timeout:
        pass
    finally:
        serv.close()
        if conn is not None:
            conn.close()
        evt.set()
43
class GeneralTests(TestCase):
    """Connection-level checks: Telnet connects and applies its timeout."""

    def setUp(self):
        # One server thread per test; it accepts a single connection and
        # sends nothing because no data queue is handed over.
        self.evt = threading.Event()
        self.sock = listener = socket.socket(socket.AF_INET,
                                             socket.SOCK_STREAM)
        listener.settimeout(3)
        self.port = test_support.bind_port(listener)
        self.thread = worker = threading.Thread(target=server,
                                                args=(self.evt, listener))
        worker.start()
        # First signal from the server: it is listening.  Re-arm the
        # event so tearDown can wait for the second ("done") signal.
        self.evt.wait()
        self.evt.clear()
        time.sleep(.1)

    def tearDown(self):
        # Wait for the server's "done" signal, then reap its thread.
        self.evt.wait()
        self.thread.join()

    def testBasic(self):
        # connects
        client = telnetlib.Telnet(HOST, self.port)
        client.sock.close()

    def testTimeoutDefault(self):
        # no explicit timeout: the socket module default is picked up
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = telnetlib.Telnet("localhost", self.port)
        finally:
            socket.setdefaulttimeout(None)
        client_sock = client.sock
        self.assertEqual(client_sock.gettimeout(), 30)
        client_sock.close()

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        client_sock = client.sock
        self.assertIsNone(client_sock.gettimeout())
        client_sock.close()

    def testTimeoutValue(self):
        # timeout given to the constructor
        client = telnetlib.Telnet("localhost", self.port, timeout=30)
        client_sock = client.sock
        self.assertEqual(client_sock.gettimeout(), 30)
        client_sock.close()

    def testTimeoutOpen(self):
        # timeout given to open() on an unconnected instance
        client = telnetlib.Telnet()
        client.open("localhost", self.port, timeout=30)
        client_sock = client.sock
        self.assertEqual(client_sock.gettimeout(), 30)
        client_sock.close()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000097
def _read_setUp(self):
    """Shared setUp: start a server thread that is fed through self.dataq.

    Leaves evt, dataq, sock, port and thread on the test instance.
    """
    self.evt = threading.Event()
    self.dataq = Queue.Queue()
    self.sock = listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.settimeout(3)
    self.port = test_support.bind_port(listener)
    server_args = (self.evt, listener, self.dataq)
    self.thread = worker = threading.Thread(target=server, args=server_args)
    worker.start()
    # The server sets the event once it is listening; clear it again so
    # the matching tearDown can wait for the "finished" signal.
    self.evt.wait()
    self.evt.clear()
    time.sleep(.1)
109
110def _read_tearDown(self):
111 self.evt.wait()
112 self.thread.join()
113
class ReadTests(TestCase):
    """Exercise the Telnet.read_*() methods against the scripted server.

    Each test queues ONE list on self.dataq: strings are sent, numbers
    make the server pause that many seconds, and EOF_sigil ends the
    stream.  self.dataq.join() returns once the server has picked the
    list up, which may be before the data has actually been sent.
    """
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('match')
        # everything up to and including 'match', nothing after it
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.block_long, 'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), 'not seen')

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, ''.join(want[:-1]))

    def _test_blocking(self, func):
        """Check that func() blocks while the server is still pausing."""
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        # The server sleeps block_long before closing, so the call must
        # have been held up for at least block_short.
        self.assertTrue(self.block_short <= time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        telnet.read_all()
        telnet.read_all() # shouldn't raise

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = ['x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        # Must call read_some() here: read_all() would leave the
        # "data available" path of read_some() untested.
        data = telnet.read_some()
        self.assertTrue(len(data) >= 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.assertEqual('', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        # Poll until the connection is closed; every partial result must
        # be a prefix of the expected payload.
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        # give the server time to close its end of the connection
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')
    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')
    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')
    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')
    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        """Check that the lazy reader raises EOFError on a closed stream."""
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        # the lazy readers never touch the socket themselves
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # nothing has been pulled off the socket yet
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    # queue is drained: fetch more from the socket by hand
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # nothing has been pulled off the socket yet
        self.assertEqual('', telnet.read_very_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                # read_very_lazy() only returns cooked data, so both the
                # raw read and the cooking step are driven by hand here
                telnet.fill_rawq()
                self.assertEqual('', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')
294
class nego_collector(object):
    """Option-negotiation callback that records everything it is shown.

    seen     -- concatenation of every cmd + opt pair received
    sb_seen  -- concatenation of what sb_getter() returned on each SE
    """

    def __init__(self, sb_getter=None):
        self.sb_getter = sb_getter
        self.seen = ''
        self.sb_seen = ''

    def do_nego(self, sock, cmd, opt):
        # record the command/option pair unconditionally
        self.seen += cmd + opt
        if cmd != tl.SE or not self.sb_getter:
            return
        # end of a subnegotiation: collect its payload
        self.sb_seen += self.sb_getter()
306
# Short alias so the telnet protocol constants (tl.IAC, tl.SB, tl.SE, ...)
# can be written compactly below.
tl = telnetlib
class OptionTests(TestCase):
    """Check how IAC commands and subnegotiations reach the callback."""
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        # every call needs its own server, so run a full setUp/tearDown
        self.setUp()
        self.dataq.put(data)
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        collector = nego_collector()
        client.set_option_negotiation_callback(collector.do_nego)
        text = client.read_all()
        seen = collector.seen
        self.assertTrue(len(seen) > 0) # we expect at least one command
        self.assertIn(seen[0], self.cmds)
        self.assertEqual(seen[1], tl.NOOPT)
        # sent bytes (without the EOF marker) == plain text + commands
        sent_len = len(''.join(data[:-1]))
        self.assertEqual(sent_len, len(text + seen))
        collector.sb_getter = None # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.tearDown()

        for cmd in self.cmds:
            iac_cmd = tl.IAC + cmd
            # command embedded in long and in short surrounding text
            for pad in (100, 10):
                self._test_command(['x' * pad, iac_cmd, 'y' * pad, EOF_sigil])
            # command on its own
            self._test_command([iac_cmd, EOF_sigil])
        # all at once
        everything = [tl.IAC + cmd for cmd in self.cmds]
        everything.append(EOF_sigil)
        self._test_command(everything)
        self.assertEqual('', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        IAC, SB, SE = tl.IAC, tl.SB, tl.SE
        # payloads placed between IAC SB ... IAC SE; IAC IAC is an
        # escaped literal IAC byte
        payloads = [
            '',
            IAC + IAC,
            IAC + IAC + 'aa',
            'bb' + IAC + IAC,
            'cc' + IAC + IAC + 'dd',
        ]
        send = [IAC + SB + body + IAC + SE for body in payloads]
        send.append(EOF_sigil)
        self.dataq.put(send)
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        collector = nego_collector(client.read_sb_data)
        client.set_option_negotiation_callback(collector.do_nego)
        text = client.read_all()
        self.assertEqual(text, '')
        want_sb_data = IAC + IAC + 'aabb' + IAC + 'cc' + IAC + 'dd'
        self.assertEqual(collector.sb_seen, want_sb_data)
        self.assertEqual('', client.read_sb_data())
        collector.sb_getter = None # break the nego => telnet cycle
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000366
def test_main(verbose=None):
    """Run every test case of this module through test_support."""
    cases = (GeneralTests, ReadTests, OptionTests)
    test_support.run_unittest(*cases)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000369
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()