blob: 39d3e5b00bd9259157abcc0a002f7ba3b8c9a8c6 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
9import unittest
10from itertools import chain
11from test import test_support
12
13import codecs
14import io # The module under test
15
16
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered-I/O tests.

    Each read() hands back the next item of *read_stack* (whatever it is,
    including ``None``), and every write() is recorded in ``_write_stack``
    so that tests can inspect exactly what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        # Copy the script so read() never mutates the caller's sequence.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted chunk; *n* is ignored.

        Returns b"" once the script is exhausted.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script means EOF; anything else should surface.
            return b""

    def write(self, b):
        """Record a copy of *b* and report it as fully written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value the tests can recognise.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore any seek request (returns None)."""
        pass

    def tell(self):
        # Arbitrary fixed value; the mock tracks no position.
        return 42
50
51
class MockFileIO(io.BytesIO):
    """A BytesIO that logs the size of what each read() call returned."""

    def __init__(self, data):
        # One entry per read(): the length of the result, or None.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        """Read like BytesIO.read() and append the result size to the log."""
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
62
63
class MockNonBlockWriterIO(io.RawIOBase):
    """Writable raw stream whose write() results follow a fixed script.

    Each entry of *blocking_script* decides one write() call: a
    non-negative number is returned as the byte count, a negative number
    raises BlockingIOError with the negated value as characters written.
    """

    def __init__(self, blocking_script):
        self._write_stack = []
        self._blocking_script = list(blocking_script)

    def writable(self):
        return True

    def write(self, b):
        """Record a copy of *b*, then act out the next script entry."""
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if not (outcome < 0):
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)
80
81
class IOTest(unittest.TestCase):
    """Exercise raw and buffered binary I/O via io.open(), FileIO and BytesIO."""

    def tearDown(self):
        # Every test writes to TESTFN; remove it so tests stay independent.
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against writable stream *f*.

        After the script the stream holds "hello world" followed by a
        single newline (see test_raw_bytes_io).
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek script against readable stream *f*.

        *f* must contain the data left behind by write_ops().  When
        *buffered* is true, read() without a size is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests: just past the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset on stream *f*."""
        # Use unittest assertions: a bare assert is stripped under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
        f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
        f.close()

    def test_readline(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"abc\ndef\nxyzzy\nfoo")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readline(), b"abc\n")
        self.assertEqual(f.readline(10), b"def\n")
        self.assertEqual(f.readline(2), b"xy")
        self.assertEqual(f.readline(4), b"zzy\n")
        self.assertEqual(f.readline(), b"foo")
        f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        f = io.open(test_support.TESTFN, "w+b", 0)
        self.large_file_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "w+b")
        self.large_file_ops(f)
        f.close()

    def test_with_open(self):
        """Files from io.open() close themselves when a with block exits."""
        for bufsize in (0, 1, 100):
            f = None
            # io.open, not the builtin: io is the module under test.
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        """Deleting a FileIO calls __del__, then close(), then flush()."""
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        # FileIO is a binary stream, so write bytes rather than text.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"xxx")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.read(), b"xxx")
        f.close()

    def XXXtest_array_writes(self):
        # XXX memory view not available yet
        a = array.array('i', range(10))
        n = len(memoryview(a))
        f = io.open(test_support.TESTFN, "wb", 0)
        self.assertEqual(f.write(a), n)
        f.close()
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.write(a), n)
        f.close()

    def test_closefd(self):
        # closefd=False is only meaningful for file descriptors, not names.
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)
272
class MemorySeekTestMixin:
    """Shared read/seek/tell checks for in-memory streams.

    Concrete test classes supply three attributes: ``buftype`` (builds the
    initial buffer from a text literal), ``ioclass`` (the stream class under
    test) and ``EOF`` (the value read() returns at end of stream).
    """

    def testInit(self):
        # Construction alone must succeed.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than is left returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        # Float offsets are rejected.
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and reported by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
316
317
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory stream checks against io.BytesIO."""

    ioclass = io.BytesIO
    EOF = b""

    @staticmethod
    def buftype(s):
        # The mixin passes text literals; BytesIO needs them as bytes.
        encoded = s.encode("utf-8")
        return encoded
