# blob: d0be48f1fd7a8f14b935f74d89c168d831b84dbc
#
# Module implementing synchronization primitives
#
# multiprocessing/synchronize.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

# Names exported by ``from multiprocessing.synchronize import *``.
# NOTE: Barrier (defined later in this module) is not listed here.
__all__ = [
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event',
    ]
import threading
import sys
import tempfile
import _multiprocessing
import time

from . import context
from . import process
from . import util

# Try to import the mp.synchronize module cleanly, if it fails
# raise ImportError for platforms lacking a working sem_open implementation.
# See issue 3770
try:
    from _multiprocessing import SemLock, sem_unlink
except ImportError:
    # Re-raise with a message that names the likely cause; the original
    # ImportError stays attached as the implicit exception context.
    raise ImportError("This platform lacks a functioning sem_open"
                      " implementation, therefore, the required"
                      " synchronization primitives needed will not"
                      " function, see issue 3770.")
#
# Constants
#

# Values passed as the ``kind`` argument of `_multiprocessing.SemLock`.
RECURSIVE_MUTEX, SEMAPHORE = range(2)
# Limit reported by the C implementation; Semaphore passes it as ``maxvalue``.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
#
# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
#

class SemLock(object):
    """Common base class of the lock and semaphore types in this module.

    The real primitive is a `_multiprocessing.SemLock` stored in
    ``self._semlock``; its ``acquire``/``release`` methods are re-exported
    as instance attributes and its context-manager protocol is forwarded.
    """

    # Generator of random suffixes used by _make_name().
    _rand = tempfile._RandomNameSequence()

    def __init__(self, kind, value, maxvalue, *, ctx):
        """Create the underlying `_multiprocessing.SemLock`.

        kind, value and maxvalue are forwarded unchanged to the C type.
        ctx provides the start method; if it is None the default context
        is used instead.

        Raises FileExistsError when 100 consecutive generated names are
        already in use.
        """
        if ctx is None:
            ctx = context._default_context.get_context()
        start_method = ctx.get_start_method()
        unlink_now = sys.platform == 'win32' or start_method == 'fork'
        # Retry with a fresh random name whenever the chosen one is taken.
        for _ in range(100):
            try:
                sl = self._semlock = _multiprocessing.SemLock(
                    kind, value, maxvalue, self._make_name(),
                    unlink_now)
            except FileExistsError:
                pass
            else:
                break
        else:
            raise FileExistsError('cannot find name for semaphore')

        util.debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            util.register_after_fork(self, _after_fork)

        if self._semlock.name is not None:
            # We only get here if we are on Unix with forking
            # disabled.  When the object is garbage collected or the
            # process shuts down we unlink the semaphore name
            from .resource_tracker import register
            register(self._semlock.name, "semaphore")
            util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
                          exitpriority=0)

    @staticmethod
    def _cleanup(name):
        """Unlink the semaphore *name* and unregister it from the tracker."""
        from .resource_tracker import unregister
        sem_unlink(name)
        unregister(name, "semaphore")

    def _make_methods(self):
        """Expose the wrapped object's acquire/release on this instance."""
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return ``(handle, kind, maxvalue, name)`` for pickling.

        context.assert_spawning() is called first.  On Windows the handle
        is duplicated for the child through the spawning Popen object.
        """
        context.assert_spawning(self)
        sl = self._semlock
        if sys.platform == 'win32':
            h = context.get_spawning_popen().duplicate_for_child(sl.handle)
        else:
            h = sl.handle
        return (h, sl.kind, sl.maxvalue, sl.name)

    def __setstate__(self, state):
        """Rebuild the wrapped semaphore from a __getstate__() tuple."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        util.debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

    @staticmethod
    def _make_name():
        """Return '<semprefix>-<random suffix>' for a new semaphore."""
        return '%s-%s' % (process.current_process()._config['semprefix'],
                          next(SemLock._rand))
#
# Semaphore
#

class Semaphore(SemLock):
    """Semaphore created with kind SEMAPHORE and maxvalue SEM_VALUE_MAX."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore with initial counter *value*."""
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)

    def get_value(self):
        """Return the value reported by the wrapped semaphore."""
        return self._semlock._get_value()

    def __repr__(self):
        # Reading the value may fail; fall back to a placeholder instead
        # of letting repr() raise.
        try:
            value = self._semlock._get_value()
        except Exception:
            value = 'unknown'
        return '<%s(value=%s)>' % (self.__class__.__name__, value)
#
# Bounded semaphore
#

class BoundedSemaphore(Semaphore):
    """Semaphore whose maxvalue equals its initial value."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore with *value* as both initial and max value."""
        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)

    def __repr__(self):
        # Only the value lookup is guarded; maxvalue is read directly.
        try:
            value = self._semlock._get_value()
        except Exception:
            value = 'unknown'
        return '<%s(value=%s, maxvalue=%s)>' % \
               (self.__class__.__name__, value, self._semlock.maxvalue)
#
# Non-recursive lock
#

