blob: 09736efbb00c5023d635b903da8e8fe2d759133c [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module implementing synchronization primitives
3#
4# multiprocessing/synchronize.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
10__all__ = [
11 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
12 ]
13
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010014import os
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import threading
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import sys
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010017import itertools
18import tempfile
Benjamin Petersone711caf2008-06-11 16:44:04 +000019import _multiprocessing
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010020
Charles-François Natalic8ce7152012-04-17 18:45:57 +020021from time import time as _time
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010023from . import popen
24from . import process
25from . import util
26
# Try to import SemLock and sem_unlink from the _multiprocessing extension.
# If that fails, raise ImportError: the platform lacks a working sem_open
# implementation.  See issue 3770.
30try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010031 from _multiprocessing import SemLock, sem_unlink
Benjamin Petersone5384b02008-10-04 22:00:42 +000032except (ImportError):
33 raise ImportError("This platform lacks a functioning sem_open" +
34 " implementation, therefore, the required" +
35 " synchronization primitives needed will not" +
36 " function, see issue 3770.")
37
Benjamin Petersone711caf2008-06-11 16:44:04 +000038#
39# Constants
40#
41
# Values passed as the ``kind`` argument of _multiprocessing.SemLock.
RECURSIVE_MUTEX = 0
SEMAPHORE = 1
# Taken from the C implementation; used below as the maxvalue of Semaphore.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
44
45#
46# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
47#
48
class SemLock(object):
    """Base class for the semaphore and lock types of this module.

    Holds a ``_multiprocessing.SemLock`` in ``self._semlock`` and exposes that
    object's ``acquire``/``release`` directly as instance attributes.
    """

    # Generator of the random suffixes used by _make_name().
    _rand = tempfile._RandomNameSequence()

    def __init__(self, kind, value, maxvalue):
        """Create the underlying semlock, retrying on name collisions.

        Raises FileExistsError if 100 candidate names were all taken.
        """
        unlink_now = (sys.platform == 'win32' or
                      popen.get_start_method() == 'fork')

        # Each attempt uses a fresh random name; give up after 100 clashes.
        for _ in range(100):
            try:
                semlock = _multiprocessing.SemLock(
                    kind, value, maxvalue, self._make_name(), unlink_now)
            except FileExistsError:
                continue
            self._semlock = semlock
            break
        else:
            raise FileExistsError('cannot find name for semaphore')

        util.debug('created semlock with handle %s' % semlock.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(instance):
                instance._semlock._after_fork()
            util.register_after_fork(self, _after_fork)

        sem_name = self._semlock.name
        if sem_name is not None:
            # We only get here if we are on Unix with forking
            # disabled.  When the object is garbage collected or the
            # process shuts down we unlink the semaphore name
            from .semaphore_tracker import register
            register(sem_name)
            util.Finalize(self, SemLock._cleanup, (sem_name,),
                          exitpriority=0)

    @staticmethod
    def _cleanup(name):
        """Unlink the named semaphore and unregister it from the tracker."""
        from .semaphore_tracker import unregister
        sem_unlink(name)
        unregister(name)

    def _make_methods(self):
        """Bind acquire/release of the wrapped semlock onto this instance."""
        wrapped = self._semlock
        self.acquire = wrapped.acquire
        self.release = wrapped.release

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return (handle, kind, maxvalue, name); only valid while spawning."""
        popen.assert_spawning(self)
        semlock = self._semlock
        handle = semlock.handle
        if sys.platform == 'win32':
            # On Windows the child gets a duplicate of the handle.
            handle = popen.get_spawning_popen().duplicate_for_child(handle)
        return (handle, semlock.kind, semlock.maxvalue, semlock.name)

    def __setstate__(self, state):
        """Rebuild the wrapped semlock from the tuple made by __getstate__."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        util.debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

    @staticmethod
    def _make_name():
        """Return a candidate name: '/<semprefix>-<random suffix>'."""
        prefix = process.current_process()._config['semprefix']
        return '/%s-%s' % (prefix, next(SemLock._rand))
119
Benjamin Petersone711caf2008-06-11 16:44:04 +0000120#
121# Semaphore
122#
123
class Semaphore(SemLock):
    """SemLock of kind SEMAPHORE whose maxvalue is SEM_VALUE_MAX."""

    def __init__(self, value=1):
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)

    def get_value(self):
        """Return what the wrapped semlock reports as its current value."""
        return self._semlock._get_value()

    def __repr__(self):
        # Any failure to read the value is reported as 'unknown'
        # instead of being propagated.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        return '<Semaphore(value=%s)>' % current
138
139#
140# Bounded semaphore
141#
142
class BoundedSemaphore(Semaphore):
    """Semaphore whose maxvalue equals its initial value."""

    def __init__(self, value=1):
        # Calls SemLock.__init__ directly so that maxvalue is ``value``
        # rather than SEM_VALUE_MAX.
        SemLock.__init__(self, SEMAPHORE, value, value)

    def __repr__(self):
        # Any failure to read the value is reported as 'unknown'.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        limit = self._semlock.maxvalue
        return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % (current, limit)
155
156#
157# Non-recursive lock
158#
159
class Lock(SemLock):
    """Non-recursive lock: a SEMAPHORE-kind SemLock with value and maxvalue 1."""

    def __init__(self):
        SemLock.__init__(self, SEMAPHORE, 1, 1)

    def __repr__(self):
        """Describe the lock by its apparent owner.

        The owner is this process (plus the thread name when not
        'MainThread'), 'None', 'SomeOtherThread', 'SomeOtherProcess', or
        'unknown' if querying the semlock raised.
        """
        try:
            semlock = self._semlock
            if semlock._is_mine():
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner += '|' + thread_name
            elif semlock._get_value() == 1:
                owner = 'None'
            elif semlock._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<Lock(owner=%s)>' % owner
180
181#
182# Recursive lock
183#
184
class RLock(SemLock):
    """Recursive lock: a RECURSIVE_MUTEX-kind SemLock with value and maxvalue 1."""

    def __init__(self):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)

    def __repr__(self):
        """Describe the lock as '<RLock(owner, count)>'.

        When the semlock is ours, ``count`` is its ``_count()``; otherwise it
        is 0, 'nonzero' or 'unknown', matching the owner description.
        """
        try:
            semlock = self._semlock
            if semlock._is_mine():
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner += '|' + thread_name
                depth = semlock._count()
            elif semlock._get_value() == 1:
                owner, depth = 'None', 0
            elif semlock._count() > 0:
                owner, depth = 'SomeOtherThread', 'nonzero'
            else:
                owner, depth = 'SomeOtherProcess', 'nonzero'
        except Exception:
            owner, depth = 'unknown', 'unknown'
        return '<RLock(%s, %s)>' % (owner, depth)
