blob: 82c30a27a5e008285fcfc3a5449d1bdaec5f3e09 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module implementing synchronization primitives
3#
4# multiprocessing/synchronize.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
10__all__ = [
11 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
12 ]
13
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010014import os
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import threading
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import sys
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010017import itertools
18import tempfile
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import _multiprocessing
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010020
Charles-François Natalic8ce7152012-04-17 18:45:57 +020021from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010023from . import context
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010024from . import process
25from . import util
26
Benjamin Petersone5384b02008-10-04 22:00:42 +000027# Try to import the mp.synchronize module cleanly, if it fails
28# raise ImportError for platforms lacking a working sem_open implementation.
29# See issue 3770
30try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010031 from _multiprocessing import SemLock, sem_unlink
Benjamin Petersone5384b02008-10-04 22:00:42 +000032except (ImportError):
33 raise ImportError("This platform lacks a functioning sem_open" +
34 " implementation, therefore, the required" +
35 " synchronization primitives needed will not" +
36 " function, see issue 3770.")
37
Benjamin Petersone711caf2008-06-11 16:44:04 +000038#
39# Constants
40#
41
42RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
43SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
44
45#
46# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
47#
48
49class SemLock(object):
50
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010051 _rand = tempfile._RandomNameSequence()
52
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010053 def __init__(self, kind, value, maxvalue, *, ctx):
54 ctx = ctx or get_context()
55 ctx = ctx.get_context()
56 unlink_now = sys.platform == 'win32' or ctx._name == 'fork'
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010057 for i in range(100):
58 try:
59 sl = self._semlock = _multiprocessing.SemLock(
60 kind, value, maxvalue, self._make_name(),
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010061 unlink_now)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010062 except FileExistsError:
63 pass
64 else:
65 break
66 else:
67 raise FileExistsError('cannot find name for semaphore')
68
69 util.debug('created semlock with handle %s' % sl.handle)
Benjamin Petersone711caf2008-06-11 16:44:04 +000070 self._make_methods()
71
72 if sys.platform != 'win32':
73 def _after_fork(obj):
74 obj._semlock._after_fork()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010075 util.register_after_fork(self, _after_fork)
76
77 if self._semlock.name is not None:
78 # We only get here if we are on Unix with forking
79 # disabled. When the object is garbage collected or the
80 # process shuts down we unlink the semaphore name
81 from .semaphore_tracker import register
82 register(self._semlock.name)
83 util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
84 exitpriority=0)
85
86 @staticmethod
87 def _cleanup(name):
88 from .semaphore_tracker import unregister
89 sem_unlink(name)
90 unregister(name)
Benjamin Petersone711caf2008-06-11 16:44:04 +000091
92 def _make_methods(self):
93 self.acquire = self._semlock.acquire
94 self.release = self._semlock.release
Benjamin Peterson8cc7d882009-06-01 23:14:51 +000095
96 def __enter__(self):
97 return self._semlock.__enter__()
98
99 def __exit__(self, *args):
100 return self._semlock.__exit__(*args)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000101
102 def __getstate__(self):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100103 context.assert_spawning(self)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000104 sl = self._semlock
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100105 if sys.platform == 'win32':
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100106 h = context.get_spawning_popen().duplicate_for_child(sl.handle)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100107 else:
108 h = sl.handle
109 return (h, sl.kind, sl.maxvalue, sl.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000110
111 def __setstate__(self, state):
112 self._semlock = _multiprocessing.SemLock._rebuild(*state)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100113 util.debug('recreated blocker with handle %r' % state[0])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000114 self._make_methods()
115
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100116 @staticmethod
117 def _make_name():
118 return '/%s-%s' % (process.current_process()._config['semprefix'],
119 next(SemLock._rand))
120
Benjamin Petersone711caf2008-06-11 16:44:04 +0000121#
122# Semaphore
123#
124
125class Semaphore(SemLock):
126
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100127 def __init__(self, value=1, *, ctx):
128 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129
130 def get_value(self):
131 return self._semlock._get_value()
132
133 def __repr__(self):
134 try:
135 value = self._semlock._get_value()
136 except Exception:
137 value = 'unknown'
138 return '<Semaphore(value=%s)>' % value
139
140#
141# Bounded semaphore
142#
143
144class BoundedSemaphore(Semaphore):
145
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100146 def __init__(self, value=1, *, ctx):
147 SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000148
149 def __repr__(self):
150 try:
151 value = self._semlock._get_value()
152 except Exception:
153 value = 'unknown'
154 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
155 (value, self._semlock.maxvalue)
156
157#
158# Non-recursive lock
159#
160
161class Lock(SemLock):
162
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100163 def __init__(self, *, ctx):
164 SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000165
166 def __repr__(self):
167 try:
168 if self._semlock._is_mine():
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100169 name = process.current_process().name
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000170 if threading.current_thread().name != 'MainThread':
171 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172 elif self._semlock._get_value() == 1:
173 name = 'None'
174 elif self._semlock._count() > 0:
175 name = 'SomeOtherThread'
176 else:
177 name = 'SomeOtherProcess'
178 except Exception:
179 name = 'unknown'
180 return '<Lock(owner=%s)>' % name
181
182#
183# Recursive lock
184#
185
186class RLock(SemLock):
187
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100188 def __init__(self, *, ctx):
189 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000190
191 def __repr__(self):
192 try:
193 if self._semlock._is_mine():
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100194 name = process.current_process().name
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000195 if threading.current_thread().name != 'MainThread':
196 name += '|' + threading.current_thread().name
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197 count = self._semlock._count()
198 elif self._semlock._get_value() == 1:
199 name, count = 'None', 0
200 elif self._semlock._count() > 0:
201 name, count = 'SomeOtherThread', 'nonzero'
202 else:
203 name, count = 'SomeOtherProcess', 'nonzero'
204 except Exception:
205 name, count = 'unknown', 'unknown'
206 return '<RLock(%s, %s)>' % (name, count)
207
208#
209# Condition variable
210#
211
212class Condition(object):
213
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100214 def __init__(self, lock=None, *, ctx):
215 self._lock = lock or ctx.RLock()
216 self._sleeping_count = ctx.Semaphore(0)
217 self._woken_count = ctx.Semaphore(0)
218 self._wait_semaphore = ctx.Semaphore(0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219 self._make_methods()
220
221 def __getstate__(self):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100222 context.assert_spawning(self)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000223 return (self._lock, self._sleeping_count,
224 self._woken_count, self._wait_semaphore)
225
226 def __setstate__(self, state):
227 (self._lock, self._sleeping_count,
228 self._woken_count, self._wait_semaphore) = state
229 self._make_methods()
230
Benjamin Peterson8cc7d882009-06-01 23:14:51 +0000231 def __enter__(self):
232 return self._lock.__enter__()
233
234 def __exit__(self, *args):
235 return self._lock.__exit__(*args)
236
Benjamin Petersone711caf2008-06-11 16:44:04 +0000237 def _make_methods(self):
238 self.acquire = self._lock.acquire
239 self.release = self._lock.release
Benjamin Petersone711caf2008-06-11 16:44:04 +0000240
241 def __repr__(self):
242 try:
243 num_waiters = (self._sleeping_count._semlock._get_value() -
244 self._woken_count._semlock._get_value())
245 except Exception:
doko@ubuntu.com9df891c2013-05-15 18:06:56 +0200246 num_waiters = 'unknown'
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247 return '<Condition(%s, %s)>' % (self._lock, num_waiters)
248
249 def wait(self, timeout=None):
250 assert self._lock._semlock._is_mine(), \
251 'must acquire() condition before using wait()'
252
253 # indicate that this thread is going to sleep
254 self._sleeping_count.release()
255
256 # release lock
257 count = self._lock._semlock._count()
258 for i in range(count):
259 self._lock.release()
260
261 try:
262 # wait for notification or timeout
Richard Oudkerk86eb7e92012-06-04 18:59:07 +0100263 return self._wait_semaphore.acquire(True, timeout)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264 finally:
265 # indicate that this thread has woken
266 self._woken_count.release()
267
268 # reacquire lock
269 for i in range(count):
270 self._lock.acquire()
271
272 def notify(self):
273 assert self._lock._semlock._is_mine(), 'lock is not owned'
274 assert not self._wait_semaphore.acquire(False)
275
276 # to take account of timeouts since last notify() we subtract
277 # woken_count from sleeping_count and rezero woken_count
278 while self._woken_count.acquire(False):
279 res = self._sleeping_count.acquire(False)
280 assert res
281
282 if self._sleeping_count.acquire(False): # try grabbing a sleeper
283 self._wait_semaphore.release() # wake up one sleeper
284 self._woken_count.acquire() # wait for the sleeper to wake
285
286 # rezero _wait_semaphore in case a timeout just happened
287 self._wait_semaphore.acquire(False)
288
289 def notify_all(self):
290 assert self._lock._semlock._is_mine(), 'lock is not owned'
291 assert not self._wait_semaphore.acquire(False)
292
293 # to take account of timeouts since last notify*() we subtract
294 # woken_count from sleeping_count and rezero woken_count
295 while self._woken_count.acquire(False):
296 res = self._sleeping_count.acquire(False)
297 assert res
298
299 sleepers = 0
300 while self._sleeping_count.acquire(False):
301 self._wait_semaphore.release() # wake up one sleeper
302 sleepers += 1
303
304 if sleepers:
305 for i in range(sleepers):
306 self._woken_count.acquire() # wait for a sleeper to wake
307
308 # rezero wait_semaphore in case some timeouts just happened
309 while self._wait_semaphore.acquire(False):
310 pass
311
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200312 def wait_for(self, predicate, timeout=None):
313 result = predicate()
314 if result:
315 return result
316 if timeout is not None:
317 endtime = _time() + timeout
318 else:
319 endtime = None
320 waittime = None
321 while not result:
322 if endtime is not None:
323 waittime = endtime - _time()
324 if waittime <= 0:
325 break
326 self.wait(waittime)
327 result = predicate()
328 return result
329
Benjamin Petersone711caf2008-06-11 16:44:04 +0000330#
331# Event
332#
333
334class Event(object):
335
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100336 def __init__(self, *, ctx):
337 self._cond = ctx.Condition(ctx.Lock())
338 self._flag = ctx.Semaphore(0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000339
340 def is_set(self):
341 self._cond.acquire()
342 try:
343 if self._flag.acquire(False):
344 self._flag.release()
345 return True
346 return False
347 finally:
348 self._cond.release()
349
350 def set(self):
351 self._cond.acquire()
352 try:
353 self._flag.acquire(False)
354 self._flag.release()
355 self._cond.notify_all()
356 finally:
357 self._cond.release()
358
359 def clear(self):
360 self._cond.acquire()
361 try:
362 self._flag.acquire(False)
363 finally:
364 self._cond.release()
365
366 def wait(self, timeout=None):
367 self._cond.acquire()
368 try:
369 if self._flag.acquire(False):
370 self._flag.release()
371 else:
372 self._cond.wait(timeout)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000373
374 if self._flag.acquire(False):
375 self._flag.release()
376 return True
377 return False
Benjamin Petersone711caf2008-06-11 16:44:04 +0000378 finally:
379 self._cond.release()
Richard Oudkerk3730a172012-06-15 18:26:07 +0100380
381#
382# Barrier
383#
384
385class Barrier(threading.Barrier):
386
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100387 def __init__(self, parties, action=None, timeout=None, *, ctx):
Richard Oudkerk3730a172012-06-15 18:26:07 +0100388 import struct
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100389 from .heap import BufferWrapper
Richard Oudkerk3730a172012-06-15 18:26:07 +0100390 wrapper = BufferWrapper(struct.calcsize('i') * 2)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100391 cond = ctx.Condition()
Richard Oudkerk3730a172012-06-15 18:26:07 +0100392 self.__setstate__((parties, action, timeout, cond, wrapper))
393 self._state = 0
394 self._count = 0
395
396 def __setstate__(self, state):
397 (self._parties, self._action, self._timeout,
398 self._cond, self._wrapper) = state
399 self._array = self._wrapper.create_memoryview().cast('i')
400
401 def __getstate__(self):
402 return (self._parties, self._action, self._timeout,
403 self._cond, self._wrapper)
404
405 @property
406 def _state(self):
407 return self._array[0]
408
409 @_state.setter
410 def _state(self, value):
411 self._array[0] = value
412
413 @property
414 def _count(self):
415 return self._array[1]
416
417 @_count.setter
418 def _count(self, value):
419 self._array[1] = value