blob: 877a54ecc3b99db6d347d8e8c6ac27682e7c8ef0 [file] [log] [blame]
# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21"""
22Tests for epoll wrapper.
23"""
24import os
25import socket
26import errno
27import time
28import select
29import tempfile
30import unittest
31
32from test import test_support
# The tests below need select.epoll; when the select module does not
# expose it, skip the whole module instead of failing every test.
if not hasattr(select, "epoll"):
    raise test_support.TestSkipped("test works only on Linux 2.6")
35
class TestEPoll(unittest.TestCase):
    """Exercise the ``select.epoll`` wrapper using loopback TCP sockets.

    ``setUp`` opens a listening socket on 127.0.0.1; every socket appended
    to ``self.connections`` is closed again by ``tearDown``.
    """

    # Longest time, in seconds, that a poll() on descriptors which are
    # already ready may take before the test treats it as having blocked.
    # One shared bound replaces the former mix of 0.1 and 0.01; a 0.01s
    # bound is easy to miss on a loaded machine.
    PROMPT_POLL_LIMIT = 0.1

    # Prefer a clock that cannot jump when the wall clock is adjusted and
    # fall back to time.time() on interpreters without time.monotonic().
    _clock = staticmethod(getattr(time, "monotonic", time.time))

    def setUp(self):
        """Open the listening socket on an ephemeral loopback port."""
        self.serverSocket = socket.socket()
        self.serverSocket.bind(('127.0.0.1', 0))
        self.serverSocket.listen(1)
        self.connections = [self.serverSocket]

    def tearDown(self):
        """Close every socket recorded in ``self.connections``."""
        for skt in self.connections:
            skt.close()

    def _connected_pair(self):
        """Return a connected ``(client, server)`` socket pair.

        The client is a non-blocking socket connected to the listening
        socket; the server end is the socket returned by ``accept()``.
        Both are recorded in ``self.connections`` for cleanup.
        """
        client = socket.socket()
        # Record the client right away so tearDown closes it even when one
        # of the checks below fails.
        self.connections.append(client)
        client.setblocking(False)
        port = self.serverSocket.getsockname()[1]
        try:
            client.connect(('127.0.0.1', port))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        else:
            self.fail("Connect should have raised EINPROGRESS")
        server = self.serverSocket.accept()[0]
        self.connections.append(server)
        return client, server

    def _poll_promptly(self, ep, timeout=1, maxevents=4):
        """Poll *ep* and fail the test if the call did not return quickly.

        Returns the event list produced by ``ep.poll(timeout, maxevents)``.
        """
        started = self._clock()
        events = ep.poll(timeout, maxevents)
        elapsed = self._clock() - started
        self.assertFalse(elapsed > self.PROMPT_POLL_LIMIT, elapsed)
        return events

    def test_create(self):
        """An epoll object exposes a file descriptor until it is closed."""
        try:
            ep = select.epoll(16)
        except (IOError, OSError) as e:
            self.fail(str(e))
        try:
            self.assertTrue(ep.fileno() > 0, ep.fileno())
            self.assertFalse(ep.closed)
        finally:
            # Close even if an assertion above failed.
            ep.close()
        self.assertTrue(ep.closed)
        self.assertRaises(ValueError, ep.fileno)

    def test_badcreate(self):
        """Constructor arguments of the wrong type or count are rejected."""
        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
        for bad_sizehint in ('foo', None, (), ['foo'], {}):
            self.assertRaises(TypeError, select.epoll, bad_sizehint)

    def test_add(self):
        """Both ends of a connection can be registered."""
        client, server = self._connected_pair()

        ep = select.epoll(2)
        try:
            ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
        finally:
            ep.close()

    def test_fromfd(self):
        """An object made by ``fromfd`` shares the original's descriptor."""
        client, server = self._connected_pair()

        ep = select.epoll(2)
        try:
            ep2 = select.epoll.fromfd(ep.fileno())

            ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)

            # Registrations made through ep2 must be visible through ep.
            events = ep.poll(1, 4)
            events2 = ep2.poll(0.9, 4)
            self.assertEqual(len(events), 2)
            self.assertEqual(len(events2), 2)
        finally:
            ep.close()

        # ep2 wraps the descriptor that ep just closed.
        try:
            ep2.poll(1, 4)
        except (IOError, OSError) as e:
            self.assertEqual(e.args[0], errno.EBADF, e)
        else:
            self.fail("epoll on closed fd didn't raise EBADF")

    def test_control_and_wait(self):
        """register/modify/unregister change what ``poll`` reports."""
        client, server = self._connected_pair()
        edge_mask = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET

        ep = select.epoll(16)
        try:
            ep.register(server.fileno(), edge_mask)
            ep.register(client.fileno(), edge_mask)

            # Freshly connected sockets are writable and have nothing to read.
            events = self._poll_promptly(ep)
            expected = [(client.fileno(), select.EPOLLOUT),
                        (server.fileno(), select.EPOLLOUT)]
            self.assertEqual(sorted(events), sorted(expected))

            # With EPOLLET nothing further is reported until something
            # changes, so a short wait is enough to observe "no events".
            events = ep.poll(timeout=0.1, maxevents=4)
            self.assertFalse(events)

            client.send(b"Hello!")
            server.send(b"world!!!")

            # Each side now has incoming data and is still writable.
            events = self._poll_promptly(ep)
            expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                        (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
            self.assertEqual(sorted(events), sorted(expected))

            # Only the server remains registered, and only for writability.
            ep.unregister(client.fileno())
            ep.modify(server.fileno(), select.EPOLLOUT)
            events = self._poll_promptly(ep)
            self.assertEqual(events, [(server.fileno(), select.EPOLLOUT)])
        finally:
            ep.close()

    def test_errors(self):
        """Invalid size hints and negative descriptors raise ValueError."""
        self.assertRaises(ValueError, select.epoll, -2)
        ep = select.epoll()
        try:
            self.assertRaises(ValueError, ep.register, -1, select.EPOLLIN)
        finally:
            ep.close()

    def test_unregister_closed(self):
        """Unregistering an already closed descriptor is handled cleanly."""
        server = self._connected_pair()[1]
        fd = server.fileno()

        ep = select.epoll(16)
        try:
            ep.register(server)
            self._poll_promptly(ep)

            server.close()
            # Older interpreters silently ignore EBADF here; Python 3.9 and
            # later report it.  Accept both, but no other error.
            try:
                ep.unregister(fd)
            except (IOError, OSError) as e:
                self.assertEqual(e.args[0], errno.EBADF, e)
        finally:
            ep.close()
184
def test_main():
    """Run the TestEPoll test case through ``test_support.run_unittest``."""
    test_support.run_unittest(TestEPoll)
187
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()