blob: c732b1fe6855634ea805fdc289f333af534491d9 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000012from itertools import chain, cycle
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered-IO tests.

    Reads are served from a queue of prepared chunks and every write is
    recorded in ``_write_stack`` so tests can inspect what reached the raw
    layer.  ``fileno()`` and ``tell()`` return the fixed value 42.
    """

    def __init__(self, read_stack=()):
        # Copy the script so the caller's sequence is never mutated.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once the script is empty.

        ``n`` is accepted for interface compatibility and ignored.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only "script exhausted" means EOF; anything else (including
            # KeyboardInterrupt) must propagate instead of being swallowed.
            return b""

    def write(self, b):
        """Record a copy of ``b`` and report it as fully written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # Positioning is a no-op for the mock; ``whence`` defaults to 0 to
        # match the io.RawIOBase.seek() signature.
        pass

    def tell(self):
        return 42
52
53
class MockFileIO(io.BytesIO):
    """BytesIO that logs the size of every chunk returned by read().

    ``read_history`` gets one entry per read() call: the length of the
    returned data, or None when the underlying read returned None.
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
64
65
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() outcomes follow a prepared script.

    Each write() consumes one script entry: a non-negative entry is returned
    as the byte count, a negative entry ``-k`` raises io.BlockingIOError
    with ``k`` as its third argument.  Every buffer passed to write() is
    recorded in ``_write_stack``.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        # Record first, so the attempt is visible even when it "blocks".
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
82
83
84class IOTest(unittest.TestCase):
85
86 def tearDown(self):
87 test_support.unlink(test_support.TESTFN)
88
89 def write_ops(self, f):
Antoine Pitrouca5a06a2010-01-27 21:48:46 +000090
91 self.assertEqual(f.write(b"blah."), 5)
92 f.truncate(0)
93 self.assertEqual(f.tell(), 5)
94 f.seek(0)
95
Christian Heimes1a6387e2008-03-26 12:49:49 +000096 self.assertEqual(f.write(b"blah."), 5)
97 self.assertEqual(f.seek(0), 0)
98 self.assertEqual(f.write(b"Hello."), 6)
99 self.assertEqual(f.tell(), 6)
100 self.assertEqual(f.seek(-1, 1), 5)
101 self.assertEqual(f.tell(), 5)
102 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
103 self.assertEqual(f.seek(0), 0)
104 self.assertEqual(f.write(b"h"), 1)
105 self.assertEqual(f.seek(-1, 2), 13)
106 self.assertEqual(f.tell(), 13)
Antoine Pitrouca5a06a2010-01-27 21:48:46 +0000107
Christian Heimes1a6387e2008-03-26 12:49:49 +0000108 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouca5a06a2010-01-27 21:48:46 +0000109 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000110 self.assertRaises(TypeError, f.seek, 0.0)
111
112 def read_ops(self, f, buffered=False):
113 data = f.read(5)
114 self.assertEqual(data, b"hello")
115 data = bytearray(data)
116 self.assertEqual(f.readinto(data), 5)
117 self.assertEqual(data, b" worl")
118 self.assertEqual(f.readinto(data), 2)
119 self.assertEqual(len(data), 5)
120 self.assertEqual(data[:2], b"d\n")
121 self.assertEqual(f.seek(0), 0)
122 self.assertEqual(f.read(20), b"hello world\n")
123 self.assertEqual(f.read(1), b"")
124 self.assertEqual(f.readinto(bytearray(b"x")), 0)
125 self.assertEqual(f.seek(-6, 2), 6)
126 self.assertEqual(f.read(5), b"world")
127 self.assertEqual(f.read(0), b"")
128 self.assertEqual(f.readinto(bytearray()), 0)
129 self.assertEqual(f.seek(-6, 1), 5)
130 self.assertEqual(f.read(5), b" worl")
131 self.assertEqual(f.tell(), 10)
132 self.assertRaises(TypeError, f.seek, 0.0)
133 if buffered:
134 f.seek(0)
135 self.assertEqual(f.read(), b"hello world\n")
136 f.seek(6)
137 self.assertEqual(f.read(), b"world\n")
138 self.assertEqual(f.read(), b"")
139
140 LARGE = 2**31
141
142 def large_file_ops(self, f):
143 assert f.readable()
144 assert f.writable()
145 self.assertEqual(f.seek(self.LARGE), self.LARGE)
146 self.assertEqual(f.tell(), self.LARGE)
147 self.assertEqual(f.write(b"xxx"), 3)
148 self.assertEqual(f.tell(), self.LARGE + 3)
149 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
150 self.assertEqual(f.truncate(), self.LARGE + 2)
151 self.assertEqual(f.tell(), self.LARGE + 2)
152 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
153 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouca5a06a2010-01-27 21:48:46 +0000154 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000155 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
156 self.assertEqual(f.seek(-1, 2), self.LARGE)
157 self.assertEqual(f.read(2), b"x")
158
159 def test_raw_file_io(self):
160 f = io.open(test_support.TESTFN, "wb", buffering=0)
161 self.assertEqual(f.readable(), False)
162 self.assertEqual(f.writable(), True)
163 self.assertEqual(f.seekable(), True)
164 self.write_ops(f)
165 f.close()
166 f = io.open(test_support.TESTFN, "rb", buffering=0)
167 self.assertEqual(f.readable(), True)
168 self.assertEqual(f.writable(), False)
169 self.assertEqual(f.seekable(), True)
170 self.read_ops(f)
171 f.close()
172
173 def test_buffered_file_io(self):
174 f = io.open(test_support.TESTFN, "wb")
175 self.assertEqual(f.readable(), False)
176 self.assertEqual(f.writable(), True)
177 self.assertEqual(f.seekable(), True)
178 self.write_ops(f)
179 f.close()
180 f = io.open(test_support.TESTFN, "rb")
181 self.assertEqual(f.readable(), True)
182 self.assertEqual(f.writable(), False)
183 self.assertEqual(f.seekable(), True)
184 self.read_ops(f, True)
185 f.close()
186
187 def test_readline(self):
188 f = io.open(test_support.TESTFN, "wb")
189 f.write(b"abc\ndef\nxyzzy\nfoo")
190 f.close()
191 f = io.open(test_support.TESTFN, "rb")
192 self.assertEqual(f.readline(), b"abc\n")
193 self.assertEqual(f.readline(10), b"def\n")
194 self.assertEqual(f.readline(2), b"xy")
195 self.assertEqual(f.readline(4), b"zzy\n")
196 self.assertEqual(f.readline(), b"foo")
197 f.close()
198
199 def test_raw_bytes_io(self):
200 f = io.BytesIO()
201 self.write_ops(f)
202 data = f.getvalue()
203 self.assertEqual(data, b"hello world\n")
204 f = io.BytesIO(data)
205 self.read_ops(f, True)
206
207 def test_large_file_ops(self):
208 # On Windows and Mac OSX this test comsumes large resources; It takes
209 # a long time to build the >2GB file and takes >2GB of disk space
210 # therefore the resource must be enabled to run this test.
Andrew MacIntyre41c56b52008-09-22 14:23:45 +0000211 if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000212 if not test_support.is_resource_enabled("largefile"):
213 print("\nTesting large file ops skipped on %s." % sys.platform,
214 file=sys.stderr)
215 print("It requires %d bytes and a long time." % self.LARGE,
216 file=sys.stderr)
217 print("Use 'regrtest.py -u largefile test_io' to run it.",
218 file=sys.stderr)
219 return
220 f = io.open(test_support.TESTFN, "w+b", 0)
221 self.large_file_ops(f)
222 f.close()
223 f = io.open(test_support.TESTFN, "w+b")
224 self.large_file_ops(f)
225 f.close()
226
227 def test_with_open(self):
228 for bufsize in (0, 1, 100):
229 f = None
230 with open(test_support.TESTFN, "wb", bufsize) as f:
231 f.write(b"xxx")
232 self.assertEqual(f.closed, True)
233 f = None
234 try:
235 with open(test_support.TESTFN, "wb", bufsize) as f:
236 1/0
237 except ZeroDivisionError:
238 self.assertEqual(f.closed, True)
239 else:
240 self.fail("1/0 didn't raise an exception")
241
Antoine Pitrou19fec8b2009-01-21 00:56:37 +0000242 # issue 5008
243 def test_append_mode_tell(self):
244 with io.open(test_support.TESTFN, "wb") as f:
245 f.write(b"xxx")
246 with io.open(test_support.TESTFN, "ab", buffering=0) as f:
247 self.assertEqual(f.tell(), 3)
248 with io.open(test_support.TESTFN, "ab") as f:
249 self.assertEqual(f.tell(), 3)
250 with io.open(test_support.TESTFN, "a") as f:
251 self.assert_(f.tell() > 0)
252
Christian Heimes1a6387e2008-03-26 12:49:49 +0000253 def test_destructor(self):
254 record = []
255 class MyFileIO(io.FileIO):
256 def __del__(self):
257 record.append(1)
258 io.FileIO.__del__(self)
259 def close(self):
260 record.append(2)
261 io.FileIO.close(self)
262 def flush(self):
263 record.append(3)
264 io.FileIO.flush(self)
265 f = MyFileIO(test_support.TESTFN, "w")
266 f.write("xxx")
267 del f
268 self.assertEqual(record, [1, 2, 3])
269
270 def test_close_flushes(self):
271 f = io.open(test_support.TESTFN, "wb")
272 f.write(b"xxx")
273 f.close()
274 f = io.open(test_support.TESTFN, "rb")
275 self.assertEqual(f.read(), b"xxx")
276 f.close()
277
278 def XXXtest_array_writes(self):
279 # XXX memory view not available yet
280 a = array.array('i', range(10))
281 n = len(memoryview(a))
282 f = io.open(test_support.TESTFN, "wb", 0)
283 self.assertEqual(f.write(a), n)
284 f.close()
285 f = io.open(test_support.TESTFN, "wb")
286 self.assertEqual(f.write(a), n)
287 f.close()
288
289 def test_closefd(self):
290 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
291 closefd=False)
292
Georg Brandld2094602008-12-05 08:51:30 +0000293 def testReadClosed(self):
294 with io.open(test_support.TESTFN, "w") as f:
295 f.write("egg\n")
296 with io.open(test_support.TESTFN, "r") as f:
297 file = io.open(f.fileno(), "r", closefd=False)
298 self.assertEqual(file.read(), "egg\n")
299 file.seek(0)
300 file.close()
301 self.assertRaises(ValueError, file.read)
302
303 def test_no_closefd_with_filename(self):
304 # can't use closefd in combination with a file name
305 self.assertRaises(ValueError,
306 io.open, test_support.TESTFN, "r", closefd=False)
307
308 def test_closefd_attr(self):
309 with io.open(test_support.TESTFN, "wb") as f:
310 f.write(b"egg\n")
311 with io.open(test_support.TESTFN, "r") as f:
312 self.assertEqual(f.buffer.raw.closefd, True)
313 file = io.open(f.fileno(), "r", closefd=False)
314 self.assertEqual(file.buffer.raw.closefd, False)
315
Antoine Pitrou01a255a2010-05-03 16:48:13 +0000316 def test_flush_error_on_close(self):
317 f = io.open(test_support.TESTFN, "wb", buffering=0)
318 def bad_flush():
319 raise IOError()
320 f.flush = bad_flush
321 self.assertRaises(IOError, f.close) # exception not swallowed
322
323 def test_multi_close(self):
324 f = io.open(test_support.TESTFN, "wb", buffering=0)
325 f.close()
326 f.close()
327 f.close()
328 self.assertRaises(ValueError, f.flush)
329
Georg Brandld2094602008-12-05 08:51:30 +0000330
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream tests.

    Concrete subclasses provide ``ioclass`` (the stream type), ``buftype``
    (converts the sample text to the stream's data type) and ``EOF`` (the
    value read() returns at end of stream).
    """

    def testInit(self):
        # Construction from an initial buffer must simply succeed.
        self.ioclass(self.buftype("1234567890"))

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        # Float offsets are rejected.
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and reported by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
374
375
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream checks against io.BytesIO."""

    ioclass = io.BytesIO
    EOF = b""
    # Sample text is handed to the stream as UTF-8 encoded bytes.
    buftype = staticmethod(lambda s: s.encode("utf-8"))
382
383
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    # Runs the MemorySeekTestMixin checks against io.StringIO.
    # buftype: the sample text is passed through str() before being
    # handed to the stream.
    buftype = str
    ioclass = io.StringIO
    # Value read() is expected to return at end of stream.
    EOF = ""
388
389
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of mock and real raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each row: buffer size, sizes requested from the BufferedReader,
        # sizes of the chunks the raw stream is then expected to have
        # returned (as logged by MockFileIO.read_history).
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(None is bufio.read())
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def testThreads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            # list() keeps this working where range() is not a list.
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with io.open(test_support.TESTFN, "wb") as f:
                f.write(s)
            with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
                bufio = io.BufferedReader(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            test_support.unlink(test_support.TESTFN)
492
493
Christian Heimes1a6387e2008-03-26 12:49:49 +0000494
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of mock and real raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        # Exceeding the 8-byte buffer pushes everything to the raw stream.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])

    def testThreads(self):
        # BufferedWriter should not raise exceptions or crash
        # when called from multiple threads.
        try:
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
                bufio = io.BufferedWriter(raw, 8)
                errors = []

                def writer():
                    try:
                        # Write enough bytes to flush the buffer
                        s = b"a" * 19
                        for i in range(50):
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=writer) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
        finally:
            test_support.unlink(test_support.TESTFN)

    def test_flush_error_on_close(self):
        raw = MockRawIO()

        def bad_flush():
            raise IOError()

        raw.flush = bad_flush
        b = io.BufferedWriter(raw)
        self.assertRaises(IOError, b.close)  # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed stream is not.
        raw = MockRawIO()
        b = io.BufferedWriter(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)
597
Christian Heimes1a6387e2008-03-26 12:49:49 +0000598
class BufferedRWPairTest(unittest.TestCase):
    """Smoke test for io.BufferedRWPair."""

    def testRWPair(self):
        # A freshly built reader/writer pair must report itself as open.
        reader, writer = MockRawIO(()), MockRawIO()
        rw_pair = io.BufferedRWPair(reader, writer)
        self.assertFalse(rw_pair.closed)

        # XXX More Tests
Christian Heimes1a6387e2008-03-26 12:49:49 +0000608
609
610class BufferedRandomTest(unittest.TestCase):
611
612 def testReadAndWrite(self):
613 raw = MockRawIO((b"asdf", b"ghjk"))
614 rw = io.BufferedRandom(raw, 8, 12)
615
616 self.assertEqual(b"as", rw.read(2))
617 rw.write(b"ddd")
618 rw.write(b"eee")
619 self.assertFalse(raw._write_stack) # Buffer writes
620 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
621 self.assertEquals(b"dddeee", raw._write_stack[0])
622
623 def testSeekAndTell(self):
624 raw = io.BytesIO(b"asdfghjkl")
625 rw = io.BufferedRandom(raw)
626
627 self.assertEquals(b"as", rw.read(2))
628 self.assertEquals(2, rw.tell())
629 rw.seek(0, 0)
630 self.assertEquals(b"asdf", rw.read(4))
631
632 rw.write(b"asdf")
633 rw.seek(0, 0)
634 self.assertEquals(b"asdfasdfl", rw.read())
635 self.assertEquals(9, rw.tell())
636 rw.seek(-4, 2)
637 self.assertEquals(5, rw.tell())
638 rw.seek(2, 1)
639 self.assertEquals(7, rw.tell())
640 self.assertEquals(b"fl", rw.read(11))
641 self.assertRaises(TypeError, rw.seek, 0.0)
642
643# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
644# properties:
645# - A single output character can correspond to many bytes of input.
646# - The number of input bytes to complete the character can be
647# undetermined until the last input byte is received.
648# - The number of input bytes can vary depending on previous input.
649# - A single input byte can correspond to many characters of output.
650# - The number of output characters can be undetermined until the
651# last input byte is received.
652# - The number of output characters can vary depending on previous input.
653
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input as bytes, flags) with flags = i*100 + o."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        # Locals are named so they do not shadow the io module.
        pending, flags = state
        self.buffer = bytearray(pending)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume ``input`` and return the text of all completed words.

        With ``final`` true, a partially buffered word is completed too.
        """
        output = ''
        period = ord('.')
        # Iterate integer byte values (as process_word already assumes), so
        # the comparison does not depend on how bytes objects iterate.
        for b in bytearray(input):
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i'/'o' words update the lengths and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Tests flip this to True to make the 'test_decoder' codec resolvable.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolves 'test_decoder' while enabled.

        Returns None (implicitly) for any other name or while disabled.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
