blob: 0d69777b447e0434feb4a6faae4544cd0bc86f3d [file] [log] [blame]
# A multi-producer, multi-consumer queue.
2
# Exception raised by get_nowait() when no item is immediately available.
# This is a real exception class rather than a string: string exceptions
# cannot be raised on Python 2.6 and later.  'except Empty' keeps working.
class Empty(Exception):
    pass
4
class Queue:

    # Initialize a queue object with a given maximum size.
    # (If maxsize is <= 0, the maximum size is infinite; that is also
    # the default when maxsize is omitted.)
    #
    # Three locks cooperate:
    #   mutex -- protects the queue representation
    #   esema -- left locked while the queue is empty; get() waits on it
    #   fsema -- left locked while the queue is full; put() waits on it
    def __init__(self, maxsize=0):
        # The lock primitives live in 'thread' (named '_thread' on
        # Python 3).  Imported here so the class does not depend on a
        # module-level import being present.
        try:
            import thread
        except ImportError:
            import _thread as thread
        self._init(maxsize)
        self.mutex = thread.allocate_lock()
        self.esema = thread.allocate_lock()
        self.esema.acquire()
        self.fsema = thread.allocate_lock()

    # Get an approximation of the queue size (not reliable!)
    def qsize(self):
        self.mutex.acquire()
        try:
            return self._qsize()
        finally:
            self.mutex.release()

    # Check if the queue is empty (not reliable!)
    def empty(self):
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    # Check if the queue is full (not reliable!)
    def full(self):
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    # Put a new item into the queue,
    # blocking if necessary until there is room for it
    def put(self, item):
        self.fsema.acquire()
        self.mutex.acquire()
        try:
            try:
                was_empty = self._empty()
                self._put(item)
                if was_empty:
                    # There is something to get now: wake up a getter
                    self.esema.release()
            finally:
                # Give fsema back unless the queue is now full.  This
                # also runs when _put() raised, so a failed put does
                # not leave every later put() blocked.
                if not self._full():
                    self.fsema.release()
        finally:
            self.mutex.release()

    # Get an item from the queue,
    # blocking if necessary until one is available
    def get(self):
        self.esema.acquire()
        self.mutex.acquire()
        try:
            return self._get_and_signal()
        finally:
            self.mutex.release()

    # Get an item from the queue if one is immediately available,
    # raise Empty if the queue is empty or temporarily unavailable
    def get_nowait(self):
        locked = self.esema.acquire(0)
        self.mutex.acquire()
        try:
            if self._empty():
                # The queue is empty -- we can't have esema
                raise Empty
            if not locked:
                locked = self.esema.acquire(0)
                if not locked:
                    # Somebody else has esema
                    # but we have mutex --
                    # get out of their way
                    raise Empty
            return self._get_and_signal()
        finally:
            self.mutex.release()

    # Internal helper shared by get() and get_nowait(): remove and
    # return the next item, then update the semaphores.
    # The caller must already hold both esema and mutex.
    def _get_and_signal(self):
        try:
            was_full = self._full()
            item = self._get()
            if was_full:
                # There is room again: let a blocked put() proceed
                self.fsema.release()
        finally:
            # Give esema back unless the queue is now empty.  This also
            # runs when _get() raised, so a failed get does not leave
            # every later get() blocked.
            if not self._empty():
                self.esema.release()
        return item

    # XXX Need to define put_nowait() as well.


    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = []

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full
    # (never true when maxsize is <= 0)
    def _full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue (the oldest one: first in, first out)
    def _get(self):
        return self.queue.pop(0)