blob: f0b38b6f7a94b0b1bdfbbfb9354d50c15f459307 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000012from itertools import chain, cycle
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
19class MockRawIO(io.RawIOBase):
20
21 def __init__(self, read_stack=()):
22 self._read_stack = list(read_stack)
23 self._write_stack = []
24
25 def read(self, n=None):
26 try:
27 return self._read_stack.pop(0)
28 except:
29 return b""
30
31 def write(self, b):
32 self._write_stack.append(b[:])
33 return len(b)
34
35 def writable(self):
36 return True
37
38 def fileno(self):
39 return 42
40
41 def readable(self):
42 return True
43
44 def seekable(self):
45 return True
46
47 def seek(self, pos, whence):
48 pass
49
50 def tell(self):
51 return 42
52
53
54class MockFileIO(io.BytesIO):
55
56 def __init__(self, data):
57 self.read_history = []
58 io.BytesIO.__init__(self, data)
59
60 def read(self, n=None):
61 res = io.BytesIO.read(self, n)
62 self.read_history.append(None if res is None else len(res))
63 return res
64
65
66class MockNonBlockWriterIO(io.RawIOBase):
67
68 def __init__(self, blocking_script):
69 self._blocking_script = list(blocking_script)
70 self._write_stack = []
71
72 def write(self, b):
73 self._write_stack.append(b[:])
74 n = self._blocking_script.pop(0)
75 if (n < 0):
76 raise io.BlockingIOError(0, "test blocking", -n)
77 else:
78 return n
79
80 def writable(self):
81 return True
82
83
84class IOTest(unittest.TestCase):
85
86 def tearDown(self):
87 test_support.unlink(test_support.TESTFN)
88
89 def write_ops(self, f):
90 self.assertEqual(f.write(b"blah."), 5)
91 self.assertEqual(f.seek(0), 0)
92 self.assertEqual(f.write(b"Hello."), 6)
93 self.assertEqual(f.tell(), 6)
94 self.assertEqual(f.seek(-1, 1), 5)
95 self.assertEqual(f.tell(), 5)
96 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
97 self.assertEqual(f.seek(0), 0)
98 self.assertEqual(f.write(b"h"), 1)
99 self.assertEqual(f.seek(-1, 2), 13)
100 self.assertEqual(f.tell(), 13)
101 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000102 self.assertEqual(f.tell(), 12)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000103 self.assertRaises(TypeError, f.seek, 0.0)
104
105 def read_ops(self, f, buffered=False):
106 data = f.read(5)
107 self.assertEqual(data, b"hello")
108 data = bytearray(data)
109 self.assertEqual(f.readinto(data), 5)
110 self.assertEqual(data, b" worl")
111 self.assertEqual(f.readinto(data), 2)
112 self.assertEqual(len(data), 5)
113 self.assertEqual(data[:2], b"d\n")
114 self.assertEqual(f.seek(0), 0)
115 self.assertEqual(f.read(20), b"hello world\n")
116 self.assertEqual(f.read(1), b"")
117 self.assertEqual(f.readinto(bytearray(b"x")), 0)
118 self.assertEqual(f.seek(-6, 2), 6)
119 self.assertEqual(f.read(5), b"world")
120 self.assertEqual(f.read(0), b"")
121 self.assertEqual(f.readinto(bytearray()), 0)
122 self.assertEqual(f.seek(-6, 1), 5)
123 self.assertEqual(f.read(5), b" worl")
124 self.assertEqual(f.tell(), 10)
125 self.assertRaises(TypeError, f.seek, 0.0)
126 if buffered:
127 f.seek(0)
128 self.assertEqual(f.read(), b"hello world\n")
129 f.seek(6)
130 self.assertEqual(f.read(), b"world\n")
131 self.assertEqual(f.read(), b"")
132
133 LARGE = 2**31
134
135 def large_file_ops(self, f):
136 assert f.readable()
137 assert f.writable()
138 self.assertEqual(f.seek(self.LARGE), self.LARGE)
139 self.assertEqual(f.tell(), self.LARGE)
140 self.assertEqual(f.write(b"xxx"), 3)
141 self.assertEqual(f.tell(), self.LARGE + 3)
142 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
143 self.assertEqual(f.truncate(), self.LARGE + 2)
144 self.assertEqual(f.tell(), self.LARGE + 2)
145 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
146 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000147 self.assertEqual(f.tell(), self.LARGE + 1)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000148 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
149 self.assertEqual(f.seek(-1, 2), self.LARGE)
150 self.assertEqual(f.read(2), b"x")
151
152 def test_raw_file_io(self):
153 f = io.open(test_support.TESTFN, "wb", buffering=0)
154 self.assertEqual(f.readable(), False)
155 self.assertEqual(f.writable(), True)
156 self.assertEqual(f.seekable(), True)
157 self.write_ops(f)
158 f.close()
159 f = io.open(test_support.TESTFN, "rb", buffering=0)
160 self.assertEqual(f.readable(), True)
161 self.assertEqual(f.writable(), False)
162 self.assertEqual(f.seekable(), True)
163 self.read_ops(f)
164 f.close()
165
166 def test_buffered_file_io(self):
167 f = io.open(test_support.TESTFN, "wb")
168 self.assertEqual(f.readable(), False)
169 self.assertEqual(f.writable(), True)
170 self.assertEqual(f.seekable(), True)
171 self.write_ops(f)
172 f.close()
173 f = io.open(test_support.TESTFN, "rb")
174 self.assertEqual(f.readable(), True)
175 self.assertEqual(f.writable(), False)
176 self.assertEqual(f.seekable(), True)
177 self.read_ops(f, True)
178 f.close()
179
180 def test_readline(self):
181 f = io.open(test_support.TESTFN, "wb")
182 f.write(b"abc\ndef\nxyzzy\nfoo")
183 f.close()
184 f = io.open(test_support.TESTFN, "rb")
185 self.assertEqual(f.readline(), b"abc\n")
186 self.assertEqual(f.readline(10), b"def\n")
187 self.assertEqual(f.readline(2), b"xy")
188 self.assertEqual(f.readline(4), b"zzy\n")
189 self.assertEqual(f.readline(), b"foo")
190 f.close()
191
192 def test_raw_bytes_io(self):
193 f = io.BytesIO()
194 self.write_ops(f)
195 data = f.getvalue()
196 self.assertEqual(data, b"hello world\n")
197 f = io.BytesIO(data)
198 self.read_ops(f, True)
199
200 def test_large_file_ops(self):
201 # On Windows and Mac OSX this test comsumes large resources; It takes
202 # a long time to build the >2GB file and takes >2GB of disk space
203 # therefore the resource must be enabled to run this test.
Andrew MacIntyre41c56b52008-09-22 14:23:45 +0000204 if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000205 if not test_support.is_resource_enabled("largefile"):
206 print("\nTesting large file ops skipped on %s." % sys.platform,
207 file=sys.stderr)
208 print("It requires %d bytes and a long time." % self.LARGE,
209 file=sys.stderr)
210 print("Use 'regrtest.py -u largefile test_io' to run it.",
211 file=sys.stderr)
212 return
213 f = io.open(test_support.TESTFN, "w+b", 0)
214 self.large_file_ops(f)
215 f.close()
216 f = io.open(test_support.TESTFN, "w+b")
217 self.large_file_ops(f)
218 f.close()
219
220 def test_with_open(self):
221 for bufsize in (0, 1, 100):
222 f = None
223 with open(test_support.TESTFN, "wb", bufsize) as f:
224 f.write(b"xxx")
225 self.assertEqual(f.closed, True)
226 f = None
227 try:
228 with open(test_support.TESTFN, "wb", bufsize) as f:
229 1/0
230 except ZeroDivisionError:
231 self.assertEqual(f.closed, True)
232 else:
233 self.fail("1/0 didn't raise an exception")
234
Antoine Pitroue741cc62009-01-21 00:45:36 +0000235 # issue 5008
236 def test_append_mode_tell(self):
237 with io.open(test_support.TESTFN, "wb") as f:
238 f.write(b"xxx")
239 with io.open(test_support.TESTFN, "ab", buffering=0) as f:
240 self.assertEqual(f.tell(), 3)
241 with io.open(test_support.TESTFN, "ab") as f:
242 self.assertEqual(f.tell(), 3)
243 with io.open(test_support.TESTFN, "a") as f:
244 self.assert_(f.tell() > 0)
245
Christian Heimes1a6387e2008-03-26 12:49:49 +0000246 def test_destructor(self):
247 record = []
248 class MyFileIO(io.FileIO):
249 def __del__(self):
250 record.append(1)
251 io.FileIO.__del__(self)
252 def close(self):
253 record.append(2)
254 io.FileIO.close(self)
255 def flush(self):
256 record.append(3)
257 io.FileIO.flush(self)
258 f = MyFileIO(test_support.TESTFN, "w")
259 f.write("xxx")
260 del f
261 self.assertEqual(record, [1, 2, 3])
262
263 def test_close_flushes(self):
264 f = io.open(test_support.TESTFN, "wb")
265 f.write(b"xxx")
266 f.close()
267 f = io.open(test_support.TESTFN, "rb")
268 self.assertEqual(f.read(), b"xxx")
269 f.close()
270
271 def XXXtest_array_writes(self):
272 # XXX memory view not available yet
273 a = array.array('i', range(10))
274 n = len(memoryview(a))
275 f = io.open(test_support.TESTFN, "wb", 0)
276 self.assertEqual(f.write(a), n)
277 f.close()
278 f = io.open(test_support.TESTFN, "wb")
279 self.assertEqual(f.write(a), n)
280 f.close()
281
282 def test_closefd(self):
283 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
284 closefd=False)
285
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000286 def testReadClosed(self):
287 with io.open(test_support.TESTFN, "w") as f:
288 f.write("egg\n")
289 with io.open(test_support.TESTFN, "r") as f:
290 file = io.open(f.fileno(), "r", closefd=False)
291 self.assertEqual(file.read(), "egg\n")
292 file.seek(0)
293 file.close()
294 self.assertRaises(ValueError, file.read)
295
296 def test_no_closefd_with_filename(self):
297 # can't use closefd in combination with a file name
298 self.assertRaises(ValueError,
299 io.open, test_support.TESTFN, "r", closefd=False)
300
301 def test_closefd_attr(self):
302 with io.open(test_support.TESTFN, "wb") as f:
303 f.write(b"egg\n")
304 with io.open(test_support.TESTFN, "r") as f:
305 self.assertEqual(f.buffer.raw.closefd, True)
306 file = io.open(f.fileno(), "r", closefd=False)
307 self.assertEqual(file.buffer.raw.closefd, False)
308
309
Christian Heimes1a6387e2008-03-26 12:49:49 +0000310class MemorySeekTestMixin:
311
312 def testInit(self):
313 buf = self.buftype("1234567890")
314 bytesIo = self.ioclass(buf)
315
316 def testRead(self):
317 buf = self.buftype("1234567890")
318 bytesIo = self.ioclass(buf)
319
320 self.assertEquals(buf[:1], bytesIo.read(1))
321 self.assertEquals(buf[1:5], bytesIo.read(4))
322 self.assertEquals(buf[5:], bytesIo.read(900))
323 self.assertEquals(self.EOF, bytesIo.read())
324
325 def testReadNoArgs(self):
326 buf = self.buftype("1234567890")
327 bytesIo = self.ioclass(buf)
328
329 self.assertEquals(buf, bytesIo.read())
330 self.assertEquals(self.EOF, bytesIo.read())
331
332 def testSeek(self):
333 buf = self.buftype("1234567890")
334 bytesIo = self.ioclass(buf)
335
336 bytesIo.read(5)
337 bytesIo.seek(0)
338 self.assertEquals(buf, bytesIo.read())
339
340 bytesIo.seek(3)
341 self.assertEquals(buf[3:], bytesIo.read())
342 self.assertRaises(TypeError, bytesIo.seek, 0.0)
343
344 def testTell(self):
345 buf = self.buftype("1234567890")
346 bytesIo = self.ioclass(buf)
347
348 self.assertEquals(0, bytesIo.tell())
349 bytesIo.seek(5)
350 self.assertEquals(5, bytesIo.tell())
351 bytesIo.seek(10000)
352 self.assertEquals(10000, bytesIo.tell())
353
354
355class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
356 @staticmethod
357 def buftype(s):
358 return s.encode("utf-8")
359 ioclass = io.BytesIO
360 EOF = b""
361
362
363class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
364 buftype = str
365 ioclass = io.StringIO
366 EOF = ""
367
368
369class BufferedReaderTest(unittest.TestCase):
370
371 def testRead(self):
372 rawio = MockRawIO((b"abc", b"d", b"efg"))
373 bufio = io.BufferedReader(rawio)
374
375 self.assertEquals(b"abcdef", bufio.read(6))
376
377 def testBuffering(self):
378 data = b"abcdefghi"
379 dlen = len(data)
380
381 tests = [
382 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
383 [ 100, [ 3, 3, 3], [ dlen ] ],
384 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
385 ]
386
387 for bufsize, buf_read_sizes, raw_read_sizes in tests:
388 rawio = MockFileIO(data)
389 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
390 pos = 0
391 for nbytes in buf_read_sizes:
392 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
393 pos += nbytes
394 self.assertEquals(rawio.read_history, raw_read_sizes)
395
396 def testReadNonBlocking(self):
397 # Inject some None's in there to simulate EWOULDBLOCK
398 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
399 bufio = io.BufferedReader(rawio)
400
401 self.assertEquals(b"abcd", bufio.read(6))
402 self.assertEquals(b"e", bufio.read(1))
403 self.assertEquals(b"fg", bufio.read())
404 self.assert_(None is bufio.read())
405 self.assertEquals(b"", bufio.read())
406
407 def testReadToEof(self):
408 rawio = MockRawIO((b"abc", b"d", b"efg"))
409 bufio = io.BufferedReader(rawio)
410
411 self.assertEquals(b"abcdefg", bufio.read(9000))
412
413 def testReadNoArgs(self):
414 rawio = MockRawIO((b"abc", b"d", b"efg"))
415 bufio = io.BufferedReader(rawio)
416
417 self.assertEquals(b"abcdefg", bufio.read())
418
419 def testFileno(self):
420 rawio = MockRawIO((b"abc", b"d", b"efg"))
421 bufio = io.BufferedReader(rawio)
422
423 self.assertEquals(42, bufio.fileno())
424
425 def testFilenoNoFileno(self):
426 # XXX will we always have fileno() function? If so, kill
427 # this test. Else, write it.
428 pass
429
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000430 def testThreads(self):
431 try:
432 # Write out many bytes with exactly the same number of 0's,
433 # 1's... 255's. This will help us check that concurrent reading
434 # doesn't duplicate or forget contents.
435 N = 1000
436 l = range(256) * N
437 random.shuffle(l)
438 s = bytes(bytearray(l))
439 with io.open(test_support.TESTFN, "wb") as f:
440 f.write(s)
441 with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
442 bufio = io.BufferedReader(raw, 8)
443 errors = []
444 results = []
445 def f():
446 try:
447 # Intra-buffer read then buffer-flushing read
448 for n in cycle([1, 19]):
449 s = bufio.read(n)
450 if not s:
451 break
452 # list.append() is atomic
453 results.append(s)
454 except Exception as e:
455 errors.append(e)
456 raise
457 threads = [threading.Thread(target=f) for x in range(20)]
458 for t in threads:
459 t.start()
460 time.sleep(0.02) # yield
461 for t in threads:
462 t.join()
463 self.assertFalse(errors,
464 "the following exceptions were caught: %r" % errors)
465 s = b''.join(results)
466 for i in range(256):
467 c = bytes(bytearray([i]))
468 self.assertEqual(s.count(c), N)
469 finally:
470 test_support.unlink(test_support.TESTFN)
471
472
Christian Heimes1a6387e2008-03-26 12:49:49 +0000473
474class BufferedWriterTest(unittest.TestCase):
475
476 def testWrite(self):
477 # Write to the buffered IO but don't overflow the buffer.
478 writer = MockRawIO()
479 bufio = io.BufferedWriter(writer, 8)
480
481 bufio.write(b"abc")
482
483 self.assertFalse(writer._write_stack)
484
485 def testWriteOverflow(self):
486 writer = MockRawIO()
487 bufio = io.BufferedWriter(writer, 8)
488
489 bufio.write(b"abc")
490 bufio.write(b"defghijkl")
491
492 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
493
494 def testWriteNonBlocking(self):
495 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
496 bufio = io.BufferedWriter(raw, 8, 16)
497
498 bufio.write(b"asdf")
499 bufio.write(b"asdfa")
500 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
501
502 bufio.write(b"asdfasdfasdf")
503 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
504 bufio.write(b"asdfasdfasdf")
505 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
506 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
507
508 bufio.write(b"asdfasdfasdf")
509
510 # XXX I don't like this test. It relies too heavily on how the
511 # algorithm actually works, which we might change. Refactor
512 # later.
513
514 def testFileno(self):
515 rawio = MockRawIO((b"abc", b"d", b"efg"))
516 bufio = io.BufferedWriter(rawio)
517
518 self.assertEquals(42, bufio.fileno())
519
520 def testFlush(self):
521 writer = MockRawIO()
522 bufio = io.BufferedWriter(writer, 8)
523
524 bufio.write(b"abc")
525 bufio.flush()
526
527 self.assertEquals(b"abc", writer._write_stack[0])
528
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000529 def testThreads(self):
530 # BufferedWriter should not raise exceptions or crash
531 # when called from multiple threads.
532 try:
533 # We use a real file object because it allows us to
534 # exercise situations where the GIL is released before
535 # writing the buffer to the raw streams. This is in addition
536 # to concurrency issues due to switching threads in the middle
537 # of Python code.
538 with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
539 bufio = io.BufferedWriter(raw, 8)
540 errors = []
541 def f():
542 try:
543 # Write enough bytes to flush the buffer
544 s = b"a" * 19
545 for i in range(50):
546 bufio.write(s)
547 except Exception as e:
548 errors.append(e)
549 raise
550 threads = [threading.Thread(target=f) for x in range(20)]
551 for t in threads:
552 t.start()
553 time.sleep(0.02) # yield
554 for t in threads:
555 t.join()
556 self.assertFalse(errors,
557 "the following exceptions were caught: %r" % errors)
558 finally:
559 test_support.unlink(test_support.TESTFN)
560
Christian Heimes1a6387e2008-03-26 12:49:49 +0000561
562class BufferedRWPairTest(unittest.TestCase):
563
564 def testRWPair(self):
565 r = MockRawIO(())
566 w = MockRawIO()
567 pair = io.BufferedRWPair(r, w)
Benjamin Peterson54686e32008-12-24 15:10:27 +0000568 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000569
Benjamin Peterson54686e32008-12-24 15:10:27 +0000570 # XXX More Tests
Christian Heimes1a6387e2008-03-26 12:49:49 +0000571
572
573class BufferedRandomTest(unittest.TestCase):
574
575 def testReadAndWrite(self):
576 raw = MockRawIO((b"asdf", b"ghjk"))
577 rw = io.BufferedRandom(raw, 8, 12)
578
579 self.assertEqual(b"as", rw.read(2))
580 rw.write(b"ddd")
581 rw.write(b"eee")
582 self.assertFalse(raw._write_stack) # Buffer writes
583 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
584 self.assertEquals(b"dddeee", raw._write_stack[0])
585
586 def testSeekAndTell(self):
587 raw = io.BytesIO(b"asdfghjkl")
588 rw = io.BufferedRandom(raw)
589
590 self.assertEquals(b"as", rw.read(2))
591 self.assertEquals(2, rw.tell())
592 rw.seek(0, 0)
593 self.assertEquals(b"asdf", rw.read(4))
594
595 rw.write(b"asdf")
596 rw.seek(0, 0)
597 self.assertEquals(b"asdfasdfl", rw.read())
598 self.assertEquals(9, rw.tell())
599 rw.seek(-4, 2)
600 self.assertEquals(5, rw.tell())
601 rw.seek(2, 1)
602 self.assertEquals(7, rw.tell())
603 self.assertEquals(b"fl", rw.read(11))
604 self.assertRaises(TypeError, rw.seek, 0.0)
605
606# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
607# properties:
608# - A single output character can correspond to many bytes of input.
609# - The number of input bytes to complete the character can be
610# undetermined until the last input byte is received.
611# - The number of input bytes can vary depending on previous input.
612# - A single input byte can correspond to many characters of output.
613# - The number of output characters can be undetermined until the
614# last input byte is received.
615# - The number of output characters can vary depending on previous input.
616
617class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
618 """
619 For testing seek/tell behavior with a stateful, buffering decoder.
620
621 Input is a sequence of words. Words may be fixed-length (length set
622 by input) or variable-length (period-terminated). In variable-length
623 mode, extra periods are ignored. Possible words are:
624 - 'i' followed by a number sets the input length, I (maximum 99).
625 When I is set to 0, words are space-terminated.
626 - 'o' followed by a number sets the output length, O (maximum 99).
627 - Any other word is converted into a word followed by a period on
628 the output. The output word consists of the input word truncated
629 or padded out with hyphens to make its length equal to O. If O
630 is 0, the word is output verbatim without truncating or padding.
631 I and O are initially set to 1. When I changes, any buffered input is
632 re-scanned according to the new I. EOF also terminates the last word.
633 """
634
635 def __init__(self, errors='strict'):
636 codecs.IncrementalDecoder.__init__(self, errors)
637 self.reset()
638
639 def __repr__(self):
640 return '<SID %x>' % id(self)
641
642 def reset(self):
643 self.i = 1
644 self.o = 1
645 self.buffer = bytearray()
646
647 def getstate(self):
648 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
649 return bytes(self.buffer), i*100 + o
650
651 def setstate(self, state):
652 buffer, io = state
653 self.buffer = bytearray(buffer)
654 i, o = divmod(io, 100)
655 self.i, self.o = i ^ 1, o ^ 1
656
657 def decode(self, input, final=False):
658 output = ''
659 for b in input:
660 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +0000661 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000662 if self.buffer:
663 output += self.process_word()
664 else:
665 self.buffer.append(b)
666 else: # fixed-length, terminate after self.i bytes
667 self.buffer.append(b)
668 if len(self.buffer) == self.i:
669 output += self.process_word()
670 if final and self.buffer: # EOF terminates the last word
671 output += self.process_word()
672 return output
673
674 def process_word(self):
675 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000676 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000677 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000678 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000679 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
680 else:
681 output = self.buffer.decode('ascii')
682 if len(output) < self.o:
683 output += '-'*self.o # pad out with hyphens
684 if self.o:
685 output = output[:self.o] # truncate to output length
686 output += '.'
687 self.buffer = bytearray()
688 return output
689
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000690 codecEnabled = False
691
692 @classmethod
693 def lookupTestDecoder(cls, name):
694 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +0000695 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000696 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +0000697 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000698 incrementalencoder=None,
699 streamreader=None, streamwriter=None,
700 incrementaldecoder=cls)
701
702# Register the previous decoder for testing.
703# Disabled by default, tests will enable it.
704codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
705
706
Christian Heimes1a6387e2008-03-26 12:49:49 +0000707class StatefulIncrementalDecoderTest(unittest.TestCase):
708 """
709 Make sure the StatefulIncrementalDecoder actually works.
710 """
711
712 test_cases = [
713 # I=1, O=1 (fixed-length input == fixed-length output)
714 (b'abcd', False, 'a.b.c.d.'),
715 # I=0, O=0 (variable-length input, variable-length output)
716 (b'oiabcd', True, 'abcd.'),
717 # I=0, O=0 (should ignore extra periods)
718 (b'oi...abcd...', True, 'abcd.'),
719 # I=0, O=6 (variable-length input, fixed-length output)
720 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
721 # I=2, O=6 (fixed-length input < fixed-length output)
722 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
723 # I=6, O=3 (fixed-length input > fixed-length output)
724 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
725 # I=0, then 3; O=29, then 15 (with longer output)
726 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
727 'a----------------------------.' +
728 'b----------------------------.' +
729 'cde--------------------------.' +
730 'abcdefghijabcde.' +
731 'a.b------------.' +
732 '.c.------------.' +
733 'd.e------------.' +
734 'k--------------.' +
735 'l--------------.' +
736 'm--------------.')
737 ]
738
739 def testDecoder(self):
740 # Try a few one-shot test cases.
741 for input, eof, output in self.test_cases:
742 d = StatefulIncrementalDecoder()
743 self.assertEquals(d.decode(input, eof), output)
744
745 # Also test an unfinished decode, followed by forcing EOF.
746 d = StatefulIncrementalDecoder()
747 self.assertEquals(d.decode(b'oiabcd'), '')
748 self.assertEquals(d.decode(b'', 1), 'abcd.')
749
750class TextIOWrapperTest(unittest.TestCase):
751
752 def setUp(self):
753 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
754 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
755
756 def tearDown(self):
757 test_support.unlink(test_support.TESTFN)
758
759 def testLineBuffering(self):
760 r = io.BytesIO()
761 b = io.BufferedWriter(r, 1000)
762 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
763 t.write(u"X")
764 self.assertEquals(r.getvalue(), b"") # No flush happened
765 t.write(u"Y\nZ")
766 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
767 t.write(u"A\rB")
768 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
769
770 def testEncodingErrorsReading(self):
771 # (1) default
772 b = io.BytesIO(b"abc\n\xff\n")
773 t = io.TextIOWrapper(b, encoding="ascii")
774 self.assertRaises(UnicodeError, t.read)
775 # (2) explicit strict
776 b = io.BytesIO(b"abc\n\xff\n")
777 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
778 self.assertRaises(UnicodeError, t.read)
779 # (3) ignore
780 b = io.BytesIO(b"abc\n\xff\n")
781 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
782 self.assertEquals(t.read(), "abc\n\n")
783 # (4) replace
784 b = io.BytesIO(b"abc\n\xff\n")
785 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
786 self.assertEquals(t.read(), u"abc\n\ufffd\n")
787
788 def testEncodingErrorsWriting(self):
789 # (1) default
790 b = io.BytesIO()
791 t = io.TextIOWrapper(b, encoding="ascii")
792 self.assertRaises(UnicodeError, t.write, u"\xff")
793 # (2) explicit strict
794 b = io.BytesIO()
795 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
796 self.assertRaises(UnicodeError, t.write, u"\xff")
797 # (3) ignore
798 b = io.BytesIO()
799 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
800 newline="\n")
801 t.write(u"abc\xffdef\n")
802 t.flush()
803 self.assertEquals(b.getvalue(), b"abcdef\n")
804 # (4) replace
805 b = io.BytesIO()
806 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
807 newline="\n")
808 t.write(u"abc\xffdef\n")
809 t.flush()
810 self.assertEquals(b.getvalue(), b"abc?def\n")
811
812 def testNewlinesInput(self):
813 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
814 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
815 for newline, expected in [
816 (None, normalized.decode("ascii").splitlines(True)),
817 ("", testdata.decode("ascii").splitlines(True)),
818 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
819 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
820 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
821 ]:
822 buf = io.BytesIO(testdata)
823 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
824 self.assertEquals(txt.readlines(), expected)
825 txt.seek(0)
826 self.assertEquals(txt.read(), "".join(expected))
827
828 def testNewlinesOutput(self):
829 testdict = {
830 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
831 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
832 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
833 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
834 }
835 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
836 for newline, expected in tests:
837 buf = io.BytesIO()
838 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
839 txt.write("AAA\nB")
840 txt.write("BB\nCCC\n")
841 txt.write("X\rY\r\nZ")
842 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000843 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000844 self.assertEquals(buf.getvalue(), expected)
845
846 def testNewlines(self):
847 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
848
849 tests = [
850 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
851 [ '', input_lines ],
852 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
853 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
854 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
855 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +0000856 encodings = (
857 'utf-8', 'latin-1',
858 'utf-16', 'utf-16-le', 'utf-16-be',
859 'utf-32', 'utf-32-le', 'utf-32-be',
860 )
Christian Heimes1a6387e2008-03-26 12:49:49 +0000861
862 # Try a range of buffer sizes to test the case where \r is the last
863 # character in TextIOWrapper._pending_line.
864 for encoding in encodings:
865 # XXX: str.encode() should return bytes
866 data = bytes(''.join(input_lines).encode(encoding))
867 for do_reads in (False, True):
868 for bufsize in range(1, 10):
869 for newline, exp_lines in tests:
870 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
871 textio = io.TextIOWrapper(bufio, newline=newline,
872 encoding=encoding)
873 if do_reads:
874 got_lines = []
875 while True:
876 c2 = textio.read(2)
877 if c2 == '':
878 break
879 self.assertEquals(len(c2), 2)
880 got_lines.append(c2 + textio.readline())
881 else:
882 got_lines = list(textio)
883
884 for got_line, exp_line in zip(got_lines, exp_lines):
885 self.assertEquals(got_line, exp_line)
886 self.assertEquals(len(got_lines), len(exp_lines))
887
888 def testNewlinesInput(self):
889 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
890 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
891 for newline, expected in [
892 (None, normalized.decode("ascii").splitlines(True)),
893 ("", testdata.decode("ascii").splitlines(True)),
894 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
895 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
896 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
897 ]:
898 buf = io.BytesIO(testdata)
899 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
900 self.assertEquals(txt.readlines(), expected)
901 txt.seek(0)
902 self.assertEquals(txt.read(), "".join(expected))
903
904 def testNewlinesOutput(self):
905 data = u"AAA\nBBB\rCCC\n"
906 data_lf = b"AAA\nBBB\rCCC\n"
907 data_cr = b"AAA\rBBB\rCCC\r"
908 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
909 save_linesep = os.linesep
910 try:
911 for os.linesep, newline, expected in [
912 ("\n", None, data_lf),
913 ("\r\n", None, data_crlf),
914 ("\n", "", data_lf),
915 ("\r\n", "", data_lf),
916 ("\n", "\n", data_lf),
917 ("\r\n", "\n", data_lf),
918 ("\n", "\r", data_cr),
919 ("\r\n", "\r", data_cr),
920 ("\n", "\r\n", data_crlf),
921 ("\r\n", "\r\n", data_crlf),
922 ]:
923 buf = io.BytesIO()
924 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
925 txt.write(data)
926 txt.close()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000927 self.assertEquals(buf.closed, True)
928 self.assertRaises(ValueError, buf.getvalue)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000929 finally:
930 os.linesep = save_linesep
931
932 # Systematic tests of the text I/O API
933
934 def testBasicIO(self):
935 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
936 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
937 f = io.open(test_support.TESTFN, "w+", encoding=enc)
938 f._CHUNK_SIZE = chunksize
939 self.assertEquals(f.write(u"abc"), 3)
940 f.close()
941 f = io.open(test_support.TESTFN, "r+", encoding=enc)
942 f._CHUNK_SIZE = chunksize
943 self.assertEquals(f.tell(), 0)
944 self.assertEquals(f.read(), u"abc")
945 cookie = f.tell()
946 self.assertEquals(f.seek(0), 0)
947 self.assertEquals(f.read(2), u"ab")
948 self.assertEquals(f.read(1), u"c")
949 self.assertEquals(f.read(1), u"")
950 self.assertEquals(f.read(), u"")
951 self.assertEquals(f.tell(), cookie)
952 self.assertEquals(f.seek(0), 0)
953 self.assertEquals(f.seek(0, 2), cookie)
954 self.assertEquals(f.write(u"def"), 3)
955 self.assertEquals(f.seek(cookie), cookie)
956 self.assertEquals(f.read(), u"def")
957 if enc.startswith("utf"):
958 self.multi_line_test(f, enc)
959 f.close()
960
961 def multi_line_test(self, f, enc):
962 f.seek(0)
963 f.truncate()
964 sample = u"s\xff\u0fff\uffff"
965 wlines = []
966 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
967 chars = []
968 for i in range(size):
969 chars.append(sample[i % len(sample)])
970 line = u"".join(chars) + u"\n"
971 wlines.append((f.tell(), line))
972 f.write(line)
973 f.seek(0)
974 rlines = []
975 while True:
976 pos = f.tell()
977 line = f.readline()
978 if not line:
979 break
980 rlines.append((pos, line))
981 self.assertEquals(rlines, wlines)
982
983 def testTelling(self):
984 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
985 p0 = f.tell()
986 f.write(u"\xff\n")
987 p1 = f.tell()
988 f.write(u"\xff\n")
989 p2 = f.tell()
990 f.seek(0)
991 self.assertEquals(f.tell(), p0)
992 self.assertEquals(f.readline(), u"\xff\n")
993 self.assertEquals(f.tell(), p1)
994 self.assertEquals(f.readline(), u"\xff\n")
995 self.assertEquals(f.tell(), p2)
996 f.seek(0)
997 for line in f:
998 self.assertEquals(line, u"\xff\n")
999 self.assertRaises(IOError, f.tell)
1000 self.assertEquals(f.tell(), p2)
1001 f.close()
1002
1003 def testSeeking(self):
1004 chunk_size = io.TextIOWrapper._CHUNK_SIZE
1005 prefix_size = chunk_size - 2
1006 u_prefix = "a" * prefix_size
1007 prefix = bytes(u_prefix.encode("utf-8"))
1008 self.assertEquals(len(u_prefix), len(prefix))
1009 u_suffix = "\u8888\n"
1010 suffix = bytes(u_suffix.encode("utf-8"))
1011 line = prefix + suffix
1012 f = io.open(test_support.TESTFN, "wb")
1013 f.write(line*2)
1014 f.close()
1015 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1016 s = f.read(prefix_size)
1017 self.assertEquals(s, unicode(prefix, "ascii"))
1018 self.assertEquals(f.tell(), prefix_size)
1019 self.assertEquals(f.readline(), u_suffix)
1020
1021 def testSeekingToo(self):
1022 # Regression test for a specific bug
1023 data = b'\xe0\xbf\xbf\n'
1024 f = io.open(test_support.TESTFN, "wb")
1025 f.write(data)
1026 f.close()
1027 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1028 f._CHUNK_SIZE # Just test that it exists
1029 f._CHUNK_SIZE = 2
1030 f.readline()
1031 f.tell()
1032
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001033 def testSeekAndTell(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001034 """Test seek/tell using the StatefulIncrementalDecoder."""
1035
Christian Heimes1a6387e2008-03-26 12:49:49 +00001036 def testSeekAndTellWithData(data, min_pos=0):
1037 """Tell/seek to various points within a data stream and ensure
1038 that the decoded data returned by read() is consistent."""
1039 f = io.open(test_support.TESTFN, 'wb')
1040 f.write(data)
1041 f.close()
1042 f = io.open(test_support.TESTFN, encoding='test_decoder')
1043 decoded = f.read()
1044 f.close()
1045
1046 for i in range(min_pos, len(decoded) + 1): # seek positions
1047 for j in [1, 5, len(decoded) - i]: # read lengths
1048 f = io.open(test_support.TESTFN, encoding='test_decoder')
1049 self.assertEquals(f.read(i), decoded[:i])
1050 cookie = f.tell()
1051 self.assertEquals(f.read(j), decoded[i:i + j])
1052 f.seek(cookie)
1053 self.assertEquals(f.read(), decoded[i:])
1054 f.close()
1055
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001056 # Enable the test decoder.
1057 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001058
1059 # Run the tests.
1060 try:
1061 # Try each test case.
1062 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1063 testSeekAndTellWithData(input)
1064
1065 # Position each test case so that it crosses a chunk boundary.
1066 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1067 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1068 offset = CHUNK_SIZE - len(input)//2
1069 prefix = b'.'*offset
1070 # Don't bother seeking into the prefix (takes too long).
1071 min_pos = offset*2
1072 testSeekAndTellWithData(prefix + input, min_pos)
1073
1074 # Ensure our test decoder won't interfere with subsequent tests.
1075 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001076 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001077
1078 def testEncodedWrites(self):
1079 data = u"1234567890"
1080 tests = ("utf-16",
1081 "utf-16-le",
1082 "utf-16-be",
1083 "utf-32",
1084 "utf-32-le",
1085 "utf-32-be")
1086 for encoding in tests:
1087 buf = io.BytesIO()
1088 f = io.TextIOWrapper(buf, encoding=encoding)
1089 # Check if the BOM is written only once (see issue1753).
1090 f.write(data)
1091 f.write(data)
1092 f.seek(0)
1093 self.assertEquals(f.read(), data * 2)
1094 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1095
1096 def timingTest(self):
1097 timer = time.time
1098 enc = "utf8"
1099 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
1100 nlines = 10000
1101 nchars = len(line)
1102 nbytes = len(line.encode(enc))
1103 for chunk_size in (32, 64, 128, 256):
1104 f = io.open(test_support.TESTFN, "w+", encoding=enc)
1105 f._CHUNK_SIZE = chunk_size
1106 t0 = timer()
1107 for i in range(nlines):
1108 f.write(line)
1109 f.flush()
1110 t1 = timer()
1111 f.seek(0)
1112 for line in f:
1113 pass
1114 t2 = timer()
1115 f.seek(0)
1116 while f.readline():
1117 pass
1118 t3 = timer()
1119 f.seek(0)
1120 while f.readline():
1121 f.tell()
1122 t4 = timer()
1123 f.close()
1124 if test_support.verbose:
1125 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1126 (nlines, nchars, nbytes))
1127 print("File chunk size: %6s" % f._CHUNK_SIZE)
1128 print("Writing: %6.3f seconds" % (t1-t0))
1129 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1130 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1131 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1132
1133 def testReadOneByOne(self):
1134 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1135 reads = ""
1136 while True:
1137 c = txt.read(1)
1138 if not c:
1139 break
1140 reads += c
1141 self.assertEquals(reads, "AA\nBB")
1142
1143 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1144 def testReadByChunk(self):
1145 # make sure "\r\n" straddles 128 char boundary.
1146 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1147 reads = ""
1148 while True:
1149 c = txt.read(128)
1150 if not c:
1151 break
1152 reads += c
1153 self.assertEquals(reads, "A"*127+"\nB")
1154
1155 def test_issue1395_1(self):
1156 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1157
1158 # read one char at a time
1159 reads = ""
1160 while True:
1161 c = txt.read(1)
1162 if not c:
1163 break
1164 reads += c
1165 self.assertEquals(reads, self.normalized)
1166
1167 def test_issue1395_2(self):
1168 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1169 txt._CHUNK_SIZE = 4
1170
1171 reads = ""
1172 while True:
1173 c = txt.read(4)
1174 if not c:
1175 break
1176 reads += c
1177 self.assertEquals(reads, self.normalized)
1178
1179 def test_issue1395_3(self):
1180 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1181 txt._CHUNK_SIZE = 4
1182
1183 reads = txt.read(4)
1184 reads += txt.read(4)
1185 reads += txt.readline()
1186 reads += txt.readline()
1187 reads += txt.readline()
1188 self.assertEquals(reads, self.normalized)
1189
1190 def test_issue1395_4(self):
1191 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1192 txt._CHUNK_SIZE = 4
1193
1194 reads = txt.read(4)
1195 reads += txt.read()
1196 self.assertEquals(reads, self.normalized)
1197
1198 def test_issue1395_5(self):
1199 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1200 txt._CHUNK_SIZE = 4
1201
1202 reads = txt.read(4)
1203 pos = txt.tell()
1204 txt.seek(0)
1205 txt.seek(pos)
1206 self.assertEquals(txt.read(4), "BBB\n")
1207
1208 def test_issue2282(self):
1209 buffer = io.BytesIO(self.testdata)
1210 txt = io.TextIOWrapper(buffer, encoding="ascii")
1211
1212 self.assertEqual(buffer.seekable(), txt.seekable())
1213
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001214 def check_newline_decoder_utf8(self, decoder):
1215 # UTF-8 specific tests for a newline decoder
1216 def _check_decode(b, s, **kwargs):
1217 # We exercise getstate() / setstate() as well as decode()
1218 state = decoder.getstate()
1219 self.assertEquals(decoder.decode(b, **kwargs), s)
1220 decoder.setstate(state)
1221 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001222
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001223 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001224
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001225 _check_decode(b'\xe8', "")
1226 _check_decode(b'\xa2', "")
1227 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001228
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001229 _check_decode(b'\xe8', "")
1230 _check_decode(b'\xa2', "")
1231 _check_decode(b'\x88', "\u8888")
1232
1233 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001234 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1235
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001236 decoder.reset()
1237 _check_decode(b'\n', "\n")
1238 _check_decode(b'\r', "")
1239 _check_decode(b'', "\n", final=True)
1240 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001241
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001242 _check_decode(b'\r', "")
1243 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001244
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001245 _check_decode(b'\r\r\n', "\n\n")
1246 _check_decode(b'\r', "")
1247 _check_decode(b'\r', "\n")
1248 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001249
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001250 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1251 _check_decode(b'\xe8\xa2\x88', "\u8888")
1252 _check_decode(b'\n', "\n")
1253 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1254 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001255
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001256 def check_newline_decoder(self, decoder, encoding):
1257 result = []
1258 encoder = codecs.getincrementalencoder(encoding)()
1259 def _decode_bytewise(s):
1260 for b in encoder.encode(s):
1261 result.append(decoder.decode(b))
1262 self.assertEquals(decoder.newlines, None)
1263 _decode_bytewise("abc\n\r")
1264 self.assertEquals(decoder.newlines, '\n')
1265 _decode_bytewise("\nabc")
1266 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1267 _decode_bytewise("abc\r")
1268 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1269 _decode_bytewise("abc")
1270 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1271 _decode_bytewise("abc\r")
1272 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1273 decoder.reset()
1274 self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
1275 self.assertEquals(decoder.newlines, None)
1276
1277 def test_newline_decoder(self):
1278 encodings = (
1279 'utf-8', 'latin-1',
1280 'utf-16', 'utf-16-le', 'utf-16-be',
1281 'utf-32', 'utf-32-le', 'utf-32-be',
1282 )
1283 for enc in encodings:
1284 decoder = codecs.getincrementaldecoder(enc)()
1285 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1286 self.check_newline_decoder(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001287 decoder = codecs.getincrementaldecoder("utf-8")()
1288 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001289 self.check_newline_decoder_utf8(decoder)
1290
Christian Heimes1a6387e2008-03-26 12:49:49 +00001291
1292# XXX Tests for open()
1293
1294class MiscIOTest(unittest.TestCase):
1295
Benjamin Petersonad100c32008-11-20 22:06:22 +00001296 def tearDown(self):
1297 test_support.unlink(test_support.TESTFN)
1298
Christian Heimes1a6387e2008-03-26 12:49:49 +00001299 def testImport__all__(self):
1300 for name in io.__all__:
1301 obj = getattr(io, name, None)
1302 self.assert_(obj is not None, name)
1303 if name == "open":
1304 continue
1305 elif "error" in name.lower():
1306 self.assert_(issubclass(obj, Exception), name)
1307 else:
1308 self.assert_(issubclass(obj, io.IOBase))
1309
1310
Benjamin Petersonad100c32008-11-20 22:06:22 +00001311 def test_attributes(self):
1312 f = io.open(test_support.TESTFN, "wb", buffering=0)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00001313 self.assertEquals(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00001314 f.close()
1315
1316 f = io.open(test_support.TESTFN, "U")
1317 self.assertEquals(f.name, test_support.TESTFN)
1318 self.assertEquals(f.buffer.name, test_support.TESTFN)
1319 self.assertEquals(f.buffer.raw.name, test_support.TESTFN)
1320 self.assertEquals(f.mode, "U")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00001321 self.assertEquals(f.buffer.mode, "rb")
1322 self.assertEquals(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00001323 f.close()
1324
1325 f = io.open(test_support.TESTFN, "w+")
1326 self.assertEquals(f.mode, "w+")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00001327 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
1328 self.assertEquals(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00001329
1330 g = io.open(f.fileno(), "wb", closefd=False)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00001331 self.assertEquals(g.mode, "wb")
1332 self.assertEquals(g.raw.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00001333 self.assertEquals(g.name, f.fileno())
1334 self.assertEquals(g.raw.name, f.fileno())
1335 f.close()
1336 g.close()
1337
1338
Christian Heimes1a6387e2008-03-26 12:49:49 +00001339def test_main():
1340 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001341 BufferedReaderTest, BufferedWriterTest,
1342 BufferedRWPairTest, BufferedRandomTest,
1343 StatefulIncrementalDecoderTest,
1344 TextIOWrapperTest, MiscIOTest)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001345
1346if __name__ == "__main__":
1347 unittest.main()