blob: 1fb91602df74c95dcf394335c5d3debb24f970e3 [file] [log] [blame]
Guido van Rossum0cdb8871997-08-26 00:08:51 +00001"""Synchronization metaclass.
2
3This metaclass makes it possible to declare synchronized methods.
4
5"""
6
7import thread
8
9# First we need to define a reentrant lock.
10# This is generally useful and should probably be in a standard Python
11# library module. For now, we in-line it.
12
13class Lock:
14
15 """Reentrant lock.
16
17 This is a mutex-like object which can be acquired by the same
18 thread more than once. It keeps a reference count of the number
19 of times it has been acquired by the same thread. Each acquire()
20 call must be matched by a release() call and only the last
21 release() call actually releases the lock for acquisition by
22 another thread.
23
24 The implementation uses two locks internally:
25
26 __mutex is a short term lock used to protect the instance variables
27 __wait is the lock for which other threads wait
28
29 A thread intending to acquire both locks should acquire __wait
30 first.
31
32 The implementation uses two other instance variables, protected by
33 locking __mutex:
34
35 __tid is the thread ID of the thread that currently has the lock
36 __count is the number of times the current thread has acquired it
37
38 When the lock is released, __tid is None and __count is zero.
39
40 """
41
42 def __init__(self):
43 """Constructor. Initialize all instance variables."""
44 self.__mutex = thread.allocate_lock()
45 self.__wait = thread.allocate_lock()
46 self.__tid = None
47 self.__count = 0
48
49 def acquire(self, flag=1):
50 """Acquire the lock.
51
52 If the optional flag argument is false, returns immediately
53 when it cannot acquire the __wait lock without blocking (it
54 may still block for a little while in order to acquire the
55 __mutex lock).
56
57 The return value is only relevant when the flag argument is
58 false; it is 1 if the lock is acquired, 0 if not.
59
60 """
61 self.__mutex.acquire()
62 try:
63 if self.__tid == thread.get_ident():
64 self.__count = self.__count + 1
65 return 1
66 finally:
67 self.__mutex.release()
68 locked = self.__wait.acquire(flag)
69 if not flag and not locked:
70 return 0
71 try:
72 self.__mutex.acquire()
73 assert self.__tid == None
74 assert self.__count == 0
75 self.__tid = thread.get_ident()
76 self.__count = 1
77 return 1
78 finally:
79 self.__mutex.release()
80
81 def release(self):
82 """Release the lock.
83
84 If this thread doesn't currently have the lock, an assertion
85 error is raised.
86
87 Only allow another thread to acquire the lock when the count
88 reaches zero after decrementing it.
89
90 """
91 self.__mutex.acquire()
92 try:
93 assert self.__tid == thread.get_ident()
94 assert self.__count > 0
95 self.__count = self.__count - 1
96 if self.__count == 0:
97 self.__tid = None
98 self.__wait.release()
99 finally:
100 self.__mutex.release()
101
102
103def _testLock():
104
105 done = []
106
107 def f2(lock, done=done):
108 lock.acquire()
109 print "f2 running in thread %d\n" % thread.get_ident(),
110 lock.release()
111 done.append(1)
112
113 def f1(lock, f2=f2, done=done):
114 lock.acquire()
115 print "f1 running in thread %d\n" % thread.get_ident(),
116 try:
117 f2(lock)
118 finally:
119 lock.release()
120 done.append(1)
121
122 lock = Lock()
123 lock.acquire()
124 f1(lock) # Adds 2 to done
125 lock.release()
126
127 lock.acquire()
128
129 thread.start_new_thread(f1, (lock,)) # Adds 2
130 thread.start_new_thread(f1, (lock, f1)) # Adds 3
131 thread.start_new_thread(f2, (lock,)) # Adds 1
132 thread.start_new_thread(f2, (lock,)) # Adds 1
133
134 lock.release()
135 import time
136 while len(done) < 9:
137 print len(done)
138 time.sleep(0.001)
139 print len(done)
140
141
142# Now, the Locking metaclass is a piece of cake.
143# As an example feature, methods whose name begins with exactly one
144# underscore are not synchronized.
145
146from Meta import MetaClass, MetaHelper, MetaMethodWrapper
147
148class LockingMethodWrapper(MetaMethodWrapper):
149 def __call__(self, *args, **kw):
150 if self.__name__[:1] == '_' and self.__name__[1:] != '_':
151 return apply(self.func, (self.inst,) + args, kw)
152 self.inst.__lock__.acquire()
153 try:
154 return apply(self.func, (self.inst,) + args, kw)
155 finally:
156 self.inst.__lock__.release()
157
158class LockingHelper(MetaHelper):
159 __methodwrapper__ = LockingMethodWrapper
160 def __helperinit__(self, formalclass):
161 MetaHelper.__helperinit__(self, formalclass)
162 self.__lock__ = Lock()
163
164class LockingMetaClass(MetaClass):
165 __helper__ = LockingHelper
166
167Locking = LockingMetaClass('Locking', (), {})
168
169def _test():
170 # For kicks, take away the Locking base class and see it die
171 class Buffer(Locking):
172 def __init__(self, initialsize):
173 assert initialsize > 0
174 self.size = initialsize
175 self.buffer = [None]*self.size
176 self.first = self.last = 0
177 def put(self, item):
178 # Do we need to grow the buffer?
179 if (self.last+1) % self.size != self.first:
180 # Insert the new item
181 self.buffer[self.last] = item
182 self.last = (self.last+1) % self.size
183 return
184 # Double the buffer size
185 # First normalize it so that first==0 and last==size-1
186 print "buffer =", self.buffer
187 print "first = %d, last = %d, size = %d" % (
188 self.first, self.last, self.size)
189 if self.first <= self.last:
190 temp = self.buffer[self.first:self.last]
191 else:
192 temp = self.buffer[self.first:] + self.buffer[:self.last]
193 print "temp =", temp
194 self.buffer = temp + [None]*(self.size+1)
195 self.first = 0
196 self.last = self.size-1
197 self.size = self.size*2
198 print "Buffer size doubled to", self.size
199 print "new buffer =", self.buffer
200 print "first = %d, last = %d, size = %d" % (
201 self.first, self.last, self.size)
202 self.put(item) # Recursive call to test the locking
203 def get(self):
204 # Is the buffer empty?
205 if self.first == self.last:
206 raise EOFError # Avoid defining a new exception
207 item = self.buffer[self.first]
208 self.first = (self.first+1) % self.size
209 return item
210
211 def producer(buffer, wait, n=1000):
212 import time
213 i = 0
214 while i < n:
215 print "put", i
216 buffer.put(i)
217 i = i+1
218 print "Producer: done producing", n, "items"
219 wait.release()
220
221 def consumer(buffer, wait, n=1000):
222 import time
223 i = 0
224 tout = 0.001
225 while i < n:
226 try:
227 x = buffer.get()
228 if x != i:
229 raise AssertionError, \
230 "get() returned %s, expected %s" % (x, i)
231 print "got", i
232 i = i+1
233 tout = 0.001
234 except EOFError:
235 time.sleep(tout)
236 tout = tout*2
237 print "Consumer: done consuming", n, "items"
238 wait.release()
239
240 pwait = thread.allocate_lock()
241 pwait.acquire()
242 cwait = thread.allocate_lock()
243 cwait.acquire()
244 buffer = Buffer(1)
245 n = 1000
246 thread.start_new_thread(consumer, (buffer, cwait, n))
247 thread.start_new_thread(producer, (buffer, pwait, n))
248 pwait.acquire()
249 print "Producer done"
250 cwait.acquire()
251 print "All done"
252 print "buffer size ==", len(buffer.buffer)
253
254if __name__ == '__main__':
255 _testLock()
256 _test()