Issue #16165: Fix sched.scheduler.run() method was block a scheduler for
other threads.
diff --git a/Lib/sched.py b/Lib/sched.py
index 4b1f7ac..ccf8ce9 100644
--- a/Lib/sched.py
+++ b/Lib/sched.py
@@ -128,27 +128,29 @@
"""
# localize variable access to minimize overhead
# and to improve thread safety
- with self._lock:
- q = self._queue
- delayfunc = self.delayfunc
- timefunc = self.timefunc
- pop = heapq.heappop
- while q:
- time, priority, action, argument, kwargs = checked_event = q[0]
+ lock = self._lock
+ q = self._queue
+ delayfunc = self.delayfunc
+ timefunc = self.timefunc
+ pop = heapq.heappop
+ while True:
+ with lock:
+ if not q:
+ break
+ time, priority, action, argument, kwargs = q[0]
now = timefunc()
- if now < time:
- if not blocking:
- return time - now
- delayfunc(time - now)
+ if time > now:
+ delay = True
else:
- event = pop(q)
- # Verify that the event was not removed or altered
- # by another thread after we last looked at q[0].
- if event is checked_event:
- action(*argument, **kwargs)
- delayfunc(0) # Let other threads run
- else:
- heapq.heappush(q, event)
+ delay = False
+ pop(q)
+ if delay:
+ if not blocking:
+ return time - now
+ delayfunc(time - now)
+ else:
+ action(*argument, **kwargs)
+ delayfunc(0) # Let other threads run
@property
def queue(self):
diff --git a/Lib/test/test_sched.py b/Lib/test/test_sched.py
index 50ada52..d0112e4 100644
--- a/Lib/test/test_sched.py
+++ b/Lib/test/test_sched.py
@@ -4,7 +4,10 @@
import time
import unittest
from test import support
-
+try:
+ import threading
+except ImportError:
+ threading = None
class TestCase(unittest.TestCase):
@@ -26,6 +29,20 @@
scheduler.run()
self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_enter_concurrent(self):
+ l = []
+ fun = lambda x: l.append(x)
+ scheduler = sched.scheduler(time.time, time.sleep)
+ scheduler.enter(0.03, 1, fun, (0.03,))
+ t = threading.Thread(target=scheduler.run)
+ t.start()
+ for x in [0.05, 0.04, 0.02, 0.01]:
+ z = scheduler.enter(x, 1, fun, (x,))
+ scheduler.run()
+ t.join()
+ self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
+
def test_priority(self):
l = []
fun = lambda x: l.append(x)
@@ -50,6 +67,24 @@
scheduler.run()
self.assertEqual(l, [0.02, 0.03, 0.04])
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_cancel_concurrent(self):
+ l = []
+ fun = lambda x: l.append(x)
+ scheduler = sched.scheduler(time.time, time.sleep)
+ now = time.time()
+ event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
+ event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
+ event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
+ event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
+ event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
+ t = threading.Thread(target=scheduler.run)
+ t.start()
+ scheduler.cancel(event1)
+ scheduler.cancel(event5)
+ t.join()
+ self.assertEqual(l, [0.02, 0.03, 0.04])
+
def test_empty(self):
l = []
fun = lambda x: l.append(x)
diff --git a/Misc/NEWS b/Misc/NEWS
index 3317f68..7a654b9 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -124,6 +124,9 @@
Library
-------
+- Issue #16165: Fix sched.scheduler.run() method was block a scheduler for
+ other threads.
+
- Issue #16641: Fix default values of sched.scheduler.enter arguments were
modifiable.