blob: 50543c829e33c9f3427c56500c2dced23e2cd478 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
R David Murraye81a7732015-04-12 18:47:56 -04003__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
9from . import futures
10from . import locks
11from .tasks import coroutine
12
13
Guido van Rossumfef70982014-01-25 17:24:51 -080014class QueueEmpty(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010015 """Exception raised when Queue.get_nowait() is called on a Queue object
16 which is empty.
17 """
Guido van Rossumfef70982014-01-25 17:24:51 -080018 pass
19
20
21class QueueFull(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010022 """Exception raised when the Queue.put_nowait() method is called on a Queue
23 object which is full.
24 """
Guido van Rossumfef70982014-01-25 17:24:51 -080025 pass
26
27
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028class Queue:
29 """A queue, useful for coordinating producer and consumer coroutines.
30
31 If maxsize is less than or equal to zero, the queue size is infinite. If it
32 is an integer greater than 0, then "yield from put()" will block when the
33 queue reaches maxsize, until an item is removed by get().
34
35 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010036 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037 interrupted between calling qsize() and doing an operation on the Queue.
38 """
39
40 def __init__(self, maxsize=0, *, loop=None):
41 if loop is None:
42 self._loop = events.get_event_loop()
43 else:
44 self._loop = loop
45 self._maxsize = maxsize
46
47 # Futures.
48 self._getters = collections.deque()
49 # Pairs of (item, Future).
50 self._putters = collections.deque()
Victor Stinner4cb814c2015-02-17 22:53:28 +010051 self._unfinished_tasks = 0
52 self._finished = locks.Event(loop=self._loop)
53 self._finished.set()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054 self._init(maxsize)
55
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070056 # These three are overridable in subclasses.
57
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058 def _init(self, maxsize):
59 self._queue = collections.deque()
60
61 def _get(self):
62 return self._queue.popleft()
63
64 def _put(self, item):
65 self._queue.append(item)
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070066
67 # End of the overridable methods.
68
69 def __put_internal(self, item):
70 self._put(item)
Victor Stinner4cb814c2015-02-17 22:53:28 +010071 self._unfinished_tasks += 1
72 self._finished.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070073
74 def __repr__(self):
75 return '<{} at {:#x} {}>'.format(
76 type(self).__name__, id(self), self._format())
77
78 def __str__(self):
79 return '<{} {}>'.format(type(self).__name__, self._format())
80
81 def _format(self):
82 result = 'maxsize={!r}'.format(self._maxsize)
83 if getattr(self, '_queue', None):
84 result += ' _queue={!r}'.format(list(self._queue))
85 if self._getters:
86 result += ' _getters[{}]'.format(len(self._getters))
87 if self._putters:
88 result += ' _putters[{}]'.format(len(self._putters))
Victor Stinner4cb814c2015-02-17 22:53:28 +010089 if self._unfinished_tasks:
90 result += ' tasks={}'.format(self._unfinished_tasks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070091 return result
92
93 def _consume_done_getters(self):
94 # Delete waiters at the head of the get() queue who've timed out.
95 while self._getters and self._getters[0].done():
96 self._getters.popleft()
97
98 def _consume_done_putters(self):
99 # Delete waiters at the head of the put() queue who've timed out.
100 while self._putters and self._putters[0][1].done():
101 self._putters.popleft()
102
103 def qsize(self):
104 """Number of items in the queue."""
105 return len(self._queue)
106
107 @property
108 def maxsize(self):
109 """Number of items allowed in the queue."""
110 return self._maxsize
111
112 def empty(self):
113 """Return True if the queue is empty, False otherwise."""
114 return not self._queue
115
116 def full(self):
117 """Return True if there are maxsize items in the queue.
118
119 Note: if the Queue was initialized with maxsize=0 (the default),
120 then full() is never True.
121 """
122 if self._maxsize <= 0:
123 return False
124 else:
Victor Stinner66dc6b02014-06-17 23:36:21 +0200125 return self.qsize() >= self._maxsize
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700126
127 @coroutine
128 def put(self, item):
129 """Put an item into the queue.
130
Victor Stinner952ec982014-12-22 22:09:50 +0100131 Put an item into the queue. If the queue is full, wait until a free
132 slot is available before adding item.
133
134 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700135 """
136 self._consume_done_getters()
137 if self._getters:
138 assert not self._queue, (
139 'queue non-empty, why are getters waiting?')
140
141 getter = self._getters.popleft()
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700142 self.__put_internal(item)
Victor Stinner3531d902015-01-09 01:42:52 +0100143
144 # getter cannot be cancelled, we just removed done getters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700145 getter.set_result(self._get())
146
Victor Stinner66dc6b02014-06-17 23:36:21 +0200147 elif self._maxsize > 0 and self._maxsize <= self.qsize():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148 waiter = futures.Future(loop=self._loop)
149
150 self._putters.append((item, waiter))
151 yield from waiter
152
153 else:
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700154 self.__put_internal(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700155
156 def put_nowait(self, item):
157 """Put an item into the queue without blocking.
158
Guido van Rossumfef70982014-01-25 17:24:51 -0800159 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 """
161 self._consume_done_getters()
162 if self._getters:
163 assert not self._queue, (
164 'queue non-empty, why are getters waiting?')
165
166 getter = self._getters.popleft()
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700167 self.__put_internal(item)
Victor Stinner3531d902015-01-09 01:42:52 +0100168
169 # getter cannot be cancelled, we just removed done getters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700170 getter.set_result(self._get())
171
Victor Stinner66dc6b02014-06-17 23:36:21 +0200172 elif self._maxsize > 0 and self._maxsize <= self.qsize():
Guido van Rossumfef70982014-01-25 17:24:51 -0800173 raise QueueFull
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700174 else:
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700175 self.__put_internal(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700176
177 @coroutine
178 def get(self):
179 """Remove and return an item from the queue.
180
Victor Stinner952ec982014-12-22 22:09:50 +0100181 If queue is empty, wait until an item is available.
182
183 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184 """
185 self._consume_done_putters()
186 if self._putters:
187 assert self.full(), 'queue not full, why are putters waiting?'
188 item, putter = self._putters.popleft()
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700189 self.__put_internal(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700190
191 # When a getter runs and frees up a slot so this putter can
192 # run, we need to defer the put for a tick to ensure that
193 # getters and putters alternate perfectly. See
194 # ChannelTest.test_wait.
Victor Stinnera9acbe82014-07-05 15:29:41 +0200195 self._loop.call_soon(putter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196
197 return self._get()
198
199 elif self.qsize():
200 return self._get()
201 else:
202 waiter = futures.Future(loop=self._loop)
203
204 self._getters.append(waiter)
205 return (yield from waiter)
206
207 def get_nowait(self):
208 """Remove and return an item from the queue.
209
Guido van Rossumfef70982014-01-25 17:24:51 -0800210 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700211 """
212 self._consume_done_putters()
213 if self._putters:
214 assert self.full(), 'queue not full, why are putters waiting?'
215 item, putter = self._putters.popleft()
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700216 self.__put_internal(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700217 # Wake putter on next tick.
Victor Stinner3531d902015-01-09 01:42:52 +0100218
219 # getter cannot be cancelled, we just removed done putters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700220 putter.set_result(None)
221
222 return self._get()
223
224 elif self.qsize():
225 return self._get()
226 else:
Guido van Rossumfef70982014-01-25 17:24:51 -0800227 raise QueueEmpty
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700228
Victor Stinner4cb814c2015-02-17 22:53:28 +0100229 def task_done(self):
230 """Indicate that a formerly enqueued task is complete.
231
232 Used by queue consumers. For each get() used to fetch a task,
233 a subsequent call to task_done() tells the queue that the processing
234 on the task is complete.
235
236 If a join() is currently blocking, it will resume when all items have
237 been processed (meaning that a task_done() call was received for every
238 item that had been put() into the queue).
239
240 Raises ValueError if called more times than there were items placed in
241 the queue.
242 """
243 if self._unfinished_tasks <= 0:
244 raise ValueError('task_done() called too many times')
245 self._unfinished_tasks -= 1
246 if self._unfinished_tasks == 0:
247 self._finished.set()
248
249 @coroutine
250 def join(self):
251 """Block until all items in the queue have been gotten and processed.
252
253 The count of unfinished tasks goes up whenever an item is added to the
254 queue. The count goes down whenever a consumer calls task_done() to
255 indicate that the item was retrieved and all work on it is complete.
256 When the count of unfinished tasks drops to zero, join() unblocks.
257 """
258 if self._unfinished_tasks > 0:
259 yield from self._finished.wait()
260
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700261
262class PriorityQueue(Queue):
263 """A subclass of Queue; retrieves entries in priority order (lowest first).
264
265 Entries are typically tuples of the form: (priority number, data).
266 """
267
268 def _init(self, maxsize):
269 self._queue = []
270
271 def _put(self, item, heappush=heapq.heappush):
272 heappush(self._queue, item)
273
274 def _get(self, heappop=heapq.heappop):
275 return heappop(self._queue)
276
277
278class LifoQueue(Queue):
279 """A subclass of Queue that retrieves most recently added entries first."""
280
281 def _init(self, maxsize):
282 self._queue = []
283
284 def _put(self, item):
285 self._queue.append(item)
286
287 def _get(self):
288 return self._queue.pop()