Merged revisions 65686 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r65686 | antoine.pitrou | 2008-08-14 23:04:30 +0200 (jeu., 14 août 2008) | 3 lines

  Issue #3476: make BufferedReader and BufferedWriter thread-safe
........
diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py
index 9ccf3f5..d63dfa1 100644
--- a/Lib/test/test_cmd_line.py
+++ b/Lib/test/test_cmd_line.py
@@ -3,11 +3,15 @@
 # See test_cmd_line_script.py for testing of script execution
 
 import test.support, unittest
+import os
 import sys
 import subprocess
 
 def _spawn_python(*args):
-    cmd_line = [sys.executable, '-E']
+    cmd_line = [sys.executable]
+    # When testing -S, we need PYTHONPATH to work (see test_site_flag())
+    if '-S' not in args:
+        cmd_line.append('-E')
     cmd_line.extend(args)
     return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
@@ -59,6 +63,16 @@
         self.verify_valid_flag('-Qwarnall')
 
     def test_site_flag(self):
+        if os.name == 'posix':
+            # Workaround bug #586680 by adding the extension dir to PYTHONPATH
+            from distutils.util import get_platform
+            s = "./build/lib.%s-%.3s" % (get_platform(), sys.version)
+            if hasattr(sys, 'gettotalrefcount'):
+                s += '-pydebug'
+            p = os.environ.get('PYTHONPATH', '')
+            if p:
+                p += ':'
+            os.environ['PYTHONPATH'] = p + s
         self.verify_valid_flag('-S')
 
     def test_usage(self):
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 320c7c3..bcb37d7 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -4,8 +4,10 @@
 import sys
 import time
 import array
+import threading
+import random
 import unittest
-from itertools import chain
+from itertools import chain, cycle
 from test import support
 
 import codecs
@@ -390,6 +392,49 @@
         # this test. Else, write it.
         pass
 
+    def testThreads(self):
+        try:
+            # Write out many bytes with exactly the same number of 0's,
+            # 1's... 255's. This will help us check that concurrent reading
+            # doesn't duplicate or forget contents.
+            N = 1000
+            l = list(range(256)) * N
+            random.shuffle(l)
+            s = bytes(bytearray(l))
+            with io.open(support.TESTFN, "wb") as f:
+                f.write(s)
+            with io.open(support.TESTFN, "rb", buffering=0) as raw:
+                bufio = io.BufferedReader(raw, 8)
+                errors = []
+                results = []
+                def f():
+                    try:
+                        # Intra-buffer read then buffer-flushing read
+                        for n in cycle([1, 19]):
+                            s = bufio.read(n)
+                            if not s:
+                                break
+                            # list.append() is atomic
+                            results.append(s)
+                    except Exception as e:
+                        errors.append(e)
+                        raise
+                threads = [threading.Thread(target=f) for x in range(20)]
+                for t in threads:
+                    t.start()
+                time.sleep(0.02) # yield
+                for t in threads:
+                    t.join()
+                self.assertFalse(errors,
+                    "the following exceptions were caught: %r" % errors)
+                s = b''.join(results)
+                for i in range(256):
+                    c = bytes(bytearray([i]))
+                    self.assertEqual(s.count(c), N)
+        finally:
+            support.unlink(support.TESTFN)
+
+
 
 class BufferedWriterTest(unittest.TestCase):
 
@@ -446,6 +491,38 @@
 
         self.assertEquals(b"abc", writer._write_stack[0])
 
+    def testThreads(self):
+        # BufferedWriter should not raise exceptions or crash
+        # when called from multiple threads.
+        try:
+            # We use a real file object because it allows us to
+            # exercise situations where the GIL is released before
+            # writing the buffer to the raw streams. This is in addition
+            # to concurrency issues due to switching threads in the middle
+            # of Python code.
+            with io.open(support.TESTFN, "wb", buffering=0) as raw:
+                bufio = io.BufferedWriter(raw, 8)
+                errors = []
+                def f():
+                    try:
+                        # Write enough bytes to flush the buffer
+                        s = b"a" * 19
+                        for i in range(50):
+                            bufio.write(s)
+                    except Exception as e:
+                        errors.append(e)
+                        raise
+                threads = [threading.Thread(target=f) for x in range(20)]
+                for t in threads:
+                    t.start()
+                time.sleep(0.02) # yield
+                for t in threads:
+                    t.join()
+                self.assertFalse(errors,
+                    "the following exceptions were caught: %r" % errors)
+        finally:
+            support.unlink(support.TESTFN)
+
 
 class BufferedRWPairTest(unittest.TestCase):