blob: f6d427ccd5fa4c4913f9de68cabb0bdfa16b6d7c [file] [log] [blame]
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +00001# Test case for the os.poll() function
Fred Drake004d5e62000-10-23 17:22:08 +00002
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +00003import sys, os, select, random
Marc-André Lemburg36619082001-01-17 19:11:13 +00004from test_support import verify, verbose, TestSkipped, TESTFN
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +00005
6try:
7 select.poll
8except AttributeError:
9 raise TestSkipped, "select.poll not defined -- skipping test_poll"
10
11
12def find_ready_matching(ready, flag):
13 match = []
14 for fd, mode in ready:
15 if mode & flag:
16 match.append(fd)
17 return match
18
19def test_poll1():
20 """Basic functional test of poll object
21
22 Create a bunch of pipe and test that poll works with them.
23 """
24 print 'Running poll test 1'
25 p = select.poll()
26
27 NUM_PIPES = 12
28 MSG = " This is a test."
29 MSG_LEN = len(MSG)
30 readers = []
31 writers = []
32 r2w = {}
33 w2r = {}
34
35 for i in range(NUM_PIPES):
36 rd, wr = os.pipe()
37 p.register(rd, select.POLLIN)
38 p.register(wr, select.POLLOUT)
39 readers.append(rd)
40 writers.append(wr)
41 r2w[rd] = wr
42 w2r[wr] = rd
43
44 while writers:
45 ready = p.poll()
46 ready_writers = find_ready_matching(ready, select.POLLOUT)
47 if not ready_writers:
48 raise RuntimeError, "no pipes ready for writing"
49 wr = random.choice(ready_writers)
50 os.write(wr, MSG)
51
52 ready = p.poll()
53 ready_readers = find_ready_matching(ready, select.POLLIN)
54 if not ready_readers:
55 raise RuntimeError, "no pipes ready for reading"
56 rd = random.choice(ready_readers)
57 buf = os.read(rd, MSG_LEN)
Marc-André Lemburg36619082001-01-17 19:11:13 +000058 verify(len(buf) == MSG_LEN)
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000059 print buf
Andrew M. Kuchlingd50a1872000-08-29 16:53:34 +000060 os.close(r2w[rd]) ; os.close( rd )
61 p.unregister( r2w[rd] )
62 p.unregister( rd )
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000063 writers.remove(r2w[rd])
64
65 poll_unit_tests()
66 print 'Poll test 1 complete'
67
68def poll_unit_tests():
69 # returns NVAL for invalid file descriptor
70 FD = 42
71 try:
72 os.close(FD)
73 except OSError:
74 pass
75 p = select.poll()
76 p.register(FD)
77 r = p.poll()
Marc-André Lemburg36619082001-01-17 19:11:13 +000078 verify(r[0] == (FD, select.POLLNVAL))
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000079
80 f = open(TESTFN, 'w')
81 fd = f.fileno()
82 p = select.poll()
83 p.register(f)
84 r = p.poll()
Marc-André Lemburg36619082001-01-17 19:11:13 +000085 verify(r[0][0] == fd)
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000086 f.close()
87 r = p.poll()
Marc-André Lemburg36619082001-01-17 19:11:13 +000088 verify(r[0] == (fd, select.POLLNVAL))
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +000089 os.unlink(TESTFN)
90
91 # type error for invalid arguments
92 p = select.poll()
93 try:
94 p.register(p)
95 except TypeError:
96 pass
97 else:
98 print "Bogus register call did not raise TypeError"
99 try:
100 p.unregister(p)
101 except TypeError:
102 pass
103 else:
104 print "Bogus unregister call did not raise TypeError"
105
106 # can't unregister non-existent object
107 p = select.poll()
108 try:
109 p.unregister(3)
110 except KeyError:
111 pass
112 else:
113 print "Bogus unregister call did not raise KeyError"
114
115 # Test error cases
116 pollster = select.poll()
117 class Nope:
118 pass
119
120 class Almost:
121 def fileno(self):
122 return 'fileno'
123
124 try:
125 pollster.register( Nope(), 0 )
126 except TypeError: pass
127 else: print 'expected TypeError exception, not raised'
128
129 try:
130 pollster.register( Almost(), 0 )
131 except TypeError: pass
132 else: print 'expected TypeError exception, not raised'
133
134
135# Another test case for poll(). This is copied from the test case for
136# select(), modified to use poll() instead.
137
138def test_poll2():
139 print 'Running poll test 2'
140 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
141 p = os.popen(cmd, 'r')
142 pollster = select.poll()
143 pollster.register( p, select.POLLIN )
144 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
145 if verbose:
146 print 'timeout =', tout
147 fdlist = pollster.poll(tout)
148 if (fdlist == []):
149 continue
Andrew M. Kuchlingd50a1872000-08-29 16:53:34 +0000150 fd, flags = fdlist[0]
151 if flags & select.POLLHUP:
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +0000152 line = p.readline()
153 if line != "":
154 print 'error: pipe seems to be closed, but still returns data'
155 continue
156
Andrew M. Kuchlingd50a1872000-08-29 16:53:34 +0000157 elif flags & select.POLLIN:
Andrew M. Kuchling3227cc82000-08-25 01:18:45 +0000158 line = p.readline()
159 if verbose:
160 print `line`
161 if not line:
162 if verbose:
163 print 'EOF'
164 break
165 continue
166 else:
167 print 'Unexpected return value from select.poll:', fdlist
168 p.close()
169 print 'Poll test 2 complete'
170
171test_poll1()
172test_poll2()