blob: e4cdc16bdef9b73a7b92f5ac162cab27c144c4e9 [file] [log] [blame]
# Test case for the select.poll() function
Fred Drake004d5e62000-10-23 17:22:08 +00002
Serhiy Storchakac3603892013-08-20 20:38:21 +03003import os
4import random
5import select
Serhiy Storchakaa92cc912013-12-14 19:11:04 +02006from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
Serhiy Storchakac3603892013-08-20 20:38:21 +03007try:
8 import threading
9except ImportError:
10 threading = None
11import time
12import unittest
13from test.test_support import TESTFN, run_unittest, reap_threads
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000014
# Skip the whole module when the select module has no poll() on this
# platform.  The lookup is done purely for its AttributeError side effect.
try:
    select.poll
except AttributeError:
    # Call form of raise: accepted by Python 2 and Python 3 alike, unlike the
    # older ``raise Exc, "message"`` statement form.
    raise unittest.SkipTest("select.poll not defined -- skipping test_poll")
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000019
20
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    ready -- iterable of (fd, eventmask) pairs; in this file it is always
             the list returned by a poll object's poll() method
    flag  -- bitmask tested against each eventmask with ``&``

    The result is a list of the matching fds, in the order they appear in
    *ready*.  An empty list is returned when nothing matches.
    """
    return [fd for fd, mode in ready if mode & flag]
27
class PollTests(unittest.TestCase):
    # Functional and error-path tests for select.poll objects.
    #
    # Test methods are documented with comments rather than docstrings so
    # that unittest's verbose output keeps showing the method names.

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipe and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        # Bytes literal: identical to a plain str on Python 2, and matches
        # the b'...' literal already used for os.write() elsewhere in this
        # file.
        MSG = b" This is a test."
        MSG_LEN = len(MSG)

        # read fd -> write fd for every pipe that is still open.
        r2w = {}

        def close_remaining_pipes():
            # Only does work when the test bails out early; on success every
            # pipe has already been closed and removed from r2w.
            for rd, wr in list(r2w.items()):
                os.close(wr)
                os.close(rd)
        self.addCleanup(close_remaining_pipes)

        for i in range(NUM_PIPES):
            rd, wr = os.pipe()
            r2w[rd] = wr
            # Exercise modify() as well as register(): the read end is
            # registered with the default mask and then narrowed to POLLIN.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)

        bufs = []

        while r2w:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire the pipe that was just drained: drop both ends from the
            # poll object first, then close them.
            wr = r2w[rd]
            p.unregister(wr)
            p.unregister(rd)
            del r2w[rd]
            os.close(wr)
            os.close(rd)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        # Obtain a descriptor number that is known to be closed by opening
        # and immediately closing a pipe, instead of closing a hard-coded
        # number that might belong to something else in this process.
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        with open(TESTFN, 'w') as f:
            # Remove the scratch file even if an assertion below fails.
            self.addCleanup(os.unlink, TESTFN)
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
        # The file is closed now, so its descriptor must be reported invalid.
        r = p.poll()
        self.assertEqual(r[0], (fd, select.POLLNVAL))

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # Has no fileno() method at all.
            pass

        class Almost:
            # Has fileno(), but it does not return an integer.
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Historical name of the method above.  It lacked the "test" prefix, so
    # unittest never collected it; the alias keeps the old name usable.
    poll_unit_tests = test_poll_unit_tests

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        p = os.popen(cmd, 'r')
        try:
            pollster = select.poll()
            pollster.register(p, select.POLLIN)
            # Timeouts are in milliseconds; -1 is passed for the last ten
            # rounds.
            for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,) * 10:
                fdlist = pollster.poll(tout)
                if not fdlist:
                    # Timed out with nothing to report; try the next timeout.
                    continue
                flags = fdlist[0][1]
                if flags & select.POLLHUP:
                    line = p.readline()
                    if line != "":
                        self.fail('error: pipe seems to be closed, but still returns data')
                elif flags & select.POLLIN:
                    line = p.readline()
                    if not line:
                        break
                else:
                    self.fail('Unexpected return value from select.poll: %s' % fdlist)
        finally:
            # Close the pipe on every exit path, including failed assertions.
            p.close()

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        # 1 << 64 is promoted to a long automatically on Python 2, so no
        # explicit L suffix is needed.
        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): presumably this trivial arithmetic is here to surface
        # an exception left pending by the call above -- confirm against the
        # history of this test before changing it.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @reap_threads
    def test_threaded_poll(self):
        # Start poll() on a helper thread, change the set of registered
        # descriptors from this thread, and expect a second poll() call on
        # the same object to raise RuntimeError.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for i in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            # Give the helper thread time to enter poll(); nothing has been
            # written to the pipe yet.
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()
198
199
def test_main():
    """Run the PollTests test case through run_unittest()."""
    test_classes = (PollTests,)
    run_unittest(*test_classes)
Tim Peters536cf992005-12-25 23:18:31 +0000202
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()