blob: 1e195ed624da9068685d1fb650d16f7af8a2bccd [file] [log] [blame]
# Test case for the select.poll() function
Fred Drake004d5e62000-10-23 17:22:08 +00002
Serhiy Storchakac3603892013-08-20 20:38:21 +03003import os
4import random
5import select
Serhiy Storchakac3603892013-08-20 20:38:21 +03006try:
7 import threading
8except ImportError:
9 threading = None
10import time
11import unittest
Serhiy Storchaka76249ea2014-02-07 10:06:05 +020012from test.test_support import TESTFN, run_unittest, reap_threads, cpython_only
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000013
# The whole module is pointless where the select module has no poll();
# bail out at import time so the test run reports a skip instead of errors.
try:
    select.poll
except AttributeError:
    # Call form of raise: accepted by Python 2.7 and by Python 3, unlike the
    # older "raise Class, message" statement.
    raise unittest.SkipTest("select.poll not defined -- skipping test_poll")
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000018
19
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask has *flag* set.

    *ready* is an iterable of ``(fd, eventmask)`` pairs, the shape the
    callers in this file get back from ``poll()``.  The result is a new
    list of the matching ``fd`` values, in their original order.
    """
    return [fd for fd, mode in ready if mode & flag]
26
class PollTests(unittest.TestCase):
    """Functional and error-path tests for select.poll() objects."""

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them: one
        # message is pushed through every pipe, and after each step poll()
        # must report a writable (then a readable) descriptor.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        # read end -> write end, for every pipe that is still open
        r2w = {}

        def close_leftover_pipes():
            # Only has work to do when the test aborts part-way through;
            # on success the loop below has already emptied r2w.
            for rd, wr in list(r2w.items()):
                os.close(rd)
                os.close(wr)
            r2w.clear()
        self.addCleanup(close_leftover_pipes)

        for i in range(NUM_PIPES):
            rd, wr = os.pipe()
            r2w[rd] = wr
            # Register with the default event mask first, then narrow it to
            # POLLIN, so that modify() is exercised as well.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)

        bufs = []

        while r2w:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # This pipe is finished: drop both ends from the poll object
            # first, then from the bookkeeping, then close them.  Doing it
            # in this order means the cleanup callback never sees an fd
            # that has already been closed.
            p.unregister(r2w[rd])
            p.unregister(rd)
            os.close(r2w.pop(rd))
            os.close(rd)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def poll_unit_tests(self):
        # NOTE(review): this name has no "test" prefix, so unittest's default
        # loader presumably never collects it -- confirm whether it is meant
        # to run before renaming it.

        # returns NVAL for invalid file descriptor
        # Get a descriptor number that is known to be closed by opening and
        # closing a pipe, rather than closing a hard-coded number that might
        # belong to something else in this process.
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        f = open(TESTFN, 'w')
        try:
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
            # Once the file is closed its descriptor must poll as invalid.
            f.close()
            r = p.poll()
            self.assertEqual(r[0], (fd, select.POLLNVAL))
        finally:
            # Closing twice is harmless; this only matters when an assertion
            # above fired before the explicit close().
            f.close()
            os.unlink(TESTFN)

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # no fileno() method at all
            pass

        class Almost:
            # has fileno(), but it does not return an integer
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        p = os.popen(cmd, 'r')
        # Make sure the pipe is closed even when self.fail() fires below.
        self.addCleanup(p.close)
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        # Timeouts are in milliseconds; -1 blocks until an event arrives.
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
            fdlist = pollster.poll(tout)
            if not fdlist:
                # timed out with nothing to report; try the next timeout
                continue
            flags = fdlist[0][1]
            if flags & select.POLLHUP:
                line = p.readline()
                if line != "":
                    self.fail('error: pipe seems to be closed, but still returns data')
            elif flags & select.POLLIN:
                line = p.readline()
                if not line:
                    # end of file: the child has finished
                    break
            else:
                self.fail('Unexpected return value from select.poll: %s' % fdlist)

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # Sanity check: plain arithmetic straight after the rejected call
        # must still produce the right answer.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
        self.assertRaises(OverflowError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

    @cpython_only
    def test_poll_c_limits(self):
        # Values just past the C limits exported by _testcapi must be
        # rejected with OverflowError.
        from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
        pollster = select.poll()
        pollster.register(1)

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @reap_threads
    def test_threaded_poll(self):
        # While one thread is blocked inside poll(), change the registered
        # descriptors from this thread and check that a second, concurrent
        # poll() call on the same object raises RuntimeError.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for i in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            # give the thread time to get into poll()
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()
207
208
def test_main():
    """Hand the PollTests case to the test-support runner."""
    run_unittest(PollTests)
Tim Peters536cf992005-12-25 23:18:31 +0000211
# Allow running this file directly as a script.
if __name__ == '__main__':
    test_main()