blob: e35bbff185c204f05bf9a858ae4f2cdcf2fb1d3f [file] [log] [blame]
#
# Module implementing synchronization primitives
#
# multiprocessing/synchronize.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
34
# Names exported by ``from multiprocessing.synchronize import *``.
__all__ = [
    'Lock',
    'RLock',
    'Semaphore',
    'BoundedSemaphore',
    'Condition',
    'Event',
]
38
39import threading
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import sys
41
Benjamin Petersone711caf2008-06-11 16:44:04 +000042import _multiprocessing
43from multiprocessing.process import current_process
Florent Xicluna04842a82011-11-11 20:05:50 +010044from multiprocessing.util import register_after_fork, debug
Benjamin Petersone711caf2008-06-11 16:44:04 +000045from multiprocessing.forking import assert_spawning, Popen
46
# Everything in this module is built on _multiprocessing.SemLock.  If that
# name cannot be imported, fail the import of this module with an
# ImportError explaining that the platform lacks a working sem_open
# implementation.  See issue 3770.
try:
    from _multiprocessing import SemLock
except ImportError:
    raise ImportError(
        "This platform lacks a functioning sem_open"
        " implementation, therefore, the required"
        " synchronization primitives needed will not"
        " function, see issue 3770."
    )
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59# Constants
60#
61
# Values passed as the ``kind`` argument of _multiprocessing.SemLock.
RECURSIVE_MUTEX = 0
SEMAPHORE = 1

# Maximum value, as reported by the underlying SemLock implementation;
# used as ``maxvalue`` for unbounded semaphores.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
64
65#
66# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
67#
68
class SemLock(object):
    """Base class for semaphores and mutexes.

    Wraps a ``_multiprocessing.SemLock`` object (kept in ``self._semlock``)
    and re-exports its ``acquire`` and ``release`` methods as instance
    attributes.  Instances can be used as context managers and can be
    pickled while a child process is being spawned.
    """

    def __init__(self, kind, value, maxvalue):
        """Create the underlying semlock.

        ``kind``, ``value`` and ``maxvalue`` are passed straight through to
        ``_multiprocessing.SemLock``.
        """
        self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % self._semlock.handle)
        self._make_methods()

        # No after-fork hook is registered on win32.
        if sys.platform == 'win32':
            return

        def _after_fork(obj):
            # Let the underlying semlock run its own after-fork handling.
            obj._semlock._after_fork()

        register_after_fork(self, _after_fork)

    def _make_methods(self):
        """Bind ``acquire``/``release`` directly to the underlying semlock."""
        native = self._semlock
        self.acquire = native.acquire
        self.release = native.release

    def __enter__(self):
        """Enter the context by delegating to the underlying semlock."""
        return self._semlock.__enter__()

    def __exit__(self, *args):
        """Leave the context by delegating to the underlying semlock."""
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return ``(handle, kind, maxvalue)`` for pickling.

        ``assert_spawning`` is called first; the handle is passed through
        ``Popen.duplicate_for_child`` before being returned.
        """
        assert_spawning(self)
        native = self._semlock
        child_handle = Popen.duplicate_for_child(native.handle)
        return (child_handle, native.kind, native.maxvalue)

    def __setstate__(self, state):
        """Rebuild the underlying semlock from the pickled ``state`` tuple."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()
100
101#
102# Semaphore
103#
104
class Semaphore(SemLock):
    """Counting semaphore whose maximum value is ``SEM_VALUE_MAX``."""

    def __init__(self, value=1):
        """Create a semaphore with the given initial ``value``."""
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)

    def get_value(self):
        """Return the value reported by the underlying semlock."""
        return self._semlock._get_value()

    def __repr__(self):
        # Fall back to a placeholder when the value cannot be read.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        return '<Semaphore(value=%s)>' % current
119
120#
121# Bounded semaphore
122#
123
class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1):
        """Create a semaphore using ``value`` as both initial and max value."""
        # Deliberately calls SemLock.__init__ directly, not
        # Semaphore.__init__, so that maxvalue is ``value``.
        SemLock.__init__(self, SEMAPHORE, value, value)

    def __repr__(self):
        # Fall back to a placeholder when the value cannot be read.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        limit = self._semlock.maxvalue
        return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % (current, limit)
