blob: 532ac5c1dd24855d831f1f14c7162c74637d2ab9 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module implementing synchronization primitives
3#
4# multiprocessing/synchronize.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35__all__ = [
36 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
37 ]
38
39import threading
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import sys
41
Benjamin Petersone711caf2008-06-11 16:44:04 +000042import _multiprocessing
43from multiprocessing.process import current_process
Florent Xicluna04842a82011-11-11 20:05:50 +010044from multiprocessing.util import register_after_fork, debug
Benjamin Petersone711caf2008-06-11 16:44:04 +000045from multiprocessing.forking import assert_spawning, Popen
Charles-François Natalic8ce7152012-04-17 18:45:57 +020046from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
# Probe for the C-level SemLock type up front.  When it cannot be imported,
# re-raise ImportError with a message explaining that the platform has no
# working sem_open implementation.  See issue 3770.
try:
    from _multiprocessing import SemLock
except ImportError:
    raise ImportError("This platform lacks a functioning sem_open"
                      " implementation, therefore, the required"
                      " synchronization primitives needed will not"
                      " function, see issue 3770.")
58
Benjamin Petersone711caf2008-06-11 16:44:04 +000059#
60# Constants
61#
62
# `kind` codes handed to _multiprocessing.SemLock by the classes below.
RECURSIVE_MUTEX = 0
SEMAPHORE = 1

# Used as `maxvalue` for plain (unbounded) Semaphore objects.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
65
66#
67# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
68#
69
class SemLock(object):
    """Common base for the semaphore and lock classes in this module.

    Holds a `_multiprocessing.SemLock` in `self._semlock` and exposes its
    `acquire`/`release` methods and context-manager protocol directly.
    """

    def __init__(self, kind, value, maxvalue):
        """Create the underlying semlock and bind its methods onto self.

        `kind`, `value` and `maxvalue` are forwarded unchanged to
        `_multiprocessing.SemLock`.
        """
        self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % self._semlock.handle)
        self._make_methods()

        # The after-fork hook is only registered off Windows.
        if sys.platform == 'win32':
            return

        def _notify_semlock_of_fork(instance):
            instance._semlock._after_fork()

        register_after_fork(self, _notify_semlock_of_fork)

    def _make_methods(self):
        """Expose the wrapped semlock's bound acquire/release as attributes."""
        wrapped = self._semlock
        self.acquire = wrapped.acquire
        self.release = wrapped.release

    def __enter__(self):
        """Delegate context entry to the wrapped semlock."""
        return self._semlock.__enter__()

    def __exit__(self, *args):
        """Delegate context exit to the wrapped semlock."""
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return `(duplicated handle, kind, maxvalue)` for pickling.

        `assert_spawning` is called first; presumably it rejects pickling
        outside of process spawning -- confirm against multiprocessing.forking.
        """
        assert_spawning(self)
        wrapped = self._semlock
        child_handle = Popen.duplicate_for_child(wrapped.handle)
        return (child_handle, wrapped.kind, wrapped.maxvalue)

    def __setstate__(self, state):
        """Rebuild the wrapped semlock from the tuple made by __getstate__."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()
101
102#
103# Semaphore
104#
105
class Semaphore(SemLock):
    """Counting semaphore whose maximum value is SEM_VALUE_MAX."""

    def __init__(self, value=1):
        """Create a semaphore with initial counter `value` (default 1)."""
        SemLock.__init__(self, kind=SEMAPHORE, value=value,
                         maxvalue=SEM_VALUE_MAX)

    def get_value(self):
        """Return the value reported by the wrapped semlock."""
        return self._semlock._get_value()

    def __repr__(self):
        """Show the current value, or 'unknown' if it cannot be read."""
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        return '<Semaphore(value=%s)>' % current
120
121#
122# Bounded semaphore
123#
124
class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1):
        """Create the semaphore with both value and maxvalue set to `value`."""
        # Deliberately calls SemLock.__init__ rather than Semaphore.__init__,
        # because the maxvalue differs from the plain Semaphore's.
        SemLock.__init__(self, kind=SEMAPHORE, value=value, maxvalue=value)

    def __repr__(self):
        """Show current value (or 'unknown') together with the maxvalue."""
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        details = (current, self._semlock.maxvalue)
        return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % details
137
138#
139# Non-recursive lock
140#
141
class Lock(SemLock):
    """Non-recursive lock: a SEMAPHORE-kind semlock with value 1, max 1."""

    def __init__(self):
        SemLock.__init__(self, kind=SEMAPHORE, value=1, maxvalue=1)

    def __repr__(self):
        """Describe the owner of the lock as best as can be determined.

        The owner is this process's name (plus '|<thread name>' when not on
        'MainThread') if the semlock reports it as ours; otherwise one of
        'None', 'SomeOtherThread', 'SomeOtherProcess', or 'unknown' if any
        query raised.
        """
        try:
            raw = self._semlock
            if raw._is_mine():
                owner = current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner += '|' + thread_name
            elif raw._get_value() == 1:
                owner = 'None'
            elif raw._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<Lock(owner=%s)>' % owner
