blob: e7579922fa04b35031d5a494ae2ba4aeed1d8aba [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Ezio Melotti1d55ec32010-08-02 23:34:49 +000012from itertools import cycle, count
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered wrappers under test.

    Each read() call hands back the next pre-seeded item; every write()
    is recorded in ``_write_stack`` so tests can inspect what reached the
    raw layer.
    """

    def __init__(self, read_stack=()):
        # Copy the script so read() never mutates the caller's sequence.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted item (*n* is ignored), or b"" when empty."""
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Script exhausted: behave like EOF.  Only the empty-list case
            # is handled so that unrelated errors are not hidden.
            return b""

    def write(self, b):
        """Record a copy of *b* and report it as fully written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value; tests compare against it.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # Seeking is a no-op for the mock; whence defaults to 0 so that
        # single-argument calls are accepted as well.
        pass

    def tell(self):
        # Arbitrary fixed value, not a real position.
        return 42
52
53
class MockFileIO(io.BytesIO):
    """BytesIO that logs the length of every read() result.

    ``read_history`` receives one entry per read() call: the number of
    bytes returned, or None if the underlying read returned None.
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk
64
65
class MockNonBlockWriterIO(io.RawIOBase):
    """Write-only raw stream whose write() results follow a script.

    Each entry of *blocking_script* is consumed by one write() call: a
    non-negative entry is returned as the number of bytes written, a
    negative entry -k raises BlockingIOError reporting k bytes written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        # The data is recorded before the script decides the outcome.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
82
83
class IOTest(unittest.TestCase):
    """Tests for io.open(), io.FileIO and io.BytesIO on real and in-memory files."""

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate scenario for a writable binary stream.
        # On completion the stream holds b"hello world\n".

        # truncate() must not move the file position.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # Truncating below the current position leaves the position alone;
        # a following write then lands past the new end of file.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.write(b"hij"), 3)
        self.assertEqual(f.seek(0, 1), 16)
        self.assertEqual(f.tell(), 16)
        self.assertEqual(f.truncate(12), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek scenario for a stream holding
        # b"hello world\n".  The argument-less read() checks only run when
        # *buffered* is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        f.seek(0)
        f.read(2)
        f.seek(0, 1)
        self.assertEqual(f.tell(), 2)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Seek/write/truncate scenario around the 2**31 offset.
        # assertTrue instead of a bare assert so the checks survive -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
        f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
        # Close the read-only handle before re-opening; previously it was
        # just re-bound and left for the garbage collector.
        f.close()
        f = io.open(test_support.TESTFN, "r+b")
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.seek(0)
        self.read_ops(f, True)
        f.close()

    def test_readline(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"abc\ndef\nxyzzy\nfoo")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readline(), b"abc\n")
        self.assertEqual(f.readline(10), b"def\n")
        self.assertEqual(f.readline(2), b"xy")
        self.assertEqual(f.readline(4), b"zzy\n")
        self.assertEqual(f.readline(), b"foo")
        f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        f = io.open(test_support.TESTFN, "w+b", 0)
        self.large_file_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "w+b")
        self.large_file_ops(f)
        f.close()

    def test_with_open(self):
        # io.open() is used explicitly: the builtin open() is not
        # necessarily the io implementation, and this file tests io.
        for bufsize in (0, 1, 100):
            f = None
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with io.open(test_support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with io.open(test_support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Deleting the last reference must run __del__, then close(),
        # then flush(), in that order.
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        f.write("xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"xxx")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.read(), b"xxx")
        f.close()

    def XXXtest_array_writes(self):
        # XXX memory view not available yet
        # (The XXX name prefix keeps unittest from collecting this method.)
        a = array.array('i', range(10))
        n = len(memoryview(a))
        f = io.open(test_support.TESTFN, "wb", 0)
        self.assertEqual(f.write(a), n)
        f.close()
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.write(a), n)
        f.close()

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)

    def testReadClosed(self):
        with io.open(test_support.TESTFN, "w") as f:
            f.write("egg\n")
        with io.open(test_support.TESTFN, "r") as f:
            wrapper = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.read(), "egg\n")
            wrapper.seek(0)
            wrapper.close()
            self.assertRaises(ValueError, wrapper.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError,
                          io.open, test_support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with io.open(test_support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            wrapper = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.buffer.raw.closefd, False)

    def test_flush_error_on_close(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)
344
Georg Brandld2094602008-12-05 08:51:30 +0000345
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream tests.

    The concrete test class supplies:
      buftype -- callable turning the text "1234567890" into the buffer type
      ioclass -- the stream class to instantiate with that buffer
      EOF     -- the value read() returns at end of stream
    plus the unittest assertion methods.
    """

    def testInit(self):
        # Construction alone must not raise.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        # Float offsets are rejected.
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and reflected by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
389
390
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the MemorySeekTestMixin checks against io.BytesIO."""

    EOF = b""
    ioclass = io.BytesIO

    @staticmethod
    def buftype(s):
        # The mixin passes text; encode it to get a bytes buffer.
        encoded = s.encode("utf-8")
        return encoded
397
398
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the MemorySeekTestMixin checks against io.StringIO."""

    EOF = ""
    ioclass = io.StringIO
    buftype = str
403
404
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of scripted and real raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each row: buffer size, sizes requested from the buffered reader,
        # sizes expected to be returned by the raw reads underneath.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass

    def testThreads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            # list() so the repetition and the in-place shuffle also work
            # where range() does not return a list.
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with io.open(test_support.TESTFN, "wb") as f:
                f.write(s)
            with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
                bufio = io.BufferedReader(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            test_support.unlink(test_support.TESTFN)
507
508
Christian Heimes1a6387e2008-03-26 12:49:49 +0000509
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of scripted and real raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 10, -6, 10, 8, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])

    def testThreads(self):
        # BufferedWriter should not raise exceptions or crash
        # when called from multiple threads.
        try:
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
                bufio = io.BufferedWriter(raw, 8)
                errors = []
                def f():
                    try:
                        # Write enough bytes to flush the buffer
                        s = b"a" * 19
                        for i in range(50):
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
        finally:
            test_support.unlink(test_support.TESTFN)

    def test_flush_error_on_close(self):
        raw = MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = io.BufferedWriter(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        raw = MockRawIO()
        b = io.BufferedWriter(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)
612
Christian Heimes1a6387e2008-03-26 12:49:49 +0000613
class BufferedRWPairTest(unittest.TestCase):
    """Smoke test for io.BufferedRWPair."""

    def testRWPair(self):
        # A freshly built pair over two mock raw streams is not closed.
        reader = MockRawIO(())
        writer = MockRawIO()
        self.assertFalse(io.BufferedRWPair(reader, writer).closed)

        # XXX More Tests
Christian Heimes1a6387e2008-03-26 12:49:49 +0000623
624
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads/writes and seek/tell."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)
657
658# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
659# properties:
660# - A single output character can correspond to many bytes of input.
661# - The number of input bytes to complete the character can be
662# undetermined until the last input byte is received.
663# - The number of input bytes can vary depending on previous input.
664# - A single input byte can correspond to many characters of output.
665# - The number of output characters can be undetermined until the
666# last input byte is received.
667# - The number of output characters can vary depending on previous input.
668
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A word that sets I or O produces no
    output.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume *input* and return the text of every word completed.

        With *final* true, a partially buffered word is completed too.
        """
        period = ord('.')
        pieces = []
        # Going through bytearray yields integers whatever iterating
        # *input* directly would yield, so the period test below does not
        # depend on the element type of the input.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Gate for lookupTestDecoder(); tests switch it on when needed.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for 'test_decoder' when enabled.

        Returns None for any other name, or while codecEnabled is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
753
# Register the previous decoder for testing.
# Disabled by default (codecEnabled is False, so the lookup function
# returns None for every name); tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
757
758
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def tearDown(self):
        # The BOM tests below create TESTFN; remove it so no file is left
        # behind after the test run.
        test_support.unlink(test_support.TESTFN)

    def testDecoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')

    def test_append_bom(self):
        # The BOM is not written again when appending to a non-empty file
        filename = test_support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with io.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
            with io.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with io.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with io.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))

    def test_seek_bom(self):
        # Same test, but when seeking manually
        filename = test_support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with io.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with io.open(filename, 'r+', encoding=charset) as f:
                # Write at the saved end position first, then go back and
                # overwrite from the start of the file.
                f.seek(pos)
                f.write('zzz')
                f.seek(0)
                f.write('bbb')
            with io.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
831
832
Christian Heimes1a6387e2008-03-26 12:49:49 +0000833class TextIOWrapperTest(unittest.TestCase):
834
835 def setUp(self):
836 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
837 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
838
839 def tearDown(self):
840 test_support.unlink(test_support.TESTFN)
841
842 def testLineBuffering(self):
843 r = io.BytesIO()
844 b = io.BufferedWriter(r, 1000)
845 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
846 t.write(u"X")
847 self.assertEquals(r.getvalue(), b"") # No flush happened
848 t.write(u"Y\nZ")
849 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
850 t.write(u"A\rB")
851 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
852
853 def testEncodingErrorsReading(self):
854 # (1) default
855 b = io.BytesIO(b"abc\n\xff\n")
856 t = io.TextIOWrapper(b, encoding="ascii")
857 self.assertRaises(UnicodeError, t.read)
858 # (2) explicit strict
859 b = io.BytesIO(b"abc\n\xff\n")
860 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
861 self.assertRaises(UnicodeError, t.read)
862 # (3) ignore
863 b = io.BytesIO(b"abc\n\xff\n")
864 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
865 self.assertEquals(t.read(), "abc\n\n")
866 # (4) replace
867 b = io.BytesIO(b"abc\n\xff\n")
868 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
869 self.assertEquals(t.read(), u"abc\n\ufffd\n")
870
871 def testEncodingErrorsWriting(self):
872 # (1) default
873 b = io.BytesIO()
874 t = io.TextIOWrapper(b, encoding="ascii")
875 self.assertRaises(UnicodeError, t.write, u"\xff")
876 # (2) explicit strict
877 b = io.BytesIO()
878 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
879 self.assertRaises(UnicodeError, t.write, u"\xff")
880 # (3) ignore
881 b = io.BytesIO()
882 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
883 newline="\n")
884 t.write(u"abc\xffdef\n")
885 t.flush()
886 self.assertEquals(b.getvalue(), b"abcdef\n")
887 # (4) replace
888 b = io.BytesIO()
889 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
890 newline="\n")
891 t.write(u"abc\xffdef\n")
892 t.flush()
893 self.assertEquals(b.getvalue(), b"abc?def\n")
894
895 def testNewlinesInput(self):
896 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
897 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
898 for newline, expected in [
899 (None, normalized.decode("ascii").splitlines(True)),
900 ("", testdata.decode("ascii").splitlines(True)),
901 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
902 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
903 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
904 ]:
905 buf = io.BytesIO(testdata)
906 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
907 self.assertEquals(txt.readlines(), expected)
908 txt.seek(0)
909 self.assertEquals(txt.read(), "".join(expected))
910
911 def testNewlinesOutput(self):
912 testdict = {
913 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
914 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
915 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
916 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
917 }
918 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
919 for newline, expected in tests:
920 buf = io.BytesIO()
921 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
922 txt.write("AAA\nB")
923 txt.write("BB\nCCC\n")
924 txt.write("X\rY\r\nZ")
925 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000926 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000927 self.assertEquals(buf.getvalue(), expected)
928
929 def testNewlines(self):
930 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
931
932 tests = [
933 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
934 [ '', input_lines ],
935 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
936 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
937 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
938 ]
Antoine Pitrouf8638a82008-12-14 18:08:37 +0000939 encodings = (
940 'utf-8', 'latin-1',
941 'utf-16', 'utf-16-le', 'utf-16-be',
942 'utf-32', 'utf-32-le', 'utf-32-be',
943 )
Christian Heimes1a6387e2008-03-26 12:49:49 +0000944
945 # Try a range of buffer sizes to test the case where \r is the last
946 # character in TextIOWrapper._pending_line.
947 for encoding in encodings:
948 # XXX: str.encode() should return bytes
949 data = bytes(''.join(input_lines).encode(encoding))
950 for do_reads in (False, True):
951 for bufsize in range(1, 10):
952 for newline, exp_lines in tests:
953 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
954 textio = io.TextIOWrapper(bufio, newline=newline,
955 encoding=encoding)
956 if do_reads:
957 got_lines = []
958 while True:
959 c2 = textio.read(2)
960 if c2 == '':
961 break
962 self.assertEquals(len(c2), 2)
963 got_lines.append(c2 + textio.readline())
964 else:
965 got_lines = list(textio)
966
967 for got_line, exp_line in zip(got_lines, exp_lines):
968 self.assertEquals(got_line, exp_line)
969 self.assertEquals(len(got_lines), len(exp_lines))
970
971 def testNewlinesInput(self):
972 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
973 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
974 for newline, expected in [
975 (None, normalized.decode("ascii").splitlines(True)),
976 ("", testdata.decode("ascii").splitlines(True)),
977 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
978 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
979 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
980 ]:
981 buf = io.BytesIO(testdata)
982 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
983 self.assertEquals(txt.readlines(), expected)
984 txt.seek(0)
985 self.assertEquals(txt.read(), "".join(expected))
986
987 def testNewlinesOutput(self):
988 data = u"AAA\nBBB\rCCC\n"
989 data_lf = b"AAA\nBBB\rCCC\n"
990 data_cr = b"AAA\rBBB\rCCC\r"
991 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
992 save_linesep = os.linesep
993 try:
994 for os.linesep, newline, expected in [
995 ("\n", None, data_lf),
996 ("\r\n", None, data_crlf),
997 ("\n", "", data_lf),
998 ("\r\n", "", data_lf),
999 ("\n", "\n", data_lf),
1000 ("\r\n", "\n", data_lf),
1001 ("\n", "\r", data_cr),
1002 ("\r\n", "\r", data_cr),
1003 ("\n", "\r\n", data_crlf),
1004 ("\r\n", "\r\n", data_crlf),
1005 ]:
1006 buf = io.BytesIO()
1007 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
1008 txt.write(data)
1009 txt.close()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +00001010 self.assertEquals(buf.closed, True)
1011 self.assertRaises(ValueError, buf.getvalue)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001012 finally:
1013 os.linesep = save_linesep
1014
1015 # Systematic tests of the text I/O API
1016
1017 def testBasicIO(self):
1018 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1019 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
1020 f = io.open(test_support.TESTFN, "w+", encoding=enc)
1021 f._CHUNK_SIZE = chunksize
1022 self.assertEquals(f.write(u"abc"), 3)
1023 f.close()
1024 f = io.open(test_support.TESTFN, "r+", encoding=enc)
1025 f._CHUNK_SIZE = chunksize
1026 self.assertEquals(f.tell(), 0)
1027 self.assertEquals(f.read(), u"abc")
1028 cookie = f.tell()
1029 self.assertEquals(f.seek(0), 0)
1030 self.assertEquals(f.read(2), u"ab")
1031 self.assertEquals(f.read(1), u"c")
1032 self.assertEquals(f.read(1), u"")
1033 self.assertEquals(f.read(), u"")
1034 self.assertEquals(f.tell(), cookie)
1035 self.assertEquals(f.seek(0), 0)
1036 self.assertEquals(f.seek(0, 2), cookie)
1037 self.assertEquals(f.write(u"def"), 3)
1038 self.assertEquals(f.seek(cookie), cookie)
1039 self.assertEquals(f.read(), u"def")
1040 if enc.startswith("utf"):
1041 self.multi_line_test(f, enc)
1042 f.close()
1043
1044 def multi_line_test(self, f, enc):
1045 f.seek(0)
1046 f.truncate()
1047 sample = u"s\xff\u0fff\uffff"
1048 wlines = []
1049 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1050 chars = []
1051 for i in range(size):
1052 chars.append(sample[i % len(sample)])
1053 line = u"".join(chars) + u"\n"
1054 wlines.append((f.tell(), line))
1055 f.write(line)
1056 f.seek(0)
1057 rlines = []
1058 while True:
1059 pos = f.tell()
1060 line = f.readline()
1061 if not line:
1062 break
1063 rlines.append((pos, line))
1064 self.assertEquals(rlines, wlines)
1065
1066 def testTelling(self):
1067 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
1068 p0 = f.tell()
1069 f.write(u"\xff\n")
1070 p1 = f.tell()
1071 f.write(u"\xff\n")
1072 p2 = f.tell()
1073 f.seek(0)
1074 self.assertEquals(f.tell(), p0)
1075 self.assertEquals(f.readline(), u"\xff\n")
1076 self.assertEquals(f.tell(), p1)
1077 self.assertEquals(f.readline(), u"\xff\n")
1078 self.assertEquals(f.tell(), p2)
1079 f.seek(0)
1080 for line in f:
1081 self.assertEquals(line, u"\xff\n")
1082 self.assertRaises(IOError, f.tell)
1083 self.assertEquals(f.tell(), p2)
1084 f.close()
1085
1086 def testSeeking(self):
1087 chunk_size = io.TextIOWrapper._CHUNK_SIZE
1088 prefix_size = chunk_size - 2
1089 u_prefix = "a" * prefix_size
1090 prefix = bytes(u_prefix.encode("utf-8"))
1091 self.assertEquals(len(u_prefix), len(prefix))
1092 u_suffix = "\u8888\n"
1093 suffix = bytes(u_suffix.encode("utf-8"))
1094 line = prefix + suffix
1095 f = io.open(test_support.TESTFN, "wb")
1096 f.write(line*2)
1097 f.close()
1098 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1099 s = f.read(prefix_size)
1100 self.assertEquals(s, unicode(prefix, "ascii"))
1101 self.assertEquals(f.tell(), prefix_size)
1102 self.assertEquals(f.readline(), u_suffix)
1103
1104 def testSeekingToo(self):
1105 # Regression test for a specific bug
1106 data = b'\xe0\xbf\xbf\n'
1107 f = io.open(test_support.TESTFN, "wb")
1108 f.write(data)
1109 f.close()
1110 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
1111 f._CHUNK_SIZE # Just test that it exists
1112 f._CHUNK_SIZE = 2
1113 f.readline()
1114 f.tell()
1115
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001116 def testSeekAndTell(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001117 """Test seek/tell using the StatefulIncrementalDecoder."""
1118
Christian Heimes1a6387e2008-03-26 12:49:49 +00001119 def testSeekAndTellWithData(data, min_pos=0):
1120 """Tell/seek to various points within a data stream and ensure
1121 that the decoded data returned by read() is consistent."""
1122 f = io.open(test_support.TESTFN, 'wb')
1123 f.write(data)
1124 f.close()
1125 f = io.open(test_support.TESTFN, encoding='test_decoder')
1126 decoded = f.read()
1127 f.close()
1128
1129 for i in range(min_pos, len(decoded) + 1): # seek positions
1130 for j in [1, 5, len(decoded) - i]: # read lengths
1131 f = io.open(test_support.TESTFN, encoding='test_decoder')
1132 self.assertEquals(f.read(i), decoded[:i])
1133 cookie = f.tell()
1134 self.assertEquals(f.read(j), decoded[i:i + j])
1135 f.seek(cookie)
1136 self.assertEquals(f.read(), decoded[i:])
1137 f.close()
1138
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001139 # Enable the test decoder.
1140 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001141
1142 # Run the tests.
1143 try:
1144 # Try each test case.
1145 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1146 testSeekAndTellWithData(input)
1147
1148 # Position each test case so that it crosses a chunk boundary.
1149 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1150 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1151 offset = CHUNK_SIZE - len(input)//2
1152 prefix = b'.'*offset
1153 # Don't bother seeking into the prefix (takes too long).
1154 min_pos = offset*2
1155 testSeekAndTellWithData(prefix + input, min_pos)
1156
1157 # Ensure our test decoder won't interfere with subsequent tests.
1158 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001159 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001160
1161 def testEncodedWrites(self):
1162 data = u"1234567890"
1163 tests = ("utf-16",
1164 "utf-16-le",
1165 "utf-16-be",
1166 "utf-32",
1167 "utf-32-le",
1168 "utf-32-be")
1169 for encoding in tests:
1170 buf = io.BytesIO()
1171 f = io.TextIOWrapper(buf, encoding=encoding)
1172 # Check if the BOM is written only once (see issue1753).
1173 f.write(data)
1174 f.write(data)
1175 f.seek(0)
1176 self.assertEquals(f.read(), data * 2)
1177 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1178
1179 def timingTest(self):
1180 timer = time.time
1181 enc = "utf8"
1182 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
1183 nlines = 10000
1184 nchars = len(line)
1185 nbytes = len(line.encode(enc))
1186 for chunk_size in (32, 64, 128, 256):
1187 f = io.open(test_support.TESTFN, "w+", encoding=enc)
1188 f._CHUNK_SIZE = chunk_size
1189 t0 = timer()
1190 for i in range(nlines):
1191 f.write(line)
1192 f.flush()
1193 t1 = timer()
1194 f.seek(0)
1195 for line in f:
1196 pass
1197 t2 = timer()
1198 f.seek(0)
1199 while f.readline():
1200 pass
1201 t3 = timer()
1202 f.seek(0)
1203 while f.readline():
1204 f.tell()
1205 t4 = timer()
1206 f.close()
1207 if test_support.verbose:
1208 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1209 (nlines, nchars, nbytes))
1210 print("File chunk size: %6s" % f._CHUNK_SIZE)
1211 print("Writing: %6.3f seconds" % (t1-t0))
1212 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1213 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1214 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1215
1216 def testReadOneByOne(self):
1217 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1218 reads = ""
1219 while True:
1220 c = txt.read(1)
1221 if not c:
1222 break
1223 reads += c
1224 self.assertEquals(reads, "AA\nBB")
1225
1226 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1227 def testReadByChunk(self):
1228 # make sure "\r\n" straddles 128 char boundary.
1229 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1230 reads = ""
1231 while True:
1232 c = txt.read(128)
1233 if not c:
1234 break
1235 reads += c
1236 self.assertEquals(reads, "A"*127+"\nB")
1237
1238 def test_issue1395_1(self):
1239 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1240
1241 # read one char at a time
1242 reads = ""
1243 while True:
1244 c = txt.read(1)
1245 if not c:
1246 break
1247 reads += c
1248 self.assertEquals(reads, self.normalized)
1249
1250 def test_issue1395_2(self):
1251 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1252 txt._CHUNK_SIZE = 4
1253
1254 reads = ""
1255 while True:
1256 c = txt.read(4)
1257 if not c:
1258 break
1259 reads += c
1260 self.assertEquals(reads, self.normalized)
1261
1262 def test_issue1395_3(self):
1263 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1264 txt._CHUNK_SIZE = 4
1265
1266 reads = txt.read(4)
1267 reads += txt.read(4)
1268 reads += txt.readline()
1269 reads += txt.readline()
1270 reads += txt.readline()
1271 self.assertEquals(reads, self.normalized)
1272
1273 def test_issue1395_4(self):
1274 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1275 txt._CHUNK_SIZE = 4
1276
1277 reads = txt.read(4)
1278 reads += txt.read()
1279 self.assertEquals(reads, self.normalized)
1280
1281 def test_issue1395_5(self):
1282 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1283 txt._CHUNK_SIZE = 4
1284
1285 reads = txt.read(4)
1286 pos = txt.tell()
1287 txt.seek(0)
1288 txt.seek(pos)
1289 self.assertEquals(txt.read(4), "BBB\n")
1290
1291 def test_issue2282(self):
1292 buffer = io.BytesIO(self.testdata)
1293 txt = io.TextIOWrapper(buffer, encoding="ascii")
1294
1295 self.assertEqual(buffer.seekable(), txt.seekable())
1296
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001297 def check_newline_decoder_utf8(self, decoder):
1298 # UTF-8 specific tests for a newline decoder
1299 def _check_decode(b, s, **kwargs):
1300 # We exercise getstate() / setstate() as well as decode()
1301 state = decoder.getstate()
1302 self.assertEquals(decoder.decode(b, **kwargs), s)
1303 decoder.setstate(state)
1304 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001305
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001306 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001307
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001308 _check_decode(b'\xe8', "")
1309 _check_decode(b'\xa2', "")
1310 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001311
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001312 _check_decode(b'\xe8', "")
1313 _check_decode(b'\xa2', "")
1314 _check_decode(b'\x88', "\u8888")
1315
1316 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001317 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1318
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001319 decoder.reset()
1320 _check_decode(b'\n', "\n")
1321 _check_decode(b'\r', "")
1322 _check_decode(b'', "\n", final=True)
1323 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001324
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001325 _check_decode(b'\r', "")
1326 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001327
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001328 _check_decode(b'\r\r\n', "\n\n")
1329 _check_decode(b'\r', "")
1330 _check_decode(b'\r', "\n")
1331 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001332
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001333 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1334 _check_decode(b'\xe8\xa2\x88', "\u8888")
1335 _check_decode(b'\n', "\n")
1336 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1337 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001338
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001339 def check_newline_decoder(self, decoder, encoding):
1340 result = []
1341 encoder = codecs.getincrementalencoder(encoding)()
1342 def _decode_bytewise(s):
1343 for b in encoder.encode(s):
1344 result.append(decoder.decode(b))
1345 self.assertEquals(decoder.newlines, None)
1346 _decode_bytewise("abc\n\r")
1347 self.assertEquals(decoder.newlines, '\n')
1348 _decode_bytewise("\nabc")
1349 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1350 _decode_bytewise("abc\r")
1351 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1352 _decode_bytewise("abc")
1353 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1354 _decode_bytewise("abc\r")
1355 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1356 decoder.reset()
1357 self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
1358 self.assertEquals(decoder.newlines, None)
1359
1360 def test_newline_decoder(self):
1361 encodings = (
1362 'utf-8', 'latin-1',
1363 'utf-16', 'utf-16-le', 'utf-16-be',
1364 'utf-32', 'utf-32-le', 'utf-32-be',
1365 )
1366 for enc in encodings:
1367 decoder = codecs.getincrementaldecoder(enc)()
1368 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1369 self.check_newline_decoder(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001370 decoder = codecs.getincrementaldecoder("utf-8")()
1371 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrouf8638a82008-12-14 18:08:37 +00001372 self.check_newline_decoder_utf8(decoder)
1373
Antoine Pitrou01a255a2010-05-03 16:48:13 +00001374 def test_flush_error_on_close(self):
1375 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1376 def bad_flush():
1377 raise IOError()
1378 txt.flush = bad_flush
1379 self.assertRaises(IOError, txt.close) # exception not swallowed
1380
1381 def test_multi_close(self):
1382 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1383 txt.close()
1384 txt.close()
1385 txt.close()
1386 self.assertRaises(ValueError, txt.flush)
1387
Christian Heimes1a6387e2008-03-26 12:49:49 +00001388
1389# XXX Tests for open()
1390
class MiscIOTest(unittest.TestCase):
    # Checks on the io module's exported names and on the name/mode
    # attributes of the objects returned by io.open().

    def tearDown(self):
        # Remove the scratch file after every test in this class.
        test_support.unlink(test_support.TESTFN)

    def testImport__all__(self):
        # Every name in io.__all__ must exist; apart from "open", each must be
        # an Exception subclass (names containing "error") or an IOBase
        # subclass.
        for name in io.__all__:
            exported = getattr(io, name, None)
            self.assert_(exported is not None, name)
            if name == "open":
                continue
            if "error" in name.lower():
                self.assert_(issubclass(exported, Exception), name)
            else:
                self.assert_(issubclass(exported, io.IOBase))

    def test_attributes(self):
        # name and mode must be reported consistently at each layer of the
        # stack returned by io.open().
        path = test_support.TESTFN

        # Unbuffered binary write.
        raw = io.open(path, "wb", buffering=0)
        self.assertEquals(raw.mode, "wb")
        raw.close()

        # Text read opened with "U": text, buffer and raw layers.
        text = io.open(path, "U")
        for layer in (text, text.buffer, text.buffer.raw):
            self.assertEquals(layer.name, path)
        self.assertEquals(text.mode, "U")
        self.assertEquals(text.buffer.mode, "rb")
        self.assertEquals(text.buffer.raw.mode, "rb")
        text.close()

        f = io.open(path, "w+")
        self.assertEquals(f.mode, "w+")
        self.assertEquals(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEquals(f.buffer.raw.mode, "rb+")

        # A second object on the same descriptor reports the descriptor
        # number as its name.
        g = io.open(f.fileno(), "wb", closefd=False)
        self.assertEquals(g.mode, "wb")
        self.assertEquals(g.raw.mode, "wb")
        self.assertEquals(g.name, f.fileno())
        self.assertEquals(g.raw.name, f.fileno())
        f.close()
        g.close()
1434
1435
def test_main():
    # Entry point for the regression-test runner: run every test case class
    # in this module through test_support.run_unittest, in this order.
    suites = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest, BufferedWriterTest,
        BufferedRWPairTest, BufferedRandomTest,
        StatefulIncrementalDecoderTest,
        TextIOWrapperTest, MiscIOTest,
    )
    test_support.run_unittest(*suites)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001442
# When the file is run as a script, the tests are run through unittest.main()
# rather than through test_main() above.
if __name__ == "__main__":
    unittest.main()