blob: 0c892bd2baf8fb66d5401a7f2c719ed43615a0ee [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
5import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00006from array import array
7from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +00008try:
9 import threading
10except ImportError:
11 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000012
Antoine Pitrou47a5f482009-06-12 20:41:52 +000013from test import test_support
Georg Brandla4f46e12010-02-07 17:03:15 +000014from test.test_support import TESTFN, run_unittest
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015from UserList import UserList
16
class AutoFileTests(unittest.TestCase):
    """File-object tests that share an automatically managed test file.

    setUp opens TESTFN for binary writing and stores the handle in
    ``self.f``.  tearDown closes whatever ``self.f`` refers to (unless a
    test has set it to None) and removes TESTFN.  Tests that reopen the
    file rebind ``self.f`` so tearDown still closes the right handle.
    """

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so the proxy becomes dead.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        # Only the first n items of the array were filled by readinto().
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
        with open(fname, 'w') as f:
            self.addCleanup(os.remove, fname)
            self.assertTrue(repr(f).startswith(
                    "<open file %r, mode 'w' at" % fname))

    def testErrors(self):
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is wanted here; a bare
            # except would also swallow KeyboardInterrupt and friends.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        def nasty():
            # Close the file halfway through the sequence being written.
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        # Close the handle opened by setUp explicitly before self.f is
        # rebound below, instead of relying on it being finalized.
        self.f.close()
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
186
class OtherFileTests(unittest.TestCase):
    """File-object tests that create (and clean up) their own files.

    Unlike AutoFileTests there is no setUp/tearDown here, so each test is
    responsible for closing and removing anything it opens.
    """

    def testOpenDir(self):
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                # Don't leak the handle if the open unexpectedly succeeded.
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Close and remove the file even when the assertion fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  TESTFN only exists at this point if an
        # earlier test left it behind, so tolerate it being absent rather
        # than depending on test execution order.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            # The with block guarantees the handle is closed before the
            # size check and before the outer finally unlinks the file.
            with open(TESTFN, 'rb+') as f:
                data = f.read(5)
                if data != '12345':
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                with open(TESTFN) as f:
                    if f.next() != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                  (methodname, args))

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            with open(TESTFN) as f:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the list that was actually compared.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (lines, testlines))
            # Reading after iteration hit EOF shouldn't hurt either
            with open(TESTFN) as f:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
        finally:
            os.unlink(TESTFN)
