blob: 623f6fb733a57f5aa08939033b46037cd3eae1c6 [file] [log] [blame]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01001import os
2import sys
3import threading
4
5from . import process
Davin Potts54586472016-09-09 18:03:10 -05006from . import reduction
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01007
# Left empty on purpose: a star-import of this module exports nothing.
__all__ = []  # things are copied from here to __init__.py
9
#
# Exceptions
#
13
class ProcessError(Exception):
    '''Base class of the exception types defined in this module.'''
16
class BufferTooShort(ProcessError):
    '''ProcessError subclass; not raised by the code visible in this module.

    NOTE(review): judging by the name it signals a buffer that is too
    small for the data being received -- confirm against the raising code.
    '''
19
class TimeoutError(ProcessError):
    '''ProcessError subclass for timeouts.

    Note that inside this module the name shadows the builtin
    TimeoutError; the two classes are unrelated.
    '''
22
class AuthenticationError(ProcessError):
    '''ProcessError subclass; not raised by the code visible in this module.

    NOTE(review): presumably reports a failed authentication handshake
    between processes -- confirm against the raising code.
    '''
25
#
# Base type for contexts
#
29
class BaseContext(object):
    '''Base type for contexts.

    Bundles the module's exception types with factory methods for the
    package's objects (locks, queues, pools, shared values ...).  Each
    factory imports its implementation lazily, and most of them hand the
    context returned by get_context() to the object they create.
    '''

    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    current_process = staticmethod(process.current_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Returns the number of CPUs in the system

        Raises NotImplementedError if os.cpu_count() cannot determine it.
        '''
        num = os.cpu_count()
        if num is None:
            raise NotImplementedError('cannot determine number of cpus')
        return num

    def Manager(self):
        '''Returns a manager associated with a running server process

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        m = SyncManager(ctx=self.get_context())
        m.start()
        return m

    def Pipe(self, duplex=True):
        '''Returns two connection objects connected by a pipe'''
        from .connection import Pipe
        return Pipe(duplex)

    def Lock(self):
        '''Returns a non-recursive lock object'''
        from .synchronize import Lock
        return Lock(ctx=self.get_context())

    def RLock(self):
        '''Returns a recursive lock object'''
        from .synchronize import RLock
        return RLock(ctx=self.get_context())

    def Condition(self, lock=None):
        '''Returns a condition object'''
        from .synchronize import Condition
        return Condition(lock, ctx=self.get_context())

    def Semaphore(self, value=1):
        '''Returns a semaphore object'''
        from .synchronize import Semaphore
        return Semaphore(value, ctx=self.get_context())

    def BoundedSemaphore(self, value=1):
        '''Returns a bounded semaphore object'''
        from .synchronize import BoundedSemaphore
        return BoundedSemaphore(value, ctx=self.get_context())

    def Event(self):
        '''Returns an event object'''
        from .synchronize import Event
        return Event(ctx=self.get_context())

    def Barrier(self, parties, action=None, timeout=None):
        '''Returns a barrier object'''
        from .synchronize import Barrier
        return Barrier(parties, action, timeout, ctx=self.get_context())

    def Queue(self, maxsize=0):
        '''Returns a queue object'''
        from .queues import Queue
        return Queue(maxsize, ctx=self.get_context())

    def JoinableQueue(self, maxsize=0):
        '''Returns a joinable queue object'''
        from .queues import JoinableQueue
        return JoinableQueue(maxsize, ctx=self.get_context())

    def SimpleQueue(self):
        '''Returns a simple queue object'''
        from .queues import SimpleQueue
        return SimpleQueue(ctx=self.get_context())

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Returns a process pool object'''
        from .pool import Pool
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=self.get_context())

    def RawValue(self, typecode_or_type, *args):
        '''Returns a shared object'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Returns a shared array'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Returns a synchronized shared object'''
        from .sharedctypes import Value
        return Value(typecode_or_type, *args, lock=lock,
                     ctx=self.get_context())

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Returns a synchronized shared array'''
        from .sharedctypes import Array
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=self.get_context())

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.
        '''
        # Only relevant for frozen executables on Windows; a no-op elsewhere.
        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
            from .spawn import freeze_support
            freeze_support()

    def get_logger(self):
        '''Return package logger -- if it does not already exist then
        it is created.
        '''
        from .util import get_logger
        return get_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr'''
        from .util import log_to_stderr
        return log_to_stderr(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        # The import is done for its side effects only; the name is unused.
        from . import connection

    def set_executable(self, executable):
        '''Sets the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable
        set_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload
        set_forkserver_preload(module_names)

    def get_context(self, method=None):
        '''Returns the context object for the start method `method`.

        With method=None this context itself is returned.  Otherwise the
        concrete context registered under that name is looked up, its
        _check_available() hook is called, and the context is returned.

        Raises ValueError if no context is registered for `method`.
        '''
        if method is None:
            return self
        try:
            ctx = _concrete_contexts[method]
        except KeyError:
            # The failed dict lookup is an implementation detail; suppress
            # it so the traceback shows only the ValueError.
            raise ValueError('cannot find context for %r' % method) from None
        ctx._check_available()
        return ctx

    def get_start_method(self, allow_none=False):
        '''Returns the name of the start method of this context.

        `allow_none` is not used here; it is accepted so the signature
        matches DefaultContext.get_start_method().
        '''
        return self._name

    def set_start_method(self, method, force=False):
        '''Always raises ValueError: the start method of a concrete
        context cannot be changed.
        '''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        return globals().get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        # The parameter shadows the module-level name `reduction`, so the
        # module global has to be rebound through globals().
        globals()['reduction'] = reduction

    def _check_available(self):
        '''Hook called by get_context(); subclasses may raise here when
        their start method is not usable.  The base version does nothing.
        '''
        pass
214
#
# Type of default context -- underlying context can be set at most once
#
218
class Process(process.BaseProcess):
    '''Process type exposed by the default context.

    No start method is fixed here (_start_method is None); launching is
    delegated to the Process type of the context that the module-level
    default context currently resolves to.
    '''
    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        # Resolve the context at call time, not at class-definition time.
        active_context = _default_context.get_context()
        return active_context.Process._Popen(process_obj)
224
class DefaultContext(BaseContext):
    '''Context that forwards to an underlying concrete context.

    The underlying context starts out unset.  It is fixed either
    explicitly by set_start_method() or implicitly, to the context given
    to the constructor, the first time get_context() or
    get_start_method() needs it.
    '''
    Process = Process

    def __init__(self, context):
        # `context` is the fallback used when no start method was chosen.
        self._default_context = context
        self._actual_context = None

    def get_context(self, method=None):
        '''Returns the context for `method`, or the underlying context
        (fixing it to the fallback if still unset) when method is None.
        '''
        if method is not None:
            return super().get_context(method)
        if self._actual_context is None:
            self._actual_context = self._default_context
        return self._actual_context

    def set_start_method(self, method, force=False):
        '''Fixes the underlying context to the one for `method`.

        Raises RuntimeError if a context is already set and `force` is
        false.  With force=True and method=None the context is unset
        again.
        '''
        already_set = self._actual_context is not None
        if already_set and not force:
            raise RuntimeError('context has already been set')
        if force and method is None:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Returns the name of the start method in use.

        If none has been fixed yet, returns None when `allow_none` is
        true; otherwise the fallback context is fixed and its name
        returned.
        '''
        ctx = self._actual_context
        if ctx is None:
            if allow_none:
                return None
            ctx = self._actual_context = self._default_context
        return ctx._name

    def get_all_start_methods(self):
        '''Returns the list of start method names for this platform.'''
        if sys.platform == 'win32':
            return ['spawn']
        methods = ['fork', 'spawn']
        # 'forkserver' is only listed when reduction.HAVE_SEND_HANDLE is set.
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        return methods
263
# Every attribute of DefaultContext whose name does not start with an
# underscore.
DefaultContext.__all__ = [
    name for name in dir(DefaultContext) if name[0] != '_'
]
265
#
# Context types for fixed start method
#
269
if sys.platform != 'win32':

    class ForkProcess(process.BaseProcess):
        '''Process type bound to the 'fork' start method.'''
        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            # Implementation module is imported only when actually needed.
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        '''Process type bound to the 'spawn' start method.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        '''Process type bound to the 'forkserver' start method.'''
        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        '''Concrete context for the 'fork' start method.'''
        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        '''Concrete context for the 'spawn' start method.'''
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        '''Concrete context for the 'forkserver' start method.'''
        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            # Usable only when reduction.HAVE_SEND_HANDLE is true.
            if reduction.HAVE_SEND_HANDLE:
                return
            raise ValueError('forkserver start method not available')

    # One shared instance per start method, keyed by method name.
    _concrete_contexts = dict(
        fork=ForkContext(),
        spawn=SpawnContext(),
        forkserver=ForkServerContext(),
    )
    # 'fork' is the fallback when no start method is chosen.
    _default_context = DefaultContext(_concrete_contexts['fork'])

else:

    class SpawnProcess(process.BaseProcess):
        '''Process type bound to the 'spawn' start method.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            # Implementation module is imported only when actually needed.
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)

    class SpawnContext(BaseContext):
        '''Concrete context for the 'spawn' start method.'''
        _name = 'spawn'
        Process = SpawnProcess

    # 'spawn' is the only start method registered on win32.
    _concrete_contexts = dict(
        spawn=SpawnContext(),
    )
    _default_context = DefaultContext(_concrete_contexts['spawn'])
