Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 1 | """Synchronization metaclass. |
| 2 | |
| 3 | This metaclass makes it possible to declare synchronized methods. |
| 4 | |
| 5 | """ |
| 6 | |
| 7 | import thread |
| 8 | |
| 9 | # First we need to define a reentrant lock. |
| 10 | # This is generally useful and should probably be in a standard Python |
| 11 | # library module. For now, we in-line it. |
| 12 | |
class Lock:

    """Reentrant lock.

    This is a mutex-like object which can be acquired by the same
    thread more than once.  It keeps a reference count of the number
    of times it has been acquired by the same thread.  Each acquire()
    call must be matched by a release() call and only the last
    release() call actually releases the lock for acquisition by
    another thread.

    The implementation uses two locks internally:

    __mutex is a short term lock used to protect the instance variables
    __wait is the lock for which other threads wait

    A thread intending to acquire both locks should acquire __wait
    first.

    The implementation uses two other instance variables, protected by
    locking __mutex:

    __tid is the thread ID of the thread that currently has the lock
    __count is the number of times the current thread has acquired it

    When the lock is released, __tid is None and __count is zero.

    """

    def __init__(self):
        """Constructor.  Initialize all instance variables."""
        self.__mutex = thread.allocate_lock()
        self.__wait = thread.allocate_lock()
        self.__tid = None
        self.__count = 0

    def acquire(self, flag=1):
        """Acquire the lock.

        If the calling thread already owns the lock, only the
        acquisition count is incremented and the call returns at once.

        If the optional flag argument is false, returns immediately
        when it cannot acquire the __wait lock without blocking (it
        may still block for a little while in order to acquire the
        __mutex lock).

        The return value is only relevant when the flag argument is
        false; it is 1 if the lock is acquired, 0 if not.

        """
        me = thread.get_ident()

        # Fast path: a nested acquire by the current owner.
        # The mutex is taken *before* entering the try block so that
        # the finally clause never releases a mutex we do not hold.
        self.__mutex.acquire()
        try:
            if self.__tid == me:
                self.__count = self.__count + 1
                return 1
        finally:
            self.__mutex.release()

        # Slow path: compete with the other threads for __wait.  This
        # must happen without holding __mutex, otherwise the owner
        # could never get into release().
        locked = self.__wait.acquire(flag)
        if not flag and not locked:
            return 0

        # We now hold __wait; record ourselves as the owner.
        self.__mutex.acquire()
        try:
            # Internal invariants: whoever held __wait before us must
            # have reset the bookkeeping in release().
            assert self.__tid is None
            assert self.__count == 0
            self.__tid = me
            self.__count = 1
            return 1
        finally:
            self.__mutex.release()

    def release(self):
        """Release the lock.

        If this thread doesn't currently have the lock, an
        AssertionError is raised.  The check is an explicit raise
        rather than an assert statement so that it is still enforced
        when Python runs with -O.

        Only allow another thread to acquire the lock when the count
        reaches zero after decrementing it.

        """
        self.__mutex.acquire()
        try:
            if self.__tid != thread.get_ident():
                raise AssertionError(
                    "release() called by a thread that does not hold the lock")
            # Internal invariant: an owner always has a positive count.
            assert self.__count > 0
            self.__count = self.__count - 1
            if self.__count == 0:
                # Last matching release: forget the owner and let a
                # waiting thread proceed.
                self.__tid = None
                self.__wait.release()
        finally:
            self.__mutex.release()
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 101 | |
| 102 | |
| 103 | def _testLock(): |
| 104 | |
| 105 | done = [] |
| 106 | |
| 107 | def f2(lock, done=done): |
Guido van Rossum | 4117e54 | 1998-09-14 16:44:15 +0000 | [diff] [blame] | 108 | lock.acquire() |
| 109 | print "f2 running in thread %d\n" % thread.get_ident(), |
| 110 | lock.release() |
| 111 | done.append(1) |
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 112 | |
| 113 | def f1(lock, f2=f2, done=done): |
Guido van Rossum | 4117e54 | 1998-09-14 16:44:15 +0000 | [diff] [blame] | 114 | lock.acquire() |
| 115 | print "f1 running in thread %d\n" % thread.get_ident(), |
| 116 | try: |
| 117 | f2(lock) |
| 118 | finally: |
| 119 | lock.release() |
| 120 | done.append(1) |
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 121 | |
| 122 | lock = Lock() |
| 123 | lock.acquire() |
Guido van Rossum | 4117e54 | 1998-09-14 16:44:15 +0000 | [diff] [blame] | 124 | f1(lock) # Adds 2 to done |
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 125 | lock.release() |
| 126 | |
| 127 | lock.acquire() |
Tim Peters | e6ddc8b | 2004-07-18 05:56:09 +0000 | [diff] [blame] | 128 | |
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 129 | thread.start_new_thread(f1, (lock,)) # Adds 2 |
| 130 | thread.start_new_thread(f1, (lock, f1)) # Adds 3 |
| 131 | thread.start_new_thread(f2, (lock,)) # Adds 1 |
| 132 | thread.start_new_thread(f2, (lock,)) # Adds 1 |
| 133 | |
| 134 | lock.release() |
| 135 | import time |
| 136 | while len(done) < 9: |
Guido van Rossum | 4117e54 | 1998-09-14 16:44:15 +0000 | [diff] [blame] | 137 | print len(done) |
| 138 | time.sleep(0.001) |
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 139 | print len(done) |
| 140 | |
| 141 | |
| 142 | # Now, the Locking metaclass is a piece of cake. |
| 143 | # As an example feature, methods whose name begins with exactly one |
| 144 | # underscore are not synchronized. |
| 145 | |
| 146 | from Meta import MetaClass, MetaHelper, MetaMethodWrapper |
| 147 | |
class LockingMethodWrapper(MetaMethodWrapper):
    # Method wrapper that serializes calls on the instance's __lock__.
    #
    # Uses self.func, self.inst and self.__name__, which are presumably
    # set up by MetaMethodWrapper (defined in Meta, not shown here).

    def __call__(self, *args, **kw):
        # Methods whose name begins with exactly one underscore are
        # called without locking.  Only the *second* character is
        # inspected ([1:2]); comparing the whole remainder of the name
        # ([1:]) would also leave every __dunder__ method unsynchronized.
        if self.__name__[:1] == '_' and self.__name__[1:2] != '_':
            return self.func(self.inst, *args, **kw)
        # Everything else runs with the per-instance lock held; the
        # lock is released even if the method raises.
        self.inst.__lock__.acquire()
        try:
            return self.func(self.inst, *args, **kw)
        finally:
            self.inst.__lock__.release()
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 157 | |
class LockingHelper(MetaHelper):
    # Helper that gives each object it initializes its own reentrant
    # Lock, stored as __lock__ for LockingMethodWrapper to use.

    # Wrapper class for methods -- presumably consulted by MetaHelper
    # (defined in Meta, not shown here).
    __methodwrapper__ = LockingMethodWrapper

    def __helperinit__(self, formalclass):
        # Run the base initialization first, then attach the lock.
        MetaHelper.__helperinit__(self, formalclass)
        self.__lock__ = Lock()
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 163 | |
class LockingMetaClass(MetaClass):
    # Metaclass that selects LockingHelper as its helper class.
    __helper__ = LockingHelper