738
# Register the lookup function of the decoder defined above.
# It resolves nothing while StatefulIncrementalDecoder.codecEnabled is
# False (the default); tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
742
743
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry: (input bytes, final flag passed to decode(), expected text)
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
786
787class TextIOWrapperTest(unittest.TestCase):
788
789 def setUp(self):
790 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
791 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
792
    def tearDown(self):
        # Clean up the scratch file (test_support.TESTFN) after every test;
        # testBasicIO in this class creates it.
        test_support.unlink(test_support.TESTFN)
795
796 def testLineBuffering(self):
797 r = io.BytesIO()
798 b = io.BufferedWriter(r, 1000)
799 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
800 t.write(u"X")
801 self.assertEquals(r.getvalue(), b"") # No flush happened
802 t.write(u"Y\nZ")
803 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
804 t.write(u"A\rB")
805 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
806
807 def testEncodingErrorsReading(self):
808 # (1) default
809 b = io.BytesIO(b"abc\n\xff\n")
810 t = io.TextIOWrapper(b, encoding="ascii")
811 self.assertRaises(UnicodeError, t.read)
812 # (2) explicit strict
813 b = io.BytesIO(b"abc\n\xff\n")
814 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
815 self.assertRaises(UnicodeError, t.read)
816 # (3) ignore
817 b = io.BytesIO(b"abc\n\xff\n")
818 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
819 self.assertEquals(t.read(), "abc\n\n")
820 # (4) replace
821 b = io.BytesIO(b"abc\n\xff\n")
822 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
823 self.assertEquals(t.read(), u"abc\n\ufffd\n")
824
825 def testEncodingErrorsWriting(self):
826 # (1) default
827 b = io.BytesIO()
828 t = io.TextIOWrapper(b, encoding="ascii")
829 self.assertRaises(UnicodeError, t.write, u"\xff")
830 # (2) explicit strict
831 b = io.BytesIO()
832 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
833 self.assertRaises(UnicodeError, t.write, u"\xff")
834 # (3) ignore
835 b = io.BytesIO()
836 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
837 newline="\n")
838 t.write(u"abc\xffdef\n")
839 t.flush()
840 self.assertEquals(b.getvalue(), b"abcdef\n")
841 # (4) replace
842 b = io.BytesIO()
843 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
844 newline="\n")
845 t.write(u"abc\xffdef\n")
846 t.flush()
847 self.assertEquals(b.getvalue(), b"abc?def\n")
848
849 def testNewlinesInput(self):
850 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
851 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
852 for newline, expected in [
853 (None, normalized.decode("ascii").splitlines(True)),
854 ("", testdata.decode("ascii").splitlines(True)),
855 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
856 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
857 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
858 ]:
859 buf = io.BytesIO(testdata)
860 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
861 self.assertEquals(txt.readlines(), expected)
862 txt.seek(0)
863 self.assertEquals(txt.read(), "".join(expected))
864
865 def testNewlinesOutput(self):
866 testdict = {
867 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
868 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
869 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
870 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
871 }
872 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
873 for newline, expected in tests:
874 buf = io.BytesIO()
875 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
876 txt.write("AAA\nB")
877 txt.write("BB\nCCC\n")
878 txt.write("X\rY\r\nZ")
879 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000880 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000881 self.assertEquals(buf.getvalue(), expected)
882
883 def testNewlines(self):
884 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
885
886 tests = [
887 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
888 [ '', input_lines ],
889 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
890 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
891 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
892 ]
Antoine Pitrouf8638a82008-12-14 18:08:37 +0000893 encodings = (
894 'utf-8', 'latin-1',
895 'utf-16', 'utf-16-le', 'utf-16-be',
896 'utf-32', 'utf-32-le', 'utf-32-be',
897 )
Christian Heimes1a6387e2008-03-26 12:49:49 +0000898
899 # Try a range of buffer sizes to test the case where \r is the last
900 # character in TextIOWrapper._pending_line.
901 for encoding in encodings:
902 # XXX: str.encode() should return bytes
903 data = bytes(''.join(input_lines).encode(encoding))
904 for do_reads in (False, True):
905 for bufsize in range(1, 10):
906 for newline, exp_lines in tests:
907 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
908 textio = io.TextIOWrapper(bufio, newline=newline,
909 encoding=encoding)
910 if do_reads:
911 got_lines = []
912 while True:
913 c2 = textio.read(2)
914 if c2 == '':
915 break
916 self.assertEquals(len(c2), 2)
917 got_lines.append(c2 + textio.readline())
918 else:
919 got_lines = list(textio)
920
921 for got_line, exp_line in zip(got_lines, exp_lines):
922 self.assertEquals(got_line, exp_line)
923 self.assertEquals(len(got_lines), len(exp_lines))
924
925 def testNewlinesInput(self):
926 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
927 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
928 for newline, expected in [
929 (None, normalized.decode("ascii").splitlines(True)),
930 ("", testdata.decode("ascii").splitlines(True)),
931 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
932 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
933 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
934 ]:
935 buf = io.BytesIO(testdata)
936 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
937 self.assertEquals(txt.readlines(), expected)
938 txt.seek(0)
939 self.assertEquals(txt.read(), "".join(expected))
940
941 def testNewlinesOutput(self):
942 data = u"AAA\nBBB\rCCC\n"
943 data_lf = b"AAA\nBBB\rCCC\n"
944 data_cr = b"AAA\rBBB\rCCC\r"
945 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
946 save_linesep = os.linesep
947 try:
948 for os.linesep, newline, expected in [
949 ("\n", None, data_lf),
950 ("\r\n", None, data_crlf),
951 ("\n", "", data_lf),
952 ("\r\n", "", data_lf),
953 ("\n", "\n", data_lf),
954 ("\r\n", "\n", data_lf),
955 ("\n", "\r", data_cr),
956 ("\r\n", "\r", data_cr),
957 ("\n", "\r\n", data_crlf),
958 ("\r\n", "\r\n", data_crlf),
959 ]:
960 buf = io.BytesIO()
961 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
962 txt.write(data)
963 txt.close()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000964 self.assertEquals(buf.closed, True)
965 self.assertRaises(ValueError, buf.getvalue)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000966 finally:
967 os.linesep = save_linesep
968
969 # Systematic tests of the text I/O API
970
def testBasicIO(self):
    """Exercise write/read/seek/tell round trips on a real text file.

    Every combination of chunk size and encoding is tried; for the
    encodings whose name starts with "utf" the open file is also handed
    to multi_line_test().
    """
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    codec_names = ("ascii", "latin1", "utf8")  # , "utf-16-be", "utf-16-le"
    for chunk in chunk_sizes:
        for codec in codec_names:
            # Create the file with three characters in it.
            fobj = io.open(test_support.TESTFN, "w+", encoding=codec)
            fobj._CHUNK_SIZE = chunk
            self.assertEqual(fobj.write(u"abc"), 3)
            fobj.close()

            # Reopen for update and walk through the contents.
            fobj = io.open(test_support.TESTFN, "r+", encoding=codec)
            fobj._CHUNK_SIZE = chunk
            self.assertEqual(fobj.tell(), 0)
            self.assertEqual(fobj.read(), u"abc")
            end_cookie = fobj.tell()
            self.assertEqual(fobj.seek(0), 0)
            self.assertEqual(fobj.read(2), u"ab")
            self.assertEqual(fobj.read(1), u"c")
            self.assertEqual(fobj.read(1), u"")
            self.assertEqual(fobj.read(), u"")
            self.assertEqual(fobj.tell(), end_cookie)
            self.assertEqual(fobj.seek(0), 0)
            self.assertEqual(fobj.seek(0, 2), end_cookie)

            # Append, then read the new text back from the old end.
            self.assertEqual(fobj.write(u"def"), 3)
            self.assertEqual(fobj.seek(end_cookie), end_cookie)
            self.assertEqual(fobj.read(), u"def")
            if codec.startswith("utf"):
                self.multi_line_test(fobj, codec)
            fobj.close()