class Lock(SemLock):
    """Lock implemented as a SEMAPHORE-kind SemLock with value 1, max 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

    def __repr__(self):
        """Return '<Lock(owner=...)>' describing who appears to hold it.

        The owner is this process (plus thread name when not 'MainThread')
        if the lock is ours, 'None' when the value is 1, and otherwise
        'SomeOtherThread' or 'SomeOtherProcess' depending on the local
        count.  Any error while inspecting yields 'unknown'.
        """
        try:
            if self._semlock._is_mine():
                name = process.current_process().name
                if threading.current_thread().name != 'MainThread':
                    name += '|' + threading.current_thread().name
            elif self._semlock._get_value() == 1:
                name = 'None'
            elif self._semlock._count() > 0:
                name = 'SomeOtherThread'
            else:
                name = 'SomeOtherProcess'
        except Exception:
            name = 'unknown'
        return '<%s(owner=%s)>' % (self.__class__.__name__, name)
#
# Recursive lock
#

class RLock(SemLock):
    """Lock implemented as a RECURSIVE_MUTEX-kind SemLock."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)

    def __repr__(self):
        """Return '<RLock(owner, count)>'.

        When the lock is ours the count is the wrapped object's _count();
        for an unheld lock it is 0; when held elsewhere it is the string
        'nonzero'.  Any error while inspecting yields 'unknown' for both.
        """
        try:
            if self._semlock._is_mine():
                name = process.current_process().name
                if threading.current_thread().name != 'MainThread':
                    name += '|' + threading.current_thread().name
                count = self._semlock._count()
            elif self._semlock._get_value() == 1:
                name, count = 'None', 0
            elif self._semlock._count() > 0:
                name, count = 'SomeOtherThread', 'nonzero'
            else:
                name, count = 'SomeOtherProcess', 'nonzero'
        except Exception:
            name, count = 'unknown', 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
#
# Condition variable
#

class Condition(object):
    """Condition variable built from a lock and three semaphores.

    ``_sleeping_count`` is released by each waiter before it sleeps,
    ``_woken_count`` is released by each waiter after it wakes, and
    ``_wait_semaphore`` is what waiters actually block on.
    """

    def __init__(self, lock=None, *, ctx):
        """Use *lock* if given, otherwise a new ``ctx.RLock()``."""
        self._lock = lock or ctx.RLock()
        self._sleeping_count = ctx.Semaphore(0)
        self._woken_count = ctx.Semaphore(0)
        self._wait_semaphore = ctx.Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Expose the lock's acquire/release on this instance."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or *timeout*, reacquire.

        Returns the result of acquiring the internal wait semaphore.
        The lock must be held by the caller (checked with ``assert``).
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock (once per recursion level held)
        count = self._lock._semlock._count()
        for i in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for i in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up to *n* waiters.  The lock must be held by the caller."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        # NOTE: this assert performs a non-blocking acquire; it is only
        # evaluated when assertions are enabled.
        assert not self._wait_semaphore.acquire(
            False), ('notify: Should not have been able to acquire '
                     + '_wait_semaphore')

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res, ('notify: Bug in sleeping_count.acquire'
                         + ' - res should not be False')

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for i in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake every waiter (notify with n=sys.maxsize)."""
        self.notify(n=sys.maxsize)

    def wait_for(self, predicate, timeout=None):
        """Wait until ``predicate()`` is true or *timeout* seconds elapse.

        Returns the last value returned by *predicate*.
        """
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = time.monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
#
# Event
#

class Event(object):
    """Event flag built from a Condition and a Semaphore.

    The flag is "set" when ``_flag`` can be acquired without blocking;
    every access to it happens while holding ``_cond``.
    """

    def __init__(self, *, ctx):
        self._cond = ctx.Condition(ctx.Lock())
        self._flag = ctx.Semaphore(0)

    def is_set(self):
        """Return True if the flag is currently set."""
        with self._cond:
            # Probe the flag without consuming it.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

    def set(self):
        """Set the flag and wake all waiters."""
        with self._cond:
            # Drain first so the semaphore never counts above one.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Reset the flag."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Block until the flag is set or *timeout* elapses.

        Returns True if the flag is set on return, otherwise False.
        """
        with self._cond:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)

            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
#
# Barrier
#

class Barrier(threading.Barrier):
    """threading.Barrier whose ``_state``/``_count`` live in a buffer.

    The two integers are stored in a two-element 'i' memoryview obtained
    from a heap BufferWrapper, and are exposed through properties so the
    inherited threading.Barrier code reads and writes that buffer.
    """

    def __init__(self, parties, action=None, timeout=None, *, ctx):
        """Create the barrier; the condition comes from ``ctx.Condition()``."""
        import struct
        from .heap import BufferWrapper
        # Room for two C ints: state and count.
        wrapper = BufferWrapper(struct.calcsize('i') * 2)
        cond = ctx.Condition()
        self.__setstate__((parties, action, timeout, cond, wrapper))
        self._state = 0
        self._count = 0

    def __setstate__(self, state):
        (self._parties, self._action, self._timeout,
         self._cond, self._wrapper) = state
        # View the wrapper's buffer as an array of C ints.
        self._array = self._wrapper.create_memoryview().cast('i')

    def __getstate__(self):
        return (self._parties, self._action, self._timeout,
                self._cond, self._wrapper)

    @property
    def _state(self):
        return self._array[0]

    @_state.setter
    def _state(self, value):
        self._array[0] = value

    @property
    def _count(self):
        return self._array[1]

    @_count.setter
    def _count(self, value):
        self._array[1] = value