136
137#
138# Non-recursive lock
139#
140
class Lock(SemLock):
    """Non-recursive lock: a semaphore with initial and maximum value 1."""

    def __init__(self):
        SemLock.__init__(self, SEMAPHORE, 1, 1)

    def __repr__(self):
        """Return ``<Lock(owner=...)>`` describing who appears to hold it."""
        try:
            native = self._semlock
            if native._is_mine():
                # Held here: report the process name, plus the thread name
                # when that is not 'MainThread'.
                owner = current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner = '|'.join((owner, thread_name))
            elif native._get_value() == 1:
                owner = 'None'
            elif native._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<Lock(owner=%s)>' % owner
161
162#
163# Recursive lock
164#
165
class RLock(SemLock):
    """Recursive lock: a RECURSIVE_MUTEX semlock with value and max of 1."""

    def __init__(self):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)

    def __repr__(self):
        """Return ``<RLock(owner, count)>`` describing the apparent holder."""
        try:
            native = self._semlock
            if native._is_mine():
                # Held here: report the process name, plus the thread name
                # when that is not 'MainThread', and the semlock's count.
                owner = current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner = '|'.join((owner, thread_name))
                depth = native._count()
            elif native._get_value() == 1:
                owner, depth = 'None', 0
            elif native._count() > 0:
                owner, depth = 'SomeOtherThread', 'nonzero'
            else:
                owner, depth = 'SomeOtherProcess', 'nonzero'
        except Exception:
            owner, depth = 'unknown', 'unknown'
        return '<RLock(%s, %s)>' % (owner, depth)
187
188#
189# Condition variable
190#
191
class Condition(object):
    """Condition variable built from a lock and three semaphores.

    ``_sleeping_count`` is released once by every call to wait() before it
    blocks, ``_woken_count`` is released once by every wait() as it wakes
    (whether notified or timed out), and ``_wait_semaphore`` is what
    waiters block on and what notify() releases to wake them.
    """

    def __init__(self, lock=None):
        """Create a condition using ``lock``, or a new RLock if not given."""
        self._lock = lock or RLock()
        self._sleeping_count = Semaphore(0)
        self._woken_count = Semaphore(0)
        self._wait_semaphore = Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        """Return the lock and the three semaphores for pickling."""
        assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        """Restore the lock and semaphores from ``state``."""
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        """Enter the context by delegating to the lock."""
        return self._lock.__enter__()

    def __exit__(self, *args):
        """Leave the context by delegating to the lock."""
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Expose the lock's ``acquire``/``release`` on this object."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        # Waiters = threads that went to sleep minus those that have
        # already woken again.
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<Condition(%s, %s)>' % (self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or timed out, reacquire.

        The lock must be held by the caller.  Returns the result of
        acquiring the wait semaphore with ``timeout``.
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release the lock once for every level reported by _count()
        count = self._lock._semlock._count()
        for _ in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire the lock the same number of times it was released
            for _ in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up at most ``n`` waiters (default: one).

        The lock must be held by the caller.
        """
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for _ in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake up every waiter.  The lock must be held by the caller."""
        self.notify(n=sys.maxsize)
292
293#
294# Event
295#
296
class Event(object):
    """Event flag built from a Condition and a semaphore.

    The flag is "set" when ``_flag`` can be acquired without blocking;
    every access to ``_flag`` happens while holding ``_cond``.
    """

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Semaphore(0)

    def is_set(self):
        """Return True if the flag is currently set, else False."""
        with self._cond:
            # Probe the flag without blocking and put it straight back.
            if not self._flag.acquire(False):
                return False
            self._flag.release()
            return True

    def set(self):
        """Set the flag and notify all waiters."""
        with self._cond:
            # Drain first so the flag semaphore never holds more than one.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Reset the flag."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Wait for the flag, for at most ``timeout`` if it is given.

        Returns True if the flag is set when the wait ends, else False.
        """
        flag = self._flag
        with self._cond:
            if flag.acquire(False):
                flag.release()
            else:
                self._cond.wait(timeout)

            # Re-check after the wait so the caller learns the final state.
            if not flag.acquire(False):
                return False
            flag.release()
            return True