blob: 38c58c3687f711182dca363cef4606617a71f120 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000012from itertools import chain, cycle
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered I/O classes in tests.

    ``read()`` hands back the items of ``read_stack`` one at a time and
    every ``write()`` is recorded in ``_write_stack`` so a test can inspect
    exactly what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        # Copy the script so the caller's sequence is never mutated.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted item, or b"" once the script is used up.

        ``n`` is accepted for interface compatibility and ignored.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; anything else is a real
            # error and must not be hidden.
            return b""

    def write(self, b):
        """Record a copy of ``b`` and report that all of it was written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value; tests compare against it.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore a seek request (``whence`` defaults to 0)."""
        pass

    def tell(self):
        # Arbitrary fixed value; the mock does not track a position.
        return 42
52
53
class MockFileIO(io.BytesIO):
    """BytesIO that logs the size of every read() result in ``read_history``."""

    def __init__(self, data):
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        """Read like BytesIO.read() and append the result length to the log.

        A ``None`` result is logged as ``None`` rather than as a length.
        """
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
64
65
class MockNonBlockWriterIO(io.RawIOBase):
    """Write-only raw stream whose write() results follow a fixed script.

    Each entry of ``blocking_script`` decides the outcome of one write():
    a non-negative entry is returned as the byte count, a negative entry
    raises BlockingIOError reporting ``-entry`` characters written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        # The payload is recorded before the scripted outcome is applied,
        # so blocked writes show up in _write_stack as well.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
82
83
class IOTest(unittest.TestCase):
    """Basic read/write/seek behaviour of io.open(), io.FileIO and io.BytesIO."""

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate script for a writable binary stream.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() is expected to leave the position where it was.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek script; expects f to hold
        # b"hello world\n".  The read()-to-EOF checks only run when
        # ``buffered`` is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Seek/write/truncate around the LARGE offset on a read-write stream.
        # Use unittest assertions rather than bare ``assert`` so the checks
        # are not stripped when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # ``with`` guarantees the file is closed even if an assertion fails.
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with the default buffering.
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # Exercise io.open() (the module under test), not the builtin open().
        for bufsize in (0, 1, 100):
            f = None
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with io.open(test_support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with io.open(test_support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Dropping the last reference is expected to run __del__, which
        # closes, which flushes -- in that order.
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        f.write("xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"xxx")
        f.close()
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def XXXtest_array_writes(self):
        # XXX memory view not available yet
        # (the XXX prefix keeps unittest from collecting this method)
        a = array.array('i', range(10))
        n = len(memoryview(a))
        f = io.open(test_support.TESTFN, "wb", 0)
        self.assertEqual(f.write(a), n)
        f.close()
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.write(a), n)
        f.close()

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)

    def testReadClosed(self):
        with io.open(test_support.TESTFN, "w") as f:
            f.write("egg\n")
        with io.open(test_support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor; closing it must not
            # close the descriptor, but reads on it must fail afterwards.
            fd_file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fd_file.read(), "egg\n")
            fd_file.seek(0)
            fd_file.close()
            self.assertRaises(ValueError, fd_file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError,
                          io.open, test_support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with io.open(test_support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fd_file = io.open(f.fileno(), "r", closefd=False)
            try:
                self.assertEqual(fd_file.buffer.raw.closefd, False)
            finally:
                # Release the wrapper explicitly instead of leaving it to
                # garbage collection.
                fd_file.close()
315
316
class MemorySeekTestMixin:
    # Read/seek/tell checks shared by the in-memory stream tests.  The
    # concrete test class supplies ``buftype`` (turns a text literal into the
    # stream's data type), ``ioclass`` (the stream class) and ``EOF`` (the
    # value read() returns at end of stream).

    def testInit(self):
        # Construction from initial data must simply succeed.
        payload = self.buftype("1234567890")
        self.ioclass(payload)

    def testRead(self):
        payload = self.buftype("1234567890")
        stream = self.ioclass(payload)

        self.assertEqual(payload[:1], stream.read(1))
        self.assertEqual(payload[1:5], stream.read(4))
        # Asking for more than is left returns just the remainder.
        self.assertEqual(payload[5:], stream.read(900))
        self.assertEqual(self.EOF, stream.read())

    def testReadNoArgs(self):
        payload = self.buftype("1234567890")
        stream = self.ioclass(payload)

        self.assertEqual(payload, stream.read())
        self.assertEqual(self.EOF, stream.read())

    def testSeek(self):
        payload = self.buftype("1234567890")
        stream = self.ioclass(payload)

        stream.read(5)
        stream.seek(0)
        self.assertEqual(payload, stream.read())

        stream.seek(3)
        self.assertEqual(payload[3:], stream.read())
        # Float offsets are rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def testTell(self):
        payload = self.buftype("1234567890")
        stream = self.ioclass(payload)

        self.assertEqual(0, stream.tell())
        stream.seek(5)
        self.assertEqual(5, stream.tell())
        # Seeking beyond the end of the data is allowed and remembered.
        stream.seek(10000)
        self.assertEqual(10000, stream.tell())
360
361
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    # Runs the shared MemorySeekTestMixin checks against io.BytesIO.
    ioclass = io.BytesIO
    EOF = b""

    @staticmethod
    def buftype(s):
        # The mixin passes text literals; BytesIO needs bytes.
        return s.encode("utf-8")
368
369
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    # Runs the shared MemorySeekTestMixin checks against io.StringIO.
    ioclass = io.StringIO
    EOF = ""
    buftype = str
374
375
class BufferedReaderTest(unittest.TestCase):
    # io.BufferedReader over scripted raw streams and over a real file.

    def testRead(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        reader = io.BufferedReader(raw)

        self.assertEqual(b"abcdef", reader.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each scenario: buffer size, sizes requested from the buffered
        # reader, and the result sizes MockFileIO is expected to log.
        scenarios = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in scenarios:
            raw = MockFileIO(data)
            reader = io.BufferedReader(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                expected = data[offset:offset + nbytes]
                self.assertEqual(reader.read(nbytes), expected)
                offset += nbytes
            self.assertEqual(raw.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        reader = io.BufferedReader(raw)

        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertTrue(reader.read() is None)
        self.assertEqual(b"", reader.read())

    def testReadToEof(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        reader = io.BufferedReader(raw)

        self.assertEqual(b"abcdefg", reader.read(9000))

    def testReadNoArgs(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        reader = io.BufferedReader(raw)

        self.assertEqual(b"abcdefg", reader.read())

    def testFileno(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        reader = io.BufferedReader(raw)

        # 42 is the value MockRawIO.fileno() reports.
        self.assertEqual(42, reader.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass

    def testThreads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with io.open(test_support.TESTFN, "wb") as f:
                f.write(payload)
            with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
                bufio = io.BufferedReader(raw, 8)
                errors = []
                results = []

                def consume():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=consume)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            test_support.unlink(test_support.TESTFN)
478
479
Christian Heimes1a6387e2008-03-26 12:49:49 +0000480
class BufferedWriterTest(unittest.TestCase):
    # io.BufferedWriter over recording raw streams and over a real file.

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw = MockRawIO()
        buffered = io.BufferedWriter(raw, 8)

        buffered.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(raw._write_stack)

    def testWriteOverflow(self):
        raw = MockRawIO()
        buffered = io.BufferedWriter(raw, 8)

        buffered.write(b"abc")
        buffered.write(b"defghijkl")

        # Both writes arrive at the raw stream as a single chunk.
        self.assertEqual(b"abcdefghijkl", raw._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        buffered = io.BufferedWriter(raw, 8, 16)

        buffered.write(b"asdf")
        buffered.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        buffered.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        buffered.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        buffered.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        buffered = io.BufferedWriter(raw)

        # 42 is the value MockRawIO.fileno() reports.
        self.assertEqual(42, buffered.fileno())

    def testFlush(self):
        raw = MockRawIO()
        buffered = io.BufferedWriter(raw, 8)

        buffered.write(b"abc")
        buffered.flush()

        self.assertEqual(b"abc", raw._write_stack[0])

    def testThreads(self):
        # BufferedWriter should not raise exceptions or crash
        # when called from multiple threads.
        try:
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
                bufio = io.BufferedWriter(raw, 8)
                errors = []

                def produce():
                    try:
                        # Write enough bytes to flush the buffer
                        payload = b"a" * 19
                        for _ in range(50):
                            bufio.write(payload)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=produce)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
        finally:
            test_support.unlink(test_support.TESTFN)
567
Christian Heimes1a6387e2008-03-26 12:49:49 +0000568
class BufferedRWPairTest(unittest.TestCase):
    # Smoke test for io.BufferedRWPair.

    def testRWPair(self):
        reader = MockRawIO(())
        writer = MockRawIO()
        # A freshly built pair must report itself as open.
        self.assertFalse(io.BufferedRWPair(reader, writer).closed)

        # XXX More Tests
Christian Heimes1a6387e2008-03-26 12:49:49 +0000578
579
class BufferedRandomTest(unittest.TestCase):
    # Interleaved reading, writing and seeking through io.BufferedRandom.

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        stream = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", stream.read(2))
        for piece in (b"ddd", b"eee"):
            stream.write(piece)
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", stream.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        stream = io.BufferedRandom(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite bytes 4..7, then re-read everything from the start.
        stream.write(b"asdf")
        stream.seek(0, 0)
        self.assertEqual(b"asdfasdfl", stream.read())
        self.assertEqual(9, stream.tell())
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        self.assertRaises(TypeError, stream.seek, 0.0)
612
613# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
614# properties:
615# - A single output character can correspond to many bytes of input.
616# - The number of input bytes to complete the character can be
617# undetermined until the last input byte is received.
618# - The number of input bytes can vary depending on previous input.
619# - A single input byte can correspond to many characters of output.
620# - The number of output characters can be undetermined until the
621# last input byte is received.
622# - The number of output characters can vary depending on previous input.
623
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for exercising seek/tell behavior.

    The input is a stream of words.  A word is either fixed-length (the
    length I is set by earlier input) or, when I is 0, variable-length and
    terminated by a period; extra periods are ignored in that mode.
    Recognised words:
      - 'i' followed by a number sets the input length I (maximum 99).
        Setting I to 0 selects variable-length mode.
      - 'o' followed by a number sets the output length O (maximum 99).
      - Any other word is emitted followed by a period.  The emitted word
        is the input word truncated or hyphen-padded to length O; if O is
        0 the word is emitted verbatim.
    I and O both start at 1.  EOF also terminates the last word.
    """

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # I and O are stored XOR 1 so that the flags are 0 right after
        # reset(), and packed into one integer as i*100 + o.
        packed_i = self.i ^ 1
        packed_o = self.o ^ 1
        return bytes(self.buffer), packed_i*100 + packed_o

    def setstate(self, state):
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        for b in input:
            # self.i is re-read on every item because process_word() may
            # change it in the middle of the input.
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif b == '.':
                # variable-length, terminated with period
                if self.buffer:
                    pieces.append(self.process_word())
            else:
                self.buffer.append(b)
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Consume the buffered word: either update I/O or produce output.
        word = ''
        head = self.buffer[0]
        if head == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif head == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            word = self.buffer.decode('ascii')
            if len(word) < self.o:
                word += '-'*self.o # pad out with hyphens
            if self.o:
                word = word[:self.o] # truncate to output length
            word += '.'
        self.buffer = bytearray()
        return word

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: answers only for 'test_decoder', and only
        # while codecEnabled is set; otherwise the lookup falls through.
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
708
# Register the decoder above as the codec search function for testing.
# lookupTestDecoder only answers while codecEnabled is true, so the codec
# is disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
712
713
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder test helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for raw, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
756
757class TextIOWrapperTest(unittest.TestCase):
758
def setUp(self):
    # Sample bytes mixing \r\n, \r and \n line endings, plus the same
    # content as text with every line ending normalized to \n.
    raw = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    unified = raw.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    self.testdata = raw
    self.normalized = unified.decode("ascii")
