blob: 63849f9d16e362fdda37caa8892aa204ee68b4e4 [file] [log] [blame]
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01001import os
2import sys
3import threading
4
5from . import process
6
7__all__ = [] # things are copied from here to __init__.py
8
9#
10# Exceptions
11#
12
class ProcessError(Exception):
    '''Base class for the exceptions defined in this module.'''
15
class BufferTooShort(ProcessError):
    '''ProcessError subclass; going by its name it reports a buffer that
    is too small (the raise sites are outside this file).
    '''
18
class TimeoutError(ProcessError):
    '''ProcessError subclass for timeouts.

    Note that inside this module the name shadows the builtin
    TimeoutError.
    '''
21
class AuthenticationError(ProcessError):
    '''ProcessError subclass; going by its name it reports a failed
    authentication (the raise sites are outside this file).
    '''
24
25#
26# Base type for contexts
27#
28
class BaseContext(object):
    '''Base type for contexts.

    Exposes the package's exception types and a set of factory methods
    for managers, pipes, locks, queues, pools and shared ctypes objects.
    Most factories import their implementation lazily and pass
    `self.get_context()` to it, so the created object is tied to the
    context it was requested from.

    Subclasses are expected to supply `_name` (read by
    `get_start_method()`) and a `Process` class.
    '''

    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    current_process = staticmethod(process.current_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Returns the number of CPUs in the system

        Raises NotImplementedError if os.cpu_count() cannot determine it.
        '''
        num = os.cpu_count()
        if num is None:
            raise NotImplementedError('cannot determine number of cpus')
        else:
            return num

    def Manager(self):
        '''Returns a manager associated with a running server process

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        m = SyncManager(ctx=self.get_context())
        m.start()
        return m

    def Pipe(self, duplex=True):
        '''Returns two connection objects connected by a pipe'''
        from .connection import Pipe
        return Pipe(duplex)

    def Lock(self):
        '''Returns a non-recursive lock object'''
        from .synchronize import Lock
        return Lock(ctx=self.get_context())

    def RLock(self):
        '''Returns a recursive lock object'''
        from .synchronize import RLock
        return RLock(ctx=self.get_context())

    def Condition(self, lock=None):
        '''Returns a condition object'''
        from .synchronize import Condition
        return Condition(lock, ctx=self.get_context())

    def Semaphore(self, value=1):
        '''Returns a semaphore object'''
        from .synchronize import Semaphore
        return Semaphore(value, ctx=self.get_context())

    def BoundedSemaphore(self, value=1):
        '''Returns a bounded semaphore object'''
        from .synchronize import BoundedSemaphore
        return BoundedSemaphore(value, ctx=self.get_context())

    def Event(self):
        '''Returns an event object'''
        from .synchronize import Event
        return Event(ctx=self.get_context())

    def Barrier(self, parties, action=None, timeout=None):
        '''Returns a barrier object'''
        from .synchronize import Barrier
        return Barrier(parties, action, timeout, ctx=self.get_context())

    def Queue(self, maxsize=0):
        '''Returns a queue object'''
        from .queues import Queue
        return Queue(maxsize, ctx=self.get_context())

    def JoinableQueue(self, maxsize=0):
        '''Returns a joinable queue object'''
        from .queues import JoinableQueue
        return JoinableQueue(maxsize, ctx=self.get_context())

    def SimpleQueue(self):
        '''Returns a simple queue object'''
        from .queues import SimpleQueue
        return SimpleQueue(ctx=self.get_context())

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Returns a process pool object'''
        from .pool import Pool
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=self.get_context())

    def RawValue(self, typecode_or_type, *args):
        '''Returns a shared object'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Returns a shared array'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Returns a synchronized shared object'''
        from .sharedctypes import Value
        return Value(typecode_or_type, *args, lock=lock,
                     ctx=self.get_context())

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Returns a synchronized shared array'''
        from .sharedctypes import Array
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=self.get_context())

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.
        '''
        # Does nothing unless running on Windows from a frozen executable.
        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
            from .spawn import freeze_support
            freeze_support()

    def get_logger(self):
        '''Return package logger -- if it does not already exist then
        it is created.
        '''
        from .util import get_logger
        return get_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr'''
        from .util import log_to_stderr
        return log_to_stderr(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        # Importing the module is all this method does; the bound name is
        # deliberately unused.
        from . import connection

    def set_executable(self, executable):
        '''Sets the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable
        set_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload
        set_forkserver_preload(module_names)

    def get_context(self, method=None):
        '''Return the context for start method `method`.

        With `method` left as None this context itself is returned.
        Otherwise the context registered under that name in the
        module-level `_concrete_contexts` table is returned, after its
        `_check_available()` hook has been run.

        Raises ValueError if no context is registered for `method`.
        '''
        if method is None:
            return self
        try:
            ctx = _concrete_contexts[method]
        except KeyError:
            # The KeyError is an implementation detail of the lookup;
            # suppress it so callers only see the ValueError.
            raise ValueError('cannot find context for %r' % method) from None
        ctx._check_available()
        return ctx

    def get_start_method(self, allow_none=False):
        '''Return the name of this context's start method (`self._name`).

        `allow_none` is accepted for signature compatibility with
        DefaultContext.get_start_method() and is not used here.
        '''
        return self._name

    def set_start_method(self, method=None, force=False):
        '''Always raises ValueError: a concrete context's method is fixed.

        `force` is accepted (and ignored) so that the call signature
        matches DefaultContext.set_start_method(); without it a call
        passing `force` would fail with TypeError instead of the
        intended ValueError.
        '''
        raise ValueError('cannot set start method of concrete context')

    def _check_available(self):
        '''Hook called by get_context(); subclasses may override it to
        raise when their start method is unavailable.  No-op here.
        '''
        pass
203
204#
205# Type of default context -- underlying context can be set at most once
206#
207
class Process(process.BaseProcess):
    '''Process type used by the default context.

    Which Popen implementation is used is decided when _Popen() is
    called, by asking the module-level default context for its current
    concrete context.
    '''

    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        # Resolve the concrete context at call time, then delegate to the
        # _Popen of that context's own Process class.
        concrete_ctx = _default_context.get_context()
        return concrete_ctx.Process._Popen(process_obj)
213
class DefaultContext(BaseContext):
    '''Context that forwards to an underlying concrete context.

    `_default_context` is the fallback passed to the constructor;
    `_actual_context` stays None until a start method is set explicitly
    or the fallback is adopted on first use.
    '''

    Process = Process

    def __init__(self, context):
        self._actual_context = None
        self._default_context = context

    def get_context(self, method=None):
        '''Return the context for `method`.

        With `method` left as None the underlying context is returned,
        adopting the fallback first if none has been chosen yet.
        '''
        if method is not None:
            return super().get_context(method)
        if self._actual_context is None:
            self._actual_context = self._default_context
        return self._actual_context

    def set_start_method(self, method, force=False):
        '''Choose the underlying context by start method name.

        Raises RuntimeError if a context is already set and `force` is
        false.  With `force` true and `method` None the choice is
        cleared instead.
        '''
        already_chosen = self._actual_context is not None
        if already_chosen and not force:
            raise RuntimeError('context has already been set')
        if force and method is None:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Return the name of the underlying context's start method.

        If nothing has been chosen yet, return None when `allow_none`
        is true; otherwise adopt the fallback context and return its
        name.
        '''
        chosen = self._actual_context
        if chosen is None:
            if allow_none:
                return None
            chosen = self._actual_context = self._default_context
        return chosen._name

    def get_all_start_methods(self):
        '''Return the list of start method names for this platform.'''
        if sys.platform == 'win32':
            return ['spawn']
        from . import reduction
        methods = ['fork', 'spawn']
        # 'forkserver' is only listed when reduction reports HAVE_SEND_HANDLE.
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        return methods
253
# Every public (non-underscore) attribute of DefaultContext is exported.
DefaultContext.__all__ = [
    name for name in dir(DefaultContext) if not name.startswith('_')
]
255
256#
257# Context types for fixed start method
258#
259
if sys.platform == 'win32':

    # Windows: 'spawn' is the only start method defined.

    class SpawnProcess(process.BaseProcess):
        '''Process whose Popen comes from popen_spawn_win32.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)

    class SpawnContext(BaseContext):
        '''Context fixed to the 'spawn' start method.'''
        _name = 'spawn'
        Process = SpawnProcess

    _concrete_contexts = dict(
        spawn=SpawnContext(),
    )
    _default_context = DefaultContext(_concrete_contexts['spawn'])

else:

    # Other platforms: 'fork', 'spawn' and 'forkserver' are defined and
    # 'fork' backs the default context.

    class ForkProcess(process.BaseProcess):
        '''Process whose Popen comes from popen_fork.'''
        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        '''Process whose Popen comes from popen_spawn_posix.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        '''Process whose Popen comes from popen_forkserver.'''
        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        '''Context fixed to the 'fork' start method.'''
        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        '''Context fixed to the 'spawn' start method.'''
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        '''Context fixed to the 'forkserver' start method.'''
        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            # Usable only when reduction reports HAVE_SEND_HANDLE.
            from . import reduction
            if reduction.HAVE_SEND_HANDLE:
                return
            raise ValueError('forkserver start method not available')

    _concrete_contexts = dict(
        fork=ForkContext(),
        spawn=SpawnContext(),
        forkserver=ForkServerContext(),
    )
    _default_context = DefaultContext(_concrete_contexts['fork'])
323
324#
325# Force the start method
326#
327
def _force_start_method(method):
    '''Point the default context at the concrete context for `method`.

    Assigns `_actual_context` directly, so neither the "already set"
    check of set_start_method() nor `_check_available()` is run.
    An unknown `method` raises KeyError.
    '''
    forced_ctx = _concrete_contexts[method]
    _default_context._actual_context = forced_ctx
330
331#
332# Check that the current thread is spawning a child process
333#
334
# Per-thread storage for the Popen object of the spawn in progress.
_tls = threading.local()


def get_spawning_popen():
    '''Return the value stored by set_spawning_popen() in this thread,
    or None if nothing has been stored.
    '''
    try:
        return _tls.spawning_popen
    except AttributeError:
        return None


def set_spawning_popen(popen):
    '''Store `popen` as the current thread's spawning Popen object.'''
    setattr(_tls, 'spawning_popen', popen)


def assert_spawning(obj):
    '''Raise RuntimeError unless the current thread has a spawning Popen
    object set; `obj` is only used to name its type in the message.
    '''
    if get_spawning_popen() is not None:
        return
    kind = type(obj).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % kind
        )