# Source blob: b26edfbe4ff0cab6083da01002040ed9082da41c
"""Queues"""

# Public names exported by ``from <this module> import *``.  This must stay
# a list: the end of the module may append 'JoinableQueue' to it.
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']

import collections
import heapq

from . import compat
from . import events
from . import futures
from . import locks
from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013
14
class QueueEmpty(Exception):
    """Exception raised when Queue.get_nowait() is called on a Queue object
    which is empty.
    """
    pass
20
21
class QueueFull(Exception):
    """Exception raised when the Queue.put_nowait() method is called on a Queue
    object which is full.
    """
    pass
27
28
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "yield from put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue.

        maxsize: capacity limit; values <= 0 mean "unbounded".
        loop: event loop to use; defaults to events.get_event_loop().
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures of get() callers waiting for an item.
        self._getters = collections.deque()
        # Futures of put() callers waiting for a free slot.
        self._putters = collections.deque()
        # Number of items put but not yet acknowledged via task_done().
        self._unfinished_tasks = 0
        # Set while _unfinished_tasks == 0; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying item storage."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the storage."""
        return self._queue.popleft()

    def _put(self, item):
        """Add an item to the storage."""
        self._queue.append(item)

    # End of the overridable methods.

    def __put_internal(self, item):
        """Store an item and account for it as an unfinished task."""
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr(): _queue may not exist yet if _init() has not run.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def _consume_done_getters(self):
        # Delete waiters at the head of the get() queue who've timed out.
        while self._getters and self._getters[0].done():
            self._getters.popleft()

    def _consume_done_putters(self):
        # Delete waiters at the head of the put() queue who've timed out.
        while self._putters and self._putters[0].done():
            self._putters.popleft()

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    @coroutine
    def put(self, item):
        """Put an item into the queue.

        Put an item into the queue. If the queue is full, wait until a free
        slot is available before adding item.

        This method is a coroutine.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()
            self.__put_internal(item)

            # getter cannot be cancelled, we just removed done getters
            getter.set_result(self._get())

        elif self._maxsize > 0 and self._maxsize <= self.qsize():
            waiter = futures.Future(loop=self._loop)

            self._putters.append(waiter)
            yield from waiter
            # Go through __put_internal() like every other path so the item
            # is counted in _unfinished_tasks; otherwise join() could return
            # early and task_done() could raise ValueError for this item.
            self.__put_internal(item)

        else:
            self.__put_internal(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()
            self.__put_internal(item)

            # getter cannot be cancelled, we just removed done getters
            getter.set_result(self._get())

        elif self._maxsize > 0 and self._maxsize <= self.qsize():
            raise QueueFull
        else:
            self.__put_internal(item)

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.

        This method is a coroutine.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            putter = self._putters.popleft()

            # When a getter runs and frees up a slot so this putter can
            # run, we need to defer the put for a tick to ensure that
            # getters and putters alternate perfectly. See
            # ChannelTest.test_wait.
            self._loop.call_soon(putter._set_result_unless_cancelled, None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            waiter = futures.Future(loop=self._loop)
            self._getters.append(waiter)
            try:
                return (yield from waiter)
            except futures.CancelledError:
                # if we get CancelledError, it means someone cancelled this
                # get() coroutine.  But there is a chance that the waiter
                # already is ready and contains an item that has just been
                # removed from the queue.  In this case, we need to put the item
                # back into the front of the queue.  This get() must either
                # succeed without fault or, if it gets cancelled, it must be as
                # if it never happened.
                #
                # A waiter that was itself cancelled is also "done" but holds
                # no item, so only a waiter with a real result is put back.
                if waiter.done() and not waiter.cancelled():
                    self._put_it_back(waiter.result())
                raise

    def _put_it_back(self, item):
        """
        This is called when we have a waiter to get() an item and this waiter
        gets cancelled.  In this case, we put the item back: wake up another
        waiter or put it in the _queue.

        The item was already counted in _unfinished_tasks when it was first
        put, so it must not be counted a second time here.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()

            # Hand the item straight to the next waiting getter; it never
            # re-enters the storage.
            # getter cannot be cancelled, we just removed done getters
            getter.set_result(item)
        else:
            try:
                push_front = self._queue.appendleft
            except AttributeError:
                # Storage is not a deque (e.g. the plain lists used by
                # PriorityQueue and LifoQueue); let the subclass's own _put()
                # place the item according to its ordering.
                self._put(item)
            else:
                push_front(item)

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            putter = self._putters.popleft()
            # Wake the putter; its put() resumes once the event loop
            # processes the completed future.

            # putter cannot be cancelled, we just removed done putters
            putter.set_result(None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            raise QueueEmpty

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    @coroutine
    def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            yield from self._finished.wait()
290
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700291
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list maintained as a heap by heapq.
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        # heappush is bound as a default argument so the call avoids a
        # module attribute lookup.
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        # Pops the smallest entry, i.e. the lowest priority number.
        return heappop(self._queue)
306
307
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        # A plain list used as a stack.
        self._queue = []

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        # Pop from the same end that _put() appends to.
        return self._queue.pop()
319
320
# NOTE(review): compat.PY35 is presumably true on Python 3.5 and later, so
# the alias below would only be exported on older interpreters -- confirm
# against the compat module.
if not compat.PY35:
    JoinableQueue = Queue
    """Deprecated alias for Queue."""
    __all__.append('JoinableQueue')