:mod:`multiprocessing` --- Process-based "threading" interface
==============================================================

.. module:: multiprocessing
   :synopsis: Process-based "threading" interface.

.. versionadded:: 2.6


Introduction
------------

:mod:`multiprocessing` is a package that supports spawning processes using an
API similar to the :mod:`threading` module. The :mod:`multiprocessing` package
offers both local and remote concurrency, effectively side-stepping the
:term:`Global Interpreter Lock` by using subprocesses instead of threads. Due
to this, the :mod:`multiprocessing` module allows the programmer to fully
leverage multiple processors on a given machine. It runs on both Unix and
Windows.


The :class:`Process` class
~~~~~~~~~~~~~~~~~~~~~~~~~~

In :mod:`multiprocessing`, processes are spawned by creating a :class:`Process`
object and then calling its :meth:`~Process.start` method. :class:`Process`
follows the API of :class:`threading.Thread`. A trivial example of a
multiprocess program is ::

   from multiprocessing import Process

   def f(name):
       print 'hello', name

   if __name__ == '__main__':
       p = Process(target=f, args=('bob',))
       p.start()
       p.join()

Here the function ``f`` is run in a child process.

For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
necessary, see :ref:`multiprocessing-programming`.


Exchanging objects between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:mod:`multiprocessing` supports two types of communication channel between
processes:

**Queues**

   The :class:`Queue` class is a near clone of :class:`Queue.Queue`. For
   example::

      from multiprocessing import Process, Queue

      def f(q):
          q.put([42, None, 'hello'])

      if __name__ == '__main__':
          q = Queue()
          p = Process(target=f, args=(q,))
          p.start()
          print q.get()    # prints "[42, None, 'hello']"
          p.join()

   Queues are thread and process safe.

**Pipes**

   The :func:`Pipe` function returns a pair of connection objects connected by a
   pipe which by default is duplex (two-way). For example::

      from multiprocessing import Process, Pipe

      def f(conn):
          conn.send([42, None, 'hello'])
          conn.close()

      if __name__ == '__main__':
          parent_conn, child_conn = Pipe()
          p = Process(target=f, args=(child_conn,))
          p.start()
          print parent_conn.recv()   # prints "[42, None, 'hello']"
          p.join()

   The two connection objects returned by :func:`Pipe` represent the two ends of
   the pipe. Each connection object has :meth:`~Connection.send` and
   :meth:`~Connection.recv` methods (among others). Note that data in a pipe
   may become corrupted if two processes (or threads) try to read from or write
   to the *same* end of the pipe at the same time. Of course there is no risk
   of corruption from processes using different ends of the pipe at the same
   time.


Synchronization between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:mod:`multiprocessing` contains equivalents of all the synchronization
primitives from :mod:`threading`. For instance one can use a lock to ensure
that only one process prints to standard output at a time::

   from multiprocessing import Process, Lock

   def f(l, i):
       l.acquire()
       print 'hello world', i
       l.release()

   if __name__ == '__main__':
       lock = Lock()

       for num in range(10):
           Process(target=f, args=(lock, num)).start()

Without the lock, output from the different processes is liable to get all
mixed up.


Sharing state between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When doing concurrent programming it is usually best to avoid using shared
state as far as possible. This is particularly true when using multiple
processes.

However, if you really do need to use some shared data then
:mod:`multiprocessing` provides a couple of ways of doing so.

**Shared memory**

   Data can be stored in a shared memory map using :class:`Value` or
   :class:`Array`. For example, the following code ::

      from multiprocessing import Process, Value, Array

      def f(n, a):
          n.value = 3.1415927
          for i in range(len(a)):
              a[i] = -a[i]

      if __name__ == '__main__':
          num = Value('d', 0.0)
          arr = Array('i', range(10))

          p = Process(target=f, args=(num, arr))
          p.start()
          p.join()

          print num.value
          print arr[:]

   will print ::

      3.1415927
      [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

   The ``'d'`` and ``'i'`` arguments used when creating ``num`` and ``arr`` are
   typecodes of the kind used by the :mod:`array` module: ``'d'`` indicates a
   double precision float and ``'i'`` indicates a signed integer. These shared
   objects will be process and thread safe.

   For more flexibility in using shared memory one can use the
   :mod:`multiprocessing.sharedctypes` module which supports the creation of
   arbitrary ctypes objects allocated from shared memory.

**Server process**

   A manager object returned by :func:`Manager` controls a server process which
   holds Python objects and allows other processes to manipulate them using
   proxies.

   A manager returned by :func:`Manager` will support types :class:`list`,
   :class:`dict`, :class:`Namespace`, :class:`Lock`, :class:`RLock`,
   :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`,
   :class:`Event`, :class:`Queue`, :class:`Value` and :class:`Array`. For
   example, ::

      from multiprocessing import Process, Manager

      def f(d, l):
          d[1] = '1'
          d['2'] = 2
          d[0.25] = None
          l.reverse()

      if __name__ == '__main__':
          manager = Manager()

          d = manager.dict()
          l = manager.list(range(10))

          p = Process(target=f, args=(d, l))
          p.start()
          p.join()

          print d
          print l

   will print ::

      {0.25: None, 1: '1', '2': 2}
      [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

   Server process managers are more flexible than using shared memory objects
   because they can be made to support arbitrary object types. Also, a single
   manager can be shared by processes on different computers over a network.
   They are, however, slower than using shared memory.


Using a pool of workers
~~~~~~~~~~~~~~~~~~~~~~~

The :class:`~multiprocessing.pool.Pool` class represents a pool of worker
processes. It has methods which allow tasks to be offloaded to the worker
processes in a few different ways.

For example::

   from multiprocessing import Pool

   def f(x):
       return x*x

   if __name__ == '__main__':
       pool = Pool(processes=4)              # start 4 worker processes
       result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
       print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
       print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"


Reference
---------

The :mod:`multiprocessing` package mostly replicates the API of the
:mod:`threading` module.


:class:`Process` and exceptions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. class:: Process([group[, target[, name[, args[, kwargs]]]]])

   Process objects represent activity that is run in a separate process. The
   :class:`Process` class has equivalents of all the methods of
   :class:`threading.Thread`.

   The constructor should always be called with keyword arguments. *group*
   should always be ``None``; it exists solely for compatibility with
   :class:`threading.Thread`. *target* is the callable object to be invoked by
   the :meth:`run` method. It defaults to ``None``, meaning nothing is
   called. *name* is the process name. By default, a unique name is constructed
   of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\
   :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length
   is determined by the *generation* of the process. *args* is the argument
   tuple for the target invocation. *kwargs* is a dictionary of keyword
   arguments for the target invocation. By default, no arguments are passed to
   *target*.

   If a subclass overrides the constructor, it must make sure it invokes the
   base class constructor (:meth:`Process.__init__`) before doing anything else
   to the process.

   .. method:: run()

      Method representing the process's activity.

      You may override this method in a subclass. The standard :meth:`run`
      method invokes the callable object passed to the object's constructor as
      the target argument, if any, with sequential and keyword arguments taken
      from the *args* and *kwargs* arguments, respectively.

   .. method:: start()

      Start the process's activity.

      This must be called at most once per process object. It arranges for the
      object's :meth:`run` method to be invoked in a separate process.

   .. method:: join([timeout])

      Block the calling thread until the process whose :meth:`join` method is
      called terminates or until the optional timeout occurs.

      If *timeout* is ``None`` then there is no timeout.

      A process can be joined many times.

      A process cannot join itself because this would cause a deadlock. It is
      an error to attempt to join a process before it has been started.

   .. attribute:: name

      The process's name.

      The name is a string used for identification purposes only. It has no
      semantics. Multiple processes may be given the same name. The initial
      name is set by the constructor.

   .. method:: is_alive()

      Return whether the process is alive.

      Roughly, a process object is alive from the moment the :meth:`start`
      method returns until the child process terminates.

   .. attribute:: daemon

      The process's daemon flag, a Boolean value. This must be set before
      :meth:`start` is called.

      The initial value is inherited from the creating process.

      When a process exits, it attempts to terminate all of its daemonic child
      processes.

      Note that a daemonic process is not allowed to create child processes.
      Otherwise a daemonic process would leave its children orphaned if it gets
      terminated when its parent process exits.

   In addition to the :class:`threading.Thread` API, :class:`Process` objects
   also support the following attributes and methods:

   .. attribute:: pid

      The process ID. Before the process is spawned, this will be
      ``None``.

   .. attribute:: exitcode

      The child's exit code. This will be ``None`` if the process has not yet
      terminated. A negative value *-N* indicates that the child was terminated
      by signal *N*.

   .. attribute:: authkey

      The process's authentication key (a byte string).

      When :mod:`multiprocessing` is initialized the main process is assigned a
      random string using :func:`os.urandom`.

      When a :class:`Process` object is created, it will inherit the
      authentication key of its parent process, although this may be changed by
      setting :attr:`authkey` to another byte string.

      See :ref:`multiprocessing-auth-keys`.

   .. method:: terminate()

      Terminate the process. On Unix this is done using the ``SIGTERM`` signal;
      on Windows :cfunc:`TerminateProcess` is used. Note that exit handlers and
      finally clauses, etc., will not be executed.

      Note that descendant processes of the process will *not* be terminated --
      they will simply become orphaned.

      .. warning::

         If this method is used when the associated process is using a pipe or
         queue then the pipe or queue is liable to become corrupted and may
         become unusable by other processes. Similarly, if the process has
         acquired a lock or semaphore etc. then terminating it is liable to
         cause other processes to deadlock.

   Note that the :meth:`start`, :meth:`join` and :meth:`is_alive` methods and
   the :attr:`exitcode` attribute should only be used by the process that
   created the process object.

   Example usage of some of the methods of :class:`Process`::

      >>> import multiprocessing, time, signal
      >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
      >>> print p, p.is_alive()
      <Process(Process-1, initial)> False
      >>> p.start()
      >>> print p, p.is_alive()
      <Process(Process-1, started)> True
      >>> p.terminate()
      >>> print p, p.is_alive()
      <Process(Process-1, stopped[SIGTERM])> False
      >>> p.exitcode == -signal.SIGTERM
      True

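As a small illustration of the attributes documented for :class:`Process`, the
following sketch creates a process object without starting it and inspects its
state. No child process is launched here; the ``'sleeper'`` name and the
variable names are made up for the example, and the two inheritance checks
only restate what the :attr:`daemon` and :attr:`authkey` descriptions say.

```python
import time
from multiprocessing import Process, current_process

# Creating a Process object does not launch anything yet.
p = Process(target=time.sleep, args=(0.1,), name='sleeper')

name_before_start = p.name            # the name passed to the constructor
pid_before_start = p.pid              # None: no child has been spawned
exitcode_before_start = p.exitcode    # None: nothing has terminated yet
alive_before_start = p.is_alive()     # False until start() has been called

# A new process object takes these values from the process that creates it.
inherits_daemon = (p.daemon == current_process().daemon)
inherits_authkey = (p.authkey == current_process().authkey)
```

Calling ``p.start()`` afterwards would be expected to turn ``p.pid`` into an
integer and make ``p.is_alive()`` return ``True`` until the child exits.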

.. exception:: BufferTooShort

   Exception raised by :meth:`Connection.recv_bytes_into()` when the supplied
   buffer object is too small for the message read.

   If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
   the message as a byte string.

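A minimal sketch of recovering a message from :exc:`BufferTooShort`, using both
ends of a pipe inside a single process so that it can be run on its own. The
four-byte buffer and the ten-byte message are arbitrary choices for the
illustration.

```python
import array
from multiprocessing import Pipe, BufferTooShort

a, b = Pipe()
a.send_bytes(b'0123456789')           # a ten-byte message

small = array.array('b', [0] * 4)     # writable buffer with room for 4 bytes
try:
    b.recv_bytes_into(small)
    recovered = None
except BufferTooShort as e:
    recovered = e.args[0]             # the complete message, as a byte string

a.close()
b.close()
```

The message is not lost when the buffer is too small: it is carried by the
exception, so the caller can retry with a larger buffer or use the bytes
directly.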

Pipes and Queues
~~~~~~~~~~~~~~~~

When using multiple processes, one generally uses message passing for
communication between processes and avoids having to use any synchronization
primitives like locks.

For passing messages one can use :func:`Pipe` (for a connection between two
processes) or a queue (which allows multiple producers and consumers).

The :class:`Queue` and :class:`JoinableQueue` types are multi-producer,
multi-consumer FIFO queues modelled on the :class:`Queue.Queue` class in the
standard library. They differ in that :class:`Queue` lacks the
:meth:`~Queue.Queue.task_done` and :meth:`~Queue.Queue.join` methods introduced
into Python 2.5's :class:`Queue.Queue` class.

If you use :class:`JoinableQueue` then you **must** call
:meth:`JoinableQueue.task_done` for each task removed from the queue or else the
semaphore used to count the number of unfinished tasks may eventually overflow
raising an exception.

Note that one can also create a shared queue by using a manager object -- see
:ref:`multiprocessing-managers`.

.. note::

   :mod:`multiprocessing` uses the usual :exc:`Queue.Empty` and
   :exc:`Queue.Full` exceptions to signal a timeout. They are not available in
   the :mod:`multiprocessing` namespace so you need to import them from
   :mod:`Queue`.

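The import described in the note can be sketched as follows. The example uses
the queue from a single process so that it is self-contained; the fallback
import of ``queue`` is an assumption added so that the same code also runs on
Python 3, where the module was renamed, and the item name ``'job-1'`` is
invented.

```python
from multiprocessing import Queue

try:
    from Queue import Empty       # Python 2 module name, as used in this document
except ImportError:
    from queue import Empty       # assumption: the renamed module on Python 3

q = Queue()
q.put('job-1')
first = q.get(True, 5)            # block for at most 5 seconds

try:
    q.get(True, 0.1)              # nothing is left, so this times out
    timed_out = False
except Empty:
    timed_out = True
```

Catching :exc:`Queue.Empty` around a :meth:`get` call with a timeout is a
common way for a consumer to notice that no work has arrived for a while.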

.. warning::

   If a process is killed using :meth:`Process.terminate` or :func:`os.kill`
   while it is trying to use a :class:`Queue`, then the data in the queue is
   likely to become corrupted. This may cause any other process to get an
   exception when it tries to use the queue later on.

.. warning::

   If a child process has put items on a queue (and it has not used
   :meth:`JoinableQueue.cancel_join_thread`), then that process will
   not terminate until all buffered items have been flushed to the pipe.

   This means that if you try joining that process you may get a deadlock unless
   you are sure that all items which have been put on the queue have been
   consumed. Similarly, if the child process is non-daemonic then the parent
   process may hang on exit when it tries to join all its non-daemonic children.

   Note that a queue created using a manager does not have this issue. See
   :ref:`multiprocessing-programming`.

For an example of the usage of queues for interprocess communication see
:ref:`multiprocessing-examples`.


.. function:: Pipe([duplex])

   Returns a pair ``(conn1, conn2)`` of :class:`Connection` objects representing
   the ends of a pipe.

   If *duplex* is ``True`` (the default) then the pipe is bidirectional. If
   *duplex* is ``False`` then the pipe is unidirectional: ``conn1`` can only be
   used for receiving messages and ``conn2`` can only be used for sending
   messages.

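The unidirectional case can be sketched as below, again within one process.
The names ``receiver`` and ``sender`` are illustrative. Which exception a
misused end raises is not stated above; catching :exc:`IOError` is an
assumption about the implementation rather than documented behaviour.

```python
from multiprocessing import Pipe

receiver, sender = Pipe(False)    # duplex=False: a one-way pipe

sender.send({'status': 'ok'})
ready = receiver.poll(1)          # wait up to one second for data
message = receiver.recv()

try:
    receiver.send('wrong way')    # conn1 is receive-only on a one-way pipe
    wrong_way_failed = False
except IOError:                   # assumption: the error type is not documented above
    wrong_way_failed = True

sender.close()
receiver.close()
```

A one-way pipe is a reasonable default when a child only reports results back
to its parent, since misuse of an end fails immediately instead of silently
queueing data nobody reads.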

.. class:: Queue([maxsize])

   Returns a process shared queue implemented using a pipe and a few
   locks/semaphores. When a process first puts an item on the queue a feeder
   thread is started which transfers objects from a buffer into the pipe.

   The usual :exc:`Queue.Empty` and :exc:`Queue.Full` exceptions from the
   standard library's :mod:`Queue` module are raised to signal timeouts.

   :class:`Queue` implements all the methods of :class:`Queue.Queue` except for
   :meth:`~Queue.Queue.task_done` and :meth:`~Queue.Queue.join`.

   .. method:: qsize()

      Return the approximate size of the queue. Because of
      multithreading/multiprocessing semantics, this number is not reliable.

      Note that this may raise :exc:`NotImplementedError` on Unix platforms like
      Mac OS X where ``sem_getvalue()`` is not implemented.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise. Because of
      multithreading/multiprocessing semantics, this is not reliable.

   .. method:: full()

      Return ``True`` if the queue is full, ``False`` otherwise. Because of
      multithreading/multiprocessing semantics, this is not reliable.

   .. method:: put(item[, block[, timeout]])

      Put item into the queue. If the optional argument *block* is ``True``
      (the default) and *timeout* is ``None`` (the default), block if necessary until
      a free slot is available. If *timeout* is a positive number, it blocks at
      most *timeout* seconds and raises the :exc:`Queue.Full` exception if no
      free slot was available within that time. Otherwise (*block* is
      ``False``), put an item on the queue if a free slot is immediately
      available, else raise the :exc:`Queue.Full` exception (*timeout* is
      ignored in that case).

   .. method:: put_nowait(item)

      Equivalent to ``put(item, False)``.

   .. method:: get([block[, timeout]])

      Remove and return an item from the queue. If the optional argument *block*
      is ``True`` (the default) and *timeout* is ``None`` (the default), block if
      necessary until an item is available. If *timeout* is a positive number,
      it blocks at most *timeout* seconds and raises the :exc:`Queue.Empty`
      exception if no item was available within that time. Otherwise (*block* is
      ``False``), return an item if one is immediately available, else raise the
      :exc:`Queue.Empty` exception (*timeout* is ignored in that case).

   .. method:: get_nowait()

      Equivalent to ``get(False)``.

   :class:`multiprocessing.Queue` has a few additional methods not found in
   :class:`Queue.Queue`. These methods are usually unnecessary for most
   code:

   .. method:: close()

      Indicate that no more data will be put on this queue by the current
      process. The background thread will quit once it has flushed all buffered
      data to the pipe. This is called automatically when the queue is garbage
      collected.

   .. method:: join_thread()

      Join the background thread. This can only be used after :meth:`close` has
      been called. It blocks until the background thread exits, ensuring that
      all data in the buffer has been flushed to the pipe.

      By default if a process is not the creator of the queue then on exit it
      will attempt to join the queue's background thread. The process can call
      :meth:`cancel_join_thread` to make :meth:`join_thread` do nothing.

   .. method:: cancel_join_thread()

      Prevent :meth:`join_thread` from blocking. In particular, this prevents
      the background thread from being joined automatically when the process
      exits -- see :meth:`join_thread`.


.. class:: JoinableQueue([maxsize])

   :class:`JoinableQueue`, a :class:`Queue` subclass, is a queue which
   additionally has :meth:`task_done` and :meth:`join` methods.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete. Used by queue consumer
      threads. For each :meth:`~Queue.get` used to fetch a task, a subsequent
      call to :meth:`task_done` tells the queue that the processing on the task
      is complete.

      If a :meth:`~Queue.join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done` call was
      received for every item that had been :meth:`~Queue.put` into the queue).

      Raises a :exc:`ValueError` if called more times than there were items
      placed in the queue.

   .. method:: join()

      Block until all items in the queue have been gotten and processed.

      The count of unfinished tasks goes up whenever an item is added to the
      queue. The count goes down whenever a consumer thread calls
      :meth:`task_done` to indicate that the item was retrieved and all work on
      it is complete. When the count of unfinished tasks drops to zero,
      :meth:`~Queue.join` unblocks.

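The bookkeeping behind :meth:`task_done` and :meth:`join` can be sketched in a
single process, with the same code acting as producer and consumer. The task
names ``'a'`` and ``'b'`` are placeholders; in real use the :meth:`get` and
:meth:`task_done` calls would normally run in worker processes.

```python
from multiprocessing import JoinableQueue

q = JoinableQueue()
for task in ('a', 'b'):
    q.put(task)                   # each put raises the unfinished-task count

handled = []
for _ in range(2):
    handled.append(q.get(True, 5))
    q.task_done()                 # exactly one task_done() per item fetched

q.join()                          # returns at once: the count is back to zero

try:
    q.task_done()                 # one call more than there were items
    extra_call_rejected = False
except ValueError:
    extra_call_rejected = True
```

Pairing every :meth:`get` with one :meth:`task_done` keeps the count accurate;
a missing call makes :meth:`join` block forever, and an extra one raises
:exc:`ValueError` as shown.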

Miscellaneous
~~~~~~~~~~~~~

.. function:: active_children()

   Return a list of all live children of the current process.

   Calling this has the side effect of "joining" any processes which have
   already finished.

.. function:: cpu_count()

   Return the number of CPUs in the system. May raise
   :exc:`NotImplementedError`.

.. function:: current_process()

   Return the :class:`Process` object corresponding to the current process.

   An analogue of :func:`threading.current_thread`.

.. function:: freeze_support()

   Add support for when a program which uses :mod:`multiprocessing` has been
   frozen to produce a Windows executable. (Has been tested with **py2exe**,
   **PyInstaller** and **cx_Freeze**.)

   One needs to call this function straight after the ``if __name__ ==
   '__main__'`` line of the main module. For example::

      from multiprocessing import Process, freeze_support

      def f():
          print 'hello world!'

      if __name__ == '__main__':
          freeze_support()
          Process(target=f).start()

   If the ``freeze_support()`` line is omitted then trying to run the frozen
   executable will raise :exc:`RuntimeError`.

   If the module is being run normally by the Python interpreter then
   :func:`freeze_support` has no effect.

.. function:: set_executable()

   Sets the path of the Python interpreter to use when starting a child process.
   (By default :data:`sys.executable` is used). Embedders will probably need to
   do something like ::

      set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

   before they can create child processes. (Windows only)

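The informational functions in this section (:func:`active_children`,
:func:`cpu_count` and :func:`current_process`) can be tried without starting
any child process. In this sketch the fallback value of ``1`` when
:func:`cpu_count` raises is an illustrative choice, not something the module
prescribes.

```python
import multiprocessing

me = multiprocessing.current_process()
my_name = me.name                     # a plain string, for identification only

try:
    cpus = multiprocessing.cpu_count()
except NotImplementedError:
    cpus = 1                          # illustrative fallback (an assumption)

# Returns a list; as a side effect, children that already finished are joined.
children = multiprocessing.active_children()
```

A value such as ``cpus`` is often used to size a pool of workers, which is why
handling :exc:`NotImplementedError` up front is worthwhile.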

.. note::

   :mod:`multiprocessing` contains no analogues of
   :func:`threading.active_count`, :func:`threading.enumerate`,
   :func:`threading.settrace`, :func:`threading.setprofile`,
   :class:`threading.Timer`, or :class:`threading.local`.


Connection Objects
~~~~~~~~~~~~~~~~~~

Connection objects allow the sending and receiving of picklable objects or
strings. They can be thought of as message oriented connected sockets.

Connection objects are usually created using :func:`Pipe` -- see also
:ref:`multiprocessing-listeners-clients`.

.. class:: Connection

   .. method:: send(obj)

      Send an object to the other end of the connection which should be read
      using :meth:`recv`.

      The object must be picklable.

   .. method:: recv()

      Return an object sent from the other end of the connection using
      :meth:`send`. Raises :exc:`EOFError` if there is nothing left to receive
      and the other end was closed.

   .. method:: fileno()

      Returns the file descriptor or handle used by the connection.

   .. method:: close()

      Close the connection.

      This is called automatically when the connection is garbage collected.

   .. method:: poll([timeout])

      Return whether there is any data available to be read.

      If *timeout* is not specified then it will return immediately. If
      *timeout* is a number then this specifies the maximum time in seconds to
      block. If *timeout* is ``None`` then an infinite timeout is used.

   .. method:: send_bytes(buffer[, offset[, size]])

      Send byte data from an object supporting the buffer interface as a
      complete message.

      If *offset* is given then data is read from that position in *buffer*. If
      *size* is given then that many bytes will be read from buffer.

   .. method:: recv_bytes([maxlength])

      Return a complete message of byte data sent from the other end of the
      connection as a string. Raises :exc:`EOFError` if there is nothing left
      to receive and the other end has closed.

      If *maxlength* is specified and the message is longer than *maxlength*
      then :exc:`IOError` is raised and the connection will no longer be
      readable.

   .. method:: recv_bytes_into(buffer[, offset])

      Read into *buffer* a complete message of byte data sent from the other end
      of the connection and return the number of bytes in the message. Raises
      :exc:`EOFError` if there is nothing left to receive and the other end was
      closed.

      *buffer* must be an object satisfying the writable buffer interface. If
      *offset* is given then the message will be written into the buffer from
      that position. Offset must be a non-negative integer less than the
      length of *buffer* (in bytes).

      If the buffer is too short then a :exc:`BufferTooShort` exception is
      raised and the complete message is available as ``e.args[0]`` where ``e``
      is the exception instance.


For example:

    >>> from multiprocessing import Pipe
    >>> a, b = Pipe()
    >>> a.send([1, 'hello', None])
    >>> b.recv()
    [1, 'hello', None]
    >>> b.send_bytes('thank you')
    >>> a.recv_bytes()
    'thank you'
    >>> import array
    >>> arr1 = array.array('i', range(5))
    >>> arr2 = array.array('i', [0] * 10)
    >>> a.send_bytes(arr1)
    >>> count = b.recv_bytes_into(arr2)
    >>> assert count == len(arr1) * arr1.itemsize
    >>> arr2
    array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

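The :meth:`~Connection.poll` timeouts and the :exc:`EOFError` raised by
:meth:`~Connection.recv` can be sketched in the same single-process style. The
message ``'ping'`` and the variable names are arbitrary.

```python
from multiprocessing import Pipe

a, b = Pipe()

nothing_yet = b.poll()        # no timeout given: returns immediately
a.send('ping')
has_data = b.poll(1)          # wait up to one second for data
reply = b.recv()

a.close()                     # nothing more can ever arrive on b
try:
    b.recv()
    saw_eof = False
except EOFError:
    saw_eof = True
b.close()
```

Checking :meth:`~Connection.poll` before :meth:`~Connection.recv` lets a
reader avoid blocking indefinitely, while :exc:`EOFError` signals that the
peer has closed its end and the read loop can stop.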

.. warning::

   The :meth:`Connection.recv` method automatically unpickles the data it
   receives, which can be a security risk unless you can trust the process
   which sent the message.

   Therefore, unless the connection object was produced using :func:`Pipe` you
   should only use the :meth:`~Connection.recv` and :meth:`~Connection.send`
   methods after performing some sort of authentication. See
   :ref:`multiprocessing-auth-keys`.

.. warning::

   If a process is killed while it is trying to read or write to a pipe then
   the data in the pipe is likely to become corrupted, because it may become
   impossible to be sure where the message boundaries lie.

761
762Synchronization primitives
763~~~~~~~~~~~~~~~~~~~~~~~~~~
764
765Generally synchronization primitives are not as necessary in a multiprocess
Andrew M. Kuchling8ea605c2008-07-14 01:18:16 +0000766program as they are in a multithreaded program. See the documentation for
Benjamin Peterson910c2ab2008-06-27 23:22:06 +0000767:mod:`threading` module.
Benjamin Peterson190d56e2008-06-11 02:40:25 +0000768
769Note that one can also create synchronization primitives by using a manager
770object -- see :ref:`multiprocessing-managers`.
771
.. class:: BoundedSemaphore([value])

   A bounded semaphore object: a clone of :class:`threading.BoundedSemaphore`.

   (On Mac OS X this is indistinguishable from :class:`Semaphore` because
   ``sem_getvalue()`` is not implemented on that platform.)

.. class:: Condition([lock])

   A condition variable: a clone of :class:`threading.Condition`.

   If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
   object from :mod:`multiprocessing`.
785
786.. class:: Event()
787
788 A clone of :class:`threading.Event`.
789
790.. class:: Lock()
791
792 A non-recursive lock object: a clone of :class:`threading.Lock`.
793
794.. class:: RLock()
795
796 A recursive lock object: a clone of :class:`threading.RLock`.
797
.. class:: Semaphore([value])

   A semaphore object: a clone of :class:`threading.Semaphore`.

.. note::

   The :meth:`acquire` method of :class:`BoundedSemaphore`, :class:`Lock`,
   :class:`RLock` and :class:`Semaphore` has a timeout parameter not supported
   by the equivalents in :mod:`threading`.  The signature is
   ``acquire(block=True, timeout=None)`` with keyword parameters being
   acceptable.  If *block* is ``True`` and *timeout* is not ``None`` then it
   specifies a timeout in seconds.  If *block* is ``False`` then *timeout* is
   ignored.
811
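A minimal sketch of the *block* and *timeout* arguments of :meth:`acquire`,
run entirely inside one process so that the second and third calls cannot
succeed.  The variable names are illustrative only:

```python
from multiprocessing import Lock

lock = Lock()

# The lock is free, so the first blocking acquire succeeds at once.
first = lock.acquire()

# Lock is non-recursive: a second blocking acquire from the same process
# can never succeed, so with a timeout it gives up and returns False.
second = lock.acquire(block=True, timeout=0.1)

# With block=False the call returns immediately; no timeout applies.
third = lock.acquire(block=False)

lock.release()
print(first)
print(second)
print(third)
```

The boolean return value is the only indication that the timeout expired, so
callers would normally test it before touching the protected resource.
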
812.. note::
813
814 If the SIGINT signal generated by Ctrl-C arrives while the main thread is
815 blocked by a call to :meth:`BoundedSemaphore.acquire`, :meth:`Lock.acquire`,
816 :meth:`RLock.acquire`, :meth:`Semaphore.acquire`, :meth:`Condition.acquire`
817 or :meth:`Condition.wait` then the call will be immediately interrupted and
818 :exc:`KeyboardInterrupt` will be raised.
819
820 This differs from the behaviour of :mod:`threading` where SIGINT will be
821 ignored while the equivalent blocking calls are in progress.
822
823
824Shared :mod:`ctypes` Objects
825~~~~~~~~~~~~~~~~~~~~~~~~~~~~
826
827It is possible to create shared objects using shared memory which can be
828inherited by child processes.
829
.. function:: Value(typecode_or_type, *args[, lock])

   Return a :mod:`ctypes` object allocated from shared memory.  By default the
   return value is actually a synchronized wrapper for the object.

   *typecode_or_type* determines the type of the returned object: it is either a
   ctypes type or a one character typecode of the kind used by the :mod:`array`
   module.  *\*args* is passed on to the constructor for the type.
838
839 If *lock* is ``True`` (the default) then a new lock object is created to
840 synchronize access to the value. If *lock* is a :class:`Lock` or
841 :class:`RLock` object then that will be used to synchronize access to the
842 value. If *lock* is ``False`` then access to the returned object will not be
843 automatically protected by a lock, so it will not necessarily be
844 "process-safe".
845
846 Note that *lock* is a keyword-only argument.
847
848.. function:: Array(typecode_or_type, size_or_initializer, *, lock=True)
849
850 Return a ctypes array allocated from shared memory. By default the return
851 value is actually a synchronized wrapper for the array.
852
853 *typecode_or_type* determines the type of the elements of the returned array:
854 it is either a ctypes type or a one character typecode of the kind used by
855 the :mod:`array` module. If *size_or_initializer* is an integer, then it
856 determines the length of the array, and the array will be initially zeroed.
857 Otherwise, *size_or_initializer* is a sequence which is used to initialize
858 the array and whose length determines the length of the array.
859
860 If *lock* is ``True`` (the default) then a new lock object is created to
861 synchronize access to the value. If *lock* is a :class:`Lock` or
862 :class:`RLock` object then that will be used to synchronize access to the
863 value. If *lock* is ``False`` then access to the returned object will not be
864 automatically protected by a lock, so it will not necessarily be
865 "process-safe".
866
   Note that *lock* is a keyword-only argument.
868
869 Note that an array of :data:`ctypes.c_char` has *value* and *rawvalue*
870 attributes which allow one to use it to store and retrieve strings.
871
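The following sketch exercises the :func:`Value` and :func:`Array` variants
described above without starting any child process.  The variable names are
made up for the example, and the byte-string literal is used so that the
``'c'`` array is given bytes on any Python version:

```python
from multiprocessing import Array, Lock, Value

# A shared integer guarded by its own, automatically created lock.
counter = Value('i', 0)
with counter.get_lock():      # hold the lock across the read-modify-write
    counter.value += 1

# An integer size gives a zero-filled array of that length.
samples = Array('d', 3)

# A sequence initializes the array; an existing lock can be passed in.
shared_lock = Lock()
squares = Array('i', [0, 1, 4, 9], lock=shared_lock)

# lock=False returns the raw ctypes array with no synchronization wrapper.
unguarded = Array('i', [1, 2, 3], lock=False)

# A 'c' array exposes its contents as a byte string through ``value``.
label = Array('c', b'hello')

print(counter.value)
print(samples[:])
print(squares[:])
print(unguarded[:])
print(label.value)
```

Taking the lock explicitly around ``counter.value += 1`` matters because the
increment is a separate read and write, each of which is only individually
protected by the wrapper.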
872
873The :mod:`multiprocessing.sharedctypes` module
874>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
875
876.. module:: multiprocessing.sharedctypes
877 :synopsis: Allocate ctypes objects from shared memory.
878
879The :mod:`multiprocessing.sharedctypes` module provides functions for allocating
880:mod:`ctypes` objects from shared memory which can be inherited by child
881processes.
882
.. note::

   Although it is possible to store a pointer in shared memory, remember that
   this will refer to a location in the address space of a specific process.
   The pointer is quite likely to be invalid in the context of a second
   process, and trying to dereference it from the second process may cause a
   crash.
890
891.. function:: RawArray(typecode_or_type, size_or_initializer)
892
893 Return a ctypes array allocated from shared memory.
894
895 *typecode_or_type* determines the type of the elements of the returned array:
896 it is either a ctypes type or a one character typecode of the kind used by
897 the :mod:`array` module. If *size_or_initializer* is an integer then it
898 determines the length of the array, and the array will be initially zeroed.
899 Otherwise *size_or_initializer* is a sequence which is used to initialize the
900 array and whose length determines the length of the array.
901
902 Note that setting and getting an element is potentially non-atomic -- use
903 :func:`Array` instead to make sure that access is automatically synchronized
904 using a lock.
905
906.. function:: RawValue(typecode_or_type, *args)
907
908 Return a ctypes object allocated from shared memory.
909
   *typecode_or_type* determines the type of the returned object: it is either a
   ctypes type or a one character typecode of the kind used by the :mod:`array`
   module.  *\*args* is passed on to the constructor for the type.
913
914 Note that setting and getting the value is potentially non-atomic -- use
915 :func:`Value` instead to make sure that access is automatically synchronized
916 using a lock.
917
918 Note that an array of :data:`ctypes.c_char` has ``value`` and ``rawvalue``
919 attributes which allow one to use it to store and retrieve strings -- see
920 documentation for :mod:`ctypes`.
921
.. function:: Array(typecode_or_type, size_or_initializer[, lock])

   The same as :func:`RawArray` except that depending on the value of *lock* a
   process-safe synchronization wrapper may be returned instead of a raw ctypes
   array.
927
928 If *lock* is ``True`` (the default) then a new lock object is created to
929 synchronize access to the value. If *lock* is a :class:`Lock` or
930 :class:`RLock` object then that will be used to synchronize access to the
931 value. If *lock* is ``False`` then access to the returned object will not be
932 automatically protected by a lock, so it will not necessarily be
933 "process-safe".
934
935 Note that *lock* is a keyword-only argument.
936
937.. function:: Value(typecode_or_type, *args[, lock])
938
939 The same as :func:`RawValue` except that depending on the value of *lock* a
940 process-safe synchronization wrapper may be returned instead of a raw ctypes
941 object.
942
943 If *lock* is ``True`` (the default) then a new lock object is created to
944 synchronize access to the value. If *lock* is a :class:`Lock` or
945 :class:`RLock` object then that will be used to synchronize access to the
946 value. If *lock* is ``False`` then access to the returned object will not be
947 automatically protected by a lock, so it will not necessarily be
948 "process-safe".
949
950 Note that *lock* is a keyword-only argument.
951
952.. function:: copy(obj)
953
954 Return a ctypes object allocated from shared memory which is a copy of the
955 ctypes object *obj*.
956
957.. function:: synchronized(obj[, lock])
958
959 Return a process-safe wrapper object for a ctypes object which uses *lock* to
960 synchronize access. If *lock* is ``None`` (the default) then a
961 :class:`multiprocessing.RLock` object is created automatically.
962
   A synchronized wrapper will have two methods in addition to those of the
   object it wraps: :meth:`get_obj` returns the wrapped object and
   :meth:`get_lock` returns the lock object used for synchronization.

   Note that accessing the ctypes object through the wrapper can be a lot slower
   than accessing the raw ctypes object.
Benjamin Peterson190d56e2008-06-11 02:40:25 +0000969
970
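As a rough sketch of how a wrapper relates to the raw object, the following
wraps a :func:`RawValue` by hand with :func:`synchronized` and checks what
:meth:`get_obj` and :meth:`get_lock` return.  The variable names are
illustrative only, and an :class:`RLock` is passed explicitly so that the
wrapper can re-acquire it while the caller already holds it:

```python
from ctypes import c_double
from multiprocessing import RLock
from multiprocessing.sharedctypes import RawValue, synchronized

raw = RawValue(c_double, 2.4)       # unsynchronized shared double
lock = RLock()
wrapped = synchronized(raw, lock)   # process-safe view of the same memory

# Hold the lock explicitly so the read-modify-write is a single step.
# The wrapper takes the same lock again internally, hence the RLock.
with wrapped.get_lock():
    wrapped.value *= 2

same_object = wrapped.get_obj() is raw
same_lock = wrapped.get_lock() is lock
print(raw.value)
print(same_object)
print(same_lock)
```

Because the wrapper and ``raw`` share one piece of memory, the change made
through ``wrapped`` is visible through ``raw`` as well.
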
971The table below compares the syntax for creating shared ctypes objects from
972shared memory with the normal ctypes syntax. (In the table ``MyStruct`` is some
973subclass of :class:`ctypes.Structure`.)
974
==================== ========================== ===========================
ctypes               sharedctypes using type    sharedctypes using typecode
==================== ========================== ===========================
c_double(2.4)        RawValue(c_double, 2.4)    RawValue('d', 2.4)
MyStruct(4, 6)       RawValue(MyStruct, 4, 6)
(c_short * 7)()      RawArray(c_short, 7)       RawArray('h', 7)
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8))
==================== ========================== ===========================
983
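The rows of the table can be checked directly, as in this sketch.  The field
layout given to ``MyStruct`` is an assumption made for the example, since the
text only says that it is some :class:`ctypes.Structure` subclass:

```python
from ctypes import Structure, c_double, c_int
from multiprocessing.sharedctypes import RawArray, RawValue, copy


class MyStruct(Structure):
    # Stand-in for the ``MyStruct`` of the table; the fields are made up.
    _fields_ = [('a', c_int), ('b', c_int)]


by_type = RawValue(c_double, 2.4)
by_code = RawValue('d', 2.4)
struct = RawValue(MyStruct, 4, 6)
zeroed = RawArray('h', 7)
array_by_type = RawArray(c_int, (9, 2, 8))
array_by_code = RawArray('i', (9, 2, 8))

# copy() places a duplicate of an ordinary ctypes object in shared memory.
plain = (c_int * 3)(9, 2, 8)
shared = copy(plain)

print(by_type.value)
print((struct.a, struct.b))
print(zeroed[:])
print(array_by_type[:])
print(shared[:])
```

The type and typecode spellings produce objects with the same contents; the
typecode form is simply unavailable for structures, which is why that cell of
the table is empty.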
984
985Below is an example where a number of ctypes objects are modified by a child
986process::
987
   from multiprocessing import Process, Lock
   from multiprocessing.sharedctypes import Value, Array
   from ctypes import Structure, c_double

   class Point(Structure):
       _fields_ = [('x', c_double), ('y', c_double)]

   def modify(n, x, s, A):
       # Runs in the child process; the changes are visible to the parent
       # because all four objects live in shared memory.
       n.value **= 2
       x.value **= 2
       s.value = s.value.upper()
       for a in A:
           a.x **= 2
           a.y **= 2

   if __name__ == '__main__':
       lock = Lock()

       n = Value('i', 7)
       x = Value(c_double, 1.0/3.0, lock=False)
       s = Array('c', 'hello world', lock=lock)
       A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

       p = Process(target=modify, args=(n, x, s, A))
       p.start()
       p.join()

       print n.value
       print x.value
       print s.value
       print [(a.x, a.y) for a in A]
1019
1020
1021.. highlightlang:: none
1022
1023The results printed are ::
1024
1025 49
1026 0.1111111111111111
1027 HELLO WORLD
1028 [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
1029
1030.. highlightlang:: python
1031
1032
1033.. _multiprocessing-managers:
1034
1035Managers
1036~~~~~~~~
1037
1038Managers provide a way to create data which can be shared between different
1039processes. A manager object controls a server process which manages *shared
1040objects*. Other processes can access the shared objects by using proxies.
1041
.. function:: multiprocessing.Manager()

   Returns a started :class:`~multiprocessing.managers.SyncManager` object which
   can be used for sharing objects between processes.  The returned manager
   object corresponds to a spawned child process and has methods which will
   create shared objects and return corresponding proxies.

.. module:: multiprocessing.managers
   :synopsis: Share data between processes with shared objects.

Manager processes will be shut down as soon as they are garbage collected or
their parent process exits.  The manager classes are defined in the
:mod:`multiprocessing.managers` module:
1055
1056.. class:: BaseManager([address[, authkey]])
1057
1058 Create a BaseManager object.
1059
1060 Once created one should call :meth:`start` or :meth:`serve_forever` to ensure
1061 that the manager object refers to a started manager process.
1062
1063 *address* is the address on which the manager process listens for new
1064 connections. If *address* is ``None`` then an arbitrary one is chosen.
1065
   *authkey* is the authentication key which will be used to check the validity
   of incoming connections to the server process.  If *authkey* is ``None`` then
   ``current_process().authkey`` is used.  Otherwise *authkey* is used and it
   must be a string.

   .. method:: start()

      Start a subprocess to start the manager.

   .. method:: serve_forever()

      Run the server in the current process.

   .. method:: from_address(address, authkey)

      A class method which creates a manager object referring to a pre-existing
      server process which is using the given address and authentication key.

   .. method:: shutdown()

      Stop the process used by the manager.  This is only available if
      :meth:`start` has been used to start the server process.

      This can be called multiple times.
1090
1091 .. method:: register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
1092
1093 A classmethod which can be used for registering a type or callable with
1094 the manager class.
1095
1096 *typeid* is a "type identifier" which is used to identify a particular
1097 type of shared object. This must be a string.
1098
      *callable* is a callable used for creating objects for this type
      identifier.  If a manager instance will be created using the
      :meth:`from_address` classmethod or if the *create_method* argument is
      ``False`` then this can be left as ``None``.

      *proxytype* is a subclass of :class:`BaseProxy` which is used to create
      proxies for shared objects with this *typeid*.  If ``None`` then a proxy
      class is created automatically.

      *exposed* is used to specify a sequence of method names which proxies for
      this typeid should be allowed to access using
      :meth:`BaseProxy._call_method`.  (If *exposed* is ``None`` then
      :attr:`proxytype._exposed_` is used instead if it exists.)  In the case
      where no exposed list is specified, all "public methods" of the shared
      object will be accessible.  (Here a "public method" means any attribute
      which has a :meth:`__call__` method and whose name does not begin with
      ``'_'``.)
1116
1117 *method_to_typeid* is a mapping used to specify the return type of those
1118 exposed methods which should return a proxy. It maps method names to
1119 typeid strings. (If *method_to_typeid* is ``None`` then
1120 :attr:`proxytype._method_to_typeid_` is used instead if it exists.) If a
1121 method's name is not a key of this mapping or if the mapping is ``None``
1122 then the object returned by the method will be copied by value.
1123
1124 *create_method* determines whether a method should be created with name
1125 *typeid* which can be used to tell the server process to create a new
1126 shared object and return a proxy for it. By default it is ``True``.
1127
1128 :class:`BaseManager` instances also have one read-only property:
1129
1130 .. attribute:: address
1131
1132 The address used by the manager.
1133
1134
1135.. class:: SyncManager
1136
   A subclass of :class:`BaseManager` which can be used for the synchronization
   of processes.  Objects of this type are returned by
   :func:`multiprocessing.Manager`.

   It also supports creation of shared lists and dictionaries.
1142
1143 .. method:: BoundedSemaphore([value])
1144
1145 Create a shared :class:`threading.BoundedSemaphore` object and return a
1146 proxy for it.
1147
1148 .. method:: Condition([lock])
1149
1150 Create a shared :class:`threading.Condition` object and return a proxy for
1151 it.
1152
1153 If *lock* is supplied then it should be a proxy for a
1154 :class:`threading.Lock` or :class:`threading.RLock` object.
1155
1156 .. method:: Event()
1157
1158 Create a shared :class:`threading.Event` object and return a proxy for it.
1159
1160 .. method:: Lock()
1161
1162 Create a shared :class:`threading.Lock` object and return a proxy for it.
1163
1164 .. method:: Namespace()
1165
1166 Create a shared :class:`Namespace` object and return a proxy for it.
1167
   .. method:: Queue([maxsize])

      Create a shared :class:`Queue.Queue` object and return a proxy for it.

   .. method:: RLock()

      Create a shared :class:`threading.RLock` object and return a proxy for it.

   .. method:: Semaphore([value])

      Create a shared :class:`threading.Semaphore` object and return a proxy for
      it.

   .. method:: Array(typecode, sequence)

      Create an array and return a proxy for it.

   .. method:: Value(typecode, value)

      Create an object with a writable ``value`` attribute and return a proxy
      for it.
1189
1190 .. method:: dict()
1191 dict(mapping)
1192 dict(sequence)
1193
1194 Create a shared ``dict`` object and return a proxy for it.
1195
1196 .. method:: list()
1197 list(sequence)
1198
1199 Create a shared ``list`` object and return a proxy for it.
1200
1201
1202Namespace objects
1203>>>>>>>>>>>>>>>>>
1204
1205A namespace object has no public methods, but does have writable attributes.
1206Its representation shows the values of its attributes.
1207
1208However, when using a proxy for a namespace object, an attribute beginning with
1209``'_'`` will be an attribute of the proxy and not an attribute of the referent::
1210
1211 >>> manager = multiprocessing.Manager()
1212 >>> Global = manager.Namespace()
1213 >>> Global.x = 10
1214 >>> Global.y = 'hello'
1215 >>> Global._z = 12.3 # this is an attribute of the proxy
1216 >>> print Global
1217 Namespace(x=10, y='hello')
1218
1219
1220Customized managers
1221>>>>>>>>>>>>>>>>>>>
1222
To create one's own manager, one creates a subclass of :class:`BaseManager` and
uses the :meth:`~BaseManager.register` classmethod to register new types or
callables with the manager class.  For example::

   from multiprocessing.managers import BaseManager

   class MathsClass(object):
       def add(self, x, y):
           return x + y
       def mul(self, x, y):
           return x * y

   class MyManager(BaseManager):
       pass

   MyManager.register('Maths', MathsClass)

   if __name__ == '__main__':
       manager = MyManager()
       manager.start()
       maths = manager.Maths()
       print maths.add(4, 3)         # prints 7
       print maths.mul(7, 8)         # prints 56
1246
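The *exposed* argument of :meth:`~BaseManager.register` can be used to limit
what a proxy offers.  The sketch below is only an illustration: ``Counter``,
``CounterManager`` and ``run_demo`` are hypothetical names, and the manager is
started and shut down inside one helper so that nothing runs at import time:

```python
from multiprocessing.managers import BaseManager


class Counter(object):
    """Hypothetical shared object, used only for illustration."""

    def __init__(self):
        self.total = 0

    def increment(self, amount=1):
        self.total += amount
        return self.total

    def reset(self):
        self.total = 0


class CounterManager(BaseManager):
    pass


# Only ``increment`` is exposed; proxies get no ``reset`` method at all.
CounterManager.register('Counter', Counter, exposed=('increment',))


def run_demo():
    manager = CounterManager()
    manager.start()                   # launch the server process
    try:
        counter = manager.Counter()   # proxy for a Counter in the server
        counter.increment()
        latest = counter.increment(5)
        can_reset = hasattr(counter, 'reset')
    finally:
        manager.shutdown()
    return latest, can_reset


if __name__ == '__main__':
    print(run_demo())
```

Since the ``Counter`` instance lives in the server process, both calls to
``increment`` act on the same object, and the value returned to the caller is
a copy of the method's result.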
1247
1248Using a remote manager
1249>>>>>>>>>>>>>>>>>>>>>>
1250
1251It is possible to run a manager server on one machine and have clients use it
1252from other machines (assuming that the firewalls involved allow it).
1253
1254Running the following commands creates a server for a single shared queue which
1255remote clients can access::
1256
   >>> from multiprocessing.managers import BaseManager
   >>> import Queue
   >>> queue = Queue.Queue()
   >>> class QueueManager(BaseManager): pass
   ...
   >>> QueueManager.register('getQueue', callable=lambda:queue)
   >>> m = QueueManager(address=('', 50000), authkey='abracadabra')
   >>> m.serve_forever()
1265
1266One client can access the server as follows::
1267
   >>> from multiprocessing.managers import BaseManager
   >>> class QueueManager(BaseManager): pass
   ...
   >>> QueueManager.register('getQueue')
   >>> m = QueueManager.from_address(address=('foo.bar.org', 50000),
   ...                               authkey='abracadabra')
   >>> queue = m.getQueue()
   >>> queue.put('hello')
1276
1277Another client can also use it::
1278
1279 >>> from multiprocessing.managers import BaseManager
1280 >>> class QueueManager(BaseManager): pass
1281 ...
1282 >>> QueueManager.register('getQueue')
1283 >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='abracadabra')
1284 >>> queue = m.getQueue()
1285 >>> queue.get()
1286 'hello'
1287
1288
1289Proxy Objects
1290~~~~~~~~~~~~~
1291
1292A proxy is an object which *refers* to a shared object which lives (presumably)
1293in a different process. The shared object is said to be the *referent* of the
1294proxy. Multiple proxy objects may have the same referent.
1295
1296A proxy object has methods which invoke corresponding methods of its referent
1297(although not every method of the referent will necessarily be available through
1298the proxy). A proxy can usually be used in most of the same ways that its
1299referent can::
1300
1301 >>> from multiprocessing import Manager
1302 >>> manager = Manager()
1303 >>> l = manager.list([i*i for i in range(10)])
1304 >>> print l
1305 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
1306 >>> print repr(l)
1307 <ListProxy object, typeid 'list' at 0xb799974c>
1308 >>> l[4]
1309 16
1310 >>> l[2:5]
1311 [4, 9, 16]
1312
1313Notice that applying :func:`str` to a proxy will return the representation of
1314the referent, whereas applying :func:`repr` will return the representation of
1315the proxy.
1316
1317An important feature of proxy objects is that they are picklable so they can be
1318passed between processes. Note, however, that if a proxy is sent to the
1319corresponding manager's process then unpickling it will produce the referent
1320itself. This means, for example, that one shared object can contain a second::
1321
   >>> a = manager.list()
   >>> b = manager.list()
   >>> a.append(b)         # referent of a now contains referent of b
   >>> print a, b
   [[]] []
   >>> b.append('hello')
   >>> print a, b
   [['hello']] ['hello']
1330
1331.. note::
1332
1333 The proxy types in :mod:`multiprocessing` do nothing to support comparisons
1334 by value. So, for instance, ::
1335
1336 manager.list([1,2,3]) == [1,2,3]
1337
1338 will return ``False``. One should just use a copy of the referent instead
1339 when making comparisons.
1340
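One way to obtain such a copy for a list proxy is to slice it, since slicing
returns an ordinary list.  This is only a sketch, and ``compare_by_value`` is
a name invented for the example:

```python
from multiprocessing import Manager


def compare_by_value():
    manager = Manager()
    try:
        shared = manager.list([1, 2, 3])
        # Comparing the proxy itself does not look at the referent.
        direct = (shared == [1, 2, 3])
        # Slicing fetches a plain list copy, which compares by value.
        via_copy = (shared[:] == [1, 2, 3])
    finally:
        manager.shutdown()
    return direct, via_copy


if __name__ == '__main__':
    print(compare_by_value())
```

The copy is a snapshot: later changes made through the proxy are not
reflected in it.
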
1341.. class:: BaseProxy
1342
1343 Proxy objects are instances of subclasses of :class:`BaseProxy`.
1344
1345 .. method:: _call_method(methodname[, args[, kwds]])
1346
1347 Call and return the result of a method of the proxy's referent.
1348
1349 If ``proxy`` is a proxy whose referent is ``obj`` then the expression ::
1350
1351 proxy._call_method(methodname, args, kwds)
1352
1353 will evaluate the expression ::
1354
1355 getattr(obj, methodname)(*args, **kwds)
1356
1357 in the manager's process.
1358
1359 The returned value will be a copy of the result of the call or a proxy to
1360 a new shared object -- see documentation for the *method_to_typeid*
1361 argument of :meth:`BaseManager.register`.
1362
      If an exception is raised by the call, then it is re-raised by
      :meth:`_call_method`.  If some other exception is raised in the manager's
      process then this is converted into a :exc:`RemoteError` exception and is
      raised by :meth:`_call_method`.

      Note in particular that an exception will be raised if *methodname* has
      not been *exposed*.

      An example of the usage of :meth:`_call_method`::

         >>> l = manager.list(range(10))
         >>> l._call_method('__len__')
         10
         >>> l._call_method('__getslice__', (2, 7))   # equiv to `l[2:7]`
         [2, 3, 4, 5, 6]
         >>> l._call_method('__getitem__', (20,))     # equiv to `l[20]`
         Traceback (most recent call last):
         ...
         IndexError: list index out of range
1382
1383 .. method:: _get_value()
1384
1385 Return a copy of the referent.
1386
1387 If the referent is unpicklable then this will raise an exception.
1388
1389 .. method:: __repr__
1390
1391 Return a representation of the proxy object.
1392
1393 .. method:: __str__
1394
1395 Return the representation of the referent.
1396
1397
1398Cleanup
1399>>>>>>>
1400
1401A proxy object uses a weakref callback so that when it gets garbage collected it
1402deregisters itself from the manager which owns its referent.
1403
1404A shared object gets deleted from the manager process when there are no longer
1405any proxies referring to it.
1406
1407
1408Process Pools
1409~~~~~~~~~~~~~
1410
1411.. module:: multiprocessing.pool
1412 :synopsis: Create pools of processes.
1413
One can create a pool of processes which will carry out tasks submitted to it
with the :class:`Pool` class.

.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]])
1418
1419 A process pool object which controls a pool of worker processes to which jobs
1420 can be submitted. It supports asynchronous results with timeouts and
1421 callbacks and has a parallel map implementation.
1422
1423 *processes* is the number of worker processes to use. If *processes* is
1424 ``None`` then the number returned by :func:`cpu_count` is used. If
1425 *initializer* is not ``None`` then each worker process will call
1426 ``initializer(*initargs)`` when it starts.
1427
1428 .. method:: apply(func[, args[, kwds]])
1429
1430 Equivalent of the :func:`apply` builtin function. It blocks till the
1431 result is ready.
1432
1433 .. method:: apply_async(func[, args[, kwds[, callback]]])
1434
1435 A variant of the :meth:`apply` method which returns a result object.
1436
1437 If *callback* is specified then it should be a callable which accepts a
1438 single argument. When the result becomes ready *callback* is applied to
1439 it (unless the call failed). *callback* should complete immediately since
1440 otherwise the thread which handles the results will get blocked.
1441
1442 .. method:: map(func, iterable[, chunksize])
1443
1444 A parallel equivalent of the :func:`map` builtin function. It blocks till
1445 the result is ready.
1446
1447 This method chops the iterable into a number of chunks which it submits to
1448 the process pool as separate tasks. The (approximate) size of these
1449 chunks can be specified by setting *chunksize* to a positive integer.
1450
   .. method:: map_async(func, iterable[, chunksize[, callback]])

      A variant of the :meth:`map` method which returns a result object.

      If *callback* is specified then it should be a callable which accepts a
      single argument.  When the result becomes ready *callback* is applied to
      it (unless the call failed).  *callback* should complete immediately since
      otherwise the thread which handles the results will get blocked.
1459
1460 .. method:: imap(func, iterable[, chunksize])
1461
1462 An equivalent of :func:`itertools.imap`.
1463
      The *chunksize* argument is the same as the one used by the :meth:`.map`
      method.  For very long iterables using a large value for *chunksize* can
      make the job complete **much** faster than using the default value of
      ``1``.
1468
1469 Also if *chunksize* is ``1`` then the :meth:`next` method of the iterator
1470 returned by the :meth:`imap` method has an optional *timeout* parameter:
1471 ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the
1472 result cannot be returned within *timeout* seconds.
1473
1474 .. method:: imap_unordered(func, iterable[, chunksize])
1475
1476 The same as :meth:`imap` except that the ordering of the results from the
1477 returned iterator should be considered arbitrary. (Only when there is
1478 only one worker process is the order guaranteed to be "correct".)
1479
1480 .. method:: close()
1481
1482 Prevents any more tasks from being submitted to the pool. Once all the
1483 tasks have been completed the worker processes will exit.
1484
1485 .. method:: terminate()
1486
1487 Stops the worker processes immediately without completing outstanding
1488 work. When the pool object is garbage collected :meth:`terminate` will be
1489 called immediately.
1490
1491 .. method:: join()
1492
1493 Wait for the worker processes to exit. One must call :meth:`close` or
1494 :meth:`terminate` before using :meth:`join`.
1495
1496
1497.. class:: AsyncResult
1498
1499 The class of the result returned by :meth:`Pool.apply_async` and
1500 :meth:`Pool.map_async`.
1501
   .. method:: get([timeout])
1503
1504 Return the result when it arrives. If *timeout* is not ``None`` and the
1505 result does not arrive within *timeout* seconds then
1506 :exc:`multiprocessing.TimeoutError` is raised. If the remote call raised
1507 an exception then that exception will be reraised by :meth:`get`.
1508
1509 .. method:: wait([timeout])
1510
1511 Wait until the result is available or until *timeout* seconds pass.
1512
1513 .. method:: ready()
1514
1515 Return whether the call has completed.
1516
1517 .. method:: successful()
1518
1519 Return whether the call completed without raising an exception. Will
1520 raise :exc:`AssertionError` if the result is not ready.
1521
1522The following example demonstrates the use of a pool::
1523
   from multiprocessing import Pool

   def f(x):
       return x*x

   if __name__ == '__main__':
       pool = Pool(processes=4)              # start 4 worker processes

       result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
       print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

       print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

       it = pool.imap(f, range(10))
       print it.next()                       # prints "0"
       print it.next()                       # prints "1"
       print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

       import time
       result = pool.apply_async(time.sleep, (10,))
       print result.get(timeout=1)           # raises TimeoutError
1545
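A further sketch, this time showing the :class:`AsyncResult` methods together
with an orderly shutdown through :meth:`close` and :meth:`join`.  Only
built-in and standard library functions are submitted to the pool, and
``run_demo`` is a name chosen for the example:

```python
import math
import time
from multiprocessing import Pool, TimeoutError


def run_demo():
    pool = Pool(processes=2)
    try:
        # map() blocks until every result is available, in input order.
        roots = pool.map(math.sqrt, [1, 4, 9, 16], 2)

        # apply_async() returns an AsyncResult immediately.
        pending = pool.apply_async(pow, (2, 10))
        pending.wait(10)
        finished = pending.ready() and pending.successful()
        power = pending.get(timeout=10)

        # A call that outlasts the timeout makes get() raise TimeoutError.
        slow = pool.apply_async(time.sleep, (1,))
        try:
            slow.get(timeout=0.1)
            timed_out = False
        except TimeoutError:
            timed_out = True
    finally:
        pool.close()   # no more tasks will be submitted
        pool.join()    # wait for the workers, including the sleep, to exit
    return roots, finished, power, timed_out


if __name__ == '__main__':
    print(run_demo())
```

``ready()`` is tested before ``successful()`` because the latter raises an
exception when the result is not yet available.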
1546
1547.. _multiprocessing-listeners-clients:
1548
1549Listeners and Clients
1550~~~~~~~~~~~~~~~~~~~~~
1551
1552.. module:: multiprocessing.connection
1553 :synopsis: API for dealing with sockets.
1554
1555Usually message passing between processes is done using queues or by using
1556:class:`Connection` objects returned by :func:`Pipe`.
1557
However, the :mod:`multiprocessing.connection` module allows some extra
flexibility.  It basically gives a high level message oriented API for dealing
with sockets or Windows named pipes, and also has support for *digest
authentication* using the :mod:`hmac` module.


.. function:: deliver_challenge(connection, authkey)
1565
1566 Send a randomly generated message to the other end of the connection and wait
1567 for a reply.
1568
1569 If the reply matches the digest of the message using *authkey* as the key
1570 then a welcome message is sent to the other end of the connection. Otherwise
1571 :exc:`AuthenticationError` is raised.
1572
1573.. function:: answerChallenge(connection, authkey)
1574
1575 Receive a message, calculate the digest of the message using *authkey* as the
1576 key, and then send the digest back.
1577
1578 If a welcome message is not received, then :exc:`AuthenticationError` is
1579 raised.
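The exchange performed by these two functions can be pictured with a small,
self-contained sketch. It only illustrates HMAC-based challenge/response and is
not the module's wire protocol: the helper names ``make_challenge``, ``answer``
and ``verify``, the 20-byte challenge length and the choice of SHA-256 are all
assumptions, and no connection object is involved.

```python
import hashlib
import hmac
import os

def make_challenge(length=20):
    # Random message the listening side would send (length is an assumption).
    return os.urandom(length)

def answer(challenge, authkey):
    # Digest of the challenge keyed with the shared secret; the key is never sent.
    return hmac.new(authkey, challenge, hashlib.sha256).digest()

def verify(challenge, response, authkey):
    expected = hmac.new(authkey, challenge, hashlib.sha256).digest()
    # compare_digest (Python 2.7.7+ / 3.3+) avoids leaking timing information.
    return hmac.compare_digest(expected, response)

if __name__ == '__main__':
    key = b'secret password'
    challenge = make_challenge()
    print(verify(challenge, answer(challenge, key), key))           # True
    print(verify(challenge, answer(challenge, b'wrong key'), key))  # False
```

Because only the digest travels between the two ends, an eavesdropper sees the
challenge and the response but not the key itself.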

.. function:: Client(address[, family[, authenticate[, authkey]]])

   Attempt to set up a connection to the listener which is using address
   *address*, returning a :class:`~multiprocessing.Connection`.

   The type of the connection is determined by the *family* argument, but this
   can generally be omitted since it can usually be inferred from the format of
   *address*. (See :ref:`multiprocessing-address-formats`)

   If *authenticate* is ``True`` or *authkey* is a string then digest
   authentication is used. The key used for authentication will be either
   *authkey* or ``current_process().authkey`` if *authkey* is ``None``.
   If authentication fails then :exc:`AuthenticationError` is raised. See
   :ref:`multiprocessing-auth-keys`.

.. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]])

   A wrapper for a bound socket or Windows named pipe which is 'listening' for
   connections.

   *address* is the address to be used by the bound socket or named pipe of the
   listener object.

   *family* is the type of socket (or named pipe) to use. This can be one of
   the strings ``'AF_INET'`` (for a TCP socket), ``'AF_UNIX'`` (for a Unix
   domain socket) or ``'AF_PIPE'`` (for a Windows named pipe). Of these only
   the first is guaranteed to be available. If *family* is ``None`` then the
   family is inferred from the format of *address*. If *address* is also
   ``None`` then a default is chosen. This default is the family which is
   assumed to be the fastest available. See
   :ref:`multiprocessing-address-formats`. Note that if *family* is
   ``'AF_UNIX'`` and *address* is ``None`` then the socket will be created in a
   private temporary directory created using :func:`tempfile.mkstemp`.

   If the listener object uses a socket then *backlog* (1 by default) is passed
   to the :meth:`listen` method of the socket once it has been bound.

   If *authenticate* is ``True`` (``False`` by default) or *authkey* is not
   ``None`` then digest authentication is used.

   If *authkey* is a string then it will be used as the authentication key;
   otherwise it must be ``None``.

   If *authkey* is ``None`` and *authenticate* is ``True`` then
   ``current_process().authkey`` is used as the authentication key. If
   *authkey* is ``None`` and *authenticate* is ``False`` then no
   authentication is done. If authentication fails then
   :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`.

   .. method:: accept()

      Accept a connection on the bound socket or named pipe of the listener
      object and return a :class:`Connection` object. If authentication is
      attempted and fails, then :exc:`AuthenticationError` is raised.

   .. method:: close()

      Close the bound socket or named pipe of the listener object. This is
      called automatically when the listener is garbage collected. However it
      is advisable to call it explicitly.

   Listener objects have the following read-only properties:

   .. attribute:: address

      The address which is being used by the Listener object.

   .. attribute:: last_accepted

      The address from which the last accepted connection came. If this is
      unavailable then it is ``None``.


The module defines the following exception:

.. exception:: AuthenticationError

   Exception raised when there is an authentication error.



**Examples**

The following server code creates a listener which uses ``'secret password'`` as
an authentication key. It then waits for a connection and sends some data to
the client::

   from multiprocessing.connection import Listener
   from array import array

   address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
   listener = Listener(address, authkey='secret password')

   conn = listener.accept()
   print 'connection accepted from', listener.last_accepted

   conn.send([2.25, None, 'junk', float])

   conn.send_bytes('hello')

   conn.send_bytes(array('i', [42, 1729]))

   conn.close()
   listener.close()

The following code connects to the server and receives some data from the
server::

   from multiprocessing.connection import Client
   from array import array

   address = ('localhost', 6000)
   conn = Client(address, authkey='secret password')

   print conn.recv()                 # => [2.25, None, 'junk', float]

   print conn.recv_bytes()           # => 'hello'

   arr = array('i', [0, 0, 0, 0, 0])
   print conn.recv_bytes_into(arr)   # => 8
   print arr                         # => array('i', [42, 1729, 0, 0, 0])

   conn.close()



.. _multiprocessing-address-formats:

Address Formats
>>>>>>>>>>>>>>>

* An ``'AF_INET'`` address is a tuple of the form ``(hostname, port)`` where
  *hostname* is a string and *port* is an integer.

* An ``'AF_UNIX'`` address is a string representing a filename on the
  filesystem.

* An ``'AF_PIPE'`` address is a string of the form
  ``r'\\.\pipe\PipeName'``. To use :func:`Client` to connect to a named
  pipe on a remote computer called *ServerName* one should use an address of the
  form ``r'\\ServerName\pipe\PipeName'`` instead.

Note that any string beginning with two backslashes is assumed by default to be
an ``'AF_PIPE'`` address rather than an ``'AF_UNIX'`` address.

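The rules above are enough to infer a family from the shape of an address. The
function below is a hypothetical helper written for illustration, not the
module's own implementation; it simply restates the three formats and the
two-backslash rule in code.

```python
def guess_family(address):
    # Hypothetical helper mirroring the address formats described above.
    if isinstance(address, tuple):
        return 'AF_INET'          # (hostname, port)
    if isinstance(address, str) and address.startswith('\\\\'):
        return 'AF_PIPE'          # e.g. r'\\.\pipe\PipeName'
    if isinstance(address, str):
        return 'AF_UNIX'          # a filename on the filesystem
    raise ValueError('address type of %r unrecognized' % (address,))

if __name__ == '__main__':
    print(guess_family(('localhost', 6000)))      # AF_INET
    print(guess_family(r'\\.\pipe\PipeName'))     # AF_PIPE
    print(guess_family('/tmp/listener.sock'))     # AF_UNIX
```

Note that the pipe check has to come before the generic string check, since a
pipe name is also a string.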


.. _multiprocessing-auth-keys:

Authentication keys
~~~~~~~~~~~~~~~~~~~

When one uses :meth:`Connection.recv`, the data received is automatically
unpickled. Unfortunately unpickling data from an untrusted source is a security
risk. Therefore :class:`Listener` and :func:`Client` use the :mod:`hmac` module
to provide digest authentication.

An authentication key is a string which can be thought of as a password: once a
connection is established both ends will demand proof that the other knows the
authentication key. (Demonstrating that both ends are using the same key does
**not** involve sending the key over the connection.)

If authentication is requested but no authentication key is specified then the
return value of ``current_process().authkey`` is used (see
:class:`~multiprocessing.Process`). This value will be automatically inherited
by any :class:`~multiprocessing.Process` object that the current process
creates. This means that (by default) all processes of a multi-process program
will share a single authentication key which can be used when setting up
connections between themselves.

Suitable authentication keys can also be generated by using :func:`os.urandom`.

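As a rough sketch of the last point, a random key can be generated once and
handed to both ends. The example runs the listener in a background thread of
the same process purely to stay self-contained; the 32-byte key length, the
loopback address with port ``0`` (which lets the operating system pick a free
port) and the helper names ``serve_once`` and ``exchange`` are assumptions made
for illustration.

```python
import os
import threading
from multiprocessing.connection import Client, Listener

def serve_once(listener):
    # Accept one authenticated connection and send a greeting.
    conn = listener.accept()
    conn.send('hello')
    conn.close()

def exchange(authkey):
    listener = Listener(('127.0.0.1', 0), authkey=authkey)
    server = threading.Thread(target=serve_once, args=(listener,))
    server.start()
    # listener.address holds the address actually bound, including the port.
    conn = Client(listener.address, authkey=authkey)
    message = conn.recv()
    conn.close()
    server.join()
    listener.close()
    return message

if __name__ == '__main__':
    key = os.urandom(32)     # 32 random bytes; the length is an assumption
    print(exchange(key))     # hello
```

In a real program the key would have to reach the other process by some
trusted route, for example by inheritance from the parent process as described
above.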


Logging
~~~~~~~

Some support for logging is available. Note, however, that the :mod:`logging`
package does not use process shared locks so it is possible (depending on the
handler type) for messages from different processes to get mixed up.

.. currentmodule:: multiprocessing
.. function:: get_logger()

   Returns the logger used by :mod:`multiprocessing`. If necessary, a new one
   will be created.

   When first created the logger has level :data:`logging.NOTSET` and has a
   handler which sends output to :data:`sys.stderr` using format
   ``'[%(levelname)s/%(processName)s] %(message)s'``. (The logger allows use of
   the non-standard ``'%(processName)s'`` format.) Messages sent to this logger
   will not, by default, propagate to the root logger.

   Note that on Windows child processes will only inherit the level of the
   parent process's logger; any other customization of the logger will not be
   inherited.

Below is an example session with logging turned on::

    >>> import multiprocessing, logging
    >>> logger = multiprocessing.get_logger()
    >>> logger.setLevel(logging.INFO)
    >>> logger.warning('doomed')
    [WARNING/MainProcess] doomed
    >>> m = multiprocessing.Manager()
    [INFO/SyncManager-1] child process calling self.run()
    [INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-2776-0-lj0tfa'
    >>> del m
    [INFO/MainProcess] sending shutdown message to manager
    [INFO/SyncManager-1] manager exiting with exitcode 0

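The same setup can be done from a script. The sketch below uses
``multiprocessing.log_to_stderr()``, which attaches a stderr handler to the
logger returned by :func:`get_logger`; the function names ``work`` and
``configure`` and the choice of the ``INFO`` level are assumptions for
illustration, and the exact messages emitted depend on the Python version and
platform.

```python
import logging
import multiprocessing

def work():
    # Messages logged in the child are tagged with the child's process name.
    multiprocessing.get_logger().info('working')

def configure(level=logging.INFO):
    # log_to_stderr() returns the same logger as get_logger(), with a handler added.
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(level)
    return logger

if __name__ == '__main__':
    logger = configure()
    logger.warning('doomed')
    p = multiprocessing.Process(target=work)
    p.start()
    p.join()
```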


The :mod:`multiprocessing.dummy` module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. module:: multiprocessing.dummy
   :synopsis: Dumb wrapper around threading.

:mod:`multiprocessing.dummy` replicates the API of :mod:`multiprocessing` but is
no more than a wrapper around the :mod:`threading` module.

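Because the API is the same, code written against :mod:`multiprocessing` can
often be switched to threads by changing an import. The following sketch
assumes only that ``Pool`` is imported from :mod:`multiprocessing.dummy`; the
function ``f`` is a stand-in for real work and ``squares`` is a hypothetical
helper.

```python
from multiprocessing.dummy import Pool   # thread-backed stand-in for multiprocessing.Pool

def f(x):
    return x * x

def squares(n, workers=4):
    pool = Pool(workers)
    try:
        return pool.map(f, range(n))
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    print(squares(5))    # [0, 1, 4, 9, 16]
```

Since the workers are threads, they remain subject to the
:term:`Global Interpreter Lock`, so this is mostly of interest for I/O-bound
work or for testing.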


.. _multiprocessing-programming:

Programming guidelines
----------------------

There are certain guidelines and idioms which should be adhered to when using
:mod:`multiprocessing`.


All platforms
~~~~~~~~~~~~~

Avoid shared state

    As far as possible one should try to avoid shifting large amounts of data
    between processes.

    It is probably best to stick to using queues or pipes for communication
    between processes rather than using the lower level synchronization
    primitives from the :mod:`threading` module.

Picklability

    Ensure that the arguments to the methods of proxies are picklable.

Thread safety of proxies

    Do not use a proxy object from more than one thread unless you protect it
    with a lock.

    (There is never a problem with different processes using the *same* proxy.)

Joining zombie processes

    On Unix when a process finishes but has not been joined it becomes a zombie.
    There should never be very many because each time a new process starts (or
    :func:`active_children` is called) all completed processes which have not
    yet been joined will be joined. Also calling a finished process's
    :meth:`Process.is_alive` will join the process. Even so it is probably good
    practice to explicitly join all the processes that you start.

Better to inherit than pickle/unpickle

    On Windows many types from :mod:`multiprocessing` need to be picklable so
    that child processes can use them. However, one should generally avoid
    sending shared objects to other processes using pipes or queues. Instead
    you should arrange the program so that a process which needs access to a
    shared resource created elsewhere can inherit it from an ancestor process.

Avoid terminating processes

    Using the :meth:`Process.terminate` method to stop a process is liable to
    cause any shared resources (such as locks, semaphores, pipes and queues)
    currently being used by the process to become broken or unavailable to other
    processes.

    Therefore it is probably best to only consider using
    :meth:`Process.terminate` on processes which never use any shared resources.


Joining processes that use queues

    Bear in mind that a process that has put items in a queue will wait before
    terminating until all the buffered items are fed by the "feeder" thread to
    the underlying pipe. (The child process can call the
    :meth:`Queue.cancel_join_thread` method of the queue to avoid this
    behaviour.)

    This means that whenever you use a queue you need to make sure that all
    items which have been put on the queue will eventually be removed before the
    process is joined. Otherwise you cannot be sure that processes which have
    put items on the queue will terminate. Remember also that non-daemonic
    processes will be joined automatically.

    An example which will deadlock is the following::

        from multiprocessing import Process, Queue

        def f(q):
            q.put('X' * 1000000)

        if __name__ == '__main__':
            queue = Queue()
            p = Process(target=f, args=(queue,))
            p.start()
            p.join()                    # this deadlocks
            obj = queue.get()

    A fix here would be to swap the last two lines round (or simply remove the
    ``p.join()`` line).

Explicitly pass resources to child processes

    On Unix a child process can make use of a shared resource created in a
    parent process using a global resource. However, it is better to pass the
    object as an argument to the constructor for the child process.

    Apart from making the code (potentially) compatible with Windows this also
    ensures that as long as the child process is still alive the object will not
    be garbage collected in the parent process. This might be important if some
    resource is freed when the object is garbage collected in the parent
    process.

    So for instance ::

        from multiprocessing import Process, Lock

        def f():
            ... do something using "lock" ...

        if __name__ == '__main__':
            lock = Lock()
            for i in range(10):
                Process(target=f).start()

    should be rewritten as ::

        from multiprocessing import Process, Lock

        def f(l):
            ... do something using "l" ...

        if __name__ == '__main__':
            lock = Lock()
            for i in range(10):
                Process(target=f, args=(lock,)).start()

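Returning to the guideline on joining processes that use queues, a corrected
version of the deadlocking example might look like the sketch below. The helper
name ``collect`` and the reduced return value (the length of the received
string) are assumptions added so that the example produces a small, checkable
result.

```python
from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

def collect():
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()    # drain the queue first ...
    p.join()             # ... so the child can flush its buffer and exit
    return len(obj)

if __name__ == '__main__':
    print(collect())     # 1000000
```

The order matters because the child cannot exit until its buffered data has
been written to the pipe, and that only happens as the parent reads it.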


Windows
~~~~~~~

Since Windows lacks :func:`os.fork` it has a few extra restrictions:

More picklability

    Ensure that all arguments to :meth:`Process.__init__` are picklable. This
    means, in particular, that bound or unbound methods cannot be used directly
    as the ``target`` argument on Windows; just define a function and use
    that instead.

    Also, if you subclass :class:`Process` then make sure that instances will be
    picklable when the :meth:`Process.start` method is called.

Global variables

    Bear in mind that if code run in a child process tries to access a global
    variable, then the value it sees (if any) may not be the same as the value
    in the parent process at the time that :meth:`Process.start` was called.

    However, global variables which are just module level constants cause no
    problems.

Safe importing of main module

    Make sure that the main module can be safely imported by a new Python
    interpreter without causing unintended side effects (such as starting a new
    process).

    For example, under Windows running the following module would fail with a
    :exc:`RuntimeError`::

        from multiprocessing import Process

        def foo():
            print 'hello'

        p = Process(target=foo)
        p.start()

    Instead one should protect the "entry point" of the program by using ``if
    __name__ == '__main__':`` as follows::

       from multiprocessing import Process, freeze_support

       def foo():
           print 'hello'

       if __name__ == '__main__':
           freeze_support()
           p = Process(target=foo)
           p.start()

    (The ``freeze_support()`` line can be omitted if the program will be run
    normally instead of frozen.)

    This allows the newly spawned Python interpreter to safely import the module
    and then run the module's ``foo()`` function.

    Similar restrictions apply if a pool or manager is created in the main
    module.



.. _multiprocessing-examples:

Examples
--------

Demonstration of how to create and use customized managers and proxies:

.. literalinclude:: ../includes/mp_newtype.py


Using :class:`Pool`:

.. literalinclude:: ../includes/mp_pool.py


Synchronization types like locks, conditions and queues:

.. literalinclude:: ../includes/mp_synchronize.py


An example showing how to use queues to feed tasks to a collection of worker
processes and collect the results:

.. literalinclude:: ../includes/mp_workers.py


An example of how a pool of worker processes can each run a
:class:`SimpleHTTPServer.HttpServer` instance while sharing a single listening
socket.

.. literalinclude:: ../includes/mp_webserver.py


Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:

.. literalinclude:: ../includes/mp_benchmarks.py

An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process`
and others to build a system which can distribute processes and work via a
distributed queue to a "cluster" of machines on a network, accessible via SSH.
You will need to have private key authentication for all hosts configured for
this to work.

.. literalinclude:: ../includes/mp_distributing.py