blob: 2d38972c0de29d70f88c651ae3cf7e29f2f3249e [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
Victor Stinnereaf16ab2015-07-25 02:40:40 +02003__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004
5import collections
6import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
Victor Stinnereaf16ab2015-07-25 02:40:40 +02008from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010from . import locks
Victor Stinnere6ecea52015-07-09 23:13:50 +020011from .coroutines import coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossumfef70982014-01-25 17:24:51 -080014class QueueEmpty(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010015 """Exception raised when Queue.get_nowait() is called on a Queue object
16 which is empty.
17 """
Guido van Rossumfef70982014-01-25 17:24:51 -080018 pass
19
20
21class QueueFull(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010022 """Exception raised when the Queue.put_nowait() method is called on a Queue
23 object which is full.
24 """
Guido van Rossumfef70982014-01-25 17:24:51 -080025 pass
26
27
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028class Queue:
29 """A queue, useful for coordinating producer and consumer coroutines.
30
31 If maxsize is less than or equal to zero, the queue size is infinite. If it
32 is an integer greater than 0, then "yield from put()" will block when the
33 queue reaches maxsize, until an item is removed by get().
34
35 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010036 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037 interrupted between calling qsize() and doing an operation on the Queue.
38 """
39
40 def __init__(self, maxsize=0, *, loop=None):
41 if loop is None:
42 self._loop = events.get_event_loop()
43 else:
44 self._loop = loop
45 self._maxsize = maxsize
46
47 # Futures.
48 self._getters = collections.deque()
Guido van Rossum99f96c52015-09-28 07:42:34 -070049 # Futures.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050 self._putters = collections.deque()
Victor Stinner4cb814c2015-02-17 22:53:28 +010051 self._unfinished_tasks = 0
52 self._finished = locks.Event(loop=self._loop)
53 self._finished.set()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054 self._init(maxsize)
55
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070056 # These three are overridable in subclasses.
57
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070058 def _init(self, maxsize):
59 self._queue = collections.deque()
60
61 def _get(self):
62 return self._queue.popleft()
63
64 def _put(self, item):
65 self._queue.append(item)
Guido van Rossum0bd16bc2015-04-20 09:24:24 -070066
67 # End of the overridable methods.
68
Guido van Rossum99f96c52015-09-28 07:42:34 -070069 def _wakeup_next(self, waiters):
70 # Wake up the next waiter (if any) that isn't cancelled.
71 while waiters:
72 waiter = waiters.popleft()
73 if not waiter.done():
74 waiter.set_result(None)
75 break
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076
77 def __repr__(self):
78 return '<{} at {:#x} {}>'.format(
79 type(self).__name__, id(self), self._format())
80
81 def __str__(self):
82 return '<{} {}>'.format(type(self).__name__, self._format())
83
84 def _format(self):
85 result = 'maxsize={!r}'.format(self._maxsize)
86 if getattr(self, '_queue', None):
87 result += ' _queue={!r}'.format(list(self._queue))
88 if self._getters:
89 result += ' _getters[{}]'.format(len(self._getters))
90 if self._putters:
91 result += ' _putters[{}]'.format(len(self._putters))
Victor Stinner4cb814c2015-02-17 22:53:28 +010092 if self._unfinished_tasks:
93 result += ' tasks={}'.format(self._unfinished_tasks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070094 return result
95
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 def qsize(self):
97 """Number of items in the queue."""
98 return len(self._queue)
99
100 @property
101 def maxsize(self):
102 """Number of items allowed in the queue."""
103 return self._maxsize
104
105 def empty(self):
106 """Return True if the queue is empty, False otherwise."""
107 return not self._queue
108
109 def full(self):
110 """Return True if there are maxsize items in the queue.
111
112 Note: if the Queue was initialized with maxsize=0 (the default),
113 then full() is never True.
114 """
115 if self._maxsize <= 0:
116 return False
117 else:
Victor Stinner66dc6b02014-06-17 23:36:21 +0200118 return self.qsize() >= self._maxsize
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119
120 @coroutine
121 def put(self, item):
122 """Put an item into the queue.
123
Victor Stinner952ec982014-12-22 22:09:50 +0100124 Put an item into the queue. If the queue is full, wait until a free
125 slot is available before adding item.
126
127 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700128 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700129 while self.full():
Yury Selivanov7661db62016-05-16 15:38:39 -0400130 putter = self._loop.create_future()
Guido van Rossum99f96c52015-09-28 07:42:34 -0700131 self._putters.append(putter)
132 try:
133 yield from putter
134 except:
135 putter.cancel() # Just in case putter is not done yet.
136 if not self.full() and not putter.cancelled():
137 # We were woken up by get_nowait(), but can't take
138 # the call. Wake up the next in line.
139 self._wakeup_next(self._putters)
140 raise
141 return self.put_nowait(item)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142
143 def put_nowait(self, item):
144 """Put an item into the queue without blocking.
145
Guido van Rossumfef70982014-01-25 17:24:51 -0800146 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700148 if self.full():
Guido van Rossumfef70982014-01-25 17:24:51 -0800149 raise QueueFull
Guido van Rossum99f96c52015-09-28 07:42:34 -0700150 self._put(item)
151 self._unfinished_tasks += 1
152 self._finished.clear()
153 self._wakeup_next(self._getters)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154
155 @coroutine
156 def get(self):
157 """Remove and return an item from the queue.
158
Victor Stinner952ec982014-12-22 22:09:50 +0100159 If queue is empty, wait until an item is available.
160
161 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700162 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700163 while self.empty():
Yury Selivanov7661db62016-05-16 15:38:39 -0400164 getter = self._loop.create_future()
Guido van Rossum99f96c52015-09-28 07:42:34 -0700165 self._getters.append(getter)
Yury Selivanov3fc0f2d2015-08-05 13:52:33 -0400166 try:
Guido van Rossum99f96c52015-09-28 07:42:34 -0700167 yield from getter
168 except:
169 getter.cancel() # Just in case getter is not done yet.
170 if not self.empty() and not getter.cancelled():
171 # We were woken up by put_nowait(), but can't take
172 # the call. Wake up the next in line.
173 self._wakeup_next(self._getters)
Yury Selivanov3fc0f2d2015-08-05 13:52:33 -0400174 raise
Guido van Rossum99f96c52015-09-28 07:42:34 -0700175 return self.get_nowait()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700176
177 def get_nowait(self):
178 """Remove and return an item from the queue.
179
Guido van Rossumfef70982014-01-25 17:24:51 -0800180 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700181 """
Guido van Rossum99f96c52015-09-28 07:42:34 -0700182 if self.empty():
Guido van Rossumfef70982014-01-25 17:24:51 -0800183 raise QueueEmpty
Guido van Rossum99f96c52015-09-28 07:42:34 -0700184 item = self._get()
185 self._wakeup_next(self._putters)
186 return item
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700187
Victor Stinner4cb814c2015-02-17 22:53:28 +0100188 def task_done(self):
189 """Indicate that a formerly enqueued task is complete.
190
191 Used by queue consumers. For each get() used to fetch a task,
192 a subsequent call to task_done() tells the queue that the processing
193 on the task is complete.
194
195 If a join() is currently blocking, it will resume when all items have
196 been processed (meaning that a task_done() call was received for every
197 item that had been put() into the queue).
198
199 Raises ValueError if called more times than there were items placed in
200 the queue.
201 """
202 if self._unfinished_tasks <= 0:
203 raise ValueError('task_done() called too many times')
204 self._unfinished_tasks -= 1
205 if self._unfinished_tasks == 0:
206 self._finished.set()
207
208 @coroutine
209 def join(self):
210 """Block until all items in the queue have been gotten and processed.
211
212 The count of unfinished tasks goes up whenever an item is added to the
213 queue. The count goes down whenever a consumer calls task_done() to
214 indicate that the item was retrieved and all work on it is complete.
215 When the count of unfinished tasks drops to zero, join() unblocks.
216 """
217 if self._unfinished_tasks > 0:
218 yield from self._finished.wait()
219
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700220
221class PriorityQueue(Queue):
222 """A subclass of Queue; retrieves entries in priority order (lowest first).
223
224 Entries are typically tuples of the form: (priority number, data).
225 """
226
227 def _init(self, maxsize):
228 self._queue = []
229
230 def _put(self, item, heappush=heapq.heappush):
231 heappush(self._queue, item)
232
233 def _get(self, heappop=heapq.heappop):
234 return heappop(self._queue)
235
236
237class LifoQueue(Queue):
238 """A subclass of Queue that retrieves most recently added entries first."""
239
240 def _init(self, maxsize):
241 self._queue = []
242
243 def _put(self, item):
244 self._queue.append(item)
245
246 def _get(self):
247 return self._queue.pop()
248
249
Victor Stinnereaf16ab2015-07-25 02:40:40 +0200250if not compat.PY35:
251 JoinableQueue = Queue
252 """Deprecated alias for Queue."""
253 __all__.append('JoinableQueue')