blob: 38cbcbcc2bf6f8f454da0d28932002a624aee4f4 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000046
47__metaclass__ = type
48bytes = support.py3k_bytes
49
50def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with io.open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55
Antoine Pitrou6391b342010-09-14 18:48:19 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000063 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000064 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
Christian Heimes1a6387e2008-03-26 12:49:49 +000066 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000068 return len(b)
69
70 def writable(self):
71 return True
72
73 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
79 def seekable(self):
80 return True
81
82 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000083 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000084
85 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000094 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou6391b342010-09-14 18:48:19 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Antoine Pitrou19690592009-06-12 20:14:08 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000134
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return MockRawIO.write(self, b) * 2
139
140 def read(self, n=None):
141 return MockRawIO.read(self, n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 MockRawIO.readinto(self, buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
177 def __init__(self, data):
178 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000179 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000182 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 def readinto(self, b):
187 res = super(MockFileIO, self).readinto(b)
188 self.read_history.append(res)
189 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
Antoine Pitrou19690592009-06-12 20:14:08 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193
Antoine Pitrou19690592009-06-12 20:14:08 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
198class MockNonBlockWriterIO:
199
200 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000202 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204 def pop_written(self):
205 s = b"".join(self._write_stack)
206 self._write_stack[:] = []
207 return s
208
209 def block_on(self, char):
210 """Block when a given char is encountered."""
211 self._blocker_char = char
212
213 def readable(self):
214 return True
215
216 def seekable(self):
217 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000218
219 def writable(self):
220 return True
221
Antoine Pitrou19690592009-06-12 20:14:08 +0000222 def write(self, b):
223 b = bytes(b)
224 n = -1
225 if self._blocker_char:
226 try:
227 n = b.index(self._blocker_char)
228 except ValueError:
229 pass
230 else:
231 self._blocker_char = None
232 self._write_stack.append(b[:n])
233 raise self.BlockingIOError(0, "test blocking", n)
234 self._write_stack.append(b)
235 return len(b)
236
237class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
238 BlockingIOError = io.BlockingIOError
239
240class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
241 BlockingIOError = pyio.BlockingIOError
242
Christian Heimes1a6387e2008-03-26 12:49:49 +0000243
244class IOTest(unittest.TestCase):
245
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 def setUp(self):
247 support.unlink(support.TESTFN)
248
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000250 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000251
252 def write_ops(self, f):
253 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000254 f.truncate(0)
255 self.assertEqual(f.tell(), 5)
256 f.seek(0)
257
258 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000259 self.assertEqual(f.seek(0), 0)
260 self.assertEqual(f.write(b"Hello."), 6)
261 self.assertEqual(f.tell(), 6)
262 self.assertEqual(f.seek(-1, 1), 5)
263 self.assertEqual(f.tell(), 5)
264 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
265 self.assertEqual(f.seek(0), 0)
266 self.assertEqual(f.write(b"h"), 1)
267 self.assertEqual(f.seek(-1, 2), 13)
268 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000271 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
273
274 def read_ops(self, f, buffered=False):
275 data = f.read(5)
276 self.assertEqual(data, b"hello")
277 data = bytearray(data)
278 self.assertEqual(f.readinto(data), 5)
279 self.assertEqual(data, b" worl")
280 self.assertEqual(f.readinto(data), 2)
281 self.assertEqual(len(data), 5)
282 self.assertEqual(data[:2], b"d\n")
283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.read(20), b"hello world\n")
285 self.assertEqual(f.read(1), b"")
286 self.assertEqual(f.readinto(bytearray(b"x")), 0)
287 self.assertEqual(f.seek(-6, 2), 6)
288 self.assertEqual(f.read(5), b"world")
289 self.assertEqual(f.read(0), b"")
290 self.assertEqual(f.readinto(bytearray()), 0)
291 self.assertEqual(f.seek(-6, 1), 5)
292 self.assertEqual(f.read(5), b" worl")
293 self.assertEqual(f.tell(), 10)
294 self.assertRaises(TypeError, f.seek, 0.0)
295 if buffered:
296 f.seek(0)
297 self.assertEqual(f.read(), b"hello world\n")
298 f.seek(6)
299 self.assertEqual(f.read(), b"world\n")
300 self.assertEqual(f.read(), b"")
301
302 LARGE = 2**31
303
304 def large_file_ops(self, f):
305 assert f.readable()
306 assert f.writable()
307 self.assertEqual(f.seek(self.LARGE), self.LARGE)
308 self.assertEqual(f.tell(), self.LARGE)
309 self.assertEqual(f.write(b"xxx"), 3)
310 self.assertEqual(f.tell(), self.LARGE + 3)
311 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
312 self.assertEqual(f.truncate(), self.LARGE + 2)
313 self.assertEqual(f.tell(), self.LARGE + 2)
314 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
315 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000316 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000317 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
318 self.assertEqual(f.seek(-1, 2), self.LARGE)
319 self.assertEqual(f.read(2), b"x")
320
Antoine Pitrou19690592009-06-12 20:14:08 +0000321 def test_invalid_operations(self):
322 # Try writing on a file opened in read mode and vice-versa.
323 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000324 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000325 self.assertRaises(IOError, fp.read)
326 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000327 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000328 self.assertRaises(IOError, fp.write, b"blah")
329 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000330 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000331 self.assertRaises(IOError, fp.write, "blah")
332 self.assertRaises(IOError, fp.writelines, ["blah\n"])
333
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000335 with self.open(support.TESTFN, "wb", buffering=0) as f:
336 self.assertEqual(f.readable(), False)
337 self.assertEqual(f.writable(), True)
338 self.assertEqual(f.seekable(), True)
339 self.write_ops(f)
340 with self.open(support.TESTFN, "rb", buffering=0) as f:
341 self.assertEqual(f.readable(), True)
342 self.assertEqual(f.writable(), False)
343 self.assertEqual(f.seekable(), True)
344 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345
346 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb") as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb") as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readline(), b"abc\n")
363 self.assertEqual(f.readline(10), b"def\n")
364 self.assertEqual(f.readline(2), b"xy")
365 self.assertEqual(f.readline(4), b"zzy\n")
366 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000367 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 self.assertRaises(TypeError, f.readline, 5.3)
369 with self.open(support.TESTFN, "r") as f:
370 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000371
372 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000373 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374 self.write_ops(f)
375 data = f.getvalue()
376 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000378 self.read_ops(f, True)
379
380 def test_large_file_ops(self):
381 # On Windows and Mac OSX this test comsumes large resources; It takes
382 # a long time to build the >2GB file and takes >2GB of disk space
383 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
385 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 print("\nTesting large file ops skipped on %s." % sys.platform,
387 file=sys.stderr)
388 print("It requires %d bytes and a long time." % self.LARGE,
389 file=sys.stderr)
390 print("Use 'regrtest.py -u largefile test_io' to run it.",
391 file=sys.stderr)
392 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 with self.open(support.TESTFN, "w+b", 0) as f:
394 self.large_file_ops(f)
395 with self.open(support.TESTFN, "w+b") as f:
396 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397
398 def test_with_open(self):
399 for bufsize in (0, 1, 100):
400 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000401 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000402 f.write(b"xxx")
403 self.assertEqual(f.closed, True)
404 f = None
405 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000406 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000407 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 except ZeroDivisionError:
409 self.assertEqual(f.closed, True)
410 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000411 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412
Antoine Pitroue741cc62009-01-21 00:45:36 +0000413 # issue 5008
414 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000415 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000416 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000418 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000419 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000422 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 def test_destructor(self):
425 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000427 def __del__(self):
428 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000429 try:
430 f = super(MyFileIO, self).__del__
431 except AttributeError:
432 pass
433 else:
434 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def close(self):
436 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def flush(self):
439 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 super(MyFileIO, self).flush()
441 f = MyFileIO(support.TESTFN, "wb")
442 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000443 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 support.gc_collect()
445 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000446 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 self.assertEqual(f.read(), b"xxx")
448
449 def _check_base_destructor(self, base):
450 record = []
451 class MyIO(base):
452 def __init__(self):
453 # This exercises the availability of attributes on object
454 # destruction.
455 # (in the C version, close() is called by the tp_dealloc
456 # function, not by __del__)
457 self.on_del = 1
458 self.on_close = 2
459 self.on_flush = 3
460 def __del__(self):
461 record.append(self.on_del)
462 try:
463 f = super(MyIO, self).__del__
464 except AttributeError:
465 pass
466 else:
467 f()
468 def close(self):
469 record.append(self.on_close)
470 super(MyIO, self).close()
471 def flush(self):
472 record.append(self.on_flush)
473 super(MyIO, self).flush()
474 f = MyIO()
475 del f
476 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000477 self.assertEqual(record, [1, 2, 3])
478
Antoine Pitrou19690592009-06-12 20:14:08 +0000479 def test_IOBase_destructor(self):
480 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 def test_RawIOBase_destructor(self):
483 self._check_base_destructor(self.RawIOBase)
484
485 def test_BufferedIOBase_destructor(self):
486 self._check_base_destructor(self.BufferedIOBase)
487
488 def test_TextIOBase_destructor(self):
489 self._check_base_destructor(self.TextIOBase)
490
491 def test_close_flushes(self):
492 with self.open(support.TESTFN, "wb") as f:
493 f.write(b"xxx")
494 with self.open(support.TESTFN, "rb") as f:
495 self.assertEqual(f.read(), b"xxx")
496
497 def test_array_writes(self):
498 a = array.array(b'i', range(10))
499 n = len(a.tostring())
500 with self.open(support.TESTFN, "wb", 0) as f:
501 self.assertEqual(f.write(a), n)
502 with self.open(support.TESTFN, "wb") as f:
503 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000504
505 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000506 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000507 closefd=False)
508
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 def test_read_closed(self):
510 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000511 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 with self.open(support.TESTFN, "r") as f:
513 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000514 self.assertEqual(file.read(), "egg\n")
515 file.seek(0)
516 file.close()
517 self.assertRaises(ValueError, file.read)
518
519 def test_no_closefd_with_filename(self):
520 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522
523 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000524 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000526 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529 self.assertEqual(file.buffer.raw.closefd, False)
530
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 def test_garbage_collection(self):
532 # FileIO objects are collected, and collecting them flushes
533 # all data to disk.
534 f = self.FileIO(support.TESTFN, "wb")
535 f.write(b"abcxxx")
536 f.f = f
537 wr = weakref.ref(f)
538 del f
539 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000540 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000541 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 def test_unbounded_file(self):
545 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
546 zero = "/dev/zero"
547 if not os.path.exists(zero):
548 self.skipTest("{0} does not exist".format(zero))
549 if sys.maxsize > 0x7FFFFFFF:
550 self.skipTest("test can only run in a 32-bit address space")
551 if support.real_max_memuse < support._2G:
552 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000553 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000554 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000555 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000556 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000557 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000558 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000559
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000560 def test_flush_error_on_close(self):
561 f = self.open(support.TESTFN, "wb", buffering=0)
562 def bad_flush():
563 raise IOError()
564 f.flush = bad_flush
565 self.assertRaises(IOError, f.close) # exception not swallowed
566
567 def test_multi_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 f.close()
570 f.close()
571 f.close()
572 self.assertRaises(ValueError, f.flush)
573
Antoine Pitrou6391b342010-09-14 18:48:19 +0000574 def test_RawIOBase_read(self):
575 # Exercise the default RawIOBase.read() implementation (which calls
576 # readinto() internally).
577 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
578 self.assertEqual(rawio.read(2), b"ab")
579 self.assertEqual(rawio.read(2), b"c")
580 self.assertEqual(rawio.read(2), b"d")
581 self.assertEqual(rawio.read(2), None)
582 self.assertEqual(rawio.read(2), b"ef")
583 self.assertEqual(rawio.read(2), b"g")
584 self.assertEqual(rawio.read(2), None)
585 self.assertEqual(rawio.read(2), b"")
586
Antoine Pitrou19690592009-06-12 20:14:08 +0000587class CIOTest(IOTest):
588 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000589
Antoine Pitrou19690592009-06-12 20:14:08 +0000590class PyIOTest(IOTest):
591 test_array_writes = unittest.skip(
592 "len(array.array) returns number of elements rather than bytelength"
593 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000594
595
Antoine Pitrou19690592009-06-12 20:14:08 +0000596class CommonBufferedTests:
597 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
598
599 def test_detach(self):
600 raw = self.MockRawIO()
601 buf = self.tp(raw)
602 self.assertIs(buf.detach(), raw)
603 self.assertRaises(ValueError, buf.detach)
604
605 def test_fileno(self):
606 rawio = self.MockRawIO()
607 bufio = self.tp(rawio)
608
609 self.assertEquals(42, bufio.fileno())
610
611 def test_no_fileno(self):
612 # XXX will we always have fileno() function? If so, kill
613 # this test. Else, write it.
614 pass
615
616 def test_invalid_args(self):
617 rawio = self.MockRawIO()
618 bufio = self.tp(rawio)
619 # Invalid whence
620 self.assertRaises(ValueError, bufio.seek, 0, -1)
621 self.assertRaises(ValueError, bufio.seek, 0, 3)
622
623 def test_override_destructor(self):
624 tp = self.tp
625 record = []
626 class MyBufferedIO(tp):
627 def __del__(self):
628 record.append(1)
629 try:
630 f = super(MyBufferedIO, self).__del__
631 except AttributeError:
632 pass
633 else:
634 f()
635 def close(self):
636 record.append(2)
637 super(MyBufferedIO, self).close()
638 def flush(self):
639 record.append(3)
640 super(MyBufferedIO, self).flush()
641 rawio = self.MockRawIO()
642 bufio = MyBufferedIO(rawio)
643 writable = bufio.writable()
644 del bufio
645 support.gc_collect()
646 if writable:
647 self.assertEqual(record, [1, 2, 3])
648 else:
649 self.assertEqual(record, [1, 2])
650
651 def test_context_manager(self):
652 # Test usability as a context manager
653 rawio = self.MockRawIO()
654 bufio = self.tp(rawio)
655 def _with():
656 with bufio:
657 pass
658 _with()
659 # bufio should now be closed, and using it a second time should raise
660 # a ValueError.
661 self.assertRaises(ValueError, _with)
662
663 def test_error_through_destructor(self):
664 # Test that the exception state is not modified by a destructor,
665 # even if close() fails.
666 rawio = self.CloseFailureIO()
667 def f():
668 self.tp(rawio).xyzzy
669 with support.captured_output("stderr") as s:
670 self.assertRaises(AttributeError, f)
671 s = s.getvalue().strip()
672 if s:
673 # The destructor *may* have printed an unraisable error, check it
674 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000675 self.assertTrue(s.startswith("Exception IOError: "), s)
676 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000677
678 def test_repr(self):
679 raw = self.MockRawIO()
680 b = self.tp(raw)
681 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
682 self.assertEqual(repr(b), "<%s>" % clsname)
683 raw.name = "dummy"
684 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
685 raw.name = b"dummy"
686 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000687
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000688 def test_flush_error_on_close(self):
689 raw = self.MockRawIO()
690 def bad_flush():
691 raise IOError()
692 raw.flush = bad_flush
693 b = self.tp(raw)
694 self.assertRaises(IOError, b.close) # exception not swallowed
695
696 def test_multi_close(self):
697 raw = self.MockRawIO()
698 b = self.tp(raw)
699 b.close()
700 b.close()
701 b.close()
702 self.assertRaises(ValueError, b.flush)
703
Christian Heimes1a6387e2008-03-26 12:49:49 +0000704
Antoine Pitrou19690592009-06-12 20:14:08 +0000705class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
706 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000707
Antoine Pitrou19690592009-06-12 20:14:08 +0000708 def test_constructor(self):
709 rawio = self.MockRawIO([b"abc"])
710 bufio = self.tp(rawio)
711 bufio.__init__(rawio)
712 bufio.__init__(rawio, buffer_size=1024)
713 bufio.__init__(rawio, buffer_size=16)
714 self.assertEquals(b"abc", bufio.read())
715 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
716 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
717 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
718 rawio = self.MockRawIO([b"abc"])
719 bufio.__init__(rawio)
720 self.assertEquals(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000721
Antoine Pitrou19690592009-06-12 20:14:08 +0000722 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000723 for arg in (None, 7):
724 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
725 bufio = self.tp(rawio)
726 self.assertEquals(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000727 # Invalid args
728 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000729
Antoine Pitrou19690592009-06-12 20:14:08 +0000730 def test_read1(self):
731 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
732 bufio = self.tp(rawio)
733 self.assertEquals(b"a", bufio.read(1))
734 self.assertEquals(b"b", bufio.read1(1))
735 self.assertEquals(rawio._reads, 1)
736 self.assertEquals(b"c", bufio.read1(100))
737 self.assertEquals(rawio._reads, 1)
738 self.assertEquals(b"d", bufio.read1(100))
739 self.assertEquals(rawio._reads, 2)
740 self.assertEquals(b"efg", bufio.read1(100))
741 self.assertEquals(rawio._reads, 3)
742 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000743 self.assertEquals(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000744 # Invalid args
745 self.assertRaises(ValueError, bufio.read1, -1)
746
747 def test_readinto(self):
748 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
749 bufio = self.tp(rawio)
750 b = bytearray(2)
751 self.assertEquals(bufio.readinto(b), 2)
752 self.assertEquals(b, b"ab")
753 self.assertEquals(bufio.readinto(b), 2)
754 self.assertEquals(b, b"cd")
755 self.assertEquals(bufio.readinto(b), 2)
756 self.assertEquals(b, b"ef")
757 self.assertEquals(bufio.readinto(b), 1)
758 self.assertEquals(b, b"gf")
759 self.assertEquals(bufio.readinto(b), 0)
760 self.assertEquals(b, b"gf")
761
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000762 def test_readlines(self):
763 def bufio():
764 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
765 return self.tp(rawio)
766 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
767 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
768 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
769
Antoine Pitrou19690592009-06-12 20:14:08 +0000770 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000771 data = b"abcdefghi"
772 dlen = len(data)
773
774 tests = [
775 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
776 [ 100, [ 3, 3, 3], [ dlen ] ],
777 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
778 ]
779
780 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000781 rawio = self.MockFileIO(data)
782 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000783 pos = 0
784 for nbytes in buf_read_sizes:
785 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
786 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000787 # this is mildly implementation-dependent
Christian Heimes1a6387e2008-03-26 12:49:49 +0000788 self.assertEquals(rawio.read_history, raw_read_sizes)
789
Antoine Pitrou19690592009-06-12 20:14:08 +0000790 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000791 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000792 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
793 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000794
795 self.assertEquals(b"abcd", bufio.read(6))
796 self.assertEquals(b"e", bufio.read(1))
797 self.assertEquals(b"fg", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000798 self.assertEquals(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000799 self.assertTrue(None is bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000800 self.assertEquals(b"", bufio.read())
801
Antoine Pitrou19690592009-06-12 20:14:08 +0000802 def test_read_past_eof(self):
803 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
804 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000805
806 self.assertEquals(b"abcdefg", bufio.read(9000))
807
Antoine Pitrou19690592009-06-12 20:14:08 +0000808 def test_read_all(self):
809 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
810 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000811
812 self.assertEquals(b"abcdefg", bufio.read())
813
Victor Stinner6a102812010-04-27 23:55:59 +0000814 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou19690592009-06-12 20:14:08 +0000815 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000816 try:
817 # Write out many bytes with exactly the same number of 0's,
818 # 1's... 255's. This will help us check that concurrent reading
819 # doesn't duplicate or forget contents.
820 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000821 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000822 random.shuffle(l)
823 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000824 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000825 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000826 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000827 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000828 errors = []
829 results = []
830 def f():
831 try:
832 # Intra-buffer read then buffer-flushing read
833 for n in cycle([1, 19]):
834 s = bufio.read(n)
835 if not s:
836 break
837 # list.append() is atomic
838 results.append(s)
839 except Exception as e:
840 errors.append(e)
841 raise
842 threads = [threading.Thread(target=f) for x in range(20)]
843 for t in threads:
844 t.start()
845 time.sleep(0.02) # yield
846 for t in threads:
847 t.join()
848 self.assertFalse(errors,
849 "the following exceptions were caught: %r" % errors)
850 s = b''.join(results)
851 for i in range(256):
852 c = bytes(bytearray([i]))
853 self.assertEqual(s.count(c), N)
854 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000855 support.unlink(support.TESTFN)
856
857 def test_misbehaved_io(self):
858 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
859 bufio = self.tp(rawio)
860 self.assertRaises(IOError, bufio.seek, 0)
861 self.assertRaises(IOError, bufio.tell)
862
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000863 def test_no_extraneous_read(self):
864 # Issue #9550; when the raw IO object has satisfied the read request,
865 # we should not issue any additional reads, otherwise it may block
866 # (e.g. socket).
867 bufsize = 16
868 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
869 rawio = self.MockRawIO([b"x" * n])
870 bufio = self.tp(rawio, bufsize)
871 self.assertEqual(bufio.read(n), b"x" * n)
872 # Simple case: one raw read is enough to satisfy the request.
873 self.assertEqual(rawio._extraneous_reads, 0,
874 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
875 # A more complex case where two raw reads are needed to satisfy
876 # the request.
877 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
878 bufio = self.tp(rawio, bufsize)
879 self.assertEqual(bufio.read(n), b"x" * n)
880 self.assertEqual(rawio._extraneous_reads, 0,
881 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
882
883
Antoine Pitrou19690592009-06-12 20:14:08 +0000884class CBufferedReaderTest(BufferedReaderTest):
885 tp = io.BufferedReader
886
887 def test_constructor(self):
888 BufferedReaderTest.test_constructor(self)
889 # The allocation can succeed on 32-bit builds, e.g. with more
890 # than 2GB RAM and a 64-bit kernel.
891 if sys.maxsize > 0x7FFFFFFF:
892 rawio = self.MockRawIO()
893 bufio = self.tp(rawio)
894 self.assertRaises((OverflowError, MemoryError, ValueError),
895 bufio.__init__, rawio, sys.maxsize)
896
897 def test_initialization(self):
898 rawio = self.MockRawIO([b"abc"])
899 bufio = self.tp(rawio)
900 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
901 self.assertRaises(ValueError, bufio.read)
902 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
903 self.assertRaises(ValueError, bufio.read)
904 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
905 self.assertRaises(ValueError, bufio.read)
906
907 def test_misbehaved_io_read(self):
908 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
909 bufio = self.tp(rawio)
910 # _pyio.BufferedReader seems to implement reading different, so that
911 # checking this is not so easy.
912 self.assertRaises(IOError, bufio.read, 10)
913
914 def test_garbage_collection(self):
915 # C BufferedReader objects are collected.
916 # The Python version has __del__, so it ends into gc.garbage instead
917 rawio = self.FileIO(support.TESTFN, "w+b")
918 f = self.tp(rawio)
919 f.f = f
920 wr = weakref.ref(f)
921 del f
922 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000923 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000924
925class PyBufferedReaderTest(BufferedReaderTest):
926 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000927
928
Antoine Pitrou19690592009-06-12 20:14:08 +0000929class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
930 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000931
Antoine Pitrou19690592009-06-12 20:14:08 +0000932 def test_constructor(self):
933 rawio = self.MockRawIO()
934 bufio = self.tp(rawio)
935 bufio.__init__(rawio)
936 bufio.__init__(rawio, buffer_size=1024)
937 bufio.__init__(rawio, buffer_size=16)
938 self.assertEquals(3, bufio.write(b"abc"))
939 bufio.flush()
940 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
941 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
942 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
943 bufio.__init__(rawio)
944 self.assertEquals(3, bufio.write(b"ghi"))
945 bufio.flush()
946 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000947
Antoine Pitrou19690592009-06-12 20:14:08 +0000948 def test_detach_flush(self):
949 raw = self.MockRawIO()
950 buf = self.tp(raw)
951 buf.write(b"howdy!")
952 self.assertFalse(raw._write_stack)
953 buf.detach()
954 self.assertEqual(raw._write_stack, [b"howdy!"])
955
956 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000957 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000958 writer = self.MockRawIO()
959 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000960 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000961 self.assertFalse(writer._write_stack)
962
Antoine Pitrou19690592009-06-12 20:14:08 +0000963 def test_write_overflow(self):
964 writer = self.MockRawIO()
965 bufio = self.tp(writer, 8)
966 contents = b"abcdefghijklmnop"
967 for n in range(0, len(contents), 3):
968 bufio.write(contents[n:n+3])
969 flushed = b"".join(writer._write_stack)
970 # At least (total - 8) bytes were implicitly flushed, perhaps more
971 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000972 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000973
Antoine Pitrou19690592009-06-12 20:14:08 +0000974 def check_writes(self, intermediate_func):
975 # Lots of writes, test the flushed output is as expected.
976 contents = bytes(range(256)) * 1000
977 n = 0
978 writer = self.MockRawIO()
979 bufio = self.tp(writer, 13)
980 # Generator of write sizes: repeat each N 15 times then proceed to N+1
981 def gen_sizes():
982 for size in count(1):
983 for i in range(15):
984 yield size
985 sizes = gen_sizes()
986 while n < len(contents):
987 size = min(next(sizes), len(contents) - n)
988 self.assertEquals(bufio.write(contents[n:n+size]), size)
989 intermediate_func(bufio)
990 n += size
991 bufio.flush()
992 self.assertEquals(contents,
993 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000994
Antoine Pitrou19690592009-06-12 20:14:08 +0000995 def test_writes(self):
996 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000997
Antoine Pitrou19690592009-06-12 20:14:08 +0000998 def test_writes_and_flushes(self):
999 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001000
Antoine Pitrou19690592009-06-12 20:14:08 +00001001 def test_writes_and_seeks(self):
1002 def _seekabs(bufio):
1003 pos = bufio.tell()
1004 bufio.seek(pos + 1, 0)
1005 bufio.seek(pos - 1, 0)
1006 bufio.seek(pos, 0)
1007 self.check_writes(_seekabs)
1008 def _seekrel(bufio):
1009 pos = bufio.seek(0, 1)
1010 bufio.seek(+1, 1)
1011 bufio.seek(-1, 1)
1012 bufio.seek(pos, 0)
1013 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001014
Antoine Pitrou19690592009-06-12 20:14:08 +00001015 def test_writes_and_truncates(self):
1016 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001017
Antoine Pitrou19690592009-06-12 20:14:08 +00001018 def test_write_non_blocking(self):
1019 raw = self.MockNonBlockWriterIO()
1020 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001021
Antoine Pitrou19690592009-06-12 20:14:08 +00001022 self.assertEquals(bufio.write(b"abcd"), 4)
1023 self.assertEquals(bufio.write(b"efghi"), 5)
1024 # 1 byte will be written, the rest will be buffered
1025 raw.block_on(b"k")
1026 self.assertEquals(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001027
Antoine Pitrou19690592009-06-12 20:14:08 +00001028 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1029 raw.block_on(b"0")
1030 try:
1031 bufio.write(b"opqrwxyz0123456789")
1032 except self.BlockingIOError as e:
1033 written = e.characters_written
1034 else:
1035 self.fail("BlockingIOError should have been raised")
1036 self.assertEquals(written, 16)
1037 self.assertEquals(raw.pop_written(),
1038 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001039
Antoine Pitrou19690592009-06-12 20:14:08 +00001040 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
1041 s = raw.pop_written()
1042 # Previously buffered bytes were flushed
1043 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001044
Antoine Pitrou19690592009-06-12 20:14:08 +00001045 def test_write_and_rewind(self):
1046 raw = io.BytesIO()
1047 bufio = self.tp(raw, 4)
1048 self.assertEqual(bufio.write(b"abcdef"), 6)
1049 self.assertEqual(bufio.tell(), 6)
1050 bufio.seek(0, 0)
1051 self.assertEqual(bufio.write(b"XY"), 2)
1052 bufio.seek(6, 0)
1053 self.assertEqual(raw.getvalue(), b"XYcdef")
1054 self.assertEqual(bufio.write(b"123456"), 6)
1055 bufio.flush()
1056 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001057
Antoine Pitrou19690592009-06-12 20:14:08 +00001058 def test_flush(self):
1059 writer = self.MockRawIO()
1060 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001061 bufio.write(b"abc")
1062 bufio.flush()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001063 self.assertEquals(b"abc", writer._write_stack[0])
1064
Antoine Pitrou19690592009-06-12 20:14:08 +00001065 def test_destructor(self):
1066 writer = self.MockRawIO()
1067 bufio = self.tp(writer, 8)
1068 bufio.write(b"abc")
1069 del bufio
1070 support.gc_collect()
1071 self.assertEquals(b"abc", writer._write_stack[0])
1072
1073 def test_truncate(self):
1074 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001075 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001076 bufio = self.tp(raw, 8)
1077 bufio.write(b"abcdef")
1078 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001079 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001080 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001081 self.assertEqual(f.read(), b"abc")
1082
Victor Stinner6a102812010-04-27 23:55:59 +00001083 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou19690592009-06-12 20:14:08 +00001084 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001085 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001086 # Write out many bytes from many threads and test they were
1087 # all flushed.
1088 N = 1000
1089 contents = bytes(range(256)) * N
1090 sizes = cycle([1, 19])
1091 n = 0
1092 queue = deque()
1093 while n < len(contents):
1094 size = next(sizes)
1095 queue.append(contents[n:n+size])
1096 n += size
1097 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001098 # We use a real file object because it allows us to
1099 # exercise situations where the GIL is released before
1100 # writing the buffer to the raw streams. This is in addition
1101 # to concurrency issues due to switching threads in the middle
1102 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001103 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001104 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001105 errors = []
1106 def f():
1107 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001108 while True:
1109 try:
1110 s = queue.popleft()
1111 except IndexError:
1112 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001113 bufio.write(s)
1114 except Exception as e:
1115 errors.append(e)
1116 raise
1117 threads = [threading.Thread(target=f) for x in range(20)]
1118 for t in threads:
1119 t.start()
1120 time.sleep(0.02) # yield
1121 for t in threads:
1122 t.join()
1123 self.assertFalse(errors,
1124 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001125 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001126 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001127 s = f.read()
1128 for i in range(256):
1129 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001130 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001131 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001132
Antoine Pitrou19690592009-06-12 20:14:08 +00001133 def test_misbehaved_io(self):
1134 rawio = self.MisbehavedRawIO()
1135 bufio = self.tp(rawio, 5)
1136 self.assertRaises(IOError, bufio.seek, 0)
1137 self.assertRaises(IOError, bufio.tell)
1138 self.assertRaises(IOError, bufio.write, b"abcdef")
1139
1140 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001141 with support.check_warnings(("max_buffer_size is deprecated",
1142 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001143 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001144
1145
1146class CBufferedWriterTest(BufferedWriterTest):
1147 tp = io.BufferedWriter
1148
1149 def test_constructor(self):
1150 BufferedWriterTest.test_constructor(self)
1151 # The allocation can succeed on 32-bit builds, e.g. with more
1152 # than 2GB RAM and a 64-bit kernel.
1153 if sys.maxsize > 0x7FFFFFFF:
1154 rawio = self.MockRawIO()
1155 bufio = self.tp(rawio)
1156 self.assertRaises((OverflowError, MemoryError, ValueError),
1157 bufio.__init__, rawio, sys.maxsize)
1158
1159 def test_initialization(self):
1160 rawio = self.MockRawIO()
1161 bufio = self.tp(rawio)
1162 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1163 self.assertRaises(ValueError, bufio.write, b"def")
1164 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1165 self.assertRaises(ValueError, bufio.write, b"def")
1166 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1167 self.assertRaises(ValueError, bufio.write, b"def")
1168
1169 def test_garbage_collection(self):
1170 # C BufferedWriter objects are collected, and collecting them flushes
1171 # all data to disk.
1172 # The Python version has __del__, so it ends into gc.garbage instead
1173 rawio = self.FileIO(support.TESTFN, "w+b")
1174 f = self.tp(rawio)
1175 f.write(b"123xxx")
1176 f.x = f
1177 wr = weakref.ref(f)
1178 del f
1179 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001180 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001181 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001182 self.assertEqual(f.read(), b"123xxx")
1183
1184
1185class PyBufferedWriterTest(BufferedWriterTest):
1186 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001187
1188class BufferedRWPairTest(unittest.TestCase):
1189
Antoine Pitrou19690592009-06-12 20:14:08 +00001190 def test_constructor(self):
1191 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001192 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001193
Antoine Pitrou19690592009-06-12 20:14:08 +00001194 def test_detach(self):
1195 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1196 self.assertRaises(self.UnsupportedOperation, pair.detach)
1197
1198 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001199 with support.check_warnings(("max_buffer_size is deprecated",
1200 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001201 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001202
1203 def test_constructor_with_not_readable(self):
1204 class NotReadable(MockRawIO):
1205 def readable(self):
1206 return False
1207
1208 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1209
1210 def test_constructor_with_not_writeable(self):
1211 class NotWriteable(MockRawIO):
1212 def writable(self):
1213 return False
1214
1215 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1216
1217 def test_read(self):
1218 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1219
1220 self.assertEqual(pair.read(3), b"abc")
1221 self.assertEqual(pair.read(1), b"d")
1222 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001223 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1224 self.assertEqual(pair.read(None), b"abc")
1225
1226 def test_readlines(self):
1227 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1228 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1229 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1230 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001231
1232 def test_read1(self):
1233 # .read1() is delegated to the underlying reader object, so this test
1234 # can be shallow.
1235 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1236
1237 self.assertEqual(pair.read1(3), b"abc")
1238
1239 def test_readinto(self):
1240 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1241
1242 data = bytearray(5)
1243 self.assertEqual(pair.readinto(data), 5)
1244 self.assertEqual(data, b"abcde")
1245
1246 def test_write(self):
1247 w = self.MockRawIO()
1248 pair = self.tp(self.MockRawIO(), w)
1249
1250 pair.write(b"abc")
1251 pair.flush()
1252 pair.write(b"def")
1253 pair.flush()
1254 self.assertEqual(w._write_stack, [b"abc", b"def"])
1255
1256 def test_peek(self):
1257 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1258
1259 self.assertTrue(pair.peek(3).startswith(b"abc"))
1260 self.assertEqual(pair.read(3), b"abc")
1261
1262 def test_readable(self):
1263 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1264 self.assertTrue(pair.readable())
1265
1266 def test_writeable(self):
1267 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1268 self.assertTrue(pair.writable())
1269
1270 def test_seekable(self):
1271 # BufferedRWPairs are never seekable, even if their readers and writers
1272 # are.
1273 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1274 self.assertFalse(pair.seekable())
1275
1276 # .flush() is delegated to the underlying writer object and has been
1277 # tested in the test_write method.
1278
1279 def test_close_and_closed(self):
1280 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1281 self.assertFalse(pair.closed)
1282 pair.close()
1283 self.assertTrue(pair.closed)
1284
1285 def test_isatty(self):
1286 class SelectableIsAtty(MockRawIO):
1287 def __init__(self, isatty):
1288 MockRawIO.__init__(self)
1289 self._isatty = isatty
1290
1291 def isatty(self):
1292 return self._isatty
1293
1294 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1295 self.assertFalse(pair.isatty())
1296
1297 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1298 self.assertTrue(pair.isatty())
1299
1300 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1301 self.assertTrue(pair.isatty())
1302
1303 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1304 self.assertTrue(pair.isatty())
1305
1306class CBufferedRWPairTest(BufferedRWPairTest):
1307 tp = io.BufferedRWPair
1308
1309class PyBufferedRWPairTest(BufferedRWPairTest):
1310 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001311
1312
Antoine Pitrou19690592009-06-12 20:14:08 +00001313class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1314 read_mode = "rb+"
1315 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001316
Antoine Pitrou19690592009-06-12 20:14:08 +00001317 def test_constructor(self):
1318 BufferedReaderTest.test_constructor(self)
1319 BufferedWriterTest.test_constructor(self)
1320
1321 def test_read_and_write(self):
1322 raw = self.MockRawIO((b"asdf", b"ghjk"))
1323 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001324
1325 self.assertEqual(b"as", rw.read(2))
1326 rw.write(b"ddd")
1327 rw.write(b"eee")
1328 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001329 self.assertEqual(b"ghjk", rw.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001330 self.assertEquals(b"dddeee", raw._write_stack[0])
1331
Antoine Pitrou19690592009-06-12 20:14:08 +00001332 def test_seek_and_tell(self):
1333 raw = self.BytesIO(b"asdfghjkl")
1334 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001335
1336 self.assertEquals(b"as", rw.read(2))
1337 self.assertEquals(2, rw.tell())
1338 rw.seek(0, 0)
1339 self.assertEquals(b"asdf", rw.read(4))
1340
1341 rw.write(b"asdf")
1342 rw.seek(0, 0)
1343 self.assertEquals(b"asdfasdfl", rw.read())
1344 self.assertEquals(9, rw.tell())
1345 rw.seek(-4, 2)
1346 self.assertEquals(5, rw.tell())
1347 rw.seek(2, 1)
1348 self.assertEquals(7, rw.tell())
1349 self.assertEquals(b"fl", rw.read(11))
1350 self.assertRaises(TypeError, rw.seek, 0.0)
1351
Antoine Pitrou19690592009-06-12 20:14:08 +00001352 def check_flush_and_read(self, read_func):
1353 raw = self.BytesIO(b"abcdefghi")
1354 bufio = self.tp(raw)
1355
1356 self.assertEquals(b"ab", read_func(bufio, 2))
1357 bufio.write(b"12")
1358 self.assertEquals(b"ef", read_func(bufio, 2))
1359 self.assertEquals(6, bufio.tell())
1360 bufio.flush()
1361 self.assertEquals(6, bufio.tell())
1362 self.assertEquals(b"ghi", read_func(bufio))
1363 raw.seek(0, 0)
1364 raw.write(b"XYZ")
1365 # flush() resets the read buffer
1366 bufio.flush()
1367 bufio.seek(0, 0)
1368 self.assertEquals(b"XYZ", read_func(bufio, 3))
1369
1370 def test_flush_and_read(self):
1371 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1372
1373 def test_flush_and_readinto(self):
1374 def _readinto(bufio, n=-1):
1375 b = bytearray(n if n >= 0 else 9999)
1376 n = bufio.readinto(b)
1377 return bytes(b[:n])
1378 self.check_flush_and_read(_readinto)
1379
1380 def test_flush_and_peek(self):
1381 def _peek(bufio, n=-1):
1382 # This relies on the fact that the buffer can contain the whole
1383 # raw stream, otherwise peek() can return less.
1384 b = bufio.peek(n)
1385 if n != -1:
1386 b = b[:n]
1387 bufio.seek(len(b), 1)
1388 return b
1389 self.check_flush_and_read(_peek)
1390
1391 def test_flush_and_write(self):
1392 raw = self.BytesIO(b"abcdefghi")
1393 bufio = self.tp(raw)
1394
1395 bufio.write(b"123")
1396 bufio.flush()
1397 bufio.write(b"45")
1398 bufio.flush()
1399 bufio.seek(0, 0)
1400 self.assertEquals(b"12345fghi", raw.getvalue())
1401 self.assertEquals(b"12345fghi", bufio.read())
1402
1403 def test_threads(self):
1404 BufferedReaderTest.test_threads(self)
1405 BufferedWriterTest.test_threads(self)
1406
1407 def test_writes_and_peek(self):
1408 def _peek(bufio):
1409 bufio.peek(1)
1410 self.check_writes(_peek)
1411 def _peek(bufio):
1412 pos = bufio.tell()
1413 bufio.seek(-1, 1)
1414 bufio.peek(1)
1415 bufio.seek(pos, 0)
1416 self.check_writes(_peek)
1417
1418 def test_writes_and_reads(self):
1419 def _read(bufio):
1420 bufio.seek(-1, 1)
1421 bufio.read(1)
1422 self.check_writes(_read)
1423
1424 def test_writes_and_read1s(self):
1425 def _read1(bufio):
1426 bufio.seek(-1, 1)
1427 bufio.read1(1)
1428 self.check_writes(_read1)
1429
1430 def test_writes_and_readintos(self):
1431 def _read(bufio):
1432 bufio.seek(-1, 1)
1433 bufio.readinto(bytearray(1))
1434 self.check_writes(_read)
1435
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001436 def test_write_after_readahead(self):
1437 # Issue #6629: writing after the buffer was filled by readahead should
1438 # first rewind the raw stream.
1439 for overwrite_size in [1, 5]:
1440 raw = self.BytesIO(b"A" * 10)
1441 bufio = self.tp(raw, 4)
1442 # Trigger readahead
1443 self.assertEqual(bufio.read(1), b"A")
1444 self.assertEqual(bufio.tell(), 1)
1445 # Overwriting should rewind the raw stream if it needs so
1446 bufio.write(b"B" * overwrite_size)
1447 self.assertEqual(bufio.tell(), overwrite_size + 1)
1448 # If the write size was smaller than the buffer size, flush() and
1449 # check that rewind happens.
1450 bufio.flush()
1451 self.assertEqual(bufio.tell(), overwrite_size + 1)
1452 s = raw.getvalue()
1453 self.assertEqual(s,
1454 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1455
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001456 def test_truncate_after_read_or_write(self):
1457 raw = self.BytesIO(b"A" * 10)
1458 bufio = self.tp(raw, 100)
1459 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1460 self.assertEqual(bufio.truncate(), 2)
1461 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1462 self.assertEqual(bufio.truncate(), 4)
1463
Antoine Pitrou19690592009-06-12 20:14:08 +00001464 def test_misbehaved_io(self):
1465 BufferedReaderTest.test_misbehaved_io(self)
1466 BufferedWriterTest.test_misbehaved_io(self)
1467
1468class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1469 tp = io.BufferedRandom
1470
1471 def test_constructor(self):
1472 BufferedRandomTest.test_constructor(self)
1473 # The allocation can succeed on 32-bit builds, e.g. with more
1474 # than 2GB RAM and a 64-bit kernel.
1475 if sys.maxsize > 0x7FFFFFFF:
1476 rawio = self.MockRawIO()
1477 bufio = self.tp(rawio)
1478 self.assertRaises((OverflowError, MemoryError, ValueError),
1479 bufio.__init__, rawio, sys.maxsize)
1480
1481 def test_garbage_collection(self):
1482 CBufferedReaderTest.test_garbage_collection(self)
1483 CBufferedWriterTest.test_garbage_collection(self)
1484
1485class PyBufferedRandomTest(BufferedRandomTest):
1486 tp = pyio.BufferedRandom
1487
1488
Christian Heimes1a6387e2008-03-26 12:49:49 +00001489# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1490# properties:
1491# - A single output character can correspond to many bytes of input.
1492# - The number of input bytes to complete the character can be
1493# undetermined until the last input byte is received.
1494# - The number of input bytes can vary depending on previous input.
1495# - A single input byte can correspond to many characters of output.
1496# - The number of output characters can be undetermined until the
1497# last input byte is received.
1498# - The number of output characters can vary depending on previous input.
1499
1500class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1501 """
1502 For testing seek/tell behavior with a stateful, buffering decoder.
1503
1504 Input is a sequence of words. Words may be fixed-length (length set
1505 by input) or variable-length (period-terminated). In variable-length
1506 mode, extra periods are ignored. Possible words are:
1507 - 'i' followed by a number sets the input length, I (maximum 99).
1508 When I is set to 0, words are space-terminated.
1509 - 'o' followed by a number sets the output length, O (maximum 99).
1510 - Any other word is converted into a word followed by a period on
1511 the output. The output word consists of the input word truncated
1512 or padded out with hyphens to make its length equal to O. If O
1513 is 0, the word is output verbatim without truncating or padding.
1514 I and O are initially set to 1. When I changes, any buffered input is
1515 re-scanned according to the new I. EOF also terminates the last word.
1516 """
1517
1518 def __init__(self, errors='strict'):
1519 codecs.IncrementalDecoder.__init__(self, errors)
1520 self.reset()
1521
1522 def __repr__(self):
1523 return '<SID %x>' % id(self)
1524
1525 def reset(self):
1526 self.i = 1
1527 self.o = 1
1528 self.buffer = bytearray()
1529
1530 def getstate(self):
1531 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1532 return bytes(self.buffer), i*100 + o
1533
1534 def setstate(self, state):
1535 buffer, io = state
1536 self.buffer = bytearray(buffer)
1537 i, o = divmod(io, 100)
1538 self.i, self.o = i ^ 1, o ^ 1
1539
1540 def decode(self, input, final=False):
1541 output = ''
1542 for b in input:
1543 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001544 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001545 if self.buffer:
1546 output += self.process_word()
1547 else:
1548 self.buffer.append(b)
1549 else: # fixed-length, terminate after self.i bytes
1550 self.buffer.append(b)
1551 if len(self.buffer) == self.i:
1552 output += self.process_word()
1553 if final and self.buffer: # EOF terminates the last word
1554 output += self.process_word()
1555 return output
1556
1557 def process_word(self):
1558 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001559 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001560 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001561 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001562 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1563 else:
1564 output = self.buffer.decode('ascii')
1565 if len(output) < self.o:
1566 output += '-'*self.o # pad out with hyphens
1567 if self.o:
1568 output = output[:self.o] # truncate to output length
1569 output += '.'
1570 self.buffer = bytearray()
1571 return output
1572
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001573 codecEnabled = False
1574
1575 @classmethod
1576 def lookupTestDecoder(cls, name):
1577 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001578 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001579 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001580 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001581 incrementalencoder=None,
1582 streamreader=None, streamwriter=None,
1583 incrementaldecoder=cls)
1584
1585# Register the previous decoder for testing.
1586# Disabled by default, tests will enable it.
1587codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1588
1589
Christian Heimes1a6387e2008-03-26 12:49:49 +00001590class StatefulIncrementalDecoderTest(unittest.TestCase):
1591 """
1592 Make sure the StatefulIncrementalDecoder actually works.
1593 """
1594
1595 test_cases = [
1596 # I=1, O=1 (fixed-length input == fixed-length output)
1597 (b'abcd', False, 'a.b.c.d.'),
1598 # I=0, O=0 (variable-length input, variable-length output)
1599 (b'oiabcd', True, 'abcd.'),
1600 # I=0, O=0 (should ignore extra periods)
1601 (b'oi...abcd...', True, 'abcd.'),
1602 # I=0, O=6 (variable-length input, fixed-length output)
1603 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1604 # I=2, O=6 (fixed-length input < fixed-length output)
1605 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1606 # I=6, O=3 (fixed-length input > fixed-length output)
1607 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1608 # I=0, then 3; O=29, then 15 (with longer output)
1609 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1610 'a----------------------------.' +
1611 'b----------------------------.' +
1612 'cde--------------------------.' +
1613 'abcdefghijabcde.' +
1614 'a.b------------.' +
1615 '.c.------------.' +
1616 'd.e------------.' +
1617 'k--------------.' +
1618 'l--------------.' +
1619 'm--------------.')
1620 ]
1621
Antoine Pitrou19690592009-06-12 20:14:08 +00001622 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001623 # Try a few one-shot test cases.
1624 for input, eof, output in self.test_cases:
1625 d = StatefulIncrementalDecoder()
1626 self.assertEquals(d.decode(input, eof), output)
1627
1628 # Also test an unfinished decode, followed by forcing EOF.
1629 d = StatefulIncrementalDecoder()
1630 self.assertEquals(d.decode(b'oiabcd'), '')
1631 self.assertEquals(d.decode(b'', 1), 'abcd.')
1632
1633class TextIOWrapperTest(unittest.TestCase):
1634
1635 def setUp(self):
1636 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1637 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001638 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001639
1640 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001641 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001642
Antoine Pitrou19690592009-06-12 20:14:08 +00001643 def test_constructor(self):
1644 r = self.BytesIO(b"\xc3\xa9\n\n")
1645 b = self.BufferedReader(r, 1000)
1646 t = self.TextIOWrapper(b)
1647 t.__init__(b, encoding="latin1", newline="\r\n")
1648 self.assertEquals(t.encoding, "latin1")
1649 self.assertEquals(t.line_buffering, False)
1650 t.__init__(b, encoding="utf8", line_buffering=True)
1651 self.assertEquals(t.encoding, "utf8")
1652 self.assertEquals(t.line_buffering, True)
1653 self.assertEquals("\xe9\n", t.readline())
1654 self.assertRaises(TypeError, t.__init__, b, newline=42)
1655 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1656
1657 def test_detach(self):
1658 r = self.BytesIO()
1659 b = self.BufferedWriter(r)
1660 t = self.TextIOWrapper(b)
1661 self.assertIs(t.detach(), b)
1662
1663 t = self.TextIOWrapper(b, encoding="ascii")
1664 t.write("howdy")
1665 self.assertFalse(r.getvalue())
1666 t.detach()
1667 self.assertEqual(r.getvalue(), b"howdy")
1668 self.assertRaises(ValueError, t.detach)
1669
1670 def test_repr(self):
1671 raw = self.BytesIO("hello".encode("utf-8"))
1672 b = self.BufferedReader(raw)
1673 t = self.TextIOWrapper(b, encoding="utf-8")
1674 modname = self.TextIOWrapper.__module__
1675 self.assertEqual(repr(t),
1676 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1677 raw.name = "dummy"
1678 self.assertEqual(repr(t),
1679 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1680 raw.name = b"dummy"
1681 self.assertEqual(repr(t),
1682 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1683
1684 def test_line_buffering(self):
1685 r = self.BytesIO()
1686 b = self.BufferedWriter(r, 1000)
1687 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1688 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001689 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001690 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001691 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001692 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001693 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1694
Antoine Pitrou19690592009-06-12 20:14:08 +00001695 def test_encoding(self):
1696 # Check the encoding attribute is always set, and valid
1697 b = self.BytesIO()
1698 t = self.TextIOWrapper(b, encoding="utf8")
1699 self.assertEqual(t.encoding, "utf8")
1700 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001701 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001702 codecs.lookup(t.encoding)
1703
1704 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001705 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001706 b = self.BytesIO(b"abc\n\xff\n")
1707 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001708 self.assertRaises(UnicodeError, t.read)
1709 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001710 b = self.BytesIO(b"abc\n\xff\n")
1711 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001712 self.assertRaises(UnicodeError, t.read)
1713 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001714 b = self.BytesIO(b"abc\n\xff\n")
1715 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001716 self.assertEquals(t.read(), "abc\n\n")
1717 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001718 b = self.BytesIO(b"abc\n\xff\n")
1719 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1720 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001721
Antoine Pitrou19690592009-06-12 20:14:08 +00001722 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001723 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001724 b = self.BytesIO()
1725 t = self.TextIOWrapper(b, encoding="ascii")
1726 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001727 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001728 b = self.BytesIO()
1729 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1730 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001731 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001732 b = self.BytesIO()
1733 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001734 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001735 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001736 t.flush()
1737 self.assertEquals(b.getvalue(), b"abcdef\n")
1738 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001739 b = self.BytesIO()
1740 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001741 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001742 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001743 t.flush()
1744 self.assertEquals(b.getvalue(), b"abc?def\n")
1745
Antoine Pitrou19690592009-06-12 20:14:08 +00001746 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001747 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1748
1749 tests = [
1750 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1751 [ '', input_lines ],
1752 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1753 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1754 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1755 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001756 encodings = (
1757 'utf-8', 'latin-1',
1758 'utf-16', 'utf-16-le', 'utf-16-be',
1759 'utf-32', 'utf-32-le', 'utf-32-be',
1760 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001761
1762 # Try a range of buffer sizes to test the case where \r is the last
1763 # character in TextIOWrapper._pending_line.
1764 for encoding in encodings:
1765 # XXX: str.encode() should return bytes
1766 data = bytes(''.join(input_lines).encode(encoding))
1767 for do_reads in (False, True):
1768 for bufsize in range(1, 10):
1769 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001770 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1771 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001772 encoding=encoding)
1773 if do_reads:
1774 got_lines = []
1775 while True:
1776 c2 = textio.read(2)
1777 if c2 == '':
1778 break
1779 self.assertEquals(len(c2), 2)
1780 got_lines.append(c2 + textio.readline())
1781 else:
1782 got_lines = list(textio)
1783
1784 for got_line, exp_line in zip(got_lines, exp_lines):
1785 self.assertEquals(got_line, exp_line)
1786 self.assertEquals(len(got_lines), len(exp_lines))
1787
Antoine Pitrou19690592009-06-12 20:14:08 +00001788 def test_newlines_input(self):
1789 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001790 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1791 for newline, expected in [
1792 (None, normalized.decode("ascii").splitlines(True)),
1793 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001794 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1795 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1796 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001797 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001798 buf = self.BytesIO(testdata)
1799 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001800 self.assertEquals(txt.readlines(), expected)
1801 txt.seek(0)
1802 self.assertEquals(txt.read(), "".join(expected))
1803
Antoine Pitrou19690592009-06-12 20:14:08 +00001804 def test_newlines_output(self):
1805 testdict = {
1806 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1807 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1808 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1809 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1810 }
1811 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1812 for newline, expected in tests:
1813 buf = self.BytesIO()
1814 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1815 txt.write("AAA\nB")
1816 txt.write("BB\nCCC\n")
1817 txt.write("X\rY\r\nZ")
1818 txt.flush()
1819 self.assertEquals(buf.closed, False)
1820 self.assertEquals(buf.getvalue(), expected)
1821
1822 def test_destructor(self):
1823 l = []
1824 base = self.BytesIO
1825 class MyBytesIO(base):
1826 def close(self):
1827 l.append(self.getvalue())
1828 base.close(self)
1829 b = MyBytesIO()
1830 t = self.TextIOWrapper(b, encoding="ascii")
1831 t.write("abc")
1832 del t
1833 support.gc_collect()
1834 self.assertEquals([b"abc"], l)
1835
1836 def test_override_destructor(self):
1837 record = []
1838 class MyTextIO(self.TextIOWrapper):
1839 def __del__(self):
1840 record.append(1)
1841 try:
1842 f = super(MyTextIO, self).__del__
1843 except AttributeError:
1844 pass
1845 else:
1846 f()
1847 def close(self):
1848 record.append(2)
1849 super(MyTextIO, self).close()
1850 def flush(self):
1851 record.append(3)
1852 super(MyTextIO, self).flush()
1853 b = self.BytesIO()
1854 t = MyTextIO(b, encoding="ascii")
1855 del t
1856 support.gc_collect()
1857 self.assertEqual(record, [1, 2, 3])
1858
1859 def test_error_through_destructor(self):
1860 # Test that the exception state is not modified by a destructor,
1861 # even if close() fails.
1862 rawio = self.CloseFailureIO()
1863 def f():
1864 self.TextIOWrapper(rawio).xyzzy
1865 with support.captured_output("stderr") as s:
1866 self.assertRaises(AttributeError, f)
1867 s = s.getvalue().strip()
1868 if s:
1869 # The destructor *may* have printed an unraisable error, check it
1870 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001871 self.assertTrue(s.startswith("Exception IOError: "), s)
1872 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001873
1874 # Systematic tests of the text I/O API
1875
Antoine Pitrou19690592009-06-12 20:14:08 +00001876 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001877 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1878 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001879 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001880 f._CHUNK_SIZE = chunksize
Antoine Pitrou19690592009-06-12 20:14:08 +00001881 self.assertEquals(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001882 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001883 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001884 f._CHUNK_SIZE = chunksize
1885 self.assertEquals(f.tell(), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001886 self.assertEquals(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001887 cookie = f.tell()
1888 self.assertEquals(f.seek(0), 0)
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001889 self.assertEquals(f.read(None), "abc")
1890 f.seek(0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001891 self.assertEquals(f.read(2), "ab")
1892 self.assertEquals(f.read(1), "c")
1893 self.assertEquals(f.read(1), "")
1894 self.assertEquals(f.read(), "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001895 self.assertEquals(f.tell(), cookie)
1896 self.assertEquals(f.seek(0), 0)
1897 self.assertEquals(f.seek(0, 2), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001898 self.assertEquals(f.write("def"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001899 self.assertEquals(f.seek(cookie), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001900 self.assertEquals(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001901 if enc.startswith("utf"):
1902 self.multi_line_test(f, enc)
1903 f.close()
1904
1905 def multi_line_test(self, f, enc):
1906 f.seek(0)
1907 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001908 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001909 wlines = []
1910 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1911 chars = []
1912 for i in range(size):
1913 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001914 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001915 wlines.append((f.tell(), line))
1916 f.write(line)
1917 f.seek(0)
1918 rlines = []
1919 while True:
1920 pos = f.tell()
1921 line = f.readline()
1922 if not line:
1923 break
1924 rlines.append((pos, line))
1925 self.assertEquals(rlines, wlines)
1926
Antoine Pitrou19690592009-06-12 20:14:08 +00001927 def test_telling(self):
1928 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001929 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001930 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001931 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001932 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001933 p2 = f.tell()
1934 f.seek(0)
1935 self.assertEquals(f.tell(), p0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001936 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001937 self.assertEquals(f.tell(), p1)
Antoine Pitrou19690592009-06-12 20:14:08 +00001938 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939 self.assertEquals(f.tell(), p2)
1940 f.seek(0)
1941 for line in f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001942 self.assertEquals(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001943 self.assertRaises(IOError, f.tell)
1944 self.assertEquals(f.tell(), p2)
1945 f.close()
1946
Antoine Pitrou19690592009-06-12 20:14:08 +00001947 def test_seeking(self):
1948 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001949 prefix_size = chunk_size - 2
1950 u_prefix = "a" * prefix_size
1951 prefix = bytes(u_prefix.encode("utf-8"))
1952 self.assertEquals(len(u_prefix), len(prefix))
1953 u_suffix = "\u8888\n"
1954 suffix = bytes(u_suffix.encode("utf-8"))
1955 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001956 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001957 f.write(line*2)
1958 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001959 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001960 s = f.read(prefix_size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001961 self.assertEquals(s, prefix.decode("ascii"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001962 self.assertEquals(f.tell(), prefix_size)
1963 self.assertEquals(f.readline(), u_suffix)
1964
Antoine Pitrou19690592009-06-12 20:14:08 +00001965 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001966 # Regression test for a specific bug
1967 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001968 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969 f.write(data)
1970 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001971 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001972 f._CHUNK_SIZE # Just test that it exists
1973 f._CHUNK_SIZE = 2
1974 f.readline()
1975 f.tell()
1976
Antoine Pitrou19690592009-06-12 20:14:08 +00001977 def test_seek_and_tell(self):
1978 #Test seek/tell using the StatefulIncrementalDecoder.
1979 # Make test faster by doing smaller seeks
1980 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001981
Antoine Pitrou19690592009-06-12 20:14:08 +00001982 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983 """Tell/seek to various points within a data stream and ensure
1984 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001985 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001986 f.write(data)
1987 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001988 f = self.open(support.TESTFN, encoding='test_decoder')
1989 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001990 decoded = f.read()
1991 f.close()
1992
1993 for i in range(min_pos, len(decoded) + 1): # seek positions
1994 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001995 f = self.open(support.TESTFN, encoding='test_decoder')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001996 self.assertEquals(f.read(i), decoded[:i])
1997 cookie = f.tell()
1998 self.assertEquals(f.read(j), decoded[i:i + j])
1999 f.seek(cookie)
2000 self.assertEquals(f.read(), decoded[i:])
2001 f.close()
2002
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002003 # Enable the test decoder.
2004 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002005
2006 # Run the tests.
2007 try:
2008 # Try each test case.
2009 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002010 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002011
2012 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002013 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2014 offset = CHUNK_SIZE - len(input)//2
2015 prefix = b'.'*offset
2016 # Don't bother seeking into the prefix (takes too long).
2017 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002018 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002019
2020 # Ensure our test decoder won't interfere with subsequent tests.
2021 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002022 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002023
Antoine Pitrou19690592009-06-12 20:14:08 +00002024 def test_encoded_writes(self):
2025 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002026 tests = ("utf-16",
2027 "utf-16-le",
2028 "utf-16-be",
2029 "utf-32",
2030 "utf-32-le",
2031 "utf-32-be")
2032 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002033 buf = self.BytesIO()
2034 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002035 # Check if the BOM is written only once (see issue1753).
2036 f.write(data)
2037 f.write(data)
2038 f.seek(0)
2039 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002040 f.seek(0)
2041 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002042 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2043
Antoine Pitrou19690592009-06-12 20:14:08 +00002044 def test_unreadable(self):
2045 class UnReadable(self.BytesIO):
2046 def readable(self):
2047 return False
2048 txt = self.TextIOWrapper(UnReadable())
2049 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002050
Antoine Pitrou19690592009-06-12 20:14:08 +00002051 def test_read_one_by_one(self):
2052 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002053 reads = ""
2054 while True:
2055 c = txt.read(1)
2056 if not c:
2057 break
2058 reads += c
2059 self.assertEquals(reads, "AA\nBB")
2060
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002061 def test_readlines(self):
2062 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2063 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2064 txt.seek(0)
2065 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2066 txt.seek(0)
2067 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2068
Christian Heimes1a6387e2008-03-26 12:49:49 +00002069 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002070 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002071 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002072 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002073 reads = ""
2074 while True:
2075 c = txt.read(128)
2076 if not c:
2077 break
2078 reads += c
2079 self.assertEquals(reads, "A"*127+"\nB")
2080
2081 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002082 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002083
2084 # read one char at a time
2085 reads = ""
2086 while True:
2087 c = txt.read(1)
2088 if not c:
2089 break
2090 reads += c
2091 self.assertEquals(reads, self.normalized)
2092
2093 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002094 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002095 txt._CHUNK_SIZE = 4
2096
2097 reads = ""
2098 while True:
2099 c = txt.read(4)
2100 if not c:
2101 break
2102 reads += c
2103 self.assertEquals(reads, self.normalized)
2104
2105 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002106 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107 txt._CHUNK_SIZE = 4
2108
2109 reads = txt.read(4)
2110 reads += txt.read(4)
2111 reads += txt.readline()
2112 reads += txt.readline()
2113 reads += txt.readline()
2114 self.assertEquals(reads, self.normalized)
2115
2116 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002117 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002118 txt._CHUNK_SIZE = 4
2119
2120 reads = txt.read(4)
2121 reads += txt.read()
2122 self.assertEquals(reads, self.normalized)
2123
2124 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002125 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002126 txt._CHUNK_SIZE = 4
2127
2128 reads = txt.read(4)
2129 pos = txt.tell()
2130 txt.seek(0)
2131 txt.seek(pos)
2132 self.assertEquals(txt.read(4), "BBB\n")
2133
2134 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002135 buffer = self.BytesIO(self.testdata)
2136 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002137
2138 self.assertEqual(buffer.seekable(), txt.seekable())
2139
Antoine Pitrou19690592009-06-12 20:14:08 +00002140 def test_append_bom(self):
2141 # The BOM is not written again when appending to a non-empty file
2142 filename = support.TESTFN
2143 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2144 with self.open(filename, 'w', encoding=charset) as f:
2145 f.write('aaa')
2146 pos = f.tell()
2147 with self.open(filename, 'rb') as f:
2148 self.assertEquals(f.read(), 'aaa'.encode(charset))
2149
2150 with self.open(filename, 'a', encoding=charset) as f:
2151 f.write('xxx')
2152 with self.open(filename, 'rb') as f:
2153 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2154
Antoine Pitrou19690592009-06-12 20:14:08 +00002155 def test_seek_bom(self):
2156 # Same test, but when seeking manually
2157 filename = support.TESTFN
2158 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2159 with self.open(filename, 'w', encoding=charset) as f:
2160 f.write('aaa')
2161 pos = f.tell()
2162 with self.open(filename, 'r+', encoding=charset) as f:
2163 f.seek(pos)
2164 f.write('zzz')
2165 f.seek(0)
2166 f.write('bbb')
2167 with self.open(filename, 'rb') as f:
2168 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2169
2170 def test_errors_property(self):
2171 with self.open(support.TESTFN, "w") as f:
2172 self.assertEqual(f.errors, "strict")
2173 with self.open(support.TESTFN, "w", errors="replace") as f:
2174 self.assertEqual(f.errors, "replace")
2175
Victor Stinner6a102812010-04-27 23:55:59 +00002176 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002177 def test_threads_write(self):
2178 # Issue6750: concurrent writes could duplicate data
2179 event = threading.Event()
2180 with self.open(support.TESTFN, "w", buffering=1) as f:
2181 def run(n):
2182 text = "Thread%03d\n" % n
2183 event.wait()
2184 f.write(text)
2185 threads = [threading.Thread(target=lambda n=x: run(n))
2186 for x in range(20)]
2187 for t in threads:
2188 t.start()
2189 time.sleep(0.02)
2190 event.set()
2191 for t in threads:
2192 t.join()
2193 with self.open(support.TESTFN) as f:
2194 content = f.read()
2195 for n in range(20):
2196 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2197
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002198 def test_flush_error_on_close(self):
2199 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2200 def bad_flush():
2201 raise IOError()
2202 txt.flush = bad_flush
2203 self.assertRaises(IOError, txt.close) # exception not swallowed
2204
2205 def test_multi_close(self):
2206 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2207 txt.close()
2208 txt.close()
2209 txt.close()
2210 self.assertRaises(ValueError, txt.flush)
2211
Antoine Pitrou19690592009-06-12 20:14:08 +00002212class CTextIOWrapperTest(TextIOWrapperTest):
2213
2214 def test_initialization(self):
2215 r = self.BytesIO(b"\xc3\xa9\n\n")
2216 b = self.BufferedReader(r, 1000)
2217 t = self.TextIOWrapper(b)
2218 self.assertRaises(TypeError, t.__init__, b, newline=42)
2219 self.assertRaises(ValueError, t.read)
2220 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2221 self.assertRaises(ValueError, t.read)
2222
2223 def test_garbage_collection(self):
2224 # C TextIOWrapper objects are collected, and collecting them flushes
2225 # all data to disk.
2226 # The Python version has __del__, so it ends in gc.garbage instead.
2227 rawio = io.FileIO(support.TESTFN, "wb")
2228 b = self.BufferedWriter(rawio)
2229 t = self.TextIOWrapper(b, encoding="ascii")
2230 t.write("456def")
2231 t.x = t
2232 wr = weakref.ref(t)
2233 del t
2234 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002235 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002236 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002237 self.assertEqual(f.read(), b"456def")
2238
2239class PyTextIOWrapperTest(TextIOWrapperTest):
2240 pass
2241
2242
2243class IncrementalNewlineDecoderTest(unittest.TestCase):
2244
2245 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002246 # UTF-8 specific tests for a newline decoder
2247 def _check_decode(b, s, **kwargs):
2248 # We exercise getstate() / setstate() as well as decode()
2249 state = decoder.getstate()
2250 self.assertEquals(decoder.decode(b, **kwargs), s)
2251 decoder.setstate(state)
2252 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002253
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002254 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002255
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002256 _check_decode(b'\xe8', "")
2257 _check_decode(b'\xa2', "")
2258 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002259
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002260 _check_decode(b'\xe8', "")
2261 _check_decode(b'\xa2', "")
2262 _check_decode(b'\x88', "\u8888")
2263
2264 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002265 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2266
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002267 decoder.reset()
2268 _check_decode(b'\n', "\n")
2269 _check_decode(b'\r', "")
2270 _check_decode(b'', "\n", final=True)
2271 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002272
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002273 _check_decode(b'\r', "")
2274 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002275
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002276 _check_decode(b'\r\r\n', "\n\n")
2277 _check_decode(b'\r', "")
2278 _check_decode(b'\r', "\n")
2279 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002280
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002281 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2282 _check_decode(b'\xe8\xa2\x88', "\u8888")
2283 _check_decode(b'\n', "\n")
2284 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2285 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002286
Antoine Pitrou19690592009-06-12 20:14:08 +00002287 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002288 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002289 if encoding is not None:
2290 encoder = codecs.getincrementalencoder(encoding)()
2291 def _decode_bytewise(s):
2292 # Decode one byte at a time
2293 for b in encoder.encode(s):
2294 result.append(decoder.decode(b))
2295 else:
2296 encoder = None
2297 def _decode_bytewise(s):
2298 # Decode one char at a time
2299 for c in s:
2300 result.append(decoder.decode(c))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002301 self.assertEquals(decoder.newlines, None)
2302 _decode_bytewise("abc\n\r")
2303 self.assertEquals(decoder.newlines, '\n')
2304 _decode_bytewise("\nabc")
2305 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2306 _decode_bytewise("abc\r")
2307 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2308 _decode_bytewise("abc")
2309 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2310 _decode_bytewise("abc\r")
2311 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2312 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002313 input = "abc"
2314 if encoder is not None:
2315 encoder.reset()
2316 input = encoder.encode(input)
2317 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002318 self.assertEquals(decoder.newlines, None)
2319
2320 def test_newline_decoder(self):
2321 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002322 # None meaning the IncrementalNewlineDecoder takes unicode input
2323 # rather than bytes input
2324 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002325 'utf-16', 'utf-16-le', 'utf-16-be',
2326 'utf-32', 'utf-32-le', 'utf-32-be',
2327 )
2328 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002329 decoder = enc and codecs.getincrementaldecoder(enc)()
2330 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2331 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002332 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002333 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2334 self.check_newline_decoding_utf8(decoder)
2335
2336 def test_newline_bytes(self):
2337 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2338 def _check(dec):
2339 self.assertEquals(dec.newlines, None)
2340 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2341 self.assertEquals(dec.newlines, None)
2342 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2343 self.assertEquals(dec.newlines, None)
2344 dec = self.IncrementalNewlineDecoder(None, translate=False)
2345 _check(dec)
2346 dec = self.IncrementalNewlineDecoder(None, translate=True)
2347 _check(dec)
2348
2349class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2350 pass
2351
2352class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2353 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002354
Christian Heimes1a6387e2008-03-26 12:49:49 +00002355
2356# XXX Tests for open()
2357
2358class MiscIOTest(unittest.TestCase):
2359
Benjamin Petersonad100c32008-11-20 22:06:22 +00002360 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002361 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002362
Antoine Pitrou19690592009-06-12 20:14:08 +00002363 def test___all__(self):
2364 for name in self.io.__all__:
2365 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002366 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002367 if name == "open":
2368 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002369 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002370 self.assertTrue(issubclass(obj, Exception), name)
2371 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002372 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002373
Benjamin Petersonad100c32008-11-20 22:06:22 +00002374 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002375 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002376 self.assertEquals(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002377 f.close()
2378
Antoine Pitrou19690592009-06-12 20:14:08 +00002379 f = self.open(support.TESTFN, "U")
2380 self.assertEquals(f.name, support.TESTFN)
2381 self.assertEquals(f.buffer.name, support.TESTFN)
2382 self.assertEquals(f.buffer.raw.name, support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002383 self.assertEquals(f.mode, "U")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002384 self.assertEquals(f.buffer.mode, "rb")
2385 self.assertEquals(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002386 f.close()
2387
Antoine Pitrou19690592009-06-12 20:14:08 +00002388 f = self.open(support.TESTFN, "w+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002389 self.assertEquals(f.mode, "w+")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002390 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2391 self.assertEquals(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002392
Antoine Pitrou19690592009-06-12 20:14:08 +00002393 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002394 self.assertEquals(g.mode, "wb")
2395 self.assertEquals(g.raw.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002396 self.assertEquals(g.name, f.fileno())
2397 self.assertEquals(g.raw.name, f.fileno())
2398 f.close()
2399 g.close()
2400
Antoine Pitrou19690592009-06-12 20:14:08 +00002401 def test_io_after_close(self):
2402 for kwargs in [
2403 {"mode": "w"},
2404 {"mode": "wb"},
2405 {"mode": "w", "buffering": 1},
2406 {"mode": "w", "buffering": 2},
2407 {"mode": "wb", "buffering": 0},
2408 {"mode": "r"},
2409 {"mode": "rb"},
2410 {"mode": "r", "buffering": 1},
2411 {"mode": "r", "buffering": 2},
2412 {"mode": "rb", "buffering": 0},
2413 {"mode": "w+"},
2414 {"mode": "w+b"},
2415 {"mode": "w+", "buffering": 1},
2416 {"mode": "w+", "buffering": 2},
2417 {"mode": "w+b", "buffering": 0},
2418 ]:
2419 f = self.open(support.TESTFN, **kwargs)
2420 f.close()
2421 self.assertRaises(ValueError, f.flush)
2422 self.assertRaises(ValueError, f.fileno)
2423 self.assertRaises(ValueError, f.isatty)
2424 self.assertRaises(ValueError, f.__iter__)
2425 if hasattr(f, "peek"):
2426 self.assertRaises(ValueError, f.peek, 1)
2427 self.assertRaises(ValueError, f.read)
2428 if hasattr(f, "read1"):
2429 self.assertRaises(ValueError, f.read1, 1024)
2430 if hasattr(f, "readinto"):
2431 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2432 self.assertRaises(ValueError, f.readline)
2433 self.assertRaises(ValueError, f.readlines)
2434 self.assertRaises(ValueError, f.seek, 0)
2435 self.assertRaises(ValueError, f.tell)
2436 self.assertRaises(ValueError, f.truncate)
2437 self.assertRaises(ValueError, f.write,
2438 b"" if "b" in kwargs['mode'] else "")
2439 self.assertRaises(ValueError, f.writelines, [])
2440 self.assertRaises(ValueError, next, f)
2441
2442 def test_blockingioerror(self):
2443 # Various BlockingIOError issues
2444 self.assertRaises(TypeError, self.BlockingIOError)
2445 self.assertRaises(TypeError, self.BlockingIOError, 1)
2446 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2447 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2448 b = self.BlockingIOError(1, "")
2449 self.assertEqual(b.characters_written, 0)
2450 class C(unicode):
2451 pass
2452 c = C("")
2453 b = self.BlockingIOError(1, c)
2454 c.b = b
2455 b.c = c
2456 wr = weakref.ref(c)
2457 del c, b
2458 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002459 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002460
2461 def test_abcs(self):
2462 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002463 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2464 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2465 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2466 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002467
2468 def _check_abc_inheritance(self, abcmodule):
2469 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002470 self.assertIsInstance(f, abcmodule.IOBase)
2471 self.assertIsInstance(f, abcmodule.RawIOBase)
2472 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2473 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002474 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002475 self.assertIsInstance(f, abcmodule.IOBase)
2476 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2477 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2478 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002479 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002480 self.assertIsInstance(f, abcmodule.IOBase)
2481 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2482 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2483 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002484
2485 def test_abc_inheritance(self):
2486 # Test implementations inherit from their respective ABCs
2487 self._check_abc_inheritance(self)
2488
2489 def test_abc_inheritance_official(self):
2490 # Test implementations inherit from the official ABCs of the
2491 # baseline "io" module.
2492 self._check_abc_inheritance(io)
2493
2494class CMiscIOTest(MiscIOTest):
2495 io = io
2496
2497class PyMiscIOTest(MiscIOTest):
2498 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002499
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002500
2501@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2502class SignalsTest(unittest.TestCase):
2503
2504 def setUp(self):
2505 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2506
2507 def tearDown(self):
2508 signal.signal(signal.SIGALRM, self.oldalrm)
2509
2510 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002511 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002512
2513 @unittest.skipUnless(threading, 'Threading required for this test.')
2514 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2515 """Check that a partial write, when it gets interrupted, properly
2516 invokes the signal handler."""
2517 read_results = []
2518 def _read():
2519 s = os.read(r, 1)
2520 read_results.append(s)
2521 t = threading.Thread(target=_read)
2522 t.daemon = True
2523 r, w = os.pipe()
2524 try:
2525 wio = self.io.open(w, **fdopen_kwargs)
2526 t.start()
2527 signal.alarm(1)
2528 # Fill the pipe enough that the write will be blocking.
2529 # It will be interrupted by the timer armed above. Since the
2530 # other thread has read one byte, the low-level write will
2531 # return with a successful (partial) result rather than an EINTR.
2532 # The buffered IO layer must check for pending signal
2533 # handlers, which in this case will invoke alarm_interrupt().
2534 self.assertRaises(ZeroDivisionError,
2535 wio.write, item * (1024 * 1024))
2536 t.join()
2537 # We got one byte, get another one and check that it isn't a
2538 # repeat of the first one.
2539 read_results.append(os.read(r, 1))
2540 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2541 finally:
2542 os.close(w)
2543 os.close(r)
2544 # This is deliberate. If we didn't close the file descriptor
2545 # before closing wio, wio would try to flush its internal
2546 # buffer, and block again.
2547 try:
2548 wio.close()
2549 except IOError as e:
2550 if e.errno != errno.EBADF:
2551 raise
2552
2553 def test_interrupted_write_unbuffered(self):
2554 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2555
2556 def test_interrupted_write_buffered(self):
2557 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2558
2559 def test_interrupted_write_text(self):
2560 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2561
2562class CSignalsTest(SignalsTest):
2563 io = io
2564
2565class PySignalsTest(SignalsTest):
2566 io = pyio
2567
2568
Christian Heimes1a6387e2008-03-26 12:49:49 +00002569def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002570 tests = (CIOTest, PyIOTest,
2571 CBufferedReaderTest, PyBufferedReaderTest,
2572 CBufferedWriterTest, PyBufferedWriterTest,
2573 CBufferedRWPairTest, PyBufferedRWPairTest,
2574 CBufferedRandomTest, PyBufferedRandomTest,
2575 StatefulIncrementalDecoderTest,
2576 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2577 CTextIOWrapperTest, PyTextIOWrapperTest,
2578 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002579 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002580 )
2581
2582 # Put the namespaces of the IO module we are testing and some useful mock
2583 # classes in the __dict__ of each test.
2584 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002585 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002586 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2587 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2588 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2589 globs = globals()
2590 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2591 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2592 # Avoid turning open into a bound method.
2593 py_io_ns["open"] = pyio.OpenWrapper
2594 for test in tests:
2595 if test.__name__.startswith("C"):
2596 for name, obj in c_io_ns.items():
2597 setattr(test, name, obj)
2598 elif test.__name__.startswith("Py"):
2599 for name, obj in py_io_ns.items():
2600 setattr(test, name, obj)
2601
2602 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002603
2604if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002605 test_main()