162
163#
164# Recursive lock
165#
166
class RLock(SemLock):
    """Recursive lock: a RECURSIVE_MUTEX-kind semlock with value 1, max 1."""

    def __init__(self):
        SemLock.__init__(self, kind=RECURSIVE_MUTEX, value=1, maxvalue=1)

    def __repr__(self):
        """Describe the owner and recursion count as best as can be determined.

        When the semlock reports it as ours, the owner is this process's name
        (plus '|<thread name>' when not on 'MainThread') and the count comes
        from the semlock.  Otherwise the pair is ('None', 0),
        ('SomeOtherThread', 'nonzero') or ('SomeOtherProcess', 'nonzero'),
        and ('unknown', 'unknown') if any query raised.
        """
        try:
            raw = self._semlock
            if raw._is_mine():
                owner = current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner += '|' + thread_name
                depth = raw._count()
            elif raw._get_value() == 1:
                owner, depth = 'None', 0
            elif raw._count() > 0:
                owner, depth = 'SomeOtherThread', 'nonzero'
            else:
                owner, depth = 'SomeOtherProcess', 'nonzero'
        except Exception:
            owner, depth = 'unknown', 'unknown'
        return '<RLock(%s, %s)>' % (owner, depth)
188
189#
190# Condition variable
191#
192
class Condition(object):
    """Condition variable assembled from a lock and three semaphores.

    Bookkeeping:
      _sleeping_count -- released once by every waiter before it sleeps
      _woken_count    -- released once by every waiter after it wakes
                         (whether notified or timed out)
      _wait_semaphore -- what waiters block on; notify*() releases it
    """

    def __init__(self, lock=None):
        """Create the condition around `lock`, or a new RLock if none given."""
        self._lock = lock or RLock()
        self._sleeping_count = Semaphore(0)
        self._woken_count = Semaphore(0)
        self._wait_semaphore = Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        """Return the lock and the three semaphores for pickling."""
        assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        """Restore from the tuple produced by __getstate__."""
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        """Delegate context entry to the underlying lock."""
        return self._lock.__enter__()

    def __exit__(self, *args):
        """Delegate context exit to the underlying lock."""
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Expose the underlying lock's acquire/release as attributes."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        """Show the lock and the sleeping-minus-woken count (or 'unknown')."""
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<Condition(%s, %s)>' % (self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block on the wait semaphore, then reacquire.

        The lock must be owned by the caller.  Returns whatever the wait
        semaphore's acquire(True, timeout) returns (presumably True when
        notified and False on timeout -- confirm against SemLock.acquire).
        """
        assert self._lock._semlock._is_mine(), \
            'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock as many times as the semlock's count reports
        count = self._lock._semlock._count()
        for i in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout; returning from inside the
            # try lets an exception raised here propagate unchanged once
            # the cleanup below has run
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock the same number of times it was released
            for i in range(count):
                self._lock.acquire()

    def notify(self):
        """Wake at most one waiter.  The lock must be owned by the caller."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        if self._sleeping_count.acquire(False):  # try grabbing a sleeper
            self._wait_semaphore.release()       # wake up one sleeper
            self._woken_count.acquire()          # wait for sleeper to wake

            # rezero _wait_semaphore in case a timeout just happened
            self._wait_semaphore.acquire(False)

    def notify_all(self):
        """Wake every current waiter.  The lock must be owned by the caller."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for i in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def wait_for(self, predicate, timeout=None):
        """Wait until `predicate()` is true or `timeout` seconds elapse.

        Returns the last value returned by `predicate`.  With timeout=None
        it waits indefinitely.  The predicate is evaluated once up front and
        again after every wait().
        """
        import time
        # Use a monotonic clock when available so that system clock
        # adjustments cannot shorten or stretch the deadline.
        clock = getattr(time, 'monotonic', time.time)

        result = predicate()
        if result:
            return result
        endtime = None if timeout is None else clock() + timeout
        waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - clock()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
311
Benjamin Petersone711caf2008-06-11 16:44:04 +0000312#
313# Event
314#
315
class Event(object):
    """Flag that can be set, cleared, and waited on.

    The flag is held in `_flag`, a semaphore created with value 0; set()
    leaves it at 1 and clear() drains it.  Every operation runs with
    `_cond` acquired.
    """

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Semaphore(0)

    def _flag_is_up(self):
        """Peek at the flag without changing it.

        Tries a non-blocking acquire and, if it succeeds, releases again
        right away.  Callers hold `_cond`.
        """
        if not self._flag.acquire(False):
            return False
        self._flag.release()
        return True

    def is_set(self):
        """Return True if the flag is currently set, else False."""
        self._cond.acquire()
        try:
            return self._flag_is_up()
        finally:
            self._cond.release()

    def set(self):
        """Set the flag and notify all waiters on the condition."""
        self._cond.acquire()
        try:
            # Drain first so the release below always leaves the value at 1.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()
        finally:
            self._cond.release()

    def clear(self):
        """Reset the flag by draining the semaphore if it is set."""
        self._cond.acquire()
        try:
            self._flag.acquire(False)
        finally:
            self._cond.release()

    def wait(self, timeout=None):
        """Wait on the condition if the flag is not set; return its state.

        If the flag is already set no waiting happens.  Otherwise a single
        `_cond.wait(timeout)` is performed.  Either way the flag is checked
        afterwards and True/False returned accordingly.
        """
        self._cond.acquire()
        try:
            if not self._flag_is_up():
                self._cond.wait(timeout)
            return self._flag_is_up()
        finally:
            self._cond.release()