blob: 1e30d61c506e7989bd6635c0da7d3a1814ff45b8 [file] [log] [blame]
# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
Tests for epoll wrapper.
"""
24import os
25import socket
26import errno
27import time
28import select
29import tempfile
30import unittest
31
32from test import test_support
# Skip the whole module when the select module does not expose epoll.
if not hasattr(select, "epoll"):
    raise test_support.TestSkipped("test works only on Linux 2.6")

# The attribute can be present while the running kernel still rejects the
# call with ENOSYS, so create one epoll object as a probe.  The probe is
# closed explicitly so its descriptor is released right away instead of
# whenever the object happens to be garbage-collected.  Any IOError other
# than ENOSYS is ignored here and the module continues to load.
try:
    select.epoll().close()
except IOError as e:
    if e.errno == errno.ENOSYS:
        raise test_support.TestSkipped("kernel doesn't support epoll()")
41
class TestEPoll(unittest.TestCase):
    """Exercise select.epoll using connected TCP sockets on 127.0.0.1."""

    def setUp(self):
        """Open a listening socket on 127.0.0.1, port 0 (OS-assigned)."""
        self.serverSocket = socket.socket()
        self.serverSocket.bind(('127.0.0.1', 0))
        self.serverSocket.listen(1)
        # Every socket a test opens is recorded here so tearDown closes it.
        self.connections = [self.serverSocket]

    def tearDown(self):
        """Close every socket recorded in self.connections."""
        for skt in self.connections:
            skt.close()

    def _connected_pair(self):
        """Return ``(client, server)``: two ends of one TCP connection.

        The client socket is non-blocking; the server socket is whatever
        ``accept()`` on the listening socket returns.  Both are recorded in
        ``self.connections`` for cleanup.
        """
        client = socket.socket()
        # Record the client before connecting so that it is still closed by
        # tearDown if the connect/accept sequence below fails.
        self.connections.append(client)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        else:
            raise AssertionError("Connect should have raised EINPROGRESS")
        server = self.serverSocket.accept()[0]
        self.connections.append(server)
        return client, server

    def _timed_poll(self, ep, *args, **kwargs):
        """Call ``ep.poll`` and return ``(events, elapsed_seconds)``."""
        started = time.time()
        events = ep.poll(*args, **kwargs)
        return events, time.time() - started

    def test_create(self):
        """A new epoll object has a descriptor and reports its closed state."""
        try:
            ep = select.epoll(16)
        except (IOError, OSError) as e:
            # The rest of this file catches IOError from epoll calls; accept
            # either spelling so a creation failure is reported as a test
            # failure rather than an unexpected error.
            self.fail(str(e))
        try:
            self.assertTrue(ep.fileno() > 0, ep.fileno())
            self.assertFalse(ep.closed)
            ep.close()
            self.assertTrue(ep.closed)
            self.assertRaises(ValueError, ep.fileno)
        finally:
            # Releases the descriptor if an assertion fired before close().
            ep.close()

    def test_badcreate(self):
        """Wrong argument counts and types are rejected with TypeError."""
        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
        self.assertRaises(TypeError, select.epoll, 'foo')
        self.assertRaises(TypeError, select.epoll, None)
        self.assertRaises(TypeError, select.epoll, ())
        self.assertRaises(TypeError, select.epoll, ['foo'])
        self.assertRaises(TypeError, select.epoll, {})

    def test_add(self):
        """Both ends of a connection can be registered."""
        client, server = self._connected_pair()

        ep = select.epoll(2)
        try:
            ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
        finally:
            ep.close()

    def test_fromfd(self):
        """An epoll object built with fromfd() shares the original's fd."""
        client, server = self._connected_pair()

        ep = select.epoll(2)
        try:
            # ep2 wraps the same descriptor number as ep.
            ep2 = select.epoll.fromfd(ep.fileno())

            ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)

            events = ep.poll(1, 4)
            events2 = ep2.poll(0.9, 4)
            self.assertEqual(len(events), 2)
            self.assertEqual(len(events2), 2)
        finally:
            ep.close()

        # With the descriptor closed through ep, polling through ep2 must
        # fail with EBADF.
        try:
            ep2.poll(1, 4)
        except IOError as e:
            self.assertEqual(e.args[0], errno.EBADF, e)
        else:
            self.fail("epoll on closed fd didn't raise EBADF")

    def test_control_and_wait(self):
        """register/modify/unregister change what poll() reports."""
        client, server = self._connected_pair()
        edge_rw = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET

        ep = select.epoll(16)
        try:
            ep.register(server.fileno(), edge_rw)
            ep.register(client.fileno(), edge_rw)

            # Both sockets are expected to be reported writable at once.
            events, elapsed = self._timed_poll(ep, 1, 4)
            self.assertFalse(elapsed > 0.1, elapsed)
            expected = [(client.fileno(), select.EPOLLOUT),
                        (server.fileno(), select.EPOLLOUT)]
            self.assertEqual(sorted(events), sorted(expected))
            self.assertFalse(elapsed > 0.01, elapsed)

            # Registered with EPOLLET: with no new activity the next poll is
            # expected to report nothing.
            events = ep.poll(timeout=2.1, maxevents=4)
            self.assertFalse(events)

            client.send(b"Hello!")
            server.send(b"world!!!")

            # After data was sent in both directions each socket is expected
            # to be reported readable and writable.
            events, elapsed = self._timed_poll(ep, 1, 4)
            self.assertFalse(elapsed > 0.01)
            expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                        (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
            self.assertEqual(sorted(events), sorted(expected))

            # Drop the client and watch the server for writability only.
            ep.unregister(client.fileno())
            ep.modify(server.fileno(), select.EPOLLOUT)
            events, elapsed = self._timed_poll(ep, 1, 4)
            self.assertFalse(elapsed > 0.01)
            self.assertEqual(events, [(server.fileno(), select.EPOLLOUT)])
        finally:
            ep.close()

    def test_errors(self):
        """Negative size hints and descriptors raise ValueError."""
        self.assertRaises(ValueError, select.epoll, -2)
        ep = select.epoll()
        try:
            self.assertRaises(ValueError, ep.register, -1, select.EPOLLIN)
        finally:
            ep.close()

    def test_unregister_closed(self):
        """unregister() is called for a descriptor whose socket was closed."""
        # The first socket of the pair (the client end) is the one that gets
        # registered, closed and then unregistered.
        client = self._connected_pair()[0]
        fd = client.fileno()

        ep = select.epoll(16)
        try:
            ep.register(client)

            elapsed = self._timed_poll(ep, 1, 4)[1]
            self.assertFalse(elapsed > 0.01)

            client.close()
            ep.unregister(fd)
        finally:
            ep.close()
190
def test_main():
    """Run the TestEPoll test case through test_support.run_unittest."""
    case_classes = (TestEPoll,)
    test_support.run_unittest(*case_classes)
193
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()