997
def multi_line_test(self, f, enc):
    """Write lines of assorted lengths to *f* and read them back.

    The file is emptied first.  Each line is recorded together with the
    tell() value observed just before it was written; reading the file
    back with readline() must reproduce the same (position, line) pairs.
    *enc* is accepted but not used in this method.
    """
    f.seek(0)
    f.truncate()
    alphabet = u"s\xff\u0fff\uffff"
    lengths = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    written = []
    for length in lengths:
        # Cycle through the sample characters to reach the wanted length.
        body = u"".join(alphabet[k % len(alphabet)] for k in range(length))
        text = body + u"\n"
        written.append((f.tell(), text))
        f.write(text)

    f.seek(0)
    seen = []
    offset = f.tell()
    text = f.readline()
    while text:
        seen.append((offset, text))
        offset = f.tell()
        text = f.readline()
    self.assertEqual(seen, written)
1019
def testTelling(self):
    """tell() must agree between writing, readline() and iteration."""
    handle = io.open(test_support.TESTFN, "w+", encoding="utf8")
    start = handle.tell()
    handle.write(u"\xff\n")
    after_first = handle.tell()
    handle.write(u"\xff\n")
    after_second = handle.tell()

    handle.seek(0)
    self.assertEqual(handle.tell(), start)
    for checkpoint in (after_first, after_second):
        self.assertEqual(handle.readline(), u"\xff\n")
        self.assertEqual(handle.tell(), checkpoint)

    handle.seek(0)
    for text in handle:
        self.assertEqual(text, u"\xff\n")
        # While iterating, tell() is expected to raise IOError.
        self.assertRaises(IOError, handle.tell)
    # Once the iteration is over tell() must work again.
    self.assertEqual(handle.tell(), after_second)
    handle.close()
