blob: aebe67bd87f4ba8f335a16c584e2081ac91760c4 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000012from itertools import chain, cycle
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
19class MockRawIO(io.RawIOBase):
20
21 def __init__(self, read_stack=()):
22 self._read_stack = list(read_stack)
23 self._write_stack = []
24
25 def read(self, n=None):
26 try:
27 return self._read_stack.pop(0)
28 except:
29 return b""
30
31 def write(self, b):
32 self._write_stack.append(b[:])
33 return len(b)
34
35 def writable(self):
36 return True
37
38 def fileno(self):
39 return 42
40
41 def readable(self):
42 return True
43
44 def seekable(self):
45 return True
46
47 def seek(self, pos, whence):
48 pass
49
50 def tell(self):
51 return 42
52
53
54class MockFileIO(io.BytesIO):
55
56 def __init__(self, data):
57 self.read_history = []
58 io.BytesIO.__init__(self, data)
59
60 def read(self, n=None):
61 res = io.BytesIO.read(self, n)
62 self.read_history.append(None if res is None else len(res))
63 return res
64
65
66class MockNonBlockWriterIO(io.RawIOBase):
67
68 def __init__(self, blocking_script):
69 self._blocking_script = list(blocking_script)
70 self._write_stack = []
71
72 def write(self, b):
73 self._write_stack.append(b[:])
74 n = self._blocking_script.pop(0)
75 if (n < 0):
76 raise io.BlockingIOError(0, "test blocking", -n)
77 else:
78 return n
79
80 def writable(self):
81 return True
82
83
84class IOTest(unittest.TestCase):
85
86 def tearDown(self):
87 test_support.unlink(test_support.TESTFN)
88
89 def write_ops(self, f):
Antoine Pitrouca5a06a2010-01-27 21:48:46 +000090
91 self.assertEqual(f.write(b"blah."), 5)
92 f.truncate(0)
93 self.assertEqual(f.tell(), 5)
94 f.seek(0)
95
Christian Heimes1a6387e2008-03-26 12:49:49 +000096 self.assertEqual(f.write(b"blah."), 5)
97 self.assertEqual(f.seek(0), 0)
98 self.assertEqual(f.write(b"Hello."), 6)
99 self.assertEqual(f.tell(), 6)
100 self.assertEqual(f.seek(-1, 1), 5)
101 self.assertEqual(f.tell(), 5)
102 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
103 self.assertEqual(f.seek(0), 0)
104 self.assertEqual(f.write(b"h"), 1)
105 self.assertEqual(f.seek(-1, 2), 13)
106 self.assertEqual(f.tell(), 13)
Antoine Pitrouca5a06a2010-01-27 21:48:46 +0000107
Christian Heimes1a6387e2008-03-26 12:49:49 +0000108 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouca5a06a2010-01-27 21:48:46 +0000109 self.assertEqual(f.tell(), 13)
Antoine Pitrouc4006102010-05-15 20:33:07 +0000110 self.assertEqual(f.write(b"hij"), 3)
111 self.assertEqual(f.seek(0,1), 16)
112 self.assertEqual(f.tell(), 16)
113 self.assertEqual(f.truncate(12), 12)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000114 self.assertRaises(TypeError, f.seek, 0.0)
115
116 def read_ops(self, f, buffered=False):
117 data = f.read(5)
118 self.assertEqual(data, b"hello")
119 data = bytearray(data)
120 self.assertEqual(f.readinto(data), 5)
121 self.assertEqual(data, b" worl")
122 self.assertEqual(f.readinto(data), 2)
123 self.assertEqual(len(data), 5)
124 self.assertEqual(data[:2], b"d\n")
125 self.assertEqual(f.seek(0), 0)
126 self.assertEqual(f.read(20), b"hello world\n")
127 self.assertEqual(f.read(1), b"")
128 self.assertEqual(f.readinto(bytearray(b"x")), 0)
129 self.assertEqual(f.seek(-6, 2), 6)
130 self.assertEqual(f.read(5), b"world")
131 self.assertEqual(f.read(0), b"")
132 self.assertEqual(f.readinto(bytearray()), 0)
133 self.assertEqual(f.seek(-6, 1), 5)
134 self.assertEqual(f.read(5), b" worl")
135 self.assertEqual(f.tell(), 10)
Antoine Pitrouc4006102010-05-15 20:33:07 +0000136 f.seek(0)
137 f.read(2)
138 f.seek(0, 1)
139 self.assertEqual(f.tell(), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000140 self.assertRaises(TypeError, f.seek, 0.0)
141 if buffered:
142 f.seek(0)
143 self.assertEqual(f.read(), b"hello world\n")
144 f.seek(6)
145 self.assertEqual(f.read(), b"world\n")
146 self.assertEqual(f.read(), b"")
147
148 LARGE = 2**31
149
150 def large_file_ops(self, f):
151 assert f.readable()
152 assert f.writable()
153 self.assertEqual(f.seek(self.LARGE), self.LARGE)
154 self.assertEqual(f.tell(), self.LARGE)
155 self.assertEqual(f.write(b"xxx"), 3)
156 self.assertEqual(f.tell(), self.LARGE + 3)
157 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
158 self.assertEqual(f.truncate(), self.LARGE + 2)
159 self.assertEqual(f.tell(), self.LARGE + 2)
160 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
161 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouca5a06a2010-01-27 21:48:46 +0000162 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000163 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
164 self.assertEqual(f.seek(-1, 2), self.LARGE)
165 self.assertEqual(f.read(2), b"x")
166
167 def test_raw_file_io(self):
168 f = io.open(test_support.TESTFN, "wb", buffering=0)
169 self.assertEqual(f.readable(), False)
170 self.assertEqual(f.writable(), True)
171 self.assertEqual(f.seekable(), True)
172 self.write_ops(f)
173 f.close()
174 f = io.open(test_support.TESTFN, "rb", buffering=0)
175 self.assertEqual(f.readable(), True)
176 self.assertEqual(f.writable(), False)
177 self.assertEqual(f.seekable(), True)
178 self.read_ops(f)
179 f.close()
180
181 def test_buffered_file_io(self):
182 f = io.open(test_support.TESTFN, "wb")
183 self.assertEqual(f.readable(), False)
184 self.assertEqual(f.writable(), True)
185 self.assertEqual(f.seekable(), True)
186 self.write_ops(f)
187 f.close()
188 f = io.open(test_support.TESTFN, "rb")
189 self.assertEqual(f.readable(), True)
190 self.assertEqual(f.writable(), False)
191 self.assertEqual(f.seekable(), True)
192 self.read_ops(f, True)
Antoine Pitrouc4006102010-05-15 20:33:07 +0000193 f = io.open(test_support.TESTFN, "r+b")
194 self.assertEqual(f.readable(), True)
195 self.assertEqual(f.writable(), True)
196 self.assertEqual(f.seekable(), True)
197 self.write_ops(f)
198 f.seek(0)
199 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000200 f.close()
201
202 def test_readline(self):
203 f = io.open(test_support.TESTFN, "wb")
204 f.write(b"abc\ndef\nxyzzy\nfoo")
205 f.close()
206 f = io.open(test_support.TESTFN, "rb")
207 self.assertEqual(f.readline(), b"abc\n")
208 self.assertEqual(f.readline(10), b"def\n")
209 self.assertEqual(f.readline(2), b"xy")
210 self.assertEqual(f.readline(4), b"zzy\n")
211 self.assertEqual(f.readline(), b"foo")
212 f.close()
213
214 def test_raw_bytes_io(self):
215 f = io.BytesIO()
216 self.write_ops(f)
217 data = f.getvalue()
218 self.assertEqual(data, b"hello world\n")
219 f = io.BytesIO(data)
220 self.read_ops(f, True)
221
222 def test_large_file_ops(self):
223 # On Windows and Mac OSX this test comsumes large resources; It takes
224 # a long time to build the >2GB file and takes >2GB of disk space
225 # therefore the resource must be enabled to run this test.
Andrew MacIntyre41c56b52008-09-22 14:23:45 +0000226 if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000227 if not test_support.is_resource_enabled("largefile"):
228 print("\nTesting large file ops skipped on %s." % sys.platform,
229 file=sys.stderr)
230 print("It requires %d bytes and a long time." % self.LARGE,
231 file=sys.stderr)
232 print("Use 'regrtest.py -u largefile test_io' to run it.",
233 file=sys.stderr)
234 return
235 f = io.open(test_support.TESTFN, "w+b", 0)
236 self.large_file_ops(f)
237 f.close()
238 f = io.open(test_support.TESTFN, "w+b")
239 self.large_file_ops(f)
240 f.close()
241
242 def test_with_open(self):
243 for bufsize in (0, 1, 100):
244 f = None
245 with open(test_support.TESTFN, "wb", bufsize) as f:
246 f.write(b"xxx")
247 self.assertEqual(f.closed, True)
248 f = None
249 try:
250 with open(test_support.TESTFN, "wb", bufsize) as f:
251 1/0
252 except ZeroDivisionError:
253 self.assertEqual(f.closed, True)
254 else:
255 self.fail("1/0 didn't raise an exception")
256
Antoine Pitrou19fec8b2009-01-21 00:56:37 +0000257 # issue 5008
258 def test_append_mode_tell(self):
259 with io.open(test_support.TESTFN, "wb") as f:
260 f.write(b"xxx")
261 with io.open(test_support.TESTFN, "ab", buffering=0) as f:
262 self.assertEqual(f.tell(), 3)
263 with io.open(test_support.TESTFN, "ab") as f:
264 self.assertEqual(f.tell(), 3)
265 with io.open(test_support.TESTFN, "a") as f:
266 self.assert_(f.tell() > 0)
267
Christian Heimes1a6387e2008-03-26 12:49:49 +0000268 def test_destructor(self):
269 record = []
270 class MyFileIO(io.FileIO):
271 def __del__(self):
272 record.append(1)
273 io.FileIO.__del__(self)
274 def close(self):
275 record.append(2)
276 io.FileIO.close(self)
277 def flush(self):
278 record.append(3)
279 io.FileIO.flush(self)
280 f = MyFileIO(test_support.TESTFN, "w")
281 f.write("xxx")
282 del f
283 self.assertEqual(record, [1, 2, 3])
284
285 def test_close_flushes(self):
286 f = io.open(test_support.TESTFN, "wb")
287 f.write(b"xxx")
288 f.close()
289 f = io.open(test_support.TESTFN, "rb")
290 self.assertEqual(f.read(), b"xxx")
291 f.close()
292
293 def XXXtest_array_writes(self):
294 # XXX memory view not available yet
295 a = array.array('i', range(10))
296 n = len(memoryview(a))
297 f = io.open(test_support.TESTFN, "wb", 0)
298 self.assertEqual(f.write(a), n)
299 f.close()
300 f = io.open(test_support.TESTFN, "wb")
301 self.assertEqual(f.write(a), n)
302 f.close()
303
304 def test_closefd(self):
305 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
306 closefd=False)
307
Georg Brandld2094602008-12-05 08:51:30 +0000308 def testReadClosed(self):
309 with io.open(test_support.TESTFN, "w") as f:
310 f.write("egg\n")
311 with io.open(test_support.TESTFN, "r") as f:
312 file = io.open(f.fileno(), "r", closefd=False)
313 self.assertEqual(file.read(), "egg\n")
314 file.seek(0)
315 file.close()
316 self.assertRaises(ValueError, file.read)
317
318 def test_no_closefd_with_filename(self):
319 # can't use closefd in combination with a file name
320 self.assertRaises(ValueError,
321 io.open, test_support.TESTFN, "r", closefd=False)
322
323 def test_closefd_attr(self):
324 with io.open(test_support.TESTFN, "wb") as f:
325 f.write(b"egg\n")
326 with io.open(test_support.TESTFN, "r") as f:
327 self.assertEqual(f.buffer.raw.closefd, True)
328 file = io.open(f.fileno(), "r", closefd=False)
329 self.assertEqual(file.buffer.raw.closefd, False)
330
Antoine Pitrou01a255a2010-05-03 16:48:13 +0000331 def test_flush_error_on_close(self):
332 f = io.open(test_support.TESTFN, "wb", buffering=0)
333 def bad_flush():
334 raise IOError()
335 f.flush = bad_flush
336 self.assertRaises(IOError, f.close) # exception not swallowed
337
338 def test_multi_close(self):
339 f = io.open(test_support.TESTFN, "wb", buffering=0)
340 f.close()
341 f.close()
342 f.close()
343 self.assertRaises(ValueError, f.flush)
344
Georg Brandld2094602008-12-05 08:51:30 +0000345
Christian Heimes1a6387e2008-03-26 12:49:49 +0000346class MemorySeekTestMixin:
347
348 def testInit(self):
349 buf = self.buftype("1234567890")
350 bytesIo = self.ioclass(buf)
351
352 def testRead(self):
353 buf = self.buftype("1234567890")
354 bytesIo = self.ioclass(buf)
355
356 self.assertEquals(buf[:1], bytesIo.read(1))
357 self.assertEquals(buf[1:5], bytesIo.read(4))
358 self.assertEquals(buf[5:], bytesIo.read(900))
359 self.assertEquals(self.EOF, bytesIo.read())
360
361 def testReadNoArgs(self):
362 buf = self.buftype("1234567890")
363 bytesIo = self.ioclass(buf)
364
365 self.assertEquals(buf, bytesIo.read())
366 self.assertEquals(self.EOF, bytesIo.read())
367
368 def testSeek(self):
369 buf = self.buftype("1234567890")
370 bytesIo = self.ioclass(buf)
371
372 bytesIo.read(5)
373 bytesIo.seek(0)
374 self.assertEquals(buf, bytesIo.read())
375
376 bytesIo.seek(3)
377 self.assertEquals(buf[3:], bytesIo.read())
378 self.assertRaises(TypeError, bytesIo.seek, 0.0)
379
380 def testTell(self):
381 buf = self.buftype("1234567890")
382 bytesIo = self.ioclass(buf)
383
384 self.assertEquals(0, bytesIo.tell())
385 bytesIo.seek(5)
386 self.assertEquals(5, bytesIo.tell())
387 bytesIo.seek(10000)
388 self.assertEquals(10000, bytesIo.tell())
389
390
391class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
392 @staticmethod
393 def buftype(s):
394 return s.encode("utf-8")
395 ioclass = io.BytesIO
396 EOF = b""
397
398
399class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
400 buftype = str
401 ioclass = io.StringIO
402 EOF = ""
403
404
405class BufferedReaderTest(unittest.TestCase):
406
407 def testRead(self):
408 rawio = MockRawIO((b"abc", b"d", b"efg"))
409 bufio = io.BufferedReader(rawio)
410
411 self.assertEquals(b"abcdef", bufio.read(6))
412
413 def testBuffering(self):
414 data = b"abcdefghi"
415 dlen = len(data)
416
417 tests = [
418 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
419 [ 100, [ 3, 3, 3], [ dlen ] ],
420 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
421 ]
422
423 for bufsize, buf_read_sizes, raw_read_sizes in tests:
424 rawio = MockFileIO(data)
425 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
426 pos = 0
427 for nbytes in buf_read_sizes:
428 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
429 pos += nbytes
430 self.assertEquals(rawio.read_history, raw_read_sizes)
431
432 def testReadNonBlocking(self):
433 # Inject some None's in there to simulate EWOULDBLOCK
434 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
435 bufio = io.BufferedReader(rawio)
436
437 self.assertEquals(b"abcd", bufio.read(6))
438 self.assertEquals(b"e", bufio.read(1))
439 self.assertEquals(b"fg", bufio.read())
440 self.assert_(None is bufio.read())
441 self.assertEquals(b"", bufio.read())
442
443 def testReadToEof(self):
444 rawio = MockRawIO((b"abc", b"d", b"efg"))
445 bufio = io.BufferedReader(rawio)
446
447 self.assertEquals(b"abcdefg", bufio.read(9000))
448
449 def testReadNoArgs(self):
450 rawio = MockRawIO((b"abc", b"d", b"efg"))
451 bufio = io.BufferedReader(rawio)
452
453 self.assertEquals(b"abcdefg", bufio.read())
454
455 def testFileno(self):
456 rawio = MockRawIO((b"abc", b"d", b"efg"))
457 bufio = io.BufferedReader(rawio)
458
459 self.assertEquals(42, bufio.fileno())
460
461 def testFilenoNoFileno(self):
462 # XXX will we always have fileno() function? If so, kill
463 # this test. Else, write it.
464 pass
465
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000466 def testThreads(self):
467 try:
468 # Write out many bytes with exactly the same number of 0's,
469 # 1's... 255's. This will help us check that concurrent reading
470 # doesn't duplicate or forget contents.
471 N = 1000
472 l = range(256) * N
473 random.shuffle(l)
474 s = bytes(bytearray(l))
475 with io.open(test_support.TESTFN, "wb") as f:
476 f.write(s)
477 with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
478 bufio = io.BufferedReader(raw, 8)
479 errors = []
480 results = []
481 def f():
482 try:
483 # Intra-buffer read then buffer-flushing read
484 for n in cycle([1, 19]):
485 s = bufio.read(n)
486 if not s:
487 break
488 # list.append() is atomic
489 results.append(s)
490 except Exception as e:
491 errors.append(e)
492 raise
493 threads = [threading.Thread(target=f) for x in range(20)]
494 for t in threads:
495 t.start()
496 time.sleep(0.02) # yield
497 for t in threads:
498 t.join()
499 self.assertFalse(errors,
500 "the following exceptions were caught: %r" % errors)
501 s = b''.join(results)
502 for i in range(256):
503 c = bytes(bytearray([i]))
504 self.assertEqual(s.count(c), N)
505 finally:
506 test_support.unlink(test_support.TESTFN)
507
508
Christian Heimes1a6387e2008-03-26 12:49:49 +0000509
510class BufferedWriterTest(unittest.TestCase):
511
512 def testWrite(self):
513 # Write to the buffered IO but don't overflow the buffer.
514 writer = MockRawIO()
515 bufio = io.BufferedWriter(writer, 8)
516
517 bufio.write(b"abc")
518
519 self.assertFalse(writer._write_stack)
520
521 def testWriteOverflow(self):
522 writer = MockRawIO()
523 bufio = io.BufferedWriter(writer, 8)
524
525 bufio.write(b"abc")
526 bufio.write(b"defghijkl")
527
528 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
529
530 def testWriteNonBlocking(self):
Antoine Pitrouc4006102010-05-15 20:33:07 +0000531 raw = MockNonBlockWriterIO((9, 2, 10, -6, 10, 8, 12))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000532 bufio = io.BufferedWriter(raw, 8, 16)
533
534 bufio.write(b"asdf")
535 bufio.write(b"asdfa")
536 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
537
538 bufio.write(b"asdfasdfasdf")
539 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
540 bufio.write(b"asdfasdfasdf")
541 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
542 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
543
544 bufio.write(b"asdfasdfasdf")
545
546 # XXX I don't like this test. It relies too heavily on how the
547 # algorithm actually works, which we might change. Refactor
548 # later.
549
550 def testFileno(self):
551 rawio = MockRawIO((b"abc", b"d", b"efg"))
552 bufio = io.BufferedWriter(rawio)
553
554 self.assertEquals(42, bufio.fileno())
555
556 def testFlush(self):
557 writer = MockRawIO()
558 bufio = io.BufferedWriter(writer, 8)
559
560 bufio.write(b"abc")
561 bufio.flush()
562
563 self.assertEquals(b"abc", writer._write_stack[0])
564
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000565 def testThreads(self):
566 # BufferedWriter should not raise exceptions or crash
567 # when called from multiple threads.
568 try:
569 # We use a real file object because it allows us to
570 # exercise situations where the GIL is released before
571 # writing the buffer to the raw streams. This is in addition
572 # to concurrency issues due to switching threads in the middle
573 # of Python code.
574 with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
575 bufio = io.BufferedWriter(raw, 8)
576 errors = []
577 def f():
578 try:
579 # Write enough bytes to flush the buffer
580 s = b"a" * 19
581 for i in range(50):
582 bufio.write(s)
583 except Exception as e:
584 errors.append(e)
585 raise
586 threads = [threading.Thread(target=f) for x in range(20)]
587 for t in threads:
588 t.start()
589 time.sleep(0.02) # yield
590 for t in threads:
591 t.join()
592 self.assertFalse(errors,
593 "the following exceptions were caught: %r" % errors)
594 finally:
595 test_support.unlink(test_support.TESTFN)
596
Antoine Pitrou01a255a2010-05-03 16:48:13 +0000597 def test_flush_error_on_close(self):
598 raw = MockRawIO()
599 def bad_flush():
600 raise IOError()
601 raw.flush = bad_flush
602 b = io.BufferedWriter(raw)
603 self.assertRaises(IOError, b.close) # exception not swallowed
604
605 def test_multi_close(self):
606 raw = MockRawIO()
607 b = io.BufferedWriter(raw)
608 b.close()
609 b.close()
610 b.close()
611 self.assertRaises(ValueError, b.flush)
612
Christian Heimes1a6387e2008-03-26 12:49:49 +0000613
614class BufferedRWPairTest(unittest.TestCase):
615
616 def testRWPair(self):
617 r = MockRawIO(())
618 w = MockRawIO()
619 pair = io.BufferedRWPair(r, w)
Benjamin Peterson828a7062008-12-27 17:05:29 +0000620 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000621
Benjamin Peterson828a7062008-12-27 17:05:29 +0000622 # XXX More Tests
Christian Heimes1a6387e2008-03-26 12:49:49 +0000623
624
625class BufferedRandomTest(unittest.TestCase):
626
627 def testReadAndWrite(self):
628 raw = MockRawIO((b"asdf", b"ghjk"))
629 rw = io.BufferedRandom(raw, 8, 12)
630
631 self.assertEqual(b"as", rw.read(2))
632 rw.write(b"ddd")
633 rw.write(b"eee")
634 self.assertFalse(raw._write_stack) # Buffer writes
635 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
636 self.assertEquals(b"dddeee", raw._write_stack[0])
637
638 def testSeekAndTell(self):
639 raw = io.BytesIO(b"asdfghjkl")
640 rw = io.BufferedRandom(raw)
641
642 self.assertEquals(b"as", rw.read(2))
643 self.assertEquals(2, rw.tell())
644 rw.seek(0, 0)
645 self.assertEquals(b"asdf", rw.read(4))
646
647 rw.write(b"asdf")
648 rw.seek(0, 0)
649 self.assertEquals(b"asdfasdfl", rw.read())
650 self.assertEquals(9, rw.tell())
651 rw.seek(-4, 2)
652 self.assertEquals(5, rw.tell())
653 rw.seek(2, 1)
654 self.assertEquals(7, rw.tell())
655 self.assertEquals(b"fl", rw.read(11))
656 self.assertRaises(TypeError, rw.seek, 0.0)
657
658# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
659# properties:
660# - A single output character can correspond to many bytes of input.
661# - The number of input bytes to complete the character can be
662# undetermined until the last input byte is received.
663# - The number of input bytes can vary depending on previous input.
664# - A single input byte can correspond to many characters of output.
665# - The number of output characters can be undetermined until the
666# last input byte is received.
667# - The number of output characters can vary depending on previous input.
668
669class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
670 """
671 For testing seek/tell behavior with a stateful, buffering decoder.
672
673 Input is a sequence of words. Words may be fixed-length (length set
674 by input) or variable-length (period-terminated). In variable-length
675 mode, extra periods are ignored. Possible words are:
676 - 'i' followed by a number sets the input length, I (maximum 99).
677 When I is set to 0, words are space-terminated.
678 - 'o' followed by a number sets the output length, O (maximum 99).
679 - Any other word is converted into a word followed by a period on
680 the output. The output word consists of the input word truncated
681 or padded out with hyphens to make its length equal to O. If O
682 is 0, the word is output verbatim without truncating or padding.
683 I and O are initially set to 1. When I changes, any buffered input is
684 re-scanned according to the new I. EOF also terminates the last word.
685 """
686
687 def __init__(self, errors='strict'):
688 codecs.IncrementalDecoder.__init__(self, errors)
689 self.reset()
690
691 def __repr__(self):
692 return '<SID %x>' % id(self)
693
694 def reset(self):
695 self.i = 1
696 self.o = 1
697 self.buffer = bytearray()
698
699 def getstate(self):
700 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
701 return bytes(self.buffer), i*100 + o
702
703 def setstate(self, state):
704 buffer, io = state
705 self.buffer = bytearray(buffer)
706 i, o = divmod(io, 100)
707 self.i, self.o = i ^ 1, o ^ 1
708
709 def decode(self, input, final=False):
710 output = ''
711 for b in input:
712 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +0000713 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000714 if self.buffer:
715 output += self.process_word()
716 else:
717 self.buffer.append(b)
718 else: # fixed-length, terminate after self.i bytes
719 self.buffer.append(b)
720 if len(self.buffer) == self.i:
721 output += self.process_word()
722 if final and self.buffer: # EOF terminates the last word
723 output += self.process_word()
724 return output
725
726 def process_word(self):
727 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000728 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000729 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000730 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000731 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
732 else:
733 output = self.buffer.decode('ascii')
734 if len(output) < self.o:
735 output += '-'*self.o # pad out with hyphens
736 if self.o:
737 output = output[:self.o] # truncate to output length
738 output += '.'
739 self.buffer = bytearray()
740 return output
741
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000742 codecEnabled = False
743
744 @classmethod
745 def lookupTestDecoder(cls, name):
746 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrouf8638a82008-12-14 18:08:37 +0000747 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000748 return codecs.CodecInfo(
Antoine Pitrouf8638a82008-12-14 18:08:37 +0000749 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000750 incrementalencoder=None,
751 streamreader=None, streamwriter=None,
752 incrementaldecoder=cls)
753
754# Register the previous decoder for testing.
755# Disabled by default, tests will enable it.
756codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
757
758
Christian Heimes1a6387e2008-03-26 12:49:49 +0000759class StatefulIncrementalDecoderTest(unittest.TestCase):
760 """
761 Make sure the StatefulIncrementalDecoder actually works.
762 """
763
764 test_cases = [
765 # I=1, O=1 (fixed-length input == fixed-length output)
766 (b'abcd', False, 'a.b.c.d.'),
767 # I=0, O=0 (variable-length input, variable-length output)
768 (b'oiabcd', True, 'abcd.'),
769 # I=0, O=0 (should ignore extra periods)
770 (b'oi...abcd...', True, 'abcd.'),
771 # I=0, O=6 (variable-length input, fixed-length output)
772 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
773 # I=2, O=6 (fixed-length input < fixed-length output)
774 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
775 # I=6, O=3 (fixed-length input > fixed-length output)
776 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
777 # I=0, then 3; O=29, then 15 (with longer output)
778 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
779 'a----------------------------.' +
780 'b----------------------------.' +
781 'cde--------------------------.' +
782 'abcdefghijabcde.' +
783 'a.b------------.' +
784 '.c.------------.' +
785 'd.e------------.' +
786 'k--------------.' +
787 'l--------------.' +
788 'm--------------.')
789 ]
790
791 def testDecoder(self):
792 # Try a few one-shot test cases.
793 for input, eof, output in self.test_cases:
794 d = StatefulIncrementalDecoder()
795 self.assertEquals(d.decode(input, eof), output)
796
797 # Also test an unfinished decode, followed by forcing EOF.
798 d = StatefulIncrementalDecoder()
799 self.assertEquals(d.decode(b'oiabcd'), '')
800 self.assertEquals(d.decode(b'', 1), 'abcd.')
801
802class TextIOWrapperTest(unittest.TestCase):
803
804 def setUp(self):
805 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
806 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
807
808 def tearDown(self):
809 test_support.unlink(test_support.TESTFN)
810
811 def testLineBuffering(self):
812 r = io.BytesIO()
813 b = io.BufferedWriter(r, 1000)
814 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
815 t.write(u"X")
816 self.assertEquals(r.getvalue(), b"") # No flush happened
817 t.write(u"Y\nZ")
818 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
819 t.write(u"A\rB")
820 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
821
822 def testEncodingErrorsReading(self):
823 # (1) default
824 b = io.BytesIO(b"abc\n\xff\n")
825 t = io.TextIOWrapper(b, encoding="ascii")
826 self.assertRaises(UnicodeError, t.read)
827 # (2) explicit strict
828 b = io.BytesIO(b"abc\n\xff\n")
829 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
830 self.assertRaises(UnicodeError, t.read)
831 # (3) ignore
832 b = io.BytesIO(b"abc\n\xff\n")
833 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
834 self.assertEquals(t.read(), "abc\n\n")
835 # (4) replace
836 b = io.BytesIO(b"abc\n\xff\n")
837 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
838 self.assertEquals(t.read(), u"abc\n\ufffd\n")
839
840 def testEncodingErrorsWriting(self):
841 # (1) default
842 b = io.BytesIO()
843 t = io.TextIOWrapper(b, encoding="ascii")
844 self.assertRaises(UnicodeError, t.write, u"\xff")
845 # (2) explicit strict
846 b = io.BytesIO()
847 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
848 self.assertRaises(UnicodeError, t.write, u"\xff")
849 # (3) ignore
850 b = io.BytesIO()
851 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
852 newline="\n")
853 t.write(u"abc\xffdef\n")
854 t.flush()
855 self.assertEquals(b.getvalue(), b"abcdef\n")
856 # (4) replace
857 b = io.BytesIO()
858 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
859 newline="\n")
860 t.write(u"abc\xffdef\n")
861 t.flush()
862 self.assertEquals(b.getvalue(), b"abc?def\n")
863
864 def testNewlinesInput(self):
865 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
866 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
867 for newline, expected in [
868 (None, normalized.decode("ascii").splitlines(True)),
869 ("", testdata.decode("ascii").splitlines(True)),
870 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
871 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
872 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
873 ]:
874 buf = io.BytesIO(testdata)
875 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
876 self.assertEquals(txt.readlines(), expected)
877 txt.seek(0)
878 self.assertEquals(txt.read(), "".join(expected))
879
880 def testNewlinesOutput(self):
881 testdict = {
882 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
883 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
884 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
885 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
886 }
887 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
888 for newline, expected in tests:
889 buf = io.BytesIO()
890 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
891 txt.write("AAA\nB")
892 txt.write("BB\nCCC\n")
893 txt.write("X\rY\r\nZ")
894 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000895 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000896 self.assertEquals(buf.getvalue(), expected)
897
898 def testNewlines(self):
899 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
900
901 tests = [
902 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
903 [ '', input_lines ],
904 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
905 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
906 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
907 ]
Antoine Pitrouf8638a82008-12-14 18:08:37 +0000908 encodings = (
909 'utf-8', 'latin-1',
910 'utf-16', 'utf-16-le', 'utf-16-be',
911 'utf-32', 'utf-32-le', 'utf-32-be',
912 )
Christian Heimes1a6387e2008-03-26 12:49:49 +0000913
914 # Try a range of buffer sizes to test the case where \r is the last
915 # character in TextIOWrapper._pending_line.
916 for encoding in encodings:
917 # XXX: str.encode() should return bytes
918 data = bytes(''.join(input_lines).encode(encoding))
919 for do_reads in (False, True):
920 for bufsize in range(1, 10):
921 for newline, exp_lines in tests:
922 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
923 textio = io.TextIOWrapper(bufio, newline=newline,
924 encoding=encoding)
925 if do_reads:
926 got_lines = []
927 while True:
928 c2 = textio.read(2)
929 if c2 == '':
930 break
931 self.assertEquals(len(c2), 2)
932 got_lines.append(c2 + textio.readline())
933 else:
934 got_lines = list(textio)
935
936 for got_line, exp_line in zip(got_lines, exp_lines):
937 self.assertEquals(got_line, exp_line)
938 self.assertEquals(len(got_lines), len(exp_lines))
939
940 def testNewlinesInput(self):
941 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
942 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
943 for newline, expected in [
944 (None, normalized.decode("ascii").splitlines(True)),
945 ("", testdata.decode("ascii").splitlines(True)),
946 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
947 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
948 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
949 ]:
950 buf = io.BytesIO(testdata)
951 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
952 self.assertEquals(txt.readlines(), expected)
953 txt.seek(0)
954 self.assertEquals(txt.read(), "".join(expected))
955
956 def testNewlinesOutput(self):
957 data = u"AAA\nBBB\rCCC\n"
958 data_lf = b"AAA\nBBB\rCCC\n"
959 data_cr = b"AAA\rBBB\rCCC\r"
960 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
961 save_linesep = os.linesep
962 try:
963 for os.linesep, newline, expected in [
964 ("\n", None, data_lf),
965 ("\r\n", None, data_crlf),
966 ("\n", "", data_lf),
967 ("\r\n", "", data_lf),
968 ("\n", "\n", data_lf),
969 ("\r\n", "\n", data_lf),
970 ("\n", "\r", data_cr),
971 ("\r\n", "\r", data_cr),
972 ("\n", "\r\n", data_crlf),
973 ("\r\n", "\r\n", data_crlf),
974 ]:
975 buf = io.BytesIO()
976 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
977 txt.write(data)
978 txt.close()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000979 self.assertEquals(buf.closed, True)
980 self.assertRaises(ValueError, buf.getvalue)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000981 finally:
982 os.linesep = save_linesep
983
984 # Systematic tests of the text I/O API
985
986 def testBasicIO(self):
987 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
988 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
989 f = io.open(test_support.TESTFN, "w+", encoding=enc)
990 f._CHUNK_SIZE = chunksize
991 self.assertEquals(f.write(u"abc"), 3)
992 f.close()
993 f = io.open(test_support.TESTFN, "r+", encoding=enc)
994 f._CHUNK_SIZE = chunksize
995 self.assertEquals(f.tell(), 0)
996 self.assertEquals(f.read(), u"abc")
997 cookie = f.tell()
998 self.assertEquals(f.seek(0), 0)
999 self.assertEquals(f.read(2), u"ab")
1000 self.assertEquals(f.read(1), u"c")
1001 self.assertEquals(f.read(1), u"")
1002 self.assertEquals(f.read(), u"")
1003 self.assertEquals(f.tell(), cookie)
1004 self.assertEquals(f.seek(0), 0)
1005 self.assertEquals(f.seek(0, 2), cookie)
1006 self.assertEquals(f.write(u"def"), 3)
1007 self.assertEquals(f.seek(cookie), cookie)
1008 self.assertEquals(f.read(), u"def")
1009 if enc.startswith("utf"):
1010 self.multi_line_test(f, enc)
1011 f.close()
1012
1013 def multi_line_test(self, f, enc):
1014 f.seek(0)
1015 f.truncate()
1016 sample = u"s\xff\u0fff\uffff"
1017 wlines = []
1018 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1019 chars = []
1020 for i in range(size):
1021 chars.append(sample[i % len(sample)])
1022 line = u"".join(chars) + u"\n"
1023 wlines.append((f.tell(), line))
1024 f.write(line)
1025 f.seek(0)
1026 rlines = []
1027 while True:
1028 pos = f.tell()
1029 line = f.readline()
1030 if not line:
1031 break
1032 rlines.append((pos, line))
1033 self.assertEquals(rlines, wlines)
1034
1035 def testTelling(self):
1036 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
1037 p0 = f.tell()
1038 f.write(u"\xff\n")
1039 p1 = f.tell()
1040 f.write(u"\xff\n")
1041 p2 = f.tell()
1042 f.seek(0)
1043 self.assertEquals(f.tell(), p0)
1044 self.assertEquals(f.readline(), u"\xff\n")
1045 self.assertEquals(f.tell(), p1)
1046 self.assertEquals(f.readline(), u"\xff\n")
1047 self.assertEquals(f.tell(), p2)
1048 f.seek(0)
1049 for line in f:
1050 self.assertEquals(line, u"\xff\n")
1051 self.assertRaises(IOError, f.tell)
1052 self.assertEquals(f.tell(), p2)
1053 f.close()
1054
1055 def testSeeking(self):
1056 chunk_size = io.TextIOWrapper._CHUNK_SIZE
1057 prefix_size = chunk_size - 2
1058 u_prefix = "a" * prefix_size
1059 prefix = bytes(u_prefix.encode("utf-8"))
1060 self.assertEquals(len(u_prefix), len(prefix))
1061 u_suffix = "\u8888\n"
1062 suffix = bytes(u_suffix.encode("utf-8"))
1063 line = prefix + suffix
1064 f = io.open(test_support.TESTFN, "wb")
1065 f.write(line*2)
1066 f.close()
1067 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1068 s = f.read(prefix_size)
1069 self.assertEquals(s, unicode(prefix, "ascii"))
1070 self.assertEquals(f.tell(), prefix_size)
1071 self.assertEquals(f.readline(), u_suffix)
1072
1073 def testSeekingToo(self):
1074 # Regression test for a specific bug
1075 data = b'\xe0\xbf\xbf\n'
1076 f = io.open(test_support.TESTFN, "wb")
1077 f.write(data)
1078 f.close()
1079 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1080 f._CHUNK_SIZE # Just test that it exists
1081 f._CHUNK_SIZE = 2
1082 f.readline()
1083 f.tell()
1084
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001085 def testSeekAndTell(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001086 """Test seek/tell using the StatefulIncrementalDecoder."""
1087
Christian Heimes1a6387e2008-03-26 12:49:49 +00001088 def testSeekAndTellWithData(data, min_pos=0):
1089 """Tell/seek to various points within a data stream and ensure
1090 that the decoded data returned by read() is consistent."""
1091 f = io.open(test_support.TESTFN, 'wb')
1092 f.write(data)
1093 f.close()
1094 f = io.open(test_support.TESTFN, encoding='test_decoder')
1095 decoded = f.read()
1096 f.close()
1097
1098 for i in range(min_pos, len(decoded) + 1): # seek positions
1099 for j in [1, 5, len(decoded) - i]: # read lengths
1100 f = io.open(test_support.TESTFN, encoding='test_decoder')
1101 self.assertEquals(f.read(i), decoded[:i])
1102 cookie = f.tell()
1103 self.assertEquals(f.read(j), decoded[i:i + j])
1104 f.seek(cookie)
1105 self.assertEquals(f.read(), decoded[i:])
1106 f.close()
1107
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001108 # Enable the test decoder.
1109 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001110
1111 # Run the tests.
1112 try:
1113 # Try each test case.
1114 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1115 testSeekAndTellWithData(input)
1116
1117 # Position each test case so that it crosses a chunk boundary.
1118 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1119 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1120 offset = CHUNK_SIZE - len(input)//2
1121 prefix = b'.'*offset
1122 # Don't bother seeking into the prefix (takes too long).
1123 min_pos = offset*2
1124 testSeekAndTellWithData(prefix + input, min_pos)
1125
1126 # Ensure our test decoder won't interfere with subsequent tests.
1127 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001128 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001129
1130 def testEncodedWrites(self):
1131 data = u"1234567890"
1132 tests = ("utf-16",
1133 "utf-16-le",
1134 "utf-16-be",
1135 "utf-32",
1136 "utf-32-le",
1137 "utf-32-be")
1138 for encoding in tests:
1139 buf = io.BytesIO()
1140 f = io.TextIOWrapper(buf, encoding=encoding)
1141 # Check if the BOM is written only once (see issue1753).
1142 f.write(data)
1143 f.write(data)
1144 f.seek(0)
1145 self.assertEquals(f.read(), data * 2)
1146 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1147
1148 def timingTest(self):
1149 timer = time.time
1150 enc = "utf8"
1151 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
1152 nlines = 10000
1153 nchars = len(line)
1154 nbytes = len(line.encode(enc))
1155 for chunk_size in (32, 64, 128, 256):
1156 f = io.open(test_support.TESTFN, "w+", encoding=enc)
1157 f._CHUNK_SIZE = chunk_size
1158 t0 = timer()
1159 for i in range(nlines):
1160 f.write(line)
1161 f.flush()
1162 t1 = timer()
1163 f.seek(0)
1164 for line in f:
1165 pass
1166 t2 = timer()
1167 f.seek(0)
1168 while f.readline():
1169 pass
1170 t3 = timer()
1171 f.seek(0)
1172 while f.readline():
1173 f.tell()
1174 t4 = timer()
1175 f.close()
1176 if test_support.verbose:
1177 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1178 (nlines, nchars, nbytes))
1179 print("File chunk size: %6s" % f._CHUNK_SIZE)
1180 print("Writing: %6.3f seconds" % (t1-t0))
1181 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1182 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1183 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1184
1185 def testReadOneByOne(self):
1186 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1187 reads = ""
1188 while True:
1189 c = txt.read(1)
1190 if not c:
1191 break
1192 reads += c
1193 self.assertEquals(reads, "AA\nBB")
1194
1195 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1196 def testReadByChunk(self):
1197 # make sure "\r\n" straddles 128 char boundary.
1198 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1199 reads = ""
1200 while True:
1201 c = txt.read(128)
1202 if not c:
1203 break
1204 reads += c
1205 self.assertEquals(reads, "A"*127+"\nB")
1206
1207 def test_issue1395_1(self):
1208 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1209
1210 # read one char at a time
1211 reads = ""
1212 while True:
1213 c = txt.read(1)
1214 if not c:
1215 break
1216 reads += c
1217 self.assertEquals(reads, self.normalized)
1218
1219 def test_issue1395_2(self):
1220 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1221 txt._CHUNK_SIZE = 4
1222
1223 reads = ""
1224 while True:
1225 c = txt.read(4)
1226 if not c:
1227 break
1228 reads += c
1229 self.assertEquals(reads, self.normalized)
1230
1231 def test_issue1395_3(self):
1232 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1233 txt._CHUNK_SIZE = 4
1234
1235 reads = txt.read(4)
1236 reads += txt.read(4)
1237 reads += txt.readline()
1238 reads += txt.readline()
1239 reads += txt.readline()
1240 self.assertEquals(reads, self.normalized)
1241
1242 def test_issue1395_4(self):
1243 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1244 txt._CHUNK_SIZE = 4
1245
1246 reads = txt.read(4)
1247 reads += txt.read()
1248 self.assertEquals(reads, self.normalized)
1249
1250 def test_issue1395_5(self):
1251 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1252 txt._CHUNK_SIZE = 4
1253
1254 reads = txt.read(4)
1255 pos = txt.tell()
1256 txt.seek(0)
1257 txt.seek(pos)
1258 self.assertEquals(txt.read(4), "BBB\n")
1259
1260 def test_issue2282(self):
1261 buffer = io.BytesIO(self.testdata)
1262 txt = io.TextIOWrapper(buffer, encoding="ascii")
1263
1264 self.assertEqual(buffer.seekable(), txt.seekable())
1265
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001266 def check_newline_decoder_utf8(self, decoder):
1267 # UTF-8 specific tests for a newline decoder
1268 def _check_decode(b, s, **kwargs):
1269 # We exercise getstate() / setstate() as well as decode()
1270 state = decoder.getstate()
1271 self.assertEquals(decoder.decode(b, **kwargs), s)
1272 decoder.setstate(state)
1273 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001274
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001275 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001276
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001277 _check_decode(b'\xe8', "")
1278 _check_decode(b'\xa2', "")
1279 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001280
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001281 _check_decode(b'\xe8', "")
1282 _check_decode(b'\xa2', "")
1283 _check_decode(b'\x88', "\u8888")
1284
1285 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001286 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1287
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001288 decoder.reset()
1289 _check_decode(b'\n', "\n")
1290 _check_decode(b'\r', "")
1291 _check_decode(b'', "\n", final=True)
1292 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001293
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001294 _check_decode(b'\r', "")
1295 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001296
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001297 _check_decode(b'\r\r\n', "\n\n")
1298 _check_decode(b'\r', "")
1299 _check_decode(b'\r', "\n")
1300 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001301
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001302 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1303 _check_decode(b'\xe8\xa2\x88', "\u8888")
1304 _check_decode(b'\n', "\n")
1305 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1306 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001307
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001308 def check_newline_decoder(self, decoder, encoding):
1309 result = []
1310 encoder = codecs.getincrementalencoder(encoding)()
1311 def _decode_bytewise(s):
1312 for b in encoder.encode(s):
1313 result.append(decoder.decode(b))
1314 self.assertEquals(decoder.newlines, None)
1315 _decode_bytewise("abc\n\r")
1316 self.assertEquals(decoder.newlines, '\n')
1317 _decode_bytewise("\nabc")
1318 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1319 _decode_bytewise("abc\r")
1320 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1321 _decode_bytewise("abc")
1322 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1323 _decode_bytewise("abc\r")
1324 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1325 decoder.reset()
1326 self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
1327 self.assertEquals(decoder.newlines, None)
1328
1329 def test_newline_decoder(self):
1330 encodings = (
1331 'utf-8', 'latin-1',
1332 'utf-16', 'utf-16-le', 'utf-16-be',
1333 'utf-32', 'utf-32-le', 'utf-32-be',
1334 )
1335 for enc in encodings:
1336 decoder = codecs.getincrementaldecoder(enc)()
1337 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1338 self.check_newline_decoder(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001339 decoder = codecs.getincrementaldecoder("utf-8")()
1340 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001341 self.check_newline_decoder_utf8(decoder)
1342
Antoine Pitrou01a255a2010-05-03 16:48:13 +00001343 def test_flush_error_on_close(self):
1344 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1345 def bad_flush():
1346 raise IOError()
1347 txt.flush = bad_flush
1348 self.assertRaises(IOError, txt.close) # exception not swallowed
1349
1350 def test_multi_close(self):
1351 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1352 txt.close()
1353 txt.close()
1354 txt.close()
1355 self.assertRaises(ValueError, txt.flush)
1356
Christian Heimes1a6387e2008-03-26 12:49:49 +00001357
1358# XXX Tests for open()
1359
1360class MiscIOTest(unittest.TestCase):
1361
Georg Brandld2094602008-12-05 08:51:30 +00001362 def tearDown(self):
1363 test_support.unlink(test_support.TESTFN)
1364
Christian Heimes1a6387e2008-03-26 12:49:49 +00001365 def testImport__all__(self):
1366 for name in io.__all__:
1367 obj = getattr(io, name, None)
1368 self.assert_(obj is not None, name)
1369 if name == "open":
1370 continue
1371 elif "error" in name.lower():
1372 self.assert_(issubclass(obj, Exception), name)
1373 else:
1374 self.assert_(issubclass(obj, io.IOBase))
1375
1376
Georg Brandld2094602008-12-05 08:51:30 +00001377 def test_attributes(self):
1378 f = io.open(test_support.TESTFN, "wb", buffering=0)
Georg Brandlfa71a902008-12-05 09:08:28 +00001379 self.assertEquals(f.mode, "wb")
Georg Brandld2094602008-12-05 08:51:30 +00001380 f.close()
1381
1382 f = io.open(test_support.TESTFN, "U")
1383 self.assertEquals(f.name, test_support.TESTFN)
1384 self.assertEquals(f.buffer.name, test_support.TESTFN)
1385 self.assertEquals(f.buffer.raw.name, test_support.TESTFN)
1386 self.assertEquals(f.mode, "U")
Georg Brandlfa71a902008-12-05 09:08:28 +00001387 self.assertEquals(f.buffer.mode, "rb")
1388 self.assertEquals(f.buffer.raw.mode, "rb")
Georg Brandld2094602008-12-05 08:51:30 +00001389 f.close()
1390
1391 f = io.open(test_support.TESTFN, "w+")
1392 self.assertEquals(f.mode, "w+")
Georg Brandlfa71a902008-12-05 09:08:28 +00001393 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
1394 self.assertEquals(f.buffer.raw.mode, "rb+")
Georg Brandld2094602008-12-05 08:51:30 +00001395
1396 g = io.open(f.fileno(), "wb", closefd=False)
Georg Brandlfa71a902008-12-05 09:08:28 +00001397 self.assertEquals(g.mode, "wb")
1398 self.assertEquals(g.raw.mode, "wb")
Georg Brandld2094602008-12-05 08:51:30 +00001399 self.assertEquals(g.name, f.fileno())
1400 self.assertEquals(g.raw.name, f.fileno())
1401 f.close()
1402 g.close()
1403
1404
Christian Heimes1a6387e2008-03-26 12:49:49 +00001405def test_main():
1406 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001407 BufferedReaderTest, BufferedWriterTest,
1408 BufferedRWPairTest, BufferedRandomTest,
1409 StatefulIncrementalDecoderTest,
1410 TextIOWrapperTest, MiscIOTest)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001411
1412if __name__ == "__main__":
1413 unittest.main()