blob: 84cdabcf54fb007fc8bd877eeb1f904bfc746f2a [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Queues"""
2
Victor Stinner4cb814c2015-02-17 22:53:28 +01003__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty',
4 'JoinableQueue']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005
6import collections
7import heapq
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9from . import events
10from . import futures
11from . import locks
12from .tasks import coroutine
13
14
Guido van Rossumfef70982014-01-25 17:24:51 -080015class QueueEmpty(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010016 """Exception raised when Queue.get_nowait() is called on a Queue object
17 which is empty.
18 """
Guido van Rossumfef70982014-01-25 17:24:51 -080019 pass
20
21
22class QueueFull(Exception):
Victor Stinner17d87f82015-02-03 15:09:24 +010023 """Exception raised when the Queue.put_nowait() method is called on a Queue
24 object which is full.
25 """
Guido van Rossumfef70982014-01-25 17:24:51 -080026 pass
27
28
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029class Queue:
30 """A queue, useful for coordinating producer and consumer coroutines.
31
32 If maxsize is less than or equal to zero, the queue size is infinite. If it
33 is an integer greater than 0, then "yield from put()" will block when the
34 queue reaches maxsize, until an item is removed by get().
35
36 Unlike the standard library Queue, you can reliably know this Queue's size
Victor Stinner2748bc72013-12-13 10:57:04 +010037 with qsize(), since your single-threaded asyncio application won't be
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038 interrupted between calling qsize() and doing an operation on the Queue.
39 """
40
41 def __init__(self, maxsize=0, *, loop=None):
42 if loop is None:
43 self._loop = events.get_event_loop()
44 else:
45 self._loop = loop
46 self._maxsize = maxsize
47
48 # Futures.
49 self._getters = collections.deque()
50 # Pairs of (item, Future).
51 self._putters = collections.deque()
Victor Stinner4cb814c2015-02-17 22:53:28 +010052 self._unfinished_tasks = 0
53 self._finished = locks.Event(loop=self._loop)
54 self._finished.set()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070055 self._init(maxsize)
56
57 def _init(self, maxsize):
58 self._queue = collections.deque()
59
60 def _get(self):
61 return self._queue.popleft()
62
63 def _put(self, item):
64 self._queue.append(item)
Victor Stinner4cb814c2015-02-17 22:53:28 +010065 self._unfinished_tasks += 1
66 self._finished.clear()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070067
68 def __repr__(self):
69 return '<{} at {:#x} {}>'.format(
70 type(self).__name__, id(self), self._format())
71
72 def __str__(self):
73 return '<{} {}>'.format(type(self).__name__, self._format())
74
75 def _format(self):
76 result = 'maxsize={!r}'.format(self._maxsize)
77 if getattr(self, '_queue', None):
78 result += ' _queue={!r}'.format(list(self._queue))
79 if self._getters:
80 result += ' _getters[{}]'.format(len(self._getters))
81 if self._putters:
82 result += ' _putters[{}]'.format(len(self._putters))
Victor Stinner4cb814c2015-02-17 22:53:28 +010083 if self._unfinished_tasks:
84 result += ' tasks={}'.format(self._unfinished_tasks)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085 return result
86
87 def _consume_done_getters(self):
88 # Delete waiters at the head of the get() queue who've timed out.
89 while self._getters and self._getters[0].done():
90 self._getters.popleft()
91
92 def _consume_done_putters(self):
93 # Delete waiters at the head of the put() queue who've timed out.
94 while self._putters and self._putters[0][1].done():
95 self._putters.popleft()
96
97 def qsize(self):
98 """Number of items in the queue."""
99 return len(self._queue)
100
101 @property
102 def maxsize(self):
103 """Number of items allowed in the queue."""
104 return self._maxsize
105
106 def empty(self):
107 """Return True if the queue is empty, False otherwise."""
108 return not self._queue
109
110 def full(self):
111 """Return True if there are maxsize items in the queue.
112
113 Note: if the Queue was initialized with maxsize=0 (the default),
114 then full() is never True.
115 """
116 if self._maxsize <= 0:
117 return False
118 else:
Victor Stinner66dc6b02014-06-17 23:36:21 +0200119 return self.qsize() >= self._maxsize
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700120
121 @coroutine
122 def put(self, item):
123 """Put an item into the queue.
124
Victor Stinner952ec982014-12-22 22:09:50 +0100125 Put an item into the queue. If the queue is full, wait until a free
126 slot is available before adding item.
127
128 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700129 """
130 self._consume_done_getters()
131 if self._getters:
132 assert not self._queue, (
133 'queue non-empty, why are getters waiting?')
134
135 getter = self._getters.popleft()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136 self._put(item)
Victor Stinner3531d902015-01-09 01:42:52 +0100137
138 # getter cannot be cancelled, we just removed done getters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700139 getter.set_result(self._get())
140
Victor Stinner66dc6b02014-06-17 23:36:21 +0200141 elif self._maxsize > 0 and self._maxsize <= self.qsize():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142 waiter = futures.Future(loop=self._loop)
143
144 self._putters.append((item, waiter))
145 yield from waiter
146
147 else:
148 self._put(item)
149
150 def put_nowait(self, item):
151 """Put an item into the queue without blocking.
152
Guido van Rossumfef70982014-01-25 17:24:51 -0800153 If no free slot is immediately available, raise QueueFull.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700154 """
155 self._consume_done_getters()
156 if self._getters:
157 assert not self._queue, (
158 'queue non-empty, why are getters waiting?')
159
160 getter = self._getters.popleft()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161 self._put(item)
Victor Stinner3531d902015-01-09 01:42:52 +0100162
163 # getter cannot be cancelled, we just removed done getters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700164 getter.set_result(self._get())
165
Victor Stinner66dc6b02014-06-17 23:36:21 +0200166 elif self._maxsize > 0 and self._maxsize <= self.qsize():
Guido van Rossumfef70982014-01-25 17:24:51 -0800167 raise QueueFull
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168 else:
169 self._put(item)
170
171 @coroutine
172 def get(self):
173 """Remove and return an item from the queue.
174
Victor Stinner952ec982014-12-22 22:09:50 +0100175 If queue is empty, wait until an item is available.
176
177 This method is a coroutine.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 """
179 self._consume_done_putters()
180 if self._putters:
181 assert self.full(), 'queue not full, why are putters waiting?'
182 item, putter = self._putters.popleft()
183 self._put(item)
184
185 # When a getter runs and frees up a slot so this putter can
186 # run, we need to defer the put for a tick to ensure that
187 # getters and putters alternate perfectly. See
188 # ChannelTest.test_wait.
Victor Stinnera9acbe82014-07-05 15:29:41 +0200189 self._loop.call_soon(putter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700190
191 return self._get()
192
193 elif self.qsize():
194 return self._get()
195 else:
196 waiter = futures.Future(loop=self._loop)
197
198 self._getters.append(waiter)
199 return (yield from waiter)
200
201 def get_nowait(self):
202 """Remove and return an item from the queue.
203
Guido van Rossumfef70982014-01-25 17:24:51 -0800204 Return an item if one is immediately available, else raise QueueEmpty.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700205 """
206 self._consume_done_putters()
207 if self._putters:
208 assert self.full(), 'queue not full, why are putters waiting?'
209 item, putter = self._putters.popleft()
210 self._put(item)
211 # Wake putter on next tick.
Victor Stinner3531d902015-01-09 01:42:52 +0100212
213 # getter cannot be cancelled, we just removed done putters
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214 putter.set_result(None)
215
216 return self._get()
217
218 elif self.qsize():
219 return self._get()
220 else:
Guido van Rossumfef70982014-01-25 17:24:51 -0800221 raise QueueEmpty
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222
Victor Stinner4cb814c2015-02-17 22:53:28 +0100223 def task_done(self):
224 """Indicate that a formerly enqueued task is complete.
225
226 Used by queue consumers. For each get() used to fetch a task,
227 a subsequent call to task_done() tells the queue that the processing
228 on the task is complete.
229
230 If a join() is currently blocking, it will resume when all items have
231 been processed (meaning that a task_done() call was received for every
232 item that had been put() into the queue).
233
234 Raises ValueError if called more times than there were items placed in
235 the queue.
236 """
237 if self._unfinished_tasks <= 0:
238 raise ValueError('task_done() called too many times')
239 self._unfinished_tasks -= 1
240 if self._unfinished_tasks == 0:
241 self._finished.set()
242
243 @coroutine
244 def join(self):
245 """Block until all items in the queue have been gotten and processed.
246
247 The count of unfinished tasks goes up whenever an item is added to the
248 queue. The count goes down whenever a consumer calls task_done() to
249 indicate that the item was retrieved and all work on it is complete.
250 When the count of unfinished tasks drops to zero, join() unblocks.
251 """
252 if self._unfinished_tasks > 0:
253 yield from self._finished.wait()
254
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700255
256class PriorityQueue(Queue):
257 """A subclass of Queue; retrieves entries in priority order (lowest first).
258
259 Entries are typically tuples of the form: (priority number, data).
260 """
261
262 def _init(self, maxsize):
263 self._queue = []
264
265 def _put(self, item, heappush=heapq.heappush):
266 heappush(self._queue, item)
267
268 def _get(self, heappop=heapq.heappop):
269 return heappop(self._queue)
270
271
272class LifoQueue(Queue):
273 """A subclass of Queue that retrieves most recently added entries first."""
274
275 def _init(self, maxsize):
276 self._queue = []
277
278 def _put(self, item):
279 self._queue.append(item)
280
281 def _get(self):
282 return self._queue.pop()
283
284
Victor Stinner4cb814c2015-02-17 22:53:28 +0100285JoinableQueue = Queue
286"""Deprecated alias for Queue."""