blob: e26b7083e75bd1b0b640557c0b8859a3532b4ef3 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
9import unittest
10from itertools import chain
11from test import test_support
12
13import codecs
14import io # The module under test
15
16
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered-I/O tests.

    Reads are served from a pre-loaded list of values (one per call) and
    every write is recorded so tests can inspect what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        # Values handed out by successive read() calls, in order.
        self._read_stack = list(read_stack)
        # Copies of every buffer passed to write(), in order.
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted value, or b"" once the script is empty.

        The requested size ``n`` is ignored.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error should
            # surface instead of being silently turned into b"".
            return b""

    def write(self, b):
        """Record a copy of ``b`` and report that all of it was written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore a seek request (``whence`` defaults to 0)."""
        pass

    def tell(self):
        return 42
50
51
class MockFileIO(io.BytesIO):
    """BytesIO that logs the size of the result of every read() call."""

    def __init__(self, data):
        # One entry per read(): the length returned, or None if read()
        # itself returned None.
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        """Read like BytesIO.read() and append the result size to the log."""
        chunk = io.BytesIO.read(self, n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
62
63
class MockNonBlockWriterIO(io.RawIOBase):
    """Write-only raw stream whose write() results follow a script.

    Each entry of the script is consumed by one write() call: a
    non-negative entry is returned as the byte count, a negative entry
    makes write() raise BlockingIOError reporting ``-entry`` characters
    written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        # Copies of every buffer passed to write(), in order.
        self._write_stack = []

    def write(self, b):
        """Record ``b`` and then act out the next script entry."""
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
80
81
class IOTest(unittest.TestCase):
    """Basic read/write/seek checks for files opened through the io module."""

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence against ``f``.

        The sequence leaves b"hello world\\n" as the stream contents.
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        # truncate() to an explicit size is expected to leave the position.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek sequence against ``f``.

        ``f`` must contain b"hello world\\n".  When ``buffered`` is true the
        argument-less read() is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks (2 GiB).
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset of ``f``."""
        # Use unittest assertions rather than bare ``assert`` so the checks
        # still run when Python is started with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
        f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
        f.close()

    def test_readline(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"abc\ndef\nxyzzy\nfoo")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readline(), b"abc\n")
        self.assertEqual(f.readline(10), b"def\n")
        self.assertEqual(f.readline(2), b"xy")
        self.assertEqual(f.readline(4), b"zzy\n")
        self.assertEqual(f.readline(), b"foo")
        f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        f = io.open(test_support.TESTFN, "w+b", 0)
        self.large_file_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "w+b")
        self.large_file_ops(f)
        f.close()

    def test_with_open(self):
        """Files from io.open() close themselves when a with-block exits."""
        for bufsize in (0, 1, 100):
            f = None
            # io.open, not the builtin open: on Python 2 the builtin returns
            # the classic file type and would not exercise the io module.
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        """Deleting a FileIO runs __del__, then close(), then flush()."""
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        # A raw file takes bytes; with unicode_literals in effect a plain
        # "xxx" literal would be text.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"xxx")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.read(), b"xxx")
        f.close()

    def XXXtest_array_writes(self):
        # XXX memory view not available yet
        a = array.array('i', range(10))
        n = len(memoryview(a))
        f = io.open(test_support.TESTFN, "wb", 0)
        self.assertEqual(f.write(a), n)
        f.close()
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.write(a), n)
        f.close()

    def test_closefd(self):
        """closefd=False is rejected when opening by file name."""
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)
272
class MemorySeekTestMixin:
    """Shared read/seek/tell checks for in-memory stream classes.

    Concrete test classes supply three attributes:
      - buftype: callable turning a text literal into the stream's data type
      - ioclass: the stream class under test
      - EOF:     the value read() returns at end of stream
    """

    def testInit(self):
        # Only checks that construction from an initial buffer succeeds.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and reported by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
