blob: 871746b1a047b392a907c20fe1ae04172bae553f [file] [log] [blame]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01001import os
2import sys
3import threading
4
5from . import process
Davin Potts54586472016-09-09 18:03:10 -05006from . import reduction
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01007
Derek B. Kimc40278e2018-07-11 19:22:28 +09008__all__ = ()
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01009
10#
11# Exceptions
12#
13
14class ProcessError(Exception):
15 pass
16
17class BufferTooShort(ProcessError):
18 pass
19
20class TimeoutError(ProcessError):
21 pass
22
23class AuthenticationError(ProcessError):
24 pass
25
26#
Derek B. Kimc40278e2018-07-11 19:22:28 +090027# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010028#
29
30class BaseContext(object):
31
32 ProcessError = ProcessError
33 BufferTooShort = BufferTooShort
34 TimeoutError = TimeoutError
35 AuthenticationError = AuthenticationError
36
37 current_process = staticmethod(process.current_process)
38 active_children = staticmethod(process.active_children)
39
40 def cpu_count(self):
41 '''Returns the number of CPUs in the system'''
42 num = os.cpu_count()
43 if num is None:
44 raise NotImplementedError('cannot determine number of cpus')
45 else:
46 return num
47
48 def Manager(self):
49 '''Returns a manager associated with a running server process
50
51 The managers methods such as `Lock()`, `Condition()` and `Queue()`
52 can be used to create shared objects.
53 '''
54 from .managers import SyncManager
55 m = SyncManager(ctx=self.get_context())
56 m.start()
57 return m
58
59 def Pipe(self, duplex=True):
60 '''Returns two connection object connected by a pipe'''
61 from .connection import Pipe
62 return Pipe(duplex)
63
64 def Lock(self):
65 '''Returns a non-recursive lock object'''
66 from .synchronize import Lock
67 return Lock(ctx=self.get_context())
68
69 def RLock(self):
70 '''Returns a recursive lock object'''
71 from .synchronize import RLock
72 return RLock(ctx=self.get_context())
73
74 def Condition(self, lock=None):
75 '''Returns a condition object'''
76 from .synchronize import Condition
77 return Condition(lock, ctx=self.get_context())
78
79 def Semaphore(self, value=1):
80 '''Returns a semaphore object'''
81 from .synchronize import Semaphore
82 return Semaphore(value, ctx=self.get_context())
83
84 def BoundedSemaphore(self, value=1):
85 '''Returns a bounded semaphore object'''
86 from .synchronize import BoundedSemaphore
87 return BoundedSemaphore(value, ctx=self.get_context())
88
89 def Event(self):
90 '''Returns an event object'''
91 from .synchronize import Event
92 return Event(ctx=self.get_context())
93
94 def Barrier(self, parties, action=None, timeout=None):
95 '''Returns a barrier object'''
96 from .synchronize import Barrier
97 return Barrier(parties, action, timeout, ctx=self.get_context())
98
99 def Queue(self, maxsize=0):
100 '''Returns a queue object'''
101 from .queues import Queue
102 return Queue(maxsize, ctx=self.get_context())
103
104 def JoinableQueue(self, maxsize=0):
105 '''Returns a queue object'''
106 from .queues import JoinableQueue
107 return JoinableQueue(maxsize, ctx=self.get_context())
108
109 def SimpleQueue(self):
110 '''Returns a queue object'''
111 from .queues import SimpleQueue
112 return SimpleQueue(ctx=self.get_context())
113
114 def Pool(self, processes=None, initializer=None, initargs=(),
115 maxtasksperchild=None):
116 '''Returns a process pool object'''
117 from .pool import Pool
118 return Pool(processes, initializer, initargs, maxtasksperchild,
119 context=self.get_context())
120
121 def RawValue(self, typecode_or_type, *args):
122 '''Returns a shared object'''
123 from .sharedctypes import RawValue
124 return RawValue(typecode_or_type, *args)
125
126 def RawArray(self, typecode_or_type, size_or_initializer):
127 '''Returns a shared array'''
128 from .sharedctypes import RawArray
129 return RawArray(typecode_or_type, size_or_initializer)
130
131 def Value(self, typecode_or_type, *args, lock=True):
132 '''Returns a synchronized shared object'''
133 from .sharedctypes import Value
134 return Value(typecode_or_type, *args, lock=lock,
135 ctx=self.get_context())
136
137 def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
138 '''Returns a synchronized shared array'''
139 from .sharedctypes import Array
140 return Array(typecode_or_type, size_or_initializer, lock=lock,
141 ctx=self.get_context())
142
143 def freeze_support(self):
144 '''Check whether this is a fake forked process in a frozen executable.
145 If so then run code specified by commandline and exit.
146 '''
147 if sys.platform == 'win32' and getattr(sys, 'frozen', False):
148 from .spawn import freeze_support
149 freeze_support()
150
151 def get_logger(self):
152 '''Return package logger -- if it does not already exist then
153 it is created.
154 '''
155 from .util import get_logger
156 return get_logger()
157
158 def log_to_stderr(self, level=None):
159 '''Turn on logging and add a handler which prints to stderr'''
160 from .util import log_to_stderr
161 return log_to_stderr(level)
162
163 def allow_connection_pickling(self):
164 '''Install support for sending connections and sockets
165 between processes
166 '''
167 # This is undocumented. In previous versions of multiprocessing
168 # its only effect was to make socket objects inheritable on Windows.
169 from . import connection
170
171 def set_executable(self, executable):
172 '''Sets the path to a python.exe or pythonw.exe binary used to run
173 child processes instead of sys.executable when using the 'spawn'
174 start method. Useful for people embedding Python.
175 '''
176 from .spawn import set_executable
177 set_executable(executable)
178
179 def set_forkserver_preload(self, module_names):
180 '''Set list of module names to try to load in forkserver process.
181 This is really just a hint.
182 '''
183 from .forkserver import set_forkserver_preload
184 set_forkserver_preload(module_names)
185
186 def get_context(self, method=None):
187 if method is None:
188 return self
189 try:
190 ctx = _concrete_contexts[method]
191 except KeyError:
Serhiy Storchaka5affd232017-04-05 09:37:24 +0300192 raise ValueError('cannot find context for %r' % method) from None
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100193 ctx._check_available()
194 return ctx
195
196 def get_start_method(self, allow_none=False):
197 return self._name
198
Antoine Pitroucd2a2012016-12-10 17:13:16 +0100199 def set_start_method(self, method, force=False):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100200 raise ValueError('cannot set start method of concrete context')
201
Davin Potts54586472016-09-09 18:03:10 -0500202 @property
203 def reducer(self):
204 '''Controls how objects will be reduced to a form that can be
205 shared with other processes.'''
206 return globals().get('reduction')
207
208 @reducer.setter
209 def reducer(self, reduction):
210 globals()['reduction'] = reduction
211
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100212 def _check_available(self):
213 pass
214
215#
216# Type of default context -- underlying context can be set at most once
217#
218
219class Process(process.BaseProcess):
220 _start_method = None
221 @staticmethod
222 def _Popen(process_obj):
223 return _default_context.get_context().Process._Popen(process_obj)
224
225class DefaultContext(BaseContext):
226 Process = Process
227
228 def __init__(self, context):
229 self._default_context = context
230 self._actual_context = None
231
232 def get_context(self, method=None):
233 if method is None:
234 if self._actual_context is None:
235 self._actual_context = self._default_context
236 return self._actual_context
237 else:
238 return super().get_context(method)
239
240 def set_start_method(self, method, force=False):
241 if self._actual_context is not None and not force:
242 raise RuntimeError('context has already been set')
243 if method is None and force:
244 self._actual_context = None
245 return
246 self._actual_context = self.get_context(method)
247
248 def get_start_method(self, allow_none=False):
249 if self._actual_context is None:
250 if allow_none:
251 return None
252 self._actual_context = self._default_context
253 return self._actual_context._name
254
255 def get_all_start_methods(self):
256 if sys.platform == 'win32':
257 return ['spawn']
258 else:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100259 if reduction.HAVE_SEND_HANDLE:
260 return ['fork', 'spawn', 'forkserver']
261 else:
262 return ['fork', 'spawn']
263
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100264#
265# Context types for fixed start method
266#
267
268if sys.platform != 'win32':
269
270 class ForkProcess(process.BaseProcess):
271 _start_method = 'fork'
272 @staticmethod
273 def _Popen(process_obj):
274 from .popen_fork import Popen
275 return Popen(process_obj)
276
277 class SpawnProcess(process.BaseProcess):
278 _start_method = 'spawn'
279 @staticmethod
280 def _Popen(process_obj):
281 from .popen_spawn_posix import Popen
282 return Popen(process_obj)
283
284 class ForkServerProcess(process.BaseProcess):
285 _start_method = 'forkserver'
286 @staticmethod
287 def _Popen(process_obj):
288 from .popen_forkserver import Popen
289 return Popen(process_obj)
290
291 class ForkContext(BaseContext):
292 _name = 'fork'
293 Process = ForkProcess
294
295 class SpawnContext(BaseContext):
296 _name = 'spawn'
297 Process = SpawnProcess
298
299 class ForkServerContext(BaseContext):
300 _name = 'forkserver'
301 Process = ForkServerProcess
302 def _check_available(self):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100303 if not reduction.HAVE_SEND_HANDLE:
304 raise ValueError('forkserver start method not available')
305
306 _concrete_contexts = {
307 'fork': ForkContext(),
308 'spawn': SpawnContext(),
309 'forkserver': ForkServerContext(),
310 }
311 _default_context = DefaultContext(_concrete_contexts['fork'])
312
313else:
314
315 class SpawnProcess(process.BaseProcess):
316 _start_method = 'spawn'
317 @staticmethod
318 def _Popen(process_obj):
319 from .popen_spawn_win32 import Popen
320 return Popen(process_obj)
321
322 class SpawnContext(BaseContext):
323 _name = 'spawn'
324 Process = SpawnProcess
325
326 _concrete_contexts = {
327 'spawn': SpawnContext(),
328 }
329 _default_context = DefaultContext(_concrete_contexts['spawn'])
330
331#
332# Force the start method
333#
334
335def _force_start_method(method):
336 _default_context._actual_context = _concrete_contexts[method]
337
338#
339# Check that the current thread is spawning a child process
340#
341
342_tls = threading.local()
343
344def get_spawning_popen():
345 return getattr(_tls, 'spawning_popen', None)
346
347def set_spawning_popen(popen):
348 _tls.spawning_popen = popen
349
350def assert_spawning(obj):
351 if get_spawning_popen() is None:
352 raise RuntimeError(
353 '%s objects should only be shared between processes'
354 ' through inheritance' % type(obj).__name__
355 )