blob: c3296fe1382ce77d608894c60712ff8e22edf3f2 [file] [log] [blame]
Raymond Hettinger0c5e52f2012-01-09 06:17:39 +00001'''A multi-producer, multi-consumer queue.'''
Guido van Rossum9022fce1992-08-25 12:30:44 +00002
Benjamin Peterson0ed52452009-03-21 17:36:10 +00003try:
Raymond Hettinger143f51a2012-01-09 05:32:01 +00004 import threading
Benjamin Peterson0ed52452009-03-21 17:36:10 +00005except ImportError:
Raymond Hettinger7b7caa82012-01-09 20:02:24 +00006 import dummy_threading as threading
Raymond Hettinger756b3f32004-01-29 06:37:52 +00007from collections import deque
Raymond Hettinger143f51a2012-01-09 05:32:01 +00008from heapq import heappush, heappop
Victor Stinnerec895392012-04-29 02:41:27 +02009try:
10 from time import monotonic as time
11except ImportError:
12 from time import time
Martin v. Löwis77ac4292002-10-15 15:11:13 +000013
Raymond Hettinger35641462008-01-17 00:13:27 +000014__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
Brett Cannonb42bb5a2003-07-01 05:34:27 +000015
class Empty(Exception):
    '''Raised by Queue.get()/get_nowait() when no item could be retrieved.'''
19
class Full(Exception):
    '''Raised by Queue.put()/put_nowait() when no free slot could be obtained.'''
Guido van Rossum9022fce1992-08-25 12:30:44 +000023
class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.

    Subclasses change the queue discipline by overriding the four hooks
    _init(), _qsize(), _put() and _get(); the public methods only call
    those hooks while holding the internal mutex.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; a thread waiting in join() is notified to resume.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before storing, so the counter never goes negative.
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue.  The count goes down whenever a consumer thread calls
        task_done() to indicate the item was retrieved and all work on it is
        complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            # Re-check the counter after every wake-up before returning.
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            # An unbounded queue (maxsize <= 0) is never full.
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative; this
        check is only performed for bounded queues (maxsize > 0).
        '''
        with self.not_full:
            # Unbounded queues never wait, so the checks below are skipped.
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Wait against a fixed deadline so repeated wake-ups
                    # cannot extend the total time spent blocking.
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # Wait against a fixed deadline so repeated wake-ups
                # cannot extend the total time spent blocking.
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
Raymond Hettinger35641462008-01-17 00:13:27 +0000215
216
class PriorityQueue(Queue):
    '''Queue variant that hands entries back in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    # The backing store is a plain list that heapq keeps ordered as a heap.
    def _init(self, maxsize):
        self.queue = list()

    def _qsize(self):
        heap = self.queue
        return len(heap)

    def _put(self, item):
        heap = self.queue
        heappush(heap, item)

    def _get(self):
        lowest = heappop(self.queue)
        return lowest
234
235
class LifoQueue(Queue):
    '''Queue variant that returns the most recently added entry first.'''

    # A plain list used as a stack: items are pushed and popped at the tail.
    def _init(self, maxsize):
        self.queue = list()

    def _qsize(self):
        stack = self.queue
        return len(stack)

    def _put(self, item):
        stack = self.queue
        stack.append(item)

    def _get(self):
        newest = self.queue.pop(-1)
        return newest