1039
def testSeeking(self):
    """Read an ASCII prefix, then the rest of a line with a wide character.

    The prefix is two bytes shorter than TextIOWrapper._CHUNK_SIZE and is
    followed by U+8888 and a newline -- presumably so that the multi-byte
    character straddles the chunk boundary.  tell() after reading the
    prefix must equal the prefix length, and readline() must return the
    remainder of the line.
    """
    chunk_size = io.TextIOWrapper._CHUNK_SIZE
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix

    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(line*2)
    finally:
        f.close()

    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        s = f.read(prefix_size)
        # bytes.decode() is equivalent to unicode(prefix, "ascii") here.
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
    finally:
        # Release the handle even when an assertion fails; the original
        # never closed it.
        f.close()
1057
def testSeekingToo(self):
    """Regression test: readline() then tell() with a tiny chunk size.

    The file holds one three-byte UTF-8 character followed by a newline
    and is read with _CHUNK_SIZE forced to 2.  The test only requires
    that readline() and tell() complete without raising.
    """
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(data)
    finally:
        f.close()

    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
    finally:
        # The original left this handle open.
        f.close()
1069
def testSeekAndTell(self):
    """Test seek/tell using the StatefulIncrementalDecoder."""

    def check_stream(payload, first_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        writer = io.open(test_support.TESTFN, 'wb')
        writer.write(payload)
        writer.close()
        reader = io.open(test_support.TESTFN, encoding='test_decoder')
        text = reader.read()
        reader.close()

        for pos in range(first_pos, len(text) + 1):  # seek positions
            for count in [1, 5, len(text) - pos]:  # read lengths
                reader = io.open(test_support.TESTFN,
                                 encoding='test_decoder')
                self.assertEqual(reader.read(pos), text[:pos])
                bookmark = reader.tell()
                self.assertEqual(reader.read(count), text[pos:pos + count])
                reader.seek(bookmark)
                self.assertEqual(reader.read(), text[pos:])
                reader.close()

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_stream(raw)

        # Position each test case so that it crosses a chunk boundary.
        chunk = io.TextIOWrapper._CHUNK_SIZE
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            padding = chunk - len(raw)//2
            # Don't bother seeking into the prefix (takes too long).
            check_stream(b'.'*padding + raw, padding*2)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001114
1115 def testEncodedWrites(self):
1116 data = u"1234567890"
1117 tests = ("utf-16",
1118 "utf-16-le",
1119 "utf-16-be",
1120 "utf-32",
1121 "utf-32-le",
1122 "utf-32-be")
1123 for encoding in tests:
1124 buf = io.BytesIO()
1125 f = io.TextIOWrapper(buf, encoding=encoding)
1126 # Check if the BOM is written only once (see issue1753).
1127 f.write(data)
1128 f.write(data)
1129 f.seek(0)
1130 self.assertEquals(f.read(), data * 2)
1131 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1132
def timingTest(self):
    """Time writing a file and three ways of reading it back.

    For each chunk size the same line is written many times, then the
    file is read by iteration, by readline(), and by readline() combined
    with tell().  Timings are printed only when test_support.verbose is
    set.
    """
    clock = time.time
    codec = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    line_count = 10000
    char_count = len(line)
    byte_count = len(line.encode(codec))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=codec)
        f._CHUNK_SIZE = chunk_size

        started = clock()
        for _ in range(line_count):
            f.write(line)
        f.flush()
        wrote = clock()

        f.seek(0)
        # NOTE: this loop rebinds ``line``, exactly as the original did.
        for line in f:
            pass
        iterated = clock()

        f.seek(0)
        while f.readline():
            pass
        read_lines = clock()

        f.seek(0)
        while f.readline():
            f.tell()
        told = clock()

        f.close()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (line_count, char_count, byte_count))
            print("File chunk size: %6s" % f._CHUNK_SIZE)
            print("Writing: %6.3f seconds" % (wrote-started))
            print("Reading using iteration: %6.3f seconds" % (iterated-wrote))
            print("Reading using readline(): %6.3f seconds" %
                  (read_lines-iterated))
            print("Using readline()+tell(): %6.3f seconds" %
                  (told-read_lines))
