blob: 7fad7c79a3512872fa54a7e0d9237e6ea5d11211 [file] [log] [blame]
Christian Heimes1a6387e2008-03-26 12:49:49 +00001"""Unit tests for io.py."""
2from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +00003from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +00004
5import os
6import sys
7import time
8import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00009import threading
10import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000011import unittest
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000012from itertools import chain, cycle
Christian Heimes1a6387e2008-03-26 12:49:49 +000013from test import test_support
14
15import codecs
16import io # The module under test
17
18
19class MockRawIO(io.RawIOBase):
20
21 def __init__(self, read_stack=()):
22 self._read_stack = list(read_stack)
23 self._write_stack = []
24
25 def read(self, n=None):
26 try:
27 return self._read_stack.pop(0)
28 except:
29 return b""
30
31 def write(self, b):
32 self._write_stack.append(b[:])
33 return len(b)
34
35 def writable(self):
36 return True
37
38 def fileno(self):
39 return 42
40
41 def readable(self):
42 return True
43
44 def seekable(self):
45 return True
46
47 def seek(self, pos, whence):
48 pass
49
50 def tell(self):
51 return 42
52
53
54class MockFileIO(io.BytesIO):
55
56 def __init__(self, data):
57 self.read_history = []
58 io.BytesIO.__init__(self, data)
59
60 def read(self, n=None):
61 res = io.BytesIO.read(self, n)
62 self.read_history.append(None if res is None else len(res))
63 return res
64
65
66class MockNonBlockWriterIO(io.RawIOBase):
67
68 def __init__(self, blocking_script):
69 self._blocking_script = list(blocking_script)
70 self._write_stack = []
71
72 def write(self, b):
73 self._write_stack.append(b[:])
74 n = self._blocking_script.pop(0)
75 if (n < 0):
76 raise io.BlockingIOError(0, "test blocking", -n)
77 else:
78 return n
79
80 def writable(self):
81 return True
82
83
84class IOTest(unittest.TestCase):
85
86 def tearDown(self):
87 test_support.unlink(test_support.TESTFN)
88
89 def write_ops(self, f):
90 self.assertEqual(f.write(b"blah."), 5)
91 self.assertEqual(f.seek(0), 0)
92 self.assertEqual(f.write(b"Hello."), 6)
93 self.assertEqual(f.tell(), 6)
94 self.assertEqual(f.seek(-1, 1), 5)
95 self.assertEqual(f.tell(), 5)
96 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
97 self.assertEqual(f.seek(0), 0)
98 self.assertEqual(f.write(b"h"), 1)
99 self.assertEqual(f.seek(-1, 2), 13)
100 self.assertEqual(f.tell(), 13)
101 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000102 self.assertEqual(f.tell(), 12)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000103 self.assertRaises(TypeError, f.seek, 0.0)
104
105 def read_ops(self, f, buffered=False):
106 data = f.read(5)
107 self.assertEqual(data, b"hello")
108 data = bytearray(data)
109 self.assertEqual(f.readinto(data), 5)
110 self.assertEqual(data, b" worl")
111 self.assertEqual(f.readinto(data), 2)
112 self.assertEqual(len(data), 5)
113 self.assertEqual(data[:2], b"d\n")
114 self.assertEqual(f.seek(0), 0)
115 self.assertEqual(f.read(20), b"hello world\n")
116 self.assertEqual(f.read(1), b"")
117 self.assertEqual(f.readinto(bytearray(b"x")), 0)
118 self.assertEqual(f.seek(-6, 2), 6)
119 self.assertEqual(f.read(5), b"world")
120 self.assertEqual(f.read(0), b"")
121 self.assertEqual(f.readinto(bytearray()), 0)
122 self.assertEqual(f.seek(-6, 1), 5)
123 self.assertEqual(f.read(5), b" worl")
124 self.assertEqual(f.tell(), 10)
125 self.assertRaises(TypeError, f.seek, 0.0)
126 if buffered:
127 f.seek(0)
128 self.assertEqual(f.read(), b"hello world\n")
129 f.seek(6)
130 self.assertEqual(f.read(), b"world\n")
131 self.assertEqual(f.read(), b"")
132
133 LARGE = 2**31
134
135 def large_file_ops(self, f):
136 assert f.readable()
137 assert f.writable()
138 self.assertEqual(f.seek(self.LARGE), self.LARGE)
139 self.assertEqual(f.tell(), self.LARGE)
140 self.assertEqual(f.write(b"xxx"), 3)
141 self.assertEqual(f.tell(), self.LARGE + 3)
142 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
143 self.assertEqual(f.truncate(), self.LARGE + 2)
144 self.assertEqual(f.tell(), self.LARGE + 2)
145 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
146 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000147 self.assertEqual(f.tell(), self.LARGE + 1)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000148 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
149 self.assertEqual(f.seek(-1, 2), self.LARGE)
150 self.assertEqual(f.read(2), b"x")
151
152 def test_raw_file_io(self):
153 f = io.open(test_support.TESTFN, "wb", buffering=0)
154 self.assertEqual(f.readable(), False)
155 self.assertEqual(f.writable(), True)
156 self.assertEqual(f.seekable(), True)
157 self.write_ops(f)
158 f.close()
159 f = io.open(test_support.TESTFN, "rb", buffering=0)
160 self.assertEqual(f.readable(), True)
161 self.assertEqual(f.writable(), False)
162 self.assertEqual(f.seekable(), True)
163 self.read_ops(f)
164 f.close()
165
166 def test_buffered_file_io(self):
167 f = io.open(test_support.TESTFN, "wb")
168 self.assertEqual(f.readable(), False)
169 self.assertEqual(f.writable(), True)
170 self.assertEqual(f.seekable(), True)
171 self.write_ops(f)
172 f.close()
173 f = io.open(test_support.TESTFN, "rb")
174 self.assertEqual(f.readable(), True)
175 self.assertEqual(f.writable(), False)
176 self.assertEqual(f.seekable(), True)
177 self.read_ops(f, True)
178 f.close()
179
180 def test_readline(self):
181 f = io.open(test_support.TESTFN, "wb")
182 f.write(b"abc\ndef\nxyzzy\nfoo")
183 f.close()
184 f = io.open(test_support.TESTFN, "rb")
185 self.assertEqual(f.readline(), b"abc\n")
186 self.assertEqual(f.readline(10), b"def\n")
187 self.assertEqual(f.readline(2), b"xy")
188 self.assertEqual(f.readline(4), b"zzy\n")
189 self.assertEqual(f.readline(), b"foo")
190 f.close()
191
192 def test_raw_bytes_io(self):
193 f = io.BytesIO()
194 self.write_ops(f)
195 data = f.getvalue()
196 self.assertEqual(data, b"hello world\n")
197 f = io.BytesIO(data)
198 self.read_ops(f, True)
199
200 def test_large_file_ops(self):
201 # On Windows and Mac OSX this test comsumes large resources; It takes
202 # a long time to build the >2GB file and takes >2GB of disk space
203 # therefore the resource must be enabled to run this test.
204 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
205 if not test_support.is_resource_enabled("largefile"):
206 print("\nTesting large file ops skipped on %s." % sys.platform,
207 file=sys.stderr)
208 print("It requires %d bytes and a long time." % self.LARGE,
209 file=sys.stderr)
210 print("Use 'regrtest.py -u largefile test_io' to run it.",
211 file=sys.stderr)
212 return
213 f = io.open(test_support.TESTFN, "w+b", 0)
214 self.large_file_ops(f)
215 f.close()
216 f = io.open(test_support.TESTFN, "w+b")
217 self.large_file_ops(f)
218 f.close()
219
220 def test_with_open(self):
221 for bufsize in (0, 1, 100):
222 f = None
223 with open(test_support.TESTFN, "wb", bufsize) as f:
224 f.write(b"xxx")
225 self.assertEqual(f.closed, True)
226 f = None
227 try:
228 with open(test_support.TESTFN, "wb", bufsize) as f:
229 1/0
230 except ZeroDivisionError:
231 self.assertEqual(f.closed, True)
232 else:
233 self.fail("1/0 didn't raise an exception")
234
235 def test_destructor(self):
236 record = []
237 class MyFileIO(io.FileIO):
238 def __del__(self):
239 record.append(1)
240 io.FileIO.__del__(self)
241 def close(self):
242 record.append(2)
243 io.FileIO.close(self)
244 def flush(self):
245 record.append(3)
246 io.FileIO.flush(self)
247 f = MyFileIO(test_support.TESTFN, "w")
248 f.write("xxx")
249 del f
250 self.assertEqual(record, [1, 2, 3])
251
252 def test_close_flushes(self):
253 f = io.open(test_support.TESTFN, "wb")
254 f.write(b"xxx")
255 f.close()
256 f = io.open(test_support.TESTFN, "rb")
257 self.assertEqual(f.read(), b"xxx")
258 f.close()
259
260 def XXXtest_array_writes(self):
261 # XXX memory view not available yet
262 a = array.array('i', range(10))
263 n = len(memoryview(a))
264 f = io.open(test_support.TESTFN, "wb", 0)
265 self.assertEqual(f.write(a), n)
266 f.close()
267 f = io.open(test_support.TESTFN, "wb")
268 self.assertEqual(f.write(a), n)
269 f.close()
270
271 def test_closefd(self):
272 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
273 closefd=False)
274
275class MemorySeekTestMixin:
276
277 def testInit(self):
278 buf = self.buftype("1234567890")
279 bytesIo = self.ioclass(buf)
280
281 def testRead(self):
282 buf = self.buftype("1234567890")
283 bytesIo = self.ioclass(buf)
284
285 self.assertEquals(buf[:1], bytesIo.read(1))
286 self.assertEquals(buf[1:5], bytesIo.read(4))
287 self.assertEquals(buf[5:], bytesIo.read(900))
288 self.assertEquals(self.EOF, bytesIo.read())
289
290 def testReadNoArgs(self):
291 buf = self.buftype("1234567890")
292 bytesIo = self.ioclass(buf)
293
294 self.assertEquals(buf, bytesIo.read())
295 self.assertEquals(self.EOF, bytesIo.read())
296
297 def testSeek(self):
298 buf = self.buftype("1234567890")
299 bytesIo = self.ioclass(buf)
300
301 bytesIo.read(5)
302 bytesIo.seek(0)
303 self.assertEquals(buf, bytesIo.read())
304
305 bytesIo.seek(3)
306 self.assertEquals(buf[3:], bytesIo.read())
307 self.assertRaises(TypeError, bytesIo.seek, 0.0)
308
309 def testTell(self):
310 buf = self.buftype("1234567890")
311 bytesIo = self.ioclass(buf)
312
313 self.assertEquals(0, bytesIo.tell())
314 bytesIo.seek(5)
315 self.assertEquals(5, bytesIo.tell())
316 bytesIo.seek(10000)
317 self.assertEquals(10000, bytesIo.tell())
318
319
320class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
321 @staticmethod
322 def buftype(s):
323 return s.encode("utf-8")
324 ioclass = io.BytesIO
325 EOF = b""
326
327
328class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
329 buftype = str
330 ioclass = io.StringIO
331 EOF = ""
332
333
334class BufferedReaderTest(unittest.TestCase):
335
336 def testRead(self):
337 rawio = MockRawIO((b"abc", b"d", b"efg"))
338 bufio = io.BufferedReader(rawio)
339
340 self.assertEquals(b"abcdef", bufio.read(6))
341
342 def testBuffering(self):
343 data = b"abcdefghi"
344 dlen = len(data)
345
346 tests = [
347 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
348 [ 100, [ 3, 3, 3], [ dlen ] ],
349 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
350 ]
351
352 for bufsize, buf_read_sizes, raw_read_sizes in tests:
353 rawio = MockFileIO(data)
354 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
355 pos = 0
356 for nbytes in buf_read_sizes:
357 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
358 pos += nbytes
359 self.assertEquals(rawio.read_history, raw_read_sizes)
360
361 def testReadNonBlocking(self):
362 # Inject some None's in there to simulate EWOULDBLOCK
363 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
364 bufio = io.BufferedReader(rawio)
365
366 self.assertEquals(b"abcd", bufio.read(6))
367 self.assertEquals(b"e", bufio.read(1))
368 self.assertEquals(b"fg", bufio.read())
369 self.assert_(None is bufio.read())
370 self.assertEquals(b"", bufio.read())
371
372 def testReadToEof(self):
373 rawio = MockRawIO((b"abc", b"d", b"efg"))
374 bufio = io.BufferedReader(rawio)
375
376 self.assertEquals(b"abcdefg", bufio.read(9000))
377
378 def testReadNoArgs(self):
379 rawio = MockRawIO((b"abc", b"d", b"efg"))
380 bufio = io.BufferedReader(rawio)
381
382 self.assertEquals(b"abcdefg", bufio.read())
383
384 def testFileno(self):
385 rawio = MockRawIO((b"abc", b"d", b"efg"))
386 bufio = io.BufferedReader(rawio)
387
388 self.assertEquals(42, bufio.fileno())
389
390 def testFilenoNoFileno(self):
391 # XXX will we always have fileno() function? If so, kill
392 # this test. Else, write it.
393 pass
394
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000395 def testThreads(self):
396 try:
397 # Write out many bytes with exactly the same number of 0's,
398 # 1's... 255's. This will help us check that concurrent reading
399 # doesn't duplicate or forget contents.
400 N = 1000
401 l = range(256) * N
402 random.shuffle(l)
403 s = bytes(bytearray(l))
404 with io.open(test_support.TESTFN, "wb") as f:
405 f.write(s)
406 with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
407 bufio = io.BufferedReader(raw, 8)
408 errors = []
409 results = []
410 def f():
411 try:
412 # Intra-buffer read then buffer-flushing read
413 for n in cycle([1, 19]):
414 s = bufio.read(n)
415 if not s:
416 break
417 # list.append() is atomic
418 results.append(s)
419 except Exception as e:
420 errors.append(e)
421 raise
422 threads = [threading.Thread(target=f) for x in range(20)]
423 for t in threads:
424 t.start()
425 time.sleep(0.02) # yield
426 for t in threads:
427 t.join()
428 self.assertFalse(errors,
429 "the following exceptions were caught: %r" % errors)
430 s = b''.join(results)
431 for i in range(256):
432 c = bytes(bytearray([i]))
433 self.assertEqual(s.count(c), N)
434 finally:
435 test_support.unlink(test_support.TESTFN)
436
437
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438
439class BufferedWriterTest(unittest.TestCase):
440
441 def testWrite(self):
442 # Write to the buffered IO but don't overflow the buffer.
443 writer = MockRawIO()
444 bufio = io.BufferedWriter(writer, 8)
445
446 bufio.write(b"abc")
447
448 self.assertFalse(writer._write_stack)
449
450 def testWriteOverflow(self):
451 writer = MockRawIO()
452 bufio = io.BufferedWriter(writer, 8)
453
454 bufio.write(b"abc")
455 bufio.write(b"defghijkl")
456
457 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
458
459 def testWriteNonBlocking(self):
460 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
461 bufio = io.BufferedWriter(raw, 8, 16)
462
463 bufio.write(b"asdf")
464 bufio.write(b"asdfa")
465 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
466
467 bufio.write(b"asdfasdfasdf")
468 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
469 bufio.write(b"asdfasdfasdf")
470 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
471 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
472
473 bufio.write(b"asdfasdfasdf")
474
475 # XXX I don't like this test. It relies too heavily on how the
476 # algorithm actually works, which we might change. Refactor
477 # later.
478
479 def testFileno(self):
480 rawio = MockRawIO((b"abc", b"d", b"efg"))
481 bufio = io.BufferedWriter(rawio)
482
483 self.assertEquals(42, bufio.fileno())
484
485 def testFlush(self):
486 writer = MockRawIO()
487 bufio = io.BufferedWriter(writer, 8)
488
489 bufio.write(b"abc")
490 bufio.flush()
491
492 self.assertEquals(b"abc", writer._write_stack[0])
493
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000494 def testThreads(self):
495 # BufferedWriter should not raise exceptions or crash
496 # when called from multiple threads.
497 try:
498 # We use a real file object because it allows us to
499 # exercise situations where the GIL is released before
500 # writing the buffer to the raw streams. This is in addition
501 # to concurrency issues due to switching threads in the middle
502 # of Python code.
503 with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
504 bufio = io.BufferedWriter(raw, 8)
505 errors = []
506 def f():
507 try:
508 # Write enough bytes to flush the buffer
509 s = b"a" * 19
510 for i in range(50):
511 bufio.write(s)
512 except Exception as e:
513 errors.append(e)
514 raise
515 threads = [threading.Thread(target=f) for x in range(20)]
516 for t in threads:
517 t.start()
518 time.sleep(0.02) # yield
519 for t in threads:
520 t.join()
521 self.assertFalse(errors,
522 "the following exceptions were caught: %r" % errors)
523 finally:
524 test_support.unlink(test_support.TESTFN)
525
Christian Heimes1a6387e2008-03-26 12:49:49 +0000526
527class BufferedRWPairTest(unittest.TestCase):
528
529 def testRWPair(self):
530 r = MockRawIO(())
531 w = MockRawIO()
532 pair = io.BufferedRWPair(r, w)
533
534 # XXX need implementation
535
536
537class BufferedRandomTest(unittest.TestCase):
538
539 def testReadAndWrite(self):
540 raw = MockRawIO((b"asdf", b"ghjk"))
541 rw = io.BufferedRandom(raw, 8, 12)
542
543 self.assertEqual(b"as", rw.read(2))
544 rw.write(b"ddd")
545 rw.write(b"eee")
546 self.assertFalse(raw._write_stack) # Buffer writes
547 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
548 self.assertEquals(b"dddeee", raw._write_stack[0])
549
550 def testSeekAndTell(self):
551 raw = io.BytesIO(b"asdfghjkl")
552 rw = io.BufferedRandom(raw)
553
554 self.assertEquals(b"as", rw.read(2))
555 self.assertEquals(2, rw.tell())
556 rw.seek(0, 0)
557 self.assertEquals(b"asdf", rw.read(4))
558
559 rw.write(b"asdf")
560 rw.seek(0, 0)
561 self.assertEquals(b"asdfasdfl", rw.read())
562 self.assertEquals(9, rw.tell())
563 rw.seek(-4, 2)
564 self.assertEquals(5, rw.tell())
565 rw.seek(2, 1)
566 self.assertEquals(7, rw.tell())
567 self.assertEquals(b"fl", rw.read(11))
568 self.assertRaises(TypeError, rw.seek, 0.0)
569
570# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
571# properties:
572# - A single output character can correspond to many bytes of input.
573# - The number of input bytes to complete the character can be
574# undetermined until the last input byte is received.
575# - The number of input bytes can vary depending on previous input.
576# - A single input byte can correspond to many characters of output.
577# - The number of output characters can be undetermined until the
578# last input byte is received.
579# - The number of output characters can vary depending on previous input.
580
581class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
582 """
583 For testing seek/tell behavior with a stateful, buffering decoder.
584
585 Input is a sequence of words. Words may be fixed-length (length set
586 by input) or variable-length (period-terminated). In variable-length
587 mode, extra periods are ignored. Possible words are:
588 - 'i' followed by a number sets the input length, I (maximum 99).
589 When I is set to 0, words are space-terminated.
590 - 'o' followed by a number sets the output length, O (maximum 99).
591 - Any other word is converted into a word followed by a period on
592 the output. The output word consists of the input word truncated
593 or padded out with hyphens to make its length equal to O. If O
594 is 0, the word is output verbatim without truncating or padding.
595 I and O are initially set to 1. When I changes, any buffered input is
596 re-scanned according to the new I. EOF also terminates the last word.
597 """
598
599 def __init__(self, errors='strict'):
600 codecs.IncrementalDecoder.__init__(self, errors)
601 self.reset()
602
603 def __repr__(self):
604 return '<SID %x>' % id(self)
605
606 def reset(self):
607 self.i = 1
608 self.o = 1
609 self.buffer = bytearray()
610
611 def getstate(self):
612 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
613 return bytes(self.buffer), i*100 + o
614
615 def setstate(self, state):
616 buffer, io = state
617 self.buffer = bytearray(buffer)
618 i, o = divmod(io, 100)
619 self.i, self.o = i ^ 1, o ^ 1
620
621 def decode(self, input, final=False):
622 output = ''
623 for b in input:
624 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +0000625 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +0000626 if self.buffer:
627 output += self.process_word()
628 else:
629 self.buffer.append(b)
630 else: # fixed-length, terminate after self.i bytes
631 self.buffer.append(b)
632 if len(self.buffer) == self.i:
633 output += self.process_word()
634 if final and self.buffer: # EOF terminates the last word
635 output += self.process_word()
636 return output
637
638 def process_word(self):
639 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000640 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000641 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +0000642 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000643 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
644 else:
645 output = self.buffer.decode('ascii')
646 if len(output) < self.o:
647 output += '-'*self.o # pad out with hyphens
648 if self.o:
649 output = output[:self.o] # truncate to output length
650 output += '.'
651 self.buffer = bytearray()
652 return output
653
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +0000654 codecEnabled = False
655
656 @classmethod
657 def lookupTestDecoder(cls, name):
658 if cls.codecEnabled and name == 'test_decoder':
659 return codecs.CodecInfo(
660 name='test_decoder', encode=None, decode=None,
661 incrementalencoder=None,
662 streamreader=None, streamwriter=None,
663 incrementaldecoder=cls)
664
665# Register the previous decoder for testing.
666# Disabled by default, tests will enable it.
667codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
668
669
Christian Heimes1a6387e2008-03-26 12:49:49 +0000670class StatefulIncrementalDecoderTest(unittest.TestCase):
671 """
672 Make sure the StatefulIncrementalDecoder actually works.
673 """
674
675 test_cases = [
676 # I=1, O=1 (fixed-length input == fixed-length output)
677 (b'abcd', False, 'a.b.c.d.'),
678 # I=0, O=0 (variable-length input, variable-length output)
679 (b'oiabcd', True, 'abcd.'),
680 # I=0, O=0 (should ignore extra periods)
681 (b'oi...abcd...', True, 'abcd.'),
682 # I=0, O=6 (variable-length input, fixed-length output)
683 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
684 # I=2, O=6 (fixed-length input < fixed-length output)
685 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
686 # I=6, O=3 (fixed-length input > fixed-length output)
687 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
688 # I=0, then 3; O=29, then 15 (with longer output)
689 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
690 'a----------------------------.' +
691 'b----------------------------.' +
692 'cde--------------------------.' +
693 'abcdefghijabcde.' +
694 'a.b------------.' +
695 '.c.------------.' +
696 'd.e------------.' +
697 'k--------------.' +
698 'l--------------.' +
699 'm--------------.')
700 ]
701
702 def testDecoder(self):
703 # Try a few one-shot test cases.
704 for input, eof, output in self.test_cases:
705 d = StatefulIncrementalDecoder()
706 self.assertEquals(d.decode(input, eof), output)
707
708 # Also test an unfinished decode, followed by forcing EOF.
709 d = StatefulIncrementalDecoder()
710 self.assertEquals(d.decode(b'oiabcd'), '')
711 self.assertEquals(d.decode(b'', 1), 'abcd.')
712
713class TextIOWrapperTest(unittest.TestCase):
714
715 def setUp(self):
716 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
717 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
718
719 def tearDown(self):
720 test_support.unlink(test_support.TESTFN)
721
722 def testLineBuffering(self):
723 r = io.BytesIO()
724 b = io.BufferedWriter(r, 1000)
725 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
726 t.write(u"X")
727 self.assertEquals(r.getvalue(), b"") # No flush happened
728 t.write(u"Y\nZ")
729 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
730 t.write(u"A\rB")
731 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
732
733 def testEncodingErrorsReading(self):
734 # (1) default
735 b = io.BytesIO(b"abc\n\xff\n")
736 t = io.TextIOWrapper(b, encoding="ascii")
737 self.assertRaises(UnicodeError, t.read)
738 # (2) explicit strict
739 b = io.BytesIO(b"abc\n\xff\n")
740 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
741 self.assertRaises(UnicodeError, t.read)
742 # (3) ignore
743 b = io.BytesIO(b"abc\n\xff\n")
744 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
745 self.assertEquals(t.read(), "abc\n\n")
746 # (4) replace
747 b = io.BytesIO(b"abc\n\xff\n")
748 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
749 self.assertEquals(t.read(), u"abc\n\ufffd\n")
750
751 def testEncodingErrorsWriting(self):
752 # (1) default
753 b = io.BytesIO()
754 t = io.TextIOWrapper(b, encoding="ascii")
755 self.assertRaises(UnicodeError, t.write, u"\xff")
756 # (2) explicit strict
757 b = io.BytesIO()
758 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
759 self.assertRaises(UnicodeError, t.write, u"\xff")
760 # (3) ignore
761 b = io.BytesIO()
762 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
763 newline="\n")
764 t.write(u"abc\xffdef\n")
765 t.flush()
766 self.assertEquals(b.getvalue(), b"abcdef\n")
767 # (4) replace
768 b = io.BytesIO()
769 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
770 newline="\n")
771 t.write(u"abc\xffdef\n")
772 t.flush()
773 self.assertEquals(b.getvalue(), b"abc?def\n")
774
775 def testNewlinesInput(self):
776 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
777 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
778 for newline, expected in [
779 (None, normalized.decode("ascii").splitlines(True)),
780 ("", testdata.decode("ascii").splitlines(True)),
781 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
782 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
783 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
784 ]:
785 buf = io.BytesIO(testdata)
786 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
787 self.assertEquals(txt.readlines(), expected)
788 txt.seek(0)
789 self.assertEquals(txt.read(), "".join(expected))
790
791 def testNewlinesOutput(self):
792 testdict = {
793 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
794 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
795 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
796 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
797 }
798 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
799 for newline, expected in tests:
800 buf = io.BytesIO()
801 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
802 txt.write("AAA\nB")
803 txt.write("BB\nCCC\n")
804 txt.write("X\rY\r\nZ")
805 txt.flush()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000806 self.assertEquals(buf.closed, False)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000807 self.assertEquals(buf.getvalue(), expected)
808
809 def testNewlines(self):
810 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
811
812 tests = [
813 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
814 [ '', input_lines ],
815 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
816 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
817 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
818 ]
819
820 encodings = ('utf-8', 'latin-1')
821
822 # Try a range of buffer sizes to test the case where \r is the last
823 # character in TextIOWrapper._pending_line.
824 for encoding in encodings:
825 # XXX: str.encode() should return bytes
826 data = bytes(''.join(input_lines).encode(encoding))
827 for do_reads in (False, True):
828 for bufsize in range(1, 10):
829 for newline, exp_lines in tests:
830 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
831 textio = io.TextIOWrapper(bufio, newline=newline,
832 encoding=encoding)
833 if do_reads:
834 got_lines = []
835 while True:
836 c2 = textio.read(2)
837 if c2 == '':
838 break
839 self.assertEquals(len(c2), 2)
840 got_lines.append(c2 + textio.readline())
841 else:
842 got_lines = list(textio)
843
844 for got_line, exp_line in zip(got_lines, exp_lines):
845 self.assertEquals(got_line, exp_line)
846 self.assertEquals(len(got_lines), len(exp_lines))
847
848 def testNewlinesInput(self):
849 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
850 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
851 for newline, expected in [
852 (None, normalized.decode("ascii").splitlines(True)),
853 ("", testdata.decode("ascii").splitlines(True)),
854 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
855 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
856 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
857 ]:
858 buf = io.BytesIO(testdata)
859 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
860 self.assertEquals(txt.readlines(), expected)
861 txt.seek(0)
862 self.assertEquals(txt.read(), "".join(expected))
863
864 def testNewlinesOutput(self):
865 data = u"AAA\nBBB\rCCC\n"
866 data_lf = b"AAA\nBBB\rCCC\n"
867 data_cr = b"AAA\rBBB\rCCC\r"
868 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
869 save_linesep = os.linesep
870 try:
871 for os.linesep, newline, expected in [
872 ("\n", None, data_lf),
873 ("\r\n", None, data_crlf),
874 ("\n", "", data_lf),
875 ("\r\n", "", data_lf),
876 ("\n", "\n", data_lf),
877 ("\r\n", "\n", data_lf),
878 ("\n", "\r", data_cr),
879 ("\r\n", "\r", data_cr),
880 ("\n", "\r\n", data_crlf),
881 ("\r\n", "\r\n", data_crlf),
882 ]:
883 buf = io.BytesIO()
884 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
885 txt.write(data)
886 txt.close()
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000887 self.assertEquals(buf.closed, True)
888 self.assertRaises(ValueError, buf.getvalue)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000889 finally:
890 os.linesep = save_linesep
891
892 # Systematic tests of the text I/O API
893
894 def testBasicIO(self):
895 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
896 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
897 f = io.open(test_support.TESTFN, "w+", encoding=enc)
898 f._CHUNK_SIZE = chunksize
899 self.assertEquals(f.write(u"abc"), 3)
900 f.close()
901 f = io.open(test_support.TESTFN, "r+", encoding=enc)
902 f._CHUNK_SIZE = chunksize
903 self.assertEquals(f.tell(), 0)
904 self.assertEquals(f.read(), u"abc")
905 cookie = f.tell()
906 self.assertEquals(f.seek(0), 0)
907 self.assertEquals(f.read(2), u"ab")
908 self.assertEquals(f.read(1), u"c")
909 self.assertEquals(f.read(1), u"")
910 self.assertEquals(f.read(), u"")
911 self.assertEquals(f.tell(), cookie)
912 self.assertEquals(f.seek(0), 0)
913 self.assertEquals(f.seek(0, 2), cookie)
914 self.assertEquals(f.write(u"def"), 3)
915 self.assertEquals(f.seek(cookie), cookie)
916 self.assertEquals(f.read(), u"def")
917 if enc.startswith("utf"):
918 self.multi_line_test(f, enc)
919 f.close()
920
921 def multi_line_test(self, f, enc):
922 f.seek(0)
923 f.truncate()
924 sample = u"s\xff\u0fff\uffff"
925 wlines = []
926 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
927 chars = []
928 for i in range(size):
929 chars.append(sample[i % len(sample)])
930 line = u"".join(chars) + u"\n"
931 wlines.append((f.tell(), line))
932 f.write(line)
933 f.seek(0)
934 rlines = []
935 while True:
936 pos = f.tell()
937 line = f.readline()
938 if not line:
939 break
940 rlines.append((pos, line))
941 self.assertEquals(rlines, wlines)
942
943 def testTelling(self):
944 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
945 p0 = f.tell()
946 f.write(u"\xff\n")
947 p1 = f.tell()
948 f.write(u"\xff\n")
949 p2 = f.tell()
950 f.seek(0)
951 self.assertEquals(f.tell(), p0)
952 self.assertEquals(f.readline(), u"\xff\n")
953 self.assertEquals(f.tell(), p1)
954 self.assertEquals(f.readline(), u"\xff\n")
955 self.assertEquals(f.tell(), p2)
956 f.seek(0)
957 for line in f:
958 self.assertEquals(line, u"\xff\n")
959 self.assertRaises(IOError, f.tell)
960 self.assertEquals(f.tell(), p2)
961 f.close()
962
963 def testSeeking(self):
964 chunk_size = io.TextIOWrapper._CHUNK_SIZE
965 prefix_size = chunk_size - 2
966 u_prefix = "a" * prefix_size
967 prefix = bytes(u_prefix.encode("utf-8"))
968 self.assertEquals(len(u_prefix), len(prefix))
969 u_suffix = "\u8888\n"
970 suffix = bytes(u_suffix.encode("utf-8"))
971 line = prefix + suffix
972 f = io.open(test_support.TESTFN, "wb")
973 f.write(line*2)
974 f.close()
975 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
976 s = f.read(prefix_size)
977 self.assertEquals(s, unicode(prefix, "ascii"))
978 self.assertEquals(f.tell(), prefix_size)
979 self.assertEquals(f.readline(), u_suffix)
980
981 def testSeekingToo(self):
982 # Regression test for a specific bug
983 data = b'\xe0\xbf\xbf\n'
984 f = io.open(test_support.TESTFN, "wb")
985 f.write(data)
986 f.close()
987 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
988 f._CHUNK_SIZE # Just test that it exists
989 f._CHUNK_SIZE = 2
990 f.readline()
991 f.tell()
992
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +0000993 def testSeekAndTell(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000994 """Test seek/tell using the StatefulIncrementalDecoder."""
995
Christian Heimes1a6387e2008-03-26 12:49:49 +0000996 def testSeekAndTellWithData(data, min_pos=0):
997 """Tell/seek to various points within a data stream and ensure
998 that the decoded data returned by read() is consistent."""
999 f = io.open(test_support.TESTFN, 'wb')
1000 f.write(data)
1001 f.close()
1002 f = io.open(test_support.TESTFN, encoding='test_decoder')
1003 decoded = f.read()
1004 f.close()
1005
1006 for i in range(min_pos, len(decoded) + 1): # seek positions
1007 for j in [1, 5, len(decoded) - i]: # read lengths
1008 f = io.open(test_support.TESTFN, encoding='test_decoder')
1009 self.assertEquals(f.read(i), decoded[:i])
1010 cookie = f.tell()
1011 self.assertEquals(f.read(j), decoded[i:i + j])
1012 f.seek(cookie)
1013 self.assertEquals(f.read(), decoded[i:])
1014 f.close()
1015
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001016 # Enable the test decoder.
1017 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001018
1019 # Run the tests.
1020 try:
1021 # Try each test case.
1022 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1023 testSeekAndTellWithData(input)
1024
1025 # Position each test case so that it crosses a chunk boundary.
1026 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1027 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1028 offset = CHUNK_SIZE - len(input)//2
1029 prefix = b'.'*offset
1030 # Don't bother seeking into the prefix (takes too long).
1031 min_pos = offset*2
1032 testSeekAndTellWithData(prefix + input, min_pos)
1033
1034 # Ensure our test decoder won't interfere with subsequent tests.
1035 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001036 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001037
1038 def testEncodedWrites(self):
1039 data = u"1234567890"
1040 tests = ("utf-16",
1041 "utf-16-le",
1042 "utf-16-be",
1043 "utf-32",
1044 "utf-32-le",
1045 "utf-32-be")
1046 for encoding in tests:
1047 buf = io.BytesIO()
1048 f = io.TextIOWrapper(buf, encoding=encoding)
1049 # Check if the BOM is written only once (see issue1753).
1050 f.write(data)
1051 f.write(data)
1052 f.seek(0)
1053 self.assertEquals(f.read(), data * 2)
1054 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1055
1056 def timingTest(self):
1057 timer = time.time
1058 enc = "utf8"
1059 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
1060 nlines = 10000
1061 nchars = len(line)
1062 nbytes = len(line.encode(enc))
1063 for chunk_size in (32, 64, 128, 256):
1064 f = io.open(test_support.TESTFN, "w+", encoding=enc)
1065 f._CHUNK_SIZE = chunk_size
1066 t0 = timer()
1067 for i in range(nlines):
1068 f.write(line)
1069 f.flush()
1070 t1 = timer()
1071 f.seek(0)
1072 for line in f:
1073 pass
1074 t2 = timer()
1075 f.seek(0)
1076 while f.readline():
1077 pass
1078 t3 = timer()
1079 f.seek(0)
1080 while f.readline():
1081 f.tell()
1082 t4 = timer()
1083 f.close()
1084 if test_support.verbose:
1085 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1086 (nlines, nchars, nbytes))
1087 print("File chunk size: %6s" % f._CHUNK_SIZE)
1088 print("Writing: %6.3f seconds" % (t1-t0))
1089 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1090 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1091 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1092
1093 def testReadOneByOne(self):
1094 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1095 reads = ""
1096 while True:
1097 c = txt.read(1)
1098 if not c:
1099 break
1100 reads += c
1101 self.assertEquals(reads, "AA\nBB")
1102
1103 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1104 def testReadByChunk(self):
1105 # make sure "\r\n" straddles 128 char boundary.
1106 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1107 reads = ""
1108 while True:
1109 c = txt.read(128)
1110 if not c:
1111 break
1112 reads += c
1113 self.assertEquals(reads, "A"*127+"\nB")
1114
1115 def test_issue1395_1(self):
1116 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1117
1118 # read one char at a time
1119 reads = ""
1120 while True:
1121 c = txt.read(1)
1122 if not c:
1123 break
1124 reads += c
1125 self.assertEquals(reads, self.normalized)
1126
1127 def test_issue1395_2(self):
1128 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1129 txt._CHUNK_SIZE = 4
1130
1131 reads = ""
1132 while True:
1133 c = txt.read(4)
1134 if not c:
1135 break
1136 reads += c
1137 self.assertEquals(reads, self.normalized)
1138
1139 def test_issue1395_3(self):
1140 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1141 txt._CHUNK_SIZE = 4
1142
1143 reads = txt.read(4)
1144 reads += txt.read(4)
1145 reads += txt.readline()
1146 reads += txt.readline()
1147 reads += txt.readline()
1148 self.assertEquals(reads, self.normalized)
1149
1150 def test_issue1395_4(self):
1151 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1152 txt._CHUNK_SIZE = 4
1153
1154 reads = txt.read(4)
1155 reads += txt.read()
1156 self.assertEquals(reads, self.normalized)
1157
1158 def test_issue1395_5(self):
1159 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1160 txt._CHUNK_SIZE = 4
1161
1162 reads = txt.read(4)
1163 pos = txt.tell()
1164 txt.seek(0)
1165 txt.seek(pos)
1166 self.assertEquals(txt.read(4), "BBB\n")
1167
1168 def test_issue2282(self):
1169 buffer = io.BytesIO(self.testdata)
1170 txt = io.TextIOWrapper(buffer, encoding="ascii")
1171
1172 self.assertEqual(buffer.seekable(), txt.seekable())
1173
1174 def test_newline_decoder(self):
1175 import codecs
1176 decoder = codecs.getincrementaldecoder("utf-8")()
1177 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1178
1179 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1180
1181 self.assertEquals(decoder.decode(b'\xe8'), u"")
1182 self.assertEquals(decoder.decode(b'\xa2'), u"")
1183 self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
1184
1185 self.assertEquals(decoder.decode(b'\xe8'), u"")
1186 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1187
1188 decoder.setstate((b'', 0))
1189 self.assertEquals(decoder.decode(b'\n'), u"\n")
1190 self.assertEquals(decoder.decode(b'\r'), u"")
1191 self.assertEquals(decoder.decode(b'', final=True), u"\n")
1192 self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
1193
1194 self.assertEquals(decoder.decode(b'\r'), u"")
1195 self.assertEquals(decoder.decode(b'a'), u"\na")
1196
1197 self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
1198 self.assertEquals(decoder.decode(b'\r'), u"")
1199 self.assertEquals(decoder.decode(b'\r'), u"\n")
1200 self.assertEquals(decoder.decode(b'\na'), u"\na")
1201
1202 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
1203 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
1204 self.assertEquals(decoder.decode(b'\n'), u"\n")
1205 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
1206 self.assertEquals(decoder.decode(b'\n'), u"\n")
1207
1208 decoder = codecs.getincrementaldecoder("utf-8")()
1209 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1210 self.assertEquals(decoder.newlines, None)
1211 decoder.decode(b"abc\n\r")
1212 self.assertEquals(decoder.newlines, u'\n')
1213 decoder.decode(b"\nabc")
1214 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1215 decoder.decode(b"abc\r")
1216 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1217 decoder.decode(b"abc")
1218 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1219 decoder.decode(b"abc\r")
1220 decoder.reset()
1221 self.assertEquals(decoder.decode(b"abc"), "abc")
1222 self.assertEquals(decoder.newlines, None)
1223
1224# XXX Tests for open()
1225
1226class MiscIOTest(unittest.TestCase):
1227
1228 def testImport__all__(self):
1229 for name in io.__all__:
1230 obj = getattr(io, name, None)
1231 self.assert_(obj is not None, name)
1232 if name == "open":
1233 continue
1234 elif "error" in name.lower():
1235 self.assert_(issubclass(obj, Exception), name)
1236 else:
1237 self.assert_(issubclass(obj, io.IOBase))
1238
1239
1240def test_main():
1241 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001242 BufferedReaderTest, BufferedWriterTest,
1243 BufferedRWPairTest, BufferedRandomTest,
1244 StatefulIncrementalDecoderTest,
1245 TextIOWrapperTest, MiscIOTest)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001246
1247if __name__ == "__main__":
1248 unittest.main()