blob: 8f6c257701f70ca845425ca6516bbd7de9412865 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
3__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
Guido van Rossumfef70982014-01-25 17:24:51 -08004 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005
6import collections
7import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9from . import events
10from . import futures
11from . import locks
12from .tasks import coroutine
13
14
Guido van Rossumfef70982014-01-25 17:24:51 -080015class QueueEmpty(Exception):
16 'Exception raised by Queue.get(block=0)/get_nowait().'
17 pass
18
19
20class QueueFull(Exception):
21 'Exception raised by Queue.put(block=0)/put_nowait().'
22 pass
23
24
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025class Queue:
26 """A queue, useful for coordinating producer and consumer coroutines.
27
28 If maxsize is less than or equal to zero, the queue size is infinite. If it
29 is an integer greater than 0, then "yield from put()" will block when the
30 queue reaches maxsize, until an item is removed by get().
31
32 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010033 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034 interrupted between calling qsize() and doing an operation on the Queue.
35 """
36
37 def __init__(self, maxsize=0, *, loop=None):
38 if loop is None:
39 self._loop = events.get_event_loop()
40 else:
41 self._loop = loop
42 self._maxsize = maxsize
43
44 # Futures.
45 self._getters = collections.deque()
46 # Pairs of (item, Future).
47 self._putters = collections.deque()
48 self._init(maxsize)
49
50 def _init(self, maxsize):
51 self._queue = collections.deque()
52
53 def _get(self):
54 return self._queue.popleft()
55
56 def _put(self, item):
57 self._queue.append(item)
58
59 def __repr__(self):
60 return '<{} at {:#x} {}>'.format(
61 type(self).__name__, id(self), self._format())
62
63 def __str__(self):
64 return '<{} {}>'.format(type(self).__name__, self._format())
65
66 def _format(self):
67 result = 'maxsize={!r}'.format(self._maxsize)
68 if getattr(self, '_queue', None):
69 result += ' _queue={!r}'.format(list(self._queue))
70 if self._getters:
71 result += ' _getters[{}]'.format(len(self._getters))
72 if self._putters:
73 result += ' _putters[{}]'.format(len(self._putters))
74 return result
75
76 def _consume_done_getters(self):
77 # Delete waiters at the head of the get() queue who've timed out.
78 while self._getters and self._getters[0].done():
79 self._getters.popleft()
80
81 def _consume_done_putters(self):
82 # Delete waiters at the head of the put() queue who've timed out.
83 while self._putters and self._putters[0][1].done():
84 self._putters.popleft()
85
86 def qsize(self):
87 """Number of items in the queue."""
88 return len(self._queue)
89
90 @property
91 def maxsize(self):
92 """Number of items allowed in the queue."""
93 return self._maxsize
94
95 def empty(self):
96 """Return True if the queue is empty, False otherwise."""
97 return not self._queue
98
99 def full(self):
100 """Return True if there are maxsize items in the queue.
101
102 Note: if the Queue was initialized with maxsize=0 (the default),
103 then full() is never True.
104 """
105 if self._maxsize <= 0:
106 return False
107 else:
Victor Stinner66dc6b02014-06-17 23:36:21 +0200108 return self.qsize() >= self._maxsize
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109
110 @coroutine
111 def put(self, item):
112 """Put an item into the queue.
113
Victor Stinner952ec982014-12-22 22:09:50 +0100114 Put an item into the queue. If the queue is full, wait until a free
115 slot is available before adding item.
116
117 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700118 """
119 self._consume_done_getters()
120 if self._getters:
121 assert not self._queue, (
122 'queue non-empty, why are getters waiting?')
123
124 getter = self._getters.popleft()
125
126 # Use _put and _get instead of passing item straight to getter, in
127 # case a subclass has logic that must run (e.g. JoinableQueue).
128 self._put(item)
129 getter.set_result(self._get())
130
Victor Stinner66dc6b02014-06-17 23:36:21 +0200131 elif self._maxsize > 0 and self._maxsize <= self.qsize():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700132 waiter = futures.Future(loop=self._loop)
133
134 self._putters.append((item, waiter))
135 yield from waiter
136
137 else:
138 self._put(item)
139
140 def put_nowait(self, item):
141 """Put an item into the queue without blocking.
142
Guido van Rossumfef70982014-01-25 17:24:51 -0800143 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700144 """
145 self._consume_done_getters()
146 if self._getters:
147 assert not self._queue, (
148 'queue non-empty, why are getters waiting?')
149
150 getter = self._getters.popleft()
151
152 # Use _put and _get instead of passing item straight to getter, in
153 # case a subclass has logic that must run (e.g. JoinableQueue).
154 self._put(item)
155 getter.set_result(self._get())
156
Victor Stinner66dc6b02014-06-17 23:36:21 +0200157 elif self._maxsize > 0 and self._maxsize <= self.qsize():
Guido van Rossumfef70982014-01-25 17:24:51 -0800158 raise QueueFull
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700159 else:
160 self._put(item)
161
162 @coroutine
163 def get(self):
164 """Remove and return an item from the queue.
165
Victor Stinner952ec982014-12-22 22:09:50 +0100166 If queue is empty, wait until an item is available.
167
168 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169 """
170 self._consume_done_putters()
171 if self._putters:
172 assert self.full(), 'queue not full, why are putters waiting?'
173 item, putter = self._putters.popleft()
174 self._put(item)
175
176 # When a getter runs and frees up a slot so this putter can
177 # run, we need to defer the put for a tick to ensure that
178 # getters and putters alternate perfectly. See
179 # ChannelTest.test_wait.
Victor Stinnera9acbe82014-07-05 15:29:41 +0200180 self._loop.call_soon(putter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700181
182 return self._get()
183
184 elif self.qsize():
185 return self._get()
186 else:
187 waiter = futures.Future(loop=self._loop)
188
189 self._getters.append(waiter)
190 return (yield from waiter)
191
192 def get_nowait(self):
193 """Remove and return an item from the queue.
194
Guido van Rossumfef70982014-01-25 17:24:51 -0800195 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196 """
197 self._consume_done_putters()
198 if self._putters:
199 assert self.full(), 'queue not full, why are putters waiting?'
200 item, putter = self._putters.popleft()
201 self._put(item)
202 # Wake putter on next tick.
203 putter.set_result(None)
204
205 return self._get()
206
207 elif self.qsize():
208 return self._get()
209 else:
Guido van Rossumfef70982014-01-25 17:24:51 -0800210 raise QueueEmpty
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211
212
213class PriorityQueue(Queue):
214 """A subclass of Queue; retrieves entries in priority order (lowest first).
215
216 Entries are typically tuples of the form: (priority number, data).
217 """
218
219 def _init(self, maxsize):
220 self._queue = []
221
222 def _put(self, item, heappush=heapq.heappush):
223 heappush(self._queue, item)
224
225 def _get(self, heappop=heapq.heappop):
226 return heappop(self._queue)
227
228
229class LifoQueue(Queue):
230 """A subclass of Queue that retrieves most recently added entries first."""
231
232 def _init(self, maxsize):
233 self._queue = []
234
235 def _put(self, item):
236 self._queue.append(item)
237
238 def _get(self):
239 return self._queue.pop()
240
241
242class JoinableQueue(Queue):
243 """A subclass of Queue with task_done() and join() methods."""
244
245 def __init__(self, maxsize=0, *, loop=None):
246 super().__init__(maxsize=maxsize, loop=loop)
247 self._unfinished_tasks = 0
248 self._finished = locks.Event(loop=self._loop)
249 self._finished.set()
250
251 def _format(self):
252 result = Queue._format(self)
253 if self._unfinished_tasks:
254 result += ' tasks={}'.format(self._unfinished_tasks)
255 return result
256
257 def _put(self, item):
258 super()._put(item)
259 self._unfinished_tasks += 1
260 self._finished.clear()
261
262 def task_done(self):
263 """Indicate that a formerly enqueued task is complete.
264
265 Used by queue consumers. For each get() used to fetch a task,
266 a subsequent call to task_done() tells the queue that the processing
267 on the task is complete.
268
269 If a join() is currently blocking, it will resume when all items have
270 been processed (meaning that a task_done() call was received for every
271 item that had been put() into the queue).
272
273 Raises ValueError if called more times than there were items placed in
274 the queue.
275 """
276 if self._unfinished_tasks <= 0:
277 raise ValueError('task_done() called too many times')
278 self._unfinished_tasks -= 1
279 if self._unfinished_tasks == 0:
280 self._finished.set()
281
282 @coroutine
283 def join(self):
284 """Block until all items in the queue have been gotten and processed.
285
286 The count of unfinished tasks goes up whenever an item is added to the
287 queue. The count goes down whenever a consumer thread calls task_done()
288 to indicate that the item was retrieved and all work on it is complete.
289 When the count of unfinished tasks drops to zero, join() unblocks.
290 """
291 if self._unfinished_tasks > 0:
292 yield from self._finished.wait()