"""A multi-producer, multi-consumer queue."""

# time() and sleep() are used by the polling loop that implements timed
# put()/get() calls; they are imported under private names so that they are
# not mistaken for part of this module's public interface.
from time import time as _time, sleep as _sleep

# Names exported by ``from Queue import *``.
__all__ = ['Empty', 'Full', 'Queue']


class Empty(Exception):
    """Exception raised by Queue.get(block=0)/get_nowait().

    Queue.get() also raises it when a blocking call with a timeout
    expires before any item becomes available.
    """


class Full(Exception):
    """Exception raised by Queue.put(block=0)/put_nowait().

    Queue.put() also raises it when a blocking call with a timeout
    expires before any free slot becomes available.
    """


class Queue:
    """A FIFO queue that can be shared by several producers and consumers.

    Three low-level locks coordinate access:

    * ``mutex`` protects every access to the underlying container.
    * ``esema`` is locked whenever the queue is empty.  get() acquires it
      before removing an item and releases it again only if items remain.
    * ``fsema`` is locked whenever the queue is full.  put() acquires it
      before adding an item and releases it again only if room remains.

    Subclasses can change the queue organization by overriding the
    ``_init``, ``_qsize``, ``_empty``, ``_full``, ``_put`` and ``_get``
    hooks at the bottom of the class.
    """

    def __init__(self, maxsize=0):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0, the queue size is infinite.
        """
        # Locate the low-level lock module at construction time: the
        # classic name first, then the Python 3 spelling, and finally the
        # single-threaded stand-in for builds without thread support.
        try:
            import thread
        except ImportError:
            try:
                import _thread as thread
            except ImportError:
                import dummy_thread as thread
        self._init(maxsize)
        self.mutex = thread.allocate_lock()
        self.esema = thread.allocate_lock()
        # A new queue is empty, so esema starts out locked.
        self.esema.acquire()
        self.fsema = thread.allocate_lock()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        try:
            # try/finally: an exception raised by an overridden hook must
            # not leave the mutex locked forever.
            return self._qsize()
        finally:
            self.mutex.release()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    def _acquire_sema(self, sema, block, timeout):
        """Try to acquire the lock 'sema' the way put() and get() need it.

        If 'block' is false, make a single non-blocking attempt ('timeout'
        is ignored).  If 'block' is true and 'timeout' is None, wait
        forever.  If 'block' is true and 'timeout' is a non-negative
        number, poll for at most 'timeout' seconds.

        Return a true value if the lock was acquired and a false value if
        it was not.  Raise ValueError if 'block' is true and 'timeout' is
        negative.
        """
        if not block:
            return sema.acquire(0)
        if timeout is None:
            # blocking, w/o timeout, i.e. forever
            sema.acquire()
            return True
        if timeout >= 0:
            # Waiting max. 'timeout' seconds.  The original comment says
            # this scheme was taken from threading.py: _Event.wait().
            # Balancing act: we can't afford a pure busy loop, so we have
            # to sleep; but if we sleep the whole timeout time, we'll be
            # unresponsive.  The scheme here sleeps very little at first
            # and longer as time goes on, but never more than 50 ms per
            # poll (or the timeout time remaining).
            delay = 0.0005  # 500 us -> initial delay of 1 ms
            endtime = _time() + timeout
            while True:
                if sema.acquire(0):
                    return True
                remaining = endtime - _time()
                if remaining <= 0:
                    # time is over and the lock never became available
                    return False
                delay = min(delay * 2, remaining, .05)
                _sleep(delay)  # reduce CPU usage by using a sleep
        raise ValueError("'timeout' must be a non-negative number")

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        if not self._acquire_sema(self.fsema, block, timeout):
            raise Full
        self.mutex.acquire()
        release_fsema = True
        try:
            was_empty = self._empty()
            self._put(item)
            # If we fail before here, the empty state has
            # not changed, so we can skip the release of esema
            if was_empty:
                self.esema.release()
            # If we fail before here, the queue can not be full, so
            # release_fsema remains True
            release_fsema = not self._full()
        finally:
            # Catching system level exceptions here (RecursionDepth,
            # OutOfMemory, etc) - so do as little as possible in terms
            # of Python calls.
            if release_fsema:
                self.fsema.release()
            self.mutex.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        if not self._acquire_sema(self.esema, block, timeout):
            raise Empty
        self.mutex.acquire()
        release_esema = True
        try:
            was_full = self._full()
            item = self._get()
            # If we fail before here, the full state has
            # not changed, so we can skip the release of fsema
            if was_full:
                self.fsema.release()
            # Failure means empty state also unchanged - release_esema
            # remains True.
            release_esema = not self._empty()
        finally:
            if release_esema:
                self.esema.release()
            self.mutex.release()
        return item

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = []

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full (never true when maxsize <= 0)
    def _full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        # NOTE: list.pop(0) shifts every remaining element, so this is
        # O(n) in the queue length.  The container is kept as a list
        # because subclasses may rely on self.queue being one.
        return self.queue.pop(0)