blob: 967018ea453a0372641ee5f50f69292ff828c9d2 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000012from itertools import chain, cycle
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered classes in tests.

    read() replays the entries of ``read_stack`` one per call, ignoring the
    requested size, and returns b"" once the script is used up.  Every
    write() stores a copy of its argument in ``_write_stack`` and reports
    the whole argument as written.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        # Only an exhausted script means EOF.  Anything else (for example
        # KeyboardInterrupt) has to propagate instead of being turned
        # into a silent end-of-file.
        try:
            return self._read_stack.pop(0)
        except IndexError:
            return b""

    def write(self, b):
        # Keep a copy so later mutation of the caller's buffer is not seen.
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # Seeking is accepted but has no effect on the script.
        pass

    def tell(self):
        # Arbitrary fixed position.
        return 42
52
53
class MockFileIO(io.BytesIO):
    """BytesIO that records the size of the result of every read() call.

    ``read_history`` receives one entry per call: the length of the bytes
    returned, or None when the underlying read returned None.
    """

    def __init__(self, data):
        # The log has to exist before the first read can happen.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
64
65
class MockNonBlockWriterIO(io.RawIOBase):
    """Writable raw stream whose write() outcomes follow a script.

    Each write() consumes the next entry of ``blocking_script``: a
    non-negative entry is returned as the number of bytes written, a
    negative entry -k raises BlockingIOError reporting k bytes written.
    The data passed to every call is recorded in ``_write_stack``.
    """

    def __init__(self, blocking_script):
        self._write_stack = []
        self._blocking_script = list(blocking_script)

    def writable(self):
        return True

    def write(self, b):
        # Record a copy of the data before deciding the outcome.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)