# Base "class" to derive from in order to get synchronized methods:
# an instance of the metaclass with no bases and an empty namespace.
Locking = LockingMetaClass('Locking', (), {})
| 168 | |
| 169 | def _test(): |
| 170 | # For kicks, take away the Locking base class and see it die |
| 171 | class Buffer(Locking): |
Guido van Rossum | 4117e54 | 1998-09-14 16:44:15 +0000 | [diff] [blame] | 172 | def __init__(self, initialsize): |
| 173 | assert initialsize > 0 |
| 174 | self.size = initialsize |
| 175 | self.buffer = [None]*self.size |
| 176 | self.first = self.last = 0 |
| 177 | def put(self, item): |
| 178 | # Do we need to grow the buffer? |
| 179 | if (self.last+1) % self.size != self.first: |
| 180 | # Insert the new item |
| 181 | self.buffer[self.last] = item |
| 182 | self.last = (self.last+1) % self.size |
| 183 | return |
| 184 | # Double the buffer size |
| 185 | # First normalize it so that first==0 and last==size-1 |
| 186 | print "buffer =", self.buffer |
| 187 | print "first = %d, last = %d, size = %d" % ( |
| 188 | self.first, self.last, self.size) |
| 189 | if self.first <= self.last: |
| 190 | temp = self.buffer[self.first:self.last] |
| 191 | else: |
| 192 | temp = self.buffer[self.first:] + self.buffer[:self.last] |
| 193 | print "temp =", temp |
| 194 | self.buffer = temp + [None]*(self.size+1) |
| 195 | self.first = 0 |
| 196 | self.last = self.size-1 |
| 197 | self.size = self.size*2 |
| 198 | print "Buffer size doubled to", self.size |
| 199 | print "new buffer =", self.buffer |
| 200 | print "first = %d, last = %d, size = %d" % ( |
| 201 | self.first, self.last, self.size) |
| 202 | self.put(item) # Recursive call to test the locking |
| 203 | def get(self): |
| 204 | # Is the buffer empty? |
| 205 | if self.first == self.last: |
| 206 | raise EOFError # Avoid defining a new exception |
| 207 | item = self.buffer[self.first] |
| 208 | self.first = (self.first+1) % self.size |
| 209 | return item |
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 210 | |
| 211 | def producer(buffer, wait, n=1000): |
Guido van Rossum | 4117e54 | 1998-09-14 16:44:15 +0000 | [diff] [blame] | 212 | import time |
| 213 | i = 0 |
| 214 | while i < n: |
| 215 | print "put", i |
| 216 | buffer.put(i) |
| 217 | i = i+1 |
| 218 | print "Producer: done producing", n, "items" |
| 219 | wait.release() |
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 220 | |
| 221 | def consumer(buffer, wait, n=1000): |
Guido van Rossum | 4117e54 | 1998-09-14 16:44:15 +0000 | [diff] [blame] | 222 | import time |
| 223 | i = 0 |
| 224 | tout = 0.001 |
| 225 | while i < n: |
| 226 | try: |
| 227 | x = buffer.get() |
| 228 | if x != i: |
| 229 | raise AssertionError, \ |
| 230 | "get() returned %s, expected %s" % (x, i) |
| 231 | print "got", i |
| 232 | i = i+1 |
| 233 | tout = 0.001 |
| 234 | except EOFError: |
| 235 | time.sleep(tout) |
| 236 | tout = tout*2 |
| 237 | print "Consumer: done consuming", n, "items" |
| 238 | wait.release() |
Guido van Rossum | 0cdb887 | 1997-08-26 00:08:51 +0000 | [diff] [blame] | 239 | |
| 240 | pwait = thread.allocate_lock() |
| 241 | pwait.acquire() |
| 242 | cwait = thread.allocate_lock() |
| 243 | cwait.acquire() |
| 244 | buffer = Buffer(1) |
| 245 | n = 1000 |
| 246 | thread.start_new_thread(consumer, (buffer, cwait, n)) |
| 247 | thread.start_new_thread(producer, (buffer, pwait, n)) |
| 248 | pwait.acquire() |
| 249 | print "Producer done" |
| 250 | cwait.acquire() |
| 251 | print "All done" |
| 252 | print "buffer size ==", len(buffer.buffer) |
| 253 | |
# When run as a script, test the reentrant lock on its own first and
# then the Locking metaclass built on top of it.
if __name__ == "__main__":
    _testLock()
    _test()