blob: bad281961988938bfbb6c9cc46071e7a558ed892 [file] [log] [blame]
Guido van Rossumd8faa362007-04-27 19:54:29 +00001import socket
2import threading
3import telnetlib
4import time
Jack Diederich1c8f38c2009-04-10 05:33:26 +00005import queue
Guido van Rossumd8faa362007-04-27 19:54:29 +00006
7from unittest import TestCase
Benjamin Petersonee8712c2008-05-20 21:35:26 +00008from test import support
Guido van Rossumd8faa362007-04-27 19:54:29 +00009
# Host the telnet clients in this file connect to.
HOST = support.HOST
# Unique end-of-script marker: server() stops replaying a data list when it
# reaches this object; it is never written to the socket.
EOF_sigil = object()
Guido van Rossumd8faa362007-04-27 19:54:29 +000012
def server(evt, serv, dataq=None):
    """Run a one-shot TCP server on the socket *serv*.

    Steps:
      1) start listening and set *evt* to let the parent know we are ready
      2) accept a single connection; [optional] if *dataq* is given, fetch
         one list from dataq.get() and replay it on the connection:
         bytes items are written in full, int/float items are slept for
         (seconds), and EOF_sigil ends the replay
      3) close the connection and *serv*, then set *evt* again to let the
         parent know we're done

    A socket.timeout raised along the way (e.g. by accept() when *serv* has
    a timeout and nobody connects) is ignored; step 3 still runs.
    """
    conn = None
    serv.listen(5)
    evt.set()
    try:
        conn, _ = serv.accept()
        if dataq:
            new_data = dataq.get(True, 0.5)
            # Signal the producer (dataq.join()) that the script was taken.
            dataq.task_done()
            for item in new_data:
                if item is EOF_sigil:
                    break
                if isinstance(item, (int, float)):
                    time.sleep(item)
                else:
                    # sendall() so a short write cannot silently drop the
                    # tail of the last item.
                    conn.sendall(item)
    except socket.timeout:
        pass
    finally:
        # Close the connection explicitly so the client sees EOF without
        # depending on garbage collection of the socket object.
        if conn is not None:
            conn.close()
        serv.close()
        evt.set()
