blob: 16f7f935442bab778a5e82debdf0ade89c8268c4 [file] [log] [blame]
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00001import socket
2import threading
3import telnetlib
4import time
Jack Diederich183028e2009-04-06 02:08:44 +00005import Queue
Facundo Batistab6a5c9d2007-03-29 18:22:35 +00006
7from unittest import TestCase
8from test import test_support
9
# Host name the tests hand to telnetlib.Telnet; taken from test_support.
HOST = test_support.HOST
# Unique sentinel the tests queue to tell server() that no more data follows.
EOF_sigil = object()


def server(evt, serv, dataq=None):
    """Run a one-shot TCP server on the socket *serv*.

    Steps:
      1) listen, then set *evt* so the parent knows we are ready
      2) accept a single connection and, when *dataq* is given, replay its
         items to the client until ``EOF_sigil`` is dequeued:
           - a str is written to the connection in full
           - an int or float is a pause, in seconds, before the next item
           - anything else is ignored
      3) close the connection and *serv*, then set *evt* again so the
         parent knows we are done

    Each item is waited for with ``dataq.get(True, 0.5)``; if the queue stays
    empty longer than that, the queue's own timeout error propagates once
    the cleanup in step 3 has run.  A ``socket.timeout`` raised while
    waiting for a client is swallowed.
    """
    serv.listen(5)
    evt.set()
    conn = None
    try:
        conn, _ = serv.accept()
        if dataq is not None:
            new_data = dataq.get(True, 0.5)
            while new_data is not EOF_sigil:
                if isinstance(new_data, str):
                    # sendall() keeps writing until every byte is out, so a
                    # short write can no longer lose the tail of the data.
                    conn.sendall(new_data)
                elif isinstance(new_data, (int, float)):
                    time.sleep(new_data)
                new_data = dataq.get(True, 0.5)
    except socket.timeout:
        pass
    finally:
        # Close the client connection explicitly instead of relying on
        # garbage collection; this is what gives the client its EOF.
        if conn is not None:
            conn.close()
        serv.close()
        evt.set()
40
def wibble_float(num):
    """Return a (low, high) tuple bracketing *num* by 1% on either side.

    ``low`` is always the smaller bound and ``high`` the larger one, also
    when *num* is negative (where scaling by 0.99 gives the larger value).
    """
    scaled_down = num * 0.99
    scaled_up = num * 1.01
    if scaled_down > scaled_up:
        # Only happens for negative num: swap so that low <= high holds.
        return scaled_up, scaled_down
    return scaled_down, scaled_up
44
class GeneralTests(TestCase):
    """Connection and timeout handling of telnetlib.Telnet.

    Every test connects once to a throwaway server thread (see server())
    that accepts a single connection and sends no data.
    """

    def setUp(self):
        """Start the server thread and wait until it is listening."""
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Bound how long the server waits in accept() for the test to connect.
        self.sock.settimeout(3)
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, self.sock))
        self.thread.start()
        # server() sets the event once it is listening; clear it so that
        # tearDown can wait for the second, "done" signal.
        self.evt.wait()
        self.evt.clear()
        # NOTE(review): presumably gives the thread time to reach accept().
        time.sleep(.1)

    def tearDown(self):
        """Wait for the server thread to signal completion and exit."""
        self.evt.wait()
        self.thread.join()

    def testBasic(self):
        """A plain connect succeeds."""
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet.sock.close()

    def testTimeoutDefault(self):
        """Without a timeout argument the socket default timeout is used."""
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port)
        finally:
            # Always restore the process-wide default for the other tests.
            socket.setdefaulttimeout(None)
        try:
            self.assertEqual(telnet.sock.gettimeout(), 30)
        finally:
            # Release the client socket even when the assertion fails.
            telnet.sock.close()

    def testTimeoutNone(self):
        """timeout=None wins over a non-None socket default timeout."""
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        try:
            self.assertTrue(telnet.sock.gettimeout() is None)
        finally:
            telnet.sock.close()

    def testTimeoutValue(self):
        """A timeout given to the constructor is applied to the socket."""
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        try:
            self.assertEqual(telnet.sock.gettimeout(), 30)
        finally:
            telnet.sock.close()

    def testTimeoutOpen(self):
        """A timeout given to open() is applied to the socket."""
        telnet = telnetlib.Telnet()
        telnet.open(HOST, self.port, timeout=30)
        try:
            self.assertEqual(telnet.sock.gettimeout(), 30)
        finally:
            telnet.sock.close()
