blob: dce0d53c70389ce049f6462dea4d121707acc8c7 [file] [log] [blame]
"""Queues for coordinating producer and consumer coroutines."""

# Names exported by a star-import of this module.
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
           'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005
6import collections
7import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9from . import events
10from . import futures
11from . import locks
12from .tasks import coroutine
13
14
class QueueEmpty(Exception):
    """Raised by Queue.get_nowait() when no item is immediately available."""
18
19
class QueueFull(Exception):
    """Raised by Queue.put_nowait() when no free slot is immediately available."""
23
24
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "yield from put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.

    The storage discipline is defined by three hooks, _init(), _put() and
    _get(); the default implementation is first-in, first-out.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue holding at most maxsize items (<= 0: unbounded).

        loop is the event loop used to create waiter futures; when it is
        None the loop returned by events.get_event_loop() is used.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures of get() callers waiting for an item.
        self._getters = collections.deque()
        # Pairs of (item, Future) for put() callers waiting for a free slot.
        self._putters = collections.deque()
        self._init(maxsize)

    def _init(self, maxsize):
        """Create the underlying container (hook for subclasses)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container (hook)."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item in the container (hook)."""
        self._queue.append(item)

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr() keeps this usable even if _queue has not been set yet.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        return result

    def _consume_done_getters(self):
        # Delete waiters at the head of the get() queue that are already
        # done (e.g. cancelled or timed out).
        while self._getters and self._getters[0].done():
            self._getters.popleft()

    def _consume_done_putters(self):
        # Delete waiters at the head of the put() queue that are already
        # done (e.g. cancelled or timed out).
        while self._putters and self._putters[0][1].done():
            self._putters.popleft()

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    @coroutine
    def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.

        This method is a coroutine.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()

            # Use _put and _get instead of passing item straight to getter, in
            # case a subclass has logic that must run (e.g. JoinableQueue).
            self._put(item)

            # getter cannot be cancelled, we just removed done getters
            getter.set_result(self._get())

        elif self.full():
            waiter = futures.Future(loop=self._loop)

            # The item travels with the waiter: get()/get_nowait() store it
            # in the queue on our behalf and then resolve the waiter.
            self._putters.append((item, waiter))
            yield from waiter

        else:
            self._put(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()

            # Use _put and _get instead of passing item straight to getter, in
            # case a subclass has logic that must run (e.g. JoinableQueue).
            self._put(item)

            # getter cannot be cancelled, we just removed done getters
            getter.set_result(self._get())

        elif self.full():
            raise QueueFull
        else:
            self._put(item)

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.

        This method is a coroutine.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self._put(item)

            # When a getter runs and frees up a slot so this putter can
            # run, we need to defer the put for a tick to ensure that
            # getters and putters alternate perfectly. See
            # ChannelTest.test_wait.
            self._loop.call_soon(putter._set_result_unless_cancelled, None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            waiter = futures.Future(loop=self._loop)

            # put()/put_nowait() resolve the waiter with the item.
            self._getters.append(waiter)
            return (yield from waiter)

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self._put(item)

            # putter cannot be cancelled, we just removed done putters
            putter.set_result(None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            raise QueueEmpty
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217
218
class PriorityQueue(Queue):
    """A Queue variant that hands out entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list, kept in heap order by the heapq functions below.
        self._queue = list()

    def _put(self, item, heappush=heapq.heappush):
        heap = self._queue
        heappush(heap, item)

    def _get(self, heappop=heapq.heappop):
        heap = self._queue
        return heappop(heap)
233
234
class LifoQueue(Queue):
    """A Queue variant that returns the most recently added entry first."""

    def _init(self, maxsize):
        # A plain list used as a stack: push and pop both act on its end.
        self._queue = list()

    def _put(self, item):
        stack = self._queue
        stack.append(item)

    def _get(self):
        stack = self._queue
        return stack.pop()
246
247
class JoinableQueue(Queue):
    """A subclass of Queue with task_done() and join() methods."""

    def __init__(self, maxsize=0, *, loop=None):
        """Create the queue; arguments are forwarded to Queue.__init__()."""
        super().__init__(maxsize=maxsize, loop=loop)
        # Items that were put but not yet acknowledged with task_done().
        self._unfinished_tasks = 0
        # Set exactly while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()

    def _format(self):
        """Extend the base summary with the count of unfinished tasks."""
        # super() rather than a hard-coded Queue, matching __init__() and
        # _put() and keeping the method cooperative under inheritance.
        result = super()._format()
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def _put(self, item):
        """Store item and account for it as an unfinished task."""
        super()._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    @coroutine
    def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer coroutine calls
        task_done() to indicate that the item was retrieved and all work on it
        is complete. When the count of unfinished tasks drops to zero, join()
        unblocks.

        This method is a coroutine.
        """
        if self._unfinished_tasks > 0:
            yield from self._finished.wait()