414
class FileSubclassTests(unittest.TestCase):
    """Checks that ``file`` subclasses are honoured by the with statement."""

    def testExit(self):
        # Leaving a with block must go through the subclass's close().
        class CloseRecorder(file):
            def __init__(self, *open_args):
                # Set the flag first so it exists whatever happens next.
                self.subclass_closed = False
                file.__init__(self, *open_args)

            def close(self):
                # Note that the override ran, then do the real close.
                self.subclass_closed = True
                file.close(self)

        with CloseRecorder(TESTFN, 'w') as recorder:
            pass
        self.assertTrue(recorder.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000430
431
Victor Stinner6a102812010-04-27 23:55:59 +0000432@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000433class FileThreadingTests(unittest.TestCase):
434 # These tests check the ability to call various methods of file objects
435 # (including close()) concurrently without crashing the Python interpreter.
436 # See #815646, #595601
437
438 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000439 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000440 self.f = None
441 self.filename = TESTFN
442 with open(self.filename, "w") as f:
443 f.write("\n".join("0123456789"))
444 self._count_lock = threading.Lock()
445 self.close_count = 0
446 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000447 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000448
449 def tearDown(self):
450 if self.f:
451 try:
452 self.f.close()
453 except (EnvironmentError, ValueError):
454 pass
455 try:
456 os.remove(self.filename)
457 except EnvironmentError:
458 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000459 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000460
461 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000462 if self.use_buffering:
463 self.f = open(self.filename, "w+", buffering=1024*16)
464 else:
465 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000466
467 def _close_file(self):
468 with self._count_lock:
469 self.close_count += 1
470 self.f.close()
471 with self._count_lock:
472 self.close_success_count += 1
473
474 def _close_and_reopen_file(self):
475 self._close_file()
476 # if close raises an exception thats fine, self.f remains valid so
477 # we don't need to reopen.
478 self._create_file()
479
480 def _run_workers(self, func, nb_workers, duration=0.2):
481 with self._count_lock:
482 self.close_count = 0
483 self.close_success_count = 0
484 self.do_continue = True
485 threads = []
486 try:
487 for i in range(nb_workers):
488 t = threading.Thread(target=func)
489 t.start()
490 threads.append(t)
491 for _ in xrange(100):
492 time.sleep(duration/100)
493 with self._count_lock:
494 if self.close_count-self.close_success_count > nb_workers+1:
495 if test_support.verbose:
496 print 'Q',
497 break
498 time.sleep(duration)
499 finally:
500 self.do_continue = False
501 for t in threads:
502 t.join()
503
504 def _test_close_open_io(self, io_func, nb_workers=5):
505 def worker():
506 self._create_file()
507 funcs = itertools.cycle((
508 lambda: io_func(),
509 lambda: self._close_and_reopen_file(),
510 ))
511 for f in funcs:
512 if not self.do_continue:
513 break
514 try:
515 f()
516 except (IOError, ValueError):
517 pass
518 self._run_workers(worker, nb_workers)
519 if test_support.verbose:
520 # Useful verbose statistics when tuning this test to take
521 # less time to run but still ensuring that its still useful.
522 #
523 # the percent of close calls that raised an error
524 percent = 100. - 100.*self.close_success_count/self.close_count
525 print self.close_count, ('%.4f ' % percent),
526
527 def test_close_open(self):
528 def io_func():
529 pass
530 self._test_close_open_io(io_func)
531
532 def test_close_open_flush(self):
533 def io_func():
534 self.f.flush()
535 self._test_close_open_io(io_func)
536
537 def test_close_open_iter(self):
538 def io_func():
539 list(iter(self.f))
540 self._test_close_open_io(io_func)
541
542 def test_close_open_isatty(self):
543 def io_func():
544 self.f.isatty()
545 self._test_close_open_io(io_func)
546
547 def test_close_open_print(self):
548 def io_func():
549 print >> self.f, ''
550 self._test_close_open_io(io_func)
551
Antoine Pitrou83137c22010-05-17 19:56:59 +0000552 def test_close_open_print_buffered(self):
553 self.use_buffering = True
554 def io_func():
555 print >> self.f, ''
556 self._test_close_open_io(io_func)
557
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000558 def test_close_open_read(self):
559 def io_func():
560 self.f.read(0)
561 self._test_close_open_io(io_func)
562
563 def test_close_open_readinto(self):
564 def io_func():
565 a = array('c', 'xxxxx')
566 self.f.readinto(a)
567 self._test_close_open_io(io_func)
568
569 def test_close_open_readline(self):
570 def io_func():
571 self.f.readline()
572 self._test_close_open_io(io_func)
573
574 def test_close_open_readlines(self):
575 def io_func():
576 self.f.readlines()
577 self._test_close_open_io(io_func)
578
579 def test_close_open_seek(self):
580 def io_func():
581 self.f.seek(0, 0)
582 self._test_close_open_io(io_func)
583
584 def test_close_open_tell(self):
585 def io_func():
586 self.f.tell()
587 self._test_close_open_io(io_func)
588
589 def test_close_open_truncate(self):
590 def io_func():
591 self.f.truncate()
592 self._test_close_open_io(io_func)
593
594 def test_close_open_write(self):
595 def io_func():
596 self.f.write('')
597 self._test_close_open_io(io_func)
598
599 def test_close_open_writelines(self):
600 def io_func():
601 self.f.writelines('')
602 self._test_close_open_io(io_func)
603
604
605class StdoutTests(unittest.TestCase):
606
607 def test_move_stdout_on_write(self):
608 # Issue 3242: sys.stdout can be replaced (and freed) during a
609 # print statement; prevent a segfault in this case
610 save_stdout = sys.stdout
611
612 class File:
613 def write(self, data):
614 if '\n' in data:
615 sys.stdout = save_stdout
616
617 try:
618 sys.stdout = File()
619 print "some text"
620 finally:
621 sys.stdout = save_stdout
622
623 def test_del_stdout_before_print(self):
624 # Issue 4597: 'print' with no argument wasn't reporting when
625 # sys.stdout was deleted.
626 save_stdout = sys.stdout
627 del sys.stdout
628 try:
629 print
630 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000631 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000632 else:
633 self.fail("Expected RuntimeError")
634 finally:
635 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000636
Victor Stinnercaafd772010-09-08 10:51:01 +0000637 def test_unicode(self):
638 import subprocess
639
640 def get_message(encoding, *code):
641 code = '\n'.join(code)
642 env = os.environ.copy()
643 env['PYTHONIOENCODING'] = encoding
644 process = subprocess.Popen([sys.executable, "-c", code],
645 stdout=subprocess.PIPE, env=env)
646 stdout, stderr = process.communicate()
647 self.assertEqual(process.returncode, 0)
648 return stdout
649
650 def check_message(text, encoding, expected):
651 stdout = get_message(encoding,
652 "import sys",
653 "sys.stdout.write(%r)" % text,
654 "sys.stdout.flush()")
655 self.assertEqual(stdout, expected)
656
Victor Stinner3a68f912010-09-08 11:45:16 +0000657 # test the encoding
658 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
659 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
660 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000661
Victor Stinner3a68f912010-09-08 11:45:16 +0000662 # test the error handler
663 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
664 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
665 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
666
667 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000668 for objtype in ('buffer', 'bytearray'):
669 stdout = get_message('ascii',
670 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000671 r'sys.stdout.write(%s("\xe9"))' % objtype,
672 'sys.stdout.flush()')
673 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000674
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000675
def test_main():
    """Run every test class of this module, then remove any leftover TESTFN."""
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        StdoutTests,
    )
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    try:
        run_unittest(*test_classes)
    finally:
        leftover = os.path.exists(TESTFN)
        if leftover:
            os.unlink(TESTFN)
685
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()