# blob: bd62c606c01080ed6eaa496a3138532e9caf3842
"""Queues"""

# Names exported by "from <this module> import *".
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
           'QueueFull', 'QueueEmpty']

import collections
import heapq

from . import events
from . import futures
from . import locks
from .tasks import coroutine
13
14
class QueueEmpty(Exception):
    """Exception raised by Queue.get_nowait() when no item is available."""
18
19
class QueueFull(Exception):
    """Exception raised by Queue.put_nowait() when no free slot is available."""
23
24
# Un-exported aliases for temporary backward compatibility.
# Will disappear soon.
Full = QueueFull
Empty = QueueEmpty
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
30
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is greater than 0, then "yield from put()" will block when the queue
    reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.

    Subclasses customise storage by overriding _init(), _put() and _get().
    """

    def __init__(self, maxsize=0, *, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures of coroutines blocked in get().
        self._getters = collections.deque()
        # Pairs of (item, Future) for coroutines blocked in put().
        self._putters = collections.deque()
        self._init(maxsize)

    # Storage hooks; subclasses override these three together.

    def _init(self, maxsize):
        self._queue = collections.deque()

    def _get(self):
        return self._queue.popleft()

    def _put(self, item):
        self._queue.append(item)

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr() keeps this usable even if _queue has not been set yet.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        return result

    def _consume_done_getters(self):
        # Delete waiters at the head of the get() queue who've timed out.
        while self._getters and self._getters[0].done():
            self._getters.popleft()

    def _consume_done_putters(self):
        # Delete waiters at the head of the put() queue who've timed out.
        while self._putters and self._putters[0][1].done():
            self._putters.popleft()

    def _hand_to_getter(self, item):
        """Deliver item to the getter at the head of the get() queue.

        The caller must have run _consume_done_getters() and checked that
        a getter is waiting.
        """
        assert not self._queue, (
            'queue non-empty, why are getters waiting?')

        getter = self._getters.popleft()

        # Use _put and _get instead of passing item straight to getter, in
        # case a subclass has logic that must run (e.g. JoinableQueue).
        self._put(item)
        getter.set_result(self._get())

    @staticmethod
    def _wake_putter(putter):
        """Resolve a putter's future unless it has completed meanwhile.

        get() schedules this for a later loop iteration; the putter may be
        cancelled in between, and calling set_result() on a finished
        future raises InvalidStateError.
        """
        if not putter.done():
            putter.set_result(None)

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are at least maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        # ">=" rather than "==" so that an over-full queue, or a maxsize
        # that is not a whole number, is still reported as full.
        return self.qsize() >= self._maxsize

    @coroutine
    def put(self, item):
        """Put an item into the queue.

        If you yield from put(), wait until a free slot is available
        before adding item.
        """
        self._consume_done_getters()
        if self._getters:
            self._hand_to_getter(item)

        elif self.full():
            waiter = futures.Future(loop=self._loop)

            # The item is stored by whichever get() later pops this pair.
            self._putters.append((item, waiter))
            yield from waiter

        else:
            self._put(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        self._consume_done_getters()
        if self._getters:
            self._hand_to_getter(item)

        elif self.full():
            raise QueueFull
        else:
            self._put(item)

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

        If you yield from get(), wait until an item is available.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self._put(item)

            # When a getter runs and frees up a slot so this putter can
            # run, we need to defer the put for a tick to ensure that
            # getters and putters alternate perfectly. See
            # ChannelTest.test_wait.
            # The wake-up is guarded because the putter can be cancelled
            # before the callback runs; its item stays in the queue.
            self._loop.call_soon(self._wake_putter, putter)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            waiter = futures.Future(loop=self._loop)

            self._getters.append(waiter)
            return (yield from waiter)

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self._put(item)
            # Resolve the putter's future right away; the head putter is
            # known not to be done after _consume_done_putters().
            putter.set_result(None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            raise QueueEmpty
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213
214
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list maintained as a binary heap by heapq.
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        # heappush is bound as a default argument when the class is defined.
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        # Pops the smallest entry, i.e. the lowest priority number.
        return heappop(self._queue)
229
230
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        # A plain list used as a stack.
        self._queue = []

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        # Pop from the same end that _put() appends to.
        return self._queue.pop()
242
243
class JoinableQueue(Queue):
    """A subclass of Queue with task_done() and join() methods."""

    def __init__(self, maxsize=0, *, loop=None):
        super().__init__(maxsize=maxsize, loop=loop)
        # Items put into the queue and not yet matched by task_done().
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()

    def _format(self):
        result = super()._format()
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def _put(self, item):
        super()._put(item)
        # Every stored item is one more task that join() must wait for.
        self._unfinished_tasks += 1
        self._finished.clear()

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    @coroutine
    def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            yield from self._finished.wait()