blob: 1ebd7b62e64b0b2b5d3980477180b84f87e2798b [file] [log] [blame]
Benjamin Peterson7f03ea72008-06-13 19:20:48 +00001#
2# Module implementing synchronization primitives
3#
4# multiprocessing/synchronize.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9__all__ = [
10 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
11 ]
12
13import threading
14import os
15import sys
16
17from time import time as _time, sleep as _sleep
18
19import _multiprocessing
20from multiprocessing.process import current_process
21from multiprocessing.util import Finalize, register_after_fork, debug
22from multiprocessing.forking import assert_spawning, Popen
23
24#
25# Constants
26#
27
28RECURSIVE_MUTEX, SEMAPHORE = range(2)
29SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
30
31#
32# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
33#
34
35class SemLock(object):
36
37 def __init__(self, kind, value, maxvalue):
38 sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
39 debug('created semlock with handle %s' % sl.handle)
40 self._make_methods()
41
42 if sys.platform != 'win32':
43 def _after_fork(obj):
44 obj._semlock._after_fork()
45 register_after_fork(self, _after_fork)
46
47 def _make_methods(self):
48 self.acquire = self._semlock.acquire
49 self.release = self._semlock.release
50 self.__enter__ = self._semlock.__enter__
51 self.__exit__ = self._semlock.__exit__
52
53 def __getstate__(self):
54 assert_spawning(self)
55 sl = self._semlock
56 return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
57
58 def __setstate__(self, state):
59 self._semlock = _multiprocessing.SemLock._rebuild(*state)
60 debug('recreated blocker with handle %r' % state[0])
61 self._make_methods()
62
63#
64# Semaphore
65#
66
67class Semaphore(SemLock):
68
69 def __init__(self, value=1):
70 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
71
72 def get_value(self):
73 return self._semlock._get_value()
74
75 def __repr__(self):
76 try:
77 value = self._semlock._get_value()
78 except Exception:
79 value = 'unknown'
80 return '<Semaphore(value=%s)>' % value
81
82#
83# Bounded semaphore
84#
85
86class BoundedSemaphore(Semaphore):
87
88 def __init__(self, value=1):
89 SemLock.__init__(self, SEMAPHORE, value, value)
90
91 def __repr__(self):
92 try:
93 value = self._semlock._get_value()
94 except Exception:
95 value = 'unknown'
96 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
97 (value, self._semlock.maxvalue)
98
99#
100# Non-recursive lock
101#
102
103class Lock(SemLock):
104
105 def __init__(self):
106 SemLock.__init__(self, SEMAPHORE, 1, 1)
107
108 def __repr__(self):
109 try:
110 if self._semlock._is_mine():
111 name = current_process().get_name()
112 if threading.current_thread().get_name() != 'MainThread':
113 name += '|' + threading.current_thread().get_name()
114 elif self._semlock._get_value() == 1:
115 name = 'None'
116 elif self._semlock._count() > 0:
117 name = 'SomeOtherThread'
118 else:
119 name = 'SomeOtherProcess'
120 except Exception:
121 name = 'unknown'
122 return '<Lock(owner=%s)>' % name
123
124#
125# Recursive lock
126#
127
128class RLock(SemLock):
129
130 def __init__(self):
131 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
132
133 def __repr__(self):
134 try:
135 if self._semlock._is_mine():
136 name = current_process().get_name()
137 if threading.current_thread().get_name() != 'MainThread':
138 name += '|' + threading.current_thread().get_name()
139 count = self._semlock._count()
140 elif self._semlock._get_value() == 1:
141 name, count = 'None', 0
142 elif self._semlock._count() > 0:
143 name, count = 'SomeOtherThread', 'nonzero'
144 else:
145 name, count = 'SomeOtherProcess', 'nonzero'
146 except Exception:
147 name, count = 'unknown', 'unknown'
148 return '<RLock(%s, %s)>' % (name, count)
149
150#
151# Condition variable
152#
153
154class Condition(object):
155
156 def __init__(self, lock=None):
157 self._lock = lock or RLock()
158 self._sleeping_count = Semaphore(0)
159 self._woken_count = Semaphore(0)
160 self._wait_semaphore = Semaphore(0)
161 self._make_methods()
162
163 def __getstate__(self):
164 assert_spawning(self)
165 return (self._lock, self._sleeping_count,
166 self._woken_count, self._wait_semaphore)
167
168 def __setstate__(self, state):
169 (self._lock, self._sleeping_count,
170 self._woken_count, self._wait_semaphore) = state
171 self._make_methods()
172
173 def _make_methods(self):
174 self.acquire = self._lock.acquire
175 self.release = self._lock.release
176 self.__enter__ = self._lock.__enter__
177 self.__exit__ = self._lock.__exit__
178
179 def __repr__(self):
180 try:
181 num_waiters = (self._sleeping_count._semlock._get_value() -
182 self._woken_count._semlock._get_value())
183 except Exception:
184 num_waiters = 'unkown'
185 return '<Condition(%s, %s)>' % (self._lock, num_waiters)
186
187 def wait(self, timeout=None):
188 assert self._lock._semlock._is_mine(), \
189 'must acquire() condition before using wait()'
190
191 # indicate that this thread is going to sleep
192 self._sleeping_count.release()
193
194 # release lock
195 count = self._lock._semlock._count()
196 for i in xrange(count):
197 self._lock.release()
198
199 try:
200 # wait for notification or timeout
201 self._wait_semaphore.acquire(True, timeout)
202 finally:
203 # indicate that this thread has woken
204 self._woken_count.release()
205
206 # reacquire lock
207 for i in xrange(count):
208 self._lock.acquire()
209
210 def notify(self):
211 assert self._lock._semlock._is_mine(), 'lock is not owned'
212 assert not self._wait_semaphore.acquire(False)
213
214 # to take account of timeouts since last notify() we subtract
215 # woken_count from sleeping_count and rezero woken_count
216 while self._woken_count.acquire(False):
217 res = self._sleeping_count.acquire(False)
218 assert res
219
220 if self._sleeping_count.acquire(False): # try grabbing a sleeper
221 self._wait_semaphore.release() # wake up one sleeper
222 self._woken_count.acquire() # wait for the sleeper to wake
223
224 # rezero _wait_semaphore in case a timeout just happened
225 self._wait_semaphore.acquire(False)
226
227 def notify_all(self):
228 assert self._lock._semlock._is_mine(), 'lock is not owned'
229 assert not self._wait_semaphore.acquire(False)
230
231 # to take account of timeouts since last notify*() we subtract
232 # woken_count from sleeping_count and rezero woken_count
233 while self._woken_count.acquire(False):
234 res = self._sleeping_count.acquire(False)
235 assert res
236
237 sleepers = 0
238 while self._sleeping_count.acquire(False):
239 self._wait_semaphore.release() # wake up one sleeper
240 sleepers += 1
241
242 if sleepers:
243 for i in xrange(sleepers):
244 self._woken_count.acquire() # wait for a sleeper to wake
245
246 # rezero wait_semaphore in case some timeouts just happened
247 while self._wait_semaphore.acquire(False):
248 pass
249
250#
251# Event
252#
253
254class Event(object):
255
256 def __init__(self):
257 self._cond = Condition(Lock())
258 self._flag = Semaphore(0)
259
260 def is_set(self):
261 self._cond.acquire()
262 try:
263 if self._flag.acquire(False):
264 self._flag.release()
265 return True
266 return False
267 finally:
268 self._cond.release()
269
270 def set(self):
271 self._cond.acquire()
272 try:
273 self._flag.acquire(False)
274 self._flag.release()
275 self._cond.notify_all()
276 finally:
277 self._cond.release()
278
279 def clear(self):
280 self._cond.acquire()
281 try:
282 self._flag.acquire(False)
283 finally:
284 self._cond.release()
285
286 def wait(self, timeout=None):
287 self._cond.acquire()
288 try:
289 if self._flag.acquire(False):
290 self._flag.release()
291 else:
292 self._cond.wait(timeout)
293 finally:
294 self._cond.release()