206
207#
208# Condition variable
209#
210
class Condition(object):
    """Condition variable built from a lock and three semaphores.

    ``_sleeping_count`` is released once by every waiter before it blocks,
    ``_woken_count`` is released once by every waiter after it stops
    blocking (whether notified or timed out), and ``_wait_semaphore`` is the
    semaphore waiters actually block on.  ``acquire`` and ``release`` are
    those of the underlying lock.
    """

    def __init__(self, lock=None):
        """Use *lock* as the underlying lock, or a new RLock if not given."""
        self._lock = lock or RLock()
        self._sleeping_count = Semaphore(0)
        self._woken_count = Semaphore(0)
        self._wait_semaphore = Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        popen.assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Expose the underlying lock's acquire/release on this instance."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<Condition(%s, %s)>' % (self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or timed out, reacquire.

        The caller must own the lock.  Returns whatever acquiring the wait
        semaphore with *timeout* returned.
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock as many times as the semlock's _count() reports
        count = self._lock._semlock._count()
        for _ in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock the same number of times it was released
            for _ in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up at most *n* waiters (one by default).

        The caller must own the lock.  Blocks until each woken waiter has
        reported back through ``_woken_count``.
        """
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for _ in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake up every current waiter.  The caller must own the lock."""
        self.notify(n=sys.maxsize)

    def wait_for(self, predicate, timeout=None):
        """Wait until *predicate()* is true or *timeout* seconds have passed.

        Returns the last value returned by *predicate*.
        """
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = _time() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                # only wait for what is left of the overall timeout
                waittime = endtime - _time()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
328
Benjamin Petersone711caf2008-06-11 16:44:04 +0000329#
330# Event
331#
332
class Event(object):
    """Event flag built from a Condition and a semaphore.

    The flag is "set" when ``_flag`` holds a token; every access to ``_flag``
    happens while ``_cond`` is held.
    """

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Semaphore(0)

    def _flag_is_set(self):
        """Return True if the flag holds a token.  Caller must hold _cond."""
        # Probe by taking the token without blocking and putting it back.
        if self._flag.acquire(False):
            self._flag.release()
            return True
        return False

    def is_set(self):
        """Return True if the event is currently set."""
        with self._cond:
            return self._flag_is_set()

    def set(self):
        """Set the event and notify all waiters."""
        with self._cond:
            # Drop any existing token first so setting twice still leaves
            # exactly one token.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Reset the event to unset."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Wait on the condition if the event is unset; return its final state.

        Returns True if the event is set when the wait ends, else False.
        """
        with self._cond:
            if not self._flag_is_set():
                self._cond.wait(timeout)
            return self._flag_is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +0100379
380#
381# Barrier
382#
383
class Barrier(threading.Barrier):
    """threading.Barrier whose ``_state`` and ``_count`` live in a buffer.

    Both values are stored as C ints in slots 0 and 1 of ``self._array``, a
    memoryview obtained from a ``BufferWrapper``.
    NOTE(review): presumably that buffer is shared between processes --
    BufferWrapper is defined in .heap and not visible here.
    """

    def __init__(self, parties, action=None, timeout=None):
        import struct
        from .heap import BufferWrapper
        # Room for two C ints: the state and the count.
        int_size = struct.calcsize('i')
        storage = BufferWrapper(int_size * 2)
        condition = Condition()
        self.__setstate__((parties, action, timeout, condition, storage))
        self._state = 0
        self._count = 0

    def __setstate__(self, state):
        """Restore attributes from *state* and re-create the int view."""
        (self._parties, self._action, self._timeout,
         self._cond, self._wrapper) = state
        view = self._wrapper.create_memoryview()
        self._array = view.cast('i')

    def __getstate__(self):
        """Return the tuple consumed by __setstate__."""
        return (self._parties, self._action, self._timeout,
                self._cond, self._wrapper)

    @property
    def _state(self):
        """Slot 0 of the int array."""
        return self._array[0]

    @_state.setter
    def _state(self, new_state):
        self._array[0] = new_state

    @property
    def _count(self):
        """Slot 1 of the int array."""
        return self._array[1]

    @_count.setter
    def _count(self, new_count):
        self._array[1] = new_count