blob: d6214be5b91a4434c3448d02fa13ef81432a082b [file] [log] [blame]
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00001import socket
2import threading
3import telnetlib
4import time
Jack Diederich183028e2009-04-06 02:08:44 +00005import Queue
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00006
7from unittest import TestCase
8from test import test_support
9
Trent Nelsone41b0062008-04-08 23:47:30 +000010HOST = test_support.HOST
Jack Diederich183028e2009-04-06 02:08:44 +000011EOF_sigil = object()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000012
Jack Diederich183028e2009-04-06 02:08:44 +000013def server(evt, serv, dataq=None):
14 """ Open a tcp server in three steps
15 1) set evt to true to let the parent know we are ready
Jack Diederich3b2312e2009-04-07 20:22:59 +000016 2) [optional] if is not False, write the list of data from dataq.get()
17 to the socket.
Jack Diederich183028e2009-04-06 02:08:44 +000018 3) set evt to true to let the parent know we're done
19 """
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000020 serv.listen(5)
Neal Norwitz37184292008-01-26 21:21:59 +000021 evt.set()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000022 try:
23 conn, addr = serv.accept()
Jack Diederich183028e2009-04-06 02:08:44 +000024 if dataq:
25 data = ''
Jack Diederich3b2312e2009-04-07 20:22:59 +000026 done = False
27 for new_data in dataq.get(True, 0.5):
28 if not done:
29 dataq.task_done()
30 done = True
31 if new_data == EOF_sigil:
32 break
33 if type(new_data) in [int, float]:
Jack Diederich183028e2009-04-06 02:08:44 +000034 time.sleep(new_data)
Jack Diederich3b2312e2009-04-07 20:22:59 +000035 else:
36 data += new_data
Jack Diederich183028e2009-04-06 02:08:44 +000037 written = conn.send(data)
38 data = data[written:]
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000039 except socket.timeout:
40 pass
41 finally:
42 serv.close()
43 evt.set()
44
Jack Diederich183028e2009-04-06 02:08:44 +000045def wibble_float(num):
46 ''' return a (low, high) tuple that are 1% more and 1% less of num '''
47 return num * 0.99, num * 1.01
48
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000049class GeneralTests(TestCase):
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000050
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000051 def setUp(self):
52 self.evt = threading.Event()
Trent Nelsone41b0062008-04-08 23:47:30 +000053 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
54 self.sock.settimeout(3)
55 self.port = test_support.bind_port(self.sock)
Jack Diederich183028e2009-04-06 02:08:44 +000056 self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
57 self.thread.start()
Neal Norwitz37184292008-01-26 21:21:59 +000058 self.evt.wait()
59 self.evt.clear()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000060 time.sleep(.1)
61
62 def tearDown(self):
63 self.evt.wait()
Jack Diederich183028e2009-04-06 02:08:44 +000064 self.thread.join()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000065
66 def testBasic(self):
67 # connects
Trent Nelsone41b0062008-04-08 23:47:30 +000068 telnet = telnetlib.Telnet(HOST, self.port)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000069 telnet.sock.close()
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000070
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000071 def testTimeoutDefault(self):
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000072 self.assertTrue(socket.getdefaulttimeout() is None)
73 socket.setdefaulttimeout(30)
74 try:
75 telnet = telnetlib.Telnet("localhost", self.port)
76 finally:
77 socket.setdefaulttimeout(None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000078 self.assertEqual(telnet.sock.gettimeout(), 30)
79 telnet.sock.close()
80
81 def testTimeoutNone(self):
82 # None, having other default
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000083 self.assertTrue(socket.getdefaulttimeout() is None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000084 socket.setdefaulttimeout(30)
85 try:
Trent Nelsone41b0062008-04-08 23:47:30 +000086 telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000087 finally:
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000088 socket.setdefaulttimeout(None)
89 self.assertTrue(telnet.sock.gettimeout() is None)
90 telnet.sock.close()
91
92 def testTimeoutValue(self):
93 telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000094 self.assertEqual(telnet.sock.gettimeout(), 30)
95 telnet.sock.close()
96
Facundo Batista4f1b1ed2008-05-29 16:39:26 +000097 def testTimeoutOpen(self):
98 telnet = telnetlib.Telnet()
99 telnet.open("localhost", self.port, timeout=30)
100 self.assertEqual(telnet.sock.gettimeout(), 30)
101 telnet.sock.close()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000102
Jack Diederich183028e2009-04-06 02:08:44 +0000103def _read_setUp(self):
104 # the blocking constant should be tuned!
Jack Diederich3b2312e2009-04-07 20:22:59 +0000105 self.blocking_timeout = 0.3
Jack Diederich183028e2009-04-06 02:08:44 +0000106 self.evt = threading.Event()
107 self.dataq = Queue.Queue()
108 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
109 self.sock.settimeout(3)
110 self.port = test_support.bind_port(self.sock)
111 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
112 self.thread.start()
113 self.evt.wait()
114 self.evt.clear()
115 time.sleep(.1)
116
117def _read_tearDown(self):
118 self.evt.wait()
119 self.thread.join()
120
Jack Diederich183028e2009-04-06 02:08:44 +0000121class ReadTests(TestCase):
122 setUp = _read_setUp
123 tearDown = _read_tearDown
124
Jack Diederich183028e2009-04-06 02:08:44 +0000125 def test_read_until_A(self):
126 """
127 read_until(expected, [timeout])
128 Read until the expected string has been seen, or a timeout is
129 hit (default is no timeout); may block.
130 """
131 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000132 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000133 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000134 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000135 data = telnet.read_until('match')
136 self.assertEqual(data, ''.join(want[:-2]))
137
138 def test_read_until_B(self):
139 # test the timeout - it does NOT raise socket.timeout
140 want = ['hello', self.blocking_timeout, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000141 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000142 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000143 self.dataq.join()
144 data = telnet.read_until('not seen')
Jack Diederich183028e2009-04-06 02:08:44 +0000145 self.assertEqual(data, want[0])
146
147 def test_read_all_A(self):
148 """
149 read_all()
150 Read all data until EOF; may block.
151 """
152 want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000153 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000154 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000155 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000156 data = telnet.read_all()
157 self.assertEqual(data, ''.join(want[:-1]))
158 return
159
Jack Diederich3b2312e2009-04-07 20:22:59 +0000160 def _test_blocking(self, func):
161 self.dataq.put([self.blocking_timeout, EOF_sigil])
162 self.dataq.join()
163 start = time.time()
164 data = func()
165 low, high = wibble_float(self.blocking_timeout)
166 self.assertTrue(low <= time.time() - start)
167
Jack Diederich183028e2009-04-06 02:08:44 +0000168 def test_read_all_B(self):
169 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
170
Jack Diederich3b2312e2009-04-07 20:22:59 +0000171 def test_read_all_C(self):
172 self.dataq.put([EOF_sigil])
173 telnet = telnetlib.Telnet(HOST, self.port)
174 self.dataq.join()
175 telnet.read_all()
176 telnet.read_all() # shouldn't raise
177
Jack Diederich183028e2009-04-06 02:08:44 +0000178 def test_read_some_A(self):
179 """
180 read_some()
181 Read at least one byte or EOF; may block.
182 """
183 # test 'at least one byte'
184 want = ['x' * 500, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000185 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000186 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000187 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000188 data = telnet.read_all()
189 self.assertTrue(len(data) >= 1)
190
191 def test_read_some_B(self):
192 # test EOF
Jack Diederich3b2312e2009-04-07 20:22:59 +0000193 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000194 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000195 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000196 self.assertEqual('', telnet.read_some())
197
Jack Diederich3b2312e2009-04-07 20:22:59 +0000198 def test_read_some_C(self):
Jack Diederich183028e2009-04-06 02:08:44 +0000199 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
200
201 def _test_read_any_eager_A(self, func_name):
202 """
203 read_very_eager()
204 Read all data available already queued or on the socket,
205 without blocking.
206 """
207 # this never blocks so it should return eat part in turn
208 want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil]
209 expects = want[0] + want[2]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000210 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000211 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000212 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000213 func = getattr(telnet, func_name)
Jack Diederich183028e2009-04-06 02:08:44 +0000214 data = ''
215 while True:
216 try:
217 data += func()
218 self.assertTrue(expects.startswith(data))
Jack Diederich183028e2009-04-06 02:08:44 +0000219 except EOFError:
220 break
221 self.assertEqual(expects, data)
222
223 def _test_read_any_eager_B(self, func_name):
224 # test EOF
Jack Diederich3b2312e2009-04-07 20:22:59 +0000225 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000226 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000227 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000228 func = getattr(telnet, func_name)
229 self.assertRaises(EOFError, func)
230
231 # read_eager and read_very_eager make the same gaurantees
232 # (they behave differently but we only test the gaurantees)
233 def test_read_very_eager_A(self):
234 self._test_read_any_eager_A('read_very_eager')
235 def test_read_very_eager_B(self):
236 self._test_read_any_eager_B('read_very_eager')
237 def test_read_eager_A(self):
238 self._test_read_any_eager_A('read_eager')
239 def test_read_eager_B(self):
240 self._test_read_any_eager_B('read_eager')
241 # NB -- we need to test the IAC block which is mentioned in the docstring
242 # but not in the module docs
243
244 def _test_read_any_lazy_A(self, func_name):
245 want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000246 self.dataq.put(want)
Jack Diederich183028e2009-04-06 02:08:44 +0000247 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000248 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000249 func = getattr(telnet, func_name)
250 self.assertEqual('', func())
251 data = ''
252 while True:
Jack Diederich3b2312e2009-04-07 20:22:59 +0000253 time.sleep(0.0)
Jack Diederich183028e2009-04-06 02:08:44 +0000254 try:
255 telnet.fill_rawq()
256 data += func()
257 if not data:
258 break
259 except EOFError:
260 break
261 self.assertTrue(want[1].startswith(data))
262 return data, want[1]
263
264 def _test_read_any_lazy_B(self, func_name):
Jack Diederich3b2312e2009-04-07 20:22:59 +0000265 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000266 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000267 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000268 func = getattr(telnet, func_name)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000269 time.sleep(0.0)
Jack Diederich183028e2009-04-06 02:08:44 +0000270 telnet.fill_rawq()
271 self.assertRaises(EOFError, func)
272
273 # read_lazy and read_very_lazy make the samish gaurantees
274 def test_read_very_lazy_A(self):
275 data, want = self._test_read_any_lazy_A('read_very_lazy')
276 self.assertEqual(data, '')
277 def test_read_lazy(self):
278 data, want = self._test_read_any_lazy_A('read_lazy')
279 self.assertEqual(data, want)
280 def test_read_very_lazy_B(self):
281 self._test_read_any_lazy_B('read_very_lazy')
282 def test_read_lazy_B(self):
283 self._test_read_any_lazy_B('read_lazy')
284
285class nego_collector(object):
286 def __init__(self, sb_getter=None):
287 self.seen = ''
288 self.sb_getter = sb_getter
289 self.sb_seen = ''
290
291 def do_nego(self, sock, cmd, opt):
292 self.seen += cmd + opt
293 if cmd == tl.SE and self.sb_getter:
294 sb_data = self.sb_getter()
295 self.sb_seen += sb_data
296
297tl = telnetlib
298class OptionTests(TestCase):
299 setUp = _read_setUp
300 tearDown = _read_tearDown
301 # RFC 854 commands
302 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
303
304 def _test_command(self, data):
305 """ helper for testing IAC + cmd """
306 self.setUp()
Jack Diederich3b2312e2009-04-07 20:22:59 +0000307 self.dataq.put(data)
Jack Diederich183028e2009-04-06 02:08:44 +0000308 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000309 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000310 nego = nego_collector()
311 telnet.set_option_negotiation_callback(nego.do_nego)
Jack Diederich183028e2009-04-06 02:08:44 +0000312 txt = telnet.read_all()
313 cmd = nego.seen
314 self.assertTrue(len(cmd) > 0) # we expect at least one command
315 self.assertTrue(cmd[0] in self.cmds)
316 self.assertEqual(cmd[1], tl.NOOPT)
317 self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
318 self.tearDown()
319
320 def test_IAC_commands(self):
321 # reset our setup
Jack Diederich3b2312e2009-04-07 20:22:59 +0000322 self.dataq.put([EOF_sigil])
Jack Diederich183028e2009-04-06 02:08:44 +0000323 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000324 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000325 self.tearDown()
326
327 for cmd in self.cmds:
328 self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
329 self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
330 self._test_command([tl.IAC + cmd, EOF_sigil])
331 # all at once
332 self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
Jack Diederich3b2312e2009-04-07 20:22:59 +0000333 self.assertEqual('', telnet.read_sb_data())
Jack Diederich183028e2009-04-06 02:08:44 +0000334
335 def test_SB_commands(self):
336 # RFC 855, subnegotiations portion
337 send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
338 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
339 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
340 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
341 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
342 EOF_sigil,
343 ]
Jack Diederich3b2312e2009-04-07 20:22:59 +0000344 self.dataq.put(send)
Jack Diederich183028e2009-04-06 02:08:44 +0000345 telnet = telnetlib.Telnet(HOST, self.port)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000346 self.dataq.join()
Jack Diederich183028e2009-04-06 02:08:44 +0000347 nego = nego_collector(telnet.read_sb_data)
348 telnet.set_option_negotiation_callback(nego.do_nego)
Jack Diederich183028e2009-04-06 02:08:44 +0000349 txt = telnet.read_all()
350 self.assertEqual(txt, '')
351 want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
352 self.assertEqual(nego.sb_seen, want_sb_data)
Jack Diederich3b2312e2009-04-07 20:22:59 +0000353 self.assertEqual('', telnet.read_sb_data())
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000354
355def test_main(verbose=None):
Jack Diederich183028e2009-04-06 02:08:44 +0000356 test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000357
358if __name__ == '__main__':
359 test_main()