blob: cd035da14cf419daa3376b1c03a0180ef8691b08 [file] [log] [blame]
Guido van Rossum4acc25b2000-02-02 15:10:15 +00001"""A multi-producer, multi-consumer queue."""
Guido van Rossum9022fce1992-08-25 12:30:44 +00002
class Empty(Exception):
    """Exception raised by Queue.get(block=0)/get_nowait()."""
6
class Full(Exception):
    """Exception raised by Queue.put(block=0)/put_nowait()."""
Guido van Rossum9022fce1992-08-25 12:30:44 +000010
class Queue:
    """A FIFO queue guarded by three locks, with an optional size bound.

    Instance attributes created by __init__:

    mutex -- lock held around every access to the underlying storage.
    esema -- lock used as a binary semaphore; it is held while the queue
             is empty, so that get() blocks until an item arrives.
    fsema -- lock used as a binary semaphore; it is held while the queue
             is full, so that put() blocks until a slot is freed.

    esema and fsema are also held for the duration of a get() or put()
    call respectively.  A non-blocking get or put attempted by another
    thread during that window therefore raises Empty or Full even though
    the queue may be neither.

    Subclasses can change the queue organization by overriding the
    _init, _qsize, _empty, _full, _put and _get hooks.
    """

    def __init__(self, maxsize=0):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0, the queue size is infinite.
        """
        # threading.Lock is the portable spelling of the low-level lock
        # factory; the bare 'thread' module is not available on Python 3.
        import threading
        self._init(maxsize)
        self.mutex = threading.Lock()
        self.esema = threading.Lock()
        # A new queue is empty, so esema starts out held.
        self.esema.acquire()
        self.fsema = threading.Lock()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        try:
            # _qsize() may be overridden and may raise; the finally clause
            # guarantees the mutex is not left locked in that case.
            return self._qsize()
        finally:
            self.mutex.release()

    def empty(self):
        """Return a true value if the queue is empty (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Return a true value if the queue is full (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    def put(self, item, block=1):
        """Put an item into the queue.

        If optional arg 'block' is true (the default), block if
        necessary until a free slot is available.  Otherwise (block
        is false), put an item on the queue if a free slot is
        immediately available, else raise the Full exception.
        """
        if block:
            self.fsema.acquire()
        elif not self.fsema.acquire(0):
            raise Full
        self.mutex.acquire()
        release_fsema = True
        try:
            was_empty = self._empty()
            self._put(item)
            # If we fail before here, the empty state has
            # not changed, so we can skip the release of esema.
            if was_empty:
                self.esema.release()
            # If we fail before here, the queue can not be full, so
            # release_fsema remains True.
            release_fsema = not self._full()
        finally:
            # Catching system level exceptions here (RecursionDepth,
            # OutOfMemory, etc) - so do as little as possible in terms
            # of Python calls.
            if release_fsema:
                self.fsema.release()
            self.mutex.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, 0)

    def get(self, block=1):
        """Remove and return an item from the queue.

        If optional arg 'block' is true (the default), block if
        necessary until an item is available.  Otherwise (block is
        false), return an item if one is immediately available, else
        raise the Empty exception.
        """
        if block:
            self.esema.acquire()
        elif not self.esema.acquire(0):
            raise Empty
        self.mutex.acquire()
        release_esema = True
        try:
            was_full = self._full()
            item = self._get()
            # If we fail before here, the full state has
            # not changed, so we can skip the release of fsema.
            if was_full:
                self.fsema.release()
            # Failure means empty state also unchanged - release_esema
            # remains True.
            release_esema = not self._empty()
        finally:
            if release_esema:
                self.esema.release()
            self.mutex.release()
        return item

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        """
        return self.get(0)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held.

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = []

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full (never true when maxsize <= 0)
    def _full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        # NOTE: list.pop(0) shifts every remaining element; the list is
        # kept because self.queue is visible to subclasses.
        return self.queue.pop(0)