82
83
class IOTest(unittest.TestCase):
    """Binary I/O checks on raw files, buffered files and BytesIO."""

    # Offset used by the large-file checks (2 GiB).
    LARGE = 2**31

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        # Helper: the sequence below leaves b"hello world\n" in the stream.
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: expects the stream to contain b"hello world\n".
        # With buffered=True the argument-less read() is checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around the LARGE offset.
        # assertTrue is used instead of a bare assert so the checks are
        # not stripped when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # The with blocks close the file even when an assertion fails.
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # io.open (not the builtin open) so that the context-manager
        # support of the module under test is what gets exercised.
        for bufsize in (0, 1, 100):
            f = None
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # Each overridden method logs a marker; the expected log shows
        # __del__ running first, then close(), then flush().
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        f.write("xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def XXXtest_array_writes(self):
        # XXX memory view not available yet
        # (the XXX name prefix keeps unittest from collecting this method)
        a = array.array('i', range(10))
        n = len(memoryview(a))
        with io.open(test_support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)

    def testReadClosed(self):
        with io.open(test_support.TESTFN, "w") as f:
            f.write("egg\n")
        with io.open(test_support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor; closing it must
            # make further reads on that wrapper fail.
            wrapper = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.read(), "egg\n")
            wrapper.seek(0)
            wrapper.close()
            self.assertRaises(ValueError, wrapper.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError,
                          io.open, test_support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with io.open(test_support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            wrapper = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.buffer.raw.closefd, False)
297
298
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream tests.

    Concrete test cases must provide:
      buftype -- callable turning a text literal into the stream's data type
      ioclass -- the in-memory stream class under test
      EOF     -- the value read() returns at end of stream
    """

    def testInit(self):
        # Construction from initial data must simply succeed.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        stream = self.ioclass(buf)

        self.assertEqual(buf[:1], stream.read(1))
        self.assertEqual(buf[1:5], stream.read(4))
        # Asking for more than is left returns just the remainder.
        self.assertEqual(buf[5:], stream.read(900))
        self.assertEqual(self.EOF, stream.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        stream = self.ioclass(buf)

        self.assertEqual(buf, stream.read())
        self.assertEqual(self.EOF, stream.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        stream = self.ioclass(buf)

        stream.read(5)
        stream.seek(0)
        self.assertEqual(buf, stream.read())

        stream.seek(3)
        self.assertEqual(buf[3:], stream.read())
        self.assertRaises(TypeError, stream.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        stream = self.ioclass(buf)

        self.assertEqual(0, stream.tell())
        stream.seek(5)
        self.assertEqual(5, stream.tell())
        # Seeking past the end is allowed and reflected by tell().
        stream.seek(10000)
        self.assertEqual(10000, stream.tell())
342
343
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream checks against io.BytesIO."""

    EOF = b""
    ioclass = io.BytesIO
    # The mixin supplies text literals; BytesIO needs them UTF-8 encoded.
    buftype = staticmethod(lambda s: s.encode("utf-8"))
350
351
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream checks against io.StringIO."""

    EOF = ""
    ioclass = io.StringIO
    buftype = str
356
357
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of scripted and real raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to the buffered read(),
        # sizes of the results the raw stream is expected to have returned.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def testThreads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            # list(...) keeps the repetition working where range() is lazy.
            values = list(range(256)) * N
            random.shuffle(values)
            s = bytes(bytearray(values))
            with io.open(test_support.TESTFN, "wb") as f:
                f.write(s)
            with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
                bufio = io.BufferedReader(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            test_support.unlink(test_support.TESTFN)
460
461
Christian Heimes1a6387e2008-03-26 12:49:49 +0000462
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of scripted and real raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        # Script for the raw stream: accepted byte counts, with -6 meaning
        # "raise BlockingIOError after 6 bytes".
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])

    def testThreads(self):
        # BufferedWriter should not raise exceptions or crash
        # when called from multiple threads.
        try:
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
                bufio = io.BufferedWriter(raw, 8)
                errors = []
                def writer():
                    try:
                        # Write enough bytes to flush the buffer
                        s = b"a" * 19
                        for i in range(50):
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=writer) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
        finally:
            test_support.unlink(test_support.TESTFN)
549
Christian Heimes1a6387e2008-03-26 12:49:49 +0000550
class BufferedRWPairTest(unittest.TestCase):
    """Smoke test for io.BufferedRWPair."""

    def testRWPair(self):
        # A freshly built pair over two mock raw streams must be open.
        reader = MockRawIO(())
        writer = MockRawIO()
        rw_pair = io.BufferedRWPair(reader, writer)
        self.assertFalse(rw_pair.closed)

        # XXX More Tests
Christian Heimes1a6387e2008-03-26 12:49:49 +0000560
561
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom (interleaved reads, writes and seeks)."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrites "ghjk", leaving b"asdfasdfl" in the stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)
594
595# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
596# properties:
597# - A single output character can correspond to many bytes of input.
598# - The number of input bytes to complete the character can be
599# undetermined until the last input byte is received.
600# - The number of input bytes can vary depending on previous input.
601# - A single input byte can correspond to many characters of output.
602# - The number of output characters can be undetermined until the
603# last input byte is received.
604# - The number of output characters can vary depending on previous input.
605
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are space-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes in *input* and return the text they complete.

        With final true, a partially buffered word is terminated as well.
        """
        output = ''
        period = ord('.')
        # Iterate over integer byte values so the period test is a plain
        # int comparison instead of relying on implicit coercion between
        # byte strings and the unicode literal '.'.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i'/'o' words only update the lengths and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch consulted by lookupTestDecoder(); off by default.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo using this class as incremental decoder (and
        latin-1 for encoding) while codecEnabled is true; otherwise, or
        for any other name, falls through and returns None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
690
# Register the lookup function of the decoder defined above for testing.
# The lookup only answers while StatefulIncrementalDecoder.codecEnabled is
# true, so the codec is disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
694
695
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry: (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
738
739class TextIOWrapperTest(unittest.TestCase):
740
741 def setUp(self):
742 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
743 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
744
745 def tearDown(self):
746 test_support.unlink(test_support.TESTFN)
747
748 def testLineBuffering(self):
749 r = io.BytesIO()
750 b = io.BufferedWriter(r, 1000)
751 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
752 t.write(u"X")
753 self.assertEquals(r.getvalue(), b"") # No flush happened
754 t.write(u"Y\nZ")
755 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
756 t.write(u"A\rB")
757 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
758
759 def testEncodingErrorsReading(self):
760 # (1) default
761 b = io.BytesIO(b"abc\n\xff\n")
762 t = io.TextIOWrapper(b, encoding="ascii")
763 self.assertRaises(UnicodeError, t.read)
764 # (2) explicit strict
765 b = io.BytesIO(b"abc\n\xff\n")
766 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
767 self.assertRaises(UnicodeError, t.read)
768 # (3) ignore
769 b = io.BytesIO(b"abc\n\xff\n")
770 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
771 self.assertEquals(t.read(), "abc\n\n")
772 # (4) replace
773 b = io.BytesIO(b"abc\n\xff\n")
774 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
775 self.assertEquals(t.read(), u"abc\n\ufffd\n")
776
777 def testEncodingErrorsWriting(self):
778 # (1) default
779 b = io.BytesIO()
780 t = io.TextIOWrapper(b, encoding="ascii")
781 self.assertRaises(UnicodeError, t.write, u"\xff")
782 # (2) explicit strict
783 b = io.BytesIO()
784 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
785 self.assertRaises(UnicodeError, t.write, u"\xff")
786 # (3) ignore
787 b = io.BytesIO()
788 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
789 newline="\n")
790 t.write(u"abc\xffdef\n")
791 t.flush()
792 self.assertEquals(b.getvalue(), b"abcdef\n")
793 # (4) replace
794 b = io.BytesIO()
795 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
796 newline="\n")
797 t.write(u"abc\xffdef\n")
798 t.flush()
799 self.assertEquals(b.getvalue(), b"abc?def\n")
800
801 def testNewlinesInput(self):
802 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
803 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
804 for newline, expected in [
805 (None, normalized.decode("ascii").splitlines(True)),
806 ("", testdata.decode("ascii").splitlines(True)),
807 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
808 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
809 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
810 ]:
811 buf = io.BytesIO(testdata)
812 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
813 self.assertEquals(txt.readlines(), expected)
814 txt.seek(0)
815 self.assertEquals(txt.read(), "".join(expected))
816
817 def testNewlinesOutput(self):
818 testdict = {
819 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
820 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
821 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
822 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
823 }
824 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
825 for newline, expected in tests:
826 buf = io.BytesIO()
827 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
828 txt.write("AAA\nB")
829 txt.write("BB\nCCC\n")
830 txt.write("X\rY\r\nZ")
831 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000832 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000833 self.assertEquals(buf.getvalue(), expected)
834
835 def testNewlines(self):
836 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
837
838 tests = [
839 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
840 [ '', input_lines ],
841 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
842 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
843 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
844 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +0000845 encodings = (
846 'utf-8', 'latin-1',
847 'utf-16', 'utf-16-le', 'utf-16-be',
848 'utf-32', 'utf-32-le', 'utf-32-be',
849 )
Christian Heimes1a6387e2008-03-26 12:49:49 +0000850
851 # Try a range of buffer sizes to test the case where \r is the last
852 # character in TextIOWrapper._pending_line.
853 for encoding in encodings:
854 # XXX: str.encode() should return bytes
855 data = bytes(''.join(input_lines).encode(encoding))
856 for do_reads in (False, True):
857 for bufsize in range(1, 10):
858 for newline, exp_lines in tests:
859 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
860 textio = io.TextIOWrapper(bufio, newline=newline,
861 encoding=encoding)
862 if do_reads:
863 got_lines = []
864 while True:
865 c2 = textio.read(2)
866 if c2 == '':
867 break
868 self.assertEquals(len(c2), 2)
869 got_lines.append(c2 + textio.readline())
870 else:
871 got_lines = list(textio)
872
873 for got_line, exp_line in zip(got_lines, exp_lines):
874 self.assertEquals(got_line, exp_line)
875 self.assertEquals(len(got_lines), len(exp_lines))
876
877 def testNewlinesInput(self):
878 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
879 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
880 for newline, expected in [
881 (None, normalized.decode("ascii").splitlines(True)),
882 ("", testdata.decode("ascii").splitlines(True)),
883 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
884 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
885 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
886 ]:
887 buf = io.BytesIO(testdata)
888 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
889 self.assertEquals(txt.readlines(), expected)
890 txt.seek(0)
891 self.assertEquals(txt.read(), "".join(expected))
892
893 def testNewlinesOutput(self):
894 data = u"AAA\nBBB\rCCC\n"
895 data_lf = b"AAA\nBBB\rCCC\n"
896 data_cr = b"AAA\rBBB\rCCC\r"
897 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
898 save_linesep = os.linesep
899 try:
900 for os.linesep, newline, expected in [
901 ("\n", None, data_lf),
902 ("\r\n", None, data_crlf),
903 ("\n", "", data_lf),
904 ("\r\n", "", data_lf),
905 ("\n", "\n", data_lf),
906 ("\r\n", "\n", data_lf),
907 ("\n", "\r", data_cr),
908 ("\r\n", "\r", data_cr),
909 ("\n", "\r\n", data_crlf),
910 ("\r\n", "\r\n", data_crlf),
911 ]:
912 buf = io.BytesIO()
913 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
914 txt.write(data)
915 txt.close()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000916 self.assertEquals(buf.closed, True)
917 self.assertRaises(ValueError, buf.getvalue)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000918 finally:
919 os.linesep = save_linesep
920
921 # Systematic tests of the text I/O API
922
923 def testBasicIO(self):
924 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
925 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
926 f = io.open(test_support.TESTFN, "w+", encoding=enc)
927 f._CHUNK_SIZE = chunksize
928 self.assertEquals(f.write(u"abc"), 3)
929 f.close()
930 f = io.open(test_support.TESTFN, "r+", encoding=enc)
931 f._CHUNK_SIZE = chunksize
932 self.assertEquals(f.tell(), 0)
933 self.assertEquals(f.read(), u"abc")
934 cookie = f.tell()
935 self.assertEquals(f.seek(0), 0)
936 self.assertEquals(f.read(2), u"ab")
937 self.assertEquals(f.read(1), u"c")
938 self.assertEquals(f.read(1), u"")
939 self.assertEquals(f.read(), u"")
940 self.assertEquals(f.tell(), cookie)
941 self.assertEquals(f.seek(0), 0)
942 self.assertEquals(f.seek(0, 2), cookie)
943 self.assertEquals(f.write(u"def"), 3)
944 self.assertEquals(f.seek(cookie), cookie)
945 self.assertEquals(f.read(), u"def")
946 if enc.startswith("utf"):
947 self.multi_line_test(f, enc)
948 f.close()
949
def multi_line_test(self, f, enc):
    # Helper: empty the file `f`, write lines of assorted lengths while
    # recording tell() before each write, then read them back with
    # readline() and check that (position, line) pairs match.
    # `enc` is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = u"s\xff\u0fff\uffff"
    period = len(sample)
    lengths = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)

    written = []
    for length in lengths:
        # Cycle through the sample characters to build a line of `length`.
        text = u"".join([sample[k % period] for k in range(length)]) + u"\n"
        written.append((f.tell(), text))
        f.write(text)

    f.seek(0)
    seen = []
    pos = f.tell()
    text = f.readline()
    while text:
        seen.append((pos, text))
        pos = f.tell()
        text = f.readline()
    self.assertEquals(seen, written)
