blob: f25f710fa4184a4354e9a6fdb3dc5b62f5f02d89 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Georg Brandla4f46e12010-02-07 17:03:15 +000033from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000034from collections import deque
35from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000036
37import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000040try:
41 import threading
42except ImportError:
43 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000044
45__metaclass__ = type
46bytes = support.py3k_bytes
47
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` of a freshly opened text stream."""
    # Any readable text file works as a probe; this module's own source is
    # always at hand.
    stream = io.open(__file__, "r", encoding="latin1")
    with stream:
        return stream._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000052
53
class MockRawIO:
    """Scripted raw stream used to drive the buffered-IO tests.

    ``read_stack`` is an iterable of chunks handed out, in order, by
    successive read() / readinto() calls.  A ``None`` entry is passed through
    to the caller unchanged (the tests use it to simulate a read that would
    block).  Everything given to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of read() / readinto() calls made so far.
        self._reads = 0
        # How many of those calls happened after the script ran dry.
        self._extraneous_reads = 0

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once the script is empty.

        ``n`` is accepted for interface compatibility and otherwise ignored.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script counts as an extraneous read; any
            # other error must propagate instead of being reported as EOF.
            self._extraneous_reads += 1
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted chunk into ``buf``.

        Returns the number of bytes stored, 0 when the script is exhausted,
        or None when the next scripted entry is None.  A chunk larger than
        ``buf`` is split: the unread tail stays at the head of the script.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos
115
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""
118
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python (_pyio) RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000121
122
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Record the data normally, then claim twice as many bytes.
        written = MockRawIO.write(self, b)
        return written * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        # Negative positions are never valid.
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer as usual but report five times its capacity.
        MockRawIO.readinto(self, buf)
        return 5 * len(buf)
139
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""
142
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python (_pyio) RawIOBase."""
145
146
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one fails.
        if self.closed:
            return
        self.closed = 1
        raise IOError
153 raise IOError
154
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""
157
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python (_pyio) RawIOBase."""
160
161
class MockFileIO:
    """Mixin that logs the size of every read in ``read_history``.

    Meant to be combined with a concrete stream class, which receives
    ``data`` through the cooperative ``__init__`` call.
    """

    def __init__(self, data):
        # Create the log first so it exists whatever the base __init__ does.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000177
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging stream backed by the C implementation's BytesIO."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging stream backed by the pure-Python (_pyio) BytesIO."""
183
184
class MockNonBlockWriterIO:
    """Raw-writer stand-in that can simulate a blocking write on demand.

    Subclasses must provide a ``BlockingIOError`` class attribute naming the
    exception type to raise.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record ``b``; stop short and raise if the blocker char occurs.

        When the armed blocker char is found, only the bytes preceding it
        are recorded, the blocker is disarmed, and ``self.BlockingIOError``
        is raised carrying the count of bytes that were accepted.
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = data.index(blocker)
            except ValueError:
                # Blocker not present in this chunk: write it whole.
                pass
            else:
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
222 return len(b)
223
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Non-blocking writer mock bound to the C implementation of io."""

    # Exception type raised by write() when the blocker char is hit.
    BlockingIOError = io.BlockingIOError
226
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Non-blocking writer mock bound to the pure-Python (_pyio) io."""

    # Exception type raised by write() when the blocker char is hit.
    BlockingIOError = pyio.BlockingIOError
229
Christian Heimes1a6387e2008-03-26 12:49:49 +0000230
class IOTest(unittest.TestCase):
    """Implementation-independent tests of basic io behaviour.

    The methods reach the implementation under test through ``self.open``,
    ``self.FileIO``, ``self.BytesIO`` and the ``self.*IOBase`` classes.  None
    of these are defined in this class; presumably they are attached to the
    concrete subclasses elsewhere in the file.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against ``f``.

        On success ``f`` ends up holding b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script against ``f``.

        ``f`` must contain b"hello world\\n".  With ``buffered`` true the
        argument-less read() form is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate around the ``LARGE`` offset."""
        # Real assertions rather than ``assert`` statements, so the
        # preconditions are still checked when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a skip (as test_unbounded_file does) instead of
                # printing to stderr and silently counting as a pass.
                self.skipTest(
                    "large file ops require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run them"
                    % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        # Destruction must run __del__, then close(), then flush().
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__ -> close() -> flush() order for ``base``."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor; closing it must not
            # close the descriptor itself.
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.read(), "egg\n")
            fobj.seek(0)
            fobj.close()
            self.assertRaises(ValueError, fobj.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fobj = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fobj.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Reference cycle, so only the cyclic collector can free the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        # Closing repeatedly is harmless; using the closed file is not.
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)
560
class CIOTest(IOTest):
    """IOTest variant for the C implementation; adds no tests of its own."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000563
class PyIOTest(IOTest):
    """IOTest variant for the pure-Python implementation."""

    # Re-bind the inherited test wrapped in a skip marker.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000568
569
class CommonBufferedTests:
    """Tests common to BufferedReader, BufferedWriter and BufferedRandom.

    Mixin: the methods use ``self.tp`` (the buffered class under test) plus
    ``self.MockRawIO`` and ``self.CloseFailureIO``, none of which are defined
    here; the concrete test class is expected to provide them.
    """

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach has nothing left to hand back.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        # assertEqual, not the deprecated assertEquals alias, to match the
        # rest of this class.
        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected to be recorded for writable objects.
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        # Closing repeatedly is harmless; using the closed object is not.
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)
677
Christian Heimes1a6387e2008-03-26 12:49:49 +0000678
Antoine Pitrou19690592009-06-12 20:14:08 +0000679class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
680 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000681
def test_constructor(self):
    """Check re-initialisation and buffer_size validation of ``self.tp``."""
    rawio = self.MockRawIO([b"abc"])
    bufio = self.tp(rawio)
    # Calling __init__ again on a live object must be accepted.
    bufio.__init__(rawio)
    bufio.__init__(rawio, buffer_size=1024)
    bufio.__init__(rawio, buffer_size=16)
    self.assertEqual(b"abc", bufio.read())
    # Non-positive buffer sizes are rejected.
    self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
    self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
    self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
    # The object must still be usable after the failed attempts.
    rawio = self.MockRawIO([b"abc"])
    bufio.__init__(rawio)
    self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000695
def test_read(self):
    """read(None) and read(7) must both return all seven scripted bytes."""
    for arg in (None, 7):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcdefg", bufio.read(arg))
    # Invalid args
    self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000703
def test_read1(self):
    """Check read1() results and the number of raw reads it triggers."""
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    self.assertEqual(b"a", bufio.read(1))
    self.assertEqual(b"b", bufio.read1(1))
    self.assertEqual(rawio._reads, 1)
    # Still served from the buffer: no additional raw read.
    self.assertEqual(b"c", bufio.read1(100))
    self.assertEqual(rawio._reads, 1)
    self.assertEqual(b"d", bufio.read1(100))
    self.assertEqual(rawio._reads, 2)
    self.assertEqual(b"efg", bufio.read1(100))
    self.assertEqual(rawio._reads, 3)
    self.assertEqual(b"", bufio.read1(100))
    self.assertEqual(rawio._reads, 4)
    # Invalid args
    self.assertRaises(ValueError, bufio.read1, -1)
720
def test_readinto(self):
    """readinto() fills a 2-byte buffer across scripted chunk boundaries."""
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    b = bytearray(2)
    self.assertEqual(bufio.readinto(b), 2)
    self.assertEqual(b, b"ab")
    self.assertEqual(bufio.readinto(b), 2)
    self.assertEqual(b, b"cd")
    self.assertEqual(bufio.readinto(b), 2)
    self.assertEqual(b, b"ef")
    # Short final read: only the first byte of the buffer is overwritten.
    self.assertEqual(bufio.readinto(b), 1)
    self.assertEqual(b, b"gf")
    self.assertEqual(bufio.readinto(b), 0)
    self.assertEqual(b, b"gf")
735
def test_readlines(self):
    """readlines() with no hint, a size hint of 5, and an explicit None."""
    def bufio():
        # Fresh reader over the same three scripted chunks for each case.
        rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
        return self.tp(rawio)
    self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
    self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
    self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
743
def test_buffering(self):
    """Check which raw reads a sequence of buffered reads produces."""
    data = b"abcdefghi"
    dlen = len(data)

    # Each entry: [buffer size, buffered read sizes, expected raw read sizes]
    tests = [
        [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
        [ 100, [ 3, 3, 3],     [ dlen ]    ],
        [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
    ]

    for bufsize, buf_read_sizes, raw_read_sizes in tests:
        rawio = self.MockFileIO(data)
        bufio = self.tp(rawio, buffer_size=bufsize)
        pos = 0
        for nbytes in buf_read_sizes:
            self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
            pos += nbytes
        # this is mildly implementation-dependent
        self.assertEqual(rawio.read_history, raw_read_sizes)
763
def test_read_non_blocking(self):
    """Buffered reads over a raw stream that intermittently returns None."""
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    self.assertEqual(b"fg", bufio.read())
    self.assertEqual(b"", bufio.peek(1))
    # With nothing buffered, the raw stream's None is handed to the caller.
    self.assertIsNone(bufio.read())
    self.assertEqual(b"", bufio.read())
775
Antoine Pitrou19690592009-06-12 20:14:08 +0000776 def test_read_past_eof(self):
777 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
778 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000779
780 self.assertEquals(b"abcdefg", bufio.read(9000))
781
Antoine Pitrou19690592009-06-12 20:14:08 +0000782 def test_read_all(self):
783 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
784 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000785
786 self.assertEquals(b"abcdefg", bufio.read())
787
Victor Stinner6a102812010-04-27 23:55:59 +0000788 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou19690592009-06-12 20:14:08 +0000789 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000790 try:
791 # Write out many bytes with exactly the same number of 0's,
792 # 1's... 255's. This will help us check that concurrent reading
793 # doesn't duplicate or forget contents.
794 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000795 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000796 random.shuffle(l)
797 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000798 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000799 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000800 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000801 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000802 errors = []
803 results = []
804 def f():
805 try:
806 # Intra-buffer read then buffer-flushing read
807 for n in cycle([1, 19]):
808 s = bufio.read(n)
809 if not s:
810 break
811 # list.append() is atomic
812 results.append(s)
813 except Exception as e:
814 errors.append(e)
815 raise
816 threads = [threading.Thread(target=f) for x in range(20)]
817 for t in threads:
818 t.start()
819 time.sleep(0.02) # yield
820 for t in threads:
821 t.join()
822 self.assertFalse(errors,
823 "the following exceptions were caught: %r" % errors)
824 s = b''.join(results)
825 for i in range(256):
826 c = bytes(bytearray([i]))
827 self.assertEqual(s.count(c), N)
828 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000829 support.unlink(support.TESTFN)
830
831 def test_misbehaved_io(self):
832 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
833 bufio = self.tp(rawio)
834 self.assertRaises(IOError, bufio.seek, 0)
835 self.assertRaises(IOError, bufio.tell)
836
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000837 def test_no_extraneous_read(self):
838 # Issue #9550; when the raw IO object has satisfied the read request,
839 # we should not issue any additional reads, otherwise it may block
840 # (e.g. socket).
841 bufsize = 16
842 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
843 rawio = self.MockRawIO([b"x" * n])
844 bufio = self.tp(rawio, bufsize)
845 self.assertEqual(bufio.read(n), b"x" * n)
846 # Simple case: one raw read is enough to satisfy the request.
847 self.assertEqual(rawio._extraneous_reads, 0,
848 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
849 # A more complex case where two raw reads are needed to satisfy
850 # the request.
851 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
852 bufio = self.tp(rawio, bufsize)
853 self.assertEqual(bufio.read(n), b"x" * n)
854 self.assertEqual(rawio._extraneous_reads, 0,
855 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
856
857
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest suite against io.BufferedReader and
    # adds checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # A failed re-initialization must leave the object unusable: every
        # rejected buffer size is followed by a read() that must also fail.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; make sure it does not outlive
        # the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so that only the cyclic GC can free f.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000898
# Runs the shared BufferedReaderTest suite against pyio.BufferedReader (the
# Python implementation, as opposed to the C one exercised above).
class PyBufferedReaderTest(BufferedReaderTest):
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000901
902
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers.  Subclasses set
    # `tp` to the concrete class under test.
    write_mode = "wb"

    def test_constructor(self):
        # Re-running __init__ with valid sizes keeps the object usable;
        # invalid sizes must raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write so callers can
        # interleave flushes, seeks, reads, etc.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        with self.assertRaises(self.BlockingIOError) as cm:
            bufio.write(b"opqrwxyz0123456789")
        written = cm.exception.characters_written
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        # Record the failure so the main thread can report it.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=writer) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001118
1119
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest suite against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # A failed re-initialization must leave the object unusable: every
        # rejected buffer size is followed by a write() that must also fail.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening with "w+b" creates TESTFN; make sure it does not outlive
        # the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Create a reference cycle so that only the cyclic GC can free f.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1157
1158
# Runs the shared BufferedWriterTest suite against pyio.BufferedWriter (the
# Python implementation, as opposed to the C one exercised above).
class PyBufferedWriterTest(BufferedWriterTest):
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001161
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair.  Subclasses set
    # `tp` to the concrete class under test.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # An explicit None size must behave like no size at all.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def pair():
            # Build a fresh pair for every assertion so each one starts from
            # an unread stream.
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like no hint at all (mirrors
        # BufferedReaderTest.test_readlines).
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith().
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty as soon as either side is one.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
1279
# Runs the shared BufferedRWPairTest suite against io.BufferedRWPair (the C
# implementation).
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
1282
# Runs the shared BufferedRWPairTest suite against pyio.BufferedRWPair (the
# Python implementation).
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001285
1286
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Implementation-independent tests for read/write buffered objects.  The
    # reader and writer suites are inherited; tests that exist in both are
    # overridden below so that both parent versions run.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) is the reading primitive under test; it must
        # return the bytes read and advance the position accordingly.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Adapt readinto() to the read_func(bufio[, n]) calling convention.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1441
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    # Runs the reader, writer and random-access suites against
    # io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(expected_errors):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run both inherited variants explicitly; plain inheritance would
        # only pick up the first one in the MRO.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1458
# Runs the shared BufferedRandomTest suite against pyio.BufferedRandom (the
# Python implementation).
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1461
1462
Christian Heimes1a6387e2008-03-26 12:49:49 +00001463# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1464# properties:
1465# - A single output character can correspond to many bytes of input.
1466# - The number of input bytes to complete the character can be
1467# undetermined until the last input byte is received.
1468# - The number of input bytes can vary depending on previous input.
1469# - A single input byte can correspond to many characters of output.
1470# - The number of output characters can be undetermined until the
1471# last input byte is received.
1472# - The number of output characters can vary depending on previous input.
1473
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial state: I = O = 1 and nothing buffered.
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # Returns (buffered bytes, flags) where flags packs I and O into a
        # single integer.
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        # Inverse of getstate(): unpack the flags and undo the "^ 1".
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        # Consume `input` one byte at a time and return the text of every
        # word completed along the way.  With final=True, a partially
        # buffered word is flushed as well.
        output = ''
        period = ord('.')
        # Iterating a bytearray yields integers whatever the input type is,
        # so the comparison below does not depend on how bytes iterate.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        # Interpret the buffered word: either update I or O, or produce the
        # output text for an ordinary word.  Always empties the buffer.
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below answers only while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: returns a CodecInfo for 'test_decoder' when
        # enabled, and None (implicitly) otherwise.
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1558
# Register the lookup function of the decoder defined above for testing.
# Disabled by default (the lookup returns nothing until codecEnabled is
# set), tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1562
1563
Christian Heimes1a6387e2008-03-26 12:49:49 +00001564class StatefulIncrementalDecoderTest(unittest.TestCase):
1565 """
1566 Make sure the StatefulIncrementalDecoder actually works.
1567 """
1568
1569 test_cases = [
1570 # I=1, O=1 (fixed-length input == fixed-length output)
1571 (b'abcd', False, 'a.b.c.d.'),
1572 # I=0, O=0 (variable-length input, variable-length output)
1573 (b'oiabcd', True, 'abcd.'),
1574 # I=0, O=0 (should ignore extra periods)
1575 (b'oi...abcd...', True, 'abcd.'),
1576 # I=0, O=6 (variable-length input, fixed-length output)
1577 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1578 # I=2, O=6 (fixed-length input < fixed-length output)
1579 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1580 # I=6, O=3 (fixed-length input > fixed-length output)
1581 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1582 # I=0, then 3; O=29, then 15 (with longer output)
1583 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1584 'a----------------------------.' +
1585 'b----------------------------.' +
1586 'cde--------------------------.' +
1587 'abcdefghijabcde.' +
1588 'a.b------------.' +
1589 '.c.------------.' +
1590 'd.e------------.' +
1591 'k--------------.' +
1592 'l--------------.' +
1593 'm--------------.')
1594 ]
1595
Antoine Pitrou19690592009-06-12 20:14:08 +00001596 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001597 # Try a few one-shot test cases.
1598 for input, eof, output in self.test_cases:
1599 d = StatefulIncrementalDecoder()
1600 self.assertEquals(d.decode(input, eof), output)
1601
1602 # Also test an unfinished decode, followed by forcing EOF.
1603 d = StatefulIncrementalDecoder()
1604 self.assertEquals(d.decode(b'oiabcd'), '')
1605 self.assertEquals(d.decode(b'', 1), 'abcd.')
1606
1607class TextIOWrapperTest(unittest.TestCase):
1608
def setUp(self):
    # Sample bytes mixing CRLF, lone CR and lone LF line endings, plus the
    # text they should read back as once every ending is translated to LF.
    mixed_endings = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    unix_endings = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.testdata = mixed_endings
    self.normalized = unix_endings.decode("ascii")
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001613
def tearDown(self):
    # Remove the scratch file named by support.TESTFN after each test.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001616
Antoine Pitrou19690592009-06-12 20:14:08 +00001617 def test_constructor(self):
1618 r = self.BytesIO(b"\xc3\xa9\n\n")
1619 b = self.BufferedReader(r, 1000)
1620 t = self.TextIOWrapper(b)
1621 t.__init__(b, encoding="latin1", newline="\r\n")
1622 self.assertEquals(t.encoding, "latin1")
1623 self.assertEquals(t.line_buffering, False)
1624 t.__init__(b, encoding="utf8", line_buffering=True)
1625 self.assertEquals(t.encoding, "utf8")
1626 self.assertEquals(t.line_buffering, True)
1627 self.assertEquals("\xe9\n", t.readline())
1628 self.assertRaises(TypeError, t.__init__, b, newline=42)
1629 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1630
1631 def test_detach(self):
1632 r = self.BytesIO()
1633 b = self.BufferedWriter(r)
1634 t = self.TextIOWrapper(b)
1635 self.assertIs(t.detach(), b)
1636
1637 t = self.TextIOWrapper(b, encoding="ascii")
1638 t.write("howdy")
1639 self.assertFalse(r.getvalue())
1640 t.detach()
1641 self.assertEqual(r.getvalue(), b"howdy")
1642 self.assertRaises(ValueError, t.detach)
1643
1644 def test_repr(self):
1645 raw = self.BytesIO("hello".encode("utf-8"))
1646 b = self.BufferedReader(raw)
1647 t = self.TextIOWrapper(b, encoding="utf-8")
1648 modname = self.TextIOWrapper.__module__
1649 self.assertEqual(repr(t),
1650 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1651 raw.name = "dummy"
1652 self.assertEqual(repr(t),
1653 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1654 raw.name = b"dummy"
1655 self.assertEqual(repr(t),
1656 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1657
1658 def test_line_buffering(self):
1659 r = self.BytesIO()
1660 b = self.BufferedWriter(r, 1000)
1661 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1662 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001663 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001664 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001665 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001666 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001667 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1668
Antoine Pitrou19690592009-06-12 20:14:08 +00001669 def test_encoding(self):
1670 # Check the encoding attribute is always set, and valid
1671 b = self.BytesIO()
1672 t = self.TextIOWrapper(b, encoding="utf8")
1673 self.assertEqual(t.encoding, "utf8")
1674 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001675 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001676 codecs.lookup(t.encoding)
1677
1678 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001679 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001680 b = self.BytesIO(b"abc\n\xff\n")
1681 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001682 self.assertRaises(UnicodeError, t.read)
1683 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001684 b = self.BytesIO(b"abc\n\xff\n")
1685 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001686 self.assertRaises(UnicodeError, t.read)
1687 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001688 b = self.BytesIO(b"abc\n\xff\n")
1689 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001690 self.assertEquals(t.read(), "abc\n\n")
1691 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001692 b = self.BytesIO(b"abc\n\xff\n")
1693 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1694 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001695
Antoine Pitrou19690592009-06-12 20:14:08 +00001696 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001697 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001698 b = self.BytesIO()
1699 t = self.TextIOWrapper(b, encoding="ascii")
1700 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001701 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001702 b = self.BytesIO()
1703 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1704 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001705 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001706 b = self.BytesIO()
1707 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001708 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001709 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001710 t.flush()
1711 self.assertEquals(b.getvalue(), b"abcdef\n")
1712 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001713 b = self.BytesIO()
1714 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001715 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001716 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001717 t.flush()
1718 self.assertEquals(b.getvalue(), b"abc?def\n")
1719
Antoine Pitrou19690592009-06-12 20:14:08 +00001720 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001721 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1722
1723 tests = [
1724 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1725 [ '', input_lines ],
1726 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1727 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1728 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1729 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001730 encodings = (
1731 'utf-8', 'latin-1',
1732 'utf-16', 'utf-16-le', 'utf-16-be',
1733 'utf-32', 'utf-32-le', 'utf-32-be',
1734 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001735
1736 # Try a range of buffer sizes to test the case where \r is the last
1737 # character in TextIOWrapper._pending_line.
1738 for encoding in encodings:
1739 # XXX: str.encode() should return bytes
1740 data = bytes(''.join(input_lines).encode(encoding))
1741 for do_reads in (False, True):
1742 for bufsize in range(1, 10):
1743 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001744 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1745 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001746 encoding=encoding)
1747 if do_reads:
1748 got_lines = []
1749 while True:
1750 c2 = textio.read(2)
1751 if c2 == '':
1752 break
1753 self.assertEquals(len(c2), 2)
1754 got_lines.append(c2 + textio.readline())
1755 else:
1756 got_lines = list(textio)
1757
1758 for got_line, exp_line in zip(got_lines, exp_lines):
1759 self.assertEquals(got_line, exp_line)
1760 self.assertEquals(len(got_lines), len(exp_lines))
1761
Antoine Pitrou19690592009-06-12 20:14:08 +00001762 def test_newlines_input(self):
1763 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001764 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1765 for newline, expected in [
1766 (None, normalized.decode("ascii").splitlines(True)),
1767 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001768 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1769 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1770 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001771 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001772 buf = self.BytesIO(testdata)
1773 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001774 self.assertEquals(txt.readlines(), expected)
1775 txt.seek(0)
1776 self.assertEquals(txt.read(), "".join(expected))
1777
Antoine Pitrou19690592009-06-12 20:14:08 +00001778 def test_newlines_output(self):
1779 testdict = {
1780 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1781 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1782 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1783 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1784 }
1785 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1786 for newline, expected in tests:
1787 buf = self.BytesIO()
1788 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1789 txt.write("AAA\nB")
1790 txt.write("BB\nCCC\n")
1791 txt.write("X\rY\r\nZ")
1792 txt.flush()
1793 self.assertEquals(buf.closed, False)
1794 self.assertEquals(buf.getvalue(), expected)
1795
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # stream, with the pending text already flushed into it.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record the stream contents at the moment it is closed.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
1809
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # called in exactly that order when the object is collected.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__; chain to it
            # only when it does.
            parent_del = getattr(super(MyTextIO, self), "__del__", None)
            if parent_del is not None:
                parent_del()
        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()
        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
1832
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        with self.assertRaises(AttributeError):
            touch_missing_attribute()
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001847
1848 # Systematic tests of the text I/O API
1849
def test_basic_io(self):
    # Files are opened through `with` so that a failing assertion cannot
    # leave the scratch file open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position after reading everything, i.e. end of file.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1878
def multi_line_test(self, f, enc):
    """Write lines of many lengths to *f* and check they read back intact.

    *f* is emptied first.  For every line, the tell() position recorded
    before writing it must equal the tell() position seen before reading
    it back, and the text must match.  *enc* is accepted but not used here.
    """
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        line = "".join([sample[i % len(sample)] for i in range(size)]) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1900
def test_telling(self):
    # The file is opened through `with` so that it is closed even when an
    # assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iteration is in progress.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
1920
def test_seeking(self):
    chunk_size = _default_chunk_size()
    # Place a multi-byte character so that it begins two bytes before the
    # end of the first chunk.
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # The reader used to be left open; `with` closes it on every path.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1938
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The reader used to be left open; `with` closes it on every path.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        # tell() must not raise right after the readline() above.
        f.tell()
1950
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every open goes through `with` so that a failing assertion does
        # not leave the scratch file open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001997
Antoine Pitrou19690592009-06-12 20:14:08 +00001998 def test_encoded_writes(self):
1999 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002000 tests = ("utf-16",
2001 "utf-16-le",
2002 "utf-16-be",
2003 "utf-32",
2004 "utf-32-le",
2005 "utf-32-be")
2006 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002007 buf = self.BytesIO()
2008 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002009 # Check if the BOM is written only once (see issue1753).
2010 f.write(data)
2011 f.write(data)
2012 f.seek(0)
2013 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002014 f.seek(0)
2015 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002016 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2017
Antoine Pitrou19690592009-06-12 20:14:08 +00002018 def test_unreadable(self):
2019 class UnReadable(self.BytesIO):
2020 def readable(self):
2021 return False
2022 txt = self.TextIOWrapper(UnReadable())
2023 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002024
Antoine Pitrou19690592009-06-12 20:14:08 +00002025 def test_read_one_by_one(self):
2026 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002027 reads = ""
2028 while True:
2029 c = txt.read(1)
2030 if not c:
2031 break
2032 reads += c
2033 self.assertEquals(reads, "AA\nBB")
2034
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002035 def test_readlines(self):
2036 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2037 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2038 txt.seek(0)
2039 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2040 txt.seek(0)
2041 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2042
Christian Heimes1a6387e2008-03-26 12:49:49 +00002043 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002044 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002046 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002047 reads = ""
2048 while True:
2049 c = txt.read(128)
2050 if not c:
2051 break
2052 reads += c
2053 self.assertEquals(reads, "A"*127+"\nB")
2054
2055 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002056 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002057
2058 # read one char at a time
2059 reads = ""
2060 while True:
2061 c = txt.read(1)
2062 if not c:
2063 break
2064 reads += c
2065 self.assertEquals(reads, self.normalized)
2066
2067 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002068 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002069 txt._CHUNK_SIZE = 4
2070
2071 reads = ""
2072 while True:
2073 c = txt.read(4)
2074 if not c:
2075 break
2076 reads += c
2077 self.assertEquals(reads, self.normalized)
2078
2079 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002080 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002081 txt._CHUNK_SIZE = 4
2082
2083 reads = txt.read(4)
2084 reads += txt.read(4)
2085 reads += txt.readline()
2086 reads += txt.readline()
2087 reads += txt.readline()
2088 self.assertEquals(reads, self.normalized)
2089
2090 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002091 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002092 txt._CHUNK_SIZE = 4
2093
2094 reads = txt.read(4)
2095 reads += txt.read()
2096 self.assertEquals(reads, self.normalized)
2097
2098 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002099 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002100 txt._CHUNK_SIZE = 4
2101
2102 reads = txt.read(4)
2103 pos = txt.tell()
2104 txt.seek(0)
2105 txt.seek(pos)
2106 self.assertEquals(txt.read(4), "BBB\n")
2107
2108 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002109 buffer = self.BytesIO(self.testdata)
2110 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002111
2112 self.assertEqual(buffer.seekable(), txt.seekable())
2113
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position is not needed here; tell() is still called, as
            # before, on the freshly written file.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2128
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write past the existing text first, then overwrite from the
            # start of the file.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2143
def test_errors_property(self):
    # With no explicit handler the errors attribute reads "strict".
    with self.open(support.TESTFN, "w") as default_file:
        default_handler = default_file.errors
        self.assertEqual(default_handler, "strict")
    # An explicit handler is reported back unchanged.
    with self.open(support.TESTFN, "w", errors="replace") as lenient_file:
        chosen_handler = lenient_file.errors
        self.assertEqual(chosen_handler, "replace")
2149
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until every thread has been started, so that the
            # writes happen as close together as possible.
            event.wait()
            f.write(text)
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for t in threads:
            t.start()
        time.sleep(0.02)
        event.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
2171
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002172 def test_flush_error_on_close(self):
2173 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2174 def bad_flush():
2175 raise IOError()
2176 txt.flush = bad_flush
2177 self.assertRaises(IOError, txt.close) # exception not swallowed
2178
2179 def test_multi_close(self):
2180 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2181 txt.close()
2182 txt.close()
2183 txt.close()
2184 self.assertRaises(ValueError, txt.flush)
2185
Antoine Pitrou19690592009-06-12 20:14:08 +00002186class CTextIOWrapperTest(TextIOWrapperTest):
2187
2188 def test_initialization(self):
2189 r = self.BytesIO(b"\xc3\xa9\n\n")
2190 b = self.BufferedReader(r, 1000)
2191 t = self.TextIOWrapper(b)
2192 self.assertRaises(TypeError, t.__init__, b, newline=42)
2193 self.assertRaises(ValueError, t.read)
2194 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2195 self.assertRaises(ValueError, t.read)
2196
2197 def test_garbage_collection(self):
2198 # C TextIOWrapper objects are collected, and collecting them flushes
2199 # all data to disk.
2200 # The Python version has __del__, so it ends in gc.garbage instead.
2201 rawio = io.FileIO(support.TESTFN, "wb")
2202 b = self.BufferedWriter(rawio)
2203 t = self.TextIOWrapper(b, encoding="ascii")
2204 t.write("456def")
2205 t.x = t
2206 wr = weakref.ref(t)
2207 del t
2208 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002209 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002210 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002211 self.assertEqual(f.read(), b"456def")
2212
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Inherits every test from TextIOWrapperTest and adds none of its own.
    # NOTE(review): presumably paired with the pure-Python implementation
    # through class attributes assigned elsewhere in this file -- confirm.
    pass
2215
2216
2217class IncrementalNewlineDecoderTest(unittest.TestCase):
2218
def check_newline_decoding_utf8(self, decoder):
    """Run UTF-8 specific checks against the newline *decoder*.

    The checks cover multi-byte characters split across decode() calls and
    carriage returns left pending at the end of a call.
    """
    def _check_decode(b, s, **kwargs):
        # We exercise getstate() / setstate() as well as decode()
        state = decoder.getstate()
        self.assertEqual(decoder.decode(b, **kwargs), s)
        decoder.setstate(state)
        self.assertEqual(decoder.decode(b, **kwargs), s)

    _check_decode(b'\xe8\xa2\x88', "\u8888")

    _check_decode(b'\xe8', "")
    _check_decode(b'\xa2', "")
    _check_decode(b'\x88', "\u8888")

    _check_decode(b'\xe8', "")
    _check_decode(b'\xa2', "")
    _check_decode(b'\x88', "\u8888")

    # An incomplete character at end of input is an error.
    _check_decode(b'\xe8', "")
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    _check_decode(b'\n', "\n")
    _check_decode(b'\r', "")
    _check_decode(b'', "\n", final=True)
    _check_decode(b'\r', "\n", final=True)

    _check_decode(b'\r', "")
    _check_decode(b'a', "\na")

    _check_decode(b'\r\r\n', "\n\n")
    _check_decode(b'\r', "")
    _check_decode(b'\r', "\n")
    _check_decode(b'\na', "\na")

    _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
    _check_decode(b'\xe8\xa2\x88', "\u8888")
    _check_decode(b'\n', "\n")
    _check_decode(b'\xe8\xa2\x88\r', "\u8888")
    _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002260
def check_newline_decoding(self, decoder, encoding):
    """Feed *decoder* one unit at a time and check newline tracking.

    When *encoding* is not None the text is encoded with it and fed one
    byte at a time; when it is None the text itself is fed one character
    at a time.  After each step the decoder's `newlines` attribute is
    checked, and finally the concatenated translated output.
    """
    result = []
    if encoding is not None:
        encoder = codecs.getincrementalencoder(encoding)()
        def _decode_bytewise(s):
            # Decode one byte at a time.  Slicing, unlike iterating,
            # yields a length-1 bytes object whether or not iterating
            # over bytes produces integers.
            data = encoder.encode(s)
            for i in range(len(data)):
                result.append(decoder.decode(data[i:i + 1]))
    else:
        encoder = None
        def _decode_bytewise(s):
            # Decode one char at a time
            for c in s:
                result.append(decoder.decode(c))
    self.assertEqual(decoder.newlines, None)
    _decode_bytewise("abc\n\r")
    self.assertEqual(decoder.newlines, '\n')
    _decode_bytewise("\nabc")
    self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    _decode_bytewise("abc\r")
    self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    _decode_bytewise("abc")
    self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
    _decode_bytewise("abc\r")
    self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
    # After reset() the record of newlines seen must be cleared.
    decoder.reset()
    text = "abc"
    if encoder is not None:
        encoder.reset()
        text = encoder.encode(text)
    self.assertEqual(decoder.decode(text), "abc")
    self.assertEqual(decoder.newlines, None)
2293
def test_newline_decoder(self):
    # Run the generic newline checks on top of each codec below, then
    # the UTF-8 specific checks on a fresh UTF-8 decoder.
    encodings = (
        # None meaning the IncrementalNewlineDecoder takes unicode input
        # rather than bytes input
        None, 'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    for enc in encodings:
        if enc is None:
            inner = None
        else:
            inner = codecs.getincrementaldecoder(enc)()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding(wrapped, enc)
    utf8_inner = codecs.getincrementaldecoder("utf-8")()
    utf8_wrapped = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
    self.check_newline_decoding_utf8(utf8_wrapped)
2309
def test_newline_bytes(self):
    # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
    # U+0D00 and U+0A00 are not newline characters: they must pass
    # through unchanged and must not be recorded in ``newlines``,
    # with and without newline translation.
    def _check(dec):
        self.assertIsNone(dec.newlines)
        for char in ("\u0D00", "\u0A00"):
            self.assertEqual(dec.decode(char), char)
            self.assertIsNone(dec.newlines)
    for translate in (False, True):
        _check(self.IncrementalNewlineDecoder(None, translate=translate))
2322
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest bound to the C ``io`` module.

    Adds no tests of its own; test_main() injects the ``io`` namespace
    into every test class whose name starts with "C".
    """
2325
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest bound to the pure-Python ``pyio`` module.

    Adds no tests of its own; test_main() injects the ``pyio`` namespace
    into every test class whose name starts with "Py".
    """
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002328
Christian Heimes1a6387e2008-03-26 12:49:49 +00002329
2330# XXX Tests for open()
2331
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on the public namespace of an io implementation.

    Concrete subclasses set the ``io`` attribute to the module under
    test; test_main() injects the remaining attributes used here
    (``open``, ``IOBase``, ``BlockingIOError``, ...) into those subclasses.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every name exported in __all__ must exist and, depending on its
        # kind, be an exception class or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # Files are opened in ``with`` blocks so that a failing assertion
        # cannot leave TESTFN open when tearDown() unlinks it.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # g shares f's descriptor without owning it (closefd=False).
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # whatever the mode and buffering it was opened with.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(unicode):
            pass
        # Build a reference cycle through the exception and check that
        # the garbage collector is able to reclaim it.
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check raw, buffered and text files against *abcmodule*'s ABCs.

        abcmodule -- any object exposing IOBase, RawIOBase,
                     BufferedIOBase and TextIOBase attributes.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2467
class CMiscIOTest(MiscIOTest):
    """MiscIOTest run against the C implementation (the ``io`` module)."""

    io = io
2470
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest run against the pure-Python implementation (``pyio``)."""

    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002473
def test_main():
    """Bind each test class to its io implementation, then run them all.

    Classes whose name starts with "C" receive the members of ``io`` and
    the C-prefixed mock classes as class attributes; classes whose name
    starts with "Py" receive the ``pyio`` equivalents.  Other classes
    are run untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    globs = globals()
    namespaces = {}
    for prefix, module in (("C", io), ("Py", pyio)):
        namespace = {name: getattr(module, name) for name in all_members}
        # Each mock has one variant per implementation, named with the
        # same prefix as the test classes.
        for mock in mocks:
            namespace[mock.__name__] = globs[prefix + mock.__name__]
        namespaces[prefix] = namespace
    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper

    for test in tests:
        if test.__name__.startswith("C"):
            namespace = namespaces["C"]
        elif test.__name__.startswith("Py"):
            namespace = namespaces["Py"]
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002507
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()