blob: 6283db3268f93ea9da2afcd7ac7be488cf93e5b5 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
3__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
Guido van Rossumfef70982014-01-25 17:24:51 -08004 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005
6import collections
7import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9from . import events
10from . import futures
11from . import locks
12from .tasks import coroutine
13
14
Guido van Rossumfef70982014-01-25 17:24:51 -080015class QueueEmpty(Exception):
16 'Exception raised by Queue.get(block=0)/get_nowait().'
17 pass
18
19
20class QueueFull(Exception):
21 'Exception raised by Queue.put(block=0)/put_nowait().'
22 pass
23
24
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025class Queue:
26 """A queue, useful for coordinating producer and consumer coroutines.
27
28 If maxsize is less than or equal to zero, the queue size is infinite. If it
29 is an integer greater than 0, then "yield from put()" will block when the
30 queue reaches maxsize, until an item is removed by get().
31
32 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010033 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034 interrupted between calling qsize() and doing an operation on the Queue.
35 """
36
37 def __init__(self, maxsize=0, *, loop=None):
38 if loop is None:
39 self._loop = events.get_event_loop()
40 else:
41 self._loop = loop
42 self._maxsize = maxsize
43
44 # Futures.
45 self._getters = collections.deque()
46 # Pairs of (item, Future).
47 self._putters = collections.deque()
48 self._init(maxsize)
49
50 def _init(self, maxsize):
51 self._queue = collections.deque()
52
53 def _get(self):
54 return self._queue.popleft()
55
56 def _put(self, item):
57 self._queue.append(item)
58
59 def __repr__(self):
60 return '<{} at {:#x} {}>'.format(
61 type(self).__name__, id(self), self._format())
62
63 def __str__(self):
64 return '<{} {}>'.format(type(self).__name__, self._format())
65
66 def _format(self):
67 result = 'maxsize={!r}'.format(self._maxsize)
68 if getattr(self, '_queue', None):
69 result += ' _queue={!r}'.format(list(self._queue))
70 if self._getters:
71 result += ' _getters[{}]'.format(len(self._getters))
72 if self._putters:
73 result += ' _putters[{}]'.format(len(self._putters))
74 return result
75
76 def _consume_done_getters(self):
77 # Delete waiters at the head of the get() queue who've timed out.
78 while self._getters and self._getters[0].done():
79 self._getters.popleft()
80
81 def _consume_done_putters(self):
82 # Delete waiters at the head of the put() queue who've timed out.
83 while self._putters and self._putters[0][1].done():
84 self._putters.popleft()
85
86 def qsize(self):
87 """Number of items in the queue."""
88 return len(self._queue)
89
90 @property
91 def maxsize(self):
92 """Number of items allowed in the queue."""
93 return self._maxsize
94
95 def empty(self):
96 """Return True if the queue is empty, False otherwise."""
97 return not self._queue
98
99 def full(self):
100 """Return True if there are maxsize items in the queue.
101
102 Note: if the Queue was initialized with maxsize=0 (the default),
103 then full() is never True.
104 """
105 if self._maxsize <= 0:
106 return False
107 else:
108 return self.qsize() == self._maxsize
109
110 @coroutine
111 def put(self, item):
112 """Put an item into the queue.
113
114 If you yield from put(), wait until a free slot is available
115 before adding item.
116 """
117 self._consume_done_getters()
118 if self._getters:
119 assert not self._queue, (
120 'queue non-empty, why are getters waiting?')
121
122 getter = self._getters.popleft()
123
124 # Use _put and _get instead of passing item straight to getter, in
125 # case a subclass has logic that must run (e.g. JoinableQueue).
126 self._put(item)
127 getter.set_result(self._get())
128
129 elif self._maxsize > 0 and self._maxsize == self.qsize():
130 waiter = futures.Future(loop=self._loop)
131
132 self._putters.append((item, waiter))
133 yield from waiter
134
135 else:
136 self._put(item)
137
138 def put_nowait(self, item):
139 """Put an item into the queue without blocking.
140
Guido van Rossumfef70982014-01-25 17:24:51 -0800141 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142 """
143 self._consume_done_getters()
144 if self._getters:
145 assert not self._queue, (
146 'queue non-empty, why are getters waiting?')
147
148 getter = self._getters.popleft()
149
150 # Use _put and _get instead of passing item straight to getter, in
151 # case a subclass has logic that must run (e.g. JoinableQueue).
152 self._put(item)
153 getter.set_result(self._get())
154
155 elif self._maxsize > 0 and self._maxsize == self.qsize():
Guido van Rossumfef70982014-01-25 17:24:51 -0800156 raise QueueFull
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700157 else:
158 self._put(item)
159
160 @coroutine
161 def get(self):
162 """Remove and return an item from the queue.
163
164 If you yield from get(), wait until a item is available.
165 """
166 self._consume_done_putters()
167 if self._putters:
168 assert self.full(), 'queue not full, why are putters waiting?'
169 item, putter = self._putters.popleft()
170 self._put(item)
171
172 # When a getter runs and frees up a slot so this putter can
173 # run, we need to defer the put for a tick to ensure that
174 # getters and putters alternate perfectly. See
175 # ChannelTest.test_wait.
176 self._loop.call_soon(putter.set_result, None)
177
178 return self._get()
179
180 elif self.qsize():
181 return self._get()
182 else:
183 waiter = futures.Future(loop=self._loop)
184
185 self._getters.append(waiter)
186 return (yield from waiter)
187
188 def get_nowait(self):
189 """Remove and return an item from the queue.
190
Guido van Rossumfef70982014-01-25 17:24:51 -0800191 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700192 """
193 self._consume_done_putters()
194 if self._putters:
195 assert self.full(), 'queue not full, why are putters waiting?'
196 item, putter = self._putters.popleft()
197 self._put(item)
198 # Wake putter on next tick.
199 putter.set_result(None)
200
201 return self._get()
202
203 elif self.qsize():
204 return self._get()
205 else:
Guido van Rossumfef70982014-01-25 17:24:51 -0800206 raise QueueEmpty
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207
208
209class PriorityQueue(Queue):
210 """A subclass of Queue; retrieves entries in priority order (lowest first).
211
212 Entries are typically tuples of the form: (priority number, data).
213 """
214
215 def _init(self, maxsize):
216 self._queue = []
217
218 def _put(self, item, heappush=heapq.heappush):
219 heappush(self._queue, item)
220
221 def _get(self, heappop=heapq.heappop):
222 return heappop(self._queue)
223
224
225class LifoQueue(Queue):
226 """A subclass of Queue that retrieves most recently added entries first."""
227
228 def _init(self, maxsize):
229 self._queue = []
230
231 def _put(self, item):
232 self._queue.append(item)
233
234 def _get(self):
235 return self._queue.pop()
236
237
238class JoinableQueue(Queue):
239 """A subclass of Queue with task_done() and join() methods."""
240
241 def __init__(self, maxsize=0, *, loop=None):
242 super().__init__(maxsize=maxsize, loop=loop)
243 self._unfinished_tasks = 0
244 self._finished = locks.Event(loop=self._loop)
245 self._finished.set()
246
247 def _format(self):
248 result = Queue._format(self)
249 if self._unfinished_tasks:
250 result += ' tasks={}'.format(self._unfinished_tasks)
251 return result
252
253 def _put(self, item):
254 super()._put(item)
255 self._unfinished_tasks += 1
256 self._finished.clear()
257
258 def task_done(self):
259 """Indicate that a formerly enqueued task is complete.
260
261 Used by queue consumers. For each get() used to fetch a task,
262 a subsequent call to task_done() tells the queue that the processing
263 on the task is complete.
264
265 If a join() is currently blocking, it will resume when all items have
266 been processed (meaning that a task_done() call was received for every
267 item that had been put() into the queue).
268
269 Raises ValueError if called more times than there were items placed in
270 the queue.
271 """
272 if self._unfinished_tasks <= 0:
273 raise ValueError('task_done() called too many times')
274 self._unfinished_tasks -= 1
275 if self._unfinished_tasks == 0:
276 self._finished.set()
277
278 @coroutine
279 def join(self):
280 """Block until all items in the queue have been gotten and processed.
281
282 The count of unfinished tasks goes up whenever an item is added to the
283 queue. The count goes down whenever a consumer thread calls task_done()
284 to indicate that the item was retrieved and all work on it is complete.
285 When the count of unfinished tasks drops to zero, join() unblocks.
286 """
287 if self._unfinished_tasks > 0:
288 yield from self._finished.wait()