42
class GeneralTests(TestCase):
    """Connection and timeout handling of telnetlib.Telnet.

    Every test runs against a fresh server() thread that is started without
    a data queue, so it only accepts one connection and then shuts down.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(3)
        self.port = support.bind_port(self.sock)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, self.sock))
        self.thread.start()
        # server() sets the event once it is listening; clear it again so
        # tearDown() can wait for the second ("done") signal.
        self.evt.wait()
        self.evt.clear()
        # NOTE(review): presumably gives the server thread time to reach
        # accept() -- confirm before changing.
        time.sleep(.1)

    def tearDown(self):
        self.evt.wait()
        self.thread.join()

    def testBasic(self):
        # connects
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            # Connect to HOST, like the other tests in this class.
            telnet = telnetlib.Telnet(HOST, self.port)
        finally:
            socket.setdefaulttimeout(None)
        # Close the client even if the assertion below fails.
        self.addCleanup(telnet.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.addCleanup(telnet.close)
        self.assertIsNone(telnet.sock.gettimeout())

    def testTimeoutValue(self):
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        self.addCleanup(telnet.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutOpen(self):
        telnet = telnetlib.Telnet()
        self.addCleanup(telnet.close)
        telnet.open(HOST, self.port, timeout=30)
        self.assertEqual(telnet.sock.gettimeout(), 30)
Guido van Rossumd8faa362007-04-27 19:54:29 +000096
def _read_setUp(self):
    """Shared setUp(): start a server() thread that replays self.dataq.

    Sets self.evt, self.dataq, self.sock, self.port and self.thread on the
    test case, then waits until the server thread reports it is listening.
    """
    self.evt = threading.Event()
    self.dataq = queue.Queue()
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.settimeout(3)
    self.port = support.bind_port(self.sock)
    self.thread = threading.Thread(target=server,
                                   args=(self.evt, self.sock, self.dataq))
    self.thread.start()
    # server() sets the event once it is listening; clear it again so the
    # matching tearDown can wait for the second ("done") signal.
    self.evt.wait()
    self.evt.clear()
    # NOTE(review): presumably gives the server thread time to reach
    # accept() -- confirm before changing.
    time.sleep(.1)
108
def _read_tearDown(self):
    """Shared tearDown(): wait for self.evt to be set, then join self.thread."""
    self.evt.wait()
    self.thread.join()
112
class ReadTests(TestCase):
    """Tests for the Telnet.read_*() methods against a scripted server.

    Each test queues one list on self.dataq; server() replays it on the
    accepted connection (bytes are sent, numbers are slept for, EOF_sigil
    ends the script).  self.dataq.join() returns once the server thread has
    picked the list up.
    """
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def _connect(self):
        """Return a Telnet connected to the test server; closed on cleanup."""
        telnet = telnetlib.Telnet(HOST, self.port)
        self.addCleanup(telnet.close)
        return telnet

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = [b'x' * 10, b'match', b'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        data = telnet.read_until(b'match')
        self.assertEqual(data, b''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = [b'hello', self.block_long, b'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        data = telnet.read_until(b'not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), b'not seen')

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = [b'x' * 500, b'y' * 500, b'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, b''.join(want[:-1]))

    def _test_blocking(self, func):
        """Check that *func* blocks for at least block_short seconds.

        The server is told to sleep for block_long before closing, so a
        blocking read cannot return earlier than that.
        """
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        self.assertLessEqual(self.block_short, time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(self._connect().read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        telnet.read_all()
        telnet.read_all() # shouldn't raise

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = [b'x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        # Exercise read_some() itself (this test used to call read_all()).
        data = telnet.read_some()
        self.assertGreaterEqual(len(data), 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        self.assertEqual(b'', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(self._connect().read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, b'x' * 100, b'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = b''
        # Poll until EOF; whatever has arrived so far must be a prefix of
        # the expected payload.
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')
    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        # EOF: after fill_rawq() has seen the closed connection, the lazy
        # read must raise EOFError.
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        func = getattr(telnet, func_name)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = [b'x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        time.sleep(self.block_short)
        # Nothing has been pulled off the socket yet.
        self.assertEqual(b'', telnet.read_lazy())
        data = b''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = [b'x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = self._connect()
        self.dataq.join()
        time.sleep(self.block_short)
        # Nothing has been pulled off the socket yet.
        self.assertEqual(b'', telnet.read_very_lazy())
        data = b''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                # The test drives both stages by hand: fill the raw queue,
                # check nothing was cooked yet, then cook it.
                telnet.fill_rawq()
                self.assertEqual(b'', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')
293
class nego_collector(object):
    """Record what is passed to a Telnet option negotiation callback.

    seen     -- concatenation of cmd + opt for every do_nego() call
    sb_seen  -- concatenation of everything sb_getter() returned; it is
                called each time cmd is SE and a getter is set
    """

    def __init__(self, sb_getter=None):
        self.seen = b''
        self.sb_getter = sb_getter
        self.sb_seen = b''

    def do_nego(self, sock, cmd, opt):
        """Callback: log cmd + opt and collect sub-negotiation data on SE."""
        self.seen += cmd + opt
        # Use the imported module directly so this class does not depend on
        # an alias that is only defined further down the file.
        if cmd == telnetlib.SE and self.sb_getter:
            sb_data = self.sb_getter()
            self.sb_seen += sb_data
305
tl = telnetlib


class OptionTests(TestCase):
    """Tests for IAC command and sub-negotiation handling in Telnet.

    Uses the same scripted server() as the read tests: each connection
    replays one list queued on self.dataq.
    """
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _connect(self):
        """Return a Telnet connected to the test server; closed on cleanup."""
        telnet = telnetlib.Telnet(HOST, self.port)
        self.addCleanup(telnet.close)
        return telnet

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        # Each call needs its own server: start one here and wait for it to
        # finish at the end.
        self.setUp()
        self.dataq.put(data)
        telnet = self._connect()
        self.dataq.join()
        nego = nego_collector()
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        cmd = nego.seen
        self.assertTrue(len(cmd) > 0) # we expect at least one command
        self.assertIn(cmd[:1], self.cmds)
        self.assertEqual(cmd[1:2], tl.NOOPT)
        self.assertEqual(len(b''.join(data[:-1])), len(txt + cmd))
        nego.sb_getter = None # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup: use up the server started by the regular setUp()
        # so that _test_command() can start a fresh one for every script
        self.dataq.put([EOF_sigil])
        telnet = self._connect()
        self.dataq.join()
        self.tearDown()

        for cmd in self.cmds:
            self._test_command([tl.IAC, cmd, EOF_sigil])
            self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100, EOF_sigil])
            self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10, EOF_sigil])
        # all at once
        self._test_command([tl.IAC + cmd for cmd in self.cmds] + [EOF_sigil])
        self.assertEqual(b'', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
                tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
                EOF_sigil,
               ]
        self.dataq.put(send)
        telnet = self._connect()
        self.dataq.join()
        # The collector pulls the sub-negotiation payload via
        # telnet.read_sb_data() every time the callback sees SE.
        nego = nego_collector(telnet.read_sb_data)
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        self.assertEqual(txt, b'')
        want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd'
        self.assertEqual(nego.sb_seen, want_sb_data)
        self.assertEqual(b'', telnet.read_sb_data())
        nego.sb_getter = None # break the nego => telnet cycle
Guido van Rossumd8faa362007-04-27 19:54:29 +0000365
def test_main(verbose=None):
    """Run every test case class of this module via support.run_unittest().

    *verbose* is accepted but not used here.
    """
    support.run_unittest(GeneralTests, ReadTests, OptionTests)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000368
# Allow running this test module directly as a script.
if __name__ == '__main__':
    test_main()