blob: 3b4dc21ab8641f97e1e7cbf3ce543bdff6505f5a [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
Victor Stinner4cb814c2015-02-17 22:53:28 +01003__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty',
4 'JoinableQueue']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005
6import collections
7import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9from . import events
10from . import futures
11from . import locks
Victor Stinnere6ecea52015-07-09 23:13:50 +020012from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013
14
Guido van Rossumfef70982014-01-25 17:24:51 -080015class QueueEmpty(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010016 """Exception raised when Queue.get_nowait() is called on a Queue object
17 which is empty.
18 """
Guido van Rossumfef70982014-01-25 17:24:51 -080019 pass
20
21
22class QueueFull(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010023 """Exception raised when the Queue.put_nowait() method is called on a Queue
24 object which is full.
25 """
Guido van Rossumfef70982014-01-25 17:24:51 -080026 pass
27
28
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029class Queue:
30 """A queue, useful for coordinating producer and consumer coroutines.
31
32 If maxsize is less than or equal to zero, the queue size is infinite. If it
33 is an integer greater than 0, then "yield from put()" will block when the
34 queue reaches maxsize, until an item is removed by get().
35
36 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010037 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038 interrupted between calling qsize() and doing an operation on the Queue.
39 """
40
41 def __init__(self, maxsize=0, *, loop=None):
42 if loop is None:
43 self._loop = events.get_event_loop()
44 else:
45 self._loop = loop
46 self._maxsize = maxsize
47
48 # Futures.
49 self._getters = collections.deque()
50 # Pairs of (item, Future).
51 self._putters = collections.deque()
Victor Stinner4cb814c2015-02-17 22:53:28 +010052 self._unfinished_tasks = 0
53 self._finished = locks.Event(loop=self._loop)
54 self._finished.set()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070055 self._init(maxsize)
56
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070057 # These three are overridable in subclasses.
58
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059 def _init(self, maxsize):
60 self._queue = collections.deque()
61
62 def _get(self):
63 return self._queue.popleft()
64
65 def _put(self, item):
66 self._queue.append(item)
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070067
68 # End of the overridable methods.
69
70 def __put_internal(self, item):
71 self._put(item)
Victor Stinner4cb814c2015-02-17 22:53:28 +010072 self._unfinished_tasks += 1
73 self._finished.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074
75 def __repr__(self):
76 return '<{} at {:#x} {}>'.format(
77 type(self).__name__, id(self), self._format())
78
79 def __str__(self):
80 return '<{} {}>'.format(type(self).__name__, self._format())
81
82 def _format(self):
83 result = 'maxsize={!r}'.format(self._maxsize)
84 if getattr(self, '_queue', None):
85 result += ' _queue={!r}'.format(list(self._queue))
86 if self._getters:
87 result += ' _getters[{}]'.format(len(self._getters))
88 if self._putters:
89 result += ' _putters[{}]'.format(len(self._putters))
Victor Stinner4cb814c2015-02-17 22:53:28 +010090 if self._unfinished_tasks:
91 result += ' tasks={}'.format(self._unfinished_tasks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092 return result
93
94 def _consume_done_getters(self):
95 # Delete waiters at the head of the get() queue who've timed out.
96 while self._getters and self._getters[0].done():
97 self._getters.popleft()
98
99 def _consume_done_putters(self):
100 # Delete waiters at the head of the put() queue who've timed out.
101 while self._putters and self._putters[0][1].done():
102 self._putters.popleft()
103
104 def qsize(self):
105 """Number of items in the queue."""
106 return len(self._queue)
107
108 @property
109 def maxsize(self):
110 """Number of items allowed in the queue."""
111 return self._maxsize
112
113 def empty(self):
114 """Return True if the queue is empty, False otherwise."""
115 return not self._queue
116
117 def full(self):
118 """Return True if there are maxsize items in the queue.
119
120 Note: if the Queue was initialized with maxsize=0 (the default),
121 then full() is never True.
122 """
123 if self._maxsize <= 0:
124 return False
125 else:
Victor Stinner66dc6b02014-06-17 23:36:21 +0200126 return self.qsize() >= self._maxsize
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127
128 @coroutine
129 def put(self, item):
130 """Put an item into the queue.
131
Victor Stinner952ec982014-12-22 22:09:50 +0100132 Put an item into the queue. If the queue is full, wait until a free
133 slot is available before adding item.
134
135 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136 """
137 self._consume_done_getters()
138 if self._getters:
139 assert not self._queue, (
140 'queue non-empty, why are getters waiting?')
141
142 getter = self._getters.popleft()
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700143 self.__put_internal(item)
Victor Stinner3531d902015-01-09 01:42:52 +0100144
145 # getter cannot be cancelled, we just removed done getters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700146 getter.set_result(self._get())
147
Victor Stinner66dc6b02014-06-17 23:36:21 +0200148 elif self._maxsize > 0 and self._maxsize <= self.qsize():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149 waiter = futures.Future(loop=self._loop)
150
151 self._putters.append((item, waiter))
152 yield from waiter
153
154 else:
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700155 self.__put_internal(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156
157 def put_nowait(self, item):
158 """Put an item into the queue without blocking.
159
Guido van Rossumfef70982014-01-25 17:24:51 -0800160 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161 """
162 self._consume_done_getters()
163 if self._getters:
164 assert not self._queue, (
165 'queue non-empty, why are getters waiting?')
166
167 getter = self._getters.popleft()
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700168 self.__put_internal(item)
Victor Stinner3531d902015-01-09 01:42:52 +0100169
170 # getter cannot be cancelled, we just removed done getters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171 getter.set_result(self._get())
172
Victor Stinner66dc6b02014-06-17 23:36:21 +0200173 elif self._maxsize > 0 and self._maxsize <= self.qsize():
Guido van Rossumfef70982014-01-25 17:24:51 -0800174 raise QueueFull
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700175 else:
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700176 self.__put_internal(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700177
178 @coroutine
179 def get(self):
180 """Remove and return an item from the queue.
181
Victor Stinner952ec982014-12-22 22:09:50 +0100182 If queue is empty, wait until an item is available.
183
184 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700185 """
186 self._consume_done_putters()
187 if self._putters:
188 assert self.full(), 'queue not full, why are putters waiting?'
189 item, putter = self._putters.popleft()
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700190 self.__put_internal(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700191
192 # When a getter runs and frees up a slot so this putter can
193 # run, we need to defer the put for a tick to ensure that
194 # getters and putters alternate perfectly. See
195 # ChannelTest.test_wait.
Victor Stinnera9acbe82014-07-05 15:29:41 +0200196 self._loop.call_soon(putter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700197
198 return self._get()
199
200 elif self.qsize():
201 return self._get()
202 else:
203 waiter = futures.Future(loop=self._loop)
204
205 self._getters.append(waiter)
206 return (yield from waiter)
207
208 def get_nowait(self):
209 """Remove and return an item from the queue.
210
Guido van Rossumfef70982014-01-25 17:24:51 -0800211 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700212 """
213 self._consume_done_putters()
214 if self._putters:
215 assert self.full(), 'queue not full, why are putters waiting?'
216 item, putter = self._putters.popleft()
Guido van Rossum0bd16bc2015-04-20 09:24:24 -0700217 self.__put_internal(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218 # Wake putter on next tick.
Victor Stinner3531d902015-01-09 01:42:52 +0100219
220 # getter cannot be cancelled, we just removed done putters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700221 putter.set_result(None)
222
223 return self._get()
224
225 elif self.qsize():
226 return self._get()
227 else:
Guido van Rossumfef70982014-01-25 17:24:51 -0800228 raise QueueEmpty
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700229
Victor Stinner4cb814c2015-02-17 22:53:28 +0100230 def task_done(self):
231 """Indicate that a formerly enqueued task is complete.
232
233 Used by queue consumers. For each get() used to fetch a task,
234 a subsequent call to task_done() tells the queue that the processing
235 on the task is complete.
236
237 If a join() is currently blocking, it will resume when all items have
238 been processed (meaning that a task_done() call was received for every
239 item that had been put() into the queue).
240
241 Raises ValueError if called more times than there were items placed in
242 the queue.
243 """
244 if self._unfinished_tasks <= 0:
245 raise ValueError('task_done() called too many times')
246 self._unfinished_tasks -= 1
247 if self._unfinished_tasks == 0:
248 self._finished.set()
249
250 @coroutine
251 def join(self):
252 """Block until all items in the queue have been gotten and processed.
253
254 The count of unfinished tasks goes up whenever an item is added to the
255 queue. The count goes down whenever a consumer calls task_done() to
256 indicate that the item was retrieved and all work on it is complete.
257 When the count of unfinished tasks drops to zero, join() unblocks.
258 """
259 if self._unfinished_tasks > 0:
260 yield from self._finished.wait()
261
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700262
263class PriorityQueue(Queue):
264 """A subclass of Queue; retrieves entries in priority order (lowest first).
265
266 Entries are typically tuples of the form: (priority number, data).
267 """
268
269 def _init(self, maxsize):
270 self._queue = []
271
272 def _put(self, item, heappush=heapq.heappush):
273 heappush(self._queue, item)
274
275 def _get(self, heappop=heapq.heappop):
276 return heappop(self._queue)
277
278
279class LifoQueue(Queue):
280 """A subclass of Queue that retrieves most recently added entries first."""
281
282 def _init(self, maxsize):
283 self._queue = []
284
285 def _put(self, item):
286 self._queue.append(item)
287
288 def _get(self):
289 return self._queue.pop()
290
291
Victor Stinner4cb814c2015-02-17 22:53:28 +0100292JoinableQueue = Queue
293"""Deprecated alias for Queue."""