blob: b93ce02806d9c943cf04be8ab786f342a1bebb2b [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
9import unittest
10from itertools import chain
11from test import test_support
12
13import codecs
14import io # The module under test
15
16
class MockRawIO(io.RawIOBase):
    """Scripted raw stream: reads replay a fixed script, writes are recorded.

    ``read_stack`` is an iterable of values handed out one per ``read()``
    call (entries may be ``None`` to simulate a would-block condition).
    Everything passed to ``write()`` is kept in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted value, or b"" once the script is used up.

        The size argument is accepted for interface compatibility only.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error is a real
            # bug in the test and must not be hidden.
            return b""

    def write(self, b):
        """Record a copy of *b* and report all of it as written."""
        # Copy so later mutation by the caller cannot change the record.
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor that tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore a seek request; the mock keeps no position."""
        pass

    def tell(self):
        # Arbitrary fixed position; the mock keeps no real offset.
        return 42
50
51
class MockFileIO(io.BytesIO):
    """BytesIO that logs the length of every read() result.

    ``read_history`` receives one entry per ``read()`` call: the number of
    bytes returned, or ``None`` if the underlying read returned ``None``.
    """

    def __init__(self, data):
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        chunk = io.BytesIO.read(self, n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
62
63
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a predefined script.

    Each entry of ``blocking_script`` drives one ``write()`` call: a
    non-negative entry is returned as the byte count; a negative entry
    ``-k`` raises ``BlockingIOError`` reporting ``k`` characters written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        # Record the attempt before consulting the script, so the data is
        # logged even when the call ends up "blocking".
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if not (outcome < 0):
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
80
81
class IOTest(unittest.TestCase):
    """Basic read/write/seek/truncate checks for raw, buffered and in-memory streams."""

    # Offset just beyond the signed 32-bit range, used by the large-file checks.
    LARGE = 2**31

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Build the 12 bytes that read_ops() expects via overlapping writes.

        The statements are order-dependent: each one checks the position or
        byte count produced by the previous one.
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Read back the data produced by write_ops() in assorted ways.

        When *buffered* is true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset on *f*."""
        # Use unittest assertions rather than bare ``assert`` so the checks
        # survive running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # ``with`` guarantees the file is closed even if an assertion fails,
        # so tearDown can always remove it.
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with the default buffering.
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # Use io.open explicitly: the builtin open() would test a different
        # file implementation than the module under test.
        for bufsize in (0, 1, 100):
            f = None
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The context manager must close the file on error too.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # Records the order in which __del__, close() and flush() run.
        record = []

        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)

            def close(self):
                record.append(2)
                io.FileIO.close(self)

            def flush(self):
                record.append(3)
                io.FileIO.flush(self)

        f = MyFileIO(test_support.TESTFN, "w")
        # FileIO is a raw binary stream, so write bytes rather than text.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def XXXtest_array_writes(self):
        # XXX memory view not available yet
        # (the XXX name prefix keeps unittest from collecting this method)
        a = array.array('i', range(10))
        n = len(memoryview(a))
        with io.open(test_support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        # closefd=False is rejected when opening by file name.
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)
272
class MemorySeekTestMixin:
    """Shared read/seek/tell tests for in-memory streams.

    Concrete test classes must provide:
      - ``buftype``: callable turning the sample text into the stream's
        native data type,
      - ``ioclass``: the stream class under test,
      - ``EOF``: the value read() returns at end of stream.
    """

    def testInit(self):
        # Construction alone must not raise.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        stream = self.ioclass(buf)

        self.assertEqual(buf[:1], stream.read(1))
        self.assertEqual(buf[1:5], stream.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], stream.read(900))
        self.assertEqual(self.EOF, stream.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        stream = self.ioclass(buf)

        self.assertEqual(buf, stream.read())
        self.assertEqual(self.EOF, stream.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        stream = self.ioclass(buf)

        stream.read(5)
        stream.seek(0)
        self.assertEqual(buf, stream.read())

        stream.seek(3)
        self.assertEqual(buf[3:], stream.read())
        # Float offsets are rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        stream = self.ioclass(buf)

        self.assertEqual(0, stream.tell())
        stream.seek(5)
        self.assertEqual(5, stream.tell())
        # Seeking past the end is allowed and is reflected by tell().
        stream.seek(10000)
        self.assertEqual(10000, stream.tell())
316
317
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory seek/tell tests against io.BytesIO."""

    EOF = b""
    ioclass = io.BytesIO

    @staticmethod
    def buftype(s):
        # The mixin supplies text; BytesIO needs it as UTF-8 encoded bytes.
        encoded = s.encode("utf-8")
        return encoded
324
325
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory seek/tell tests against io.StringIO."""

    EOF = ""
    ioclass = io.StringIO
    # Sample data is passed through str() before reaching the stream.
    buftype = str
330
331
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of scripted raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered reader,
        # and the sizes the raw stream is then expected to have returned.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # fileno() is delegated to the raw stream.
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
392
393
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of recording raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        # Exceeding the buffer size pushes everything to the raw stream.
        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # fileno() is delegated to the raw stream.
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
448
449
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder tests for io.BufferedRWPair."""

    def testRWPair(self):
        # Only checks that a pair can be built from a reader and a writer.
        reader = MockRawIO(())
        writer = MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
458
459
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom (combined buffered reading and writing)."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # Float offsets are rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)
492
493# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
494# properties:
495# - A single output character can correspond to many bytes of input.
496# - The number of input bytes to complete the character can be
497# undetermined until the last input byte is received.
498# - The number of input bytes can vary depending on previous input.
499# - A single input byte can correspond to many characters of output.
500# - The number of output characters can be undetermined until the
501# last input byte is received.
502# - The number of output characters can vary depending on previous input.
503
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    # The codec lookup below only answers while this flag is true, so the
    # test decoder cannot leak into unrelated tests.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffered, flags = state
        self.buffer = bytearray(buffered)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume *input* bytes and return the text of all completed words.

        With *final* true, a partially buffered word is completed as well.
        """
        output = ''
        period = ord('.')
        # Iterating a bytearray yields integers regardless of how the
        # interpreter iterates the input's own type, so the comparison and
        # the append below always work on byte values.
        for b in bytearray(input):
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        Control words ('i...' / 'o...') update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled.

        Returns None for every other name, or while codecEnabled is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            return codecs.CodecInfo(
                name='test_decoder', encode=None, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
591
592
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry: (input bytes, value of the ``final`` flag, expected output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
635
636class TextIOWrapperTest(unittest.TestCase):
637
def setUp(self):
    """Prepare sample bytes with mixed line endings and their LF-only text."""
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
641
def tearDown(self):
    """Remove the scratch file that a test may have left behind."""
    scratch_path = test_support.TESTFN
    test_support.unlink(scratch_path)
644
645 def testLineBuffering(self):
646 r = io.BytesIO()
647 b = io.BufferedWriter(r, 1000)
648 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
649 t.write(u"X")
650 self.assertEquals(r.getvalue(), b"") # No flush happened
651 t.write(u"Y\nZ")
652 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
653 t.write(u"A\rB")
654 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
655
656 def testEncodingErrorsReading(self):
657 # (1) default
658 b = io.BytesIO(b"abc\n\xff\n")
659 t = io.TextIOWrapper(b, encoding="ascii")
660 self.assertRaises(UnicodeError, t.read)
661 # (2) explicit strict
662 b = io.BytesIO(b"abc\n\xff\n")
663 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
664 self.assertRaises(UnicodeError, t.read)
665 # (3) ignore
666 b = io.BytesIO(b"abc\n\xff\n")
667 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
668 self.assertEquals(t.read(), "abc\n\n")
669 # (4) replace
670 b = io.BytesIO(b"abc\n\xff\n")
671 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
672 self.assertEquals(t.read(), u"abc\n\ufffd\n")
673
674 def testEncodingErrorsWriting(self):
675 # (1) default
676 b = io.BytesIO()
677 t = io.TextIOWrapper(b, encoding="ascii")
678 self.assertRaises(UnicodeError, t.write, u"\xff")
679 # (2) explicit strict
680 b = io.BytesIO()
681 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
682 self.assertRaises(UnicodeError, t.write, u"\xff")
683 # (3) ignore
684 b = io.BytesIO()
685 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
686 newline="\n")
687 t.write(u"abc\xffdef\n")
688 t.flush()
689 self.assertEquals(b.getvalue(), b"abcdef\n")
690 # (4) replace
691 b = io.BytesIO()
692 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
693 newline="\n")
694 t.write(u"abc\xffdef\n")
695 t.flush()
696 self.assertEquals(b.getvalue(), b"abc?def\n")
697
698 def testNewlinesInput(self):
699 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
700 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
701 for newline, expected in [
702 (None, normalized.decode("ascii").splitlines(True)),
703 ("", testdata.decode("ascii").splitlines(True)),
704 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
705 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
706 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
707 ]:
708 buf = io.BytesIO(testdata)
709 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
710 self.assertEquals(txt.readlines(), expected)
711 txt.seek(0)
712 self.assertEquals(txt.read(), "".join(expected))
713
714 def testNewlinesOutput(self):
715 testdict = {
716 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
717 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
718 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
719 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
720 }
721 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
722 for newline, expected in tests:
723 buf = io.BytesIO()
724 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
725 txt.write("AAA\nB")
726 txt.write("BB\nCCC\n")
727 txt.write("X\rY\r\nZ")
728 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000729 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000730 self.assertEquals(buf.getvalue(), expected)
731
732 def testNewlines(self):
733 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
734
735 tests = [
736 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
737 [ '', input_lines ],
738 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
739 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
740 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
741 ]
742
743 encodings = ('utf-8', 'latin-1')
744
745 # Try a range of buffer sizes to test the case where \r is the last
746 # character in TextIOWrapper._pending_line.
747 for encoding in encodings:
748 # XXX: str.encode() should return bytes
749 data = bytes(''.join(input_lines).encode(encoding))
750 for do_reads in (False, True):
751 for bufsize in range(1, 10):
752 for newline, exp_lines in tests:
753 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
754 textio = io.TextIOWrapper(bufio, newline=newline,
755 encoding=encoding)
756 if do_reads:
757 got_lines = []
758 while True:
759 c2 = textio.read(2)
760 if c2 == '':
761 break
762 self.assertEquals(len(c2), 2)
763 got_lines.append(c2 + textio.readline())
764 else:
765 got_lines = list(textio)
766
767 for got_line, exp_line in zip(got_lines, exp_lines):
768 self.assertEquals(got_line, exp_line)
769 self.assertEquals(len(got_lines), len(exp_lines))
770
771 def testNewlinesInput(self):
772 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
773 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
774 for newline, expected in [
775 (None, normalized.decode("ascii").splitlines(True)),
776 ("", testdata.decode("ascii").splitlines(True)),
777 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
778 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
779 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
780 ]:
781 buf = io.BytesIO(testdata)
782 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
783 self.assertEquals(txt.readlines(), expected)
784 txt.seek(0)
785 self.assertEquals(txt.read(), "".join(expected))
786
def testNewlinesOutput(self):
    """Check written line endings for each newline setting and os.linesep.

    os.linesep is patched for each case and restored afterwards.
    """
    data = u"AAA\nBBB\rCCC\n"
    data_lf = b"AAA\nBBB\rCCC\n"
    data_cr = b"AAA\rBBB\rCCC\r"
    data_crlf = b"AAA\r\nBBB\rCCC\r\n"
    save_linesep = os.linesep
    try:
        # The loop target assigns os.linesep directly on each iteration.
        for os.linesep, newline, expected in [
                ("\n", None, data_lf),
                ("\r\n", None, data_crlf),
                ("\n", "", data_lf),
                ("\r\n", "", data_lf),
                ("\n", "\n", data_lf),
                ("\r\n", "\n", data_lf),
                ("\n", "\r", data_cr),
                ("\r\n", "\r", data_cr),
                ("\n", "\r\n", data_crlf),
                ("\r\n", "\r\n", data_crlf),
                ]:
            buf = io.BytesIO()
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write(data)
            # Verify the translated bytes while the buffer is still open;
            # getvalue() is no longer usable once close() has run.
            # NOTE(review): the newline=None cases assume the wrapper picks
            # up the patched os.linesep when it is constructed - confirm for
            # the io implementation under test.
            txt.flush()
            self.assertEqual(buf.getvalue(), expected)
            txt.close()
            self.assertEqual(buf.closed, True)
            self.assertRaises(ValueError, buf.getvalue)
    finally:
        os.linesep = save_linesep
814
815 # Systematic tests of the text I/O API
816
def testBasicIO(self):
    """Write/read/seek/tell round trips over several chunk sizes and encodings."""
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            f = io.open(test_support.TESTFN, "w+", encoding=enc)
            try:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write(u"abc"), 3)
            finally:
                f.close()
            f = io.open(test_support.TESTFN, "r+", encoding=enc)
            # Close even when an assertion fails so tearDown can remove
            # the file.
            try:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), u"abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(2), u"ab")
                self.assertEqual(f.read(1), u"c")
                self.assertEqual(f.read(1), u"")
                self.assertEqual(f.read(), u"")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write(u"def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), u"def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
            finally:
                f.close()
843
def multi_line_test(self, f, enc):
    """Write lines of many lengths to *f* and check tell() on the way back.

    The (position, line) pairs seen while reading must equal those recorded
    while writing.  *enc* is accepted for the caller's convenience and is
    not used here.
    """
    f.seek(0)
    f.truncate()
    sample = u"s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        line = u"".join([sample[i % len(sample)] for i in range(size)]) + u"\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
865
def testTelling(self):
    """tell() must agree between writing and readline(), and fail mid-iteration."""
    f = io.open(test_support.TESTFN, "w+", encoding="utf8")
    # Close even when an assertion fails so tearDown can remove the file.
    try:
        p0 = f.tell()
        f.write(u"\xff\n")
        p1 = f.tell()
        f.write(u"\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), u"\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), u"\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, u"\xff\n")
            # While iterating, tell() is expected to be refused.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
    finally:
        f.close()
885
def testSeeking(self):
    """Read across a chunk boundary that is followed by a multi-byte character."""
    chunk_size = io.TextIOWrapper._CHUNK_SIZE
    # Leave two bytes of the chunk for the start of the multi-byte suffix.
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(line*2)
    finally:
        f.close()
    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    # The reader used to be left open; close it so tearDown can always
    # remove the file.
    try:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
    finally:
        f.close()
903
def testSeekingToo(self):
    """tell() after readline() must work with a chunk size below the character size."""
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(data)
    finally:
        f.close()
    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    # The reader used to be left open; close it so tearDown can always
    # remove the file.
    try:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
    finally:
        f.close()
915
def testSeekAndTell(self):
    """Test seek/tell using the StatefulIncrementalDecoder."""

    def testSeekAndTellWithData(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        f = io.open(test_support.TESTFN, 'wb')
        try:
            f.write(data)
        finally:
            f.close()
        f = io.open(test_support.TESTFN, encoding='test_decoder')
        try:
            decoded = f.read()
        finally:
            f.close()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                f = io.open(test_support.TESTFN, encoding='test_decoder')
                # Close even when an assertion fails so later opens and
                # the final cleanup are not affected.
                try:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])
                finally:
                    f.close()

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            testSeekAndTellWithData(input)

        # Position each test case so that it crosses a chunk boundary.
        CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            testSeekAndTellWithData(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Christian Heimes1a6387e2008-03-26 12:49:49 +0000960
961 def testEncodedWrites(self):
962 data = u"1234567890"
963 tests = ("utf-16",
964 "utf-16-le",
965 "utf-16-be",
966 "utf-32",
967 "utf-32-le",
968 "utf-32-be")
969 for encoding in tests:
970 buf = io.BytesIO()
971 f = io.TextIOWrapper(buf, encoding=encoding)
972 # Check if the BOM is written only once (see issue1753).
973 f.write(data)
974 f.write(data)
975 f.seek(0)
976 self.assertEquals(f.read(), data * 2)
977 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
978
def timingTest(self):
    """Time text-mode writing and three ways of reading lines back.

    The name does not start with "test", so the default unittest loader
    does not collect it; it is meant to be invoked by hand.  For each chunk
    size it writes ``nlines`` copies of a line full of multi-byte characters
    and then times iteration, readline(), and readline()+tell().  Results
    are printed only when ``test_support.verbose`` is set.
    """
    timer = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        try:
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for _ in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # Deliberately not named ``line``: rebinding it would change
            # what the next chunk size writes.
            for _ in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
        finally:
            # Release the file even if one of the timed phases raises.
            f.close()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (nlines, nchars, nbytes))
            print("File chunk size: %6s" % chunk_size)
            print("Writing: %6.3f seconds" % (t1-t0))
            print("Reading using iteration: %6.3f seconds" % (t2-t1))
            print("Reading using readline(): %6.3f seconds" % (t3-t2))
            print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1015
1016 def testReadOneByOne(self):
1017 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1018 reads = ""
1019 while True:
1020 c = txt.read(1)
1021 if not c:
1022 break
1023 reads += c
1024 self.assertEquals(reads, "AA\nBB")
1025
1026 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1027 def testReadByChunk(self):
1028 # make sure "\r\n" straddles 128 char boundary.
1029 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1030 reads = ""
1031 while True:
1032 c = txt.read(128)
1033 if not c:
1034 break
1035 reads += c
1036 self.assertEquals(reads, "A"*127+"\nB")
1037
1038 def test_issue1395_1(self):
1039 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1040
1041 # read one char at a time
1042 reads = ""
1043 while True:
1044 c = txt.read(1)
1045 if not c:
1046 break
1047 reads += c
1048 self.assertEquals(reads, self.normalized)
1049
1050 def test_issue1395_2(self):
1051 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1052 txt._CHUNK_SIZE = 4
1053
1054 reads = ""
1055 while True:
1056 c = txt.read(4)
1057 if not c:
1058 break
1059 reads += c
1060 self.assertEquals(reads, self.normalized)
1061
1062 def test_issue1395_3(self):
1063 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1064 txt._CHUNK_SIZE = 4
1065
1066 reads = txt.read(4)
1067 reads += txt.read(4)
1068 reads += txt.readline()
1069 reads += txt.readline()
1070 reads += txt.readline()
1071 self.assertEquals(reads, self.normalized)
1072
1073 def test_issue1395_4(self):
1074 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1075 txt._CHUNK_SIZE = 4
1076
1077 reads = txt.read(4)
1078 reads += txt.read()
1079 self.assertEquals(reads, self.normalized)
1080
1081 def test_issue1395_5(self):
1082 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1083 txt._CHUNK_SIZE = 4
1084
1085 reads = txt.read(4)
1086 pos = txt.tell()
1087 txt.seek(0)
1088 txt.seek(pos)
1089 self.assertEquals(txt.read(4), "BBB\n")
1090
1091 def test_issue2282(self):
1092 buffer = io.BytesIO(self.testdata)
1093 txt = io.TextIOWrapper(buffer, encoding="ascii")
1094
1095 self.assertEqual(buffer.seekable(), txt.seekable())
1096
1097 def test_newline_decoder(self):
1098 import codecs
1099 decoder = codecs.getincrementaldecoder("utf-8")()
1100 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1101
1102 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1103
1104 self.assertEquals(decoder.decode(b'\xe8'), u"")
1105 self.assertEquals(decoder.decode(b'\xa2'), u"")
1106 self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
1107
1108 self.assertEquals(decoder.decode(b'\xe8'), u"")
1109 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1110
1111 decoder.setstate((b'', 0))
1112 self.assertEquals(decoder.decode(b'\n'), u"\n")
1113 self.assertEquals(decoder.decode(b'\r'), u"")
1114 self.assertEquals(decoder.decode(b'', final=True), u"\n")
1115 self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
1116
1117 self.assertEquals(decoder.decode(b'\r'), u"")
1118 self.assertEquals(decoder.decode(b'a'), u"\na")
1119
1120 self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
1121 self.assertEquals(decoder.decode(b'\r'), u"")
1122 self.assertEquals(decoder.decode(b'\r'), u"\n")
1123 self.assertEquals(decoder.decode(b'\na'), u"\na")
1124
1125 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
1126 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1127 self.assertEquals(decoder.decode(b'\n'), u"\n")
1128 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
1129 self.assertEquals(decoder.decode(b'\n'), u"\n")
1130
1131 decoder = codecs.getincrementaldecoder("utf-8")()
1132 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1133 self.assertEquals(decoder.newlines, None)
1134 decoder.decode(b"abc\n\r")
1135 self.assertEquals(decoder.newlines, u'\n')
1136 decoder.decode(b"\nabc")
1137 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1138 decoder.decode(b"abc\r")
1139 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1140 decoder.decode(b"abc")
1141 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1142 decoder.decode(b"abc\r")
1143 decoder.reset()
1144 self.assertEquals(decoder.decode(b"abc"), "abc")
1145 self.assertEquals(decoder.newlines, None)
1146
1147# XXX Tests for open()
1148
1149class MiscIOTest(unittest.TestCase):
1150
1151 def testImport__all__(self):
1152 for name in io.__all__:
1153 obj = getattr(io, name, None)
1154 self.assert_(obj is not None, name)
1155 if name == "open":
1156 continue
1157 elif "error" in name.lower():
1158 self.assert_(issubclass(obj, Exception), name)
1159 else:
1160 self.assert_(issubclass(obj, io.IOBase))
1161
1162
def test_main():
    """Run every test case class of this module via test_support.run_unittest."""
    test_classes = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest, BufferedWriterTest,
        BufferedRWPairTest, BufferedRandomTest,
        StatefulIncrementalDecoderTest,
        TextIOWrapperTest, MiscIOTest,
    )
    test_support.run_unittest(*test_classes)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001169
# Script entry point: when run directly, control goes to unittest.main()
# rather than to test_main() defined above.
if __name__ == "__main__":
    unittest.main()