1169
1170 def testReadOneByOne(self):
1171 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1172 reads = ""
1173 while True:
1174 c = txt.read(1)
1175 if not c:
1176 break
1177 reads += c
1178 self.assertEquals(reads, "AA\nBB")
1179
1180 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1181 def testReadByChunk(self):
1182 # make sure "\r\n" straddles 128 char boundary.
1183 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1184 reads = ""
1185 while True:
1186 c = txt.read(128)
1187 if not c:
1188 break
1189 reads += c
1190 self.assertEquals(reads, "A"*127+"\nB")
1191
1192 def test_issue1395_1(self):
1193 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1194
1195 # read one char at a time
1196 reads = ""
1197 while True:
1198 c = txt.read(1)
1199 if not c:
1200 break
1201 reads += c
1202 self.assertEquals(reads, self.normalized)
1203
1204 def test_issue1395_2(self):
1205 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1206 txt._CHUNK_SIZE = 4
1207
1208 reads = ""
1209 while True:
1210 c = txt.read(4)
1211 if not c:
1212 break
1213 reads += c
1214 self.assertEquals(reads, self.normalized)
1215
1216 def test_issue1395_3(self):
1217 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1218 txt._CHUNK_SIZE = 4
1219
1220 reads = txt.read(4)
1221 reads += txt.read(4)
1222 reads += txt.readline()
1223 reads += txt.readline()
1224 reads += txt.readline()
1225 self.assertEquals(reads, self.normalized)
1226
1227 def test_issue1395_4(self):
1228 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1229 txt._CHUNK_SIZE = 4
1230
1231 reads = txt.read(4)
1232 reads += txt.read()
1233 self.assertEquals(reads, self.normalized)
1234
1235 def test_issue1395_5(self):
1236 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1237 txt._CHUNK_SIZE = 4
1238
1239 reads = txt.read(4)
1240 pos = txt.tell()
1241 txt.seek(0)
1242 txt.seek(pos)
1243 self.assertEquals(txt.read(4), "BBB\n")
1244
1245 def test_issue2282(self):
1246 buffer = io.BytesIO(self.testdata)
1247 txt = io.TextIOWrapper(buffer, encoding="ascii")
1248
1249 self.assertEqual(buffer.seekable(), txt.seekable())
1250
def check_newline_decoder_utf8(self, decoder):
    """Run the UTF-8 specific checks against a newline decoder.

    *decoder* is fed a fixed sequence of byte chunks; each chunk must
    decode to the paired text, both on the first attempt and again after
    the decoder state has been restored.
    """
    def verify(raw, text, **options):
        # We exercise getstate() / setstate() as well as decode()
        snapshot = decoder.getstate()
        self.assertEqual(decoder.decode(raw, **options), text)
        decoder.setstate(snapshot)
        self.assertEqual(decoder.decode(raw, **options), text)

    # A three-byte character: whole, split byte by byte (twice), and
    # finally cut off after its first byte.
    for raw, text in [
        (b'\xe8\xa2\x88', "\u8888"),
        (b'\xe8', ""),
        (b'\xa2', ""),
        (b'\x88', "\u8888"),
        (b'\xe8', ""),
        (b'\xa2', ""),
        (b'\x88', "\u8888"),
        (b'\xe8', ""),
    ]:
        verify(raw, text)
    # The dangling lead byte must be rejected once input is final.
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    # dict() keeps the keyword name a native str under unicode_literals.
    plain = dict()
    final = dict(final=True)
    for raw, text, options in [
        (b'\n', "\n", plain),
        (b'\r', "", plain),
        (b'', "\n", final),
        (b'\r', "\n", final),

        (b'\r', "", plain),
        (b'a', "\na", plain),

        (b'\r\r\n', "\n\n", plain),
        (b'\r', "", plain),
        (b'\r', "\n", plain),
        (b'\na', "\na", plain),

        (b'\xe8\xa2\x88\r\n', "\u8888\n", plain),
        (b'\xe8\xa2\x88', "\u8888", plain),
        (b'\n', "\n", plain),
        (b'\xe8\xa2\x88\r', "\u8888", plain),
        (b'\n', "\n", plain),
    ]:
        verify(raw, text, **options)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001292
