blob: 8a7da60947b037a37cc56a6a409a63433873315f [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000012from itertools import chain, cycle
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
19class MockRawIO(io.RawIOBase):
20
21 def __init__(self, read_stack=()):
22 self._read_stack = list(read_stack)
23 self._write_stack = []
24
25 def read(self, n=None):
26 try:
27 return self._read_stack.pop(0)
28 except:
29 return b""
30
31 def write(self, b):
32 self._write_stack.append(b[:])
33 return len(b)
34
35 def writable(self):
36 return True
37
38 def fileno(self):
39 return 42
40
41 def readable(self):
42 return True
43
44 def seekable(self):
45 return True
46
47 def seek(self, pos, whence):
48 pass
49
50 def tell(self):
51 return 42
52
53
54class MockFileIO(io.BytesIO):
55
56 def __init__(self, data):
57 self.read_history = []
58 io.BytesIO.__init__(self, data)
59
60 def read(self, n=None):
61 res = io.BytesIO.read(self, n)
62 self.read_history.append(None if res is None else len(res))
63 return res
64
65
66class MockNonBlockWriterIO(io.RawIOBase):
67
68 def __init__(self, blocking_script):
69 self._blocking_script = list(blocking_script)
70 self._write_stack = []
71
72 def write(self, b):
73 self._write_stack.append(b[:])
74 n = self._blocking_script.pop(0)
75 if (n < 0):
76 raise io.BlockingIOError(0, "test blocking", -n)
77 else:
78 return n
79
80 def writable(self):
81 return True
82
83
84class IOTest(unittest.TestCase):
85
86 def tearDown(self):
87 test_support.unlink(test_support.TESTFN)
88
89 def write_ops(self, f):
90 self.assertEqual(f.write(b"blah."), 5)
91 self.assertEqual(f.seek(0), 0)
92 self.assertEqual(f.write(b"Hello."), 6)
93 self.assertEqual(f.tell(), 6)
94 self.assertEqual(f.seek(-1, 1), 5)
95 self.assertEqual(f.tell(), 5)
96 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
97 self.assertEqual(f.seek(0), 0)
98 self.assertEqual(f.write(b"h"), 1)
99 self.assertEqual(f.seek(-1, 2), 13)
100 self.assertEqual(f.tell(), 13)
101 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000102 self.assertEqual(f.tell(), 12)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000103 self.assertRaises(TypeError, f.seek, 0.0)
104
105 def read_ops(self, f, buffered=False):
106 data = f.read(5)
107 self.assertEqual(data, b"hello")
108 data = bytearray(data)
109 self.assertEqual(f.readinto(data), 5)
110 self.assertEqual(data, b" worl")
111 self.assertEqual(f.readinto(data), 2)
112 self.assertEqual(len(data), 5)
113 self.assertEqual(data[:2], b"d\n")
114 self.assertEqual(f.seek(0), 0)
115 self.assertEqual(f.read(20), b"hello world\n")
116 self.assertEqual(f.read(1), b"")
117 self.assertEqual(f.readinto(bytearray(b"x")), 0)
118 self.assertEqual(f.seek(-6, 2), 6)
119 self.assertEqual(f.read(5), b"world")
120 self.assertEqual(f.read(0), b"")
121 self.assertEqual(f.readinto(bytearray()), 0)
122 self.assertEqual(f.seek(-6, 1), 5)
123 self.assertEqual(f.read(5), b" worl")
124 self.assertEqual(f.tell(), 10)
125 self.assertRaises(TypeError, f.seek, 0.0)
126 if buffered:
127 f.seek(0)
128 self.assertEqual(f.read(), b"hello world\n")
129 f.seek(6)
130 self.assertEqual(f.read(), b"world\n")
131 self.assertEqual(f.read(), b"")
132
133 LARGE = 2**31
134
135 def large_file_ops(self, f):
136 assert f.readable()
137 assert f.writable()
138 self.assertEqual(f.seek(self.LARGE), self.LARGE)
139 self.assertEqual(f.tell(), self.LARGE)
140 self.assertEqual(f.write(b"xxx"), 3)
141 self.assertEqual(f.tell(), self.LARGE + 3)
142 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
143 self.assertEqual(f.truncate(), self.LARGE + 2)
144 self.assertEqual(f.tell(), self.LARGE + 2)
145 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
146 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000147 self.assertEqual(f.tell(), self.LARGE + 1)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000148 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
149 self.assertEqual(f.seek(-1, 2), self.LARGE)
150 self.assertEqual(f.read(2), b"x")
151
152 def test_raw_file_io(self):
153 f = io.open(test_support.TESTFN, "wb", buffering=0)
154 self.assertEqual(f.readable(), False)
155 self.assertEqual(f.writable(), True)
156 self.assertEqual(f.seekable(), True)
157 self.write_ops(f)
158 f.close()
159 f = io.open(test_support.TESTFN, "rb", buffering=0)
160 self.assertEqual(f.readable(), True)
161 self.assertEqual(f.writable(), False)
162 self.assertEqual(f.seekable(), True)
163 self.read_ops(f)
164 f.close()
165
166 def test_buffered_file_io(self):
167 f = io.open(test_support.TESTFN, "wb")
168 self.assertEqual(f.readable(), False)
169 self.assertEqual(f.writable(), True)
170 self.assertEqual(f.seekable(), True)
171 self.write_ops(f)
172 f.close()
173 f = io.open(test_support.TESTFN, "rb")
174 self.assertEqual(f.readable(), True)
175 self.assertEqual(f.writable(), False)
176 self.assertEqual(f.seekable(), True)
177 self.read_ops(f, True)
178 f.close()
179
180 def test_readline(self):
181 f = io.open(test_support.TESTFN, "wb")
182 f.write(b"abc\ndef\nxyzzy\nfoo")
183 f.close()
184 f = io.open(test_support.TESTFN, "rb")
185 self.assertEqual(f.readline(), b"abc\n")
186 self.assertEqual(f.readline(10), b"def\n")
187 self.assertEqual(f.readline(2), b"xy")
188 self.assertEqual(f.readline(4), b"zzy\n")
189 self.assertEqual(f.readline(), b"foo")
190 f.close()
191
192 def test_raw_bytes_io(self):
193 f = io.BytesIO()
194 self.write_ops(f)
195 data = f.getvalue()
196 self.assertEqual(data, b"hello world\n")
197 f = io.BytesIO(data)
198 self.read_ops(f, True)
199
200 def test_large_file_ops(self):
201 # On Windows and Mac OSX this test comsumes large resources; It takes
202 # a long time to build the >2GB file and takes >2GB of disk space
203 # therefore the resource must be enabled to run this test.
Andrew MacIntyre41c56b52008-09-22 14:23:45 +0000204 if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000205 if not test_support.is_resource_enabled("largefile"):
206 print("\nTesting large file ops skipped on %s." % sys.platform,
207 file=sys.stderr)
208 print("It requires %d bytes and a long time." % self.LARGE,
209 file=sys.stderr)
210 print("Use 'regrtest.py -u largefile test_io' to run it.",
211 file=sys.stderr)
212 return
213 f = io.open(test_support.TESTFN, "w+b", 0)
214 self.large_file_ops(f)
215 f.close()
216 f = io.open(test_support.TESTFN, "w+b")
217 self.large_file_ops(f)
218 f.close()
219
220 def test_with_open(self):
221 for bufsize in (0, 1, 100):
222 f = None
223 with open(test_support.TESTFN, "wb", bufsize) as f:
224 f.write(b"xxx")
225 self.assertEqual(f.closed, True)
226 f = None
227 try:
228 with open(test_support.TESTFN, "wb", bufsize) as f:
229 1/0
230 except ZeroDivisionError:
231 self.assertEqual(f.closed, True)
232 else:
233 self.fail("1/0 didn't raise an exception")
234
235 def test_destructor(self):
236 record = []
237 class MyFileIO(io.FileIO):
238 def __del__(self):
239 record.append(1)
240 io.FileIO.__del__(self)
241 def close(self):
242 record.append(2)
243 io.FileIO.close(self)
244 def flush(self):
245 record.append(3)
246 io.FileIO.flush(self)
247 f = MyFileIO(test_support.TESTFN, "w")
248 f.write("xxx")
249 del f
250 self.assertEqual(record, [1, 2, 3])
251
252 def test_close_flushes(self):
253 f = io.open(test_support.TESTFN, "wb")
254 f.write(b"xxx")
255 f.close()
256 f = io.open(test_support.TESTFN, "rb")
257 self.assertEqual(f.read(), b"xxx")
258 f.close()
259
260 def XXXtest_array_writes(self):
261 # XXX memory view not available yet
262 a = array.array('i', range(10))
263 n = len(memoryview(a))
264 f = io.open(test_support.TESTFN, "wb", 0)
265 self.assertEqual(f.write(a), n)
266 f.close()
267 f = io.open(test_support.TESTFN, "wb")
268 self.assertEqual(f.write(a), n)
269 f.close()
270
271 def test_closefd(self):
272 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
273 closefd=False)
274
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000275 def testReadClosed(self):
276 with io.open(test_support.TESTFN, "w") as f:
277 f.write("egg\n")
278 with io.open(test_support.TESTFN, "r") as f:
279 file = io.open(f.fileno(), "r", closefd=False)
280 self.assertEqual(file.read(), "egg\n")
281 file.seek(0)
282 file.close()
283 self.assertRaises(ValueError, file.read)
284
285 def test_no_closefd_with_filename(self):
286 # can't use closefd in combination with a file name
287 self.assertRaises(ValueError,
288 io.open, test_support.TESTFN, "r", closefd=False)
289
290 def test_closefd_attr(self):
291 with io.open(test_support.TESTFN, "wb") as f:
292 f.write(b"egg\n")
293 with io.open(test_support.TESTFN, "r") as f:
294 self.assertEqual(f.buffer.raw.closefd, True)
295 file = io.open(f.fileno(), "r", closefd=False)
296 self.assertEqual(file.buffer.raw.closefd, False)
297
298
Christian Heimes1a6387e2008-03-26 12:49:49 +0000299class MemorySeekTestMixin:
300
301 def testInit(self):
302 buf = self.buftype("1234567890")
303 bytesIo = self.ioclass(buf)
304
305 def testRead(self):
306 buf = self.buftype("1234567890")
307 bytesIo = self.ioclass(buf)
308
309 self.assertEquals(buf[:1], bytesIo.read(1))
310 self.assertEquals(buf[1:5], bytesIo.read(4))
311 self.assertEquals(buf[5:], bytesIo.read(900))
312 self.assertEquals(self.EOF, bytesIo.read())
313
314 def testReadNoArgs(self):
315 buf = self.buftype("1234567890")
316 bytesIo = self.ioclass(buf)
317
318 self.assertEquals(buf, bytesIo.read())
319 self.assertEquals(self.EOF, bytesIo.read())
320
321 def testSeek(self):
322 buf = self.buftype("1234567890")
323 bytesIo = self.ioclass(buf)
324
325 bytesIo.read(5)
326 bytesIo.seek(0)
327 self.assertEquals(buf, bytesIo.read())
328
329 bytesIo.seek(3)
330 self.assertEquals(buf[3:], bytesIo.read())
331 self.assertRaises(TypeError, bytesIo.seek, 0.0)
332
333 def testTell(self):
334 buf = self.buftype("1234567890")
335 bytesIo = self.ioclass(buf)
336
337 self.assertEquals(0, bytesIo.tell())
338 bytesIo.seek(5)
339 self.assertEquals(5, bytesIo.tell())
340 bytesIo.seek(10000)
341 self.assertEquals(10000, bytesIo.tell())
342
343
344class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
345 @staticmethod
346 def buftype(s):
347 return s.encode("utf-8")
348 ioclass = io.BytesIO
349 EOF = b""
350
351
352class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
353 buftype = str
354 ioclass = io.StringIO
355 EOF = ""
356
357
358class BufferedReaderTest(unittest.TestCase):
359
360 def testRead(self):
361 rawio = MockRawIO((b"abc", b"d", b"efg"))
362 bufio = io.BufferedReader(rawio)
363
364 self.assertEquals(b"abcdef", bufio.read(6))
365
366 def testBuffering(self):
367 data = b"abcdefghi"
368 dlen = len(data)
369
370 tests = [
371 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
372 [ 100, [ 3, 3, 3], [ dlen ] ],
373 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
374 ]
375
376 for bufsize, buf_read_sizes, raw_read_sizes in tests:
377 rawio = MockFileIO(data)
378 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
379 pos = 0
380 for nbytes in buf_read_sizes:
381 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
382 pos += nbytes
383 self.assertEquals(rawio.read_history, raw_read_sizes)
384
385 def testReadNonBlocking(self):
386 # Inject some None's in there to simulate EWOULDBLOCK
387 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
388 bufio = io.BufferedReader(rawio)
389
390 self.assertEquals(b"abcd", bufio.read(6))
391 self.assertEquals(b"e", bufio.read(1))
392 self.assertEquals(b"fg", bufio.read())
393 self.assert_(None is bufio.read())
394 self.assertEquals(b"", bufio.read())
395
396 def testReadToEof(self):
397 rawio = MockRawIO((b"abc", b"d", b"efg"))
398 bufio = io.BufferedReader(rawio)
399
400 self.assertEquals(b"abcdefg", bufio.read(9000))
401
402 def testReadNoArgs(self):
403 rawio = MockRawIO((b"abc", b"d", b"efg"))
404 bufio = io.BufferedReader(rawio)
405
406 self.assertEquals(b"abcdefg", bufio.read())
407
408 def testFileno(self):
409 rawio = MockRawIO((b"abc", b"d", b"efg"))
410 bufio = io.BufferedReader(rawio)
411
412 self.assertEquals(42, bufio.fileno())
413
414 def testFilenoNoFileno(self):
415 # XXX will we always have fileno() function? If so, kill
416 # this test. Else, write it.
417 pass
418
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000419 def testThreads(self):
420 try:
421 # Write out many bytes with exactly the same number of 0's,
422 # 1's... 255's. This will help us check that concurrent reading
423 # doesn't duplicate or forget contents.
424 N = 1000
425 l = range(256) * N
426 random.shuffle(l)
427 s = bytes(bytearray(l))
428 with io.open(test_support.TESTFN, "wb") as f:
429 f.write(s)
430 with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
431 bufio = io.BufferedReader(raw, 8)
432 errors = []
433 results = []
434 def f():
435 try:
436 # Intra-buffer read then buffer-flushing read
437 for n in cycle([1, 19]):
438 s = bufio.read(n)
439 if not s:
440 break
441 # list.append() is atomic
442 results.append(s)
443 except Exception as e:
444 errors.append(e)
445 raise
446 threads = [threading.Thread(target=f) for x in range(20)]
447 for t in threads:
448 t.start()
449 time.sleep(0.02) # yield
450 for t in threads:
451 t.join()
452 self.assertFalse(errors,
453 "the following exceptions were caught: %r" % errors)
454 s = b''.join(results)
455 for i in range(256):
456 c = bytes(bytearray([i]))
457 self.assertEqual(s.count(c), N)
458 finally:
459 test_support.unlink(test_support.TESTFN)
460
461
Christian Heimes1a6387e2008-03-26 12:49:49 +0000462
463class BufferedWriterTest(unittest.TestCase):
464
465 def testWrite(self):
466 # Write to the buffered IO but don't overflow the buffer.
467 writer = MockRawIO()
468 bufio = io.BufferedWriter(writer, 8)
469
470 bufio.write(b"abc")
471
472 self.assertFalse(writer._write_stack)
473
474 def testWriteOverflow(self):
475 writer = MockRawIO()
476 bufio = io.BufferedWriter(writer, 8)
477
478 bufio.write(b"abc")
479 bufio.write(b"defghijkl")
480
481 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
482
483 def testWriteNonBlocking(self):
484 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
485 bufio = io.BufferedWriter(raw, 8, 16)
486
487 bufio.write(b"asdf")
488 bufio.write(b"asdfa")
489 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
490
491 bufio.write(b"asdfasdfasdf")
492 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
493 bufio.write(b"asdfasdfasdf")
494 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
495 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
496
497 bufio.write(b"asdfasdfasdf")
498
499 # XXX I don't like this test. It relies too heavily on how the
500 # algorithm actually works, which we might change. Refactor
501 # later.
502
503 def testFileno(self):
504 rawio = MockRawIO((b"abc", b"d", b"efg"))
505 bufio = io.BufferedWriter(rawio)
506
507 self.assertEquals(42, bufio.fileno())
508
509 def testFlush(self):
510 writer = MockRawIO()
511 bufio = io.BufferedWriter(writer, 8)
512
513 bufio.write(b"abc")
514 bufio.flush()
515
516 self.assertEquals(b"abc", writer._write_stack[0])
517
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000518 def testThreads(self):
519 # BufferedWriter should not raise exceptions or crash
520 # when called from multiple threads.
521 try:
522 # We use a real file object because it allows us to
523 # exercise situations where the GIL is released before
524 # writing the buffer to the raw streams. This is in addition
525 # to concurrency issues due to switching threads in the middle
526 # of Python code.
527 with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
528 bufio = io.BufferedWriter(raw, 8)
529 errors = []
530 def f():
531 try:
532 # Write enough bytes to flush the buffer
533 s = b"a" * 19
534 for i in range(50):
535 bufio.write(s)
536 except Exception as e:
537 errors.append(e)
538 raise
539 threads = [threading.Thread(target=f) for x in range(20)]
540 for t in threads:
541 t.start()
542 time.sleep(0.02) # yield
543 for t in threads:
544 t.join()
545 self.assertFalse(errors,
546 "the following exceptions were caught: %r" % errors)
547 finally:
548 test_support.unlink(test_support.TESTFN)
549
Christian Heimes1a6387e2008-03-26 12:49:49 +0000550
551class BufferedRWPairTest(unittest.TestCase):
552
553 def testRWPair(self):
554 r = MockRawIO(())
555 w = MockRawIO()
556 pair = io.BufferedRWPair(r, w)
557
558 # XXX need implementation
559
560
561class BufferedRandomTest(unittest.TestCase):
562
563 def testReadAndWrite(self):
564 raw = MockRawIO((b"asdf", b"ghjk"))
565 rw = io.BufferedRandom(raw, 8, 12)
566
567 self.assertEqual(b"as", rw.read(2))
568 rw.write(b"ddd")
569 rw.write(b"eee")
570 self.assertFalse(raw._write_stack) # Buffer writes
571 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
572 self.assertEquals(b"dddeee", raw._write_stack[0])
573
574 def testSeekAndTell(self):
575 raw = io.BytesIO(b"asdfghjkl")
576 rw = io.BufferedRandom(raw)
577
578 self.assertEquals(b"as", rw.read(2))
579 self.assertEquals(2, rw.tell())
580 rw.seek(0, 0)
581 self.assertEquals(b"asdf", rw.read(4))
582
583 rw.write(b"asdf")
584 rw.seek(0, 0)
585 self.assertEquals(b"asdfasdfl", rw.read())
586 self.assertEquals(9, rw.tell())
587 rw.seek(-4, 2)
588 self.assertEquals(5, rw.tell())
589 rw.seek(2, 1)
590 self.assertEquals(7, rw.tell())
591 self.assertEquals(b"fl", rw.read(11))
592 self.assertRaises(TypeError, rw.seek, 0.0)
593
594# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
595# properties:
596# - A single output character can correspond to many bytes of input.
597# - The number of input bytes to complete the character can be
598# undetermined until the last input byte is received.
599# - The number of input bytes can vary depending on previous input.
600# - A single input byte can correspond to many characters of output.
601# - The number of output characters can be undetermined until the
602# last input byte is received.
603# - The number of output characters can vary depending on previous input.
604
605class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
606 """
607 For testing seek/tell behavior with a stateful, buffering decoder.
608
609 Input is a sequence of words. Words may be fixed-length (length set
610 by input) or variable-length (period-terminated). In variable-length
611 mode, extra periods are ignored. Possible words are:
612 - 'i' followed by a number sets the input length, I (maximum 99).
613 When I is set to 0, words are space-terminated.
614 - 'o' followed by a number sets the output length, O (maximum 99).
615 - Any other word is converted into a word followed by a period on
616 the output. The output word consists of the input word truncated
617 or padded out with hyphens to make its length equal to O. If O
618 is 0, the word is output verbatim without truncating or padding.
619 I and O are initially set to 1. When I changes, any buffered input is
620 re-scanned according to the new I. EOF also terminates the last word.
621 """
622
623 def __init__(self, errors='strict'):
624 codecs.IncrementalDecoder.__init__(self, errors)
625 self.reset()
626
627 def __repr__(self):
628 return '<SID %x>' % id(self)
629
630 def reset(self):
631 self.i = 1
632 self.o = 1
633 self.buffer = bytearray()
634
635 def getstate(self):
636 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
637 return bytes(self.buffer), i*100 + o
638
639 def setstate(self, state):
640 buffer, io = state
641 self.buffer = bytearray(buffer)
642 i, o = divmod(io, 100)
643 self.i, self.o = i ^ 1, o ^ 1
644
645 def decode(self, input, final=False):
646 output = ''
647 for b in input:
648 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +0000649 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000650 if self.buffer:
651 output += self.process_word()
652 else:
653 self.buffer.append(b)
654 else: # fixed-length, terminate after self.i bytes
655 self.buffer.append(b)
656 if len(self.buffer) == self.i:
657 output += self.process_word()
658 if final and self.buffer: # EOF terminates the last word
659 output += self.process_word()
660 return output
661
662 def process_word(self):
663 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000664 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000665 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000666 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000667 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
668 else:
669 output = self.buffer.decode('ascii')
670 if len(output) < self.o:
671 output += '-'*self.o # pad out with hyphens
672 if self.o:
673 output = output[:self.o] # truncate to output length
674 output += '.'
675 self.buffer = bytearray()
676 return output
677
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000678 codecEnabled = False
679
680 @classmethod
681 def lookupTestDecoder(cls, name):
682 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +0000683 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000684 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +0000685 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000686 incrementalencoder=None,
687 streamreader=None, streamwriter=None,
688 incrementaldecoder=cls)
689
690# Register the previous decoder for testing.
691# Disabled by default, tests will enable it.
692codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
693
694
Christian Heimes1a6387e2008-03-26 12:49:49 +0000695class StatefulIncrementalDecoderTest(unittest.TestCase):
696 """
697 Make sure the StatefulIncrementalDecoder actually works.
698 """
699
700 test_cases = [
701 # I=1, O=1 (fixed-length input == fixed-length output)
702 (b'abcd', False, 'a.b.c.d.'),
703 # I=0, O=0 (variable-length input, variable-length output)
704 (b'oiabcd', True, 'abcd.'),
705 # I=0, O=0 (should ignore extra periods)
706 (b'oi...abcd...', True, 'abcd.'),
707 # I=0, O=6 (variable-length input, fixed-length output)
708 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
709 # I=2, O=6 (fixed-length input < fixed-length output)
710 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
711 # I=6, O=3 (fixed-length input > fixed-length output)
712 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
713 # I=0, then 3; O=29, then 15 (with longer output)
714 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
715 'a----------------------------.' +
716 'b----------------------------.' +
717 'cde--------------------------.' +
718 'abcdefghijabcde.' +
719 'a.b------------.' +
720 '.c.------------.' +
721 'd.e------------.' +
722 'k--------------.' +
723 'l--------------.' +
724 'm--------------.')
725 ]
726
727 def testDecoder(self):
728 # Try a few one-shot test cases.
729 for input, eof, output in self.test_cases:
730 d = StatefulIncrementalDecoder()
731 self.assertEquals(d.decode(input, eof), output)
732
733 # Also test an unfinished decode, followed by forcing EOF.
734 d = StatefulIncrementalDecoder()
735 self.assertEquals(d.decode(b'oiabcd'), '')
736 self.assertEquals(d.decode(b'', 1), 'abcd.')
737
738class TextIOWrapperTest(unittest.TestCase):
739
740 def setUp(self):
741 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
742 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
743
744 def tearDown(self):
745 test_support.unlink(test_support.TESTFN)
746
747 def testLineBuffering(self):
748 r = io.BytesIO()
749 b = io.BufferedWriter(r, 1000)
750 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
751 t.write(u"X")
752 self.assertEquals(r.getvalue(), b"") # No flush happened
753 t.write(u"Y\nZ")
754 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
755 t.write(u"A\rB")
756 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
757
758 def testEncodingErrorsReading(self):
759 # (1) default
760 b = io.BytesIO(b"abc\n\xff\n")
761 t = io.TextIOWrapper(b, encoding="ascii")
762 self.assertRaises(UnicodeError, t.read)
763 # (2) explicit strict
764 b = io.BytesIO(b"abc\n\xff\n")
765 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
766 self.assertRaises(UnicodeError, t.read)
767 # (3) ignore
768 b = io.BytesIO(b"abc\n\xff\n")
769 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
770 self.assertEquals(t.read(), "abc\n\n")
771 # (4) replace
772 b = io.BytesIO(b"abc\n\xff\n")
773 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
774 self.assertEquals(t.read(), u"abc\n\ufffd\n")
775
776 def testEncodingErrorsWriting(self):
777 # (1) default
778 b = io.BytesIO()
779 t = io.TextIOWrapper(b, encoding="ascii")
780 self.assertRaises(UnicodeError, t.write, u"\xff")
781 # (2) explicit strict
782 b = io.BytesIO()
783 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
784 self.assertRaises(UnicodeError, t.write, u"\xff")
785 # (3) ignore
786 b = io.BytesIO()
787 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
788 newline="\n")
789 t.write(u"abc\xffdef\n")
790 t.flush()
791 self.assertEquals(b.getvalue(), b"abcdef\n")
792 # (4) replace
793 b = io.BytesIO()
794 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
795 newline="\n")
796 t.write(u"abc\xffdef\n")
797 t.flush()
798 self.assertEquals(b.getvalue(), b"abc?def\n")
799
800 def testNewlinesInput(self):
801 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
802 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
803 for newline, expected in [
804 (None, normalized.decode("ascii").splitlines(True)),
805 ("", testdata.decode("ascii").splitlines(True)),
806 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
807 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
808 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
809 ]:
810 buf = io.BytesIO(testdata)
811 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
812 self.assertEquals(txt.readlines(), expected)
813 txt.seek(0)
814 self.assertEquals(txt.read(), "".join(expected))
815
816 def testNewlinesOutput(self):
817 testdict = {
818 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
819 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
820 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
821 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
822 }
823 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
824 for newline, expected in tests:
825 buf = io.BytesIO()
826 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
827 txt.write("AAA\nB")
828 txt.write("BB\nCCC\n")
829 txt.write("X\rY\r\nZ")
830 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000831 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000832 self.assertEquals(buf.getvalue(), expected)
833
834 def testNewlines(self):
835 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
836
837 tests = [
838 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
839 [ '', input_lines ],
840 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
841 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
842 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
843 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +0000844 encodings = (
845 'utf-8', 'latin-1',
846 'utf-16', 'utf-16-le', 'utf-16-be',
847 'utf-32', 'utf-32-le', 'utf-32-be',
848 )
Christian Heimes1a6387e2008-03-26 12:49:49 +0000849
850 # Try a range of buffer sizes to test the case where \r is the last
851 # character in TextIOWrapper._pending_line.
852 for encoding in encodings:
853 # XXX: str.encode() should return bytes
854 data = bytes(''.join(input_lines).encode(encoding))
855 for do_reads in (False, True):
856 for bufsize in range(1, 10):
857 for newline, exp_lines in tests:
858 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
859 textio = io.TextIOWrapper(bufio, newline=newline,
860 encoding=encoding)
861 if do_reads:
862 got_lines = []
863 while True:
864 c2 = textio.read(2)
865 if c2 == '':
866 break
867 self.assertEquals(len(c2), 2)
868 got_lines.append(c2 + textio.readline())
869 else:
870 got_lines = list(textio)
871
872 for got_line, exp_line in zip(got_lines, exp_lines):
873 self.assertEquals(got_line, exp_line)
874 self.assertEquals(len(got_lines), len(exp_lines))
875
876 def testNewlinesInput(self):
877 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
878 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
879 for newline, expected in [
880 (None, normalized.decode("ascii").splitlines(True)),
881 ("", testdata.decode("ascii").splitlines(True)),
882 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
883 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
884 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
885 ]:
886 buf = io.BytesIO(testdata)
887 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
888 self.assertEquals(txt.readlines(), expected)
889 txt.seek(0)
890 self.assertEquals(txt.read(), "".join(expected))
891
892 def testNewlinesOutput(self):
893 data = u"AAA\nBBB\rCCC\n"
894 data_lf = b"AAA\nBBB\rCCC\n"
895 data_cr = b"AAA\rBBB\rCCC\r"
896 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
897 save_linesep = os.linesep
898 try:
899 for os.linesep, newline, expected in [
900 ("\n", None, data_lf),
901 ("\r\n", None, data_crlf),
902 ("\n", "", data_lf),
903 ("\r\n", "", data_lf),
904 ("\n", "\n", data_lf),
905 ("\r\n", "\n", data_lf),
906 ("\n", "\r", data_cr),
907 ("\r\n", "\r", data_cr),
908 ("\n", "\r\n", data_crlf),
909 ("\r\n", "\r\n", data_crlf),
910 ]:
911 buf = io.BytesIO()
912 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
913 txt.write(data)
914 txt.close()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000915 self.assertEquals(buf.closed, True)
916 self.assertRaises(ValueError, buf.getvalue)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000917 finally:
918 os.linesep = save_linesep
919
920 # Systematic tests of the text I/O API
921
922 def testBasicIO(self):
923 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
924 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
925 f = io.open(test_support.TESTFN, "w+", encoding=enc)
926 f._CHUNK_SIZE = chunksize
927 self.assertEquals(f.write(u"abc"), 3)
928 f.close()
929 f = io.open(test_support.TESTFN, "r+", encoding=enc)
930 f._CHUNK_SIZE = chunksize
931 self.assertEquals(f.tell(), 0)
932 self.assertEquals(f.read(), u"abc")
933 cookie = f.tell()
934 self.assertEquals(f.seek(0), 0)
935 self.assertEquals(f.read(2), u"ab")
936 self.assertEquals(f.read(1), u"c")
937 self.assertEquals(f.read(1), u"")
938 self.assertEquals(f.read(), u"")
939 self.assertEquals(f.tell(), cookie)
940 self.assertEquals(f.seek(0), 0)
941 self.assertEquals(f.seek(0, 2), cookie)
942 self.assertEquals(f.write(u"def"), 3)
943 self.assertEquals(f.seek(cookie), cookie)
944 self.assertEquals(f.read(), u"def")
945 if enc.startswith("utf"):
946 self.multi_line_test(f, enc)
947 f.close()
948
949 def multi_line_test(self, f, enc):
950 f.seek(0)
951 f.truncate()
952 sample = u"s\xff\u0fff\uffff"
953 wlines = []
954 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
955 chars = []
956 for i in range(size):
957 chars.append(sample[i % len(sample)])
958 line = u"".join(chars) + u"\n"
959 wlines.append((f.tell(), line))
960 f.write(line)
961 f.seek(0)
962 rlines = []
963 while True:
964 pos = f.tell()
965 line = f.readline()
966 if not line:
967 break
968 rlines.append((pos, line))
969 self.assertEquals(rlines, wlines)
970
971 def testTelling(self):
972 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
973 p0 = f.tell()
974 f.write(u"\xff\n")
975 p1 = f.tell()
976 f.write(u"\xff\n")
977 p2 = f.tell()
978 f.seek(0)
979 self.assertEquals(f.tell(), p0)
980 self.assertEquals(f.readline(), u"\xff\n")
981 self.assertEquals(f.tell(), p1)
982 self.assertEquals(f.readline(), u"\xff\n")
983 self.assertEquals(f.tell(), p2)
984 f.seek(0)
985 for line in f:
986 self.assertEquals(line, u"\xff\n")
987 self.assertRaises(IOError, f.tell)
988 self.assertEquals(f.tell(), p2)
989 f.close()
990
991 def testSeeking(self):
992 chunk_size = io.TextIOWrapper._CHUNK_SIZE
993 prefix_size = chunk_size - 2
994 u_prefix = "a" * prefix_size
995 prefix = bytes(u_prefix.encode("utf-8"))
996 self.assertEquals(len(u_prefix), len(prefix))
997 u_suffix = "\u8888\n"
998 suffix = bytes(u_suffix.encode("utf-8"))
999 line = prefix + suffix
1000 f = io.open(test_support.TESTFN, "wb")
1001 f.write(line*2)
1002 f.close()
1003 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1004 s = f.read(prefix_size)
1005 self.assertEquals(s, unicode(prefix, "ascii"))
1006 self.assertEquals(f.tell(), prefix_size)
1007 self.assertEquals(f.readline(), u_suffix)
1008
1009 def testSeekingToo(self):
1010 # Regression test for a specific bug
1011 data = b'\xe0\xbf\xbf\n'
1012 f = io.open(test_support.TESTFN, "wb")
1013 f.write(data)
1014 f.close()
1015 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1016 f._CHUNK_SIZE # Just test that it exists
1017 f._CHUNK_SIZE = 2
1018 f.readline()
1019 f.tell()
1020
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001021 def testSeekAndTell(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001022 """Test seek/tell using the StatefulIncrementalDecoder."""
1023
Christian Heimes1a6387e2008-03-26 12:49:49 +00001024 def testSeekAndTellWithData(data, min_pos=0):
1025 """Tell/seek to various points within a data stream and ensure
1026 that the decoded data returned by read() is consistent."""
1027 f = io.open(test_support.TESTFN, 'wb')
1028 f.write(data)
1029 f.close()
1030 f = io.open(test_support.TESTFN, encoding='test_decoder')
1031 decoded = f.read()
1032 f.close()
1033
1034 for i in range(min_pos, len(decoded) + 1): # seek positions
1035 for j in [1, 5, len(decoded) - i]: # read lengths
1036 f = io.open(test_support.TESTFN, encoding='test_decoder')
1037 self.assertEquals(f.read(i), decoded[:i])
1038 cookie = f.tell()
1039 self.assertEquals(f.read(j), decoded[i:i + j])
1040 f.seek(cookie)
1041 self.assertEquals(f.read(), decoded[i:])
1042 f.close()
1043
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001044 # Enable the test decoder.
1045 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001046
1047 # Run the tests.
1048 try:
1049 # Try each test case.
1050 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1051 testSeekAndTellWithData(input)
1052
1053 # Position each test case so that it crosses a chunk boundary.
1054 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1055 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1056 offset = CHUNK_SIZE - len(input)//2
1057 prefix = b'.'*offset
1058 # Don't bother seeking into the prefix (takes too long).
1059 min_pos = offset*2
1060 testSeekAndTellWithData(prefix + input, min_pos)
1061
1062 # Ensure our test decoder won't interfere with subsequent tests.
1063 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001064 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001065
1066 def testEncodedWrites(self):
1067 data = u"1234567890"
1068 tests = ("utf-16",
1069 "utf-16-le",
1070 "utf-16-be",
1071 "utf-32",
1072 "utf-32-le",
1073 "utf-32-be")
1074 for encoding in tests:
1075 buf = io.BytesIO()
1076 f = io.TextIOWrapper(buf, encoding=encoding)
1077 # Check if the BOM is written only once (see issue1753).
1078 f.write(data)
1079 f.write(data)
1080 f.seek(0)
1081 self.assertEquals(f.read(), data * 2)
1082 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1083
1084 def timingTest(self):
1085 timer = time.time
1086 enc = "utf8"
1087 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
1088 nlines = 10000
1089 nchars = len(line)
1090 nbytes = len(line.encode(enc))
1091 for chunk_size in (32, 64, 128, 256):
1092 f = io.open(test_support.TESTFN, "w+", encoding=enc)
1093 f._CHUNK_SIZE = chunk_size
1094 t0 = timer()
1095 for i in range(nlines):
1096 f.write(line)
1097 f.flush()
1098 t1 = timer()
1099 f.seek(0)
1100 for line in f:
1101 pass
1102 t2 = timer()
1103 f.seek(0)
1104 while f.readline():
1105 pass
1106 t3 = timer()
1107 f.seek(0)
1108 while f.readline():
1109 f.tell()
1110 t4 = timer()
1111 f.close()
1112 if test_support.verbose:
1113 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1114 (nlines, nchars, nbytes))
1115 print("File chunk size: %6s" % f._CHUNK_SIZE)
1116 print("Writing: %6.3f seconds" % (t1-t0))
1117 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1118 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1119 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1120
1121 def testReadOneByOne(self):
1122 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1123 reads = ""
1124 while True:
1125 c = txt.read(1)
1126 if not c:
1127 break
1128 reads += c
1129 self.assertEquals(reads, "AA\nBB")
1130
1131 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1132 def testReadByChunk(self):
1133 # make sure "\r\n" straddles 128 char boundary.
1134 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1135 reads = ""
1136 while True:
1137 c = txt.read(128)
1138 if not c:
1139 break
1140 reads += c
1141 self.assertEquals(reads, "A"*127+"\nB")
1142
1143 def test_issue1395_1(self):
1144 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1145
1146 # read one char at a time
1147 reads = ""
1148 while True:
1149 c = txt.read(1)
1150 if not c:
1151 break
1152 reads += c
1153 self.assertEquals(reads, self.normalized)
1154
1155 def test_issue1395_2(self):
1156 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1157 txt._CHUNK_SIZE = 4
1158
1159 reads = ""
1160 while True:
1161 c = txt.read(4)
1162 if not c:
1163 break
1164 reads += c
1165 self.assertEquals(reads, self.normalized)
1166
1167 def test_issue1395_3(self):
1168 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1169 txt._CHUNK_SIZE = 4
1170
1171 reads = txt.read(4)
1172 reads += txt.read(4)
1173 reads += txt.readline()
1174 reads += txt.readline()
1175 reads += txt.readline()
1176 self.assertEquals(reads, self.normalized)
1177
1178 def test_issue1395_4(self):
1179 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1180 txt._CHUNK_SIZE = 4
1181
1182 reads = txt.read(4)
1183 reads += txt.read()
1184 self.assertEquals(reads, self.normalized)
1185
1186 def test_issue1395_5(self):
1187 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1188 txt._CHUNK_SIZE = 4
1189
1190 reads = txt.read(4)
1191 pos = txt.tell()
1192 txt.seek(0)
1193 txt.seek(pos)
1194 self.assertEquals(txt.read(4), "BBB\n")
1195
1196 def test_issue2282(self):
1197 buffer = io.BytesIO(self.testdata)
1198 txt = io.TextIOWrapper(buffer, encoding="ascii")
1199
1200 self.assertEqual(buffer.seekable(), txt.seekable())
1201
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001202 def check_newline_decoder_utf8(self, decoder):
1203 # UTF-8 specific tests for a newline decoder
1204 def _check_decode(b, s, **kwargs):
1205 # We exercise getstate() / setstate() as well as decode()
1206 state = decoder.getstate()
1207 self.assertEquals(decoder.decode(b, **kwargs), s)
1208 decoder.setstate(state)
1209 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001210
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001211 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001212
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001213 _check_decode(b'\xe8', "")
1214 _check_decode(b'\xa2', "")
1215 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001216
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001217 _check_decode(b'\xe8', "")
1218 _check_decode(b'\xa2', "")
1219 _check_decode(b'\x88', "\u8888")
1220
1221 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001222 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1223
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001224 decoder.reset()
1225 _check_decode(b'\n', "\n")
1226 _check_decode(b'\r', "")
1227 _check_decode(b'', "\n", final=True)
1228 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001229
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001230 _check_decode(b'\r', "")
1231 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001232
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001233 _check_decode(b'\r\r\n', "\n\n")
1234 _check_decode(b'\r', "")
1235 _check_decode(b'\r', "\n")
1236 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001237
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001238 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1239 _check_decode(b'\xe8\xa2\x88', "\u8888")
1240 _check_decode(b'\n', "\n")
1241 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1242 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001243
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001244 def check_newline_decoder(self, decoder, encoding):
1245 result = []
1246 encoder = codecs.getincrementalencoder(encoding)()
1247 def _decode_bytewise(s):
1248 for b in encoder.encode(s):
1249 result.append(decoder.decode(b))
1250 self.assertEquals(decoder.newlines, None)
1251 _decode_bytewise("abc\n\r")
1252 self.assertEquals(decoder.newlines, '\n')
1253 _decode_bytewise("\nabc")
1254 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1255 _decode_bytewise("abc\r")
1256 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1257 _decode_bytewise("abc")
1258 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1259 _decode_bytewise("abc\r")
1260 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1261 decoder.reset()
1262 self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
1263 self.assertEquals(decoder.newlines, None)
1264
1265 def test_newline_decoder(self):
1266 encodings = (
1267 'utf-8', 'latin-1',
1268 'utf-16', 'utf-16-le', 'utf-16-be',
1269 'utf-32', 'utf-32-le', 'utf-32-be',
1270 )
1271 for enc in encodings:
1272 decoder = codecs.getincrementaldecoder(enc)()
1273 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1274 self.check_newline_decoder(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001275 decoder = codecs.getincrementaldecoder("utf-8")()
1276 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001277 self.check_newline_decoder_utf8(decoder)
1278
Christian Heimes1a6387e2008-03-26 12:49:49 +00001279
1280# XXX Tests for open()
1281
1282class MiscIOTest(unittest.TestCase):
1283
Benjamin Petersonad100c32008-11-20 22:06:22 +00001284 def tearDown(self):
1285 test_support.unlink(test_support.TESTFN)
1286
Christian Heimes1a6387e2008-03-26 12:49:49 +00001287 def testImport__all__(self):
1288 for name in io.__all__:
1289 obj = getattr(io, name, None)
1290 self.assert_(obj is not None, name)
1291 if name == "open":
1292 continue
1293 elif "error" in name.lower():
1294 self.assert_(issubclass(obj, Exception), name)
1295 else:
1296 self.assert_(issubclass(obj, io.IOBase))
1297
1298
Benjamin Petersonad100c32008-11-20 22:06:22 +00001299 def test_attributes(self):
1300 f = io.open(test_support.TESTFN, "wb", buffering=0)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00001301 self.assertEquals(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00001302 f.close()
1303
1304 f = io.open(test_support.TESTFN, "U")
1305 self.assertEquals(f.name, test_support.TESTFN)
1306 self.assertEquals(f.buffer.name, test_support.TESTFN)
1307 self.assertEquals(f.buffer.raw.name, test_support.TESTFN)
1308 self.assertEquals(f.mode, "U")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00001309 self.assertEquals(f.buffer.mode, "rb")
1310 self.assertEquals(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00001311 f.close()
1312
1313 f = io.open(test_support.TESTFN, "w+")
1314 self.assertEquals(f.mode, "w+")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00001315 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
1316 self.assertEquals(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00001317
1318 g = io.open(f.fileno(), "wb", closefd=False)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00001319 self.assertEquals(g.mode, "wb")
1320 self.assertEquals(g.raw.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00001321 self.assertEquals(g.name, f.fileno())
1322 self.assertEquals(g.raw.name, f.fileno())
1323 f.close()
1324 g.close()
1325
1326
Christian Heimes1a6387e2008-03-26 12:49:49 +00001327def test_main():
1328 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001329 BufferedReaderTest, BufferedWriterTest,
1330 BufferedRWPairTest, BufferedRandomTest,
1331 StatefulIncrementalDecoderTest,
1332 TextIOWrapperTest, MiscIOTest)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001333
1334if __name__ == "__main__":
1335 unittest.main()