"""Unit tests for io.py."""
from __future__ import print_function

import os
import sys
import time
import array
import unittest
from itertools import chain
from test import test_support

import codecs
import io  # The module under test


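# Mock raw I/O helpers used by the buffered I/O tests below: MockRawIO
# serves canned data from its read_stack and records writes in
# _write_stack, MockFileIO records the size of every raw read, and
# MockNonBlockWriterIO replays a script of short and blocking writes.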
class MockRawIO(io.RawIOBase):

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        try:
            return self._read_stack.pop(0)
        except:
            return b""

    def write(self, b):
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        pass

    def tell(self):
        return 42


class MockFileIO(io.BytesIO):

    def __init__(self, data):
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        res = io.BytesIO.read(self, n)
        self.read_history.append(None if res is None else len(res))
        return res


class MockNonBlockWriterIO(io.RawIOBase):

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        self._write_stack.append(b[:])
        n = self._blocking_script.pop(0)
        if (n < 0):
            # A negative script entry simulates a partial non-blocking
            # write: report that -n bytes were taken, then block.
            raise io.BlockingIOError(0, "test blocking", -n)
        else:
            return n

    def writable(self):
        return True


class IOTest(unittest.TestCase):

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        assert f.readable()
        assert f.writable()
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
        f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
        f.close()

    def test_readline(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"abc\ndef\nxyzzy\nfoo")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readline(), b"abc\n")
        self.assertEqual(f.readline(10), b"def\n")
        self.assertEqual(f.readline(2), b"xy")
        self.assertEqual(f.readline(4), b"zzy\n")
        self.assertEqual(f.readline(), b"foo")
        f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OS X this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        f = io.open(test_support.TESTFN, "w+b", 0)
        self.large_file_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "w+b")
        self.large_file_ops(f)
        f.close()

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        f.write("xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"xxx")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.read(), b"xxx")
        f.close()

    def XXXtest_array_writes(self):
        # XXX memory view not available yet
        a = array.array('i', range(10))
        n = len(memoryview(a))
        f = io.open(test_support.TESTFN, "wb", 0)
        self.assertEqual(f.write(a), n)
        f.close()
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.write(a), n)
        f.close()

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)

class MemorySeekTestMixin:

    def testInit(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEquals(buf[:1], bytesIo.read(1))
        self.assertEquals(buf[1:5], bytesIo.read(4))
        self.assertEquals(buf[5:], bytesIo.read(900))
        self.assertEquals(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEquals(buf, bytesIo.read())
        self.assertEquals(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEquals(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEquals(buf[3:], bytesIo.read())
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEquals(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEquals(5, bytesIo.tell())
        bytesIo.seek(10000)
        self.assertEquals(10000, bytesIo.tell())


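# The concrete classes below mix MemorySeekTestMixin with unittest.TestCase
# and supply the three hooks the mixin relies on: buftype() to build a
# buffer of the right type, ioclass to construct the object under test,
# and EOF, the empty value returned at end of stream.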
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    @staticmethod
    def buftype(s):
        return s.encode("utf-8")
    ioclass = io.BytesIO
    EOF = b""


class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    buftype = str
    ioclass = io.StringIO
    EOF = ""


class BufferedReaderTest(unittest.TestCase):

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3], [ dlen ] ],
            [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEquals(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(b"abcd", bufio.read(6))
        self.assertEquals(b"e", bufio.read(1))
        self.assertEquals(b"fg", bufio.read())
        self.assert_(None is bufio.read())
        self.assertEquals(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass


class BufferedWriterTest(unittest.TestCase):

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEquals(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEquals(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
        self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEquals(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEquals(b"abc", writer._write_stack[0])


class BufferedRWPairTest(unittest.TestCase):

    def testRWPair(self):
        r = MockRawIO(())
        w = MockRawIO()
        pair = io.BufferedRWPair(r, w)

        # XXX need implementation


class BufferedRandomTest(unittest.TestCase):

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEquals(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEquals(b"as", rw.read(2))
        self.assertEquals(2, rw.tell())
        rw.seek(0, 0)
        self.assertEquals(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEquals(b"asdfasdfl", rw.read())
        self.assertEquals(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEquals(5, rw.tell())
        rw.seek(2, 1)
        self.assertEquals(7, rw.tell())
        self.assertEquals(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.

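# A rough sketch of how the decoder below behaves, mirroring the first
# test cases in StatefulIncrementalDecoderTest (kept as comments so that
# nothing runs at import time):
#
#     d = StatefulIncrementalDecoder()
#     d.decode(b'abcd', final=True)    # -> 'a.b.c.d.' (default I=1, O=1)
#     d = StatefulIncrementalDecoder()
#     d.decode(b'oiabcd', final=True)  # -> 'abcd.'    (I=0, O=0)
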
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words. Words may be fixed-length (length set
    by input) or variable-length (period-terminated). In variable-length
    mode, extra periods are ignored. Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output. The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O. If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1. When I changes, any buffered input is
    re-scanned according to the new I. EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        output = ''
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
            'a----------------------------.' +
            'b----------------------------.' +
            'cde--------------------------.' +
            'abcdefghijabcde.' +
            'a.b------------.' +
            '.c.------------.' +
            'd.e------------.' +
            'k--------------.' +
            'l--------------.' +
            'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEquals(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEquals(d.decode(b'oiabcd'), '')
        self.assertEquals(d.decode(b'', 1), 'abcd.')

class TextIOWrapperTest(unittest.TestCase):

    def setUp(self):
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def testLineBuffering(self):
        r = io.BytesIO()
        b = io.BufferedWriter(r, 1000)
        t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write(u"X")
        self.assertEquals(r.getvalue(), b"") # No flush happened
        t.write(u"Y\nZ")
        self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
        t.write(u"A\rB")
        self.assertEquals(r.getvalue(), b"XY\nZA\rB")

    def testEncodingErrorsReading(self):
        # (1) default
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEquals(t.read(), "abc\n\n")
        # (4) replace
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEquals(t.read(), u"abc\n\ufffd\n")

    def testEncodingErrorsWriting(self):
        # (1) default
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, u"\xff")
        # (2) explicit strict
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, u"\xff")
        # (3) ignore
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
                             newline="\n")
        t.write(u"abc\xffdef\n")
        t.flush()
        self.assertEquals(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
                             newline="\n")
        t.write(u"abc\xffdef\n")
        t.flush()
        self.assertEquals(b.getvalue(), b"abc?def\n")

    def testNewlinesInput(self):
        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = io.BytesIO(testdata)
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEquals(txt.readlines(), expected)
            txt.seek(0)
            self.assertEquals(txt.read(), "".join(expected))

    def testNewlinesOutput(self):
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = io.BytesIO()
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEquals(buf.getvalue(), expected)

    def testNewlines(self):
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]

        encodings = ('utf-8', 'latin-1')

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            # XXX: str.encode() should return bytes
            data = bytes(''.join(input_lines).encode(encoding))
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = io.BufferedReader(io.BytesIO(data), bufsize)
                        textio = io.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEquals(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEquals(got_line, exp_line)
                        self.assertEquals(len(got_lines), len(exp_lines))

    def testNewlinesInput(self):
        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = io.BytesIO(testdata)
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEquals(txt.readlines(), expected)
            txt.seek(0)
            self.assertEquals(txt.read(), "".join(expected))

    def testNewlinesOutput(self):
        data = u"AAA\nBBB\rCCC\n"
        data_lf = b"AAA\nBBB\rCCC\n"
        data_cr = b"AAA\rBBB\rCCC\r"
        data_crlf = b"AAA\r\nBBB\rCCC\r\n"
        save_linesep = os.linesep
        try:
            for os.linesep, newline, expected in [
                ("\n", None, data_lf),
                ("\r\n", None, data_crlf),
                ("\n", "", data_lf),
                ("\r\n", "", data_lf),
                ("\n", "\n", data_lf),
                ("\r\n", "\n", data_lf),
                ("\n", "\r", data_cr),
                ("\r\n", "\r", data_cr),
                ("\n", "\r\n", data_crlf),
                ("\r\n", "\r\n", data_crlf),
                ]:
                buf = io.BytesIO()
                txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
                txt.write(data)
                txt.close()
                self.assertEquals(buf.getvalue(), expected)
        finally:
            os.linesep = save_linesep

    # Systematic tests of the text I/O API

    def testBasicIO(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
                f = io.open(test_support.TESTFN, "w+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEquals(f.write(u"abc"), 3)
                f.close()
                f = io.open(test_support.TESTFN, "r+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEquals(f.tell(), 0)
                self.assertEquals(f.read(), u"abc")
                cookie = f.tell()
                self.assertEquals(f.seek(0), 0)
                self.assertEquals(f.read(2), u"ab")
                self.assertEquals(f.read(1), u"c")
                self.assertEquals(f.read(1), u"")
                self.assertEquals(f.read(), u"")
                self.assertEquals(f.tell(), cookie)
                self.assertEquals(f.seek(0), 0)
                self.assertEquals(f.seek(0, 2), cookie)
                self.assertEquals(f.write(u"def"), 3)
                self.assertEquals(f.seek(cookie), cookie)
                self.assertEquals(f.read(), u"def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
                f.close()

    def multi_line_test(self, f, enc):
        f.seek(0)
        f.truncate()
        sample = u"s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            chars = []
            for i in range(size):
                chars.append(sample[i % len(sample)])
            line = u"".join(chars) + u"\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEquals(rlines, wlines)

    def testTelling(self):
        f = io.open(test_support.TESTFN, "w+", encoding="utf8")
        p0 = f.tell()
        f.write(u"\xff\n")
        p1 = f.tell()
        f.write(u"\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEquals(f.tell(), p0)
        self.assertEquals(f.readline(), u"\xff\n")
        self.assertEquals(f.tell(), p1)
        self.assertEquals(f.readline(), u"\xff\n")
        self.assertEquals(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEquals(line, u"\xff\n")
            self.assertRaises(IOError, f.tell)
        self.assertEquals(f.tell(), p2)
        f.close()

    def testSeeking(self):
        chunk_size = io.TextIOWrapper._CHUNK_SIZE
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEquals(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        f = io.open(test_support.TESTFN, "wb")
        f.write(line*2)
        f.close()
        f = io.open(test_support.TESTFN, "r", encoding="utf-8")
        s = f.read(prefix_size)
        self.assertEquals(s, unicode(prefix, "ascii"))
        self.assertEquals(f.tell(), prefix_size)
        self.assertEquals(f.readline(), u_suffix)

    def testSeekingToo(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        f = io.open(test_support.TESTFN, "wb")
        f.write(data)
        f.close()
        f = io.open(test_support.TESTFN, "r", encoding="utf-8")
        f._CHUNK_SIZE # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()

    # FIXME: figure out why the test fails with Python 2.6
    def XXXtestSeekAndTell(self):
        """Test seek/tell using the StatefulIncrementalDecoder."""

        def lookupTestDecoder(name):
            if self.codecEnabled and name == 'test_decoder':
                return codecs.CodecInfo(
                    name='test_decoder', encode=None, decode=None,
                    incrementalencoder=None,
                    streamreader=None, streamwriter=None,
                    incrementaldecoder=StatefulIncrementalDecoder)

        def testSeekAndTellWithData(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            f = io.open(test_support.TESTFN, 'wb')
            f.write(data)
            f.close()
            f = io.open(test_support.TESTFN, encoding='test_decoder')
            decoded = f.read()
            f.close()

            for i in range(min_pos, len(decoded) + 1): # seek positions
                for j in [1, 5, len(decoded) - i]: # read lengths
                    f = io.open(test_support.TESTFN, encoding='test_decoder')
                    self.assertEquals(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEquals(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEquals(f.read(), decoded[i:])
                    f.close()

        # Register a special incremental decoder for testing.
        codecs.register(lookupTestDecoder)
        self.codecEnabled = 1

        # Run the tests.
        try:
            # Try each test case.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                testSeekAndTellWithData(input)

            # Position each test case so that it crosses a chunk boundary.
            CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                testSeekAndTellWithData(prefix + input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            self.codecEnabled = 0

    def testEncodedWrites(self):
        data = u"1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = io.BytesIO()
            f = io.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEquals(f.read(), data * 2)
            self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))

    def timingTest(self):
        timer = time.time
        enc = "utf8"
        line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
        nlines = 10000
        nchars = len(line)
        nbytes = len(line.encode(enc))
        for chunk_size in (32, 64, 128, 256):
            f = io.open(test_support.TESTFN, "w+", encoding=enc)
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for i in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            for line in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
            f.close()
            if test_support.verbose:
                print("\nTiming test: %d lines of %d characters (%d bytes)" %
                      (nlines, nchars, nbytes))
                print("File chunk size: %6s" % f._CHUNK_SIZE)
                print("Writing: %6.3f seconds" % (t1-t0))
                print("Reading using iteration: %6.3f seconds" % (t2-t1))
                print("Reading using readline(): %6.3f seconds" % (t3-t2))
                print("Using readline()+tell(): %6.3f seconds" % (t4-t3))

    def testReadOneByOne(self):
        txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEquals(reads, "AA\nBB")

    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def testReadByChunk(self):
        # make sure "\r\n" straddles 128 char boundary.
        txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEquals(reads, "A"*127+"\nB")

    def test_issue1395_1(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEquals(reads, self.normalized)

    def test_issue1395_2(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEquals(reads, self.normalized)

    def test_issue1395_3(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEquals(reads, self.normalized)

    def test_issue1395_4(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read()
        self.assertEquals(reads, self.normalized)

    def test_issue1395_5(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEquals(txt.read(4), "BBB\n")

    def test_issue2282(self):
        buffer = io.BytesIO(self.testdata)
        txt = io.TextIOWrapper(buffer, encoding="ascii")

        self.assertEqual(buffer.seekable(), txt.seekable())

    def test_newline_decoder(self):
        import codecs
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = io.IncrementalNewlineDecoder(decoder, translate=True)

        self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")

        self.assertEquals(decoder.decode(b'\xe8'), u"")
        self.assertEquals(decoder.decode(b'\xa2'), u"")
        self.assertEquals(decoder.decode(b'\x88'), u"\u8888")

        self.assertEquals(decoder.decode(b'\xe8'), u"")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.setstate((b'', 0))
        self.assertEquals(decoder.decode(b'\n'), u"\n")
        self.assertEquals(decoder.decode(b'\r'), u"")
        self.assertEquals(decoder.decode(b'', final=True), u"\n")
        self.assertEquals(decoder.decode(b'\r', final=True), u"\n")

        self.assertEquals(decoder.decode(b'\r'), u"")
        self.assertEquals(decoder.decode(b'a'), u"\na")

        self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
        self.assertEquals(decoder.decode(b'\r'), u"")
        self.assertEquals(decoder.decode(b'\r'), u"\n")
        self.assertEquals(decoder.decode(b'\na'), u"\na")

        self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
        self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
        self.assertEquals(decoder.decode(b'\n'), u"\n")
        self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
        self.assertEquals(decoder.decode(b'\n'), u"\n")

        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
        self.assertEquals(decoder.newlines, None)
        decoder.decode(b"abc\n\r")
        self.assertEquals(decoder.newlines, u'\n')
        decoder.decode(b"\nabc")
        self.assertEquals(decoder.newlines, ('\n', '\r\n'))
        decoder.decode(b"abc\r")
        self.assertEquals(decoder.newlines, ('\n', '\r\n'))
        decoder.decode(b"abc")
        self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
        decoder.decode(b"abc\r")
        decoder.reset()
        self.assertEquals(decoder.decode(b"abc"), "abc")
        self.assertEquals(decoder.newlines, None)

# XXX Tests for open()

class MiscIOTest(unittest.TestCase):

    def testImport__all__(self):
        for name in io.__all__:
            obj = getattr(io, name, None)
            self.assert_(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower():
                self.assert_(issubclass(obj, Exception), name)
            else:
                self.assert_(issubclass(obj, io.IOBase))


def test_main():
    test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
                              BufferedReaderTest,
                              BufferedWriterTest, BufferedRWPairTest,
                              BufferedRandomTest, TextIOWrapperTest,
                              MiscIOTest)

if __name__ == "__main__":
    unittest.main()