blob: d9705e740574f6df3c7a6c781f233bc4a53ee731 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000046
47__metaclass__ = type
48bytes = support.py3k_bytes
49
50def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with io.open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55
Antoine Pitrou6391b342010-09-14 18:48:19 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000063 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000064 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
Christian Heimes1a6387e2008-03-26 12:49:49 +000066 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000068 return len(b)
69
70 def writable(self):
71 return True
72
73 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
79 def seekable(self):
80 return True
81
82 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000083 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000084
85 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000094 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou6391b342010-09-14 18:48:19 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Antoine Pitrou19690592009-06-12 20:14:08 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000134
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return MockRawIO.write(self, b) * 2
139
140 def read(self, n=None):
141 return MockRawIO.read(self, n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 MockRawIO.readinto(self, buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
177 def __init__(self, data):
178 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000179 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000182 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 def readinto(self, b):
187 res = super(MockFileIO, self).readinto(b)
188 self.read_history.append(res)
189 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
Antoine Pitrou19690592009-06-12 20:14:08 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193
Antoine Pitrou19690592009-06-12 20:14:08 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
198class MockNonBlockWriterIO:
199
200 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000202 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204 def pop_written(self):
205 s = b"".join(self._write_stack)
206 self._write_stack[:] = []
207 return s
208
209 def block_on(self, char):
210 """Block when a given char is encountered."""
211 self._blocker_char = char
212
213 def readable(self):
214 return True
215
216 def seekable(self):
217 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000218
219 def writable(self):
220 return True
221
Antoine Pitrou19690592009-06-12 20:14:08 +0000222 def write(self, b):
223 b = bytes(b)
224 n = -1
225 if self._blocker_char:
226 try:
227 n = b.index(self._blocker_char)
228 except ValueError:
229 pass
230 else:
231 self._blocker_char = None
232 self._write_stack.append(b[:n])
233 raise self.BlockingIOError(0, "test blocking", n)
234 self._write_stack.append(b)
235 return len(b)
236
237class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
238 BlockingIOError = io.BlockingIOError
239
240class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
241 BlockingIOError = pyio.BlockingIOError
242
Christian Heimes1a6387e2008-03-26 12:49:49 +0000243
244class IOTest(unittest.TestCase):
245
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 def setUp(self):
247 support.unlink(support.TESTFN)
248
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000250 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000251
252 def write_ops(self, f):
253 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000254 f.truncate(0)
255 self.assertEqual(f.tell(), 5)
256 f.seek(0)
257
258 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000259 self.assertEqual(f.seek(0), 0)
260 self.assertEqual(f.write(b"Hello."), 6)
261 self.assertEqual(f.tell(), 6)
262 self.assertEqual(f.seek(-1, 1), 5)
263 self.assertEqual(f.tell(), 5)
264 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
265 self.assertEqual(f.seek(0), 0)
266 self.assertEqual(f.write(b"h"), 1)
267 self.assertEqual(f.seek(-1, 2), 13)
268 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000271 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
273
274 def read_ops(self, f, buffered=False):
275 data = f.read(5)
276 self.assertEqual(data, b"hello")
277 data = bytearray(data)
278 self.assertEqual(f.readinto(data), 5)
279 self.assertEqual(data, b" worl")
280 self.assertEqual(f.readinto(data), 2)
281 self.assertEqual(len(data), 5)
282 self.assertEqual(data[:2], b"d\n")
283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.read(20), b"hello world\n")
285 self.assertEqual(f.read(1), b"")
286 self.assertEqual(f.readinto(bytearray(b"x")), 0)
287 self.assertEqual(f.seek(-6, 2), 6)
288 self.assertEqual(f.read(5), b"world")
289 self.assertEqual(f.read(0), b"")
290 self.assertEqual(f.readinto(bytearray()), 0)
291 self.assertEqual(f.seek(-6, 1), 5)
292 self.assertEqual(f.read(5), b" worl")
293 self.assertEqual(f.tell(), 10)
294 self.assertRaises(TypeError, f.seek, 0.0)
295 if buffered:
296 f.seek(0)
297 self.assertEqual(f.read(), b"hello world\n")
298 f.seek(6)
299 self.assertEqual(f.read(), b"world\n")
300 self.assertEqual(f.read(), b"")
301
302 LARGE = 2**31
303
304 def large_file_ops(self, f):
305 assert f.readable()
306 assert f.writable()
307 self.assertEqual(f.seek(self.LARGE), self.LARGE)
308 self.assertEqual(f.tell(), self.LARGE)
309 self.assertEqual(f.write(b"xxx"), 3)
310 self.assertEqual(f.tell(), self.LARGE + 3)
311 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
312 self.assertEqual(f.truncate(), self.LARGE + 2)
313 self.assertEqual(f.tell(), self.LARGE + 2)
314 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
315 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000316 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000317 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
318 self.assertEqual(f.seek(-1, 2), self.LARGE)
319 self.assertEqual(f.read(2), b"x")
320
Antoine Pitrou19690592009-06-12 20:14:08 +0000321 def test_invalid_operations(self):
322 # Try writing on a file opened in read mode and vice-versa.
323 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000324 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000325 self.assertRaises(IOError, fp.read)
326 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000327 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000328 self.assertRaises(IOError, fp.write, b"blah")
329 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000330 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000331 self.assertRaises(IOError, fp.write, "blah")
332 self.assertRaises(IOError, fp.writelines, ["blah\n"])
333
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000335 with self.open(support.TESTFN, "wb", buffering=0) as f:
336 self.assertEqual(f.readable(), False)
337 self.assertEqual(f.writable(), True)
338 self.assertEqual(f.seekable(), True)
339 self.write_ops(f)
340 with self.open(support.TESTFN, "rb", buffering=0) as f:
341 self.assertEqual(f.readable(), True)
342 self.assertEqual(f.writable(), False)
343 self.assertEqual(f.seekable(), True)
344 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345
346 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb") as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb") as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readline(), b"abc\n")
363 self.assertEqual(f.readline(10), b"def\n")
364 self.assertEqual(f.readline(2), b"xy")
365 self.assertEqual(f.readline(4), b"zzy\n")
366 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000367 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 self.assertRaises(TypeError, f.readline, 5.3)
369 with self.open(support.TESTFN, "r") as f:
370 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000371
372 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000373 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374 self.write_ops(f)
375 data = f.getvalue()
376 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000378 self.read_ops(f, True)
379
380 def test_large_file_ops(self):
381 # On Windows and Mac OSX this test comsumes large resources; It takes
382 # a long time to build the >2GB file and takes >2GB of disk space
383 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
385 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 print("\nTesting large file ops skipped on %s." % sys.platform,
387 file=sys.stderr)
388 print("It requires %d bytes and a long time." % self.LARGE,
389 file=sys.stderr)
390 print("Use 'regrtest.py -u largefile test_io' to run it.",
391 file=sys.stderr)
392 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 with self.open(support.TESTFN, "w+b", 0) as f:
394 self.large_file_ops(f)
395 with self.open(support.TESTFN, "w+b") as f:
396 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397
398 def test_with_open(self):
399 for bufsize in (0, 1, 100):
400 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000401 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000402 f.write(b"xxx")
403 self.assertEqual(f.closed, True)
404 f = None
405 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000406 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000407 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 except ZeroDivisionError:
409 self.assertEqual(f.closed, True)
410 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000411 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412
Antoine Pitroue741cc62009-01-21 00:45:36 +0000413 # issue 5008
414 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000415 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000416 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000418 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000419 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000422 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 def test_destructor(self):
425 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000427 def __del__(self):
428 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000429 try:
430 f = super(MyFileIO, self).__del__
431 except AttributeError:
432 pass
433 else:
434 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def close(self):
436 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def flush(self):
439 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 super(MyFileIO, self).flush()
441 f = MyFileIO(support.TESTFN, "wb")
442 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000443 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 support.gc_collect()
445 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000446 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 self.assertEqual(f.read(), b"xxx")
448
449 def _check_base_destructor(self, base):
450 record = []
451 class MyIO(base):
452 def __init__(self):
453 # This exercises the availability of attributes on object
454 # destruction.
455 # (in the C version, close() is called by the tp_dealloc
456 # function, not by __del__)
457 self.on_del = 1
458 self.on_close = 2
459 self.on_flush = 3
460 def __del__(self):
461 record.append(self.on_del)
462 try:
463 f = super(MyIO, self).__del__
464 except AttributeError:
465 pass
466 else:
467 f()
468 def close(self):
469 record.append(self.on_close)
470 super(MyIO, self).close()
471 def flush(self):
472 record.append(self.on_flush)
473 super(MyIO, self).flush()
474 f = MyIO()
475 del f
476 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000477 self.assertEqual(record, [1, 2, 3])
478
Antoine Pitrou19690592009-06-12 20:14:08 +0000479 def test_IOBase_destructor(self):
480 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 def test_RawIOBase_destructor(self):
483 self._check_base_destructor(self.RawIOBase)
484
485 def test_BufferedIOBase_destructor(self):
486 self._check_base_destructor(self.BufferedIOBase)
487
488 def test_TextIOBase_destructor(self):
489 self._check_base_destructor(self.TextIOBase)
490
491 def test_close_flushes(self):
492 with self.open(support.TESTFN, "wb") as f:
493 f.write(b"xxx")
494 with self.open(support.TESTFN, "rb") as f:
495 self.assertEqual(f.read(), b"xxx")
496
497 def test_array_writes(self):
498 a = array.array(b'i', range(10))
499 n = len(a.tostring())
500 with self.open(support.TESTFN, "wb", 0) as f:
501 self.assertEqual(f.write(a), n)
502 with self.open(support.TESTFN, "wb") as f:
503 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000504
505 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000506 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000507 closefd=False)
508
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 def test_read_closed(self):
510 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000511 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 with self.open(support.TESTFN, "r") as f:
513 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000514 self.assertEqual(file.read(), "egg\n")
515 file.seek(0)
516 file.close()
517 self.assertRaises(ValueError, file.read)
518
519 def test_no_closefd_with_filename(self):
520 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522
523 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000524 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000526 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529 self.assertEqual(file.buffer.raw.closefd, False)
530
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 def test_garbage_collection(self):
532 # FileIO objects are collected, and collecting them flushes
533 # all data to disk.
534 f = self.FileIO(support.TESTFN, "wb")
535 f.write(b"abcxxx")
536 f.f = f
537 wr = weakref.ref(f)
538 del f
539 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000540 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000541 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 def test_unbounded_file(self):
545 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
546 zero = "/dev/zero"
547 if not os.path.exists(zero):
548 self.skipTest("{0} does not exist".format(zero))
549 if sys.maxsize > 0x7FFFFFFF:
550 self.skipTest("test can only run in a 32-bit address space")
551 if support.real_max_memuse < support._2G:
552 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000553 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000554 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000555 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000556 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000557 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000558 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000559
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000560 def test_flush_error_on_close(self):
561 f = self.open(support.TESTFN, "wb", buffering=0)
562 def bad_flush():
563 raise IOError()
564 f.flush = bad_flush
565 self.assertRaises(IOError, f.close) # exception not swallowed
566
567 def test_multi_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 f.close()
570 f.close()
571 f.close()
572 self.assertRaises(ValueError, f.flush)
573
Antoine Pitrou6391b342010-09-14 18:48:19 +0000574 def test_RawIOBase_read(self):
575 # Exercise the default RawIOBase.read() implementation (which calls
576 # readinto() internally).
577 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
578 self.assertEqual(rawio.read(2), b"ab")
579 self.assertEqual(rawio.read(2), b"c")
580 self.assertEqual(rawio.read(2), b"d")
581 self.assertEqual(rawio.read(2), None)
582 self.assertEqual(rawio.read(2), b"ef")
583 self.assertEqual(rawio.read(2), b"g")
584 self.assertEqual(rawio.read(2), None)
585 self.assertEqual(rawio.read(2), b"")
586
Antoine Pitrou19690592009-06-12 20:14:08 +0000587class CIOTest(IOTest):
588 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000589
Antoine Pitrou19690592009-06-12 20:14:08 +0000590class PyIOTest(IOTest):
591 test_array_writes = unittest.skip(
592 "len(array.array) returns number of elements rather than bytelength"
593 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000594
595
Antoine Pitrou19690592009-06-12 20:14:08 +0000596class CommonBufferedTests:
597 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
598
599 def test_detach(self):
600 raw = self.MockRawIO()
601 buf = self.tp(raw)
602 self.assertIs(buf.detach(), raw)
603 self.assertRaises(ValueError, buf.detach)
604
605 def test_fileno(self):
606 rawio = self.MockRawIO()
607 bufio = self.tp(rawio)
608
Ezio Melotti2623a372010-11-21 13:34:58 +0000609 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000610
611 def test_no_fileno(self):
612 # XXX will we always have fileno() function? If so, kill
613 # this test. Else, write it.
614 pass
615
616 def test_invalid_args(self):
617 rawio = self.MockRawIO()
618 bufio = self.tp(rawio)
619 # Invalid whence
620 self.assertRaises(ValueError, bufio.seek, 0, -1)
621 self.assertRaises(ValueError, bufio.seek, 0, 3)
622
623 def test_override_destructor(self):
624 tp = self.tp
625 record = []
626 class MyBufferedIO(tp):
627 def __del__(self):
628 record.append(1)
629 try:
630 f = super(MyBufferedIO, self).__del__
631 except AttributeError:
632 pass
633 else:
634 f()
635 def close(self):
636 record.append(2)
637 super(MyBufferedIO, self).close()
638 def flush(self):
639 record.append(3)
640 super(MyBufferedIO, self).flush()
641 rawio = self.MockRawIO()
642 bufio = MyBufferedIO(rawio)
643 writable = bufio.writable()
644 del bufio
645 support.gc_collect()
646 if writable:
647 self.assertEqual(record, [1, 2, 3])
648 else:
649 self.assertEqual(record, [1, 2])
650
651 def test_context_manager(self):
652 # Test usability as a context manager
653 rawio = self.MockRawIO()
654 bufio = self.tp(rawio)
655 def _with():
656 with bufio:
657 pass
658 _with()
659 # bufio should now be closed, and using it a second time should raise
660 # a ValueError.
661 self.assertRaises(ValueError, _with)
662
663 def test_error_through_destructor(self):
664 # Test that the exception state is not modified by a destructor,
665 # even if close() fails.
666 rawio = self.CloseFailureIO()
667 def f():
668 self.tp(rawio).xyzzy
669 with support.captured_output("stderr") as s:
670 self.assertRaises(AttributeError, f)
671 s = s.getvalue().strip()
672 if s:
673 # The destructor *may* have printed an unraisable error, check it
674 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000675 self.assertTrue(s.startswith("Exception IOError: "), s)
676 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000677
678 def test_repr(self):
679 raw = self.MockRawIO()
680 b = self.tp(raw)
681 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
682 self.assertEqual(repr(b), "<%s>" % clsname)
683 raw.name = "dummy"
684 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
685 raw.name = b"dummy"
686 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000687
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000688 def test_flush_error_on_close(self):
689 raw = self.MockRawIO()
690 def bad_flush():
691 raise IOError()
692 raw.flush = bad_flush
693 b = self.tp(raw)
694 self.assertRaises(IOError, b.close) # exception not swallowed
695
696 def test_multi_close(self):
697 raw = self.MockRawIO()
698 b = self.tp(raw)
699 b.close()
700 b.close()
701 b.close()
702 self.assertRaises(ValueError, b.flush)
703
Antoine Pitroufc9ead62010-12-21 21:26:55 +0000704 def test_readonly_attributes(self):
705 raw = self.MockRawIO()
706 buf = self.tp(raw)
707 x = self.MockRawIO()
708 with self.assertRaises((AttributeError, TypeError)):
709 buf.raw = x
710
Christian Heimes1a6387e2008-03-26 12:49:49 +0000711
Antoine Pitrou19690592009-06-12 20:14:08 +0000712class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
713 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000714
Antoine Pitrou19690592009-06-12 20:14:08 +0000715 def test_constructor(self):
716 rawio = self.MockRawIO([b"abc"])
717 bufio = self.tp(rawio)
718 bufio.__init__(rawio)
719 bufio.__init__(rawio, buffer_size=1024)
720 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000721 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000722 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
723 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
724 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
725 rawio = self.MockRawIO([b"abc"])
726 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000727 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000728
Antoine Pitrou19690592009-06-12 20:14:08 +0000729 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000730 for arg in (None, 7):
731 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
732 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000733 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000734 # Invalid args
735 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000736
Antoine Pitrou19690592009-06-12 20:14:08 +0000737 def test_read1(self):
738 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
739 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000740 self.assertEqual(b"a", bufio.read(1))
741 self.assertEqual(b"b", bufio.read1(1))
742 self.assertEqual(rawio._reads, 1)
743 self.assertEqual(b"c", bufio.read1(100))
744 self.assertEqual(rawio._reads, 1)
745 self.assertEqual(b"d", bufio.read1(100))
746 self.assertEqual(rawio._reads, 2)
747 self.assertEqual(b"efg", bufio.read1(100))
748 self.assertEqual(rawio._reads, 3)
749 self.assertEqual(b"", bufio.read1(100))
750 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000751 # Invalid args
752 self.assertRaises(ValueError, bufio.read1, -1)
753
754 def test_readinto(self):
755 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
756 bufio = self.tp(rawio)
757 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000758 self.assertEqual(bufio.readinto(b), 2)
759 self.assertEqual(b, b"ab")
760 self.assertEqual(bufio.readinto(b), 2)
761 self.assertEqual(b, b"cd")
762 self.assertEqual(bufio.readinto(b), 2)
763 self.assertEqual(b, b"ef")
764 self.assertEqual(bufio.readinto(b), 1)
765 self.assertEqual(b, b"gf")
766 self.assertEqual(bufio.readinto(b), 0)
767 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000768
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000769 def test_readlines(self):
770 def bufio():
771 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
772 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000773 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
774 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
775 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000776
Antoine Pitrou19690592009-06-12 20:14:08 +0000777 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000778 data = b"abcdefghi"
779 dlen = len(data)
780
781 tests = [
782 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
783 [ 100, [ 3, 3, 3], [ dlen ] ],
784 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
785 ]
786
787 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000788 rawio = self.MockFileIO(data)
789 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000790 pos = 0
791 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000792 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000793 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000794 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000795 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000796
Antoine Pitrou19690592009-06-12 20:14:08 +0000797 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000798 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000799 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
800 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000801 self.assertEqual(b"abcd", bufio.read(6))
802 self.assertEqual(b"e", bufio.read(1))
803 self.assertEqual(b"fg", bufio.read())
804 self.assertEqual(b"", bufio.peek(1))
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200805 self.assertIsNone(bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000806 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000807
Victor Stinnerdaf17e92011-05-25 22:52:37 +0200808 rawio = self.MockRawIO((b"a", None, None))
809 self.assertEqual(b"a", rawio.readall())
810 self.assertIsNone(rawio.readall())
811
Antoine Pitrou19690592009-06-12 20:14:08 +0000812 def test_read_past_eof(self):
813 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
814 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000815
Ezio Melotti2623a372010-11-21 13:34:58 +0000816 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000817
Antoine Pitrou19690592009-06-12 20:14:08 +0000818 def test_read_all(self):
819 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
820 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000821
Ezio Melotti2623a372010-11-21 13:34:58 +0000822 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000823
Victor Stinner6a102812010-04-27 23:55:59 +0000824 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000825 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000826 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000827 try:
828 # Write out many bytes with exactly the same number of 0's,
829 # 1's... 255's. This will help us check that concurrent reading
830 # doesn't duplicate or forget contents.
831 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000832 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000833 random.shuffle(l)
834 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000835 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000836 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000837 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000838 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000839 errors = []
840 results = []
841 def f():
842 try:
843 # Intra-buffer read then buffer-flushing read
844 for n in cycle([1, 19]):
845 s = bufio.read(n)
846 if not s:
847 break
848 # list.append() is atomic
849 results.append(s)
850 except Exception as e:
851 errors.append(e)
852 raise
853 threads = [threading.Thread(target=f) for x in range(20)]
854 for t in threads:
855 t.start()
856 time.sleep(0.02) # yield
857 for t in threads:
858 t.join()
859 self.assertFalse(errors,
860 "the following exceptions were caught: %r" % errors)
861 s = b''.join(results)
862 for i in range(256):
863 c = bytes(bytearray([i]))
864 self.assertEqual(s.count(c), N)
865 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000866 support.unlink(support.TESTFN)
867
868 def test_misbehaved_io(self):
869 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
870 bufio = self.tp(rawio)
871 self.assertRaises(IOError, bufio.seek, 0)
872 self.assertRaises(IOError, bufio.tell)
873
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000874 def test_no_extraneous_read(self):
875 # Issue #9550; when the raw IO object has satisfied the read request,
876 # we should not issue any additional reads, otherwise it may block
877 # (e.g. socket).
878 bufsize = 16
879 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
880 rawio = self.MockRawIO([b"x" * n])
881 bufio = self.tp(rawio, bufsize)
882 self.assertEqual(bufio.read(n), b"x" * n)
883 # Simple case: one raw read is enough to satisfy the request.
884 self.assertEqual(rawio._extraneous_reads, 0,
885 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
886 # A more complex case where two raw reads are needed to satisfy
887 # the request.
888 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
889 bufio = self.tp(rawio, bufsize)
890 self.assertEqual(bufio.read(n), b"x" * n)
891 self.assertEqual(rawio._extraneous_reads, 0,
892 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
893
894
Antoine Pitrou19690592009-06-12 20:14:08 +0000895class CBufferedReaderTest(BufferedReaderTest):
896 tp = io.BufferedReader
897
898 def test_constructor(self):
899 BufferedReaderTest.test_constructor(self)
900 # The allocation can succeed on 32-bit builds, e.g. with more
901 # than 2GB RAM and a 64-bit kernel.
902 if sys.maxsize > 0x7FFFFFFF:
903 rawio = self.MockRawIO()
904 bufio = self.tp(rawio)
905 self.assertRaises((OverflowError, MemoryError, ValueError),
906 bufio.__init__, rawio, sys.maxsize)
907
908 def test_initialization(self):
909 rawio = self.MockRawIO([b"abc"])
910 bufio = self.tp(rawio)
911 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
912 self.assertRaises(ValueError, bufio.read)
913 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
914 self.assertRaises(ValueError, bufio.read)
915 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
916 self.assertRaises(ValueError, bufio.read)
917
918 def test_misbehaved_io_read(self):
919 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
920 bufio = self.tp(rawio)
921 # _pyio.BufferedReader seems to implement reading different, so that
922 # checking this is not so easy.
923 self.assertRaises(IOError, bufio.read, 10)
924
925 def test_garbage_collection(self):
926 # C BufferedReader objects are collected.
927 # The Python version has __del__, so it ends into gc.garbage instead
928 rawio = self.FileIO(support.TESTFN, "w+b")
929 f = self.tp(rawio)
930 f.f = f
931 wr = weakref.ref(f)
932 del f
933 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000934 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000935
936class PyBufferedReaderTest(BufferedReaderTest):
937 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000938
939
Antoine Pitrou19690592009-06-12 20:14:08 +0000940class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
941 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000942
Antoine Pitrou19690592009-06-12 20:14:08 +0000943 def test_constructor(self):
944 rawio = self.MockRawIO()
945 bufio = self.tp(rawio)
946 bufio.__init__(rawio)
947 bufio.__init__(rawio, buffer_size=1024)
948 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000949 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000950 bufio.flush()
951 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
952 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
953 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
954 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000955 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000956 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000957 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000958
Antoine Pitrou19690592009-06-12 20:14:08 +0000959 def test_detach_flush(self):
960 raw = self.MockRawIO()
961 buf = self.tp(raw)
962 buf.write(b"howdy!")
963 self.assertFalse(raw._write_stack)
964 buf.detach()
965 self.assertEqual(raw._write_stack, [b"howdy!"])
966
967 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000968 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000969 writer = self.MockRawIO()
970 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000971 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000972 self.assertFalse(writer._write_stack)
973
Antoine Pitrou19690592009-06-12 20:14:08 +0000974 def test_write_overflow(self):
975 writer = self.MockRawIO()
976 bufio = self.tp(writer, 8)
977 contents = b"abcdefghijklmnop"
978 for n in range(0, len(contents), 3):
979 bufio.write(contents[n:n+3])
980 flushed = b"".join(writer._write_stack)
981 # At least (total - 8) bytes were implicitly flushed, perhaps more
982 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000983 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000984
Antoine Pitrou19690592009-06-12 20:14:08 +0000985 def check_writes(self, intermediate_func):
986 # Lots of writes, test the flushed output is as expected.
987 contents = bytes(range(256)) * 1000
988 n = 0
989 writer = self.MockRawIO()
990 bufio = self.tp(writer, 13)
991 # Generator of write sizes: repeat each N 15 times then proceed to N+1
992 def gen_sizes():
993 for size in count(1):
994 for i in range(15):
995 yield size
996 sizes = gen_sizes()
997 while n < len(contents):
998 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +0000999 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001000 intermediate_func(bufio)
1001 n += size
1002 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001003 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +00001004 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001005
Antoine Pitrou19690592009-06-12 20:14:08 +00001006 def test_writes(self):
1007 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001008
Antoine Pitrou19690592009-06-12 20:14:08 +00001009 def test_writes_and_flushes(self):
1010 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001011
Antoine Pitrou19690592009-06-12 20:14:08 +00001012 def test_writes_and_seeks(self):
1013 def _seekabs(bufio):
1014 pos = bufio.tell()
1015 bufio.seek(pos + 1, 0)
1016 bufio.seek(pos - 1, 0)
1017 bufio.seek(pos, 0)
1018 self.check_writes(_seekabs)
1019 def _seekrel(bufio):
1020 pos = bufio.seek(0, 1)
1021 bufio.seek(+1, 1)
1022 bufio.seek(-1, 1)
1023 bufio.seek(pos, 0)
1024 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001025
Antoine Pitrou19690592009-06-12 20:14:08 +00001026 def test_writes_and_truncates(self):
1027 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001028
Antoine Pitrou19690592009-06-12 20:14:08 +00001029 def test_write_non_blocking(self):
1030 raw = self.MockNonBlockWriterIO()
1031 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001032
Ezio Melotti2623a372010-11-21 13:34:58 +00001033 self.assertEqual(bufio.write(b"abcd"), 4)
1034 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001035 # 1 byte will be written, the rest will be buffered
1036 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001037 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001038
Antoine Pitrou19690592009-06-12 20:14:08 +00001039 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1040 raw.block_on(b"0")
1041 try:
1042 bufio.write(b"opqrwxyz0123456789")
1043 except self.BlockingIOError as e:
1044 written = e.characters_written
1045 else:
1046 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001047 self.assertEqual(written, 16)
1048 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001049 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001050
Ezio Melotti2623a372010-11-21 13:34:58 +00001051 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001052 s = raw.pop_written()
1053 # Previously buffered bytes were flushed
1054 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001055
Antoine Pitrou19690592009-06-12 20:14:08 +00001056 def test_write_and_rewind(self):
1057 raw = io.BytesIO()
1058 bufio = self.tp(raw, 4)
1059 self.assertEqual(bufio.write(b"abcdef"), 6)
1060 self.assertEqual(bufio.tell(), 6)
1061 bufio.seek(0, 0)
1062 self.assertEqual(bufio.write(b"XY"), 2)
1063 bufio.seek(6, 0)
1064 self.assertEqual(raw.getvalue(), b"XYcdef")
1065 self.assertEqual(bufio.write(b"123456"), 6)
1066 bufio.flush()
1067 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001068
Antoine Pitrou19690592009-06-12 20:14:08 +00001069 def test_flush(self):
1070 writer = self.MockRawIO()
1071 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001072 bufio.write(b"abc")
1073 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001074 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001075
Antoine Pitrou19690592009-06-12 20:14:08 +00001076 def test_destructor(self):
1077 writer = self.MockRawIO()
1078 bufio = self.tp(writer, 8)
1079 bufio.write(b"abc")
1080 del bufio
1081 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001082 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001083
1084 def test_truncate(self):
1085 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001086 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001087 bufio = self.tp(raw, 8)
1088 bufio.write(b"abcdef")
1089 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001090 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001091 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001092 self.assertEqual(f.read(), b"abc")
1093
Victor Stinner6a102812010-04-27 23:55:59 +00001094 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001095 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001096 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001097 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001098 # Write out many bytes from many threads and test they were
1099 # all flushed.
1100 N = 1000
1101 contents = bytes(range(256)) * N
1102 sizes = cycle([1, 19])
1103 n = 0
1104 queue = deque()
1105 while n < len(contents):
1106 size = next(sizes)
1107 queue.append(contents[n:n+size])
1108 n += size
1109 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001110 # We use a real file object because it allows us to
1111 # exercise situations where the GIL is released before
1112 # writing the buffer to the raw streams. This is in addition
1113 # to concurrency issues due to switching threads in the middle
1114 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001115 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001116 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001117 errors = []
1118 def f():
1119 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001120 while True:
1121 try:
1122 s = queue.popleft()
1123 except IndexError:
1124 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001125 bufio.write(s)
1126 except Exception as e:
1127 errors.append(e)
1128 raise
1129 threads = [threading.Thread(target=f) for x in range(20)]
1130 for t in threads:
1131 t.start()
1132 time.sleep(0.02) # yield
1133 for t in threads:
1134 t.join()
1135 self.assertFalse(errors,
1136 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001137 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001138 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001139 s = f.read()
1140 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001141 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001142 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001143 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001144
Antoine Pitrou19690592009-06-12 20:14:08 +00001145 def test_misbehaved_io(self):
1146 rawio = self.MisbehavedRawIO()
1147 bufio = self.tp(rawio, 5)
1148 self.assertRaises(IOError, bufio.seek, 0)
1149 self.assertRaises(IOError, bufio.tell)
1150 self.assertRaises(IOError, bufio.write, b"abcdef")
1151
1152 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001153 with support.check_warnings(("max_buffer_size is deprecated",
1154 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001155 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001156
1157
1158class CBufferedWriterTest(BufferedWriterTest):
1159 tp = io.BufferedWriter
1160
1161 def test_constructor(self):
1162 BufferedWriterTest.test_constructor(self)
1163 # The allocation can succeed on 32-bit builds, e.g. with more
1164 # than 2GB RAM and a 64-bit kernel.
1165 if sys.maxsize > 0x7FFFFFFF:
1166 rawio = self.MockRawIO()
1167 bufio = self.tp(rawio)
1168 self.assertRaises((OverflowError, MemoryError, ValueError),
1169 bufio.__init__, rawio, sys.maxsize)
1170
1171 def test_initialization(self):
1172 rawio = self.MockRawIO()
1173 bufio = self.tp(rawio)
1174 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1175 self.assertRaises(ValueError, bufio.write, b"def")
1176 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1177 self.assertRaises(ValueError, bufio.write, b"def")
1178 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1179 self.assertRaises(ValueError, bufio.write, b"def")
1180
1181 def test_garbage_collection(self):
1182 # C BufferedWriter objects are collected, and collecting them flushes
1183 # all data to disk.
1184 # The Python version has __del__, so it ends into gc.garbage instead
1185 rawio = self.FileIO(support.TESTFN, "w+b")
1186 f = self.tp(rawio)
1187 f.write(b"123xxx")
1188 f.x = f
1189 wr = weakref.ref(f)
1190 del f
1191 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001192 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001193 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001194 self.assertEqual(f.read(), b"123xxx")
1195
1196
1197class PyBufferedWriterTest(BufferedWriterTest):
1198 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001199
1200class BufferedRWPairTest(unittest.TestCase):
1201
Antoine Pitrou19690592009-06-12 20:14:08 +00001202 def test_constructor(self):
1203 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001204 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001205
Antoine Pitrou19690592009-06-12 20:14:08 +00001206 def test_detach(self):
1207 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1208 self.assertRaises(self.UnsupportedOperation, pair.detach)
1209
1210 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001211 with support.check_warnings(("max_buffer_size is deprecated",
1212 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001213 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001214
1215 def test_constructor_with_not_readable(self):
1216 class NotReadable(MockRawIO):
1217 def readable(self):
1218 return False
1219
1220 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1221
1222 def test_constructor_with_not_writeable(self):
1223 class NotWriteable(MockRawIO):
1224 def writable(self):
1225 return False
1226
1227 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1228
1229 def test_read(self):
1230 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1231
1232 self.assertEqual(pair.read(3), b"abc")
1233 self.assertEqual(pair.read(1), b"d")
1234 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001235 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1236 self.assertEqual(pair.read(None), b"abc")
1237
1238 def test_readlines(self):
1239 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1240 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1241 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1242 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001243
1244 def test_read1(self):
1245 # .read1() is delegated to the underlying reader object, so this test
1246 # can be shallow.
1247 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1248
1249 self.assertEqual(pair.read1(3), b"abc")
1250
1251 def test_readinto(self):
1252 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1253
1254 data = bytearray(5)
1255 self.assertEqual(pair.readinto(data), 5)
1256 self.assertEqual(data, b"abcde")
1257
1258 def test_write(self):
1259 w = self.MockRawIO()
1260 pair = self.tp(self.MockRawIO(), w)
1261
1262 pair.write(b"abc")
1263 pair.flush()
1264 pair.write(b"def")
1265 pair.flush()
1266 self.assertEqual(w._write_stack, [b"abc", b"def"])
1267
1268 def test_peek(self):
1269 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1270
1271 self.assertTrue(pair.peek(3).startswith(b"abc"))
1272 self.assertEqual(pair.read(3), b"abc")
1273
1274 def test_readable(self):
1275 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1276 self.assertTrue(pair.readable())
1277
1278 def test_writeable(self):
1279 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1280 self.assertTrue(pair.writable())
1281
1282 def test_seekable(self):
1283 # BufferedRWPairs are never seekable, even if their readers and writers
1284 # are.
1285 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1286 self.assertFalse(pair.seekable())
1287
1288 # .flush() is delegated to the underlying writer object and has been
1289 # tested in the test_write method.
1290
1291 def test_close_and_closed(self):
1292 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1293 self.assertFalse(pair.closed)
1294 pair.close()
1295 self.assertTrue(pair.closed)
1296
1297 def test_isatty(self):
1298 class SelectableIsAtty(MockRawIO):
1299 def __init__(self, isatty):
1300 MockRawIO.__init__(self)
1301 self._isatty = isatty
1302
1303 def isatty(self):
1304 return self._isatty
1305
1306 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1307 self.assertFalse(pair.isatty())
1308
1309 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1310 self.assertTrue(pair.isatty())
1311
1312 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1313 self.assertTrue(pair.isatty())
1314
1315 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1316 self.assertTrue(pair.isatty())
1317
1318class CBufferedRWPairTest(BufferedRWPairTest):
1319 tp = io.BufferedRWPair
1320
1321class PyBufferedRWPairTest(BufferedRWPairTest):
1322 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001323
1324
Antoine Pitrou19690592009-06-12 20:14:08 +00001325class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1326 read_mode = "rb+"
1327 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001328
Antoine Pitrou19690592009-06-12 20:14:08 +00001329 def test_constructor(self):
1330 BufferedReaderTest.test_constructor(self)
1331 BufferedWriterTest.test_constructor(self)
1332
1333 def test_read_and_write(self):
1334 raw = self.MockRawIO((b"asdf", b"ghjk"))
1335 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001336
1337 self.assertEqual(b"as", rw.read(2))
1338 rw.write(b"ddd")
1339 rw.write(b"eee")
1340 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001341 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001342 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001343
Antoine Pitrou19690592009-06-12 20:14:08 +00001344 def test_seek_and_tell(self):
1345 raw = self.BytesIO(b"asdfghjkl")
1346 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001347
Ezio Melotti2623a372010-11-21 13:34:58 +00001348 self.assertEqual(b"as", rw.read(2))
1349 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001350 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001351 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001352
1353 rw.write(b"asdf")
1354 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001355 self.assertEqual(b"asdfasdfl", rw.read())
1356 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001357 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001358 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001359 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001360 self.assertEqual(7, rw.tell())
1361 self.assertEqual(b"fl", rw.read(11))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001362 self.assertRaises(TypeError, rw.seek, 0.0)
1363
Antoine Pitrou19690592009-06-12 20:14:08 +00001364 def check_flush_and_read(self, read_func):
1365 raw = self.BytesIO(b"abcdefghi")
1366 bufio = self.tp(raw)
1367
Ezio Melotti2623a372010-11-21 13:34:58 +00001368 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001369 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001370 self.assertEqual(b"ef", read_func(bufio, 2))
1371 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001372 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001373 self.assertEqual(6, bufio.tell())
1374 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001375 raw.seek(0, 0)
1376 raw.write(b"XYZ")
1377 # flush() resets the read buffer
1378 bufio.flush()
1379 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001380 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001381
1382 def test_flush_and_read(self):
1383 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1384
1385 def test_flush_and_readinto(self):
1386 def _readinto(bufio, n=-1):
1387 b = bytearray(n if n >= 0 else 9999)
1388 n = bufio.readinto(b)
1389 return bytes(b[:n])
1390 self.check_flush_and_read(_readinto)
1391
1392 def test_flush_and_peek(self):
1393 def _peek(bufio, n=-1):
1394 # This relies on the fact that the buffer can contain the whole
1395 # raw stream, otherwise peek() can return less.
1396 b = bufio.peek(n)
1397 if n != -1:
1398 b = b[:n]
1399 bufio.seek(len(b), 1)
1400 return b
1401 self.check_flush_and_read(_peek)
1402
1403 def test_flush_and_write(self):
1404 raw = self.BytesIO(b"abcdefghi")
1405 bufio = self.tp(raw)
1406
1407 bufio.write(b"123")
1408 bufio.flush()
1409 bufio.write(b"45")
1410 bufio.flush()
1411 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001412 self.assertEqual(b"12345fghi", raw.getvalue())
1413 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001414
1415 def test_threads(self):
1416 BufferedReaderTest.test_threads(self)
1417 BufferedWriterTest.test_threads(self)
1418
1419 def test_writes_and_peek(self):
1420 def _peek(bufio):
1421 bufio.peek(1)
1422 self.check_writes(_peek)
1423 def _peek(bufio):
1424 pos = bufio.tell()
1425 bufio.seek(-1, 1)
1426 bufio.peek(1)
1427 bufio.seek(pos, 0)
1428 self.check_writes(_peek)
1429
1430 def test_writes_and_reads(self):
1431 def _read(bufio):
1432 bufio.seek(-1, 1)
1433 bufio.read(1)
1434 self.check_writes(_read)
1435
1436 def test_writes_and_read1s(self):
1437 def _read1(bufio):
1438 bufio.seek(-1, 1)
1439 bufio.read1(1)
1440 self.check_writes(_read1)
1441
1442 def test_writes_and_readintos(self):
1443 def _read(bufio):
1444 bufio.seek(-1, 1)
1445 bufio.readinto(bytearray(1))
1446 self.check_writes(_read)
1447
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001448 def test_write_after_readahead(self):
1449 # Issue #6629: writing after the buffer was filled by readahead should
1450 # first rewind the raw stream.
1451 for overwrite_size in [1, 5]:
1452 raw = self.BytesIO(b"A" * 10)
1453 bufio = self.tp(raw, 4)
1454 # Trigger readahead
1455 self.assertEqual(bufio.read(1), b"A")
1456 self.assertEqual(bufio.tell(), 1)
1457 # Overwriting should rewind the raw stream if it needs so
1458 bufio.write(b"B" * overwrite_size)
1459 self.assertEqual(bufio.tell(), overwrite_size + 1)
1460 # If the write size was smaller than the buffer size, flush() and
1461 # check that rewind happens.
1462 bufio.flush()
1463 self.assertEqual(bufio.tell(), overwrite_size + 1)
1464 s = raw.getvalue()
1465 self.assertEqual(s,
1466 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1467
Antoine Pitrouee46a7b2011-05-13 00:31:52 +02001468 def test_write_rewind_write(self):
1469 # Various combinations of reading / writing / seeking backwards / writing again
1470 def mutate(bufio, pos1, pos2):
1471 assert pos2 >= pos1
1472 # Fill the buffer
1473 bufio.seek(pos1)
1474 bufio.read(pos2 - pos1)
1475 bufio.write(b'\x02')
1476 # This writes earlier than the previous write, but still inside
1477 # the buffer.
1478 bufio.seek(pos1)
1479 bufio.write(b'\x01')
1480
1481 b = b"\x80\x81\x82\x83\x84"
1482 for i in range(0, len(b)):
1483 for j in range(i, len(b)):
1484 raw = self.BytesIO(b)
1485 bufio = self.tp(raw, 100)
1486 mutate(bufio, i, j)
1487 bufio.flush()
1488 expected = bytearray(b)
1489 expected[j] = 2
1490 expected[i] = 1
1491 self.assertEqual(raw.getvalue(), expected,
1492 "failed result for i=%d, j=%d" % (i, j))
1493
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001494 def test_truncate_after_read_or_write(self):
1495 raw = self.BytesIO(b"A" * 10)
1496 bufio = self.tp(raw, 100)
1497 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1498 self.assertEqual(bufio.truncate(), 2)
1499 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1500 self.assertEqual(bufio.truncate(), 4)
1501
Antoine Pitrou19690592009-06-12 20:14:08 +00001502 def test_misbehaved_io(self):
1503 BufferedReaderTest.test_misbehaved_io(self)
1504 BufferedWriterTest.test_misbehaved_io(self)
1505
1506class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1507 tp = io.BufferedRandom
1508
1509 def test_constructor(self):
1510 BufferedRandomTest.test_constructor(self)
1511 # The allocation can succeed on 32-bit builds, e.g. with more
1512 # than 2GB RAM and a 64-bit kernel.
1513 if sys.maxsize > 0x7FFFFFFF:
1514 rawio = self.MockRawIO()
1515 bufio = self.tp(rawio)
1516 self.assertRaises((OverflowError, MemoryError, ValueError),
1517 bufio.__init__, rawio, sys.maxsize)
1518
1519 def test_garbage_collection(self):
1520 CBufferedReaderTest.test_garbage_collection(self)
1521 CBufferedWriterTest.test_garbage_collection(self)
1522
1523class PyBufferedRandomTest(BufferedRandomTest):
1524 tp = pyio.BufferedRandom
1525
1526
Christian Heimes1a6387e2008-03-26 12:49:49 +00001527# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1528# properties:
1529# - A single output character can correspond to many bytes of input.
1530# - The number of input bytes to complete the character can be
1531# undetermined until the last input byte is received.
1532# - The number of input bytes can vary depending on previous input.
1533# - A single input byte can correspond to many characters of output.
1534# - The number of output characters can be undetermined until the
1535# last input byte is received.
1536# - The number of output characters can vary depending on previous input.
1537
1538class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1539 """
1540 For testing seek/tell behavior with a stateful, buffering decoder.
1541
1542 Input is a sequence of words. Words may be fixed-length (length set
1543 by input) or variable-length (period-terminated). In variable-length
1544 mode, extra periods are ignored. Possible words are:
1545 - 'i' followed by a number sets the input length, I (maximum 99).
1546 When I is set to 0, words are space-terminated.
1547 - 'o' followed by a number sets the output length, O (maximum 99).
1548 - Any other word is converted into a word followed by a period on
1549 the output. The output word consists of the input word truncated
1550 or padded out with hyphens to make its length equal to O. If O
1551 is 0, the word is output verbatim without truncating or padding.
1552 I and O are initially set to 1. When I changes, any buffered input is
1553 re-scanned according to the new I. EOF also terminates the last word.
1554 """
1555
1556 def __init__(self, errors='strict'):
1557 codecs.IncrementalDecoder.__init__(self, errors)
1558 self.reset()
1559
1560 def __repr__(self):
1561 return '<SID %x>' % id(self)
1562
1563 def reset(self):
1564 self.i = 1
1565 self.o = 1
1566 self.buffer = bytearray()
1567
1568 def getstate(self):
1569 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1570 return bytes(self.buffer), i*100 + o
1571
1572 def setstate(self, state):
1573 buffer, io = state
1574 self.buffer = bytearray(buffer)
1575 i, o = divmod(io, 100)
1576 self.i, self.o = i ^ 1, o ^ 1
1577
1578 def decode(self, input, final=False):
1579 output = ''
1580 for b in input:
1581 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001582 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001583 if self.buffer:
1584 output += self.process_word()
1585 else:
1586 self.buffer.append(b)
1587 else: # fixed-length, terminate after self.i bytes
1588 self.buffer.append(b)
1589 if len(self.buffer) == self.i:
1590 output += self.process_word()
1591 if final and self.buffer: # EOF terminates the last word
1592 output += self.process_word()
1593 return output
1594
1595 def process_word(self):
1596 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001597 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001598 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001599 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001600 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1601 else:
1602 output = self.buffer.decode('ascii')
1603 if len(output) < self.o:
1604 output += '-'*self.o # pad out with hyphens
1605 if self.o:
1606 output = output[:self.o] # truncate to output length
1607 output += '.'
1608 self.buffer = bytearray()
1609 return output
1610
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001611 codecEnabled = False
1612
1613 @classmethod
1614 def lookupTestDecoder(cls, name):
1615 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001616 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001617 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001618 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001619 incrementalencoder=None,
1620 streamreader=None, streamwriter=None,
1621 incrementaldecoder=cls)
1622
1623# Register the previous decoder for testing.
1624# Disabled by default, tests will enable it.
1625codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1626
1627
Christian Heimes1a6387e2008-03-26 12:49:49 +00001628class StatefulIncrementalDecoderTest(unittest.TestCase):
1629 """
1630 Make sure the StatefulIncrementalDecoder actually works.
1631 """
1632
1633 test_cases = [
1634 # I=1, O=1 (fixed-length input == fixed-length output)
1635 (b'abcd', False, 'a.b.c.d.'),
1636 # I=0, O=0 (variable-length input, variable-length output)
1637 (b'oiabcd', True, 'abcd.'),
1638 # I=0, O=0 (should ignore extra periods)
1639 (b'oi...abcd...', True, 'abcd.'),
1640 # I=0, O=6 (variable-length input, fixed-length output)
1641 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1642 # I=2, O=6 (fixed-length input < fixed-length output)
1643 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1644 # I=6, O=3 (fixed-length input > fixed-length output)
1645 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1646 # I=0, then 3; O=29, then 15 (with longer output)
1647 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1648 'a----------------------------.' +
1649 'b----------------------------.' +
1650 'cde--------------------------.' +
1651 'abcdefghijabcde.' +
1652 'a.b------------.' +
1653 '.c.------------.' +
1654 'd.e------------.' +
1655 'k--------------.' +
1656 'l--------------.' +
1657 'm--------------.')
1658 ]
1659
Antoine Pitrou19690592009-06-12 20:14:08 +00001660 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001661 # Try a few one-shot test cases.
1662 for input, eof, output in self.test_cases:
1663 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001664 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001665
1666 # Also test an unfinished decode, followed by forcing EOF.
1667 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001668 self.assertEqual(d.decode(b'oiabcd'), '')
1669 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001670
1671class TextIOWrapperTest(unittest.TestCase):
1672
1673 def setUp(self):
1674 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1675 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001676 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001677
1678 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001679 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001680
Antoine Pitrou19690592009-06-12 20:14:08 +00001681 def test_constructor(self):
1682 r = self.BytesIO(b"\xc3\xa9\n\n")
1683 b = self.BufferedReader(r, 1000)
1684 t = self.TextIOWrapper(b)
1685 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001686 self.assertEqual(t.encoding, "latin1")
1687 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001688 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001689 self.assertEqual(t.encoding, "utf8")
1690 self.assertEqual(t.line_buffering, True)
1691 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001692 self.assertRaises(TypeError, t.__init__, b, newline=42)
1693 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1694
1695 def test_detach(self):
1696 r = self.BytesIO()
1697 b = self.BufferedWriter(r)
1698 t = self.TextIOWrapper(b)
1699 self.assertIs(t.detach(), b)
1700
1701 t = self.TextIOWrapper(b, encoding="ascii")
1702 t.write("howdy")
1703 self.assertFalse(r.getvalue())
1704 t.detach()
1705 self.assertEqual(r.getvalue(), b"howdy")
1706 self.assertRaises(ValueError, t.detach)
1707
1708 def test_repr(self):
1709 raw = self.BytesIO("hello".encode("utf-8"))
1710 b = self.BufferedReader(raw)
1711 t = self.TextIOWrapper(b, encoding="utf-8")
1712 modname = self.TextIOWrapper.__module__
1713 self.assertEqual(repr(t),
1714 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1715 raw.name = "dummy"
1716 self.assertEqual(repr(t),
1717 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1718 raw.name = b"dummy"
1719 self.assertEqual(repr(t),
1720 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1721
1722 def test_line_buffering(self):
1723 r = self.BytesIO()
1724 b = self.BufferedWriter(r, 1000)
1725 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1726 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001727 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001728 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001729 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001730 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001731 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001732
Antoine Pitrou19690592009-06-12 20:14:08 +00001733 def test_encoding(self):
1734 # Check the encoding attribute is always set, and valid
1735 b = self.BytesIO()
1736 t = self.TextIOWrapper(b, encoding="utf8")
1737 self.assertEqual(t.encoding, "utf8")
1738 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001739 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001740 codecs.lookup(t.encoding)
1741
1742 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001743 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001744 b = self.BytesIO(b"abc\n\xff\n")
1745 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001746 self.assertRaises(UnicodeError, t.read)
1747 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001748 b = self.BytesIO(b"abc\n\xff\n")
1749 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001750 self.assertRaises(UnicodeError, t.read)
1751 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001752 b = self.BytesIO(b"abc\n\xff\n")
1753 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001754 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001755 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001756 b = self.BytesIO(b"abc\n\xff\n")
1757 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001758 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001759
Antoine Pitrou19690592009-06-12 20:14:08 +00001760 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001761 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001762 b = self.BytesIO()
1763 t = self.TextIOWrapper(b, encoding="ascii")
1764 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001765 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001766 b = self.BytesIO()
1767 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1768 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001769 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001770 b = self.BytesIO()
1771 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001772 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001773 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001774 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001775 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001776 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001777 b = self.BytesIO()
1778 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001779 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001780 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001781 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001782 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001783
Antoine Pitrou19690592009-06-12 20:14:08 +00001784 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001785 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1786
1787 tests = [
1788 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1789 [ '', input_lines ],
1790 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1791 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1792 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1793 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001794 encodings = (
1795 'utf-8', 'latin-1',
1796 'utf-16', 'utf-16-le', 'utf-16-be',
1797 'utf-32', 'utf-32-le', 'utf-32-be',
1798 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001799
1800 # Try a range of buffer sizes to test the case where \r is the last
1801 # character in TextIOWrapper._pending_line.
1802 for encoding in encodings:
1803 # XXX: str.encode() should return bytes
1804 data = bytes(''.join(input_lines).encode(encoding))
1805 for do_reads in (False, True):
1806 for bufsize in range(1, 10):
1807 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001808 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1809 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001810 encoding=encoding)
1811 if do_reads:
1812 got_lines = []
1813 while True:
1814 c2 = textio.read(2)
1815 if c2 == '':
1816 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001817 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001818 got_lines.append(c2 + textio.readline())
1819 else:
1820 got_lines = list(textio)
1821
1822 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001823 self.assertEqual(got_line, exp_line)
1824 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001825
Antoine Pitrou19690592009-06-12 20:14:08 +00001826 def test_newlines_input(self):
1827 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001828 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1829 for newline, expected in [
1830 (None, normalized.decode("ascii").splitlines(True)),
1831 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001832 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1833 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1834 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001835 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001836 buf = self.BytesIO(testdata)
1837 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001838 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001839 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001840 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001841
Antoine Pitrou19690592009-06-12 20:14:08 +00001842 def test_newlines_output(self):
1843 testdict = {
1844 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1845 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1846 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1847 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1848 }
1849 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1850 for newline, expected in tests:
1851 buf = self.BytesIO()
1852 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1853 txt.write("AAA\nB")
1854 txt.write("BB\nCCC\n")
1855 txt.write("X\rY\r\nZ")
1856 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001857 self.assertEqual(buf.closed, False)
1858 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001859
1860 def test_destructor(self):
1861 l = []
1862 base = self.BytesIO
1863 class MyBytesIO(base):
1864 def close(self):
1865 l.append(self.getvalue())
1866 base.close(self)
1867 b = MyBytesIO()
1868 t = self.TextIOWrapper(b, encoding="ascii")
1869 t.write("abc")
1870 del t
1871 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001872 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001873
1874 def test_override_destructor(self):
1875 record = []
1876 class MyTextIO(self.TextIOWrapper):
1877 def __del__(self):
1878 record.append(1)
1879 try:
1880 f = super(MyTextIO, self).__del__
1881 except AttributeError:
1882 pass
1883 else:
1884 f()
1885 def close(self):
1886 record.append(2)
1887 super(MyTextIO, self).close()
1888 def flush(self):
1889 record.append(3)
1890 super(MyTextIO, self).flush()
1891 b = self.BytesIO()
1892 t = MyTextIO(b, encoding="ascii")
1893 del t
1894 support.gc_collect()
1895 self.assertEqual(record, [1, 2, 3])
1896
1897 def test_error_through_destructor(self):
1898 # Test that the exception state is not modified by a destructor,
1899 # even if close() fails.
1900 rawio = self.CloseFailureIO()
1901 def f():
1902 self.TextIOWrapper(rawio).xyzzy
1903 with support.captured_output("stderr") as s:
1904 self.assertRaises(AttributeError, f)
1905 s = s.getvalue().strip()
1906 if s:
1907 # The destructor *may* have printed an unraisable error, check it
1908 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001909 self.assertTrue(s.startswith("Exception IOError: "), s)
1910 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001911
1912 # Systematic tests of the text I/O API
1913
Antoine Pitrou19690592009-06-12 20:14:08 +00001914 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001915 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1916 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001917 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001918 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001919 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001920 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001921 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001922 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001923 self.assertEqual(f.tell(), 0)
1924 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001925 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00001926 self.assertEqual(f.seek(0), 0)
1927 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001928 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001929 self.assertEqual(f.read(2), "ab")
1930 self.assertEqual(f.read(1), "c")
1931 self.assertEqual(f.read(1), "")
1932 self.assertEqual(f.read(), "")
1933 self.assertEqual(f.tell(), cookie)
1934 self.assertEqual(f.seek(0), 0)
1935 self.assertEqual(f.seek(0, 2), cookie)
1936 self.assertEqual(f.write("def"), 3)
1937 self.assertEqual(f.seek(cookie), cookie)
1938 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939 if enc.startswith("utf"):
1940 self.multi_line_test(f, enc)
1941 f.close()
1942
1943 def multi_line_test(self, f, enc):
1944 f.seek(0)
1945 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001946 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001947 wlines = []
1948 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1949 chars = []
1950 for i in range(size):
1951 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001952 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001953 wlines.append((f.tell(), line))
1954 f.write(line)
1955 f.seek(0)
1956 rlines = []
1957 while True:
1958 pos = f.tell()
1959 line = f.readline()
1960 if not line:
1961 break
1962 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00001963 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001964
Antoine Pitrou19690592009-06-12 20:14:08 +00001965 def test_telling(self):
1966 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001967 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001968 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001970 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001971 p2 = f.tell()
1972 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001973 self.assertEqual(f.tell(), p0)
1974 self.assertEqual(f.readline(), "\xff\n")
1975 self.assertEqual(f.tell(), p1)
1976 self.assertEqual(f.readline(), "\xff\n")
1977 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001978 f.seek(0)
1979 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00001980 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001981 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00001982 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983 f.close()
1984
Antoine Pitrou19690592009-06-12 20:14:08 +00001985 def test_seeking(self):
1986 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001987 prefix_size = chunk_size - 2
1988 u_prefix = "a" * prefix_size
1989 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00001990 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001991 u_suffix = "\u8888\n"
1992 suffix = bytes(u_suffix.encode("utf-8"))
1993 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001995 f.write(line*2)
1996 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001997 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001998 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00001999 self.assertEqual(s, prefix.decode("ascii"))
2000 self.assertEqual(f.tell(), prefix_size)
2001 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002002
Antoine Pitrou19690592009-06-12 20:14:08 +00002003 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002004 # Regression test for a specific bug
2005 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00002006 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002007 f.write(data)
2008 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002009 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002010 f._CHUNK_SIZE # Just test that it exists
2011 f._CHUNK_SIZE = 2
2012 f.readline()
2013 f.tell()
2014
Antoine Pitrou19690592009-06-12 20:14:08 +00002015 def test_seek_and_tell(self):
2016 #Test seek/tell using the StatefulIncrementalDecoder.
2017 # Make test faster by doing smaller seeks
2018 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00002019
Antoine Pitrou19690592009-06-12 20:14:08 +00002020 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002021 """Tell/seek to various points within a data stream and ensure
2022 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00002023 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00002024 f.write(data)
2025 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00002026 f = self.open(support.TESTFN, encoding='test_decoder')
2027 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00002028 decoded = f.read()
2029 f.close()
2030
2031 for i in range(min_pos, len(decoded) + 1): # seek positions
2032 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00002033 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00002034 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002035 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002036 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002037 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002038 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002039 f.close()
2040
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002041 # Enable the test decoder.
2042 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002043
2044 # Run the tests.
2045 try:
2046 # Try each test case.
2047 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002048 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002049
2050 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002051 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2052 offset = CHUNK_SIZE - len(input)//2
2053 prefix = b'.'*offset
2054 # Don't bother seeking into the prefix (takes too long).
2055 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002056 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002057
2058 # Ensure our test decoder won't interfere with subsequent tests.
2059 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002060 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002061
Antoine Pitrou19690592009-06-12 20:14:08 +00002062 def test_encoded_writes(self):
2063 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002064 tests = ("utf-16",
2065 "utf-16-le",
2066 "utf-16-be",
2067 "utf-32",
2068 "utf-32-le",
2069 "utf-32-be")
2070 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002071 buf = self.BytesIO()
2072 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002073 # Check if the BOM is written only once (see issue1753).
2074 f.write(data)
2075 f.write(data)
2076 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002077 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002078 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002079 self.assertEqual(f.read(), data * 2)
2080 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002081
Antoine Pitrou19690592009-06-12 20:14:08 +00002082 def test_unreadable(self):
2083 class UnReadable(self.BytesIO):
2084 def readable(self):
2085 return False
2086 txt = self.TextIOWrapper(UnReadable())
2087 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002088
Antoine Pitrou19690592009-06-12 20:14:08 +00002089 def test_read_one_by_one(self):
2090 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002091 reads = ""
2092 while True:
2093 c = txt.read(1)
2094 if not c:
2095 break
2096 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002097 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002098
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002099 def test_readlines(self):
2100 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2101 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2102 txt.seek(0)
2103 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2104 txt.seek(0)
2105 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2106
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002108 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002109 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002110 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002111 reads = ""
2112 while True:
2113 c = txt.read(128)
2114 if not c:
2115 break
2116 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002117 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002118
2119 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002120 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002121
2122 # read one char at a time
2123 reads = ""
2124 while True:
2125 c = txt.read(1)
2126 if not c:
2127 break
2128 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002129 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002130
2131 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002132 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002133 txt._CHUNK_SIZE = 4
2134
2135 reads = ""
2136 while True:
2137 c = txt.read(4)
2138 if not c:
2139 break
2140 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002141 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002142
2143 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002144 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002145 txt._CHUNK_SIZE = 4
2146
2147 reads = txt.read(4)
2148 reads += txt.read(4)
2149 reads += txt.readline()
2150 reads += txt.readline()
2151 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002152 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002153
2154 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002155 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002156 txt._CHUNK_SIZE = 4
2157
2158 reads = txt.read(4)
2159 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002160 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002161
2162 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002163 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002164 txt._CHUNK_SIZE = 4
2165
2166 reads = txt.read(4)
2167 pos = txt.tell()
2168 txt.seek(0)
2169 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002170 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002171
2172 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002173 buffer = self.BytesIO(self.testdata)
2174 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002175
2176 self.assertEqual(buffer.seekable(), txt.seekable())
2177
Antoine Pitrou19690592009-06-12 20:14:08 +00002178 def test_append_bom(self):
2179 # The BOM is not written again when appending to a non-empty file
2180 filename = support.TESTFN
2181 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2182 with self.open(filename, 'w', encoding=charset) as f:
2183 f.write('aaa')
2184 pos = f.tell()
2185 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002186 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002187
2188 with self.open(filename, 'a', encoding=charset) as f:
2189 f.write('xxx')
2190 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002191 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002192
Antoine Pitrou19690592009-06-12 20:14:08 +00002193 def test_seek_bom(self):
2194 # Same test, but when seeking manually
2195 filename = support.TESTFN
2196 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2197 with self.open(filename, 'w', encoding=charset) as f:
2198 f.write('aaa')
2199 pos = f.tell()
2200 with self.open(filename, 'r+', encoding=charset) as f:
2201 f.seek(pos)
2202 f.write('zzz')
2203 f.seek(0)
2204 f.write('bbb')
2205 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002206 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002207
2208 def test_errors_property(self):
2209 with self.open(support.TESTFN, "w") as f:
2210 self.assertEqual(f.errors, "strict")
2211 with self.open(support.TESTFN, "w", errors="replace") as f:
2212 self.assertEqual(f.errors, "replace")
2213
Victor Stinner6a102812010-04-27 23:55:59 +00002214 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002215 def test_threads_write(self):
2216 # Issue6750: concurrent writes could duplicate data
2217 event = threading.Event()
2218 with self.open(support.TESTFN, "w", buffering=1) as f:
2219 def run(n):
2220 text = "Thread%03d\n" % n
2221 event.wait()
2222 f.write(text)
2223 threads = [threading.Thread(target=lambda n=x: run(n))
2224 for x in range(20)]
2225 for t in threads:
2226 t.start()
2227 time.sleep(0.02)
2228 event.set()
2229 for t in threads:
2230 t.join()
2231 with self.open(support.TESTFN) as f:
2232 content = f.read()
2233 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002234 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002235
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002236 def test_flush_error_on_close(self):
2237 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2238 def bad_flush():
2239 raise IOError()
2240 txt.flush = bad_flush
2241 self.assertRaises(IOError, txt.close) # exception not swallowed
2242
2243 def test_multi_close(self):
2244 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2245 txt.close()
2246 txt.close()
2247 txt.close()
2248 self.assertRaises(ValueError, txt.flush)
2249
Antoine Pitroufc9ead62010-12-21 21:26:55 +00002250 def test_readonly_attributes(self):
2251 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2252 buf = self.BytesIO(self.testdata)
2253 with self.assertRaises((AttributeError, TypeError)):
2254 txt.buffer = buf
2255
Antoine Pitrou19690592009-06-12 20:14:08 +00002256class CTextIOWrapperTest(TextIOWrapperTest):
2257
2258 def test_initialization(self):
2259 r = self.BytesIO(b"\xc3\xa9\n\n")
2260 b = self.BufferedReader(r, 1000)
2261 t = self.TextIOWrapper(b)
2262 self.assertRaises(TypeError, t.__init__, b, newline=42)
2263 self.assertRaises(ValueError, t.read)
2264 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2265 self.assertRaises(ValueError, t.read)
2266
2267 def test_garbage_collection(self):
2268 # C TextIOWrapper objects are collected, and collecting them flushes
2269 # all data to disk.
2270 # The Python version has __del__, so it ends in gc.garbage instead.
2271 rawio = io.FileIO(support.TESTFN, "wb")
2272 b = self.BufferedWriter(rawio)
2273 t = self.TextIOWrapper(b, encoding="ascii")
2274 t.write("456def")
2275 t.x = t
2276 wr = weakref.ref(t)
2277 del t
2278 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002279 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002280 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002281 self.assertEqual(f.read(), b"456def")
2282
2283class PyTextIOWrapperTest(TextIOWrapperTest):
2284 pass
2285
2286
2287class IncrementalNewlineDecoderTest(unittest.TestCase):
2288
2289 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002290 # UTF-8 specific tests for a newline decoder
2291 def _check_decode(b, s, **kwargs):
2292 # We exercise getstate() / setstate() as well as decode()
2293 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002294 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002295 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002296 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002297
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002298 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002299
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002300 _check_decode(b'\xe8', "")
2301 _check_decode(b'\xa2', "")
2302 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002303
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002304 _check_decode(b'\xe8', "")
2305 _check_decode(b'\xa2', "")
2306 _check_decode(b'\x88', "\u8888")
2307
2308 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002309 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2310
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002311 decoder.reset()
2312 _check_decode(b'\n', "\n")
2313 _check_decode(b'\r', "")
2314 _check_decode(b'', "\n", final=True)
2315 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002316
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002317 _check_decode(b'\r', "")
2318 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002319
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002320 _check_decode(b'\r\r\n', "\n\n")
2321 _check_decode(b'\r', "")
2322 _check_decode(b'\r', "\n")
2323 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002324
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002325 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2326 _check_decode(b'\xe8\xa2\x88', "\u8888")
2327 _check_decode(b'\n', "\n")
2328 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2329 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002330
Antoine Pitrou19690592009-06-12 20:14:08 +00002331 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002332 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002333 if encoding is not None:
2334 encoder = codecs.getincrementalencoder(encoding)()
2335 def _decode_bytewise(s):
2336 # Decode one byte at a time
2337 for b in encoder.encode(s):
2338 result.append(decoder.decode(b))
2339 else:
2340 encoder = None
2341 def _decode_bytewise(s):
2342 # Decode one char at a time
2343 for c in s:
2344 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002345 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002346 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002347 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002348 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002349 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002350 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002351 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002352 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002353 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002354 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002355 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002356 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002357 input = "abc"
2358 if encoder is not None:
2359 encoder.reset()
2360 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002361 self.assertEqual(decoder.decode(input), "abc")
2362 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002363
2364 def test_newline_decoder(self):
2365 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002366 # None meaning the IncrementalNewlineDecoder takes unicode input
2367 # rather than bytes input
2368 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002369 'utf-16', 'utf-16-le', 'utf-16-be',
2370 'utf-32', 'utf-32-le', 'utf-32-be',
2371 )
2372 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002373 decoder = enc and codecs.getincrementaldecoder(enc)()
2374 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2375 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002376 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002377 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2378 self.check_newline_decoding_utf8(decoder)
2379
2380 def test_newline_bytes(self):
2381 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2382 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002383 self.assertEqual(dec.newlines, None)
2384 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2385 self.assertEqual(dec.newlines, None)
2386 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2387 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002388 dec = self.IncrementalNewlineDecoder(None, translate=False)
2389 _check(dec)
2390 dec = self.IncrementalNewlineDecoder(None, translate=True)
2391 _check(dec)
2392
2393class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2394 pass
2395
2396class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2397 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002398
Christian Heimes1a6387e2008-03-26 12:49:49 +00002399
2400# XXX Tests for open()
2401
2402class MiscIOTest(unittest.TestCase):
2403
Benjamin Petersonad100c32008-11-20 22:06:22 +00002404 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002405 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002406
Antoine Pitrou19690592009-06-12 20:14:08 +00002407 def test___all__(self):
2408 for name in self.io.__all__:
2409 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002410 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002411 if name == "open":
2412 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002413 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002414 self.assertTrue(issubclass(obj, Exception), name)
2415 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002416 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002417
Benjamin Petersonad100c32008-11-20 22:06:22 +00002418 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002419 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002420 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002421 f.close()
2422
Antoine Pitrou19690592009-06-12 20:14:08 +00002423 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002424 self.assertEqual(f.name, support.TESTFN)
2425 self.assertEqual(f.buffer.name, support.TESTFN)
2426 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2427 self.assertEqual(f.mode, "U")
2428 self.assertEqual(f.buffer.mode, "rb")
2429 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002430 f.close()
2431
Antoine Pitrou19690592009-06-12 20:14:08 +00002432 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002433 self.assertEqual(f.mode, "w+")
2434 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2435 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002436
Antoine Pitrou19690592009-06-12 20:14:08 +00002437 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002438 self.assertEqual(g.mode, "wb")
2439 self.assertEqual(g.raw.mode, "wb")
2440 self.assertEqual(g.name, f.fileno())
2441 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002442 f.close()
2443 g.close()
2444
Antoine Pitrou19690592009-06-12 20:14:08 +00002445 def test_io_after_close(self):
2446 for kwargs in [
2447 {"mode": "w"},
2448 {"mode": "wb"},
2449 {"mode": "w", "buffering": 1},
2450 {"mode": "w", "buffering": 2},
2451 {"mode": "wb", "buffering": 0},
2452 {"mode": "r"},
2453 {"mode": "rb"},
2454 {"mode": "r", "buffering": 1},
2455 {"mode": "r", "buffering": 2},
2456 {"mode": "rb", "buffering": 0},
2457 {"mode": "w+"},
2458 {"mode": "w+b"},
2459 {"mode": "w+", "buffering": 1},
2460 {"mode": "w+", "buffering": 2},
2461 {"mode": "w+b", "buffering": 0},
2462 ]:
2463 f = self.open(support.TESTFN, **kwargs)
2464 f.close()
2465 self.assertRaises(ValueError, f.flush)
2466 self.assertRaises(ValueError, f.fileno)
2467 self.assertRaises(ValueError, f.isatty)
2468 self.assertRaises(ValueError, f.__iter__)
2469 if hasattr(f, "peek"):
2470 self.assertRaises(ValueError, f.peek, 1)
2471 self.assertRaises(ValueError, f.read)
2472 if hasattr(f, "read1"):
2473 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinner5100a402011-05-25 22:15:36 +02002474 if hasattr(f, "readall"):
2475 self.assertRaises(ValueError, f.readall)
Antoine Pitrou19690592009-06-12 20:14:08 +00002476 if hasattr(f, "readinto"):
2477 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2478 self.assertRaises(ValueError, f.readline)
2479 self.assertRaises(ValueError, f.readlines)
2480 self.assertRaises(ValueError, f.seek, 0)
2481 self.assertRaises(ValueError, f.tell)
2482 self.assertRaises(ValueError, f.truncate)
2483 self.assertRaises(ValueError, f.write,
2484 b"" if "b" in kwargs['mode'] else "")
2485 self.assertRaises(ValueError, f.writelines, [])
2486 self.assertRaises(ValueError, next, f)
2487
2488 def test_blockingioerror(self):
2489 # Various BlockingIOError issues
2490 self.assertRaises(TypeError, self.BlockingIOError)
2491 self.assertRaises(TypeError, self.BlockingIOError, 1)
2492 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2493 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2494 b = self.BlockingIOError(1, "")
2495 self.assertEqual(b.characters_written, 0)
2496 class C(unicode):
2497 pass
2498 c = C("")
2499 b = self.BlockingIOError(1, c)
2500 c.b = b
2501 b.c = c
2502 wr = weakref.ref(c)
2503 del c, b
2504 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002505 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002506
2507 def test_abcs(self):
2508 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002509 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2510 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2511 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2512 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002513
2514 def _check_abc_inheritance(self, abcmodule):
2515 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002516 self.assertIsInstance(f, abcmodule.IOBase)
2517 self.assertIsInstance(f, abcmodule.RawIOBase)
2518 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2519 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002520 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002521 self.assertIsInstance(f, abcmodule.IOBase)
2522 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2523 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2524 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002525 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002526 self.assertIsInstance(f, abcmodule.IOBase)
2527 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2528 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2529 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002530
2531 def test_abc_inheritance(self):
2532 # Test implementations inherit from their respective ABCs
2533 self._check_abc_inheritance(self)
2534
2535 def test_abc_inheritance_official(self):
2536 # Test implementations inherit from the official ABCs of the
2537 # baseline "io" module.
2538 self._check_abc_inheritance(io)
2539
2540class CMiscIOTest(MiscIOTest):
2541 io = io
2542
2543class PyMiscIOTest(MiscIOTest):
2544 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002545
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002546
2547@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2548class SignalsTest(unittest.TestCase):
2549
2550 def setUp(self):
2551 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2552
2553 def tearDown(self):
2554 signal.signal(signal.SIGALRM, self.oldalrm)
2555
2556 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002557 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002558
2559 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinner49d495f2011-07-04 11:44:46 +02002560 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2561 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002562 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2563 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou6439c002011-02-25 21:35:47 +00002564 invokes the signal handler, and bubbles up the exception raised
2565 in the latter."""
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002566 read_results = []
2567 def _read():
2568 s = os.read(r, 1)
2569 read_results.append(s)
2570 t = threading.Thread(target=_read)
2571 t.daemon = True
2572 r, w = os.pipe()
2573 try:
2574 wio = self.io.open(w, **fdopen_kwargs)
2575 t.start()
2576 signal.alarm(1)
2577 # Fill the pipe enough that the write will be blocking.
2578 # It will be interrupted by the timer armed above. Since the
2579 # other thread has read one byte, the low-level write will
2580 # return with a successful (partial) result rather than an EINTR.
2581 # The buffered IO layer must check for pending signal
2582 # handlers, which in this case will invoke alarm_interrupt().
2583 self.assertRaises(ZeroDivisionError,
2584 wio.write, item * (1024 * 1024))
2585 t.join()
2586 # We got one byte, get another one and check that it isn't a
2587 # repeat of the first one.
2588 read_results.append(os.read(r, 1))
2589 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2590 finally:
2591 os.close(w)
2592 os.close(r)
2593 # This is deliberate. If we didn't close the file descriptor
2594 # before closing wio, wio would try to flush its internal
2595 # buffer, and block again.
2596 try:
2597 wio.close()
2598 except IOError as e:
2599 if e.errno != errno.EBADF:
2600 raise
2601
2602 def test_interrupted_write_unbuffered(self):
2603 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2604
2605 def test_interrupted_write_buffered(self):
2606 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2607
2608 def test_interrupted_write_text(self):
2609 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2610
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002611 def check_reentrant_write(self, data, **fdopen_kwargs):
2612 def on_alarm(*args):
2613 # Will be called reentrantly from the same thread
2614 wio.write(data)
2615 1/0
2616 signal.signal(signal.SIGALRM, on_alarm)
2617 r, w = os.pipe()
2618 wio = self.io.open(w, **fdopen_kwargs)
2619 try:
2620 signal.alarm(1)
2621 # Either the reentrant call to wio.write() fails with RuntimeError,
2622 # or the signal handler raises ZeroDivisionError.
2623 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2624 while 1:
2625 for i in range(100):
2626 wio.write(data)
2627 wio.flush()
2628 # Make sure the buffer doesn't fill up and block further writes
2629 os.read(r, len(data) * 100)
2630 exc = cm.exception
2631 if isinstance(exc, RuntimeError):
2632 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2633 finally:
2634 wio.close()
2635 os.close(r)
2636
2637 def test_reentrant_write_buffered(self):
2638 self.check_reentrant_write(b"xy", mode="wb")
2639
2640 def test_reentrant_write_text(self):
2641 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2642
Antoine Pitrou6439c002011-02-25 21:35:47 +00002643 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2644 """Check that a buffered read, when it gets interrupted (either
2645 returning a partial result or EINTR), properly invokes the signal
2646 handler and retries if the latter returned successfully."""
2647 r, w = os.pipe()
2648 fdopen_kwargs["closefd"] = False
2649 def alarm_handler(sig, frame):
2650 os.write(w, b"bar")
2651 signal.signal(signal.SIGALRM, alarm_handler)
2652 try:
2653 rio = self.io.open(r, **fdopen_kwargs)
2654 os.write(w, b"foo")
2655 signal.alarm(1)
2656 # Expected behaviour:
2657 # - first raw read() returns partial b"foo"
2658 # - second raw read() returns EINTR
2659 # - third raw read() returns b"bar"
2660 self.assertEqual(decode(rio.read(6)), "foobar")
2661 finally:
2662 rio.close()
2663 os.close(w)
2664 os.close(r)
2665
2666 def test_interrupterd_read_retry_buffered(self):
2667 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2668 mode="rb")
2669
2670 def test_interrupterd_read_retry_text(self):
2671 self.check_interrupted_read_retry(lambda x: x,
2672 mode="r")
2673
2674 @unittest.skipUnless(threading, 'Threading required for this test.')
2675 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2676 """Check that a buffered write, when it gets interrupted (either
2677 returning a partial result or EINTR), properly invokes the signal
2678 handler and retries if the latter returned successfully."""
2679 select = support.import_module("select")
2680 # A quantity that exceeds the buffer size of an anonymous pipe's
2681 # write end.
2682 N = 1024 * 1024
2683 r, w = os.pipe()
2684 fdopen_kwargs["closefd"] = False
2685 # We need a separate thread to read from the pipe and allow the
2686 # write() to finish. This thread is started after the SIGALRM is
2687 # received (forcing a first EINTR in write()).
2688 read_results = []
2689 write_finished = False
2690 def _read():
2691 while not write_finished:
2692 while r in select.select([r], [], [], 1.0)[0]:
2693 s = os.read(r, 1024)
2694 read_results.append(s)
2695 t = threading.Thread(target=_read)
2696 t.daemon = True
2697 def alarm1(sig, frame):
2698 signal.signal(signal.SIGALRM, alarm2)
2699 signal.alarm(1)
2700 def alarm2(sig, frame):
2701 t.start()
2702 signal.signal(signal.SIGALRM, alarm1)
2703 try:
2704 wio = self.io.open(w, **fdopen_kwargs)
2705 signal.alarm(1)
2706 # Expected behaviour:
2707 # - first raw write() is partial (because of the limited pipe buffer
2708 # and the first alarm)
2709 # - second raw write() returns EINTR (because of the second alarm)
2710 # - subsequent write()s are successful (either partial or complete)
2711 self.assertEqual(N, wio.write(item * N))
2712 wio.flush()
2713 write_finished = True
2714 t.join()
2715 self.assertEqual(N, sum(len(x) for x in read_results))
2716 finally:
2717 write_finished = True
2718 os.close(w)
2719 os.close(r)
2720 # This is deliberate. If we didn't close the file descriptor
2721 # before closing wio, wio would try to flush its internal
2722 # buffer, and could block (in case of failure).
2723 try:
2724 wio.close()
2725 except IOError as e:
2726 if e.errno != errno.EBADF:
2727 raise
2728
2729 def test_interrupterd_write_retry_buffered(self):
2730 self.check_interrupted_write_retry(b"x", mode="wb")
2731
2732 def test_interrupterd_write_retry_text(self):
2733 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2734
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002735
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002736class CSignalsTest(SignalsTest):
2737 io = io
2738
2739class PySignalsTest(SignalsTest):
2740 io = pyio
2741
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002742 # Handling reentrancy issues would slow down _pyio even more, so the
2743 # tests are disabled.
2744 test_reentrant_write_buffered = None
2745 test_reentrant_write_text = None
2746
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002747
Christian Heimes1a6387e2008-03-26 12:49:49 +00002748def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002749 tests = (CIOTest, PyIOTest,
2750 CBufferedReaderTest, PyBufferedReaderTest,
2751 CBufferedWriterTest, PyBufferedWriterTest,
2752 CBufferedRWPairTest, PyBufferedRWPairTest,
2753 CBufferedRandomTest, PyBufferedRandomTest,
2754 StatefulIncrementalDecoderTest,
2755 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2756 CTextIOWrapperTest, PyTextIOWrapperTest,
2757 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002758 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002759 )
2760
2761 # Put the namespaces of the IO module we are testing and some useful mock
2762 # classes in the __dict__ of each test.
2763 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002764 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002765 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2766 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2767 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2768 globs = globals()
2769 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2770 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2771 # Avoid turning open into a bound method.
2772 py_io_ns["open"] = pyio.OpenWrapper
2773 for test in tests:
2774 if test.__name__.startswith("C"):
2775 for name, obj in c_io_ns.items():
2776 setattr(test, name, obj)
2777 elif test.__name__.startswith("Py"):
2778 for name, obj in py_io_ns.items():
2779 setattr(test, name, obj)
2780
2781 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002782
2783if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002784 test_main()