blob: 7110259dfffbc30def302885a2b84638b2f0d551 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
9import unittest
10from itertools import chain
11from test import test_support
12
13import codecs
14import io # The module under test
15
16
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered I/O tests.

    Each read() hands back the next item of the script given to the
    constructor; every write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        # The requested size is ignored: return the next scripted item
        # (which may be None), or b"" once the script is exhausted.
        try:
            return self._read_stack.pop(0)
        except IndexError:
            return b""

    def write(self, b):
        # Record b[:] rather than b itself, then report it fully written.
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        pass

    def tell(self):
        return 42
50
51
class MockFileIO(io.BytesIO):
    """BytesIO that remembers the size of every chunk returned by read()."""

    def __init__(self, data):
        # One entry per read() call: len(result), or None if read() gave None.
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        res = io.BytesIO.read(self, n)
        self.read_history.append(None if res is None else len(res))
        return res
62
63
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a caller-supplied script.

    Each write() consumes one entry of ``blocking_script``: a non-negative
    entry is returned as the number of bytes written, a negative entry -k
    raises BlockingIOError reporting k characters written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        # The data is recorded before the script decides the outcome.
        self._write_stack.append(b[:])
        n = self._blocking_script.pop(0)
        if n < 0:
            raise io.BlockingIOError(0, "test blocking", -n)
        return n

    def writable(self):
        return True
80
81
class IOTest(unittest.TestCase):
    """End-to-end checks of raw, buffered and in-memory binary streams."""

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against writable stream f."""
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Read back what write_ops() produced and check seek/tell results.

        With buffered=True the argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset on stream f."""
        # assertTrue instead of a bare assert, which -O would strip.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
        f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
        f.close()

    def test_readline(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"abc\ndef\nxyzzy\nfoo")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readline(), b"abc\n")
        self.assertEqual(f.readline(10), b"def\n")
        self.assertEqual(f.readline(2), b"xy")
        self.assertEqual(f.readline(4), b"zzy\n")
        self.assertEqual(f.readline(), b"foo")
        f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        f = io.open(test_support.TESTFN, "w+b", 0)
        self.large_file_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "w+b")
        self.large_file_ops(f)
        f.close()

    def test_with_open(self):
        # Use io.open explicitly: the builtin open() is not the module
        # under test here.
        for bufsize in (0, 1, 100):
            f = None
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # Each overridden hook appends its own marker to `record`; the
        # assertion below pins the order in which they fire on `del f`.
        record = []

        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)

            def close(self):
                record.append(2)
                io.FileIO.close(self)

            def flush(self):
                record.append(3)
                io.FileIO.flush(self)

        f = MyFileIO(test_support.TESTFN, "w")
        f.write(b"xxx")  # raw streams take bytes, not text
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"xxx")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.read(), b"xxx")
        f.close()

    def XXXtest_array_writes(self):
        # XXX memory view not available yet
        a = array.array('i', range(10))
        n = len(memoryview(a))
        f = io.open(test_support.TESTFN, "wb", 0)
        self.assertEqual(f.write(a), n)
        f.close()
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.write(a), n)
        f.close()

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)
272
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream test cases.

    The concrete class supplies ``buftype`` (builds the initial buffer from
    a text literal), ``ioclass`` (the stream type) and ``EOF`` (the value
    read() returns at end of stream).
    """

    def testInit(self):
        # Construction alone must not raise.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
316
317
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream checks against io.BytesIO."""

    @staticmethod
    def buftype(s):
        # The mixin passes text literals; BytesIO needs them as bytes.
        return s.encode("utf-8")

    ioclass = io.BytesIO
    EOF = b""
323 EOF = b""
324
325
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream checks against io.StringIO."""

    buftype = str
    ioclass = io.StringIO
    EOF = ""
329 EOF = ""
330
331
class BufferedReaderTest(unittest.TestCase):
    """Checks of io.BufferedReader on top of the scripted mock streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered reader,
        # and the chunk sizes the raw stream is then expected to have served.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
