| """ |
| Tests for kqueue wrapper. |
| """ |
| import errno |
| import os |
| import select |
| import socket |
| import sys |
| import time |
| import unittest |
| |
| from test import support |
| if not hasattr(select, "kqueue"): |
| raise unittest.SkipTest("test works only on BSD") |
| |
# Mirrors the module-level guard so the class also skips cleanly when it is
# imported on its own on a platform whose ``select`` module lacks kqueue.
@unittest.skipUnless(hasattr(select, "kqueue"), "test works only on BSD")
class TestKQueue(unittest.TestCase):
    """Tests for the ``select.kqueue`` and ``select.kevent`` wrappers."""

    def _assert_kevent(self, ev, other, expected):
        """Check the six fields of *ev* and its equality behaviour.

        *expected* is the tuple ``(ident, filter, flags, fflags, data,
        udata)``.  The event must also compare equal to itself and unequal
        to *other*.
        """
        field_names = ("ident", "filter", "flags", "fflags", "data", "udata")
        for field_name, value in zip(field_names, expected):
            self.assertEqual(getattr(ev, field_name), value, field_name)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    def test_create_queue(self):
        """A fresh kqueue is open; after close() fileno() raises ValueError."""
        kq = select.kqueue()
        # close() may be called more than once (see test_close), so this
        # cleanup only matters when an assertion fails before the explicit
        # close() below.
        self.addCleanup(kq.close)
        self.assertGreater(kq.fileno(), 0)
        self.assertFalse(kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        """kevent() stores its arguments, applies defaults and compares."""
        from operator import lt, le, gt, ge

        fd = os.open(os.devnull, os.O_WRONLY)
        self.addCleanup(os.close, fd)

        other = select.kevent(1000)

        # Only the ident given: filter and flags take their default values.
        ev = select.kevent(fd)
        self._assert_kevent(
            ev, other,
            (fd, select.KQ_FILTER_READ, select.KQ_EV_ADD, 0, 0, 0))
        self.assertLess(ev, other)
        self.assertGreaterEqual(other, ev)
        # Ordering against anything that is not a kevent must be rejected.
        for op in lt, le, gt, ge:
            self.assertRaises(TypeError, op, ev, None)
            self.assertRaises(TypeError, op, ev, 1)
            self.assertRaises(TypeError, op, ev, "ev")

        # Explicit filter, default flags.
        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self._assert_kevent(
            ev, other,
            (fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD, 0, 0, 0))

        # Explicit filter and flags.
        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self._assert_kevent(
            ev, other,
            (fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT, 0, 0, 0))

        # All six fields given positionally.
        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self._assert_kevent(ev, other, (1, 2, 3, 4, 5, 6))

        bignum = 0x7fff
        ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
        self._assert_kevent(ev, other, (bignum, 1, 2, 3, bignum - 1, bignum))

    def test_queue_event(self):
        """Socket readiness is reported, and stops after KQ_EV_DELETE."""
        listener = socket.socket()
        # Sockets are released through addCleanup so that a failing
        # assertion (or the timeout below) does not leak them.
        self.addCleanup(listener.close)
        listener.bind(('127.0.0.1', 0))
        listener.listen(1)
        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', listener.getsockname()[1]))
        except OSError as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        # A connect() that returns without raising is accepted as well:
        # FreeBSD doesn't raise an exception here.
        server, _ = listener.accept()
        self.addCleanup(server.close)

        # kq2 is built from kq's descriptor, so events registered through
        # kq2 are expected to show up when polling kq below.
        # NOTE(review): neither object is closed explicitly; both appear to
        # own the same descriptor, so closing both could close it twice.
        # Left as in the original -- confirm before changing.
        kq = select.kqueue()
        kq2 = select.kqueue.fromfd(kq.fileno())

        # Register write and read interest: server via kq, client via kq2.
        add_flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        for queue, sock in ((kq, server), (kq2, client)):
            for filt in (select.KQ_FILTER_WRITE, select.KQ_FILTER_READ):
                queue.control([select.kevent(sock.fileno(), filt, add_flags)],
                              0)

        # Nothing has been sent yet, so only the write filters should fire.
        events = kq.control(None, 4, 1)
        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (client.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_WRITE)})

        client.send(b"Hello!")
        server.send(b"world!!!")

        # We may need to call it several times
        for _ in range(10):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (client.fileno(), select.KQ_FILTER_WRITE),
            (client.fileno(), select.KQ_FILTER_READ),
            (server.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_READ)})

        # Remove completely client, and server read part
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        # Only the server's write filter is still registered.
        events = kq.control([], 4, 0.99)
        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (server.fileno(), select.KQ_FILTER_WRITE)})

    def testPair(self):
        """Data sent over a socketpair is reported as a read event."""
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        a.send(b'foo')
        add_flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        # kevent() is handed the socket objects themselves here rather than
        # their file descriptor numbers.
        event1 = select.kevent(a, select.KQ_FILTER_READ, add_flags)
        event2 = select.kevent(b, select.KQ_FILTER_READ, add_flags)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
        # The event's data field is used as the byte count to read.
        self.assertEqual(b.recv(r[0].data), b'foo')

    def test_close(self):
        """close() is repeatable and invalidates fileno() and control()."""
        kqueue = select.kqueue()
        self.addCleanup(kqueue.close)

        # test fileno() method and closed attribute
        self.assertIsInstance(kqueue.fileno(), int)
        self.assertFalse(kqueue.closed)

        # test close()
        kqueue.close()
        self.assertTrue(kqueue.closed)
        self.assertRaises(ValueError, kqueue.fileno)

        # close() can be called more than once
        kqueue.close()

        # operations must fail with ValueError("I/O operation on closed ...")
        self.assertRaises(ValueError, kqueue.control, None, 4)

    def test_fd_non_inheritable(self):
        """The descriptor of a new kqueue is not inheritable."""
        kqueue = select.kqueue()
        self.addCleanup(kqueue.close)
        self.assertFalse(os.get_inheritable(kqueue.fileno()))
| |
| |
def test_main():
    """Run the TestKQueue cases through ``support.run_unittest``."""
    test_classes = (TestKQueue,)
    support.run_unittest(*test_classes)
| |
# Run the suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()