316
317
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the MemorySeekTestMixin checks against io.BytesIO."""

    EOF = b""
    ioclass = io.BytesIO

    @staticmethod
    def buftype(s):
        """Encode the text literal ``s`` as UTF-8 bytes."""
        encoded = s.encode("utf-8")
        return encoded
324
325
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the MemorySeekTestMixin checks against io.StringIO."""

    EOF = ""
    ioclass = io.StringIO
    buftype = str
330
331
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader driven by scripted raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        """Check which raw read sizes result from a series of buffered reads."""
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              result sizes expected in rawio.read_history]
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # The buffered object reports the raw stream's descriptor.
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
392
393
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter driven by recording raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        # Both writes are expected to reach the raw stream as one chunk.
        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
448
449
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder tests for io.BufferedRWPair."""

    def testRWPair(self):
        # Currently this only checks that a pair can be constructed from a
        # reader and a writer.
        reader = MockRawIO(())
        writer = MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
458
459
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom (interleaved reads, writes and seeks)."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)
492
493# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
494# properties:
495# - A single output character can correspond to many bytes of input.
496# - The number of input bytes to complete the character can be
497# undetermined until the last input byte is received.
498# - The number of input bytes can vary depending on previous input.
499# - A single input byte can correspond to many characters of output.
500# - The number of output characters can be undetermined until the
501# last input byte is received.
502# - The number of output characters can vary depending on previous input.
503
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = 1, O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input bytes, I and O packed into one integer)."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        # ``flags`` rather than ``io`` so the io module is not shadowed.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume ``input`` bytes and return the text of all completed words.

        When ``final`` is true, a partially buffered word is completed too.
        """
        output = ''
        period = ord('.')
        # Iterate over a bytearray so every item is an integer regardless of
        # whether iterating the input itself would yield ints or 1-char
        # strings; the comparison with ord('.') relies on that.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        Control words ('i...' / 'o...') update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output
