blob: 50ada522a2694f9d366eaff9014a98938335b6f1 [file] [log] [blame]
Giampaolo Rodolàb5c23762010-08-04 09:28:05 +00001#!/usr/bin/env python
2
3import sched
4import time
5import unittest
6from test import support
7
8
class TestCase(unittest.TestCase):
    """Tests for sched.scheduler driven by time.time and time.sleep."""

    def test_enter(self):
        """Events added with enter() run in order of increasing delay."""
        results = []
        scheduler = sched.scheduler(time.time, time.sleep)
        # enter() schedules relative to the clock reading at call time, so
        # the gaps between delays must exceed the time spent between the
        # successive enter() calls for the expected order to hold.
        for delay in [0.5, 0.4, 0.3, 0.2, 0.1]:
            scheduler.enter(delay, 1, results.append, (delay,))
        scheduler.run()
        self.assertEqual(results, [0.1, 0.2, 0.3, 0.4, 0.5])

    def test_enterabs(self):
        """Events added with enterabs() run in order of absolute time."""
        results = []
        scheduler = sched.scheduler(time.time, time.sleep)
        # These absolute times are compared against time.time(), so only
        # their relative order matters here.
        for when in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(when, 1, results.append, (when,))
        scheduler.run()
        self.assertEqual(results, [0.01, 0.02, 0.03, 0.04, 0.05])

    def test_priority(self):
        """Events sharing the same time run in ascending priority order."""
        results = []
        scheduler = sched.scheduler(time.time, time.sleep)
        for priority in [1, 2, 3, 4, 5]:
            scheduler.enterabs(0.01, priority, results.append, (priority,))
        scheduler.run()
        self.assertEqual(results, [1, 2, 3, 4, 5])

    def test_cancel(self):
        """Cancelled events are skipped; the remaining ones still run."""
        results = []
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        events = [
            scheduler.enterabs(now + offset, 1, results.append, (offset,))
            for offset in [0.01, 0.02, 0.03, 0.04, 0.05]
        ]
        # Drop the earliest and the latest event before running.
        scheduler.cancel(events[0])
        scheduler.cancel(events[-1])
        scheduler.run()
        self.assertEqual(results, [0.02, 0.03, 0.04])

    def test_empty(self):
        """empty() is true before scheduling and again after run()."""
        results = []
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for when in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(when, 1, results.append, (when,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        """The queue attribute lists pending events in execution order."""
        results = []
        # The scheduler is never run in this test, so the action is only a
        # placeholder callable and is not invoked.
        action = results.append
        scheduler = sched.scheduler(time.time, time.sleep)
        now = time.time()
        # Insert out of order on purpose.
        e5 = scheduler.enterabs(now + 0.05, 1, action)
        e1 = scheduler.enterabs(now + 0.01, 1, action)
        e2 = scheduler.enterabs(now + 0.02, 1, action)
        e4 = scheduler.enterabs(now + 0.04, 1, action)
        e3 = scheduler.enterabs(now + 0.03, 1, action)
        # queue property is supposed to return an ordered list of
        # upcoming events
        self.assertEqual(list(scheduler.queue), [e1, e2, e3, e4, e5])

    def test_args_kwargs(self):
        """Positional and keyword arguments are forwarded to the action."""
        calls = []

        def action(*args, **kwargs):
            # Record the call first so the final assertion can confirm the
            # action was invoked exactly once.
            calls.append(None)
            self.assertEqual(args, (1, 2, 3))
            self.assertEqual(kwargs, {"foo": 1})

        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(0.01, 1, action, argument=(1, 2, 3),
                           kwargs={"foo": 1})
        scheduler.run()
        self.assertEqual(calls, [None])

    def test_run_non_blocking(self):
        """run(blocking=False) returns without running events not yet due."""
        results = []
        scheduler = sched.scheduler(time.time, time.sleep)
        # Every event is several seconds in the future, so none is due when
        # run() is called.
        for delay in [10, 9, 8, 7, 6]:
            scheduler.enter(delay, 1, results.append, (delay,))
        scheduler.run(blocking=False)
        self.assertEqual(results, [])
99
100
def test_main():
    """Run all tests in TestCase via test.support.run_unittest."""
    test_classes = (TestCase,)
    support.run_unittest(*test_classes)
103
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()