blob: a04bc3fe678ee817980732683e7c8c1ebb2f855b [file] [log] [blame]
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00001import socket
2import threading
3import telnetlib
4import time
Jack Diederich183028e2009-04-06 02:08:44 +00005import Queue
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00006
7from unittest import TestCase
8from test import test_support
9
Trent Nelsone41b0062008-04-08 23:47:30 +000010HOST = test_support.HOST
Jack Diederich183028e2009-04-06 02:08:44 +000011EOF_sigil = object()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000012
Jack Diederich183028e2009-04-06 02:08:44 +000013def server(evt, serv, dataq=None):
14 """ Open a tcp server in three steps
15 1) set evt to true to let the parent know we are ready
Jack Diederich3b2312e2009-04-07 20:22:59 +000016 2) [optional] if is not False, write the list of data from dataq.get()
17 to the socket.
Jack Diederich183028e2009-04-06 02:08:44 +000018 3) set evt to true to let the parent know we're done
19 """
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000020 serv.listen(5)
Neal Norwitz37184292008-01-26 21:21:59 +000021 evt.set()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000022 try:
23 conn, addr = serv.accept()
Jack Diederich183028e2009-04-06 02:08:44 +000024 if dataq:
25 data = ''
Jack Diederich7f9bb912009-04-07 23:56:57 +000026 new_data = dataq.get(True, 0.5)
27 dataq.task_done()
28 for item in new_data:
29 if item == EOF_sigil:
Jack Diederich3b2312e2009-04-07 20:22:59 +000030 break
Jack Diederich7f9bb912009-04-07 23:56:57 +000031 if type(item) in [int, float]:
32 time.sleep(item)
Jack Diederich3b2312e2009-04-07 20:22:59 +000033 else:
Jack Diederich7f9bb912009-04-07 23:56:57 +000034 data += item
Jack Diederich183028e2009-04-06 02:08:44 +000035 written = conn.send(data)
36 data = data[written:]
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000037 except socket.timeout:
38 pass
39 finally:
40 serv.close()
41 evt.set()
42
43class GeneralTests(TestCase):
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000044
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000045 def setUp(self):
46 self.evt = threading.Event()
Trent Nelsone41b0062008-04-08 23:47:30 +000047 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
48 self.sock.settimeout(3)
49 self.port = test_support.bind_port(self.sock)
Jack Diederich183028e2009-04-06 02:08:44 +000050 self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
51 self.thread.start()
Neal Norwitz37184292008-01-26 21:21:59 +000052 self.evt.wait()
53 self.evt.clear()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000054 time.sleep(.1)
55
56 def tearDown(self):
57 self.evt.wait()
Jack Diederich183028e2009-04-06 02:08:44 +000058 self.thread.join()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000059
60 def testBasic(self):
61 # connects
Trent Nelsone41b0062008-04-08 23:47:30 +000062 telnet = telnetlib.Telnet(HOST, self.port)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000063 telnet.sock.close()
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000064
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000065 def testTimeoutDefault(self):
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000066 self.assertTrue(socket.getdefaulttimeout() is None)
67 socket.setdefaulttimeout(30)
68 try:
69 telnet = telnetlib.Telnet("localhost", self.port)
70 finally:
71 socket.setdefaulttimeout(None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000072 self.assertEqual(telnet.sock.gettimeout(), 30)
73 telnet.sock.close()
74
75 def testTimeoutNone(self):
76 # None, having other default
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000077 self.assertTrue(socket.getdefaulttimeout() is None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000078 socket.setdefaulttimeout(30)
79 try:
Trent Nelsone41b0062008-04-08 23:47:30 +000080 telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000081 finally:
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000082 socket.setdefaulttimeout(None)
83 self.assertTrue(telnet.sock.gettimeout() is None)
84 telnet.sock.close()
85
86 def testTimeoutValue(self):
87 telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000088 self.assertEqual(telnet.sock.gettimeout(), 30)
89 telnet.sock.close()
90
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000091 def testTimeoutOpen(self):
92 telnet = telnetlib.Telnet()
93 telnet.open("localhost", self.port, timeout=30)
94 self.assertEqual(telnet.sock.gettimeout(), 30)
95 telnet.sock.close()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000096
Jack Diederich183028e2009-04-06 02:08:44 +000097def _read_setUp(self):
Jack Diederich183028e2009-04-06 02:08:44 +000098 self.evt = threading.Event()
99 self.dataq = Queue.Queue()
100 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
101 self.sock.settimeout(3)
102 self.port = test_support.bind_port(self.sock)
103 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
104 self.thread.start()
105 self.evt.wait()
106 self.evt.clear()
107 time.sleep(.1)
108
109def _read_tearDown(self):
110 self.evt.wait()
111 self.thread.join()
112
Jack Diederich183028e2009-04-06 02:08:44 +0000113class ReadTests(TestCase):
114 setUp = _read_setUp
115 tearDown = _read_tearDown
116
Jack Diederich7f9bb912009-04-07 23:56:57 +0000117 # use a similar approach to testing timeouts as test_timeout.py
118 # these will never pass 100% but make the fuzz big enough that it is rare
119 block_long = 0.6
120 block_short = 0.3
Jack Diederich183028e2009-04-06 02:08:44 +0000121 def test_read_until_A(self):
122 """
123 read_until(expected, [timeout])
124 Read until the expected string has been seen, or a timeout is
125 hit (default is no timeout); may block.
126 """
127 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000128 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000129 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000130 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000131 data = telnet.read_until('match')
132 self.assertEqual(data, ''.join(want[:-2]))
133
134 def test_read_until_B(self):
135 # test the timeout - it does NOT raise socket.timeout
Jack Diederich7f9bb912009-04-07 23:56:57 +0000136 want = ['hello', self.block_long, 'not seen', EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000137 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000138 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000139 self.dataq.join()
Jack Diederich7f9bb912009-04-07 23:56:57 +0000140 data = telnet.read_until('not seen', self.block_short)
Jack Diederich183028e2009-04-06 02:08:44 +0000141 self.assertEqual(data, want[0])
Jack Diederich7f9bb912009-04-07 23:56:57 +0000142 self.assertEqual(telnet.read_all(), 'not seen')
Jack Diederich183028e2009-04-06 02:08:44 +0000143
144 def test_read_all_A(self):
145 """
146 read_all()
147 Read all data until EOF; may block.
148 """
149 want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000150 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000151 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000152 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000153 data = telnet.read_all()
154 self.assertEqual(data, ''.join(want[:-1]))
155 return
156
Jack Diederich3b2312e2009-04-07 20:22:59 +0000157 def _test_blocking(self, func):
Jack Diederich7f9bb912009-04-07 23:56:57 +0000158 self.dataq.put([self.block_long, EOF_sigil])
Jack Diederich3b2312e2009-04-07 20:22:59 +0000159 self.dataq.join()
160 start = time.time()
161 data = func()
Jack Diederich7f9bb912009-04-07 23:56:57 +0000162 self.assertTrue(self.block_short <= time.time() - start)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000163
Jack Diederich183028e2009-04-06 02:08:44 +0000164 def test_read_all_B(self):
165 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
166
Jack Diederich3b2312e2009-04-07 20:22:59 +0000167 def test_read_all_C(self):
168 self.dataq.put([EOF_sigil])
169 telnet = telnetlib.Telnet(HOST, self.port)
170 self.dataq.join()
171 telnet.read_all()
172 telnet.read_all() # shouldn't raise
173
Jack Diederich183028e2009-04-06 02:08:44 +0000174 def test_read_some_A(self):
175 """
176 read_some()
177 Read at least one byte or EOF; may block.
178 """
179 # test 'at least one byte'
180 want = ['x' * 500, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000181 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000182 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000183 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000184 data = telnet.read_all()
185 self.assertTrue(len(data) >= 1)
186
187 def test_read_some_B(self):
188 # test EOF
Jack Diederich3b2312e2009-04-07 20:22:59 +0000189 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000190 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000191 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000192 self.assertEqual('', telnet.read_some())
193
Jack Diederich3b2312e2009-04-07 20:22:59 +0000194 def test_read_some_C(self):
Jack Diederich183028e2009-04-06 02:08:44 +0000195 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
196
197 def _test_read_any_eager_A(self, func_name):
198 """
199 read_very_eager()
200 Read all data available already queued or on the socket,
201 without blocking.
202 """
Jack Diederich7f9bb912009-04-07 23:56:57 +0000203 want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
204 expects = want[1] + want[2]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000205 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000206 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000207 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000208 func = getattr(telnet, func_name)
Jack Diederich183028e2009-04-06 02:08:44 +0000209 data = ''
210 while True:
211 try:
212 data += func()
213 self.assertTrue(expects.startswith(data))
Jack Diederich183028e2009-04-06 02:08:44 +0000214 except EOFError:
215 break
216 self.assertEqual(expects, data)
217
218 def _test_read_any_eager_B(self, func_name):
219 # test EOF
Jack Diederich3b2312e2009-04-07 20:22:59 +0000220 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000221 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000222 self.dataq.join()
Jack Diederich7f9bb912009-04-07 23:56:57 +0000223 time.sleep(self.block_short)
Jack Diederich183028e2009-04-06 02:08:44 +0000224 func = getattr(telnet, func_name)
225 self.assertRaises(EOFError, func)
226
227 # read_eager and read_very_eager make the same gaurantees
228 # (they behave differently but we only test the gaurantees)
229 def test_read_very_eager_A(self):
230 self._test_read_any_eager_A('read_very_eager')
231 def test_read_very_eager_B(self):
232 self._test_read_any_eager_B('read_very_eager')
233 def test_read_eager_A(self):
234 self._test_read_any_eager_A('read_eager')
235 def test_read_eager_B(self):
236 self._test_read_any_eager_B('read_eager')
237 # NB -- we need to test the IAC block which is mentioned in the docstring
238 # but not in the module docs
239
Jack Diederich183028e2009-04-06 02:08:44 +0000240 def _test_read_any_lazy_B(self, func_name):
Jack Diederich3b2312e2009-04-07 20:22:59 +0000241 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000242 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000243 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000244 func = getattr(telnet, func_name)
Jack Diederich183028e2009-04-06 02:08:44 +0000245 telnet.fill_rawq()
246 self.assertRaises(EOFError, func)
247
Jack Diederich7f9bb912009-04-07 23:56:57 +0000248 def test_read_lazy_A(self):
249 want = ['x' * 100, EOF_sigil]
250 self.dataq.put(want)
251 telnet = telnetlib.Telnet(HOST, self.port)
252 self.dataq.join()
253 time.sleep(self.block_short)
254 self.assertEqual('', telnet.read_lazy())
255 data = ''
256 while True:
257 try:
258 read_data = telnet.read_lazy()
259 data += read_data
260 if not read_data:
261 telnet.fill_rawq()
262 except EOFError:
263 break
264 self.assertTrue(want[0].startswith(data))
265 self.assertEqual(data, want[0])
266
Jack Diederich183028e2009-04-06 02:08:44 +0000267 def test_read_lazy_B(self):
268 self._test_read_any_lazy_B('read_lazy')
269
Jack Diederich7f9bb912009-04-07 23:56:57 +0000270 def test_read_very_lazy_A(self):
271 want = ['x' * 100, EOF_sigil]
272 self.dataq.put(want)
273 telnet = telnetlib.Telnet(HOST, self.port)
274 self.dataq.join()
275 time.sleep(self.block_short)
276 self.assertEqual('', telnet.read_very_lazy())
277 data = ''
278 while True:
279 try:
280 read_data = telnet.read_very_lazy()
281 except EOFError:
282 break
283 data += read_data
284 if not read_data:
285 telnet.fill_rawq()
286 self.assertEqual('', telnet.cookedq)
287 telnet.process_rawq()
288 self.assertTrue(want[0].startswith(data))
289 self.assertEqual(data, want[0])
290
291 def test_read_very_lazy_B(self):
292 self._test_read_any_lazy_B('read_very_lazy')
293
Jack Diederich183028e2009-04-06 02:08:44 +0000294class nego_collector(object):
295 def __init__(self, sb_getter=None):
296 self.seen = ''
297 self.sb_getter = sb_getter
298 self.sb_seen = ''
299
300 def do_nego(self, sock, cmd, opt):
301 self.seen += cmd + opt
302 if cmd == tl.SE and self.sb_getter:
303 sb_data = self.sb_getter()
304 self.sb_seen += sb_data
305
306tl = telnetlib
307class OptionTests(TestCase):
308 setUp = _read_setUp
309 tearDown = _read_tearDown
310 # RFC 854 commands
311 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
312
313 def _test_command(self, data):
314 """ helper for testing IAC + cmd """
315 self.setUp()
Jack Diederich3b2312e2009-04-07 20:22:59 +0000316 self.dataq.put(data)
Jack Diederich183028e2009-04-06 02:08:44 +0000317 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000318 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000319 nego = nego_collector()
320 telnet.set_option_negotiation_callback(nego.do_nego)
Jack Diederich183028e2009-04-06 02:08:44 +0000321 txt = telnet.read_all()
322 cmd = nego.seen
323 self.assertTrue(len(cmd) > 0) # we expect at least one command
Ezio Melottiaa980582010-01-23 23:04:36 +0000324 self.assertIn(cmd[0], self.cmds)
Jack Diederich183028e2009-04-06 02:08:44 +0000325 self.assertEqual(cmd[1], tl.NOOPT)
326 self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
Jack Diederich7f9bb912009-04-07 23:56:57 +0000327 nego.sb_getter = None # break the nego => telnet cycle
Jack Diederich183028e2009-04-06 02:08:44 +0000328 self.tearDown()
329
330 def test_IAC_commands(self):
331 # reset our setup
Jack Diederich3b2312e2009-04-07 20:22:59 +0000332 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000333 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000334 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000335 self.tearDown()
336
337 for cmd in self.cmds:
338 self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
339 self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
340 self._test_command([tl.IAC + cmd, EOF_sigil])
341 # all at once
342 self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
Jack Diederich3b2312e2009-04-07 20:22:59 +0000343 self.assertEqual('', telnet.read_sb_data())
Jack Diederich183028e2009-04-06 02:08:44 +0000344
345 def test_SB_commands(self):
346 # RFC 855, subnegotiations portion
347 send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
348 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
349 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
350 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
351 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
352 EOF_sigil,
353 ]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000354 self.dataq.put(send)
Jack Diederich183028e2009-04-06 02:08:44 +0000355 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000356 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000357 nego = nego_collector(telnet.read_sb_data)
358 telnet.set_option_negotiation_callback(nego.do_nego)
Jack Diederich183028e2009-04-06 02:08:44 +0000359 txt = telnet.read_all()
360 self.assertEqual(txt, '')
361 want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
362 self.assertEqual(nego.sb_seen, want_sb_data)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000363 self.assertEqual('', telnet.read_sb_data())
Jack Diederich7f9bb912009-04-07 23:56:57 +0000364 nego.sb_getter = None # break the nego => telnet cycle
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000365
366def test_main(verbose=None):
Jack Diederich183028e2009-04-06 02:08:44 +0000367 test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000368
369if __name__ == '__main__':
370 test_main()