blob: 694ef96215002411ee51f34e3f1a9d216f2b6651 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Analogue of `multiprocessing.connection` which uses queues instead of sockets
3#
4# multiprocessing/dummy/connection.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerkef453802013-01-01 14:25:59 +00007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
# Public names exported by a star-import of this module.  Connection is
# defined below but is not listed here; it is only handed out through
# Client(), Listener.accept() and Pipe().
__all__ = [ 'Client', 'Listener', 'Pipe' ]
11
12from queue import Queue
13
14
# The single entry None matches the default ``family`` argument of Listener.
# NOTE(review): presumably this mirrors the ``families`` list of the
# socket-based multiprocessing.connection module -- confirm against callers.
families = [None]
16
17
class Listener(object):
    """Queue-backed listener.

    The listener owns a bounded "backlog" queue, which doubles as its
    address: whatever is put on that queue is unpacked by accept() into the
    arguments of a new Connection.
    """

    def __init__(self, address=None, family=None, backlog=1):
        """Create the backlog queue, bounded to *backlog* pending entries.

        *address* and *family* are accepted but not used.
        """
        self._backlog_queue = Queue(backlog)

    @property
    def address(self):
        """The backlog queue itself, or None once close() has been called."""
        return self._backlog_queue

    def accept(self):
        """Block until an entry is queued, then wrap it in a Connection."""
        queue_pair = self._backlog_queue.get()
        return Connection(*queue_pair)

    def close(self):
        """Drop the reference to the backlog queue."""
        self._backlog_queue = None

    def __enter__(self):
        """Enter a ``with`` block; the listener itself is the target."""
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the listener on leaving a ``with`` block."""
        self.close()
36
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
def Client(address):
    """Connect to the listener whose backlog queue is *address*.

    Two fresh queues are created.  The pair is put on *address* in swapped
    order, so a Connection built from that tuple reads what this side writes
    and writes what this side reads.  Returns this side's Connection.
    """
    inbound = Queue()
    outbound = Queue()
    # Swapped order: the other end receives on our outbound queue.
    address.put((outbound, inbound))
    return Connection(inbound, outbound)
42
43
def Pipe(duplex=True):
    """Return two Connection objects joined by a pair of queues.

    Each end reads from the queue the other end writes to.  The *duplex*
    argument is accepted but has no effect: both ends can always send and
    receive.
    """
    forward = Queue()
    backward = Queue()
    near_end = Connection(forward, backward)
    far_end = Connection(backward, forward)
    return near_end, far_end
47
48
class Connection(object):
    """One end of a queue-backed connection.

    Objects are received from the ``_in`` queue and sent on the ``_out``
    queue.  ``send``/``send_bytes`` and ``recv``/``recv_bytes`` are bound
    directly to the queues' ``put`` and ``get`` methods.
    """

    def __init__(self, _in, _out):
        """Bind the connection to its inbound and outbound queues.

        Both arguments are expected to be ``queue.Queue`` instances (or
        subclasses); poll() relies on the queue's ``not_empty`` condition.
        """
        self._out = _out
        self._in = _in
        # No wrapper methods: sending is exactly Queue.put, receiving is
        # exactly Queue.get, for both the object and the "bytes" variants.
        self.send = self.send_bytes = _out.put
        self.recv = self.recv_bytes = _in.get

    def poll(self, timeout=0.0):
        """Return whether there is anything available to be received.

        timeout -- maximum number of seconds to wait.  Zero or a negative
                   number (the default) means do not wait at all; None
                   means wait until something arrives.
        """
        inbox = self._in
        # Check and wait under the queue's own lock.  Testing for emptiness
        # before taking the lock would let a put() slip in between the test
        # and the wait; its notification would be lost and the call would
        # sleep for the whole timeout with data already queued.
        with inbox.not_empty:
            # Queue.qsize() takes the same (non-reentrant) lock that
            # not_empty wraps, so the unlocked _qsize() hook is used here.
            if inbox._qsize() > 0:
                return True
            if timeout is not None and timeout <= 0.0:
                return False
            inbox.not_empty.wait(timeout)
            return inbox._qsize() > 0

    def close(self):
        """Do nothing; provided so the object can be closed like any other."""
        pass

    def __enter__(self):
        """Enter a ``with`` block; the connection itself is the target."""
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Call close() on leaving a ``with`` block."""
        self.close()