blob: c65ba4b46dfb4feefd38bc6dad35d233170d895f [file] [log] [blame]
Guido van Rossum4acc25b2000-02-02 15:10:15 +00001"""A multi-producer, multi-consumer queue."""
Guido van Rossum9022fce1992-08-25 12:30:44 +00002
try:
    import threading
except ImportError:
    # No native thread support in this interpreter build: fall back to the
    # standard library's stand-in module, which is spelled with an
    # underscore ("dummy_threading").
    import dummy_threading as threading
from collections import deque
from heapq import heappush, heappop
from time import time
try:
    from time import monotonic
except ImportError:
    # Interpreters older than 3.3 have no monotonic clock; degrade to
    # wall-clock time so the module still imports.
    from time import time as monotonic
Martin v. Löwis77ac4292002-10-15 15:11:13 +000010
Raymond Hettinger35641462008-01-17 00:13:27 +000011__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
Brett Cannonb42bb5a2003-07-01 05:34:27 +000012
class Empty(Exception):
    """Signal that Queue.get() found no item.

    Raised by a non-blocking get (block=False / get_nowait()) on an empty
    queue, and by a timed get whose timeout expired.
    """
16
class Full(Exception):
    """Signal that Queue.put() found no free slot.

    Raised by a non-blocking put (block=False / put_nowait()) on a full
    queue, and by a timed put whose timeout expired.
    """
Guido van Rossum9022fce1992-08-25 12:30:44 +000020
class Queue:
    """Create a FIFO queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.

    All public methods serialize on a single lock.  Subclasses change the
    retrieval order by overriding the _init(), _qsize(), _put() and _get()
    hooks, which are only ever called with that lock held.
    """

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; a thread waiting in join() is notified to resume.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before storing, so the counter never goes
                    # negative.
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        """Block until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue.  The count goes down whenever a consumer thread calls
        task_done() to indicate the item was retrieved and all work on it is
        complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        with self.all_tasks_done:
            # Re-check after every wake-up: the count may have risen again
            # before this thread reacquired the lock.
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        """
        with self.mutex:
            return not self._qsize()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.

        A queue with maxsize <= 0 is never full.
        """
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError if the queue is bounded, 'block' is true and
        'timeout' is negative.  An unbounded queue (maxsize <= 0) never
        waits, so 'block' and 'timeout' are not examined for it.
        """
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Use a monotonic clock for the deadline so that system
                    # clock adjustments cannot shorten or stretch the wait.
                    endtime = monotonic() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - monotonic()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        """
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # Use a monotonic clock for the deadline so that system
                # clock adjustments cannot shorten or stretch the wait.
                endtime = monotonic() + timeout
                while not self._qsize():
                    remaining = endtime - monotonic()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
Raymond Hettinger35641462008-01-17 00:13:27 +0000213
214
class PriorityQueue(Queue):
    """Variant of Queue that retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    # Storage is a plain list kept in heap order by heappush/heappop.
    def _init(self, maxsize):
        self.queue = list()

    def _qsize(self):
        heap = self.queue
        return len(heap)

    def _put(self, item):
        heap = self.queue
        heappush(heap, item)

    def _get(self):
        heap = self.queue
        return heappop(heap)
232
233
class LifoQueue(Queue):
    """Variant of Queue that retrieves most recently added entries first."""

    # Storage is a plain list used as a stack: push and pop at the tail.
    def _init(self, maxsize):
        self.queue = list()

    def _qsize(self):
        stack = self.queue
        return len(stack)

    def _put(self, item):
        stack = self.queue
        stack.append(item)

    def _get(self):
        stack = self.queue
        return stack.pop()