762
def tearDown(self):
    # Remove the scratch file that individual tests may have created.
    scratch = test_support.TESTFN
    test_support.unlink(scratch)
765
def testLineBuffering(self):
    # With line_buffering=True the wrapper pushes data down to the raw
    # stream once a written string contains a line break.
    raw = io.BytesIO()
    buffered = io.BufferedWriter(raw, 1000)
    text = io.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    text.write(u"X")
    self.assertEqual(raw.getvalue(), b"")  # No flush happened
    text.write(u"Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")  # All got flushed
    text.write(u"A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
775 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
776
def testEncodingErrorsReading(self):
    # The payload holds one byte (0xff) that is not valid ASCII.
    payload = b"abc\n\xff\n"
    # (1) default
    reader = io.TextIOWrapper(io.BytesIO(payload), encoding="ascii")
    self.assertRaises(UnicodeError, reader.read)
    # (2) explicit strict
    reader = io.TextIOWrapper(io.BytesIO(payload), encoding="ascii",
                              errors="strict")
    self.assertRaises(UnicodeError, reader.read)
    # (3) ignore, (4) replace
    lenient = (
        ("ignore", "abc\n\n"),
        ("replace", u"abc\n\ufffd\n"),
    )
    for policy, decoded in lenient:
        reader = io.TextIOWrapper(io.BytesIO(payload), encoding="ascii",
                                  errors=policy)
        self.assertEqual(reader.read(), decoded)
794
def testEncodingErrorsWriting(self):
    # u"\xff" cannot be encoded as ASCII.
    # (1) default
    writer = io.TextIOWrapper(io.BytesIO(), encoding="ascii")
    self.assertRaises(UnicodeError, writer.write, u"\xff")
    # (2) explicit strict
    writer = io.TextIOWrapper(io.BytesIO(), encoding="ascii",
                              errors="strict")
    self.assertRaises(UnicodeError, writer.write, u"\xff")
    # (3) ignore, (4) replace
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for policy, encoded in lenient:
        sink = io.BytesIO()
        writer = io.TextIOWrapper(sink, encoding="ascii", errors=policy,
                                  newline="\n")
        writer.write(u"abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), encoded)
818
def testNewlinesInput(self):
    # NOTE(review): a method with this same name is defined again further
    # down in this class body; that later definition replaces this one.
    testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # Maps each ``newline`` argument to the lines readlines() should give.
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        source = io.BytesIO(testdata)
        txt = io.TextIOWrapper(source, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        # A full read() after rewinding must equal the lines joined.
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
834
def testNewlinesOutput(self):
    # NOTE(review): a method with this same name is defined again further
    # down in this class body; that later definition replaces this one, so
    # this version is never collected by unittest.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None is expected to behave like the platform's os.linesep.
    cases = [(None, testdict[os.linesep])]
    cases.extend(sorted(testdict.items()))
    for newline, expected in cases:
        sink = io.BytesIO()
        txt = io.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for piece in ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ"):
            txt.write(piece)
        txt.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
852
def testNewlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Each entry pairs a ``newline`` argument with the lines expected back.
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def read_in_pairs(textio):
        # Alternate read(2) with readline(); each pair forms one "line".
        collected = []
        while True:
            c2 = textio.read(2)
            if c2 == '':
                break
            self.assertEqual(len(c2), 2)
            collected.append(c2 + textio.readline())
        return collected

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(''.join(input_lines).encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = io.BufferedReader(io.BytesIO(data), bufsize)
                    textio = io.TextIOWrapper(bufio, newline=newline,
                                              encoding=encoding)
                    if do_reads:
                        got_lines = read_in_pairs(textio)
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
894
def testNewlinesInput(self):
    # NOTE(review): this redefines a method of the same name that appears
    # earlier in this class body; this later definition is the one in effect.
    testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # Maps each ``newline`` argument to the lines readlines() should give.
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        source = io.BytesIO(testdata)
        txt = io.TextIOWrapper(source, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        # A full read() after rewinding must equal the lines joined.
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
910
def testNewlinesOutput(self):
    # Check newline translation on output for every combination of
    # os.linesep and the ``newline`` argument.
    #
    # The output is compared against ``expected`` *before* close():
    # closing the wrapper also closes the underlying BytesIO, after which
    # getvalue() raises ValueError and the bytes can no longer be checked.
    #
    # NOTE(review): the newline=None rows rely on TextIOWrapper consulting
    # os.linesep when it is constructed -- confirm for the io
    # implementation this file is run against.
    data = u"AAA\nBBB\rCCC\n"
    data_lf = b"AAA\nBBB\rCCC\n"
    data_cr = b"AAA\rBBB\rCCC\r"
    data_crlf = b"AAA\r\nBBB\rCCC\r\n"
    cases = [
        ("\n", None, data_lf),
        ("\r\n", None, data_crlf),
        ("\n", "", data_lf),
        ("\r\n", "", data_lf),
        ("\n", "\n", data_lf),
        ("\r\n", "\n", data_lf),
        ("\n", "\r", data_cr),
        ("\r\n", "\r", data_cr),
        ("\n", "\r\n", data_crlf),
        ("\r\n", "\r\n", data_crlf),
    ]
    save_linesep = os.linesep
    try:
        for linesep, newline, expected in cases:
            # Patch the platform line separator for this case only; the
            # original value is restored in the ``finally`` clause.
            os.linesep = linesep
            buf = io.BytesIO()
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write(data)
            txt.flush()
            self.assertEqual(buf.getvalue(), expected)
            txt.close()
            self.assertEqual(buf.closed, True)
            self.assertRaises(ValueError, buf.getvalue)
    finally:
        os.linesep = save_linesep
938
939 # Systematic tests of the text I/O API
940
def testBasicIO(self):
    # Write/read/seek/tell round trip through a real text file, repeated
    # for a range of decoder chunk sizes and encodings.
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    encodings = ("ascii", "latin1", "utf8")  # , "utf-16-be", "utf-16-le"
    for chunksize in chunk_sizes:
        for enc in encodings:
            f = io.open(test_support.TESTFN, "w+", encoding=enc)
            f._CHUNK_SIZE = chunksize
            self.assertEqual(f.write(u"abc"), 3)
            f.close()

            f = io.open(test_support.TESTFN, "r+", encoding=enc)
            f._CHUNK_SIZE = chunksize
            self.assertEqual(f.tell(), 0)
            self.assertEqual(f.read(), u"abc")
            # Position cookie for the end of the three characters.
            cookie = f.tell()
            self.assertEqual(f.seek(0), 0)
            self.assertEqual(f.read(2), u"ab")
            self.assertEqual(f.read(1), u"c")
            self.assertEqual(f.read(1), u"")
            self.assertEqual(f.read(), u"")
            self.assertEqual(f.tell(), cookie)
            self.assertEqual(f.seek(0), 0)
            self.assertEqual(f.seek(0, 2), cookie)
            self.assertEqual(f.write(u"def"), 3)
            self.assertEqual(f.seek(cookie), cookie)
            self.assertEqual(f.read(), u"def")
            # The multi-line sample is only run for the utf* encodings.
            if enc.startswith("utf"):
                self.multi_line_test(f, enc)
            f.close()
967
def multi_line_test(self, f, enc):
    """Rewrite *f* with lines of many lengths and read them back.

    Each line is recorded together with the tell() value taken just
    before it was written; reading back with readline() must yield the
    same (position, line) pairs. *enc* is accepted but not used here.
    """
    f.seek(0)
    f.truncate()
    sample = u"s\xff\u0fff\uffff"
    period = len(sample)
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)
    written = []
    for size in sizes:
        # Cycle through the sample characters to build a line of `size`.
        text = u"".join([sample[k % period] for k in range(size)]) + u"\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    seen = []
    pos = f.tell()
    text = f.readline()
    while text:
        seen.append((pos, text))
        pos = f.tell()
        text = f.readline()
    self.assertEquals(seen, written)
989
def testTelling(self):
    # tell() must report, while reading lines back, the same positions it
    # reported when those lines were written.
    f = io.open(test_support.TESTFN, "w+", encoding="utf8")
    text = u"\xff\n"
    marks = [f.tell()]
    for _ in range(2):
        f.write(text)
        marks.append(f.tell())
    f.seek(0)
    self.assertEquals(f.tell(), marks[0])
    for mark in marks[1:]:
        self.assertEquals(f.readline(), text)
        self.assertEquals(f.tell(), mark)
    f.seek(0)
    for line in f:
        self.assertEquals(line, text)
        # tell() is expected to fail while the file is being iterated.
        self.assertRaises(IOError, f.tell)
    self.assertEquals(f.tell(), marks[-1])
    f.close()
1009
def testSeeking(self):
    # Read up to a multi-byte character that straddles the first chunk
    # boundary, then check tell() and the following readline().
    chunk_size = io.TextIOWrapper._CHUNK_SIZE
    # The ASCII prefix ends two bytes short of the chunk size, so the
    # three-byte UTF-8 form of u"\u8888" crosses the boundary.
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEquals(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(line*2)
    finally:
        f.close()
    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        s = f.read(prefix_size)
        self.assertEquals(s, unicode(prefix, "ascii"))
        self.assertEquals(f.tell(), prefix_size)
        self.assertEquals(f.readline(), u_suffix)
    finally:
        # Always release the handle, even when an assertion fails, so the
        # scratch file is not left open.
        f.close()
1027
def testSeekingToo(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(data)
    finally:
        f.close()
    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        f._CHUNK_SIZE  # Just test that it exists
        # A chunk smaller than the three-byte character forces the
        # character to be split across reads.
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
    finally:
        # Close the reader even if readline()/tell() raise, so the
        # scratch file is not left open.
        f.close()
1039
def testSeekAndTell(self):
    """Test seek/tell using the StatefulIncrementalDecoder."""

    def check_stream(data, min_pos=0):
        """Write *data* to the scratch file, then for each character
        position from *min_pos* on, check that tell()/seek() round-trips
        and that read() returns the matching slice of the decoded text."""
        raw = io.open(test_support.TESTFN, 'wb')
        raw.write(data)
        raw.close()
        reader = io.open(test_support.TESTFN, encoding='test_decoder')
        decoded = reader.read()
        reader.close()

        total = len(decoded)
        for start in range(min_pos, total + 1):  # seek positions
            for count in (1, 5, total - start):  # read lengths
                f = io.open(test_support.TESTFN, encoding='test_decoder')
                self.assertEquals(f.read(start), decoded[:start])
                cookie = f.tell()
                self.assertEquals(f.read(count),
                                  decoded[start:start + count])
                f.seek(cookie)
                self.assertEquals(f.read(), decoded[start:])
                f.close()

    # Switch the test decoder on for the duration of this test only.
    StatefulIncrementalDecoder.codecEnabled = 1
    try:
        # First pass: every test case on its own.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_stream(payload)

        # Second pass: pad each case so it crosses a chunk boundary.
        chunk = io.TextIOWrapper._CHUNK_SIZE
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = chunk - len(payload)//2
            padding = b'.'*offset
            # Seeking into the padding is skipped (takes too long).
            check_stream(padding + payload, offset*2)
    finally:
        # Make sure the test decoder cannot affect later tests.
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001084
1085 def testEncodedWrites(self):
1086 data = u"1234567890"
1087 tests = ("utf-16",
1088 "utf-16-le",
1089 "utf-16-be",
1090 "utf-32",
1091 "utf-32-le",
1092 "utf-32-be")
1093 for encoding in tests:
1094 buf = io.BytesIO()
1095 f = io.TextIOWrapper(buf, encoding=encoding)
1096 # Check if the BOM is written only once (see issue1753).
1097 f.write(data)
1098 f.write(data)
1099 f.seek(0)
1100 self.assertEquals(f.read(), data * 2)
1101 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1102
def timingTest(self):
    """Time writing and three ways of reading a text file.

    For each chunk size the file is written line by line, then read back
    by iteration, by readline(), and by readline() followed by tell().
    The timings are printed only when test_support.verbose is set.
    NOTE(review): the name lacks the "test" prefix, so the default
    unittest loader presumably does not run it -- confirm.
    """
    timer = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    labels = ("Writing: %6.3f seconds",
              "Reading using iteration: %6.3f seconds",
              "Reading using readline(): %6.3f seconds",
              "Using readline()+tell(): %6.3f seconds")
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        f._CHUNK_SIZE = chunk_size
        stamps = [timer()]
        for _ in range(nlines):
            f.write(line)
        f.flush()
        stamps.append(timer())
        f.seek(0)
        for _row in f:
            pass
        stamps.append(timer())
        f.seek(0)
        while f.readline():
            pass
        stamps.append(timer())
        f.seek(0)
        while f.readline():
            f.tell()
        stamps.append(timer())
        f.close()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (nlines, nchars, nbytes))
            print("File chunk size: %6s" % f._CHUNK_SIZE)
            # Each phase lasted from one timestamp to the next.
            for label, start, stop in zip(labels, stamps, stamps[1:]):
                print(label % (stop - start))
1139
1140 def testReadOneByOne(self):
1141 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1142 reads = ""
1143 while True:
1144 c = txt.read(1)
1145 if not c:
1146 break
1147 reads += c
1148 self.assertEquals(reads, "AA\nBB")
1149
1150 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1151 def testReadByChunk(self):
1152 # make sure "\r\n" straddles 128 char boundary.
1153 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1154 reads = ""
1155 while True:
1156 c = txt.read(128)
1157 if not c:
1158 break
1159 reads += c
1160 self.assertEquals(reads, "A"*127+"\nB")
1161
1162 def test_issue1395_1(self):
1163 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1164
1165 # read one char at a time
1166 reads = ""
1167 while True:
1168 c = txt.read(1)
1169 if not c:
1170 break
1171 reads += c
1172 self.assertEquals(reads, self.normalized)
1173
1174 def test_issue1395_2(self):
1175 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1176 txt._CHUNK_SIZE = 4
1177
1178 reads = ""
1179 while True:
1180 c = txt.read(4)
1181 if not c:
1182 break
1183 reads += c
1184 self.assertEquals(reads, self.normalized)
1185
1186 def test_issue1395_3(self):
1187 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1188 txt._CHUNK_SIZE = 4
1189
1190 reads = txt.read(4)
1191 reads += txt.read(4)
1192 reads += txt.readline()
1193 reads += txt.readline()
1194 reads += txt.readline()
1195 self.assertEquals(reads, self.normalized)
1196
1197 def test_issue1395_4(self):
1198 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1199 txt._CHUNK_SIZE = 4
1200
1201 reads = txt.read(4)
1202 reads += txt.read()
1203 self.assertEquals(reads, self.normalized)
1204
1205 def test_issue1395_5(self):
1206 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1207 txt._CHUNK_SIZE = 4
1208
1209 reads = txt.read(4)
1210 pos = txt.tell()
1211 txt.seek(0)
1212 txt.seek(pos)
1213 self.assertEquals(txt.read(4), "BBB\n")
1214
1215 def test_issue2282(self):
1216 buffer = io.BytesIO(self.testdata)
1217 txt = io.TextIOWrapper(buffer, encoding="ascii")
1218
1219 self.assertEqual(buffer.seekable(), txt.seekable())
1220
def check_newline_decoder_utf8(self, decoder):
    """Run UTF-8 specific checks against a newline-translating decoder.

    The steps are order dependent: each one continues from the state
    left behind by the previous step.
    """
    def verify(raw, text, **options):
        # Decode twice from the same saved state, so getstate() and
        # setstate() are exercised along with decode().
        snapshot = decoder.getstate()
        first = decoder.decode(raw, **options)
        self.assertEquals(first, text)
        decoder.setstate(snapshot)
        second = decoder.decode(raw, **options)
        self.assertEquals(second, text)

    # A three-byte character: whole, then twice byte by byte, then cut
    # short after its first byte.
    multibyte_steps = [
        (b'\xe8\xa2\x88', "\u8888"),
        (b'\xe8', ""),
        (b'\xa2', ""),
        (b'\x88', "\u8888"),
        (b'\xe8', ""),
        (b'\xa2', ""),
        (b'\x88', "\u8888"),
        (b'\xe8', ""),
    ]
    for raw, text in multibyte_steps:
        verify(raw, text)
    # Finalizing with the character still incomplete must fail.
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    verify(b'\n', "\n")
    verify(b'\r', "")
    verify(b'', "\n", final=True)
    verify(b'\r', "\n", final=True)

    # "\r" is held back until the following input shows whether it is
    # part of "\r\n".
    newline_steps = [
        (b'\r', ""),
        (b'a', "\na"),
        (b'\r\r\n', "\n\n"),
        (b'\r', ""),
        (b'\r', "\n"),
        (b'\na', "\na"),
        (b'\xe8\xa2\x88\r\n', "\u8888\n"),
        (b'\xe8\xa2\x88', "\u8888"),
        (b'\n', "\n"),
        (b'\xe8\xa2\x88\r', "\u8888"),
        (b'\n', "\n"),
    ]
    for raw, text in newline_steps:
        verify(raw, text)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001262
def check_newline_decoder(self, decoder, encoding):
    """Feed *decoder* text encoded with *encoding* one byte at a time.

    After each piece of input the decoder's `newlines` attribute is
    compared with the line endings seen so far; at the end the
    concatenated output is checked, and reset() must clear `newlines`.
    """
    result = []
    encoder = codecs.getincrementalencoder(encoding)()

    def _decode_bytewise(s):
        encoded = encoder.encode(s)
        # Slice instead of iterating: a one-element slice is always a
        # byte string, whereas iterating yields integers when the encoded
        # value is a Python 3 style bytes object.
        for index in range(len(encoded)):
            result.append(decoder.decode(encoded[index:index + 1]))

    self.assertEquals(decoder.newlines, None)
    steps = [
        # (text to feed, value of decoder.newlines expected afterwards)
        ("abc\n\r", '\n'),
        ("\nabc", ('\n', '\r\n')),
        ("abc\r", ('\n', '\r\n')),
        ("abc", ('\r', '\n', '\r\n')),
    ]
    for text, newlines in steps:
        _decode_bytewise(text)
        self.assertEquals(decoder.newlines, newlines)
    _decode_bytewise("abc\r")
    self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
    decoder.reset()
    self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
    self.assertEquals(decoder.newlines, None)
1283
1284 def test_newline_decoder(self):
1285 encodings = (
1286 'utf-8', 'latin-1',
1287 'utf-16', 'utf-16-le', 'utf-16-be',
1288 'utf-32', 'utf-32-le', 'utf-32-be',
1289 )
1290 for enc in encodings:
1291 decoder = codecs.getincrementaldecoder(enc)()
1292 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1293 self.check_newline_decoder(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001294 decoder = codecs.getincrementaldecoder("utf-8")()
1295 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001296 self.check_newline_decoder_utf8(decoder)
1297
Christian Heimes1a6387e2008-03-26 12:49:49 +00001298
1299# XXX Tests for open()
1300
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks of the io module: the contents of __all__ and
    # the `name` / `mode` attributes of opened files.

    def tearDown(self):
        # Remove the scratch file the tests may have created.
        test_support.unlink(test_support.TESTFN)

    def testImport__all__(self):
        # Every exported name must exist; apart from open(), it must be
        # an exception class (names containing "error") or an IOBase
        # subclass.
        for name in io.__all__:
            obj = getattr(io, name, None)
            self.assert_(obj is not None, name)
            if name == "open":
                continue
            if "error" in name.lower():
                self.assert_(issubclass(obj, Exception), name)
            else:
                self.assert_(issubclass(obj, io.IOBase))

    def test_attributes(self):
        path = test_support.TESTFN

        # Unbuffered binary file.
        f = io.open(path, "wb", buffering=0)
        self.assertEquals(f.mode, "wb")
        f.close()

        # Text file: check the text layer, its buffer and the raw file.
        f = io.open(path, "U")
        for layer in (f, f.buffer, f.buffer.raw):
            self.assertEquals(layer.name, path)
        self.assertEquals(f.mode, "U")
        self.assertEquals(f.buffer.mode, "rb")
        self.assertEquals(f.buffer.raw.mode, "rb")
        f.close()

        f = io.open(path, "w+")
        self.assertEquals(f.mode, "w+")
        self.assertEquals(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEquals(f.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor; closefd=False is
        # passed so that closing g is not meant to close the descriptor.
        fd = f.fileno()
        g = io.open(fd, "wb", closefd=False)
        self.assertEquals(g.mode, "wb")
        self.assertEquals(g.raw.mode, "wb")
        self.assertEquals(g.name, fd)
        self.assertEquals(g.raw.name, fd)
        f.close()
        g.close()
1344
1345
def test_main():
    """Run the test case classes listed below via test_support."""
    cases = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest, BufferedWriterTest,
        BufferedRWPairTest, BufferedRandomTest,
        StatefulIncrementalDecoderTest,
        TextIOWrapperTest, MiscIOTest,
    )
    test_support.run_unittest(*cases)

# When executed as a script, hand control to unittest's own runner.
if __name__ == "__main__":
    unittest.main()