def check_newline_decoder(self, decoder, encoding):
    """Feed *decoder* one byte at a time and track its ``newlines``.

    Text is encoded with an incremental encoder for *encoding* and passed
    to *decoder* in one-byte chunks.  After each step the ``newlines``
    attribute must list the line endings seen so far, and the decoded
    pieces must join into the expected text.  After reset() the decoder
    must decode plain text and report no newlines.
    """
    result = []
    encoder = codecs.getincrementalencoder(encoding)()

    def _decode_bytewise(s):
        encoded = encoder.encode(s)
        # Slice instead of iterating: iterating a bytes object yields
        # ints on Python 3, while a one-byte slice is bytes everywhere.
        for i in range(len(encoded)):
            result.append(decoder.decode(encoded[i:i + 1]))

    self.assertEqual(decoder.newlines, None)
    _decode_bytewise("abc\n\r")
    self.assertEqual(decoder.newlines, '\n')
    _decode_bytewise("\nabc")
    self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    _decode_bytewise("abc\r")
    self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    _decode_bytewise("abc")
    self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
    _decode_bytewise("abc\r")
    self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
    decoder.reset()
    self.assertEqual(decoder.decode("abc".encode(encoding)), "abc")
    self.assertEqual(decoder.newlines, None)
1313
def test_newline_decoder(self):
    """Run the newline decoder checks for a list of encodings.

    Each encoding gets a fresh translating IncrementalNewlineDecoder that
    is passed to check_newline_decoder(); a further UTF-8 decoder is then
    passed to check_newline_decoder_utf8().
    """
    def wrap(codec_name):
        # Build a translating newline decoder around a fresh codec decoder.
        inner = codecs.getincrementaldecoder(codec_name)()
        return io.IncrementalNewlineDecoder(inner, translate=True)

    codec_names = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    for codec_name in codec_names:
        self.check_newline_decoder(wrap(codec_name), codec_name)
    self.check_newline_decoder_utf8(wrap("utf-8"))