324
325
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory stream checks against io.StringIO."""

    EOF = ""
    ioclass = io.StringIO
    buftype = str
330
331
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of scripted raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered
        # reader, and the result sizes expected from the raw reads.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3], [ dlen ] ],
            [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # fileno() is delegated to the raw stream.
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
392
393
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of scripted raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        # Overflowing the buffer pushes everything to the raw stream at once.
        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # fileno() is delegated to the raw stream.
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
448
449
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder tests for io.BufferedRWPair."""

    def testRWPair(self):
        # For now this only checks that a pair can be constructed.
        reader = MockRawIO(())
        writer = MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
458
459
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # Float offsets are rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)
492
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
503
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I=1, O=1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input bytes, integer flags encoding I and O)."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the text produced so far.

        Incomplete words stay buffered until more input arrives or
        *final* is true.
        """
        output = ''
        period = ord('.')
        # Iterate a bytearray so every element is an int, whatever the
        # element type of the bytes object that was passed in.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        Control words ('i...' / 'o...') update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output
576
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry: (input bytes, value of the final flag, expected output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
619
620class TextIOWrapperTest(unittest.TestCase):
621
622 def setUp(self):
623 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
624 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
625
626 def tearDown(self):
627 test_support.unlink(test_support.TESTFN)
628
629 def testLineBuffering(self):
630 r = io.BytesIO()
631 b = io.BufferedWriter(r, 1000)
632 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
633 t.write(u"X")
634 self.assertEquals(r.getvalue(), b"") # No flush happened
635 t.write(u"Y\nZ")
636 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
637 t.write(u"A\rB")
638 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
639
640 def testEncodingErrorsReading(self):
641 # (1) default
642 b = io.BytesIO(b"abc\n\xff\n")
643 t = io.TextIOWrapper(b, encoding="ascii")
644 self.assertRaises(UnicodeError, t.read)
645 # (2) explicit strict
646 b = io.BytesIO(b"abc\n\xff\n")
647 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
648 self.assertRaises(UnicodeError, t.read)
649 # (3) ignore
650 b = io.BytesIO(b"abc\n\xff\n")
651 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
652 self.assertEquals(t.read(), "abc\n\n")
653 # (4) replace
654 b = io.BytesIO(b"abc\n\xff\n")
655 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
656 self.assertEquals(t.read(), u"abc\n\ufffd\n")
657
658 def testEncodingErrorsWriting(self):
659 # (1) default
660 b = io.BytesIO()
661 t = io.TextIOWrapper(b, encoding="ascii")
662 self.assertRaises(UnicodeError, t.write, u"\xff")
663 # (2) explicit strict
664 b = io.BytesIO()
665 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
666 self.assertRaises(UnicodeError, t.write, u"\xff")
667 # (3) ignore
668 b = io.BytesIO()
669 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
670 newline="\n")
671 t.write(u"abc\xffdef\n")
672 t.flush()
673 self.assertEquals(b.getvalue(), b"abcdef\n")
674 # (4) replace
675 b = io.BytesIO()
676 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
677 newline="\n")
678 t.write(u"abc\xffdef\n")
679 t.flush()
680 self.assertEquals(b.getvalue(), b"abc?def\n")
681
682 def testNewlinesInput(self):
683 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
684 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
685 for newline, expected in [
686 (None, normalized.decode("ascii").splitlines(True)),
687 ("", testdata.decode("ascii").splitlines(True)),
688 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
689 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
690 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
691 ]:
692 buf = io.BytesIO(testdata)
693 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
694 self.assertEquals(txt.readlines(), expected)
695 txt.seek(0)
696 self.assertEquals(txt.read(), "".join(expected))
697
698 def testNewlinesOutput(self):
699 testdict = {
700 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
701 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
702 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
703 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
704 }
705 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
706 for newline, expected in tests:
707 buf = io.BytesIO()
708 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
709 txt.write("AAA\nB")
710 txt.write("BB\nCCC\n")
711 txt.write("X\rY\r\nZ")
712 txt.flush()
713 self.assertEquals(buf.getvalue(), expected)
714
715 def testNewlines(self):
716 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
717
718 tests = [
719 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
720 [ '', input_lines ],
721 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
722 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
723 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
724 ]
725
726 encodings = ('utf-8', 'latin-1')
727
728 # Try a range of buffer sizes to test the case where \r is the last
729 # character in TextIOWrapper._pending_line.
730 for encoding in encodings:
731 # XXX: str.encode() should return bytes
732 data = bytes(''.join(input_lines).encode(encoding))
733 for do_reads in (False, True):
734 for bufsize in range(1, 10):
735 for newline, exp_lines in tests:
736 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
737 textio = io.TextIOWrapper(bufio, newline=newline,
738 encoding=encoding)
739 if do_reads:
740 got_lines = []
741 while True:
742 c2 = textio.read(2)
743 if c2 == '':
744 break
745 self.assertEquals(len(c2), 2)
746 got_lines.append(c2 + textio.readline())
747 else:
748 got_lines = list(textio)
749
750 for got_line, exp_line in zip(got_lines, exp_lines):
751 self.assertEquals(got_line, exp_line)
752 self.assertEquals(len(got_lines), len(exp_lines))
753
754 def testNewlinesInput(self):
755 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
756 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
757 for newline, expected in [
758 (None, normalized.decode("ascii").splitlines(True)),
759 ("", testdata.decode("ascii").splitlines(True)),
760 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
761 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
762 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
763 ]:
764 buf = io.BytesIO(testdata)
765 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
766 self.assertEquals(txt.readlines(), expected)
767 txt.seek(0)
768 self.assertEquals(txt.read(), "".join(expected))
769
770 def testNewlinesOutput(self):
771 data = u"AAA\nBBB\rCCC\n"
772 data_lf = b"AAA\nBBB\rCCC\n"
773 data_cr = b"AAA\rBBB\rCCC\r"
774 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
775 save_linesep = os.linesep
776 try:
777 for os.linesep, newline, expected in [
778 ("\n", None, data_lf),
779 ("\r\n", None, data_crlf),
780 ("\n", "", data_lf),
781 ("\r\n", "", data_lf),
782 ("\n", "\n", data_lf),
783 ("\r\n", "\n", data_lf),
784 ("\n", "\r", data_cr),
785 ("\r\n", "\r", data_cr),
786 ("\n", "\r\n", data_crlf),
787 ("\r\n", "\r\n", data_crlf),
788 ]:
789 buf = io.BytesIO()
790 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
791 txt.write(data)
792 txt.close()
793 self.assertEquals(buf.getvalue(), expected)
794 finally:
795 os.linesep = save_linesep
796
797 # Systematic tests of the text I/O API
798
799 def testBasicIO(self):
800 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
801 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
802 f = io.open(test_support.TESTFN, "w+", encoding=enc)
803 f._CHUNK_SIZE = chunksize
804 self.assertEquals(f.write(u"abc"), 3)
805 f.close()
806 f = io.open(test_support.TESTFN, "r+", encoding=enc)
807 f._CHUNK_SIZE = chunksize
808 self.assertEquals(f.tell(), 0)
809 self.assertEquals(f.read(), u"abc")
810 cookie = f.tell()
811 self.assertEquals(f.seek(0), 0)
812 self.assertEquals(f.read(2), u"ab")
813 self.assertEquals(f.read(1), u"c")
814 self.assertEquals(f.read(1), u"")
815 self.assertEquals(f.read(), u"")
816 self.assertEquals(f.tell(), cookie)
817 self.assertEquals(f.seek(0), 0)
818 self.assertEquals(f.seek(0, 2), cookie)
819 self.assertEquals(f.write(u"def"), 3)
820 self.assertEquals(f.seek(cookie), cookie)
821 self.assertEquals(f.read(), u"def")
822 if enc.startswith("utf"):
823 self.multi_line_test(f, enc)
824 f.close()
825
826 def multi_line_test(self, f, enc):
827 f.seek(0)
828 f.truncate()
829 sample = u"s\xff\u0fff\uffff"
830 wlines = []
831 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
832 chars = []
833 for i in range(size):
834 chars.append(sample[i % len(sample)])
835 line = u"".join(chars) + u"\n"
836 wlines.append((f.tell(), line))
837 f.write(line)
838 f.seek(0)
839 rlines = []
840 while True:
841 pos = f.tell()
842 line = f.readline()
843 if not line:
844 break
845 rlines.append((pos, line))
846 self.assertEquals(rlines, wlines)
847
848 def testTelling(self):
849 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
850 p0 = f.tell()
851 f.write(u"\xff\n")
852 p1 = f.tell()
853 f.write(u"\xff\n")
854 p2 = f.tell()
855 f.seek(0)
856 self.assertEquals(f.tell(), p0)
857 self.assertEquals(f.readline(), u"\xff\n")
858 self.assertEquals(f.tell(), p1)
859 self.assertEquals(f.readline(), u"\xff\n")
860 self.assertEquals(f.tell(), p2)
861 f.seek(0)
862 for line in f:
863 self.assertEquals(line, u"\xff\n")
864 self.assertRaises(IOError, f.tell)
865 self.assertEquals(f.tell(), p2)
866 f.close()
867
868 def testSeeking(self):
869 chunk_size = io.TextIOWrapper._CHUNK_SIZE
870 prefix_size = chunk_size - 2
871 u_prefix = "a" * prefix_size
872 prefix = bytes(u_prefix.encode("utf-8"))
873 self.assertEquals(len(u_prefix), len(prefix))
874 u_suffix = "\u8888\n"
875 suffix = bytes(u_suffix.encode("utf-8"))
876 line = prefix + suffix
877 f = io.open(test_support.TESTFN, "wb")
878 f.write(line*2)
879 f.close()
880 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
881 s = f.read(prefix_size)
882 self.assertEquals(s, unicode(prefix, "ascii"))
883 self.assertEquals(f.tell(), prefix_size)
884 self.assertEquals(f.readline(), u_suffix)
885
886 def testSeekingToo(self):
887 # Regression test for a specific bug
888 data = b'\xe0\xbf\xbf\n'
889 f = io.open(test_support.TESTFN, "wb")
890 f.write(data)
891 f.close()
892 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
893 f._CHUNK_SIZE # Just test that it exists
894 f._CHUNK_SIZE = 2
895 f.readline()
896 f.tell()
897
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +0000898 def testSeekAndTell(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000899 """Test seek/tell using the StatefulIncrementalDecoder."""
900
901 def lookupTestDecoder(name):
902 if self.codecEnabled and name == 'test_decoder':
903 return codecs.CodecInfo(
904 name='test_decoder', encode=None, decode=None,
905 incrementalencoder=None,
906 streamreader=None, streamwriter=None,
907 incrementaldecoder=StatefulIncrementalDecoder)
908
909 def testSeekAndTellWithData(data, min_pos=0):
910 """Tell/seek to various points within a data stream and ensure
911 that the decoded data returned by read() is consistent."""
912 f = io.open(test_support.TESTFN, 'wb')
913 f.write(data)
914 f.close()
915 f = io.open(test_support.TESTFN, encoding='test_decoder')
916 decoded = f.read()
917 f.close()
918
919 for i in range(min_pos, len(decoded) + 1): # seek positions
920 for j in [1, 5, len(decoded) - i]: # read lengths
921 f = io.open(test_support.TESTFN, encoding='test_decoder')
922 self.assertEquals(f.read(i), decoded[:i])
923 cookie = f.tell()
924 self.assertEquals(f.read(j), decoded[i:i + j])
925 f.seek(cookie)
926 self.assertEquals(f.read(), decoded[i:])
927 f.close()
928
929 # Register a special incremental decoder for testing.
930 codecs.register(lookupTestDecoder)
931 self.codecEnabled = 1
932
933 # Run the tests.
934 try:
935 # Try each test case.
936 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
937 testSeekAndTellWithData(input)
938
939 # Position each test case so that it crosses a chunk boundary.
940 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
941 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
942 offset = CHUNK_SIZE - len(input)//2
943 prefix = b'.'*offset
944 # Don't bother seeking into the prefix (takes too long).
945 min_pos = offset*2
946 testSeekAndTellWithData(prefix + input, min_pos)
947
948 # Ensure our test decoder won't interfere with subsequent tests.
949 finally:
950 self.codecEnabled = 0
951
952 def testEncodedWrites(self):
953 data = u"1234567890"
954 tests = ("utf-16",
955 "utf-16-le",
956 "utf-16-be",
957 "utf-32",
958 "utf-32-le",
959 "utf-32-be")
960 for encoding in tests:
961 buf = io.BytesIO()
962 f = io.TextIOWrapper(buf, encoding=encoding)
963 # Check if the BOM is written only once (see issue1753).
964 f.write(data)
965 f.write(data)
966 f.seek(0)
967 self.assertEquals(f.read(), data * 2)
968 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
969
def timingTest(self):
    """Manual benchmark of TextIOWrapper write/iterate/readline/tell.

    For each chunk size, writes ``nlines`` copies of a fixed line to
    test_support.TESTFN, then times three read passes over the file.
    Results are printed only when test_support.verbose is set.  The method
    name does not start with "test", so unittest's default loader does not
    collect it.
    """
    timer = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        # Close the file even if one of the timed phases raises.
        try:
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for _ in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # Use a separate loop name: rebinding `line` here would change
            # what the next chunk-size iteration writes.
            for _read_line in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
        finally:
            f.close()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (nlines, nchars, nbytes))
            print("File chunk size: %6s" % f._CHUNK_SIZE)
            print("Writing: %6.3f seconds" % (t1-t0))
            print("Reading using iteration: %6.3f seconds" % (t2-t1))
            print("Reading using readline(): %6.3f seconds" % (t3-t2))
            print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1006
1007 def testReadOneByOne(self):
1008 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1009 reads = ""
1010 while True:
1011 c = txt.read(1)
1012 if not c:
1013 break
1014 reads += c
1015 self.assertEquals(reads, "AA\nBB")
1016
1017 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1018 def testReadByChunk(self):
1019 # make sure "\r\n" straddles 128 char boundary.
1020 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1021 reads = ""
1022 while True:
1023 c = txt.read(128)
1024 if not c:
1025 break
1026 reads += c
1027 self.assertEquals(reads, "A"*127+"\nB")
1028
1029 def test_issue1395_1(self):
1030 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1031
1032 # read one char at a time
1033 reads = ""
1034 while True:
1035 c = txt.read(1)
1036 if not c:
1037 break
1038 reads += c
1039 self.assertEquals(reads, self.normalized)
1040
1041 def test_issue1395_2(self):
1042 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1043 txt._CHUNK_SIZE = 4
1044
1045 reads = ""
1046 while True:
1047 c = txt.read(4)
1048 if not c:
1049 break
1050 reads += c
1051 self.assertEquals(reads, self.normalized)
1052
1053 def test_issue1395_3(self):
1054 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1055 txt._CHUNK_SIZE = 4
1056
1057 reads = txt.read(4)
1058 reads += txt.read(4)
1059 reads += txt.readline()
1060 reads += txt.readline()
1061 reads += txt.readline()
1062 self.assertEquals(reads, self.normalized)
1063
1064 def test_issue1395_4(self):
1065 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1066 txt._CHUNK_SIZE = 4
1067
1068 reads = txt.read(4)
1069 reads += txt.read()
1070 self.assertEquals(reads, self.normalized)
1071
1072 def test_issue1395_5(self):
1073 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1074 txt._CHUNK_SIZE = 4
1075
1076 reads = txt.read(4)
1077 pos = txt.tell()
1078 txt.seek(0)
1079 txt.seek(pos)
1080 self.assertEquals(txt.read(4), "BBB\n")
1081
1082 def test_issue2282(self):
1083 buffer = io.BytesIO(self.testdata)
1084 txt = io.TextIOWrapper(buffer, encoding="ascii")
1085
1086 self.assertEqual(buffer.seekable(), txt.seekable())
1087
1088 def test_newline_decoder(self):
1089 import codecs
1090 decoder = codecs.getincrementaldecoder("utf-8")()
1091 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1092
1093 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1094
1095 self.assertEquals(decoder.decode(b'\xe8'), u"")
1096 self.assertEquals(decoder.decode(b'\xa2'), u"")
1097 self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
1098
1099 self.assertEquals(decoder.decode(b'\xe8'), u"")
1100 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1101
1102 decoder.setstate((b'', 0))
1103 self.assertEquals(decoder.decode(b'\n'), u"\n")
1104 self.assertEquals(decoder.decode(b'\r'), u"")
1105 self.assertEquals(decoder.decode(b'', final=True), u"\n")
1106 self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
1107
1108 self.assertEquals(decoder.decode(b'\r'), u"")
1109 self.assertEquals(decoder.decode(b'a'), u"\na")
1110
1111 self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
1112 self.assertEquals(decoder.decode(b'\r'), u"")
1113 self.assertEquals(decoder.decode(b'\r'), u"\n")
1114 self.assertEquals(decoder.decode(b'\na'), u"\na")
1115
1116 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
1117 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1118 self.assertEquals(decoder.decode(b'\n'), u"\n")
1119 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
1120 self.assertEquals(decoder.decode(b'\n'), u"\n")
1121
1122 decoder = codecs.getincrementaldecoder("utf-8")()
1123 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1124 self.assertEquals(decoder.newlines, None)
1125 decoder.decode(b"abc\n\r")
1126 self.assertEquals(decoder.newlines, u'\n')
1127 decoder.decode(b"\nabc")
1128 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1129 decoder.decode(b"abc\r")
1130 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1131 decoder.decode(b"abc")
1132 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1133 decoder.decode(b"abc\r")
1134 decoder.reset()
1135 self.assertEquals(decoder.decode(b"abc"), "abc")
1136 self.assertEquals(decoder.newlines, None)
1137
1138# XXX Tests for open()
1139
1140class MiscIOTest(unittest.TestCase):
1141
1142 def testImport__all__(self):
1143 for name in io.__all__:
1144 obj = getattr(io, name, None)
1145 self.assert_(obj is not None, name)
1146 if name == "open":
1147 continue
1148 elif "error" in name.lower():
1149 self.assert_(issubclass(obj, Exception), name)
1150 else:
1151 self.assert_(issubclass(obj, io.IOBase))
1152
1153
def test_main():
    """Hand the test classes listed below to test_support.run_unittest()."""
    test_classes = (
        IOTest,
        BytesIOTest,
        StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest,
        BufferedRWPairTest,
        BufferedRandomTest,
        TextIOWrapperTest,
        MiscIOTest,
    )
    test_support.run_unittest(*test_classes)
1160
# When the file is executed as a script, run its tests through
# unittest.main() (note: this does not go through test_main()).
if __name__ == "__main__":
    unittest.main()