blob: ebfd9e77a3aa299a5dddac2c10294aa13392413d [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
5import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00006from array import array
7from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +00008try:
9 import threading
10except ImportError:
11 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000012
Antoine Pitrou47a5f482009-06-12 20:41:52 +000013from test import test_support
Georg Brandla4f46e12010-02-07 17:03:15 +000014from test.test_support import TESTFN, run_unittest
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015from UserList import UserList
16
17class AutoFileTests(unittest.TestCase):
18 # file tests for which a test file is automatically set up
19
20 def setUp(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +000021 self.f = open(TESTFN, 'wb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000022
23 def tearDown(self):
24 if self.f:
25 self.f.close()
26 os.remove(TESTFN)
27
28 def testWeakRefs(self):
29 # verify weak references
30 p = proxy(self.f)
Antoine Pitrou47a5f482009-06-12 20:41:52 +000031 p.write('teststring')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000032 self.assertEquals(self.f.tell(), p.tell())
33 self.f.close()
34 self.f = None
35 self.assertRaises(ReferenceError, getattr, p, 'tell')
36
37 def testAttributes(self):
38 # verify expected attributes exist
39 f = self.f
Ezio Melottid80b4bf2010-03-17 13:52:48 +000040 with test_support.check_py3k_warnings():
41 softspace = f.softspace
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000042 f.name # merely shouldn't blow up
43 f.mode # ditto
44 f.closed # ditto
45
Ezio Melottid80b4bf2010-03-17 13:52:48 +000046 with test_support.check_py3k_warnings():
47 # verify softspace is writable
48 f.softspace = softspace # merely shouldn't blow up
Antoine Pitrou47a5f482009-06-12 20:41:52 +000049
50 # verify the others aren't
51 for attr in 'name', 'mode', 'closed':
52 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
53
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000054 def testReadinto(self):
55 # verify readinto
Antoine Pitrou47a5f482009-06-12 20:41:52 +000056 self.f.write('12')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000057 self.f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000058 a = array('c', 'x'*10)
59 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000060 n = self.f.readinto(a)
Antoine Pitrou47a5f482009-06-12 20:41:52 +000061 self.assertEquals('12', a.tostring()[:n])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000062
63 def testWritelinesUserList(self):
64 # verify writelines with instance sequence
Antoine Pitrou47a5f482009-06-12 20:41:52 +000065 l = UserList(['1', '2'])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000066 self.f.writelines(l)
67 self.f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000068 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000069 buf = self.f.read()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000070 self.assertEquals(buf, '12')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000071
72 def testWritelinesIntegers(self):
73 # verify writelines with integers
74 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
75
76 def testWritelinesIntegersUserList(self):
77 # verify writelines with integers in UserList
78 l = UserList([1,2,3])
79 self.assertRaises(TypeError, self.f.writelines, l)
80
81 def testWritelinesNonString(self):
82 # verify writelines with non-string object
83 class NonString:
84 pass
85
86 self.assertRaises(TypeError, self.f.writelines,
87 [NonString(), NonString()])
88
Antoine Pitrou47a5f482009-06-12 20:41:52 +000089 def testRepr(self):
90 # verify repr works
Benjamin Peterson5c8da862009-06-30 22:57:08 +000091 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
Antoine Pitrou47a5f482009-06-12 20:41:52 +000092
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000093 def testErrors(self):
Antoine Pitroubb445a12010-02-05 17:05:54 +000094 self.f.close()
95 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000096 f = self.f
97 self.assertEquals(f.name, TESTFN)
Benjamin Peterson5c8da862009-06-30 22:57:08 +000098 self.assertTrue(not f.isatty())
99 self.assertTrue(not f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000100
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000101 self.assertRaises(TypeError, f.readinto, "")
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000102 f.close()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000103 self.assertTrue(f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000104
105 def testMethods(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000106 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
107 'readline', 'readlines', 'seek', 'tell', 'truncate',
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000108 'write', '__iter__']
109 deprecated_methods = ['xreadlines']
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000110 if sys.platform.startswith('atheos'):
111 methods.remove('truncate')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000112
113 # __exit__ should close the file
114 self.f.__exit__(None, None, None)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000115 self.assertTrue(self.f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000116
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000117 for methodname in methods:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000118 method = getattr(self.f, methodname)
119 # should raise on closed file
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000120 self.assertRaises(ValueError, method)
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000121 with test_support.check_py3k_warnings():
122 for methodname in deprecated_methods:
123 method = getattr(self.f, methodname)
124 self.assertRaises(ValueError, method)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000125 self.assertRaises(ValueError, self.f.writelines, [])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000126
127 # file is closed, __exit__ shouldn't do anything
128 self.assertEquals(self.f.__exit__(None, None, None), None)
129 # it must also return None if an exception was given
130 try:
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000131 1 // 0
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000132 except:
133 self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
134
135 def testReadWhenWriting(self):
136 self.assertRaises(IOError, self.f.read)
137
Antoine Pitroubb445a12010-02-05 17:05:54 +0000138 def testIssue5677(self):
139 # Remark: Do not perform more than one test per open file,
140 # since that does NOT catch the readline error on Windows.
141 data = 'xxx'
142 for mode in ['w', 'wb', 'a', 'ab']:
143 for attr in ['read', 'readline', 'readlines']:
144 self.f = open(TESTFN, mode)
145 self.f.write(data)
146 self.assertRaises(IOError, getattr(self.f, attr))
147 self.f.close()
148
149 self.f = open(TESTFN, mode)
150 self.f.write(data)
151 self.assertRaises(IOError, lambda: [line for line in self.f])
152 self.f.close()
153
154 self.f = open(TESTFN, mode)
155 self.f.write(data)
156 self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
157 self.f.close()
158
159 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
160 self.f = open(TESTFN, mode)
161 self.assertRaises(IOError, self.f.write, data)
162 self.f.close()
163
164 self.f = open(TESTFN, mode)
165 self.assertRaises(IOError, self.f.writelines, [data, data])
166 self.f.close()
167
168 self.f = open(TESTFN, mode)
169 self.assertRaises(IOError, self.f.truncate)
170 self.f.close()
171
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000172class OtherFileTests(unittest.TestCase):
173
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000174 def testOpenDir(self):
175 this_dir = os.path.dirname(__file__)
176 for mode in (None, "w"):
177 try:
178 if mode:
179 f = open(this_dir, mode)
180 else:
181 f = open(this_dir)
182 except IOError as e:
183 self.assertEqual(e.filename, this_dir)
184 else:
185 self.fail("opening a directory didn't raise an IOError")
186
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000187 def testModeStrings(self):
188 # check invalid mode strings
189 for mode in ("", "aU", "wU+"):
190 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000191 f = open(TESTFN, mode)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000192 except ValueError:
193 pass
194 else:
195 f.close()
196 self.fail('%r is an invalid file mode' % mode)
197
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000198 # Some invalid modes fail on Windows, but pass on Unix
199 # Issue3965: avoid a crash on Windows when filename is unicode
200 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
201 try:
202 f = open(name, "rr")
203 except (IOError, ValueError):
204 pass
205 else:
206 f.close()
207
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000208 def testStdin(self):
209 # This causes the interpreter to exit on OSF1 v5.1.
210 if sys.platform != 'osf1V5':
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000211 self.assertRaises(IOError, sys.stdin.seek, -1)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000212 else:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000213 print >>sys.__stdout__, (
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000214 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000215 ' Test manually.')
216 self.assertRaises(IOError, sys.stdin.truncate)
217
218 def testUnicodeOpen(self):
219 # verify repr works for unicode too
220 f = open(unicode(TESTFN), "w")
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000221 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000222 f.close()
223 os.unlink(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000224
225 def testBadModeArgument(self):
226 # verify that we get a sensible error message for bad mode argument
227 bad_mode = "qwerty"
228 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000229 f = open(TESTFN, bad_mode)
230 except ValueError, msg:
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000231 if msg.args[0] != 0:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000232 s = str(msg)
Ezio Melotti187f93d2010-03-17 14:22:34 +0000233 if TESTFN in s or bad_mode not in s:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000234 self.fail("bad error message for invalid mode: %s" % s)
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000235 # if msg.args[0] == 0, we're probably on Windows where there may
236 # be no obvious way to discover why open() failed.
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000237 else:
238 f.close()
239 self.fail("no error for invalid mode: %s" % bad_mode)
240
241 def testSetBufferSize(self):
242 # make sure that explicitly setting the buffer size doesn't cause
243 # misbehaviour especially with repeated close() calls
244 for s in (-1, 0, 1, 512):
245 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000246 f = open(TESTFN, 'w', s)
247 f.write(str(s))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000248 f.close()
249 f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000250 f = open(TESTFN, 'r', s)
251 d = int(f.read())
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000252 f.close()
253 f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000254 except IOError, msg:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000255 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
256 self.assertEquals(d, s)
257
258 def testTruncateOnWindows(self):
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000259 os.unlink(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000260
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000261 def bug801631():
262 # SF bug <http://www.python.org/sf/801631>
263 # "file.truncate fault on windows"
264 f = open(TESTFN, 'wb')
265 f.write('12345678901') # 11 bytes
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000266 f.close()
267
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000268 f = open(TESTFN,'rb+')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000269 data = f.read(5)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000270 if data != '12345':
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000271 self.fail("Read on file opened for update failed %r" % data)
272 if f.tell() != 5:
273 self.fail("File pos after read wrong %d" % f.tell())
274
275 f.truncate()
276 if f.tell() != 5:
277 self.fail("File pos after ftruncate wrong %d" % f.tell())
278
279 f.close()
280 size = os.path.getsize(TESTFN)
281 if size != 5:
282 self.fail("File size after ftruncate wrong %d" % size)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000283
284 try:
285 bug801631()
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000286 finally:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000287 os.unlink(TESTFN)
288
289 def testIteration(self):
290 # Test the complex interaction when mixing file-iteration and the
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000291 # various read* methods. Ostensibly, the mixture could just be tested
292 # to work when it should work according to the Python language,
293 # instead of fail when it should fail according to the current CPython
294 # implementation. People don't always program Python the way they
295 # should, though, and the implemenation might change in subtle ways,
296 # so we explicitly test for errors, too; the test will just have to
297 # be updated when the implementation changes.
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000298 dataoffset = 16384
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000299 filler = "ham\n"
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000300 assert not dataoffset % len(filler), \
301 "dataoffset must be multiple of len(filler)"
302 nchunks = dataoffset // len(filler)
303 testlines = [
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000304 "spam, spam and eggs\n",
305 "eggs, spam, ham and spam\n",
306 "saussages, spam, spam and eggs\n",
307 "spam, ham, spam and eggs\n",
308 "spam, spam, spam, spam, spam, ham, spam\n",
309 "wonderful spaaaaaam.\n"
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000310 ]
311 methods = [("readline", ()), ("read", ()), ("readlines", ()),
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000312 ("readinto", (array("c", " "*100),))]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000313
314 try:
315 # Prepare the testfile
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000316 bag = open(TESTFN, "w")
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000317 bag.write(filler * nchunks)
318 bag.writelines(testlines)
319 bag.close()
320 # Test for appropriate errors mixing read* and iteration
321 for methodname, args in methods:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000322 f = open(TESTFN)
323 if f.next() != filler:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000324 self.fail, "Broken testfile"
325 meth = getattr(f, methodname)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000326 try:
327 meth(*args)
328 except ValueError:
329 pass
330 else:
331 self.fail("%s%r after next() didn't raise ValueError" %
332 (methodname, args))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000333 f.close()
334
335 # Test to see if harmless (by accident) mixing of read* and
336 # iteration still works. This depends on the size of the internal
337 # iteration buffer (currently 8192,) but we can test it in a
338 # flexible manner. Each line in the bag o' ham is 4 bytes
339 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
340 # exactly on the buffer boundary for any power-of-2 buffersize
341 # between 4 and 16384 (inclusive).
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000342 f = open(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000343 for i in range(nchunks):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000344 f.next()
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000345 testline = testlines.pop(0)
346 try:
347 line = f.readline()
348 except ValueError:
349 self.fail("readline() after next() with supposedly empty "
350 "iteration-buffer failed anyway")
351 if line != testline:
352 self.fail("readline() after next() with empty buffer "
353 "failed. Got %r, expected %r" % (line, testline))
354 testline = testlines.pop(0)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000355 buf = array("c", "\x00" * len(testline))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000356 try:
357 f.readinto(buf)
358 except ValueError:
359 self.fail("readinto() after next() with supposedly empty "
360 "iteration-buffer failed anyway")
361 line = buf.tostring()
362 if line != testline:
363 self.fail("readinto() after next() with empty buffer "
364 "failed. Got %r, expected %r" % (line, testline))
365
366 testline = testlines.pop(0)
367 try:
368 line = f.read(len(testline))
369 except ValueError:
370 self.fail("read() after next() with supposedly empty "
371 "iteration-buffer failed anyway")
372 if line != testline:
373 self.fail("read() after next() with empty buffer "
374 "failed. Got %r, expected %r" % (line, testline))
375 try:
376 lines = f.readlines()
377 except ValueError:
378 self.fail("readlines() after next() with supposedly empty "
379 "iteration-buffer failed anyway")
380 if lines != testlines:
381 self.fail("readlines() after next() with empty buffer "
382 "failed. Got %r, expected %r" % (line, testline))
383 # Reading after iteration hit EOF shouldn't hurt either
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000384 f = open(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000385 try:
386 for line in f:
387 pass
388 try:
389 f.readline()
390 f.readinto(buf)
391 f.read()
392 f.readlines()
393 except ValueError:
394 self.fail("read* failed after next() consumed file")
395 finally:
396 f.close()
397 finally:
398 os.unlink(TESTFN)
399
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000400class FileSubclassTests(unittest.TestCase):
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000401
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000402 def testExit(self):
403 # test that exiting with context calls subclass' close
404 class C(file):
405 def __init__(self, *args):
406 self.subclass_closed = False
407 file.__init__(self, *args)
408 def close(self):
409 self.subclass_closed = True
410 file.close(self)
411
412 with C(TESTFN, 'w') as f:
413 pass
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000414 self.assertTrue(f.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000415
416
Victor Stinner6a102812010-04-27 23:55:59 +0000417@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000418class FileThreadingTests(unittest.TestCase):
419 # These tests check the ability to call various methods of file objects
420 # (including close()) concurrently without crashing the Python interpreter.
421 # See #815646, #595601
422
423 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000424 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000425 self.f = None
426 self.filename = TESTFN
427 with open(self.filename, "w") as f:
428 f.write("\n".join("0123456789"))
429 self._count_lock = threading.Lock()
430 self.close_count = 0
431 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000432 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000433
434 def tearDown(self):
435 if self.f:
436 try:
437 self.f.close()
438 except (EnvironmentError, ValueError):
439 pass
440 try:
441 os.remove(self.filename)
442 except EnvironmentError:
443 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000444 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000445
446 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000447 if self.use_buffering:
448 self.f = open(self.filename, "w+", buffering=1024*16)
449 else:
450 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000451
452 def _close_file(self):
453 with self._count_lock:
454 self.close_count += 1
455 self.f.close()
456 with self._count_lock:
457 self.close_success_count += 1
458
459 def _close_and_reopen_file(self):
460 self._close_file()
461 # if close raises an exception thats fine, self.f remains valid so
462 # we don't need to reopen.
463 self._create_file()
464
465 def _run_workers(self, func, nb_workers, duration=0.2):
466 with self._count_lock:
467 self.close_count = 0
468 self.close_success_count = 0
469 self.do_continue = True
470 threads = []
471 try:
472 for i in range(nb_workers):
473 t = threading.Thread(target=func)
474 t.start()
475 threads.append(t)
476 for _ in xrange(100):
477 time.sleep(duration/100)
478 with self._count_lock:
479 if self.close_count-self.close_success_count > nb_workers+1:
480 if test_support.verbose:
481 print 'Q',
482 break
483 time.sleep(duration)
484 finally:
485 self.do_continue = False
486 for t in threads:
487 t.join()
488
489 def _test_close_open_io(self, io_func, nb_workers=5):
490 def worker():
491 self._create_file()
492 funcs = itertools.cycle((
493 lambda: io_func(),
494 lambda: self._close_and_reopen_file(),
495 ))
496 for f in funcs:
497 if not self.do_continue:
498 break
499 try:
500 f()
501 except (IOError, ValueError):
502 pass
503 self._run_workers(worker, nb_workers)
504 if test_support.verbose:
505 # Useful verbose statistics when tuning this test to take
506 # less time to run but still ensuring that its still useful.
507 #
508 # the percent of close calls that raised an error
509 percent = 100. - 100.*self.close_success_count/self.close_count
510 print self.close_count, ('%.4f ' % percent),
511
512 def test_close_open(self):
513 def io_func():
514 pass
515 self._test_close_open_io(io_func)
516
517 def test_close_open_flush(self):
518 def io_func():
519 self.f.flush()
520 self._test_close_open_io(io_func)
521
522 def test_close_open_iter(self):
523 def io_func():
524 list(iter(self.f))
525 self._test_close_open_io(io_func)
526
527 def test_close_open_isatty(self):
528 def io_func():
529 self.f.isatty()
530 self._test_close_open_io(io_func)
531
532 def test_close_open_print(self):
533 def io_func():
534 print >> self.f, ''
535 self._test_close_open_io(io_func)
536
Antoine Pitrou83137c22010-05-17 19:56:59 +0000537 def test_close_open_print_buffered(self):
538 self.use_buffering = True
539 def io_func():
540 print >> self.f, ''
541 self._test_close_open_io(io_func)
542
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000543 def test_close_open_read(self):
544 def io_func():
545 self.f.read(0)
546 self._test_close_open_io(io_func)
547
548 def test_close_open_readinto(self):
549 def io_func():
550 a = array('c', 'xxxxx')
551 self.f.readinto(a)
552 self._test_close_open_io(io_func)
553
554 def test_close_open_readline(self):
555 def io_func():
556 self.f.readline()
557 self._test_close_open_io(io_func)
558
559 def test_close_open_readlines(self):
560 def io_func():
561 self.f.readlines()
562 self._test_close_open_io(io_func)
563
564 def test_close_open_seek(self):
565 def io_func():
566 self.f.seek(0, 0)
567 self._test_close_open_io(io_func)
568
569 def test_close_open_tell(self):
570 def io_func():
571 self.f.tell()
572 self._test_close_open_io(io_func)
573
574 def test_close_open_truncate(self):
575 def io_func():
576 self.f.truncate()
577 self._test_close_open_io(io_func)
578
579 def test_close_open_write(self):
580 def io_func():
581 self.f.write('')
582 self._test_close_open_io(io_func)
583
584 def test_close_open_writelines(self):
585 def io_func():
586 self.f.writelines('')
587 self._test_close_open_io(io_func)
588
589
590class StdoutTests(unittest.TestCase):
591
592 def test_move_stdout_on_write(self):
593 # Issue 3242: sys.stdout can be replaced (and freed) during a
594 # print statement; prevent a segfault in this case
595 save_stdout = sys.stdout
596
597 class File:
598 def write(self, data):
599 if '\n' in data:
600 sys.stdout = save_stdout
601
602 try:
603 sys.stdout = File()
604 print "some text"
605 finally:
606 sys.stdout = save_stdout
607
608 def test_del_stdout_before_print(self):
609 # Issue 4597: 'print' with no argument wasn't reporting when
610 # sys.stdout was deleted.
611 save_stdout = sys.stdout
612 del sys.stdout
613 try:
614 print
615 except RuntimeError as e:
616 self.assertEquals(str(e), "lost sys.stdout")
617 else:
618 self.fail("Expected RuntimeError")
619 finally:
620 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000621
Victor Stinnercaafd772010-09-08 10:51:01 +0000622 def test_unicode(self):
623 import subprocess
624
625 def get_message(encoding, *code):
626 code = '\n'.join(code)
627 env = os.environ.copy()
628 env['PYTHONIOENCODING'] = encoding
629 process = subprocess.Popen([sys.executable, "-c", code],
630 stdout=subprocess.PIPE, env=env)
631 stdout, stderr = process.communicate()
632 self.assertEqual(process.returncode, 0)
633 return stdout
634
635 def check_message(text, encoding, expected):
636 stdout = get_message(encoding,
637 "import sys",
638 "sys.stdout.write(%r)" % text,
639 "sys.stdout.flush()")
640 self.assertEqual(stdout, expected)
641
Victor Stinner3a68f912010-09-08 11:45:16 +0000642 # test the encoding
643 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
644 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
645 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000646
Victor Stinner3a68f912010-09-08 11:45:16 +0000647 # test the error handler
648 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
649 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
650 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
651
652 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000653 for objtype in ('buffer', 'bytearray'):
654 stdout = get_message('ascii',
655 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000656 r'sys.stdout.write(%s("\xe9"))' % objtype,
657 'sys.stdout.flush()')
658 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000659
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000660
661def test_main():
662 # Historically, these tests have been sloppy about removing TESTFN.
663 # So get rid of it no matter what.
664 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000665 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
666 FileThreadingTests, StdoutTests)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000667 finally:
668 if os.path.exists(TESTFN):
669 os.unlink(TESTFN)
670
671if __name__ == '__main__':
672 test_main()