blob: 22eabe55b84f97efae348d7ea04a23bbfcdc1c32 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module implementing synchronization primitives
3#
4# multiprocessing/synchronize.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
10__all__ = [
11 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
12 ]
13
14import threading
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import sys
16
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import _multiprocessing
18from multiprocessing.process import current_process
Florent Xicluna04842a82011-11-11 20:05:50 +010019from multiprocessing.util import register_after_fork, debug
Benjamin Petersone711caf2008-06-11 16:44:04 +000020from multiprocessing.forking import assert_spawning, Popen
Charles-François Natalic8ce7152012-04-17 18:45:57 +020021from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023# Try to import the mp.synchronize module cleanly, if it fails
24# raise ImportError for platforms lacking a working sem_open implementation.
25# See issue 3770
26try:
27 from _multiprocessing import SemLock
28except (ImportError):
29 raise ImportError("This platform lacks a functioning sem_open" +
30 " implementation, therefore, the required" +
31 " synchronization primitives needed will not" +
32 " function, see issue 3770.")
33
Benjamin Petersone711caf2008-06-11 16:44:04 +000034#
35# Constants
36#
37
38RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
39SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
40
41#
42# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
43#
44
45class SemLock(object):
46
47 def __init__(self, kind, value, maxvalue):
48 sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
49 debug('created semlock with handle %s' % sl.handle)
50 self._make_methods()
51
52 if sys.platform != 'win32':
53 def _after_fork(obj):
54 obj._semlock._after_fork()
55 register_after_fork(self, _after_fork)
56
57 def _make_methods(self):
58 self.acquire = self._semlock.acquire
59 self.release = self._semlock.release
Benjamin Peterson8cc7d882009-06-01 23:14:51 +000060
61 def __enter__(self):
62 return self._semlock.__enter__()
63
64 def __exit__(self, *args):
65 return self._semlock.__exit__(*args)
Benjamin Petersone711caf2008-06-11 16:44:04 +000066
67 def __getstate__(self):
68 assert_spawning(self)
69 sl = self._semlock
70 return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
71
72 def __setstate__(self, state):
73 self._semlock = _multiprocessing.SemLock._rebuild(*state)
74 debug('recreated blocker with handle %r' % state[0])
75 self._make_methods()
76
77#
78# Semaphore
79#
80
81class Semaphore(SemLock):
82
83 def __init__(self, value=1):
84 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
85
86 def get_value(self):
87 return self._semlock._get_value()
88
89 def __repr__(self):
90 try:
91 value = self._semlock._get_value()
92 except Exception:
93 value = 'unknown'
94 return '<Semaphore(value=%s)>' % value
95
96#
97# Bounded semaphore
98#
99
100class BoundedSemaphore(Semaphore):
101
102 def __init__(self, value=1):
103 SemLock.__init__(self, SEMAPHORE, value, value)
104
105 def __repr__(self):
106 try:
107 value = self._semlock._get_value()
108 except Exception:
109 value = 'unknown'
110 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
111 (value, self._semlock.maxvalue)
112
113#
114# Non-recursive lock
115#
116
117class Lock(SemLock):
118
119 def __init__(self):
120 SemLock.__init__(self, SEMAPHORE, 1, 1)
121
122 def __repr__(self):
123 try:
124 if self._semlock._is_mine():
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000125 name = current_process().name
126 if threading.current_thread().name != 'MainThread':
127 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128 elif self._semlock._get_value() == 1:
129 name = 'None'
130 elif self._semlock._count() > 0:
131 name = 'SomeOtherThread'
132 else:
133 name = 'SomeOtherProcess'
134 except Exception:
135 name = 'unknown'
136 return '<Lock(owner=%s)>' % name
137
138#
139# Recursive lock
140#
141
142class RLock(SemLock):
143
144 def __init__(self):
145 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
146
147 def __repr__(self):
148 try:
149 if self._semlock._is_mine():
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000150 name = current_process().name
151 if threading.current_thread().name != 'MainThread':
152 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000153 count = self._semlock._count()
154 elif self._semlock._get_value() == 1:
155 name, count = 'None', 0
156 elif self._semlock._count() > 0:
157 name, count = 'SomeOtherThread', 'nonzero'
158 else:
159 name, count = 'SomeOtherProcess', 'nonzero'
160 except Exception:
161 name, count = 'unknown', 'unknown'
162 return '<RLock(%s, %s)>' % (name, count)
163
164#
165# Condition variable
166#
167
168class Condition(object):
169
170 def __init__(self, lock=None):
171 self._lock = lock or RLock()
172 self._sleeping_count = Semaphore(0)
173 self._woken_count = Semaphore(0)
174 self._wait_semaphore = Semaphore(0)
175 self._make_methods()
176
177 def __getstate__(self):
178 assert_spawning(self)
179 return (self._lock, self._sleeping_count,
180 self._woken_count, self._wait_semaphore)
181
182 def __setstate__(self, state):
183 (self._lock, self._sleeping_count,
184 self._woken_count, self._wait_semaphore) = state
185 self._make_methods()
186
Benjamin Peterson8cc7d882009-06-01 23:14:51 +0000187 def __enter__(self):
188 return self._lock.__enter__()
189
190 def __exit__(self, *args):
191 return self._lock.__exit__(*args)
192
Benjamin Petersone711caf2008-06-11 16:44:04 +0000193 def _make_methods(self):
194 self.acquire = self._lock.acquire
195 self.release = self._lock.release
Benjamin Petersone711caf2008-06-11 16:44:04 +0000196
197 def __repr__(self):
198 try:
199 num_waiters = (self._sleeping_count._semlock._get_value() -
200 self._woken_count._semlock._get_value())
201 except Exception:
202 num_waiters = 'unkown'
203 return '<Condition(%s, %s)>' % (self._lock, num_waiters)
204
205 def wait(self, timeout=None):
206 assert self._lock._semlock._is_mine(), \
207 'must acquire() condition before using wait()'
208
209 # indicate that this thread is going to sleep
210 self._sleeping_count.release()
211
212 # release lock
213 count = self._lock._semlock._count()
214 for i in range(count):
215 self._lock.release()
216
217 try:
218 # wait for notification or timeout
Richard Oudkerk86eb7e92012-06-04 18:59:07 +0100219 return self._wait_semaphore.acquire(True, timeout)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000220 finally:
221 # indicate that this thread has woken
222 self._woken_count.release()
223
224 # reacquire lock
225 for i in range(count):
226 self._lock.acquire()
227
228 def notify(self):
229 assert self._lock._semlock._is_mine(), 'lock is not owned'
230 assert not self._wait_semaphore.acquire(False)
231
232 # to take account of timeouts since last notify() we subtract
233 # woken_count from sleeping_count and rezero woken_count
234 while self._woken_count.acquire(False):
235 res = self._sleeping_count.acquire(False)
236 assert res
237
238 if self._sleeping_count.acquire(False): # try grabbing a sleeper
239 self._wait_semaphore.release() # wake up one sleeper
240 self._woken_count.acquire() # wait for the sleeper to wake
241
242 # rezero _wait_semaphore in case a timeout just happened
243 self._wait_semaphore.acquire(False)
244
245 def notify_all(self):
246 assert self._lock._semlock._is_mine(), 'lock is not owned'
247 assert not self._wait_semaphore.acquire(False)
248
249 # to take account of timeouts since last notify*() we subtract
250 # woken_count from sleeping_count and rezero woken_count
251 while self._woken_count.acquire(False):
252 res = self._sleeping_count.acquire(False)
253 assert res
254
255 sleepers = 0
256 while self._sleeping_count.acquire(False):
257 self._wait_semaphore.release() # wake up one sleeper
258 sleepers += 1
259
260 if sleepers:
261 for i in range(sleepers):
262 self._woken_count.acquire() # wait for a sleeper to wake
263
264 # rezero wait_semaphore in case some timeouts just happened
265 while self._wait_semaphore.acquire(False):
266 pass
267
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200268 def wait_for(self, predicate, timeout=None):
269 result = predicate()
270 if result:
271 return result
272 if timeout is not None:
273 endtime = _time() + timeout
274 else:
275 endtime = None
276 waittime = None
277 while not result:
278 if endtime is not None:
279 waittime = endtime - _time()
280 if waittime <= 0:
281 break
282 self.wait(waittime)
283 result = predicate()
284 return result
285
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286#
287# Event
288#
289
290class Event(object):
291
292 def __init__(self):
293 self._cond = Condition(Lock())
294 self._flag = Semaphore(0)
295
296 def is_set(self):
297 self._cond.acquire()
298 try:
299 if self._flag.acquire(False):
300 self._flag.release()
301 return True
302 return False
303 finally:
304 self._cond.release()
305
306 def set(self):
307 self._cond.acquire()
308 try:
309 self._flag.acquire(False)
310 self._flag.release()
311 self._cond.notify_all()
312 finally:
313 self._cond.release()
314
315 def clear(self):
316 self._cond.acquire()
317 try:
318 self._flag.acquire(False)
319 finally:
320 self._cond.release()
321
322 def wait(self, timeout=None):
323 self._cond.acquire()
324 try:
325 if self._flag.acquire(False):
326 self._flag.release()
327 else:
328 self._cond.wait(timeout)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000329
330 if self._flag.acquire(False):
331 self._flag.release()
332 return True
333 return False
Benjamin Petersone711caf2008-06-11 16:44:04 +0000334 finally:
335 self._cond.release()
Richard Oudkerk3730a172012-06-15 18:26:07 +0100336
337#
338# Barrier
339#
340
341class Barrier(threading.Barrier):
342
343 def __init__(self, parties, action=None, timeout=None):
344 import struct
345 from multiprocessing.heap import BufferWrapper
346 wrapper = BufferWrapper(struct.calcsize('i') * 2)
347 cond = Condition()
348 self.__setstate__((parties, action, timeout, cond, wrapper))
349 self._state = 0
350 self._count = 0
351
352 def __setstate__(self, state):
353 (self._parties, self._action, self._timeout,
354 self._cond, self._wrapper) = state
355 self._array = self._wrapper.create_memoryview().cast('i')
356
357 def __getstate__(self):
358 return (self._parties, self._action, self._timeout,
359 self._cond, self._wrapper)
360
361 @property
362 def _state(self):
363 return self._array[0]
364
365 @_state.setter
366 def _state(self, value):
367 self._array[0] = value
368
369 @property
370 def _count(self):
371 return self._array[1]
372
373 @_count.setter
374 def _count(self, value):
375 self._array[1] = value