1327
Antoine Pitrou01a255a2010-05-03 16:48:13 +00001328 def test_flush_error_on_close(self):
1329 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1330 def bad_flush():
1331 raise IOError()
1332 txt.flush = bad_flush
1333 self.assertRaises(IOError, txt.close) # exception not swallowed
1334
1335 def test_multi_close(self):
1336 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1337 txt.close()
1338 txt.close()
1339 txt.close()
1340 self.assertRaises(ValueError, txt.flush)
1341
Christian Heimes1a6387e2008-03-26 12:49:49 +00001342
1343# XXX Tests for open()
1344
class MiscIOTest(unittest.TestCase):
    """Checks on the io module namespace and on file object attributes."""

    def tearDown(self):
        """Remove the scratch file after each test."""
        test_support.unlink(test_support.TESTFN)

    def testImport__all__(self):
        """Every name in io.__all__ must exist and be of the right kind.

        "open" is only checked for existence; names containing "error"
        must be Exception subclasses; everything else must derive from
        io.IOBase.
        """
        for name in io.__all__:
            obj = getattr(io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            if "error" in name.lower():
                self.assertTrue(issubclass(obj, Exception), name)
            else:
                self.assertTrue(issubclass(obj, io.IOBase))

    def test_attributes(self):
        """Check the ``mode`` and ``name`` attributes at each layer."""
        # Unbuffered binary file.
        raw_file = io.open(test_support.TESTFN, "wb", buffering=0)
        self.assertEqual(raw_file.mode, "wb")
        raw_file.close()

        # Text file opened with "U": walk the text/buffer/raw layers.
        text_file = io.open(test_support.TESTFN, "U")
        layers = (text_file, text_file.buffer, text_file.buffer.raw)
        for layer in layers:
            self.assertEqual(layer.name, test_support.TESTFN)
        for layer, mode in zip(layers, ("U", "rb", "rb")):
            self.assertEqual(layer.mode, mode)
        text_file.close()

        # Text file opened for update.
        update_file = io.open(test_support.TESTFN, "w+")
        self.assertEqual(update_file.mode, "w+")
        # Does it really matter?
        self.assertEqual(update_file.buffer.mode, "rb+")
        self.assertEqual(update_file.buffer.raw.mode, "rb+")

        # A second file object sharing the descriptor without owning it.
        shared = io.open(update_file.fileno(), "wb", closefd=False)
        self.assertEqual(shared.mode, "wb")
        self.assertEqual(shared.raw.mode, "wb")
        self.assertEqual(shared.name, update_file.fileno())
        self.assertEqual(shared.raw.name, update_file.fileno())
        update_file.close()
        shared.close()
1388
1389
def test_main():
    """Run the listed test case classes through test_support."""
    case_classes = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest, BufferedWriterTest,
        BufferedRWPairTest, BufferedRandomTest,
        StatefulIncrementalDecoderTest,
        TextIOWrapperTest, MiscIOTest,
    )
    test_support.run_unittest(*case_classes)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001396
if __name__ == "__main__":
    # Run as a script: hand control to unittest's command-line runner.
    unittest.main()