971
def testTelling(self):
    # tell() must give the same positions while reading with readline()
    # as it gave while writing, and must raise IOError during iteration.
    line = u"\xff\n"
    f = io.open(test_support.TESTFN, "w+", encoding="utf8")

    # marks[0] is the start; marks[k] is the position after k lines.
    marks = [f.tell()]
    for _ in range(2):
        f.write(line)
        marks.append(f.tell())

    f.seek(0)
    self.assertEquals(f.tell(), marks[0])
    for mark in marks[1:]:
        self.assertEquals(f.readline(), line)
        self.assertEquals(f.tell(), mark)

    f.seek(0)
    for got in f:
        self.assertEquals(got, line)
        self.assertRaises(IOError, f.tell)
    # Once the iteration is exhausted, tell() works again.
    self.assertEquals(f.tell(), marks[-1])
    f.close()
991
def testSeeking(self):
    # Read a line whose multi-byte character starts two bytes before the
    # default chunk size: the ASCII prefix is _CHUNK_SIZE - 2 bytes long
    # and is followed by "\u8888\n".
    chunk_size = io.TextIOWrapper._CHUNK_SIZE
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    # The prefix is pure ASCII, so characters and bytes have equal counts.
    self.assertEquals(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix

    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(line*2)
    finally:
        f.close()

    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        s = f.read(prefix_size)
        self.assertEquals(s, unicode(prefix, "ascii"))
        self.assertEquals(f.tell(), prefix_size)
        self.assertEquals(f.readline(), u_suffix)
    finally:
        # Always release the handle so the test file can be removed later,
        # even when one of the assertions above fails.
        f.close()
1009
def testSeekingToo(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a UTF-8 file with _CHUNK_SIZE forced to 2 must not fail.
    data = b'\xe0\xbf\xbf\n'
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(data)
    finally:
        f.close()

    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
    finally:
        # Close the reader so the file handle is not leaked.
        f.close()
1021
def testSeekAndTell(self):
    """Test seek/tell using the StatefulIncrementalDecoder."""

    def check_stream(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        writer = io.open(test_support.TESTFN, 'wb')
        writer.write(data)
        writer.close()
        reader = io.open(test_support.TESTFN, encoding='test_decoder')
        decoded = reader.read()
        reader.close()

        total = len(decoded)
        for start in range(min_pos, total + 1):  # seek positions
            for length in (1, 5, total - start):  # read lengths
                reader = io.open(test_support.TESTFN,
                                 encoding='test_decoder')
                self.assertEquals(reader.read(start), decoded[:start])
                cookie = reader.tell()
                self.assertEquals(reader.read(length),
                                  decoded[start:start + length])
                # Going back to the cookie must yield the same tail.
                reader.seek(cookie)
                self.assertEquals(reader.read(), decoded[start:])
                reader.close()

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    try:
        cases = StatefulIncrementalDecoderTest.test_cases

        # Try each test case as-is.
        for payload, _, _ in cases:
            check_stream(payload)

        # Position each test case so that it crosses a chunk boundary.
        chunk = io.TextIOWrapper._CHUNK_SIZE
        for payload, _, _ in cases:
            pad = chunk - len(payload)//2
            # Don't bother seeking into the prefix (takes too long).
            check_stream(b'.'*pad + payload, pad*2)
    finally:
        # Ensure our test decoder won't interfere with subsequent tests.
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001066
1067 def testEncodedWrites(self):
1068 data = u"1234567890"
1069 tests = ("utf-16",
1070 "utf-16-le",
1071 "utf-16-be",
1072 "utf-32",
1073 "utf-32-le",
1074 "utf-32-be")
1075 for encoding in tests:
1076 buf = io.BytesIO()
1077 f = io.TextIOWrapper(buf, encoding=encoding)
1078 # Check if the BOM is written only once (see issue1753).
1079 f.write(data)
1080 f.write(data)
1081 f.seek(0)
1082 self.assertEquals(f.read(), data * 2)
1083 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1084
def timingTest(self):
    # Manual timing helper: the name does not start with "test".
    # Times writing and three ways of reading for several chunk sizes and
    # prints the timings when test_support.verbose is set.
    now = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        f._CHUNK_SIZE = chunk_size

        # Phase 1: write all the lines.
        stamps = [now()]
        for i in range(nlines):
            f.write(line)
        f.flush()
        stamps.append(now())

        # Phase 2: read back by iterating over the file.
        f.seek(0)
        for line in f:
            pass
        stamps.append(now())

        # Phase 3: read back with readline() only.
        f.seek(0)
        while f.readline():
            pass
        stamps.append(now())

        # Phase 4: read back with readline(), calling tell() per line.
        f.seek(0)
        while f.readline():
            f.tell()
        stamps.append(now())
        f.close()

        if not test_support.verbose:
            continue
        t0, t1, t2, t3, t4 = stamps
        print("\nTiming test: %d lines of %d characters (%d bytes)" %
              (nlines, nchars, nbytes))
        print("File chunk size: %6s" % f._CHUNK_SIZE)
        print("Writing: %6.3f seconds" % (t1-t0))
        print("Reading using iteration: %6.3f seconds" % (t2-t1))
        print("Reading using readline(): %6.3f seconds" % (t3-t2))
        print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1121
1122 def testReadOneByOne(self):
1123 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1124 reads = ""
1125 while True:
1126 c = txt.read(1)
1127 if not c:
1128 break
1129 reads += c
1130 self.assertEquals(reads, "AA\nBB")
1131
1132 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1133 def testReadByChunk(self):
1134 # make sure "\r\n" straddles 128 char boundary.
1135 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1136 reads = ""
1137 while True:
1138 c = txt.read(128)
1139 if not c:
1140 break
1141 reads += c
1142 self.assertEquals(reads, "A"*127+"\nB")
1143
1144 def test_issue1395_1(self):
1145 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1146
1147 # read one char at a time
1148 reads = ""
1149 while True:
1150 c = txt.read(1)
1151 if not c:
1152 break
1153 reads += c
1154 self.assertEquals(reads, self.normalized)
1155
1156 def test_issue1395_2(self):
1157 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1158 txt._CHUNK_SIZE = 4
1159
1160 reads = ""
1161 while True:
1162 c = txt.read(4)
1163 if not c:
1164 break
1165 reads += c
1166 self.assertEquals(reads, self.normalized)
1167
1168 def test_issue1395_3(self):
1169 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1170 txt._CHUNK_SIZE = 4
1171
1172 reads = txt.read(4)
1173 reads += txt.read(4)
1174 reads += txt.readline()
1175 reads += txt.readline()
1176 reads += txt.readline()
1177 self.assertEquals(reads, self.normalized)
1178
1179 def test_issue1395_4(self):
1180 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1181 txt._CHUNK_SIZE = 4
1182
1183 reads = txt.read(4)
1184 reads += txt.read()
1185 self.assertEquals(reads, self.normalized)
1186
1187 def test_issue1395_5(self):
1188 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1189 txt._CHUNK_SIZE = 4
1190
1191 reads = txt.read(4)
1192 pos = txt.tell()
1193 txt.seek(0)
1194 txt.seek(pos)
1195 self.assertEquals(txt.read(4), "BBB\n")
1196
1197 def test_issue2282(self):
1198 buffer = io.BytesIO(self.testdata)
1199 txt = io.TextIOWrapper(buffer, encoding="ascii")
1200
1201 self.assertEqual(buffer.seekable(), txt.seekable())
1202
def check_newline_decoder_utf8(self, decoder):
    # UTF-8 specific tests for a newline decoder
    def _roundtrip(raw, expected, **options):
        # We exercise getstate() / setstate() as well as decode():
        # decode once, restore the saved state, then decode again.
        saved = decoder.getstate()
        first = decoder.decode(raw, **options)
        self.assertEquals(first, expected)
        decoder.setstate(saved)
        second = decoder.decode(raw, **options)
        self.assertEquals(second, expected)

    # A three-byte character fed whole, then twice one byte at a time,
    # then only its first byte.
    before_error = (
        (b'\xe8\xa2\x88', "\u8888"),

        (b'\xe8', ""),
        (b'\xa2', ""),
        (b'\x88', "\u8888"),

        (b'\xe8', ""),
        (b'\xa2', ""),
        (b'\x88', "\u8888"),

        (b'\xe8', ""),
    )
    for raw, expected in before_error:
        _roundtrip(raw, expected)
    # Finalizing with the lone lead byte still pending must fail.
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    no_options = dict()
    finalize = dict(final=True)
    after_reset = (
        (b'\n', "\n", no_options),
        (b'\r', "", no_options),
        (b'', "\n", finalize),
        (b'\r', "\n", finalize),

        (b'\r', "", no_options),
        (b'a', "\na", no_options),

        (b'\r\r\n', "\n\n", no_options),
        (b'\r', "", no_options),
        (b'\r', "\n", no_options),
        (b'\na', "\na", no_options),

        (b'\xe8\xa2\x88\r\n', "\u8888\n", no_options),
        (b'\xe8\xa2\x88', "\u8888", no_options),
        (b'\n', "\n", no_options),
        (b'\xe8\xa2\x88\r', "\u8888", no_options),
        (b'\n', "\n", no_options),
    )
    for raw, expected, options in after_reset:
        _roundtrip(raw, expected, **options)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001244
def check_newline_decoder(self, decoder, encoding):
    # Feed encoded text to `decoder` one element at a time and check the
    # `newlines` attribute after each piece, then the joined output.
    decoded = []
    encoder = codecs.getincrementalencoder(encoding)()

    def _feed(text):
        for unit in encoder.encode(text):
            decoded.append(decoder.decode(unit))

    self.assertEquals(decoder.newlines, None)
    steps = (
        ("abc\n\r", '\n'),
        ("\nabc", ('\n', '\r\n')),
        ("abc\r", ('\n', '\r\n')),
        ("abc", ('\r', '\n', '\r\n')),
    )
    for text, expected_newlines in steps:
        _feed(text)
        self.assertEquals(decoder.newlines, expected_newlines)
    _feed("abc\r")
    self.assertEquals("".join(decoded), "abc\n\nabcabc\nabcabc")

    # After reset() the decoder must not remember any newline kind.
    decoder.reset()
    self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
    self.assertEquals(decoder.newlines, None)
1265
1266 def test_newline_decoder(self):
1267 encodings = (
1268 'utf-8', 'latin-1',
1269 'utf-16', 'utf-16-le', 'utf-16-be',
1270 'utf-32', 'utf-32-le', 'utf-32-be',
1271 )
1272 for enc in encodings:
1273 decoder = codecs.getincrementaldecoder(enc)()
1274 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1275 self.check_newline_decoder(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001276 decoder = codecs.getincrementaldecoder("utf-8")()
1277 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001278 self.check_newline_decoder_utf8(decoder)
1279
Christian Heimes1a6387e2008-03-26 12:49:49 +00001280
1281# XXX Tests for open()
1282
class MiscIOTest(unittest.TestCase):
    # Checks on the io module namespace and on the name/mode attributes
    # of the objects returned by io.open().

    def tearDown(self):
        # Remove the scratch file the tests may have created.
        test_support.unlink(test_support.TESTFN)

    def testImport__all__(self):
        # Every name in io.__all__ must exist; apart from "open", names
        # containing "error" must be exceptions and the rest IOBase
        # subclasses.
        for name in io.__all__:
            obj = getattr(io, name, None)
            self.assert_(obj is not None, name)
            if name == "open":
                continue
            if "error" in name.lower():
                self.assert_(issubclass(obj, Exception), name)
            else:
                self.assert_(issubclass(obj, io.IOBase))

    def test_attributes(self):
        path = test_support.TESTFN

        # Unbuffered binary writer.
        raw = io.open(path, "wb", buffering=0)
        self.assertEquals(raw.mode, "wb")
        raw.close()

        # Text reader opened with "U": check name and mode on each layer.
        reader = io.open(path, "U")
        self.assertEquals(reader.name, path)
        self.assertEquals(reader.buffer.name, path)
        self.assertEquals(reader.buffer.raw.name, path)
        self.assertEquals(reader.mode, "U")
        self.assertEquals(reader.buffer.mode, "rb")
        self.assertEquals(reader.buffer.raw.mode, "rb")
        reader.close()

        # Text file opened for update.
        text = io.open(path, "w+")
        self.assertEquals(text.mode, "w+")
        self.assertEquals(text.buffer.mode, "rb+") # Does it really matter?
        self.assertEquals(text.buffer.raw.mode, "rb+")

        # Second object on the same descriptor; closefd=False is passed.
        dup = io.open(text.fileno(), "wb", closefd=False)
        self.assertEquals(dup.mode, "wb")
        self.assertEquals(dup.raw.mode, "wb")
        self.assertEquals(dup.name, text.fileno())
        self.assertEquals(dup.raw.name, text.fileno())
        text.close()
        dup.close()
1326
1327
def test_main():
    """Run the listed test case classes via test_support.run_unittest()."""
    cases = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest, BufferedWriterTest,
        BufferedRWPairTest, BufferedRandomTest,
        StatefulIncrementalDecoderTest,
        TextIOWrapperTest, MiscIOTest,
    )
    test_support.run_unittest(*cases)


# When the file is run as a script, unittest's own entry point is used
# (not test_main()).
if __name__ == "__main__":
    unittest.main()