332
#
# Force the start method
#
336
def _force_start_method(method):
    '''Point the default context at the concrete context for `method`.

    The attribute is assigned directly, so neither the "already set"
    check of set_start_method() nor _check_available() is run.  An
    unknown method name raises KeyError.
    '''
    forced_context = _concrete_contexts[method]
    _default_context._actual_context = forced_context
339
#
# Check that the current thread is spawning a child process
#
343
# Per-thread storage for the 'spawning_popen' attribute that is written by
# set_spawning_popen() and read by get_spawning_popen().
_tls = threading.local()
345
def get_spawning_popen():
    '''Return the value stored by set_spawning_popen() in the current
    thread, or None if nothing has been stored there.
    '''
    try:
        return _tls.spawning_popen
    except AttributeError:
        # Nothing has been recorded for this thread yet.
        return None
348
def set_spawning_popen(popen):
    '''Record `popen` in the thread-local storage of the current thread
    so that get_spawning_popen() returns it.
    '''
    setattr(_tls, 'spawning_popen', popen)
351
def assert_spawning(obj):
    '''Raise RuntimeError unless a spawning popen is recorded for the
    current thread.

    `obj` is used only to put its type name into the error message.
    '''
    if get_spawning_popen() is not None:
        return
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type(obj).__name__
        )
357 )