blob: a06f56b047a1ed12e301ab80821e0c1669dabb50 [file] [log] [blame]
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +00001
2# Test case for the os.poll() function
3
4import sys, os, select, random
5from test.test_support import verbose, TestSkipped, TESTFN
6
7try:
8 select.poll
9except AttributeError:
10 raise TestSkipped, "select.poll not defined -- skipping test_poll"
11
12
13def find_ready_matching(ready, flag):
14 match = []
15 for fd, mode in ready:
16 if mode & flag:
17 match.append(fd)
18 return match
19
20def test_poll1():
21 """Basic functional test of poll object
22
23 Create a bunch of pipe and test that poll works with them.
24 """
25 print 'Running poll test 1'
26 p = select.poll()
27
28 NUM_PIPES = 12
29 MSG = " This is a test."
30 MSG_LEN = len(MSG)
31 readers = []
32 writers = []
33 r2w = {}
34 w2r = {}
35
36 for i in range(NUM_PIPES):
37 rd, wr = os.pipe()
38 p.register(rd, select.POLLIN)
39 p.register(wr, select.POLLOUT)
40 readers.append(rd)
41 writers.append(wr)
42 r2w[rd] = wr
43 w2r[wr] = rd
44
45 while writers:
46 ready = p.poll()
47 ready_writers = find_ready_matching(ready, select.POLLOUT)
48 if not ready_writers:
49 raise RuntimeError, "no pipes ready for writing"
50 wr = random.choice(ready_writers)
51 os.write(wr, MSG)
52
53 ready = p.poll()
54 ready_readers = find_ready_matching(ready, select.POLLIN)
55 if not ready_readers:
56 raise RuntimeError, "no pipes ready for reading"
57 rd = random.choice(ready_readers)
58 buf = os.read(rd, MSG_LEN)
59 assert len(buf) == MSG_LEN
60 print buf
61 os.close(r2w[rd])
62 writers.remove(r2w[rd])
63
64 poll_unit_tests()
65 print 'Poll test 1 complete'
66
67def poll_unit_tests():
68 # returns NVAL for invalid file descriptor
69 FD = 42
70 try:
71 os.close(FD)
72 except OSError:
73 pass
74 p = select.poll()
75 p.register(FD)
76 r = p.poll()
77 assert r[0] == (FD, select.POLLNVAL)
78
79 f = open(TESTFN, 'w')
80 fd = f.fileno()
81 p = select.poll()
82 p.register(f)
83 r = p.poll()
84 assert r[0][0] == fd
85 f.close()
86 r = p.poll()
87 assert r[0] == (fd, select.POLLNVAL)
88 os.unlink(TESTFN)
89
90 # type error for invalid arguments
91 p = select.poll()
92 try:
93 p.register(p)
94 except TypeError:
95 pass
96 else:
97 print "Bogus register call did not raise TypeError"
98 try:
99 p.unregister(p)
100 except TypeError:
101 pass
102 else:
103 print "Bogus unregister call did not raise TypeError"
104
105 # can't unregister non-existent object
106 p = select.poll()
107 try:
108 p.unregister(3)
109 except KeyError:
110 pass
111 else:
112 print "Bogus unregister call did not raise KeyError"
113
114 # Test error cases
115 pollster = select.poll()
116 class Nope:
117 pass
118
119 class Almost:
120 def fileno(self):
121 return 'fileno'
122
123 try:
124 pollster.register( Nope(), 0 )
125 except TypeError: pass
126 else: print 'expected TypeError exception, not raised'
127
128 try:
129 pollster.register( Almost(), 0 )
130 except TypeError: pass
131 else: print 'expected TypeError exception, not raised'
132
133
134# Another test case for poll(). This is copied from the test case for
135# select(), modified to use poll() instead.
136
137def test_poll2():
138 print 'Running poll test 2'
139 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
140 p = os.popen(cmd, 'r')
141 pollster = select.poll()
142 pollster.register( p, select.POLLIN )
143 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
144 if verbose:
145 print 'timeout =', tout
146 fdlist = pollster.poll(tout)
147 if (fdlist == []):
148 continue
149 if fdlist[0] == (p.fileno(),select.POLLHUP):
150 line = p.readline()
151 if line != "":
152 print 'error: pipe seems to be closed, but still returns data'
153 continue
154
155 elif fdlist[0] == (p.fileno(),select.POLLIN):
156 line = p.readline()
157 if verbose:
158 print `line`
159 if not line:
160 if verbose:
161 print 'EOF'
162 break
163 continue
164 else:
165 print 'Unexpected return value from select.poll:', fdlist
166 p.close()
167 print 'Poll test 2 complete'
168
169test_poll1()
170test_poll2()
171