Facundo Batistab6a5c9d2007-03-29 18:22:35 +000098
def _read_setUp(self):
    """Shared setUp: start a server() thread that is fed through self.dataq.

    Stores blocking_timeout, evt, dataq, sock, port and thread on the test
    instance, then waits for the server's "ready" signal before returning.
    """
    # the blocking constant should be tuned!
    self.blocking_timeout = 0.0
    self.evt = threading.Event()
    self.dataq = Queue.Queue()

    # Listening socket handed over to the server thread.
    self.sock = listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.settimeout(3)
    self.port = test_support.bind_port(listener)

    server_args = (self.evt, listener, self.dataq)
    self.thread = worker = threading.Thread(target=server, args=server_args)
    worker.start()

    # Block until the server reports it is listening, then re-arm the event
    # so that the tearDown can wait for the "done" signal.
    self.evt.wait()
    self.evt.clear()
    time.sleep(.1)
111 time.sleep(.1)
112
def _read_tearDown(self):
    """Shared tearDown: wait for self.evt, then join self.thread."""
    finished = self.evt
    finished.wait()
    worker = self.thread
    worker.join()
115 self.thread.join()
116
117
class ReadTests(TestCase):
    """Exercise the Telnet.read_*() methods against the scripted server.

    Each test queues the items the server thread should replay (strings to
    send, numbers to sleep for, then EOF_sigil) and checks what the read
    method under test hands back.
    """
    setUp = _read_setUp
    tearDown = _read_tearDown

    def _test_blocking(self, func):
        """Check that *func* takes at least the delay the server sleeps."""
        start = time.time()
        self.dataq.put(self.blocking_timeout)
        self.dataq.put(EOF_sigil)
        func()
        low = wibble_float(self.blocking_timeout)[0]
        self.assertTrue(time.time() - start >= low)

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        for item in want:
            self.dataq.put(item)
        telnet = telnetlib.Telnet(HOST, self.port)
        data = telnet.read_until('match')
        # Everything up to and including 'match', nothing after it.
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.blocking_timeout, EOF_sigil]
        for item in want:
            self.dataq.put(item)
        telnet = telnetlib.Telnet(HOST, self.port)
        start = time.time()
        timeout = self.blocking_timeout / 2
        data = telnet.read_until('not seen', timeout)
        # Measure how long read_until() itself took; the lower bound of the
        # timeout must have elapsed before it gave up.
        elapsed = time.time() - start
        low = wibble_float(timeout)[0]
        self.assertTrue(low <= elapsed)
        self.assertEqual(data, want[0])

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
        for item in want:
            self.dataq.put(item)
        telnet = telnetlib.Telnet(HOST, self.port)
        data = telnet.read_all()
        self.assertEqual(data, ''.join(want[:-1]))

    def test_read_all_B(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = ['x' * 500, EOF_sigil]
        for item in want:
            self.dataq.put(item)
        telnet = telnetlib.Telnet(HOST, self.port)
        # Call the method this test is named after, not read_all().
        data = telnet.read_some()
        self.assertTrue(len(data) >= 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put(EOF_sigil)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.assertEqual('', telnet.read_some())

    def test_read_all_C(self):
        # Despite its name this checks the blocking behaviour of read_some();
        # the name is kept so the set of test names stays unchanged.
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager() / read_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        # this never blocks so it should return each part in turn
        want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil]
        expects = want[0] + want[2]
        for item in want:
            self.dataq.put(item)
        telnet = telnetlib.Telnet(HOST, self.port)
        func = getattr(telnet, func_name)
        time.sleep(self.blocking_timeout/10)
        data = ''
        while True:
            try:
                data += func()
                # Whatever has arrived so far must be a prefix of the total.
                self.assertTrue(expects.startswith(data))
                time.sleep(self.blocking_timeout)
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put(EOF_sigil)
        time.sleep(self.blocking_timeout / 10)
        telnet = telnetlib.Telnet(HOST, self.port)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')
    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_A(self, func_name):
        """Drive the lazy reader *func_name*; return (data read, data sent)."""
        want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil]
        for item in want:
            self.dataq.put(item)
        telnet = telnetlib.Telnet(HOST, self.port)
        func = getattr(telnet, func_name)
        # Nothing has been pulled off the socket yet, so there is no data.
        self.assertEqual('', func())
        data = ''
        while True:
            time.sleep(self.blocking_timeout)
            try:
                # The test fills the raw queue itself before every read.
                telnet.fill_rawq()
                data += func()
                if not data:
                    break
            except EOFError:
                break
            self.assertTrue(want[1].startswith(data))
        return data, want[1]

    def _test_read_any_lazy_B(self, func_name):
        """Check that the lazy reader *func_name* raises EOFError at EOF."""
        self.dataq.put(EOF_sigil)
        telnet = telnetlib.Telnet(HOST, self.port)
        func = getattr(telnet, func_name)
        time.sleep(self.blocking_timeout/10)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    # read_lazy and read_very_lazy make the samish guarantees
    def test_read_very_lazy_A(self):
        data, want = self._test_read_any_lazy_A('read_very_lazy')
        self.assertEqual(data, '')

    def test_read_lazy(self):
        data, want = self._test_read_any_lazy_A('read_lazy')
        self.assertEqual(data, want)

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')
277 self._test_read_any_lazy_B('read_lazy')
278
class nego_collector(object):
    """Option-negotiation callback that records what telnetlib reports.

    ``seen`` accumulates every command/option pair passed to do_nego();
    ``sb_seen`` accumulates what ``sb_getter`` returns each time an SE
    command is reported (only when a getter was supplied).
    """

    def __init__(self, sb_getter=None):
        self.seen = ''
        self.sb_getter = sb_getter
        self.sb_seen = ''

    def do_nego(self, sock, cmd, opt):
        """Record *cmd* + *opt*; on SE also collect the subnegotiation data."""
        self.seen += cmd + opt
        if cmd != tl.SE or not self.sb_getter:
            return
        self.sb_seen += self.sb_getter()
