blob: c9bd38ddffbf44fcf85e5a5bbefaabbcfa7681d6 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000012from itertools import chain, cycle
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
19class MockRawIO(io.RawIOBase):
20
21 def __init__(self, read_stack=()):
22 self._read_stack = list(read_stack)
23 self._write_stack = []
24
25 def read(self, n=None):
26 try:
27 return self._read_stack.pop(0)
28 except:
29 return b""
30
31 def write(self, b):
32 self._write_stack.append(b[:])
33 return len(b)
34
35 def writable(self):
36 return True
37
38 def fileno(self):
39 return 42
40
41 def readable(self):
42 return True
43
44 def seekable(self):
45 return True
46
47 def seek(self, pos, whence):
48 pass
49
50 def tell(self):
51 return 42
52
53
54class MockFileIO(io.BytesIO):
55
56 def __init__(self, data):
57 self.read_history = []
58 io.BytesIO.__init__(self, data)
59
60 def read(self, n=None):
61 res = io.BytesIO.read(self, n)
62 self.read_history.append(None if res is None else len(res))
63 return res
64
65
66class MockNonBlockWriterIO(io.RawIOBase):
67
68 def __init__(self, blocking_script):
69 self._blocking_script = list(blocking_script)
70 self._write_stack = []
71
72 def write(self, b):
73 self._write_stack.append(b[:])
74 n = self._blocking_script.pop(0)
75 if (n < 0):
76 raise io.BlockingIOError(0, "test blocking", -n)
77 else:
78 return n
79
80 def writable(self):
81 return True
82
83
84class IOTest(unittest.TestCase):
85
86 def tearDown(self):
87 test_support.unlink(test_support.TESTFN)
88
89 def write_ops(self, f):
90 self.assertEqual(f.write(b"blah."), 5)
91 self.assertEqual(f.seek(0), 0)
92 self.assertEqual(f.write(b"Hello."), 6)
93 self.assertEqual(f.tell(), 6)
94 self.assertEqual(f.seek(-1, 1), 5)
95 self.assertEqual(f.tell(), 5)
96 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
97 self.assertEqual(f.seek(0), 0)
98 self.assertEqual(f.write(b"h"), 1)
99 self.assertEqual(f.seek(-1, 2), 13)
100 self.assertEqual(f.tell(), 13)
101 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000102 self.assertEqual(f.tell(), 12)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000103 self.assertRaises(TypeError, f.seek, 0.0)
104
105 def read_ops(self, f, buffered=False):
106 data = f.read(5)
107 self.assertEqual(data, b"hello")
108 data = bytearray(data)
109 self.assertEqual(f.readinto(data), 5)
110 self.assertEqual(data, b" worl")
111 self.assertEqual(f.readinto(data), 2)
112 self.assertEqual(len(data), 5)
113 self.assertEqual(data[:2], b"d\n")
114 self.assertEqual(f.seek(0), 0)
115 self.assertEqual(f.read(20), b"hello world\n")
116 self.assertEqual(f.read(1), b"")
117 self.assertEqual(f.readinto(bytearray(b"x")), 0)
118 self.assertEqual(f.seek(-6, 2), 6)
119 self.assertEqual(f.read(5), b"world")
120 self.assertEqual(f.read(0), b"")
121 self.assertEqual(f.readinto(bytearray()), 0)
122 self.assertEqual(f.seek(-6, 1), 5)
123 self.assertEqual(f.read(5), b" worl")
124 self.assertEqual(f.tell(), 10)
125 self.assertRaises(TypeError, f.seek, 0.0)
126 if buffered:
127 f.seek(0)
128 self.assertEqual(f.read(), b"hello world\n")
129 f.seek(6)
130 self.assertEqual(f.read(), b"world\n")
131 self.assertEqual(f.read(), b"")
132
133 LARGE = 2**31
134
135 def large_file_ops(self, f):
136 assert f.readable()
137 assert f.writable()
138 self.assertEqual(f.seek(self.LARGE), self.LARGE)
139 self.assertEqual(f.tell(), self.LARGE)
140 self.assertEqual(f.write(b"xxx"), 3)
141 self.assertEqual(f.tell(), self.LARGE + 3)
142 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
143 self.assertEqual(f.truncate(), self.LARGE + 2)
144 self.assertEqual(f.tell(), self.LARGE + 2)
145 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
146 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000147 self.assertEqual(f.tell(), self.LARGE + 1)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000148 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
149 self.assertEqual(f.seek(-1, 2), self.LARGE)
150 self.assertEqual(f.read(2), b"x")
151
152 def test_raw_file_io(self):
153 f = io.open(test_support.TESTFN, "wb", buffering=0)
154 self.assertEqual(f.readable(), False)
155 self.assertEqual(f.writable(), True)
156 self.assertEqual(f.seekable(), True)
157 self.write_ops(f)
158 f.close()
159 f = io.open(test_support.TESTFN, "rb", buffering=0)
160 self.assertEqual(f.readable(), True)
161 self.assertEqual(f.writable(), False)
162 self.assertEqual(f.seekable(), True)
163 self.read_ops(f)
164 f.close()
165
166 def test_buffered_file_io(self):
167 f = io.open(test_support.TESTFN, "wb")
168 self.assertEqual(f.readable(), False)
169 self.assertEqual(f.writable(), True)
170 self.assertEqual(f.seekable(), True)
171 self.write_ops(f)
172 f.close()
173 f = io.open(test_support.TESTFN, "rb")
174 self.assertEqual(f.readable(), True)
175 self.assertEqual(f.writable(), False)
176 self.assertEqual(f.seekable(), True)
177 self.read_ops(f, True)
178 f.close()
179
180 def test_readline(self):
181 f = io.open(test_support.TESTFN, "wb")
182 f.write(b"abc\ndef\nxyzzy\nfoo")
183 f.close()
184 f = io.open(test_support.TESTFN, "rb")
185 self.assertEqual(f.readline(), b"abc\n")
186 self.assertEqual(f.readline(10), b"def\n")
187 self.assertEqual(f.readline(2), b"xy")
188 self.assertEqual(f.readline(4), b"zzy\n")
189 self.assertEqual(f.readline(), b"foo")
190 f.close()
191
192 def test_raw_bytes_io(self):
193 f = io.BytesIO()
194 self.write_ops(f)
195 data = f.getvalue()
196 self.assertEqual(data, b"hello world\n")
197 f = io.BytesIO(data)
198 self.read_ops(f, True)
199
200 def test_large_file_ops(self):
201 # On Windows and Mac OSX this test comsumes large resources; It takes
202 # a long time to build the >2GB file and takes >2GB of disk space
203 # therefore the resource must be enabled to run this test.
Andrew MacIntyre41c56b52008-09-22 14:23:45 +0000204 if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000205 if not test_support.is_resource_enabled("largefile"):
206 print("\nTesting large file ops skipped on %s." % sys.platform,
207 file=sys.stderr)
208 print("It requires %d bytes and a long time." % self.LARGE,
209 file=sys.stderr)
210 print("Use 'regrtest.py -u largefile test_io' to run it.",
211 file=sys.stderr)
212 return
213 f = io.open(test_support.TESTFN, "w+b", 0)
214 self.large_file_ops(f)
215 f.close()
216 f = io.open(test_support.TESTFN, "w+b")
217 self.large_file_ops(f)
218 f.close()
219
220 def test_with_open(self):
221 for bufsize in (0, 1, 100):
222 f = None
223 with open(test_support.TESTFN, "wb", bufsize) as f:
224 f.write(b"xxx")
225 self.assertEqual(f.closed, True)
226 f = None
227 try:
228 with open(test_support.TESTFN, "wb", bufsize) as f:
229 1/0
230 except ZeroDivisionError:
231 self.assertEqual(f.closed, True)
232 else:
233 self.fail("1/0 didn't raise an exception")
234
235 def test_destructor(self):
236 record = []
237 class MyFileIO(io.FileIO):
238 def __del__(self):
239 record.append(1)
240 io.FileIO.__del__(self)
241 def close(self):
242 record.append(2)
243 io.FileIO.close(self)
244 def flush(self):
245 record.append(3)
246 io.FileIO.flush(self)
247 f = MyFileIO(test_support.TESTFN, "w")
248 f.write("xxx")
249 del f
250 self.assertEqual(record, [1, 2, 3])
251
252 def test_close_flushes(self):
253 f = io.open(test_support.TESTFN, "wb")
254 f.write(b"xxx")
255 f.close()
256 f = io.open(test_support.TESTFN, "rb")
257 self.assertEqual(f.read(), b"xxx")
258 f.close()
259
260 def XXXtest_array_writes(self):
261 # XXX memory view not available yet
262 a = array.array('i', range(10))
263 n = len(memoryview(a))
264 f = io.open(test_support.TESTFN, "wb", 0)
265 self.assertEqual(f.write(a), n)
266 f.close()
267 f = io.open(test_support.TESTFN, "wb")
268 self.assertEqual(f.write(a), n)
269 f.close()
270
271 def test_closefd(self):
272 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
273 closefd=False)
274
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000275 def testReadClosed(self):
276 with io.open(test_support.TESTFN, "w") as f:
277 f.write("egg\n")
278 with io.open(test_support.TESTFN, "r") as f:
279 file = io.open(f.fileno(), "r", closefd=False)
280 self.assertEqual(file.read(), "egg\n")
281 file.seek(0)
282 file.close()
283 self.assertRaises(ValueError, file.read)
284
285 def test_no_closefd_with_filename(self):
286 # can't use closefd in combination with a file name
287 self.assertRaises(ValueError,
288 io.open, test_support.TESTFN, "r", closefd=False)
289
290 def test_closefd_attr(self):
291 with io.open(test_support.TESTFN, "wb") as f:
292 f.write(b"egg\n")
293 with io.open(test_support.TESTFN, "r") as f:
294 self.assertEqual(f.buffer.raw.closefd, True)
295 file = io.open(f.fileno(), "r", closefd=False)
296 self.assertEqual(file.buffer.raw.closefd, False)
297
298
Christian Heimes1a6387e2008-03-26 12:49:49 +0000299class MemorySeekTestMixin:
300
301 def testInit(self):
302 buf = self.buftype("1234567890")
303 bytesIo = self.ioclass(buf)
304
305 def testRead(self):
306 buf = self.buftype("1234567890")
307 bytesIo = self.ioclass(buf)
308
309 self.assertEquals(buf[:1], bytesIo.read(1))
310 self.assertEquals(buf[1:5], bytesIo.read(4))
311 self.assertEquals(buf[5:], bytesIo.read(900))
312 self.assertEquals(self.EOF, bytesIo.read())
313
314 def testReadNoArgs(self):
315 buf = self.buftype("1234567890")
316 bytesIo = self.ioclass(buf)
317
318 self.assertEquals(buf, bytesIo.read())
319 self.assertEquals(self.EOF, bytesIo.read())
320
321 def testSeek(self):
322 buf = self.buftype("1234567890")
323 bytesIo = self.ioclass(buf)
324
325 bytesIo.read(5)
326 bytesIo.seek(0)
327 self.assertEquals(buf, bytesIo.read())
328
329 bytesIo.seek(3)
330 self.assertEquals(buf[3:], bytesIo.read())
331 self.assertRaises(TypeError, bytesIo.seek, 0.0)
332
333 def testTell(self):
334 buf = self.buftype("1234567890")
335 bytesIo = self.ioclass(buf)
336
337 self.assertEquals(0, bytesIo.tell())
338 bytesIo.seek(5)
339 self.assertEquals(5, bytesIo.tell())
340 bytesIo.seek(10000)
341 self.assertEquals(10000, bytesIo.tell())
342
343
344class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
345 @staticmethod
346 def buftype(s):
347 return s.encode("utf-8")
348 ioclass = io.BytesIO
349 EOF = b""
350
351
352class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
353 buftype = str
354 ioclass = io.StringIO
355 EOF = ""
356
357
358class BufferedReaderTest(unittest.TestCase):
359
360 def testRead(self):
361 rawio = MockRawIO((b"abc", b"d", b"efg"))
362 bufio = io.BufferedReader(rawio)
363
364 self.assertEquals(b"abcdef", bufio.read(6))
365
366 def testBuffering(self):
367 data = b"abcdefghi"
368 dlen = len(data)
369
370 tests = [
371 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
372 [ 100, [ 3, 3, 3], [ dlen ] ],
373 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
374 ]
375
376 for bufsize, buf_read_sizes, raw_read_sizes in tests:
377 rawio = MockFileIO(data)
378 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
379 pos = 0
380 for nbytes in buf_read_sizes:
381 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
382 pos += nbytes
383 self.assertEquals(rawio.read_history, raw_read_sizes)
384
385 def testReadNonBlocking(self):
386 # Inject some None's in there to simulate EWOULDBLOCK
387 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
388 bufio = io.BufferedReader(rawio)
389
390 self.assertEquals(b"abcd", bufio.read(6))
391 self.assertEquals(b"e", bufio.read(1))
392 self.assertEquals(b"fg", bufio.read())
393 self.assert_(None is bufio.read())
394 self.assertEquals(b"", bufio.read())
395
396 def testReadToEof(self):
397 rawio = MockRawIO((b"abc", b"d", b"efg"))
398 bufio = io.BufferedReader(rawio)
399
400 self.assertEquals(b"abcdefg", bufio.read(9000))
401
402 def testReadNoArgs(self):
403 rawio = MockRawIO((b"abc", b"d", b"efg"))
404 bufio = io.BufferedReader(rawio)
405
406 self.assertEquals(b"abcdefg", bufio.read())
407
408 def testFileno(self):
409 rawio = MockRawIO((b"abc", b"d", b"efg"))
410 bufio = io.BufferedReader(rawio)
411
412 self.assertEquals(42, bufio.fileno())
413
414 def testFilenoNoFileno(self):
415 # XXX will we always have fileno() function? If so, kill
416 # this test. Else, write it.
417 pass
418
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000419 def testThreads(self):
420 try:
421 # Write out many bytes with exactly the same number of 0's,
422 # 1's... 255's. This will help us check that concurrent reading
423 # doesn't duplicate or forget contents.
424 N = 1000
425 l = range(256) * N
426 random.shuffle(l)
427 s = bytes(bytearray(l))
428 with io.open(test_support.TESTFN, "wb") as f:
429 f.write(s)
430 with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
431 bufio = io.BufferedReader(raw, 8)
432 errors = []
433 results = []
434 def f():
435 try:
436 # Intra-buffer read then buffer-flushing read
437 for n in cycle([1, 19]):
438 s = bufio.read(n)
439 if not s:
440 break
441 # list.append() is atomic
442 results.append(s)
443 except Exception as e:
444 errors.append(e)
445 raise
446 threads = [threading.Thread(target=f) for x in range(20)]
447 for t in threads:
448 t.start()
449 time.sleep(0.02) # yield
450 for t in threads:
451 t.join()
452 self.assertFalse(errors,
453 "the following exceptions were caught: %r" % errors)
454 s = b''.join(results)
455 for i in range(256):
456 c = bytes(bytearray([i]))
457 self.assertEqual(s.count(c), N)
458 finally:
459 test_support.unlink(test_support.TESTFN)
460
461
Christian Heimes1a6387e2008-03-26 12:49:49 +0000462
463class BufferedWriterTest(unittest.TestCase):
464
465 def testWrite(self):
466 # Write to the buffered IO but don't overflow the buffer.
467 writer = MockRawIO()
468 bufio = io.BufferedWriter(writer, 8)
469
470 bufio.write(b"abc")
471
472 self.assertFalse(writer._write_stack)
473
474 def testWriteOverflow(self):
475 writer = MockRawIO()
476 bufio = io.BufferedWriter(writer, 8)
477
478 bufio.write(b"abc")
479 bufio.write(b"defghijkl")
480
481 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
482
483 def testWriteNonBlocking(self):
484 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
485 bufio = io.BufferedWriter(raw, 8, 16)
486
487 bufio.write(b"asdf")
488 bufio.write(b"asdfa")
489 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
490
491 bufio.write(b"asdfasdfasdf")
492 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
493 bufio.write(b"asdfasdfasdf")
494 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
495 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
496
497 bufio.write(b"asdfasdfasdf")
498
499 # XXX I don't like this test. It relies too heavily on how the
500 # algorithm actually works, which we might change. Refactor
501 # later.
502
503 def testFileno(self):
504 rawio = MockRawIO((b"abc", b"d", b"efg"))
505 bufio = io.BufferedWriter(rawio)
506
507 self.assertEquals(42, bufio.fileno())
508
509 def testFlush(self):
510 writer = MockRawIO()
511 bufio = io.BufferedWriter(writer, 8)
512
513 bufio.write(b"abc")
514 bufio.flush()
515
516 self.assertEquals(b"abc", writer._write_stack[0])
517
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000518 def testThreads(self):
519 # BufferedWriter should not raise exceptions or crash
520 # when called from multiple threads.
521 try:
522 # We use a real file object because it allows us to
523 # exercise situations where the GIL is released before
524 # writing the buffer to the raw streams. This is in addition
525 # to concurrency issues due to switching threads in the middle
526 # of Python code.
527 with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
528 bufio = io.BufferedWriter(raw, 8)
529 errors = []
530 def f():
531 try:
532 # Write enough bytes to flush the buffer
533 s = b"a" * 19
534 for i in range(50):
535 bufio.write(s)
536 except Exception as e:
537 errors.append(e)
538 raise
539 threads = [threading.Thread(target=f) for x in range(20)]
540 for t in threads:
541 t.start()
542 time.sleep(0.02) # yield
543 for t in threads:
544 t.join()
545 self.assertFalse(errors,
546 "the following exceptions were caught: %r" % errors)
547 finally:
548 test_support.unlink(test_support.TESTFN)
549
Christian Heimes1a6387e2008-03-26 12:49:49 +0000550
551class BufferedRWPairTest(unittest.TestCase):
552
553 def testRWPair(self):
554 r = MockRawIO(())
555 w = MockRawIO()
556 pair = io.BufferedRWPair(r, w)
557
558 # XXX need implementation
559
560
561class BufferedRandomTest(unittest.TestCase):
562
563 def testReadAndWrite(self):
564 raw = MockRawIO((b"asdf", b"ghjk"))
565 rw = io.BufferedRandom(raw, 8, 12)
566
567 self.assertEqual(b"as", rw.read(2))
568 rw.write(b"ddd")
569 rw.write(b"eee")
570 self.assertFalse(raw._write_stack) # Buffer writes
571 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
572 self.assertEquals(b"dddeee", raw._write_stack[0])
573
574 def testSeekAndTell(self):
575 raw = io.BytesIO(b"asdfghjkl")
576 rw = io.BufferedRandom(raw)
577
578 self.assertEquals(b"as", rw.read(2))
579 self.assertEquals(2, rw.tell())
580 rw.seek(0, 0)
581 self.assertEquals(b"asdf", rw.read(4))
582
583 rw.write(b"asdf")
584 rw.seek(0, 0)
585 self.assertEquals(b"asdfasdfl", rw.read())
586 self.assertEquals(9, rw.tell())
587 rw.seek(-4, 2)
588 self.assertEquals(5, rw.tell())
589 rw.seek(2, 1)
590 self.assertEquals(7, rw.tell())
591 self.assertEquals(b"fl", rw.read(11))
592 self.assertRaises(TypeError, rw.seek, 0.0)
593
594# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
595# properties:
596# - A single output character can correspond to many bytes of input.
597# - The number of input bytes to complete the character can be
598# undetermined until the last input byte is received.
599# - The number of input bytes can vary depending on previous input.
600# - A single input byte can correspond to many characters of output.
601# - The number of output characters can be undetermined until the
602# last input byte is received.
603# - The number of output characters can vary depending on previous input.
604
605class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
606 """
607 For testing seek/tell behavior with a stateful, buffering decoder.
608
609 Input is a sequence of words. Words may be fixed-length (length set
610 by input) or variable-length (period-terminated). In variable-length
611 mode, extra periods are ignored. Possible words are:
612 - 'i' followed by a number sets the input length, I (maximum 99).
613 When I is set to 0, words are space-terminated.
614 - 'o' followed by a number sets the output length, O (maximum 99).
615 - Any other word is converted into a word followed by a period on
616 the output. The output word consists of the input word truncated
617 or padded out with hyphens to make its length equal to O. If O
618 is 0, the word is output verbatim without truncating or padding.
619 I and O are initially set to 1. When I changes, any buffered input is
620 re-scanned according to the new I. EOF also terminates the last word.
621 """
622
623 def __init__(self, errors='strict'):
624 codecs.IncrementalDecoder.__init__(self, errors)
625 self.reset()
626
627 def __repr__(self):
628 return '<SID %x>' % id(self)
629
630 def reset(self):
631 self.i = 1
632 self.o = 1
633 self.buffer = bytearray()
634
635 def getstate(self):
636 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
637 return bytes(self.buffer), i*100 + o
638
639 def setstate(self, state):
640 buffer, io = state
641 self.buffer = bytearray(buffer)
642 i, o = divmod(io, 100)
643 self.i, self.o = i ^ 1, o ^ 1
644
645 def decode(self, input, final=False):
646 output = ''
647 for b in input:
648 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +0000649 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000650 if self.buffer:
651 output += self.process_word()
652 else:
653 self.buffer.append(b)
654 else: # fixed-length, terminate after self.i bytes
655 self.buffer.append(b)
656 if len(self.buffer) == self.i:
657 output += self.process_word()
658 if final and self.buffer: # EOF terminates the last word
659 output += self.process_word()
660 return output
661
662 def process_word(self):
663 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000664 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000665 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000666 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000667 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
668 else:
669 output = self.buffer.decode('ascii')
670 if len(output) < self.o:
671 output += '-'*self.o # pad out with hyphens
672 if self.o:
673 output = output[:self.o] # truncate to output length
674 output += '.'
675 self.buffer = bytearray()
676 return output
677
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000678 codecEnabled = False
679
680 @classmethod
681 def lookupTestDecoder(cls, name):
682 if cls.codecEnabled and name == 'test_decoder':
683 return codecs.CodecInfo(
684 name='test_decoder', encode=None, decode=None,
685 incrementalencoder=None,
686 streamreader=None, streamwriter=None,
687 incrementaldecoder=cls)
688
689# Register the previous decoder for testing.
690# Disabled by default, tests will enable it.
691codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
692
693
Christian Heimes1a6387e2008-03-26 12:49:49 +0000694class StatefulIncrementalDecoderTest(unittest.TestCase):
695 """
696 Make sure the StatefulIncrementalDecoder actually works.
697 """
698
699 test_cases = [
700 # I=1, O=1 (fixed-length input == fixed-length output)
701 (b'abcd', False, 'a.b.c.d.'),
702 # I=0, O=0 (variable-length input, variable-length output)
703 (b'oiabcd', True, 'abcd.'),
704 # I=0, O=0 (should ignore extra periods)
705 (b'oi...abcd...', True, 'abcd.'),
706 # I=0, O=6 (variable-length input, fixed-length output)
707 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
708 # I=2, O=6 (fixed-length input < fixed-length output)
709 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
710 # I=6, O=3 (fixed-length input > fixed-length output)
711 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
712 # I=0, then 3; O=29, then 15 (with longer output)
713 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
714 'a----------------------------.' +
715 'b----------------------------.' +
716 'cde--------------------------.' +
717 'abcdefghijabcde.' +
718 'a.b------------.' +
719 '.c.------------.' +
720 'd.e------------.' +
721 'k--------------.' +
722 'l--------------.' +
723 'm--------------.')
724 ]
725
726 def testDecoder(self):
727 # Try a few one-shot test cases.
728 for input, eof, output in self.test_cases:
729 d = StatefulIncrementalDecoder()
730 self.assertEquals(d.decode(input, eof), output)
731
732 # Also test an unfinished decode, followed by forcing EOF.
733 d = StatefulIncrementalDecoder()
734 self.assertEquals(d.decode(b'oiabcd'), '')
735 self.assertEquals(d.decode(b'', 1), 'abcd.')
736
737class TextIOWrapperTest(unittest.TestCase):
738
739 def setUp(self):
740 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
741 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
742
743 def tearDown(self):
744 test_support.unlink(test_support.TESTFN)
745
746 def testLineBuffering(self):
747 r = io.BytesIO()
748 b = io.BufferedWriter(r, 1000)
749 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
750 t.write(u"X")
751 self.assertEquals(r.getvalue(), b"") # No flush happened
752 t.write(u"Y\nZ")
753 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
754 t.write(u"A\rB")
755 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
756
757 def testEncodingErrorsReading(self):
758 # (1) default
759 b = io.BytesIO(b"abc\n\xff\n")
760 t = io.TextIOWrapper(b, encoding="ascii")
761 self.assertRaises(UnicodeError, t.read)
762 # (2) explicit strict
763 b = io.BytesIO(b"abc\n\xff\n")
764 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
765 self.assertRaises(UnicodeError, t.read)
766 # (3) ignore
767 b = io.BytesIO(b"abc\n\xff\n")
768 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
769 self.assertEquals(t.read(), "abc\n\n")
770 # (4) replace
771 b = io.BytesIO(b"abc\n\xff\n")
772 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
773 self.assertEquals(t.read(), u"abc\n\ufffd\n")
774
775 def testEncodingErrorsWriting(self):
776 # (1) default
777 b = io.BytesIO()
778 t = io.TextIOWrapper(b, encoding="ascii")
779 self.assertRaises(UnicodeError, t.write, u"\xff")
780 # (2) explicit strict
781 b = io.BytesIO()
782 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
783 self.assertRaises(UnicodeError, t.write, u"\xff")
784 # (3) ignore
785 b = io.BytesIO()
786 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
787 newline="\n")
788 t.write(u"abc\xffdef\n")
789 t.flush()
790 self.assertEquals(b.getvalue(), b"abcdef\n")
791 # (4) replace
792 b = io.BytesIO()
793 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
794 newline="\n")
795 t.write(u"abc\xffdef\n")
796 t.flush()
797 self.assertEquals(b.getvalue(), b"abc?def\n")
798
799 def testNewlinesInput(self):
800 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
801 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
802 for newline, expected in [
803 (None, normalized.decode("ascii").splitlines(True)),
804 ("", testdata.decode("ascii").splitlines(True)),
805 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
806 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
807 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
808 ]:
809 buf = io.BytesIO(testdata)
810 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
811 self.assertEquals(txt.readlines(), expected)
812 txt.seek(0)
813 self.assertEquals(txt.read(), "".join(expected))
814
815 def testNewlinesOutput(self):
816 testdict = {
817 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
818 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
819 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
820 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
821 }
822 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
823 for newline, expected in tests:
824 buf = io.BytesIO()
825 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
826 txt.write("AAA\nB")
827 txt.write("BB\nCCC\n")
828 txt.write("X\rY\r\nZ")
829 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000830 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000831 self.assertEquals(buf.getvalue(), expected)
832
833 def testNewlines(self):
834 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
835
836 tests = [
837 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
838 [ '', input_lines ],
839 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
840 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
841 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
842 ]
843
844 encodings = ('utf-8', 'latin-1')
845
846 # Try a range of buffer sizes to test the case where \r is the last
847 # character in TextIOWrapper._pending_line.
848 for encoding in encodings:
849 # XXX: str.encode() should return bytes
850 data = bytes(''.join(input_lines).encode(encoding))
851 for do_reads in (False, True):
852 for bufsize in range(1, 10):
853 for newline, exp_lines in tests:
854 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
855 textio = io.TextIOWrapper(bufio, newline=newline,
856 encoding=encoding)
857 if do_reads:
858 got_lines = []
859 while True:
860 c2 = textio.read(2)
861 if c2 == '':
862 break
863 self.assertEquals(len(c2), 2)
864 got_lines.append(c2 + textio.readline())
865 else:
866 got_lines = list(textio)
867
868 for got_line, exp_line in zip(got_lines, exp_lines):
869 self.assertEquals(got_line, exp_line)
870 self.assertEquals(len(got_lines), len(exp_lines))
871
872 def testNewlinesInput(self):
873 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
874 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
875 for newline, expected in [
876 (None, normalized.decode("ascii").splitlines(True)),
877 ("", testdata.decode("ascii").splitlines(True)),
878 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
879 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
880 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
881 ]:
882 buf = io.BytesIO(testdata)
883 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
884 self.assertEquals(txt.readlines(), expected)
885 txt.seek(0)
886 self.assertEquals(txt.read(), "".join(expected))
887
888 def testNewlinesOutput(self):
889 data = u"AAA\nBBB\rCCC\n"
890 data_lf = b"AAA\nBBB\rCCC\n"
891 data_cr = b"AAA\rBBB\rCCC\r"
892 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
893 save_linesep = os.linesep
894 try:
895 for os.linesep, newline, expected in [
896 ("\n", None, data_lf),
897 ("\r\n", None, data_crlf),
898 ("\n", "", data_lf),
899 ("\r\n", "", data_lf),
900 ("\n", "\n", data_lf),
901 ("\r\n", "\n", data_lf),
902 ("\n", "\r", data_cr),
903 ("\r\n", "\r", data_cr),
904 ("\n", "\r\n", data_crlf),
905 ("\r\n", "\r\n", data_crlf),
906 ]:
907 buf = io.BytesIO()
908 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
909 txt.write(data)
910 txt.close()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000911 self.assertEquals(buf.closed, True)
912 self.assertRaises(ValueError, buf.getvalue)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000913 finally:
914 os.linesep = save_linesep
915
916 # Systematic tests of the text I/O API
917
918 def testBasicIO(self):
919 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
920 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
921 f = io.open(test_support.TESTFN, "w+", encoding=enc)
922 f._CHUNK_SIZE = chunksize
923 self.assertEquals(f.write(u"abc"), 3)
924 f.close()
925 f = io.open(test_support.TESTFN, "r+", encoding=enc)
926 f._CHUNK_SIZE = chunksize
927 self.assertEquals(f.tell(), 0)
928 self.assertEquals(f.read(), u"abc")
929 cookie = f.tell()
930 self.assertEquals(f.seek(0), 0)
931 self.assertEquals(f.read(2), u"ab")
932 self.assertEquals(f.read(1), u"c")
933 self.assertEquals(f.read(1), u"")
934 self.assertEquals(f.read(), u"")
935 self.assertEquals(f.tell(), cookie)
936 self.assertEquals(f.seek(0), 0)
937 self.assertEquals(f.seek(0, 2), cookie)
938 self.assertEquals(f.write(u"def"), 3)
939 self.assertEquals(f.seek(cookie), cookie)
940 self.assertEquals(f.read(), u"def")
941 if enc.startswith("utf"):
942 self.multi_line_test(f, enc)
943 f.close()
944
945 def multi_line_test(self, f, enc):
946 f.seek(0)
947 f.truncate()
948 sample = u"s\xff\u0fff\uffff"
949 wlines = []
950 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
951 chars = []
952 for i in range(size):
953 chars.append(sample[i % len(sample)])
954 line = u"".join(chars) + u"\n"
955 wlines.append((f.tell(), line))
956 f.write(line)
957 f.seek(0)
958 rlines = []
959 while True:
960 pos = f.tell()
961 line = f.readline()
962 if not line:
963 break
964 rlines.append((pos, line))
965 self.assertEquals(rlines, wlines)
966
967 def testTelling(self):
968 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
969 p0 = f.tell()
970 f.write(u"\xff\n")
971 p1 = f.tell()
972 f.write(u"\xff\n")
973 p2 = f.tell()
974 f.seek(0)
975 self.assertEquals(f.tell(), p0)
976 self.assertEquals(f.readline(), u"\xff\n")
977 self.assertEquals(f.tell(), p1)
978 self.assertEquals(f.readline(), u"\xff\n")
979 self.assertEquals(f.tell(), p2)
980 f.seek(0)
981 for line in f:
982 self.assertEquals(line, u"\xff\n")
983 self.assertRaises(IOError, f.tell)
984 self.assertEquals(f.tell(), p2)
985 f.close()
986
987 def testSeeking(self):
988 chunk_size = io.TextIOWrapper._CHUNK_SIZE
989 prefix_size = chunk_size - 2
990 u_prefix = "a" * prefix_size
991 prefix = bytes(u_prefix.encode("utf-8"))
992 self.assertEquals(len(u_prefix), len(prefix))
993 u_suffix = "\u8888\n"
994 suffix = bytes(u_suffix.encode("utf-8"))
995 line = prefix + suffix
996 f = io.open(test_support.TESTFN, "wb")
997 f.write(line*2)
998 f.close()
999 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1000 s = f.read(prefix_size)
1001 self.assertEquals(s, unicode(prefix, "ascii"))
1002 self.assertEquals(f.tell(), prefix_size)
1003 self.assertEquals(f.readline(), u_suffix)
1004
1005 def testSeekingToo(self):
1006 # Regression test for a specific bug
1007 data = b'\xe0\xbf\xbf\n'
1008 f = io.open(test_support.TESTFN, "wb")
1009 f.write(data)
1010 f.close()
1011 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1012 f._CHUNK_SIZE # Just test that it exists
1013 f._CHUNK_SIZE = 2
1014 f.readline()
1015 f.tell()
1016
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001017 def testSeekAndTell(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001018 """Test seek/tell using the StatefulIncrementalDecoder."""
1019
Christian Heimes1a6387e2008-03-26 12:49:49 +00001020 def testSeekAndTellWithData(data, min_pos=0):
1021 """Tell/seek to various points within a data stream and ensure
1022 that the decoded data returned by read() is consistent."""
1023 f = io.open(test_support.TESTFN, 'wb')
1024 f.write(data)
1025 f.close()
1026 f = io.open(test_support.TESTFN, encoding='test_decoder')
1027 decoded = f.read()
1028 f.close()
1029
1030 for i in range(min_pos, len(decoded) + 1): # seek positions
1031 for j in [1, 5, len(decoded) - i]: # read lengths
1032 f = io.open(test_support.TESTFN, encoding='test_decoder')
1033 self.assertEquals(f.read(i), decoded[:i])
1034 cookie = f.tell()
1035 self.assertEquals(f.read(j), decoded[i:i + j])
1036 f.seek(cookie)
1037 self.assertEquals(f.read(), decoded[i:])
1038 f.close()
1039
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001040 # Enable the test decoder.
1041 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001042
1043 # Run the tests.
1044 try:
1045 # Try each test case.
1046 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1047 testSeekAndTellWithData(input)
1048
1049 # Position each test case so that it crosses a chunk boundary.
1050 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1051 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1052 offset = CHUNK_SIZE - len(input)//2
1053 prefix = b'.'*offset
1054 # Don't bother seeking into the prefix (takes too long).
1055 min_pos = offset*2
1056 testSeekAndTellWithData(prefix + input, min_pos)
1057
1058 # Ensure our test decoder won't interfere with subsequent tests.
1059 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001060 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001061
1062 def testEncodedWrites(self):
1063 data = u"1234567890"
1064 tests = ("utf-16",
1065 "utf-16-le",
1066 "utf-16-be",
1067 "utf-32",
1068 "utf-32-le",
1069 "utf-32-be")
1070 for encoding in tests:
1071 buf = io.BytesIO()
1072 f = io.TextIOWrapper(buf, encoding=encoding)
1073 # Check if the BOM is written only once (see issue1753).
1074 f.write(data)
1075 f.write(data)
1076 f.seek(0)
1077 self.assertEquals(f.read(), data * 2)
1078 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1079
1080 def timingTest(self):
1081 timer = time.time
1082 enc = "utf8"
1083 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
1084 nlines = 10000
1085 nchars = len(line)
1086 nbytes = len(line.encode(enc))
1087 for chunk_size in (32, 64, 128, 256):
1088 f = io.open(test_support.TESTFN, "w+", encoding=enc)
1089 f._CHUNK_SIZE = chunk_size
1090 t0 = timer()
1091 for i in range(nlines):
1092 f.write(line)
1093 f.flush()
1094 t1 = timer()
1095 f.seek(0)
1096 for line in f:
1097 pass
1098 t2 = timer()
1099 f.seek(0)
1100 while f.readline():
1101 pass
1102 t3 = timer()
1103 f.seek(0)
1104 while f.readline():
1105 f.tell()
1106 t4 = timer()
1107 f.close()
1108 if test_support.verbose:
1109 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1110 (nlines, nchars, nbytes))
1111 print("File chunk size: %6s" % f._CHUNK_SIZE)
1112 print("Writing: %6.3f seconds" % (t1-t0))
1113 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1114 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1115 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1116
1117 def testReadOneByOne(self):
1118 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1119 reads = ""
1120 while True:
1121 c = txt.read(1)
1122 if not c:
1123 break
1124 reads += c
1125 self.assertEquals(reads, "AA\nBB")
1126
1127 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1128 def testReadByChunk(self):
1129 # make sure "\r\n" straddles 128 char boundary.
1130 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1131 reads = ""
1132 while True:
1133 c = txt.read(128)
1134 if not c:
1135 break
1136 reads += c
1137 self.assertEquals(reads, "A"*127+"\nB")
1138
1139 def test_issue1395_1(self):
1140 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1141
1142 # read one char at a time
1143 reads = ""
1144 while True:
1145 c = txt.read(1)
1146 if not c:
1147 break
1148 reads += c
1149 self.assertEquals(reads, self.normalized)
1150
1151 def test_issue1395_2(self):
1152 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1153 txt._CHUNK_SIZE = 4
1154
1155 reads = ""
1156 while True:
1157 c = txt.read(4)
1158 if not c:
1159 break
1160 reads += c
1161 self.assertEquals(reads, self.normalized)
1162
1163 def test_issue1395_3(self):
1164 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1165 txt._CHUNK_SIZE = 4
1166
1167 reads = txt.read(4)
1168 reads += txt.read(4)
1169 reads += txt.readline()
1170 reads += txt.readline()
1171 reads += txt.readline()
1172 self.assertEquals(reads, self.normalized)
1173
1174 def test_issue1395_4(self):
1175 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1176 txt._CHUNK_SIZE = 4
1177
1178 reads = txt.read(4)
1179 reads += txt.read()
1180 self.assertEquals(reads, self.normalized)
1181
1182 def test_issue1395_5(self):
1183 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1184 txt._CHUNK_SIZE = 4
1185
1186 reads = txt.read(4)
1187 pos = txt.tell()
1188 txt.seek(0)
1189 txt.seek(pos)
1190 self.assertEquals(txt.read(4), "BBB\n")
1191
1192 def test_issue2282(self):
1193 buffer = io.BytesIO(self.testdata)
1194 txt = io.TextIOWrapper(buffer, encoding="ascii")
1195
1196 self.assertEqual(buffer.seekable(), txt.seekable())
1197
1198 def test_newline_decoder(self):
1199 import codecs
1200 decoder = codecs.getincrementaldecoder("utf-8")()
1201 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1202
1203 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1204
1205 self.assertEquals(decoder.decode(b'\xe8'), u"")
1206 self.assertEquals(decoder.decode(b'\xa2'), u"")
1207 self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
1208
1209 self.assertEquals(decoder.decode(b'\xe8'), u"")
1210 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1211
1212 decoder.setstate((b'', 0))
1213 self.assertEquals(decoder.decode(b'\n'), u"\n")
1214 self.assertEquals(decoder.decode(b'\r'), u"")
1215 self.assertEquals(decoder.decode(b'', final=True), u"\n")
1216 self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
1217
1218 self.assertEquals(decoder.decode(b'\r'), u"")
1219 self.assertEquals(decoder.decode(b'a'), u"\na")
1220
1221 self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
1222 self.assertEquals(decoder.decode(b'\r'), u"")
1223 self.assertEquals(decoder.decode(b'\r'), u"\n")
1224 self.assertEquals(decoder.decode(b'\na'), u"\na")
1225
1226 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
1227 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1228 self.assertEquals(decoder.decode(b'\n'), u"\n")
1229 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
1230 self.assertEquals(decoder.decode(b'\n'), u"\n")
1231
1232 decoder = codecs.getincrementaldecoder("utf-8")()
1233 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1234 self.assertEquals(decoder.newlines, None)
1235 decoder.decode(b"abc\n\r")
1236 self.assertEquals(decoder.newlines, u'\n')
1237 decoder.decode(b"\nabc")
1238 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1239 decoder.decode(b"abc\r")
1240 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1241 decoder.decode(b"abc")
1242 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1243 decoder.decode(b"abc\r")
1244 decoder.reset()
1245 self.assertEquals(decoder.decode(b"abc"), "abc")
1246 self.assertEquals(decoder.newlines, None)
1247
1248# XXX Tests for open()
1249
1250class MiscIOTest(unittest.TestCase):
1251
Benjamin Petersonad100c32008-11-20 22:06:22 +00001252 def tearDown(self):
1253 test_support.unlink(test_support.TESTFN)
1254
Christian Heimes1a6387e2008-03-26 12:49:49 +00001255 def testImport__all__(self):
1256 for name in io.__all__:
1257 obj = getattr(io, name, None)
1258 self.assert_(obj is not None, name)
1259 if name == "open":
1260 continue
1261 elif "error" in name.lower():
1262 self.assert_(issubclass(obj, Exception), name)
1263 else:
1264 self.assert_(issubclass(obj, io.IOBase))
1265
1266
Benjamin Petersonad100c32008-11-20 22:06:22 +00001267 def test_attributes(self):
1268 f = io.open(test_support.TESTFN, "wb", buffering=0)
1269 self.assertEquals(f.mode, "w")
1270 f.close()
1271
1272 f = io.open(test_support.TESTFN, "U")
1273 self.assertEquals(f.name, test_support.TESTFN)
1274 self.assertEquals(f.buffer.name, test_support.TESTFN)
1275 self.assertEquals(f.buffer.raw.name, test_support.TESTFN)
1276 self.assertEquals(f.mode, "U")
1277 self.assertEquals(f.buffer.mode, "r")
1278 self.assertEquals(f.buffer.raw.mode, "r")
1279 f.close()
1280
1281 f = io.open(test_support.TESTFN, "w+")
1282 self.assertEquals(f.mode, "w+")
1283 self.assertEquals(f.buffer.mode, "r+") # Does it really matter?
1284 self.assertEquals(f.buffer.raw.mode, "r+")
1285
1286 g = io.open(f.fileno(), "wb", closefd=False)
1287 self.assertEquals(g.mode, "w")
1288 self.assertEquals(g.raw.mode, "w")
1289 self.assertEquals(g.name, f.fileno())
1290 self.assertEquals(g.raw.name, f.fileno())
1291 f.close()
1292 g.close()
1293
1294
Christian Heimes1a6387e2008-03-26 12:49:49 +00001295def test_main():
1296 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001297 BufferedReaderTest, BufferedWriterTest,
1298 BufferedRWPairTest, BufferedRandomTest,
1299 StatefulIncrementalDecoderTest,
1300 TextIOWrapperTest, MiscIOTest)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001301
1302if __name__ == "__main__":
1303 unittest.main()