blob: 2c413a9b61ae1e48dd07898c87c5582f430525fa [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module implementing synchronization primitives
3#
4# multiprocessing/synchronize.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
10__all__ = [
11 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
12 ]
13
14import threading
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import sys
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import _multiprocessing
18from multiprocessing.process import current_process
Florent Xicluna04842a82011-11-11 20:05:50 +010019from multiprocessing.util import register_after_fork, debug
Benjamin Petersone711caf2008-06-11 16:44:04 +000020from multiprocessing.forking import assert_spawning, Popen
Charles-François Natalic8ce7152012-04-17 18:45:57 +020021from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023# Try to import the mp.synchronize module cleanly, if it fails
24# raise ImportError for platforms lacking a working sem_open implementation.
25# See issue 3770
26try:
27 from _multiprocessing import SemLock
28except (ImportError):
29 raise ImportError("This platform lacks a functioning sem_open" +
30 " implementation, therefore, the required" +
31 " synchronization primitives needed will not" +
32 " function, see issue 3770.")
33
Benjamin Petersone711caf2008-06-11 16:44:04 +000034#
35# Constants
36#
37
38RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
39SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
40
41#
42# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
43#
44
45class SemLock(object):
46
47 def __init__(self, kind, value, maxvalue):
48 sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
49 debug('created semlock with handle %s' % sl.handle)
50 self._make_methods()
51
52 if sys.platform != 'win32':
53 def _after_fork(obj):
54 obj._semlock._after_fork()
55 register_after_fork(self, _after_fork)
56
57 def _make_methods(self):
58 self.acquire = self._semlock.acquire
59 self.release = self._semlock.release
Benjamin Peterson8cc7d882009-06-01 23:14:51 +000060
61 def __enter__(self):
62 return self._semlock.__enter__()
63
64 def __exit__(self, *args):
65 return self._semlock.__exit__(*args)
Benjamin Petersone711caf2008-06-11 16:44:04 +000066
67 def __getstate__(self):
68 assert_spawning(self)
69 sl = self._semlock
70 return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
71
72 def __setstate__(self, state):
73 self._semlock = _multiprocessing.SemLock._rebuild(*state)
74 debug('recreated blocker with handle %r' % state[0])
75 self._make_methods()
76
77#
78# Semaphore
79#
80
81class Semaphore(SemLock):
82
83 def __init__(self, value=1):
84 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
85
86 def get_value(self):
87 return self._semlock._get_value()
88
89 def __repr__(self):
90 try:
91 value = self._semlock._get_value()
92 except Exception:
93 value = 'unknown'
94 return '<Semaphore(value=%s)>' % value
95
96#
97# Bounded semaphore
98#
99
100class BoundedSemaphore(Semaphore):
101
102 def __init__(self, value=1):
103 SemLock.__init__(self, SEMAPHORE, value, value)
104
105 def __repr__(self):
106 try:
107 value = self._semlock._get_value()
108 except Exception:
109 value = 'unknown'
110 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
111 (value, self._semlock.maxvalue)
112
113#
114# Non-recursive lock
115#
116
117class Lock(SemLock):
118
119 def __init__(self):
120 SemLock.__init__(self, SEMAPHORE, 1, 1)
121
122 def __repr__(self):
123 try:
124 if self._semlock._is_mine():
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000125 name = current_process().name
126 if threading.current_thread().name != 'MainThread':
127 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128 elif self._semlock._get_value() == 1:
129 name = 'None'
130 elif self._semlock._count() > 0:
131 name = 'SomeOtherThread'
132 else:
133 name = 'SomeOtherProcess'
134 except Exception:
135 name = 'unknown'
136 return '<Lock(owner=%s)>' % name
137
138#
139# Recursive lock
140#
141
142class RLock(SemLock):
143
144 def __init__(self):
145 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
146
147 def __repr__(self):
148 try:
149 if self._semlock._is_mine():
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000150 name = current_process().name
151 if threading.current_thread().name != 'MainThread':
152 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000153 count = self._semlock._count()
154 elif self._semlock._get_value() == 1:
155 name, count = 'None', 0
156 elif self._semlock._count() > 0:
157 name, count = 'SomeOtherThread', 'nonzero'
158 else:
159 name, count = 'SomeOtherProcess', 'nonzero'
160 except Exception:
161 name, count = 'unknown', 'unknown'
162 return '<RLock(%s, %s)>' % (name, count)
163
164#
165# Condition variable
166#
167
168class Condition(object):
169
170 def __init__(self, lock=None):
171 self._lock = lock or RLock()
172 self._sleeping_count = Semaphore(0)
173 self._woken_count = Semaphore(0)
174 self._wait_semaphore = Semaphore(0)
175 self._make_methods()
176
177 def __getstate__(self):
178 assert_spawning(self)
179 return (self._lock, self._sleeping_count,
180 self._woken_count, self._wait_semaphore)
181
182 def __setstate__(self, state):
183 (self._lock, self._sleeping_count,
184 self._woken_count, self._wait_semaphore) = state
185 self._make_methods()
186
Benjamin Peterson8cc7d882009-06-01 23:14:51 +0000187 def __enter__(self):
188 return self._lock.__enter__()
189
190 def __exit__(self, *args):
191 return self._lock.__exit__(*args)
192
Benjamin Petersone711caf2008-06-11 16:44:04 +0000193 def _make_methods(self):
194 self.acquire = self._lock.acquire
195 self.release = self._lock.release
Benjamin Petersone711caf2008-06-11 16:44:04 +0000196
197 def __repr__(self):
198 try:
199 num_waiters = (self._sleeping_count._semlock._get_value() -
200 self._woken_count._semlock._get_value())
201 except Exception:
202 num_waiters = 'unkown'
203 return '<Condition(%s, %s)>' % (self._lock, num_waiters)
204
205 def wait(self, timeout=None):
206 assert self._lock._semlock._is_mine(), \
207 'must acquire() condition before using wait()'
208
209 # indicate that this thread is going to sleep
210 self._sleeping_count.release()
211
212 # release lock
213 count = self._lock._semlock._count()
214 for i in range(count):
215 self._lock.release()
216
217 try:
218 # wait for notification or timeout
Georg Brandl2fa4cc52010-10-28 13:01:06 +0000219 ret = self._wait_semaphore.acquire(True, timeout)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000220 finally:
221 # indicate that this thread has woken
222 self._woken_count.release()
223
224 # reacquire lock
225 for i in range(count):
226 self._lock.acquire()
Georg Brandl2fa4cc52010-10-28 13:01:06 +0000227 return ret
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228
229 def notify(self):
230 assert self._lock._semlock._is_mine(), 'lock is not owned'
231 assert not self._wait_semaphore.acquire(False)
232
233 # to take account of timeouts since last notify() we subtract
234 # woken_count from sleeping_count and rezero woken_count
235 while self._woken_count.acquire(False):
236 res = self._sleeping_count.acquire(False)
237 assert res
238
239 if self._sleeping_count.acquire(False): # try grabbing a sleeper
240 self._wait_semaphore.release() # wake up one sleeper
241 self._woken_count.acquire() # wait for the sleeper to wake
242
243 # rezero _wait_semaphore in case a timeout just happened
244 self._wait_semaphore.acquire(False)
245
246 def notify_all(self):
247 assert self._lock._semlock._is_mine(), 'lock is not owned'
248 assert not self._wait_semaphore.acquire(False)
249
250 # to take account of timeouts since last notify*() we subtract
251 # woken_count from sleeping_count and rezero woken_count
252 while self._woken_count.acquire(False):
253 res = self._sleeping_count.acquire(False)
254 assert res
255
256 sleepers = 0
257 while self._sleeping_count.acquire(False):
258 self._wait_semaphore.release() # wake up one sleeper
259 sleepers += 1
260
261 if sleepers:
262 for i in range(sleepers):
263 self._woken_count.acquire() # wait for a sleeper to wake
264
265 # rezero wait_semaphore in case some timeouts just happened
266 while self._wait_semaphore.acquire(False):
267 pass
268
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200269 def wait_for(self, predicate, timeout=None):
270 result = predicate()
271 if result:
272 return result
273 if timeout is not None:
274 endtime = _time() + timeout
275 else:
276 endtime = None
277 waittime = None
278 while not result:
279 if endtime is not None:
280 waittime = endtime - _time()
281 if waittime <= 0:
282 break
283 self.wait(waittime)
284 result = predicate()
285 return result
286
Benjamin Petersone711caf2008-06-11 16:44:04 +0000287#
288# Event
289#
290
291class Event(object):
292
293 def __init__(self):
294 self._cond = Condition(Lock())
295 self._flag = Semaphore(0)
296
297 def is_set(self):
298 self._cond.acquire()
299 try:
300 if self._flag.acquire(False):
301 self._flag.release()
302 return True
303 return False
304 finally:
305 self._cond.release()
306
307 def set(self):
308 self._cond.acquire()
309 try:
310 self._flag.acquire(False)
311 self._flag.release()
312 self._cond.notify_all()
313 finally:
314 self._cond.release()
315
316 def clear(self):
317 self._cond.acquire()
318 try:
319 self._flag.acquire(False)
320 finally:
321 self._cond.release()
322
323 def wait(self, timeout=None):
324 self._cond.acquire()
325 try:
326 if self._flag.acquire(False):
327 self._flag.release()
328 else:
329 self._cond.wait(timeout)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000330
331 if self._flag.acquire(False):
332 self._flag.release()
333 return True
334 return False
Benjamin Petersone711caf2008-06-11 16:44:04 +0000335 finally:
336 self._cond.release()