576
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for data, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
619
620class TextIOWrapperTest(unittest.TestCase):
621
622 def setUp(self):
623 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
624 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
625
626 def tearDown(self):
627 test_support.unlink(test_support.TESTFN)
628
629 def testLineBuffering(self):
630 r = io.BytesIO()
631 b = io.BufferedWriter(r, 1000)
632 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
633 t.write(u"X")
634 self.assertEquals(r.getvalue(), b"") # No flush happened
635 t.write(u"Y\nZ")
636 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
637 t.write(u"A\rB")
638 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
639
640 def testEncodingErrorsReading(self):
641 # (1) default
642 b = io.BytesIO(b"abc\n\xff\n")
643 t = io.TextIOWrapper(b, encoding="ascii")
644 self.assertRaises(UnicodeError, t.read)
645 # (2) explicit strict
646 b = io.BytesIO(b"abc\n\xff\n")
647 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
648 self.assertRaises(UnicodeError, t.read)
649 # (3) ignore
650 b = io.BytesIO(b"abc\n\xff\n")
651 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
652 self.assertEquals(t.read(), "abc\n\n")
653 # (4) replace
654 b = io.BytesIO(b"abc\n\xff\n")
655 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
656 self.assertEquals(t.read(), u"abc\n\ufffd\n")
657
658 def testEncodingErrorsWriting(self):
659 # (1) default
660 b = io.BytesIO()
661 t = io.TextIOWrapper(b, encoding="ascii")
662 self.assertRaises(UnicodeError, t.write, u"\xff")
663 # (2) explicit strict
664 b = io.BytesIO()
665 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
666 self.assertRaises(UnicodeError, t.write, u"\xff")
667 # (3) ignore
668 b = io.BytesIO()
669 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
670 newline="\n")
671 t.write(u"abc\xffdef\n")
672 t.flush()
673 self.assertEquals(b.getvalue(), b"abcdef\n")
674 # (4) replace
675 b = io.BytesIO()
676 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
677 newline="\n")
678 t.write(u"abc\xffdef\n")
679 t.flush()
680 self.assertEquals(b.getvalue(), b"abc?def\n")
681
682 def testNewlinesInput(self):
683 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
684 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
685 for newline, expected in [
686 (None, normalized.decode("ascii").splitlines(True)),
687 ("", testdata.decode("ascii").splitlines(True)),
688 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
689 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
690 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
691 ]:
692 buf = io.BytesIO(testdata)
693 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
694 self.assertEquals(txt.readlines(), expected)
695 txt.seek(0)
696 self.assertEquals(txt.read(), "".join(expected))
697
698 def testNewlinesOutput(self):
699 testdict = {
700 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
701 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
702 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
703 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
704 }
705 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
706 for newline, expected in tests:
707 buf = io.BytesIO()
708 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
709 txt.write("AAA\nB")
710 txt.write("BB\nCCC\n")
711 txt.write("X\rY\r\nZ")
712 txt.flush()
713 self.assertEquals(buf.getvalue(), expected)
714
715 def testNewlines(self):
716 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
717
718 tests = [
719 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
720 [ '', input_lines ],
721 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
722 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
723 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
724 ]
725
726 encodings = ('utf-8', 'latin-1')
727
728 # Try a range of buffer sizes to test the case where \r is the last
729 # character in TextIOWrapper._pending_line.
730 for encoding in encodings:
731 # XXX: str.encode() should return bytes
732 data = bytes(''.join(input_lines).encode(encoding))
733 for do_reads in (False, True):
734 for bufsize in range(1, 10):
735 for newline, exp_lines in tests:
736 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
737 textio = io.TextIOWrapper(bufio, newline=newline,
738 encoding=encoding)
739 if do_reads:
740 got_lines = []
741 while True:
742 c2 = textio.read(2)
743 if c2 == '':
744 break
745 self.assertEquals(len(c2), 2)
746 got_lines.append(c2 + textio.readline())
747 else:
748 got_lines = list(textio)
749
750 for got_line, exp_line in zip(got_lines, exp_lines):
751 self.assertEquals(got_line, exp_line)
752 self.assertEquals(len(got_lines), len(exp_lines))
753
754 def testNewlinesInput(self):
755 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
756 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
757 for newline, expected in [
758 (None, normalized.decode("ascii").splitlines(True)),
759 ("", testdata.decode("ascii").splitlines(True)),
760 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
761 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
762 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
763 ]:
764 buf = io.BytesIO(testdata)
765 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
766 self.assertEquals(txt.readlines(), expected)
767 txt.seek(0)
768 self.assertEquals(txt.read(), "".join(expected))
769
770 def testNewlinesOutput(self):
771 data = u"AAA\nBBB\rCCC\n"
772 data_lf = b"AAA\nBBB\rCCC\n"
773 data_cr = b"AAA\rBBB\rCCC\r"
774 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
775 save_linesep = os.linesep
776 try:
777 for os.linesep, newline, expected in [
778 ("\n", None, data_lf),
779 ("\r\n", None, data_crlf),
780 ("\n", "", data_lf),
781 ("\r\n", "", data_lf),
782 ("\n", "\n", data_lf),
783 ("\r\n", "\n", data_lf),
784 ("\n", "\r", data_cr),
785 ("\r\n", "\r", data_cr),
786 ("\n", "\r\n", data_crlf),
787 ("\r\n", "\r\n", data_crlf),
788 ]:
789 buf = io.BytesIO()
790 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
791 txt.write(data)
792 txt.close()
793 self.assertEquals(buf.getvalue(), expected)
794 finally:
795 os.linesep = save_linesep
796
797 # Systematic tests of the text I/O API
798
799 def testBasicIO(self):
800 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
801 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
802 f = io.open(test_support.TESTFN, "w+", encoding=enc)
803 f._CHUNK_SIZE = chunksize
804 self.assertEquals(f.write(u"abc"), 3)
805 f.close()
806 f = io.open(test_support.TESTFN, "r+", encoding=enc)
807 f._CHUNK_SIZE = chunksize
808 self.assertEquals(f.tell(), 0)
809 self.assertEquals(f.read(), u"abc")
810 cookie = f.tell()
811 self.assertEquals(f.seek(0), 0)
812 self.assertEquals(f.read(2), u"ab")
813 self.assertEquals(f.read(1), u"c")
814 self.assertEquals(f.read(1), u"")
815 self.assertEquals(f.read(), u"")
816 self.assertEquals(f.tell(), cookie)
817 self.assertEquals(f.seek(0), 0)
818 self.assertEquals(f.seek(0, 2), cookie)
819 self.assertEquals(f.write(u"def"), 3)
820 self.assertEquals(f.seek(cookie), cookie)
821 self.assertEquals(f.read(), u"def")
822 if enc.startswith("utf"):
823 self.multi_line_test(f, enc)
824 f.close()
825
826 def multi_line_test(self, f, enc):
827 f.seek(0)
828 f.truncate()
829 sample = u"s\xff\u0fff\uffff"
830 wlines = []
831 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
832 chars = []
833 for i in range(size):
834 chars.append(sample[i % len(sample)])
835 line = u"".join(chars) + u"\n"
836 wlines.append((f.tell(), line))
837 f.write(line)
838 f.seek(0)
839 rlines = []
840 while True:
841 pos = f.tell()
842 line = f.readline()
843 if not line:
844 break
845 rlines.append((pos, line))
846 self.assertEquals(rlines, wlines)
847
848 def testTelling(self):
849 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
850 p0 = f.tell()
851 f.write(u"\xff\n")
852 p1 = f.tell()
853 f.write(u"\xff\n")
854 p2 = f.tell()
855 f.seek(0)
856 self.assertEquals(f.tell(), p0)
857 self.assertEquals(f.readline(), u"\xff\n")
858 self.assertEquals(f.tell(), p1)
859 self.assertEquals(f.readline(), u"\xff\n")
860 self.assertEquals(f.tell(), p2)
861 f.seek(0)
862 for line in f:
863 self.assertEquals(line, u"\xff\n")
864 self.assertRaises(IOError, f.tell)
865 self.assertEquals(f.tell(), p2)
866 f.close()
867
868 def testSeeking(self):
869 chunk_size = io.TextIOWrapper._CHUNK_SIZE
870 prefix_size = chunk_size - 2
871 u_prefix = "a" * prefix_size
872 prefix = bytes(u_prefix.encode("utf-8"))
873 self.assertEquals(len(u_prefix), len(prefix))
874 u_suffix = "\u8888\n"
875 suffix = bytes(u_suffix.encode("utf-8"))
876 line = prefix + suffix
877 f = io.open(test_support.TESTFN, "wb")
878 f.write(line*2)
879 f.close()
880 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
881 s = f.read(prefix_size)
882 self.assertEquals(s, unicode(prefix, "ascii"))
883 self.assertEquals(f.tell(), prefix_size)
884 self.assertEquals(f.readline(), u_suffix)
885
886 def testSeekingToo(self):
887 # Regression test for a specific bug
888 data = b'\xe0\xbf\xbf\n'
889 f = io.open(test_support.TESTFN, "wb")
890 f.write(data)
891 f.close()
892 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
893 f._CHUNK_SIZE # Just test that it exists
894 f._CHUNK_SIZE = 2
895 f.readline()
896 f.tell()
897
898 # FIXME: figure out why the test fails with Python 2.6
899 def XXXtestSeekAndTell(self):
900 """Test seek/tell using the StatefulIncrementalDecoder."""
901
902 def lookupTestDecoder(name):
903 if self.codecEnabled and name == 'test_decoder':
904 return codecs.CodecInfo(
905 name='test_decoder', encode=None, decode=None,
906 incrementalencoder=None,
907 streamreader=None, streamwriter=None,
908 incrementaldecoder=StatefulIncrementalDecoder)
909
910 def testSeekAndTellWithData(data, min_pos=0):
911 """Tell/seek to various points within a data stream and ensure
912 that the decoded data returned by read() is consistent."""
913 f = io.open(test_support.TESTFN, 'wb')
914 f.write(data)
915 f.close()
916 f = io.open(test_support.TESTFN, encoding='test_decoder')
917 decoded = f.read()
918 f.close()
919
920 for i in range(min_pos, len(decoded) + 1): # seek positions
921 for j in [1, 5, len(decoded) - i]: # read lengths
922 f = io.open(test_support.TESTFN, encoding='test_decoder')
923 self.assertEquals(f.read(i), decoded[:i])
924 cookie = f.tell()
925 self.assertEquals(f.read(j), decoded[i:i + j])
926 f.seek(cookie)
927 self.assertEquals(f.read(), decoded[i:])
928 f.close()
929
930 # Register a special incremental decoder for testing.
931 codecs.register(lookupTestDecoder)
932 self.codecEnabled = 1
933
934 # Run the tests.
935 try:
936 # Try each test case.
937 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
938 testSeekAndTellWithData(input)
939
940 # Position each test case so that it crosses a chunk boundary.
941 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
942 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
943 offset = CHUNK_SIZE - len(input)//2
944 prefix = b'.'*offset
945 # Don't bother seeking into the prefix (takes too long).
946 min_pos = offset*2
947 testSeekAndTellWithData(prefix + input, min_pos)
948
949 # Ensure our test decoder won't interfere with subsequent tests.
950 finally:
951 self.codecEnabled = 0
952
953 def testEncodedWrites(self):
954 data = u"1234567890"
955 tests = ("utf-16",
956 "utf-16-le",
957 "utf-16-be",
958 "utf-32",
959 "utf-32-le",
960 "utf-32-be")
961 for encoding in tests:
962 buf = io.BytesIO()
963 f = io.TextIOWrapper(buf, encoding=encoding)
964 # Check if the BOM is written only once (see issue1753).
965 f.write(data)
966 f.write(data)
967 f.seek(0)
968 self.assertEquals(f.read(), data * 2)
969 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
970
971 def timingTest(self):
972 timer = time.time
973 enc = "utf8"
974 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
975 nlines = 10000
976 nchars = len(line)
977 nbytes = len(line.encode(enc))
978 for chunk_size in (32, 64, 128, 256):
979 f = io.open(test_support.TESTFN, "w+", encoding=enc)
980 f._CHUNK_SIZE = chunk_size
981 t0 = timer()
982 for i in range(nlines):
983 f.write(line)
984 f.flush()
985 t1 = timer()
986 f.seek(0)
987 for line in f:
988 pass
989 t2 = timer()
990 f.seek(0)
991 while f.readline():
992 pass
993 t3 = timer()
994 f.seek(0)
995 while f.readline():
996 f.tell()
997 t4 = timer()
998 f.close()
999 if test_support.verbose:
1000 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1001 (nlines, nchars, nbytes))
1002 print("File chunk size: %6s" % f._CHUNK_SIZE)
1003 print("Writing: %6.3f seconds" % (t1-t0))
1004 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1005 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1006 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1007
1008 def testReadOneByOne(self):
1009 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1010 reads = ""
1011 while True:
1012 c = txt.read(1)
1013 if not c:
1014 break
1015 reads += c
1016 self.assertEquals(reads, "AA\nBB")
1017
1018 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1019 def testReadByChunk(self):
1020 # make sure "\r\n" straddles 128 char boundary.
1021 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1022 reads = ""
1023 while True:
1024 c = txt.read(128)
1025 if not c:
1026 break
1027 reads += c
1028 self.assertEquals(reads, "A"*127+"\nB")
1029
1030 def test_issue1395_1(self):
1031 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1032
1033 # read one char at a time
1034 reads = ""
1035 while True:
1036 c = txt.read(1)
1037 if not c:
1038 break
1039 reads += c
1040 self.assertEquals(reads, self.normalized)
1041
1042 def test_issue1395_2(self):
1043 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1044 txt._CHUNK_SIZE = 4
1045
1046 reads = ""
1047 while True:
1048 c = txt.read(4)
1049 if not c:
1050 break
1051 reads += c
1052 self.assertEquals(reads, self.normalized)
1053
1054 def test_issue1395_3(self):
1055 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1056 txt._CHUNK_SIZE = 4
1057
1058 reads = txt.read(4)
1059 reads += txt.read(4)
1060 reads += txt.readline()
1061 reads += txt.readline()
1062 reads += txt.readline()
1063 self.assertEquals(reads, self.normalized)
1064
1065 def test_issue1395_4(self):
1066 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1067 txt._CHUNK_SIZE = 4
1068
1069 reads = txt.read(4)
1070 reads += txt.read()
1071 self.assertEquals(reads, self.normalized)
1072
1073 def test_issue1395_5(self):
1074 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1075 txt._CHUNK_SIZE = 4
1076
1077 reads = txt.read(4)
1078 pos = txt.tell()
1079 txt.seek(0)
1080 txt.seek(pos)
1081 self.assertEquals(txt.read(4), "BBB\n")
1082
1083 def test_issue2282(self):
1084 buffer = io.BytesIO(self.testdata)
1085 txt = io.TextIOWrapper(buffer, encoding="ascii")
1086
1087 self.assertEqual(buffer.seekable(), txt.seekable())
1088
1089 def test_newline_decoder(self):
1090 import codecs
1091 decoder = codecs.getincrementaldecoder("utf-8")()
1092 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1093
1094 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1095
1096 self.assertEquals(decoder.decode(b'\xe8'), u"")
1097 self.assertEquals(decoder.decode(b'\xa2'), u"")
1098 self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
1099
1100 self.assertEquals(decoder.decode(b'\xe8'), u"")
1101 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1102
1103 decoder.setstate((b'', 0))
1104 self.assertEquals(decoder.decode(b'\n'), u"\n")
1105 self.assertEquals(decoder.decode(b'\r'), u"")
1106 self.assertEquals(decoder.decode(b'', final=True), u"\n")
1107 self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
1108
1109 self.assertEquals(decoder.decode(b'\r'), u"")
1110 self.assertEquals(decoder.decode(b'a'), u"\na")
1111
1112 self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
1113 self.assertEquals(decoder.decode(b'\r'), u"")
1114 self.assertEquals(decoder.decode(b'\r'), u"\n")
1115 self.assertEquals(decoder.decode(b'\na'), u"\na")
1116
1117 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
1118 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1119 self.assertEquals(decoder.decode(b'\n'), u"\n")
1120 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
1121 self.assertEquals(decoder.decode(b'\n'), u"\n")
1122
1123 decoder = codecs.getincrementaldecoder("utf-8")()
1124 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1125 self.assertEquals(decoder.newlines, None)
1126 decoder.decode(b"abc\n\r")
1127 self.assertEquals(decoder.newlines, u'\n')
1128 decoder.decode(b"\nabc")
1129 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1130 decoder.decode(b"abc\r")
1131 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1132 decoder.decode(b"abc")
1133 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1134 decoder.decode(b"abc\r")
1135 decoder.reset()
1136 self.assertEquals(decoder.decode(b"abc"), "abc")
1137 self.assertEquals(decoder.newlines, None)
1138
1139# XXX Tests for open()
1140
1141class MiscIOTest(unittest.TestCase):
1142
1143 def testImport__all__(self):
1144 for name in io.__all__:
1145 obj = getattr(io, name, None)
1146 self.assert_(obj is not None, name)
1147 if name == "open":
1148 continue
1149 elif "error" in name.lower():
1150 self.assert_(issubclass(obj, Exception), name)
1151 else:
1152 self.assert_(issubclass(obj, io.IOBase))
1153
1154
def test_main():
    # Hand the listed test case classes to the regression-test runner.
    test_classes = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest, BufferedRWPairTest,
        BufferedRandomTest, TextIOWrapperTest,
        MiscIOTest,
    )
    test_support.run_unittest(*test_classes)
1161
# Run the tests through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()