blob: ae82f94053e07db1dd6fd16e14203cda609a889f [file] [log] [blame]
Giampaolo Rodolàb5c23762010-08-04 09:28:05 +00001#!/usr/bin/env python
2
3import sched
4import time
5import unittest
6from test import support
7
8
9class TestCase(unittest.TestCase):
10
11 def test_enter(self):
12 l = []
13 fun = lambda x: l.append(x)
14 scheduler = sched.scheduler(time.time, time.sleep)
15 for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
16 z = scheduler.enter(x, 1, fun, (x,))
17 scheduler.run()
18 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
19
20 def test_enterabs(self):
21 l = []
22 fun = lambda x: l.append(x)
23 scheduler = sched.scheduler(time.time, time.sleep)
24 for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
25 z = scheduler.enterabs(x, 1, fun, (x,))
26 scheduler.run()
27 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
28
29 def test_priority(self):
30 l = []
31 fun = lambda x: l.append(x)
32 scheduler = sched.scheduler(time.time, time.sleep)
33 for priority in [1, 2, 3, 4, 5]:
34 z = scheduler.enter(0.01, priority, fun, (priority,))
35 scheduler.run()
36 self.assertEqual(l, [1, 2, 3, 4, 5])
37
38 def test_cancel(self):
39 l = []
40 fun = lambda x: l.append(x)
41 scheduler = sched.scheduler(time.time, time.sleep)
42 event1 = scheduler.enter(0.01, 1, fun, (0.01,))
43 event2 = scheduler.enter(0.02, 1, fun, (0.02,))
44 event3 = scheduler.enter(0.03, 1, fun, (0.03,))
45 event4 = scheduler.enter(0.04, 1, fun, (0.04,))
46 event5 = scheduler.enter(0.05, 1, fun, (0.05,))
47 scheduler.cancel(event1)
48 scheduler.cancel(event5)
49 scheduler.run()
50 self.assertEqual(l, [0.02, 0.03, 0.04])
51
52 def test_empty(self):
53 l = []
54 fun = lambda x: l.append(x)
55 scheduler = sched.scheduler(time.time, time.sleep)
56 self.assertTrue(scheduler.empty())
57 for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
58 z = scheduler.enterabs(x, 1, fun, (x,))
59 self.assertFalse(scheduler.empty())
60 scheduler.run()
61 self.assertTrue(scheduler.empty())
62
63 def test_queue(self):
64 l = []
Giampaolo Rodolàb5c23762010-08-04 09:28:05 +000065 fun = lambda x: l.append(x)
66 scheduler = sched.scheduler(time.time, time.sleep)
Giampaolo Rodola'6a5dcd42011-11-26 12:17:42 +010067 e5 = scheduler.enter(0.05, 1, fun)
68 e1 = scheduler.enter(0.01, 1, fun)
69 e2 = scheduler.enter(0.02, 1, fun)
70 e4 = scheduler.enter(0.04, 1, fun)
71 e3 = scheduler.enter(0.03, 1, fun)
72 # queue property is supposed to return an order list of
73 # upcoming events
74 self.assertEqual(list(scheduler.queue), [e1, e2, e3, e4, e5])
Giampaolo Rodolàb5c23762010-08-04 09:28:05 +000075
Giampaolo Rodola'be55d992011-11-22 13:33:34 +010076 def test_args_kwargs(self):
77 flag = []
78
79 def fun(*a, **b):
80 flag.append(None)
81 self.assertEqual(a, (1,2,3))
82 self.assertEqual(b, {"foo":1})
83
84 scheduler = sched.scheduler(time.time, time.sleep)
85 z = scheduler.enterabs(0.01, 1, fun, argument=(1,2,3), kwargs={"foo":1})
86 scheduler.run()
87 self.assertEqual(flag, [None])
Giampaolo Rodolàb5c23762010-08-04 09:28:05 +000088
Giampaolo Rodola'556ba042011-12-14 14:38:45 +010089 def test_run_non_blocking(self):
90 l = []
91 fun = lambda x: l.append(x)
92 scheduler = sched.scheduler(time.time, time.sleep)
93 for x in [10, 9, 8, 7, 6]:
94 scheduler.enter(x, 1, fun, (x,))
95 scheduler.run(blocking=False)
96 self.assertEqual(l, [])
97
98
Giampaolo Rodolàb5c23762010-08-04 09:28:05 +000099def test_main():
100 support.run_unittest(TestCase)
101
102if __name__ == "__main__":
103 test_main()