289 self.sb_seen += sb_data
290
# Short alias used for the telnetlib protocol constants below.
tl = telnetlib


class OptionTests(TestCase):
    """Check how IAC commands and subnegotiations reach the option callback."""
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        # Each call runs against its own freshly started server.
        self.setUp()
        for chunk in data:
            self.dataq.put(chunk)
        client = telnetlib.Telnet(HOST, self.port)
        collector = nego_collector()
        client.set_option_negotiation_callback(collector.do_nego)
        time.sleep(self.blocking_timeout/10)
        text = client.read_all()
        seen = collector.seen
        self.assertTrue(len(seen) > 0) # we expect at least one command
        self.assertTrue(seen[0] in self.cmds)
        self.assertEqual(seen[1], tl.NOOPT)
        # Length of everything queued before the sentinel must equal the
        # length of the plain text plus the collected command/option data.
        sent_length = len(''.join(data[:-1]))
        self.assertEqual(sent_length, len(text + seen))
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        self.dataq.put(EOF_sigil)
        first_client = telnetlib.Telnet(HOST, self.port)
        self.tearDown()

        for cmd in self.cmds:
            framed = tl.IAC + cmd
            for width in (100, 10):
                self._test_command(['x' * width, framed, 'y' * width,
                                    EOF_sigil])
            self._test_command([framed, EOF_sigil])
        # all at once
        everything = [tl.IAC + cmd for cmd in self.cmds]
        everything.append(EOF_sigil)
        self._test_command(everything)

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        begin = tl.IAC + tl.SB
        end = tl.IAC + tl.SE
        doubled_iac = tl.IAC + tl.IAC
        send = [
            begin + end,
            begin + doubled_iac + end,
            begin + doubled_iac + 'aa' + end,
            begin + 'bb' + doubled_iac + end,
            begin + 'cc' + doubled_iac + 'dd' + end,
            EOF_sigil,
        ]
        for chunk in send:
            self.dataq.put(chunk)
        client = telnetlib.Telnet(HOST, self.port)
        collector = nego_collector(client.read_sb_data)
        client.set_option_negotiation_callback(collector.do_nego)
        time.sleep(self.blocking_timeout/10)
        text = client.read_all()
        self.assertEqual(text, '')
        want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
        self.assertEqual(collector.sb_seen, want_sb_data)
346 self.assertEqual(nego.sb_seen, want_sb_data)
Facundo Batistab6a5c9d2007-03-29 18:22:35 +0000347
def test_main(verbose=None):
    """Run every test case class of this module through test_support."""
    cases = (GeneralTests, ReadTests, OptionTests)
    test_support.run_unittest(*cases)


if __name__ == '__main__':
    test_main()