#!/usr/bin/env python
# blob: 1fe6ad442cbd4f1450fe49c99e0e2a177bdd2e7c

import queue
import sched
import time
import unittest
from test import support
try:
    import threading
except ImportError:
    threading = None

# Upper bound, in seconds, passed as ``timeout=`` to the queue reads and
# thread joins in the concurrent tests.
TIMEOUT = 10


class Timer:
    """Manually driven clock usable as a scheduler's time/delay functions.

    The clock keeps two values guarded by one condition variable:

    * ``_time`` -- the current fake time, returned by :meth:`time`;
    * ``_stop`` -- the limit up to which the fake time may move, raised
      only by :meth:`advance`.

    :meth:`sleep` moves ``_time`` forward but blocks whenever that would
    take it past ``_stop``, so the code calling :meth:`advance` decides how
    far a sleeping thread may progress.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self._time = 0
        self._stop = 0

    def time(self):
        """Return the current fake time."""
        with self._cond:
            return self._time

    # increase the time but not beyond the established limit
    def sleep(self, t):
        """Advance the fake time by *t*, blocking while the limit is lower.

        *t* must be non-negative.
        """
        assert t >= 0
        with self._cond:
            # From here on ``t`` is the absolute target time.
            t += self._time
            while self._stop < t:
                # Go as far as currently allowed, then wait for advance()
                # to raise the limit.
                self._time = self._stop
                self._cond.wait()
            self._time = t

    # advance time limit for user code
    def advance(self, t):
        """Raise the time limit by *t* and wake every thread in sleep().

        *t* must be non-negative.
        """
        assert t >= 0
        with self._cond:
            self._stop += t
            self._cond.notify_all()


class TestCase(unittest.TestCase):
    """Tests for ``sched.scheduler``.

    Covers event ordering by time and priority, cancellation, the
    ``empty()`` and ``queue`` accessors, positional/keyword argument
    passing and ``run(blocking=False)``.  The ``*_concurrent`` tests run
    the scheduler in a background thread against the fake ``Timer`` clock.
    """

    def test_enter(self):
        # Events entered with decreasing delays must run in increasing
        # delay order.
        results = []
        fun = results.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(results, [0.1, 0.2, 0.3, 0.4, 0.5])

    def test_enterabs(self):
        # Same as test_enter, but the events carry absolute times.
        results = []
        fun = results.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(results, [0.01, 0.02, 0.03, 0.04, 0.05])

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_enter_concurrent(self):
        # The scheduler runs in a worker thread; this thread advances the
        # fake clock step by step and checks which callbacks have fired.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        # Add more events while the scheduler is already running.  The
        # delay is x - 1 so that each value x is delivered once the clock
        # limit reaches x (see the assertions below).
        for x in [4, 5, 2]:
            scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        # Raise the limit far beyond the last event; the worker must
        # finish without firing anything else or moving time past 5.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 5)

    def test_priority(self):
        # Events sharing the same time must run in priority order.
        results = []
        fun = results.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for priority in [1, 2, 3, 4, 5]:
            scheduler.enterabs(0.01, priority, fun, (priority,))
        scheduler.run()
        self.assertEqual(results, [1, 2, 3, 4, 5])

    def test_cancel(self):
        # Cancelled events (the first and the last) must not run.
        results = []
        fun = results.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
        scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
        scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
        scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
        scheduler.cancel(event1)
        scheduler.cancel(event5)
        scheduler.run()
        self.assertEqual(results, [0.02, 0.03, 0.04])

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_cancel_concurrent(self):
        # Cancel events from this thread while the scheduler is running
        # in a worker thread driven by the fake clock.
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        now = timer.time()
        # Entered out of time order on purpose (1, 2, 4, 5, 3).
        scheduler.enterabs(now + 1, 1, fun, (1,))
        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
        scheduler.enterabs(now + 4, 1, fun, (4,))
        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
        scheduler.enterabs(now + 3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        scheduler.cancel(event2)
        scheduler.cancel(event5)
        timer.advance(1)
        # Event 2 was cancelled, so nothing is delivered at time 2.
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        # Event 5 was cancelled: the worker must finish with time at 4.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 4)

    def test_empty(self):
        # empty() is true before any event is entered and again after
        # run() has returned.
        results = []
        fun = results.append
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(x, 1, fun, (x,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        results = []
        fun = results.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        e5 = scheduler.enterabs(now + 0.05, 1, fun)
        e1 = scheduler.enterabs(now + 0.01, 1, fun)
        e2 = scheduler.enterabs(now + 0.02, 1, fun)
        e4 = scheduler.enterabs(now + 0.04, 1, fun)
        e3 = scheduler.enterabs(now + 0.03, 1, fun)
        # The queue property is supposed to return an ordered list of
        # upcoming events.
        self.assertEqual(list(scheduler.queue), [e1, e2, e3, e4, e5])

    def test_args_kwargs(self):
        # Both ``argument`` and ``kwargs`` must reach the callback.
        flag = []

        def fun(*a, **b):
            # Record the call so the test can tell the callback ran.
            flag.append(None)
            self.assertEqual(a, (1, 2, 3))
            self.assertEqual(b, {"foo": 1})

        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(0.01, 1, fun, argument=(1, 2, 3),
                           kwargs={"foo": 1})
        scheduler.run()
        self.assertEqual(flag, [None])

    def test_run_non_blocking(self):
        # With blocking=False, run() must return without executing events
        # that are still seconds in the future.
        results = []
        fun = results.append
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [10, 9, 8, 7, 6]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run(blocking=False)
        self.assertEqual(results, [])


def test_main():
    """Run ``TestCase`` through ``support.run_unittest``."""
    support.run_unittest(TestCase)


if __name__ == "__main__":
    test_main()