blob: 543dc6bc48bf0b3364bae41bef1267dbaa41f553 [file] [log] [blame]
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +00001# Test case for the os.poll() function
2
3import sys, os, select, random
Tim Peters43dee062000-08-26 08:24:18 +00004from test_support import verbose, TestSkipped, TESTFN
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +00005
6try:
7 select.poll
8except AttributeError:
9 raise TestSkipped, "select.poll not defined -- skipping test_poll"
10
11
12def find_ready_matching(ready, flag):
13 match = []
14 for fd, mode in ready:
15 if mode & flag:
16 match.append(fd)
17 return match
18
19def test_poll1():
20 """Basic functional test of poll object
21
22 Create a bunch of pipe and test that poll works with them.
23 """
24 print 'Running poll test 1'
25 p = select.poll()
26
27 NUM_PIPES = 12
28 MSG = " This is a test."
29 MSG_LEN = len(MSG)
30 readers = []
31 writers = []
32 r2w = {}
33 w2r = {}
34
35 for i in range(NUM_PIPES):
36 rd, wr = os.pipe()
37 p.register(rd, select.POLLIN)
38 p.register(wr, select.POLLOUT)
39 readers.append(rd)
40 writers.append(wr)
41 r2w[rd] = wr
42 w2r[wr] = rd
43
44 while writers:
45 ready = p.poll()
46 ready_writers = find_ready_matching(ready, select.POLLOUT)
47 if not ready_writers:
48 raise RuntimeError, "no pipes ready for writing"
49 wr = random.choice(ready_writers)
50 os.write(wr, MSG)
51
52 ready = p.poll()
53 ready_readers = find_ready_matching(ready, select.POLLIN)
54 if not ready_readers:
55 raise RuntimeError, "no pipes ready for reading"
56 rd = random.choice(ready_readers)
57 buf = os.read(rd, MSG_LEN)
58 assert len(buf) == MSG_LEN
59 print buf
60 os.close(r2w[rd])
61 writers.remove(r2w[rd])
62
63 poll_unit_tests()
64 print 'Poll test 1 complete'
65
66def poll_unit_tests():
67 # returns NVAL for invalid file descriptor
68 FD = 42
69 try:
70 os.close(FD)
71 except OSError:
72 pass
73 p = select.poll()
74 p.register(FD)
75 r = p.poll()
76 assert r[0] == (FD, select.POLLNVAL)
77
78 f = open(TESTFN, 'w')
79 fd = f.fileno()
80 p = select.poll()
81 p.register(f)
82 r = p.poll()
83 assert r[0][0] == fd
84 f.close()
85 r = p.poll()
86 assert r[0] == (fd, select.POLLNVAL)
87 os.unlink(TESTFN)
88
89 # type error for invalid arguments
90 p = select.poll()
91 try:
92 p.register(p)
93 except TypeError:
94 pass
95 else:
96 print "Bogus register call did not raise TypeError"
97 try:
98 p.unregister(p)
99 except TypeError:
100 pass
101 else:
102 print "Bogus unregister call did not raise TypeError"
103
104 # can't unregister non-existent object
105 p = select.poll()
106 try:
107 p.unregister(3)
108 except KeyError:
109 pass
110 else:
111 print "Bogus unregister call did not raise KeyError"
112
113 # Test error cases
114 pollster = select.poll()
115 class Nope:
116 pass
117
118 class Almost:
119 def fileno(self):
120 return 'fileno'
121
122 try:
123 pollster.register( Nope(), 0 )
124 except TypeError: pass
125 else: print 'expected TypeError exception, not raised'
126
127 try:
128 pollster.register( Almost(), 0 )
129 except TypeError: pass
130 else: print 'expected TypeError exception, not raised'
131
132
133# Another test case for poll(). This is copied from the test case for
134# select(), modified to use poll() instead.
135
136def test_poll2():
137 print 'Running poll test 2'
138 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
139 p = os.popen(cmd, 'r')
140 pollster = select.poll()
141 pollster.register( p, select.POLLIN )
142 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
143 if verbose:
144 print 'timeout =', tout
145 fdlist = pollster.poll(tout)
146 if (fdlist == []):
147 continue
148 if fdlist[0] == (p.fileno(),select.POLLHUP):
149 line = p.readline()
150 if line != "":
151 print 'error: pipe seems to be closed, but still returns data'
152 continue
153
154 elif fdlist[0] == (p.fileno(),select.POLLIN):
155 line = p.readline()
156 if verbose:
157 print `line`
158 if not line:
159 if verbose:
160 print 'EOF'
161 break
162 continue
163 else:
164 print 'Unexpected return value from select.poll:', fdlist
165 p.close()
166 print 'Poll test 2 complete'
167
168test_poll1()
169test_poll2()