blob: fe8e7850920d3eac3a3547c52539ee69a8688840 [file] [log] [blame]
Serhiy Storchaka369a7822013-01-09 00:13:38 +02001import queue
Giampaolo Rodolàb5c23762010-08-04 09:28:05 +00002import sched
3import time
4import unittest
5from test import support
Serhiy Storchakaf2b9cf42012-12-29 21:34:11 +02006try:
7 import threading
8except ImportError:
9 threading = None
Giampaolo Rodolàb5c23762010-08-04 09:28:05 +000010
# Upper bound, in seconds, passed as ``timeout=`` to every blocking
# ``queue.get()`` and ``Thread.join()`` in the concurrent tests, so that a
# stalled scheduler thread makes the test fail instead of hanging forever.
TIMEOUT = 10
12
13
class Timer:
    """Virtual clock that only moves forward as far as the test permits.

    ``time`` and ``sleep`` are handed to ``sched.scheduler`` as its time and
    delay functions.  Test code calls ``advance`` to raise the limit up to
    which ``sleep`` is allowed to move the clock.  Every method takes the
    same condition variable before touching the clock state.
    """

    def __init__(self):
        self._stop = 0  # limit the virtual time may not move past
        self._time = 0  # current virtual time
        self._cond = threading.Condition()

    def time(self):
        """Return the current virtual time."""
        with self._cond:
            return self._time

    def sleep(self, t):
        """Move the clock forward by ``t``, but never beyond the limit.

        While the target lies beyond the limit, the clock is parked at the
        limit and the caller blocks until ``advance`` notifies it.
        """
        assert t >= 0
        with self._cond:
            target = self._time + t
            while target > self._stop:
                # Go as far as currently allowed, then wait for advance().
                self._time = self._stop
                self._cond.wait()
            self._time = target

    def advance(self, t):
        """Raise the limit by ``t`` and wake every thread waiting in sleep()."""
        assert t >= 0
        with self._cond:
            self._stop = self._stop + t
            self._cond.notify_all()
40
41
class TestCase(unittest.TestCase):
    # Tests for sched.scheduler.
    #
    # Test methods are described with comments rather than docstrings so that
    # unittest's verbose output keeps printing the test names.

    def _run_in_thread(self, scheduler, timer):
        # Start scheduler.run() in a background thread and return the thread.
        #
        # A cleanup is registered right after the thread starts.  If an
        # assertion fails part-way through a test, the virtual clock is still
        # pushed past every pending event and the non-daemon thread is
        # joined, instead of staying blocked in Timer.sleep() and keeping the
        # interpreter from shutting down.
        thread = threading.Thread(target=scheduler.run)
        thread.start()
        self.addCleanup(self._release_thread, timer, thread)
        return thread

    @staticmethod
    def _release_thread(timer, thread):
        # Let the scheduler drain whatever is left, then wait for the thread.
        # Harmless when the thread has already finished.
        timer.advance(1000)
        thread.join(timeout=TIMEOUT)

    def test_enter(self):
        # Events entered with relative delays run in order of increasing
        # delay, regardless of the order in which they were entered.
        seen = []
        fun = seen.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(seen, [0.1, 0.2, 0.3, 0.4, 0.5])

    def test_enterabs(self):
        # Events entered with absolute times run in order of increasing time.
        seen = []
        fun = seen.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(seen, [0.01, 0.02, 0.03, 0.04, 0.05])

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_enter_concurrent(self):
        # Events can be entered from this thread while run() is executing in
        # another thread; the virtual Timer makes the ordering deterministic.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = self._run_in_thread(scheduler, timer)
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        # Virtual time is now 1, so these fire at times 4, 5 and 2.
        for x in [4, 5, 2]:
            scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        # Nothing is left to run: raising the limit must let run() return
        # without moving the clock past the last event.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 5)

    def test_priority(self):
        # Events scheduled for the same time run in order of priority
        # (lower number first).
        seen = []
        fun = seen.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for priority in [1, 2, 3, 4, 5]:
            scheduler.enterabs(0.01, priority, fun, (priority,))
        scheduler.run()
        self.assertEqual(seen, [1, 2, 3, 4, 5])

    def test_cancel(self):
        # Cancelled events are not run; the remaining ones still are.
        seen = []
        fun = seen.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
        scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
        scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
        scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
        scheduler.cancel(event1)
        scheduler.cancel(event5)
        scheduler.run()
        self.assertEqual(seen, [0.02, 0.03, 0.04])

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_cancel_concurrent(self):
        # Events can be cancelled from this thread while run() is executing
        # in another thread.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        now = timer.time()
        scheduler.enterabs(now + 1, 1, fun, (1,))
        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
        scheduler.enterabs(now + 4, 1, fun, (4,))
        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
        scheduler.enterabs(now + 3, 1, fun, (3,))
        t = self._run_in_thread(scheduler, timer)
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        scheduler.cancel(event2)
        scheduler.cancel(event5)
        timer.advance(1)
        # Event 2 was cancelled, so nothing fires at time 2.
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        # Event 5 was cancelled too: run() must return with the clock at 4.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 4)

    def test_empty(self):
        # empty() is true before anything is entered and after run() has
        # consumed every event, and false in between.
        seen = []
        fun = seen.append
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(x, 1, fun, (x,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        # The events are only inspected through the queue property here;
        # run() is never called, so ``fun`` is never invoked.
        seen = []
        fun = seen.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        e5 = scheduler.enterabs(now + 0.05, 1, fun)
        e1 = scheduler.enterabs(now + 0.01, 1, fun)
        e2 = scheduler.enterabs(now + 0.02, 1, fun)
        e4 = scheduler.enterabs(now + 0.04, 1, fun)
        e3 = scheduler.enterabs(now + 0.03, 1, fun)
        # queue property is supposed to return an ordered list of
        # upcoming events
        self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])

    def test_args_kwargs(self):
        # Positional and keyword arguments given at scheduling time are
        # passed through to the action unchanged.
        flag = []

        def fun(*a, **b):
            flag.append(None)
            self.assertEqual(a, (1, 2, 3))
            self.assertEqual(b, {"foo": 1})

        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(0.01, 1, fun, argument=(1, 2, 3), kwargs={"foo": 1})
        scheduler.run()
        # The action must have been called exactly once.
        self.assertEqual(flag, [None])

    def test_run_non_blocking(self):
        # With blocking=False, run() returns without waiting for events that
        # are not yet due, so none of these far-future events fire.
        seen = []
        fun = seen.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [10, 9, 8, 7, 6]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run(blocking=False)
        self.assertEqual(seen, [])
196
197
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()