# blob: 4aeb6c451797743cf8b80be9b00a8ddd67447254
"""Queues"""

# Public names exported by this module.
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
           'QueueFull', 'QueueEmpty']

import collections
import heapq

from . import events
from . import futures
from . import locks
from .tasks import coroutine


class QueueEmpty(Exception):
    """Exception raised when Queue.get_nowait() is called on a Queue object
    which is empty.
    """


class QueueFull(Exception):
    """Exception raised when the Queue.put_nowait() method is called on a Queue
    object which is full.
    """


class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "yield from put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue.

        maxsize: capacity limit; values <= 0 mean "unbounded".
        loop: event loop used for the waiter futures; when None, the loop
            returned by events.get_event_loop() is used.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures.
        self._getters = collections.deque()
        # Pairs of (item, Future).
        self._putters = collections.deque()
        self._init(maxsize)

    # The three methods below are the storage hooks that subclasses override
    # to change the ordering of the queue.

    def _init(self, maxsize):
        """Create the underlying container (FIFO deque by default)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item in the container."""
        self._queue.append(item)

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr() keeps this usable even if _queue has not been set yet.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        return result

    def _consume_done_getters(self):
        # Delete waiters at the head of the get() queue who've timed out.
        while self._getters and self._getters[0].done():
            self._getters.popleft()

    def _consume_done_putters(self):
        # Delete waiters at the head of the put() queue who've timed out.
        while self._putters and self._putters[0][1].done():
            self._putters.popleft()

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        return 0 < self._maxsize <= self.qsize()

    @coroutine
    def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.

        This method is a coroutine.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()

            # Use _put and _get instead of passing item straight to getter, in
            # case a subclass has logic that must run (e.g. JoinableQueue).
            self._put(item)

            # getter cannot be cancelled, we just removed done getters
            # NOTE(review): the item leaves the queue here, before the
            # waiting get() actually resumes; if that get() is cancelled in
            # between, the item is presumably lost -- confirm.
            getter.set_result(self._get())

        elif self.full():
            waiter = futures.Future(loop=self._loop)

            # The item travels with the waiter; get()/get_nowait() move it
            # into the queue when a slot is freed.
            self._putters.append((item, waiter))
            yield from waiter

        else:
            self._put(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()

            # Use _put and _get instead of passing item straight to getter, in
            # case a subclass has logic that must run (e.g. JoinableQueue).
            self._put(item)

            # getter cannot be cancelled, we just removed done getters
            getter.set_result(self._get())

        elif self.full():
            raise QueueFull
        else:
            self._put(item)

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.

        This method is a coroutine.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self._put(item)

            # When a getter runs and frees up a slot so this putter can
            # run, we need to defer the put for a tick to ensure that
            # getters and putters alternate perfectly. See
            # ChannelTest.test_wait.
            self._loop.call_soon(putter._set_result_unless_cancelled, None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            waiter = futures.Future(loop=self._loop)

            self._getters.append(waiter)
            return (yield from waiter)

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self._put(item)

            # Wake the putter right away (unlike get(), which defers it).
            # putter cannot be cancelled, we just removed done putters
            putter.set_result(None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            raise QueueEmpty


class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Use a plain list as the heap storage."""
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        """Push item onto the heap."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)


class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Use a plain list as the stack storage."""
        self._queue = []

    def _put(self, item):
        """Push item onto the top of the stack."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the most recently added item."""
        return self._queue.pop()


class JoinableQueue(Queue):
    """A subclass of Queue with task_done() and join() methods."""

    def __init__(self, maxsize=0, *, loop=None):
        super().__init__(maxsize=maxsize, loop=loop)
        # Number of items put into the queue that have not yet been
        # acknowledged with task_done().
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()

    def _format(self):
        """Extend the base summary with the unfinished task count."""
        result = super()._format()
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def _put(self, item):
        """Store item and count it as an unfinished task."""
        super()._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    @coroutine
    def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.

        This method is a coroutine.
        """
        if self._unfinished_tasks > 0:
            yield from self._finished.wait()