blob: 264e1ce8b367d9f96d412d66d6ea592efe6147d1 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
R David Murraye81a7732015-04-12 18:47:56 -04003__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8from . import events
9from . import futures
10from . import locks
11from .tasks import coroutine
12
13
class QueueEmpty(Exception):
    """Exception raised when Queue.get_nowait() is called on a Queue object
    which is empty.
    """
19
20
class QueueFull(Exception):
    """Exception raised when the Queue.put_nowait() method is called on a Queue
    object which is full.
    """
26
27
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "yield from put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.

    Subclasses change the storage order by overriding _init(), _get() and
    _put().  Those hooks only store and fetch items; the unfinished-task
    accounting behind task_done() and join() is done outside of them, so an
    override does not have to repeat it.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue.

        maxsize -- capacity of the queue; zero or a negative value means
                   the queue is unbounded.
        loop    -- event loop used for the internal futures and event;
                   events.get_event_loop() is used when it is None.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures.
        self._getters = collections.deque()
        # Pairs of (item, Future).
        self._putters = collections.deque()
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()
        self._init(maxsize)

    # These three hooks are overridden by subclasses to change the order in
    # which items are retrieved.

    def _init(self, maxsize):
        """Create the underlying container (a deque for the FIFO queue)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item in the container."""
        self._queue.append(item)

    def __put_internal(self, item):
        """Store item through the _put() hook and record one unfinished task.

        The bookkeeping is kept here rather than in _put() so that it still
        happens when a subclass overrides _put().  The double underscore
        (name mangling) keeps subclasses from overriding it by accident.
        """
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr() guards against _queue not having been created yet.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def _consume_done_getters(self):
        # Delete waiters at the head of the get() queue who've timed out.
        while self._getters and self._getters[0].done():
            self._getters.popleft()

    def _consume_done_putters(self):
        # Delete waiters at the head of the put() queue who've timed out.
        while self._putters and self._putters[0][1].done():
            self._putters.popleft()

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    @coroutine
    def put(self, item):
        """Put an item into the queue.

        Put an item into the queue. If the queue is full, wait until a free
        slot is available before adding item.

        This method is a coroutine.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()
            # Go through the container even though the item is handed over
            # at once, so that the _put()/_get() hooks and the unfinished
            # task count see it.
            self.__put_internal(item)

            # getter cannot be cancelled, we just removed done getters
            getter.set_result(self._get())

        elif self._maxsize > 0 and self._maxsize <= self.qsize():
            waiter = futures.Future(loop=self._loop)

            # The item is stored by whichever get()/get_nowait() call pops
            # this pair; this coroutine only waits to be woken up.
            self._putters.append((item, waiter))
            yield from waiter

        else:
            self.__put_internal(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()
            self.__put_internal(item)

            # getter cannot be cancelled, we just removed done getters
            getter.set_result(self._get())

        elif self._maxsize > 0 and self._maxsize <= self.qsize():
            raise QueueFull
        else:
            self.__put_internal(item)

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.

        This method is a coroutine.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self.__put_internal(item)

            # When a getter runs and frees up a slot so this putter can
            # run, we need to defer the put for a tick to ensure that
            # getters and putters alternate perfectly. See
            # ChannelTest.test_wait.
            self._loop.call_soon(putter._set_result_unless_cancelled, None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            waiter = futures.Future(loop=self._loop)

            self._getters.append(waiter)
            return (yield from waiter)

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self.__put_internal(item)

            # Wake the waiting putter.
            # putter cannot be cancelled, we just removed done putters
            putter.set_result(None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            raise QueueEmpty

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    @coroutine
    def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            yield from self._finished.wait()
253
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list that heapq maintains as a binary heap.
        self._queue = []

    # heapq.heappush / heapq.heappop are bound as default arguments, so the
    # functions are looked up once, when the methods are defined.

    def _put(self, item, heappush=heapq.heappush):
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        return heappop(self._queue)
269
270
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        # A plain list used as a stack: items are appended to and popped
        # from the same (right-hand) end.
        self._queue = []

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        return self._queue.pop()
281 return self._queue.pop()