blob: 82d8372be2e942e7a3b1d18b7cd75d4dd8df628d [file] [log] [blame]
Guido van Rossum6f8f92f2000-09-01 19:27:34 +00001# Test case for the os.poll() function
2
3import sys, os, select, random
4from test_support import verbose, TestSkipped, TESTFN
5
6try:
7 select.poll
8except AttributeError:
9 raise TestSkipped, "select.poll not defined -- skipping test_poll"
10
11
12def find_ready_matching(ready, flag):
13 match = []
14 for fd, mode in ready:
15 if mode & flag:
16 match.append(fd)
17 return match
18
19def test_poll1():
20 """Basic functional test of poll object
21
22 Create a bunch of pipe and test that poll works with them.
23 """
24 print 'Running poll test 1'
25 p = select.poll()
26
27 NUM_PIPES = 12
28 MSG = " This is a test."
29 MSG_LEN = len(MSG)
30 readers = []
31 writers = []
32 r2w = {}
33 w2r = {}
34
35 for i in range(NUM_PIPES):
36 rd, wr = os.pipe()
37 p.register(rd, select.POLLIN)
38 p.register(wr, select.POLLOUT)
39 readers.append(rd)
40 writers.append(wr)
41 r2w[rd] = wr
42 w2r[wr] = rd
43
44 while writers:
45 ready = p.poll()
46 ready_writers = find_ready_matching(ready, select.POLLOUT)
47 if not ready_writers:
48 raise RuntimeError, "no pipes ready for writing"
49 wr = random.choice(ready_writers)
50 os.write(wr, MSG)
51
52 ready = p.poll()
53 ready_readers = find_ready_matching(ready, select.POLLIN)
54 if not ready_readers:
55 raise RuntimeError, "no pipes ready for reading"
56 rd = random.choice(ready_readers)
57 buf = os.read(rd, MSG_LEN)
58 assert len(buf) == MSG_LEN
59 print buf
60 os.close(r2w[rd]) ; os.close( rd )
61 p.unregister( r2w[rd] )
62 p.unregister( rd )
63 writers.remove(r2w[rd])
64
65 poll_unit_tests()
66 print 'Poll test 1 complete'
67
68def poll_unit_tests():
69 # returns NVAL for invalid file descriptor
70 FD = 42
71 try:
72 os.close(FD)
73 except OSError:
74 pass
75 p = select.poll()
76 p.register(FD)
77 r = p.poll()
78 assert r[0] == (FD, select.POLLNVAL)
79
80 f = open(TESTFN, 'w')
81 fd = f.fileno()
82 p = select.poll()
83 p.register(f)
84 r = p.poll()
85 assert r[0][0] == fd
86 f.close()
87 r = p.poll()
88 assert r[0] == (fd, select.POLLNVAL)
89 os.unlink(TESTFN)
90
91 # type error for invalid arguments
92 p = select.poll()
93 try:
94 p.register(p)
95 except TypeError:
96 pass
97 else:
98 print "Bogus register call did not raise TypeError"
99 try:
100 p.unregister(p)
101 except TypeError:
102 pass
103 else:
104 print "Bogus unregister call did not raise TypeError"
105
106 # can't unregister non-existent object
107 p = select.poll()
108 try:
109 p.unregister(3)
110 except KeyError:
111 pass
112 else:
113 print "Bogus unregister call did not raise KeyError"
114
115 # Test error cases
116 pollster = select.poll()
117 class Nope:
118 pass
119
120 class Almost:
121 def fileno(self):
122 return 'fileno'
123
124 try:
125 pollster.register( Nope(), 0 )
126 except TypeError: pass
127 else: print 'expected TypeError exception, not raised'
128
129 try:
130 pollster.register( Almost(), 0 )
131 except TypeError: pass
132 else: print 'expected TypeError exception, not raised'
133
134
135# Another test case for poll(). This is copied from the test case for
136# select(), modified to use poll() instead.
137
138def test_poll2():
139 print 'Running poll test 2'
140 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
141 p = os.popen(cmd, 'r')
142 pollster = select.poll()
143 pollster.register( p, select.POLLIN )
144 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
145 if verbose:
146 print 'timeout =', tout
147 fdlist = pollster.poll(tout)
148 if (fdlist == []):
149 continue
150 fd, flags = fdlist[0]
151 if flags & select.POLLHUP:
152 line = p.readline()
153 if line != "":
154 print 'error: pipe seems to be closed, but still returns data'
155 continue
156
157 elif flags & select.POLLIN:
158 line = p.readline()
159 if verbose:
160 print `line`
161 if not line:
162 if verbose:
163 print 'EOF'
164 break
165 continue
166 else:
167 print 'Unexpected return value from select.poll:', fdlist
168 p.close()
169 print 'Poll test 2 complete'
170
171test_poll1()
172test_poll2()