392
393
class BufferedWriterTest(unittest.TestCase):
    """Checks of io.BufferedWriter on top of the scripted mock streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
447 self.assertEquals(b"abc", writer._write_stack[0])
448
449
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder: only checks that a BufferedRWPair can be constructed."""

    def testRWPair(self):
        r = MockRawIO(())
        w = MockRawIO()
        pair = io.BufferedRWPair(r, w)

        # XXX need implementation
457 # XXX need implementation
458
459
class BufferedRandomTest(unittest.TestCase):
    """Checks of io.BufferedRandom mixing reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)
491 self.assertRaises(TypeError, rw.seek, 0.0)
492
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
503
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (pending input bytes, integer flags encoding I and O)."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume input bytes and return the text of every completed word."""
        output = ''
        # Going through bytearray() yields integer byte values regardless of
        # how the interpreter iterates the input object itself.
        for b in bytearray(input):
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        output = ''
        # bytearray items are integers, so compare against ord() values.
        if self.buffer[0] == ord('i'):
            # set input length
            self.i = min(99, int(self.buffer[1:].decode('ascii') or 0))
        elif self.buffer[0] == ord('o'):
            # set output length
            self.o = min(99, int(self.buffer[1:].decode('ascii') or 0))
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch consulted by lookupTestDecoder(); off unless a test enables it.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' only while enabled.

        Returns None (implicitly) for any other name or while disabled.
        """
        if cls.codecEnabled and name == 'test_decoder':
            return codecs.CodecInfo(
                name='test_decoder', encode=None, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
587
# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
591
592
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry: (input bytes, value passed as `final`, expected output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for data, eof, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
635
636class TextIOWrapperTest(unittest.TestCase):
637
638 def setUp(self):
639 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
640 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
641
642 def tearDown(self):
643 test_support.unlink(test_support.TESTFN)
644
645 def testLineBuffering(self):
646 r = io.BytesIO()
647 b = io.BufferedWriter(r, 1000)
648 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
649 t.write(u"X")
650 self.assertEquals(r.getvalue(), b"") # No flush happened
651 t.write(u"Y\nZ")
652 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
653 t.write(u"A\rB")
654 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
655
656 def testEncodingErrorsReading(self):
657 # (1) default
658 b = io.BytesIO(b"abc\n\xff\n")
659 t = io.TextIOWrapper(b, encoding="ascii")
660 self.assertRaises(UnicodeError, t.read)
661 # (2) explicit strict
662 b = io.BytesIO(b"abc\n\xff\n")
663 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
664 self.assertRaises(UnicodeError, t.read)
665 # (3) ignore
666 b = io.BytesIO(b"abc\n\xff\n")
667 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
668 self.assertEquals(t.read(), "abc\n\n")
669 # (4) replace
670 b = io.BytesIO(b"abc\n\xff\n")
671 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
672 self.assertEquals(t.read(), u"abc\n\ufffd\n")
673
674 def testEncodingErrorsWriting(self):
675 # (1) default
676 b = io.BytesIO()
677 t = io.TextIOWrapper(b, encoding="ascii")
678 self.assertRaises(UnicodeError, t.write, u"\xff")
679 # (2) explicit strict
680 b = io.BytesIO()
681 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
682 self.assertRaises(UnicodeError, t.write, u"\xff")
683 # (3) ignore
684 b = io.BytesIO()
685 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
686 newline="\n")
687 t.write(u"abc\xffdef\n")
688 t.flush()
689 self.assertEquals(b.getvalue(), b"abcdef\n")
690 # (4) replace
691 b = io.BytesIO()
692 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
693 newline="\n")
694 t.write(u"abc\xffdef\n")
695 t.flush()
696 self.assertEquals(b.getvalue(), b"abc?def\n")
697
698 def testNewlinesInput(self):
699 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
700 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
701 for newline, expected in [
702 (None, normalized.decode("ascii").splitlines(True)),
703 ("", testdata.decode("ascii").splitlines(True)),
704 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
705 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
706 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
707 ]:
708 buf = io.BytesIO(testdata)
709 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
710 self.assertEquals(txt.readlines(), expected)
711 txt.seek(0)
712 self.assertEquals(txt.read(), "".join(expected))
713
714 def testNewlinesOutput(self):
715 testdict = {
716 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
717 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
718 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
719 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
720 }
721 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
722 for newline, expected in tests:
723 buf = io.BytesIO()
724 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
725 txt.write("AAA\nB")
726 txt.write("BB\nCCC\n")
727 txt.write("X\rY\r\nZ")
728 txt.flush()
729 self.assertEquals(buf.getvalue(), expected)
730
731 def testNewlines(self):
732 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
733
734 tests = [
735 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
736 [ '', input_lines ],
737 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
738 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
739 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
740 ]
741
742 encodings = ('utf-8', 'latin-1')
743
744 # Try a range of buffer sizes to test the case where \r is the last
745 # character in TextIOWrapper._pending_line.
746 for encoding in encodings:
747 # XXX: str.encode() should return bytes
748 data = bytes(''.join(input_lines).encode(encoding))
749 for do_reads in (False, True):
750 for bufsize in range(1, 10):
751 for newline, exp_lines in tests:
752 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
753 textio = io.TextIOWrapper(bufio, newline=newline,
754 encoding=encoding)
755 if do_reads:
756 got_lines = []
757 while True:
758 c2 = textio.read(2)
759 if c2 == '':
760 break
761 self.assertEquals(len(c2), 2)
762 got_lines.append(c2 + textio.readline())
763 else:
764 got_lines = list(textio)
765
766 for got_line, exp_line in zip(got_lines, exp_lines):
767 self.assertEquals(got_line, exp_line)
768 self.assertEquals(len(got_lines), len(exp_lines))
769
770 def testNewlinesInput(self):
771 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
772 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
773 for newline, expected in [
774 (None, normalized.decode("ascii").splitlines(True)),
775 ("", testdata.decode("ascii").splitlines(True)),
776 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
777 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
778 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
779 ]:
780 buf = io.BytesIO(testdata)
781 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
782 self.assertEquals(txt.readlines(), expected)
783 txt.seek(0)
784 self.assertEquals(txt.read(), "".join(expected))
785
786 def testNewlinesOutput(self):
787 data = u"AAA\nBBB\rCCC\n"
788 data_lf = b"AAA\nBBB\rCCC\n"
789 data_cr = b"AAA\rBBB\rCCC\r"
790 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
791 save_linesep = os.linesep
792 try:
793 for os.linesep, newline, expected in [
794 ("\n", None, data_lf),
795 ("\r\n", None, data_crlf),
796 ("\n", "", data_lf),
797 ("\r\n", "", data_lf),
798 ("\n", "\n", data_lf),
799 ("\r\n", "\n", data_lf),
800 ("\n", "\r", data_cr),
801 ("\r\n", "\r", data_cr),
802 ("\n", "\r\n", data_crlf),
803 ("\r\n", "\r\n", data_crlf),
804 ]:
805 buf = io.BytesIO()
806 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
807 txt.write(data)
808 txt.close()
809 self.assertEquals(buf.getvalue(), expected)
810 finally:
811 os.linesep = save_linesep
812
813 # Systematic tests of the text I/O API
814
815 def testBasicIO(self):
816 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
817 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
818 f = io.open(test_support.TESTFN, "w+", encoding=enc)
819 f._CHUNK_SIZE = chunksize
820 self.assertEquals(f.write(u"abc"), 3)
821 f.close()
822 f = io.open(test_support.TESTFN, "r+", encoding=enc)
823 f._CHUNK_SIZE = chunksize
824 self.assertEquals(f.tell(), 0)
825 self.assertEquals(f.read(), u"abc")
826 cookie = f.tell()
827 self.assertEquals(f.seek(0), 0)
828 self.assertEquals(f.read(2), u"ab")
829 self.assertEquals(f.read(1), u"c")
830 self.assertEquals(f.read(1), u"")
831 self.assertEquals(f.read(), u"")
832 self.assertEquals(f.tell(), cookie)
833 self.assertEquals(f.seek(0), 0)
834 self.assertEquals(f.seek(0, 2), cookie)
835 self.assertEquals(f.write(u"def"), 3)
836 self.assertEquals(f.seek(cookie), cookie)
837 self.assertEquals(f.read(), u"def")
838 if enc.startswith("utf"):
839 self.multi_line_test(f, enc)
840 f.close()
841
842 def multi_line_test(self, f, enc):
843 f.seek(0)
844 f.truncate()
845 sample = u"s\xff\u0fff\uffff"
846 wlines = []
847 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
848 chars = []
849 for i in range(size):
850 chars.append(sample[i % len(sample)])
851 line = u"".join(chars) + u"\n"
852 wlines.append((f.tell(), line))
853 f.write(line)
854 f.seek(0)
855 rlines = []
856 while True:
857 pos = f.tell()
858 line = f.readline()
859 if not line:
860 break
861 rlines.append((pos, line))
862 self.assertEquals(rlines, wlines)
863
864 def testTelling(self):
865 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
866 p0 = f.tell()
867 f.write(u"\xff\n")
868 p1 = f.tell()
869 f.write(u"\xff\n")
870 p2 = f.tell()
871 f.seek(0)
872 self.assertEquals(f.tell(), p0)
873 self.assertEquals(f.readline(), u"\xff\n")
874 self.assertEquals(f.tell(), p1)
875 self.assertEquals(f.readline(), u"\xff\n")
876 self.assertEquals(f.tell(), p2)
877 f.seek(0)
878 for line in f:
879 self.assertEquals(line, u"\xff\n")
880 self.assertRaises(IOError, f.tell)
881 self.assertEquals(f.tell(), p2)
882 f.close()
883
884 def testSeeking(self):
885 chunk_size = io.TextIOWrapper._CHUNK_SIZE
886 prefix_size = chunk_size - 2
887 u_prefix = "a" * prefix_size
888 prefix = bytes(u_prefix.encode("utf-8"))
889 self.assertEquals(len(u_prefix), len(prefix))
890 u_suffix = "\u8888\n"
891 suffix = bytes(u_suffix.encode("utf-8"))
892 line = prefix + suffix
893 f = io.open(test_support.TESTFN, "wb")
894 f.write(line*2)
895 f.close()
896 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
897 s = f.read(prefix_size)
898 self.assertEquals(s, unicode(prefix, "ascii"))
899 self.assertEquals(f.tell(), prefix_size)
900 self.assertEquals(f.readline(), u_suffix)
901
902 def testSeekingToo(self):
903 # Regression test for a specific bug
904 data = b'\xe0\xbf\xbf\n'
905 f = io.open(test_support.TESTFN, "wb")
906 f.write(data)
907 f.close()
908 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
909 f._CHUNK_SIZE # Just test that it exists
910 f._CHUNK_SIZE = 2
911 f.readline()
912 f.tell()
913
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +0000914 def testSeekAndTell(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000915 """Test seek/tell using the StatefulIncrementalDecoder."""
916
Christian Heimes1a6387e2008-03-26 12:49:49 +0000917 def testSeekAndTellWithData(data, min_pos=0):
918 """Tell/seek to various points within a data stream and ensure
919 that the decoded data returned by read() is consistent."""
920 f = io.open(test_support.TESTFN, 'wb')
921 f.write(data)
922 f.close()
923 f = io.open(test_support.TESTFN, encoding='test_decoder')
924 decoded = f.read()
925 f.close()
926
927 for i in range(min_pos, len(decoded) + 1): # seek positions
928 for j in [1, 5, len(decoded) - i]: # read lengths
929 f = io.open(test_support.TESTFN, encoding='test_decoder')
930 self.assertEquals(f.read(i), decoded[:i])
931 cookie = f.tell()
932 self.assertEquals(f.read(j), decoded[i:i + j])
933 f.seek(cookie)
934 self.assertEquals(f.read(), decoded[i:])
935 f.close()
936
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000937 # Enable the test decoder.
938 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +0000939
940 # Run the tests.
941 try:
942 # Try each test case.
943 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
944 testSeekAndTellWithData(input)
945
946 # Position each test case so that it crosses a chunk boundary.
947 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
948 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
949 offset = CHUNK_SIZE - len(input)//2
950 prefix = b'.'*offset
951 # Don't bother seeking into the prefix (takes too long).
952 min_pos = offset*2
953 testSeekAndTellWithData(prefix + input, min_pos)
954
955 # Ensure our test decoder won't interfere with subsequent tests.
956 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000957 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000958
959 def testEncodedWrites(self):
960 data = u"1234567890"
961 tests = ("utf-16",
962 "utf-16-le",
963 "utf-16-be",
964 "utf-32",
965 "utf-32-le",
966 "utf-32-be")
967 for encoding in tests:
968 buf = io.BytesIO()
969 f = io.TextIOWrapper(buf, encoding=encoding)
970 # Check if the BOM is written only once (see issue1753).
971 f.write(data)
972 f.write(data)
973 f.seek(0)
974 self.assertEquals(f.read(), data * 2)
975 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
976
def timingTest(self):
    # Time writing 10000 lines and then reading them back three ways
    # (iteration, readline(), readline() + tell()) for several chunk sizes.
    # The name does not start with "test", so the default unittest loader
    # does not pick this method up; timings are printed only when
    # test_support.verbose is set.
    timer = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        try:
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for _ in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # A separate loop variable keeps the template ``line`` intact
            # for the writes performed with the next chunk size.
            for _unused in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
        finally:
            # Close the file even if one of the operations above raises.
            f.close()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (nlines, nchars, nbytes))
            print("File chunk size: %6s" % chunk_size)
            print("Writing: %6.3f seconds" % (t1-t0))
            print("Reading using iteration: %6.3f seconds" % (t2-t1))
            print("Reading using readline(): %6.3f seconds" % (t3-t2))
            print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1013
1014 def testReadOneByOne(self):
1015 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1016 reads = ""
1017 while True:
1018 c = txt.read(1)
1019 if not c:
1020 break
1021 reads += c
1022 self.assertEquals(reads, "AA\nBB")
1023
# Read in amounts equal to TextIOWrapper._CHUNK_SIZE, which is 128.
1025 def testReadByChunk(self):
1026 # make sure "\r\n" straddles 128 char boundary.
1027 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1028 reads = ""
1029 while True:
1030 c = txt.read(128)
1031 if not c:
1032 break
1033 reads += c
1034 self.assertEquals(reads, "A"*127+"\nB")
1035
1036 def test_issue1395_1(self):
1037 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1038
1039 # read one char at a time
1040 reads = ""
1041 while True:
1042 c = txt.read(1)
1043 if not c:
1044 break
1045 reads += c
1046 self.assertEquals(reads, self.normalized)
1047
1048 def test_issue1395_2(self):
1049 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1050 txt._CHUNK_SIZE = 4
1051
1052 reads = ""
1053 while True:
1054 c = txt.read(4)
1055 if not c:
1056 break
1057 reads += c
1058 self.assertEquals(reads, self.normalized)
1059
1060 def test_issue1395_3(self):
1061 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1062 txt._CHUNK_SIZE = 4
1063
1064 reads = txt.read(4)
1065 reads += txt.read(4)
1066 reads += txt.readline()
1067 reads += txt.readline()
1068 reads += txt.readline()
1069 self.assertEquals(reads, self.normalized)
1070
1071 def test_issue1395_4(self):
1072 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1073 txt._CHUNK_SIZE = 4
1074
1075 reads = txt.read(4)
1076 reads += txt.read()
1077 self.assertEquals(reads, self.normalized)
1078
1079 def test_issue1395_5(self):
1080 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1081 txt._CHUNK_SIZE = 4
1082
1083 reads = txt.read(4)
1084 pos = txt.tell()
1085 txt.seek(0)
1086 txt.seek(pos)
1087 self.assertEquals(txt.read(4), "BBB\n")
1088
1089 def test_issue2282(self):
1090 buffer = io.BytesIO(self.testdata)
1091 txt = io.TextIOWrapper(buffer, encoding="ascii")
1092
1093 self.assertEqual(buffer.seekable(), txt.seekable())
1094
1095 def test_newline_decoder(self):
1096 import codecs
1097 decoder = codecs.getincrementaldecoder("utf-8")()
1098 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1099
1100 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1101
1102 self.assertEquals(decoder.decode(b'\xe8'), u"")
1103 self.assertEquals(decoder.decode(b'\xa2'), u"")
1104 self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
1105
1106 self.assertEquals(decoder.decode(b'\xe8'), u"")
1107 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1108
1109 decoder.setstate((b'', 0))
1110 self.assertEquals(decoder.decode(b'\n'), u"\n")
1111 self.assertEquals(decoder.decode(b'\r'), u"")
1112 self.assertEquals(decoder.decode(b'', final=True), u"\n")
1113 self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
1114
1115 self.assertEquals(decoder.decode(b'\r'), u"")
1116 self.assertEquals(decoder.decode(b'a'), u"\na")
1117
1118 self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
1119 self.assertEquals(decoder.decode(b'\r'), u"")
1120 self.assertEquals(decoder.decode(b'\r'), u"\n")
1121 self.assertEquals(decoder.decode(b'\na'), u"\na")
1122
1123 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
1124 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1125 self.assertEquals(decoder.decode(b'\n'), u"\n")
1126 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
1127 self.assertEquals(decoder.decode(b'\n'), u"\n")
1128
1129 decoder = codecs.getincrementaldecoder("utf-8")()
1130 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1131 self.assertEquals(decoder.newlines, None)
1132 decoder.decode(b"abc\n\r")
1133 self.assertEquals(decoder.newlines, u'\n')
1134 decoder.decode(b"\nabc")
1135 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1136 decoder.decode(b"abc\r")
1137 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1138 decoder.decode(b"abc")
1139 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1140 decoder.decode(b"abc\r")
1141 decoder.reset()
1142 self.assertEquals(decoder.decode(b"abc"), "abc")
1143 self.assertEquals(decoder.newlines, None)
1144
1145# XXX Tests for open()
1146
1147class MiscIOTest(unittest.TestCase):
1148
1149 def testImport__all__(self):
1150 for name in io.__all__:
1151 obj = getattr(io, name, None)
1152 self.assert_(obj is not None, name)
1153 if name == "open":
1154 continue
1155 elif "error" in name.lower():
1156 self.assert_(issubclass(obj, Exception), name)
1157 else:
1158 self.assert_(issubclass(obj, io.IOBase))
1159
1160
def test_main():
    """Run the listed test case classes through test_support.run_unittest."""
    # NOTE(review): StatefulIncrementalDecoderTest is referenced elsewhere
    # in this file but is not listed here -- confirm that is intentional.
    cases = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest, BufferedRWPairTest,
        BufferedRandomTest, TextIOWrapperTest,
        MiscIOTest,
    )
    test_support.run_unittest(*cases)
1167
# Running this file directly hands control to unittest.main().
# NOTE(review): test_main() is not called on this path -- confirm intended.
if __name__ == "__main__":
    unittest.main()