:mod:`multiprocessing` --- Process-based "threading" interface
==============================================================

.. module:: multiprocessing
   :synopsis: Process-based "threading" interface.
6
:mod:`multiprocessing` is a package that supports spawning processes using an
API similar to that of the :mod:`threading` module. It runs on both Unix and
Windows.

The :mod:`multiprocessing` module offers both local and remote concurrency,
effectively side-stepping the Global Interpreter Lock by using subprocesses
instead of threads. As a result, the :mod:`multiprocessing` module allows the
programmer to fully leverage multiple processors on a given machine.
15
16
17Introduction
18------------
19
20
21Threads, processes and the GIL
22~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
23
24To run more than one piece of code at the same time on the same computer one has
25the choice of either using multiple processes or multiple threads.
26
27Although a program can be made up of multiple processes, these processes are in
28effect completely independent of one another: different processes are not able
29to cooperate with one another unless one sets up some means of communication
30between them (such as by using sockets). If a lot of data must be transferred
31between processes then this can be inefficient.
32
33On the other hand, multiple threads within a single process are intimately
34connected: they share their data but often can interfere badly with one another.
35It is often argued that the only way to make multithreaded programming "easy" is
36to avoid relying on any shared state and for the threads to only communicate by
37passing messages to each other.
38
39CPython has a *Global Interpreter Lock* (GIL) which in many ways makes threading
40easier than it is in most languages by making sure that only one thread can
41manipulate the interpreter's objects at a time. As a result, it is often safe
42to let multiple threads access data without using any additional locking as one
43would need to in a language such as C.
44
One downside of the GIL is that on multi-processor (or multi-core) systems a
multithreaded Python program can only make use of one processor at a time,
unless the application makes heavy use of I/O, which effectively side-steps the
restriction. This problem can be overcome by using multiple processes instead.
49
50This package allows one to write multi-process programs using much the same API
51that one uses for writing threaded programs.
52
53
54Forking and spawning
55~~~~~~~~~~~~~~~~~~~~
56
57There are two ways of creating a new process in Python:
58
59* The current process can *fork* a new child process by using the
60 :func:`os.fork` function. This effectively creates an identical copy of the
61 current process which is now able to go off and perform some task set by the
62 parent process. This means that the child process inherits *copies* of all
63 variables that the parent process had. However, :func:`os.fork` is not
64 available on every platform: in particular Windows does not support it.
65
* Alternatively, the current process can spawn a completely new Python
  interpreter by using the :mod:`subprocess` module or one of the
  :func:`os.spawn*` functions. Getting this new interpreter into a fit state
  to perform the task set for it by its parent process is, however, a bit of a
  challenge.
71
72The :mod:`multiprocessing` module uses :func:`os.fork` if it is available since
73it makes life a lot simpler. Forking the process is also more efficient in
74terms of memory usage and the time needed to create the new process.
75
76
77The :class:`Process` class
78~~~~~~~~~~~~~~~~~~~~~~~~~~
79
80In :mod:`multiprocessing`, processes are spawned by creating a :class:`Process`
81object and then calling its :meth:`Process.start` method. :class:`Process`
82follows the API of :class:`threading.Thread`. A trivial example of a
83multiprocess program is ::
84
    from multiprocessing import Process

    def f(name):
        print 'hello', name

    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
94
95Here the function ``f`` is run in a child process.
96
97For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
98necessary, see :ref:`multiprocessing-programming`.
99
100
101
102Exchanging objects between processes
103~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
104
105:mod:`multiprocessing` supports two types of communication channel between
106processes:
107
108**Queues**
109
110 The :class:`Queue` class is a near clone of :class:`Queue.Queue`. For
111 example::
112
      from multiprocessing import Process, Queue

      def f(q):
          q.put([42, None, 'hello'])

      if __name__ == '__main__':
          q = Queue()
          p = Process(target=f, args=(q,))
          p.start()
          print q.get()    # prints "[42, None, 'hello']"
          p.join()
124
125 Queues are thread and process safe.
126
127**Pipes**
128
129 The :func:`Pipe` function returns a pair of connection objects connected by a
130 pipe which by default is duplex (two-way). For example::
131
      from multiprocessing import Process, Pipe

      def f(conn):
          conn.send([42, None, 'hello'])
          conn.close()

      if __name__ == '__main__':
          parent_conn, child_conn = Pipe()
          p = Process(target=f, args=(child_conn,))
          p.start()
          print parent_conn.recv()   # prints "[42, None, 'hello']"
          p.join()
144
145 The two connection objects returned by :func:`Pipe` represent the two ends of
146 the pipe. Each connection object has :meth:`send` and :meth:`recv` methods
147 (among others). Note that data in a pipe may become corrupted if two
148 processes (or threads) try to read from or write to the *same* end of the
149 pipe at the same time. Of course there is no risk of corruption from
150 processes using different ends of the pipe at the same time.
151
152
153Synchronization between processes
154~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
155
156:mod:`multiprocessing` contains equivalents of all the synchronization
157primitives from :mod:`threading`. For instance one can use a lock to ensure
158that only one process prints to standard output at a time::
159
    from multiprocessing import Process, Lock

    def f(l, i):
        l.acquire()
        print 'hello world', i
        l.release()

    if __name__ == '__main__':
        lock = Lock()

        for num in range(10):
            Process(target=f, args=(lock, num)).start()

Without using the lock, output from the different processes is liable to get all
mixed up.
175
176
177Sharing state between processes
178~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
179
180As mentioned above, when doing concurrent programming it is usually best to
181avoid using shared state as far as possible. This is particularly true when
182using multiple processes.
183
184However, if you really do need to use some shared data then
185:mod:`multiprocessing` provides a couple of ways of doing so.
186
187**Shared memory**
188
   Data can be stored in a shared memory map using :class:`Value` or
   :class:`Array`. For example, the following code ::

      from multiprocessing import Process, Value, Array

      def f(n, a):
          n.value = 3.1415927
          for i in range(len(a)):
              a[i] = -a[i]

      if __name__ == '__main__':
          num = Value('d', 0.0)
          arr = Array('i', range(10))

          p = Process(target=f, args=(num, arr))
          p.start()
          p.join()

          print num.value
          print arr[:]

   will print ::

      3.1415927
      [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

   The ``'d'`` and ``'i'`` arguments used when creating ``num`` and ``arr`` are
   typecodes of the kind used by the :mod:`array` module: ``'d'`` indicates a
   double precision float and ``'i'`` indicates a signed integer. These shared
   objects will be process and thread safe.
219
220 For more flexibility in using shared memory one can use the
221 :mod:`multiprocessing.sharedctypes` module which supports the creation of
222 arbitrary ctypes objects allocated from shared memory.
223
224**Server process**
225
226 A manager object returned by :func:`Manager` controls a server process which
227 holds python objects and allows other processes to manipulate them using
228 proxies.
229
230 A manager returned by :func:`Manager` will support types :class:`list`,
231 :class:`dict`, :class:`Namespace`, :class:`Lock`, :class:`RLock`,
232 :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`,
233 :class:`Event`, :class:`Queue`, :class:`Value` and :class:`Array`. For
234 example, ::
235
      from multiprocessing import Process, Manager

      def f(d, l):
          d[1] = '1'
          d['2'] = 2
          d[0.25] = None
          l.reverse()

      if __name__ == '__main__':
          manager = Manager()

          d = manager.dict()
          l = manager.list(range(10))

          p = Process(target=f, args=(d, l))
          p.start()
          p.join()

          print d
          print l

   will print ::

      {0.25: None, 1: '1', '2': 2}
      [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
261
262 Server process managers are more flexible than using shared memory objects
263 because they can be made to support arbitrary object types. Also, a single
264 manager can be shared by processes on different computers over a network.
265 They are, however, slower than using shared memory.
266
267
268Using a pool of workers
269~~~~~~~~~~~~~~~~~~~~~~~
270
The :class:`multiprocessing.pool.Pool` class represents a pool of worker
processes. It has methods which allow tasks to be offloaded to the worker
processes in a few different ways.

For example::

    from multiprocessing import Pool

    def f(x):
        return x*x

    if __name__ == '__main__':
        pool = Pool(processes=4)              # start 4 worker processes
        result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
        print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
        print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
287
288
289Reference
290---------
291
292The :mod:`multiprocessing` package mostly replicates the API of the
293:mod:`threading` module.
294
295
296:class:`Process` and exceptions
297~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
298
299.. class:: Process([group[, target[, name[, args[, kwargs]]]]])
300
301 Process objects represent activity that is run in a separate process. The
302 :class:`Process` class has equivalents of all the methods of
303 :class:`threading.Thread`.
304
   The constructor should always be called with keyword arguments. *group*
   should always be ``None``; it exists solely for compatibility with
   :class:`threading.Thread`. *target* is the callable object to be invoked by
   the :meth:`run()` method. It defaults to ``None``, meaning nothing is
   called. *name* is the process name. By default, a unique name is constructed
   of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\
   :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length
   is determined by the *generation* of the process. *args* is the argument
   tuple for the target invocation. *kwargs* is a dictionary of keyword
   arguments for the target invocation. By default, no arguments are passed to
   *target*.
316
317 If a subclass overrides the constructor, it must make sure it invokes the
318 base class constructor (:meth:`Process.__init__`) before doing anything else
319 to the process.
320
321 .. method:: run()
322
323 Method representing the process's activity.
324
325 You may override this method in a subclass. The standard :meth:`run`
326 method invokes the callable object passed to the object's constructor as
327 the target argument, if any, with sequential and keyword arguments taken
328 from the *args* and *kwargs* arguments, respectively.
329
330 .. method:: start()
331
332 Start the process's activity.
333
334 This must be called at most once per process object. It arranges for the
335 object's :meth:`run` method to be invoked in a separate process.
336
337 .. method:: join([timeout])
338
339 Block the calling thread until the process whose :meth:`join` method is
340 called terminates or until the optional timeout occurs.
341
342 If *timeout* is ``None`` then there is no timeout.
343
344 A process can be joined many times.
345
346 A process cannot join itself because this would cause a deadlock. It is
347 an error to attempt to join a process before it has been started.
348
349 .. method:: get_name()
350
351 Return the process's name.
352
353 .. method:: set_name(name)
354
355 Set the process's name.
356
357 The name is a string used for identification purposes only. It has no
358 semantics. Multiple processes may be given the same name. The initial
359 name is set by the constructor.
360
361 .. method:: is_alive()
362
363 Return whether the process is alive.
364
365 Roughly, a process object is alive from the moment the :meth:`start`
366 method returns until the child process terminates.
367
368 .. method:: is_daemon()
369
370 Return the process's daemon flag.
371
372 .. method:: set_daemon(daemonic)
373
374 Set the process's daemon flag to the Boolean value *daemonic*. This must
375 be called before :meth:`start` is called.
376
377 The initial value is inherited from the creating process.
378
379 When a process exits, it attempts to terminate all of its daemonic child
380 processes.
381
382 Note that a daemonic process is not allowed to create child processes.
383 Otherwise a daemonic process would leave its children orphaned if it gets
384 terminated when its parent process exits.
385
386 In addition process objects also support the following methods:
387
388 .. method:: get_pid()
389
390 Return the process ID. Before the process is spawned, this will be
391 ``None``.
392
393 .. method:: get_exit_code()
394
395 Return the child's exit code. This will be ``None`` if the process has
396 not yet terminated. A negative value *-N* indicates that the child was
397 terminated by signal *N*.
398
399 .. method:: get_auth_key()
400
401 Return the process's authentication key (a byte string).
402
      When :mod:`multiprocessing` is initialized the main process is assigned a
      random string using :func:`os.urandom`.
405
406 When a :class:`Process` object is created, it will inherit the
407 authentication key of its parent process, although this may be changed
408 using :meth:`set_auth_key` below.
409
410 See :ref:`multiprocessing-auth-keys`.
411
412 .. method:: set_auth_key(authkey)
413
414 Set the process's authentication key which must be a byte string.
415
   .. method:: terminate()

      Terminate the process. On Unix this is done using the ``SIGTERM`` signal;
      on Windows ``TerminateProcess()`` is used. Note that exit handlers and
      finally clauses, etc., will not be executed.
421
422 Note that descendant processes of the process will *not* be terminated --
423 they will simply become orphaned.
424
425 .. warning::
426
         If this method is used when the associated process is using a pipe or
         queue then the pipe or queue is liable to become corrupted and may
         become unusable by other processes. Similarly, if the process has
         acquired a lock or semaphore etc. then terminating it is liable to
         cause other processes to deadlock.
432
433 Note that the :meth:`start`, :meth:`join`, :meth:`is_alive` and
434 :meth:`get_exit_code` methods should only be called by the process that
435 created the process object.
436
437 Example usage of some of the methods of :class:`Process`::
438
      >>> import multiprocessing, time, signal
      >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
      >>> print p, p.is_alive()
      <Process(Process-1, initial)> False
      >>> p.start()
      >>> print p, p.is_alive()
      <Process(Process-1, started)> True
      >>> p.terminate()
      >>> print p, p.is_alive()
      <Process(Process-1, stopped[SIGTERM])> False
      >>> p.get_exit_code() == -signal.SIGTERM
      True
451
452
453.. exception:: BufferTooShort
454
455 Exception raised by :meth:`Connection.recv_bytes_into()` when the supplied
456 buffer object is too small for the message read.
457
458 If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
459 the message as a byte string.
460
461
462Pipes and Queues
463~~~~~~~~~~~~~~~~
464
465When using multiple processes, one generally uses message passing for
466communication between processes and avoids having to use any synchronization
467primitives like locks.
468
469For passing messages one can use :func:`Pipe` (for a connection between two
470processes) or a queue (which allows multiple producers and consumers).
471
472The :class:`Queue` and :class:`JoinableQueue` types are multi-producer,
473multi-consumer FIFO queues modelled on the :class:`Queue.Queue` class in the
474standard library. They differ in that :class:`Queue` lacks the
475:meth:`task_done` and :meth:`join` methods introduced into Python 2.5's
476:class:`Queue.Queue` class.
477
If you use :class:`JoinableQueue` then you **must** call
:meth:`JoinableQueue.task_done` for each task removed from the queue or else the
semaphore used to count the number of unfinished tasks may eventually overflow,
raising an exception.
482
483.. note::
484
485 :mod:`multiprocessing` uses the usual :exc:`Queue.Empty` and
486 :exc:`Queue.Full` exceptions to signal a timeout. They are not available in
487 the :mod:`multiprocessing` namespace so you need to import them from
488 :mod:`Queue`.
489
490
491.. warning::
492
   If a process is killed using :meth:`Process.terminate` or :func:`os.kill`
   while it is trying to use a :class:`Queue`, then the data in the queue is
   likely to become corrupted. This may cause any other process to get an
   exception when it tries to use the queue later on.
497
498.. warning::
499
500 As mentioned above, if a child process has put items on a queue (and it has
501 not used :meth:`JoinableQueue.cancel_join_thread`), then that process will
502 not terminate until all buffered items have been flushed to the pipe.
503
   This means that if you try joining that process you may get a deadlock unless
   you are sure that all items which have been put on the queue have been
   consumed. Similarly, if the child process is non-daemonic then the parent
   process may hang on exit when it tries to join all its non-daemonic children.
508
509 Note that a queue created using a manager does not have this issue. See
510 :ref:`multiprocessing-programming`.
511
512Note that one can also create a shared queue by using a manager object -- see
513:ref:`multiprocessing-managers`.
514
515For an example of the usage of queues for interprocess communication see
516:ref:`multiprocessing-examples`.
517
518
519.. function:: Pipe([duplex])
520
521 Returns a pair ``(conn1, conn2)`` of :class:`Connection` objects representing
522 the ends of a pipe.
523
524 If *duplex* is ``True`` (the default) then the pipe is bidirectional. If
525 *duplex* is ``False`` then the pipe is unidirectional: ``conn1`` can only be
526 used for receiving messages and ``conn2`` can only be used for sending
527 messages.
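
As a minimal sketch of the *duplex* flag, the snippet below creates a one-way
pipe and, purely for illustration, uses both ends from a single process (in
practice one end would be handed to a child process). The names ``receiver``,
``sender`` and ``wrong_direction_failed`` are illustrative only, and the code
is written so that it should run on both Python 2.6 and later versions.

```python
from multiprocessing import Pipe

# With duplex=False the first connection is receive-only and the
# second connection is send-only.
receiver, sender = Pipe(duplex=False)

sender.send({'task': 'ping'})   # any picklable object may be sent
message = receiver.recv()       # the object arrives at the other end

# Using an end in the wrong direction raises an error instead of
# silently succeeding.
try:
    receiver.send('pong')
    wrong_direction_failed = False
except (IOError, OSError):
    wrong_direction_failed = True

sender.close()
receiver.close()
```

A one-way pipe is a reasonable choice when messages only ever flow from a
producer to a consumer, since misuse of an end then fails loudly.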
528
529
530.. class:: Queue([maxsize])
531
532 Returns a process shared queue implemented using a pipe and a few
533 locks/semaphores. When a process first puts an item on the queue a feeder
534 thread is started which transfers objects from a buffer into the pipe.
535
536 The usual :exc:`Queue.Empty` and :exc:`Queue.Full` exceptions from the
537 standard library's :mod:`Queue` module are raised to signal timeouts.
538
539 :class:`Queue` implements all the methods of :class:`Queue.Queue` except for
540 :meth:`task_done` and :meth:`join`.
541
542 .. method:: qsize()
543
544 Return the approximate size of the queue. Because of
545 multithreading/multiprocessing semantics, this number is not reliable.
546
      Note that this may raise :exc:`NotImplementedError` on Unix platforms like
      Mac OS X where ``sem_getvalue()`` is not implemented.
549
550 .. method:: empty()
551
552 Return ``True`` if the queue is empty, ``False`` otherwise. Because of
553 multithreading/multiprocessing semantics, this is not reliable.
554
555 .. method:: full()
556
557 Return ``True`` if the queue is full, ``False`` otherwise. Because of
558 multithreading/multiprocessing semantics, this is not reliable.
559
   .. method:: put(item[, block[, timeout]])
561
562 Put item into the queue. If optional args *block* is ``True`` (the
563 default) and *timeout* is ``None`` (the default), block if necessary until
564 a free slot is available. If *timeout* is a positive number, it blocks at
565 most *timeout* seconds and raises the :exc:`Queue.Full` exception if no
566 free slot was available within that time. Otherwise (*block* is
567 ``False``), put an item on the queue if a free slot is immediately
568 available, else raise the :exc:`Queue.Full` exception (*timeout* is
569 ignored in that case).
570
571 .. method:: put_nowait(item)
572
573 Equivalent to ``put(item, False)``.
574
575 .. method:: get([block[, timeout]])
576
577 Remove and return an item from the queue. If optional args *block* is
578 ``True`` (the default) and *timeout* is ``None`` (the default), block if
579 necessary until an item is available. If *timeout* is a positive number,
580 it blocks at most *timeout* seconds and raises the :exc:`Queue.Empty`
581 exception if no item was available within that time. Otherwise (block is
582 ``False``), return an item if one is immediately available, else raise the
583 :exc:`Queue.Empty` exception (*timeout* is ignored in that case).
584
585 .. method:: get_nowait()
586 get_no_wait()
587
588 Equivalent to ``get(False)``.
589
590 :class:`multiprocessing.Queue` has a few additional methods not found in
591 :class:`Queue.Queue` which are usually unnecessary:
592
593 .. method:: close()
594
595 Indicate that no more data will be put on this queue by the current
596 process. The background thread will quit once it has flushed all buffered
597 data to the pipe. This is called automatically when the queue is garbage
598 collected.
599
600 .. method:: join_thread()
601
602 Join the background thread. This can only be used after :meth:`close` has
603 been called. It blocks until the background thread exits, ensuring that
604 all data in the buffer has been flushed to the pipe.
605
606 By default if a process is not the creator of the queue then on exit it
607 will attempt to join the queue's background thread. The process can call
608 :meth:`cancel_join_thread()` to make :meth:`join_thread()` do nothing.
609
610 .. method:: cancel_join_thread()
611
612 Prevent :meth:`join_thread` from blocking. In particular, this prevents
613 the background thread from being joined automatically when the process
614 exits -- see :meth:`join_thread()`.
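
The blocking and timeout rules for :meth:`put` and :meth:`get` can be
sketched in a single process as follows. This is only an illustration under
the assumption that one process both fills and drains the queue; the flag
names ``put_timed_out`` and ``get_failed`` are hypothetical, and the
``try``/``except`` import exists only so that the sketch runs both with the
:mod:`Queue` module named in this document and with its later ``queue``
spelling.

```python
from multiprocessing import Queue

try:
    from queue import Empty, Full      # module name on Python 3
except ImportError:
    from Queue import Empty, Full      # module name used in this document

q = Queue(1)                     # bounded queue with room for one item

q.put('first')                   # a free slot exists, so this returns at once
try:
    q.put('second', True, 0.1)   # block for at most 0.1 s, then give up
    put_timed_out = False
except Full:
    put_timed_out = True

item = q.get(True, 1)            # wait up to 1 s for the feeder thread

try:
    q.get(False)                 # non-blocking get on an empty queue
    get_failed = False
except Empty:
    get_failed = True

q.close()                        # this process will put no more data
q.join_thread()                  # wait until buffered data has been flushed
```

Calling :meth:`close` followed by :meth:`join_thread` is rarely needed, but
it makes the point at which buffered data has reached the pipe explicit.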
615
616
617.. class:: JoinableQueue([maxsize])
618
619 :class:`JoinableQueue`, a :class:`Queue` subclass, is a queue which
620 additionally has :meth:`task_done` and :meth:`join` methods.
621
622 .. method:: task_done()
623
624 Indicate that a formerly enqueued task is complete. Used by queue consumer
625 threads. For each :meth:`get` used to fetch a task, a subsequent call to
626 :meth:`task_done` tells the queue that the processing on the task is
627 complete.
628
629 If a :meth:`join` is currently blocking, it will resume when all items
630 have been processed (meaning that a :meth:`task_done` call was received
631 for every item that had been :meth:`put` into the queue).
632
633 Raises a :exc:`ValueError` if called more times than there were items
634 placed in the queue.
635
636
637 .. method:: join()
638
639 Block until all items in the queue have been gotten and processed.
640
641 The count of unfinished tasks goes up whenever an item is added to the
642 queue. The count goes down whenever a consumer thread calls
643 :meth:`task_done` to indicate that the item was retrieved and all work on
644 it is complete. When the count of unfinished tasks drops to zero,
645 :meth:`join` unblocks.
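
The bookkeeping between :meth:`get`, :meth:`task_done` and :meth:`join` can
be sketched as below. For brevity the sketch produces and consumes the tasks
in one process, which is an assumption made for illustration; normally the
consumers would be separate worker processes. The names ``jobs``,
``results`` and ``extra_call_rejected`` are illustrative.

```python
from multiprocessing import JoinableQueue

jobs = JoinableQueue()
for n in range(3):
    jobs.put(n)              # each put() raises the unfinished-task count

results = []
for _ in range(3):
    n = jobs.get(True, 1)    # fetch a task, waiting up to 1 s for delivery
    results.append(n * n)    # "process" the task
    jobs.task_done()         # exactly one task_done() per get()

jobs.join()                  # returns at once: every task was marked done

try:
    jobs.task_done()         # one call more than there were items
    extra_call_rejected = False
except ValueError:
    extra_call_rejected = True
```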
646
647
648Miscellaneous
649~~~~~~~~~~~~~
650
651.. function:: active_children()
652
653 Return list of all live children of the current process.
654
   Calling this has the side effect of "joining" any processes which have
   already finished.
657
658.. function:: cpu_count()
659
660 Return the number of CPUs in the system. May raise
661 :exc:`NotImplementedError`.
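
Because :func:`cpu_count` may raise :exc:`NotImplementedError`, callers that
use it to size a pool may want a fallback. A minimal sketch follows; the
helper name ``default_worker_count`` and its ``fallback`` parameter are
hypothetical and not part of :mod:`multiprocessing`.

```python
import multiprocessing

def default_worker_count(fallback=1):
    """Return the CPU count, or *fallback* if it cannot be determined."""
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        return fallback

workers = default_worker_count()
```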
662
663.. function:: current_process()
664
665 Return the :class:`Process` object corresponding to the current process.
666
667 An analogue of :func:`threading.current_thread`.
668
669.. function:: freeze_support()
670
671 Add support for when a program which uses :mod:`multiprocessing` has been
672 frozen to produce a Windows executable. (Has been tested with **py2exe**,
673 **PyInstaller** and **cx_Freeze**.)
674
675 One needs to call this function straight after the ``if __name__ ==
676 '__main__'`` line of the main module. For example::
677
      from multiprocessing import Process, freeze_support

      def f():
          print 'hello world!'

      if __name__ == '__main__':
          freeze_support()
          Process(target=f).start()
686
687 If the :func:`freeze_support()` line is missed out then trying to run the
688 frozen executable will raise :exc:`RuntimeError`.
689
690 If the module is being run normally by the Python interpreter then
691 :func:`freeze_support()` has no effect.
692
.. function:: set_executable()

   Sets the path of the Python interpreter to use when starting a child process.
   (By default ``sys.executable`` is used.) Embedders will probably need to do
   something like ::

      set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

   before they can create child processes. (Windows only)
702
703
704.. note::
705
706 :mod:`multiprocessing` contains no analogues of
707 :func:`threading.active_count`, :func:`threading.enumerate`,
708 :func:`threading.settrace`, :func:`threading.setprofile`,
709 :class:`threading.Timer`, or :class:`threading.local`.
710
711
712Connection Objects
713~~~~~~~~~~~~~~~~~~
714
715Connection objects allow the sending and receiving of picklable objects or
716strings. They can be thought of as message oriented connected sockets.
717
Connection objects are usually created using :func:`Pipe()` -- see also
:ref:`multiprocessing-listeners-clients`.
720
721.. class:: Connection
722
723 .. method:: send(obj)
724
725 Send an object to the other end of the connection which should be read
726 using :meth:`recv`.
727
728 The object must be picklable.
729
730 .. method:: recv()
731
732 Return an object sent from the other end of the connection using
733 :meth:`send`. Raises :exc:`EOFError` if there is nothing left to receive
734 and the other end was closed.
735
736 .. method:: fileno()
737
738 Returns the file descriptor or handle used by the connection.
739
740 .. method:: close()
741
742 Close the connection.
743
744 This is called automatically when the connection is garbage collected.
745
746 .. method:: poll([timeout])
747
748 Return whether there is any data available to be read.
749
750 If *timeout* is not specified then it will return immediately. If
751 *timeout* is a number then this specifies the maximum time in seconds to
752 block. If *timeout* is ``None`` then an infinite timeout is used.
753
754 .. method:: send_bytes(buffer[, offset[, size]])
755
756 Send byte data from an object supporting the buffer interface as a
757 complete message.
758
759 If *offset* is given then data is read from that position in *buffer*. If
760 *size* is given then that many bytes will be read from buffer.
761
762 .. method:: recv_bytes([maxlength])
763
764 Return a complete message of byte data sent from the other end of the
765 connection as a string. Raises :exc:`EOFError` if there is nothing left
766 to receive and the other end has closed.
767
768 If *maxlength* is specified and the message is longer than *maxlength*
769 then :exc:`IOError` is raised and the connection will no longer be
770 readable.
771
772 .. method:: recv_bytes_into(buffer[, offset])
773
774 Read into *buffer* a complete message of byte data sent from the other end
775 of the connection and return the number of bytes in the message. Raises
776 :exc:`EOFError` if there is nothing left to receive and the other end was
777 closed.
778
      *buffer* must be an object satisfying the writable buffer interface. If
      *offset* is given then the message will be written into the buffer from
      that position. Offset must be a non-negative integer less than the
      length of *buffer* (in bytes).
783
784 If the buffer is too short then a :exc:`BufferTooShort` exception is
785 raised and the complete message is available as ``e.args[0]`` where ``e``
786 is the exception instance.
787
788
789For example:
790
    >>> from multiprocessing import Pipe
    >>> a, b = Pipe()
    >>> a.send([1, 'hello', None])
    >>> b.recv()
    [1, 'hello', None]
    >>> b.send_bytes('thank you')
    >>> a.recv_bytes()
    'thank you'
    >>> import array
    >>> arr1 = array.array('i', range(5))
    >>> arr2 = array.array('i', [0] * 10)
    >>> a.send_bytes(arr1)
    >>> count = b.recv_bytes_into(arr2)
    >>> assert count == len(arr1) * arr1.itemsize
    >>> arr2
    array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
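
The following sketch illustrates :meth:`poll` and the :exc:`BufferTooShort`
behaviour of :meth:`recv_bytes_into` described above. Both ends of the pipe
are used from one process purely for illustration, and the variable names are
arbitrary.

```python
from multiprocessing import Pipe, BufferTooShort

a, b = Pipe()

nothing_yet = b.poll()         # no timeout given: returns immediately
a.send_bytes(b'0123456789')    # a ten-byte message
ready = b.poll(1)              # wait up to one second for data

small = bytearray(4)           # deliberately too small for the message
try:
    b.recv_bytes_into(small)
    recovered = None
except BufferTooShort as e:
    recovered = e.args[0]      # the complete message is still available
```

Catching :exc:`BufferTooShort` lets the caller recover the message even
though the supplied buffer could not hold it.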
807
808
809.. warning::
810
811 The :meth:`Connection.recv` method automatically unpickles the data it
812 receives, which can be a security risk unless you can trust the process
813 which sent the message.
814
   Therefore, unless the connection object was produced using :func:`Pipe()`,
   you should only use the :meth:`recv` and :meth:`send` methods after
   performing some sort of authentication. See :ref:`multiprocessing-auth-keys`.
818
819.. warning::
820
821 If a process is killed while it is trying to read or write to a pipe then
822 the data in the pipe is likely to become corrupted, because it may become
823 impossible to be sure where the message boundaries lie.
824
825
826Synchronization primitives
827~~~~~~~~~~~~~~~~~~~~~~~~~~
828
Generally synchronization primitives are not as necessary in a multiprocess
program as they are in a multithreaded program. See the documentation for the
standard library's :mod:`threading` module.
832
833Note that one can also create synchronization primitives by using a manager
834object -- see :ref:`multiprocessing-managers`.
835
836.. class:: BoundedSemaphore([value])
837
838 A bounded semaphore object: a clone of :class:`threading.BoundedSemaphore`.
839
   (On Mac OS X this is indistinguishable from :class:`Semaphore` because
   ``sem_getvalue()`` is not implemented on that platform).
842
843.. class:: Condition([lock])
844
   A condition variable: a clone of :class:`threading.Condition`.
846
847 If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
848 object from :mod:`multiprocessing`.
849
850.. class:: Event()
851
852 A clone of :class:`threading.Event`.
853
854.. class:: Lock()
855
856 A non-recursive lock object: a clone of :class:`threading.Lock`.
857
858.. class:: RLock()
859
860 A recursive lock object: a clone of :class:`threading.RLock`.
861
862.. class:: Semaphore([value])
863
   A semaphore object: a clone of :class:`threading.Semaphore`.
865
866.. note::
867
868 The :meth:`acquire()` method of :class:`BoundedSemaphore`, :class:`Lock`,
869 :class:`RLock` and :class:`Semaphore` has a timeout parameter not supported
870 by the equivalents in :mod:`threading`. The signature is
871 ``acquire(block=True, timeout=None)`` with keyword parameters being
872 acceptable. If *block* is ``True`` and *timeout* is not ``None`` then it
873 specifies a timeout in seconds. If *block* is ``False`` then *timeout* is
874 ignored.
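
A small sketch of the ``acquire(block, timeout)`` signature, using a
:class:`Lock` within a single process so that the second and third attempts
are guaranteed to find the lock already held. The variable names are
illustrative; the arguments are passed positionally.

```python
from multiprocessing import Lock

lock = Lock()

first = lock.acquire()             # uncontended: succeeds
second = lock.acquire(False)       # non-blocking: the lock is held, so it fails
third = lock.acquire(True, 0.1)    # blocks for about 0.1 s, then fails

lock.release()
fourth = lock.acquire(True, 0.1)   # the lock is free again, so this succeeds
lock.release()
```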
875
876.. note::
877
878 If the SIGINT signal generated by Ctrl-C arrives while the main thread is
879 blocked by a call to :meth:`BoundedSemaphore.acquire`, :meth:`Lock.acquire`,
880 :meth:`RLock.acquire`, :meth:`Semaphore.acquire`, :meth:`Condition.acquire`
881 or :meth:`Condition.wait` then the call will be immediately interrupted and
882 :exc:`KeyboardInterrupt` will be raised.
883
884 This differs from the behaviour of :mod:`threading` where SIGINT will be
885 ignored while the equivalent blocking calls are in progress.
886
887
888Shared :mod:`ctypes` Objects
889~~~~~~~~~~~~~~~~~~~~~~~~~~~~
890
891It is possible to create shared objects using shared memory which can be
892inherited by child processes.
893
.. function:: Value(typecode_or_type, *args[, lock])
895
896 Return a :mod:`ctypes` object allocated from shared memory. By default the
897 return value is actually a synchronized wrapper for the object.
898
899 *typecode_or_type* determines the type of the returned object: it is either a
900 ctypes type or a one character typecode of the kind used by the :mod:`array`
901 module. *\*args* is passed on to the constructor for the type.
902
903 If *lock* is ``True`` (the default) then a new lock object is created to
904 synchronize access to the value. If *lock* is a :class:`Lock` or
905 :class:`RLock` object then that will be used to synchronize access to the
906 value. If *lock* is ``False`` then access to the returned object will not be
907 automatically protected by a lock, so it will not necessarily be
908 "process-safe".
909
910 Note that *lock* is a keyword-only argument.
911
912.. function:: Array(typecode_or_type, size_or_initializer, *, lock=True)
913
914 Return a ctypes array allocated from shared memory. By default the return
915 value is actually a synchronized wrapper for the array.
916
917 *typecode_or_type* determines the type of the elements of the returned array:
918 it is either a ctypes type or a one character typecode of the kind used by
919 the :mod:`array` module. If *size_or_initializer* is an integer, then it
920 determines the length of the array, and the array will be initially zeroed.
921 Otherwise, *size_or_initializer* is a sequence which is used to initialize
922 the array and whose length determines the length of the array.
923
924 If *lock* is ``True`` (the default) then a new lock object is created to
925 synchronize access to the value. If *lock* is a :class:`Lock` or
926 :class:`RLock` object then that will be used to synchronize access to the
927 value. If *lock* is ``False`` then access to the returned object will not be
928 automatically protected by a lock, so it will not necessarily be
929 "process-safe".
930
   Note that *lock* is a keyword-only argument.

   Note that an array of :data:`ctypes.c_char` has ``value`` and ``raw``
   attributes which allow one to use it to store and retrieve strings.
935
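As a minimal sketch of :func:`Value` and :func:`Array` with the default
``lock=True``, several processes can update the same objects.  The function
name ``add_many`` and the sizes used are arbitrary choices for this
illustration.  An augmented assignment is a read followed by a write, so the
wrapper's own lock (returned by ``get_lock()``) is held around it:

```python
from multiprocessing import Process, Value, Array

def add_many(counter, squares, count):
    """Increment *counter* *count* times and fill *squares* in place."""
    for _ in range(count):
        # "+=" reads and then writes, so hold the lock for the whole update.
        with counter.get_lock():
            counter.value += 1
    for i in range(len(squares)):
        squares[i] = i * i

if __name__ == '__main__':
    counter = Value('i', 0)   # synchronized wrapper around a C int
    squares = Array('d', 5)   # five C doubles, initially zeroed

    workers = [Process(target=add_many, args=(counter, squares, 1000))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    print(counter.value)      # every increment from every worker is kept
    print(squares[:])
```

With ``lock=False`` the same code would have no ``get_lock()`` method to call
and the increments could be lost.
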
936
937The :mod:`multiprocessing.sharedctypes` module
938>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
939
940.. module:: multiprocessing.sharedctypes
941 :synopsis: Allocate ctypes objects from shared memory.
942
943The :mod:`multiprocessing.sharedctypes` module provides functions for allocating
944:mod:`ctypes` objects from shared memory which can be inherited by child
945processes.
946
947.. note::
948
   Although it is possible to store a pointer in shared memory, remember that
   this will refer to a location in the address space of a specific process.
   The pointer is quite likely to be invalid in the context of a second
   process, and trying to dereference it from the second process may cause a
   crash.
954
955.. function:: RawArray(typecode_or_type, size_or_initializer)
956
957 Return a ctypes array allocated from shared memory.
958
959 *typecode_or_type* determines the type of the elements of the returned array:
960 it is either a ctypes type or a one character typecode of the kind used by
961 the :mod:`array` module. If *size_or_initializer* is an integer then it
962 determines the length of the array, and the array will be initially zeroed.
963 Otherwise *size_or_initializer* is a sequence which is used to initialize the
964 array and whose length determines the length of the array.
965
966 Note that setting and getting an element is potentially non-atomic -- use
967 :func:`Array` instead to make sure that access is automatically synchronized
968 using a lock.
969
970.. function:: RawValue(typecode_or_type, *args)
971
972 Return a ctypes object allocated from shared memory.
973
974 *typecode_or_type* determines the type of the returned object: it is either a
975 ctypes type or a one character typecode of the kind used by the :mod:`array`
   module.  *\*args* is passed on to the constructor for the type.
977
978 Note that setting and getting the value is potentially non-atomic -- use
979 :func:`Value` instead to make sure that access is automatically synchronized
980 using a lock.
981
   Note that an array of :data:`ctypes.c_char` has ``value`` and ``raw``
   attributes which allow one to use it to store and retrieve strings -- see
   documentation for :mod:`ctypes`.
985
.. function:: Array(typecode_or_type, size_or_initializer, *, lock=True)
987
988 The same as :func:`RawArray` except that depending on the value of *lock* a
989 process-safe synchronization wrapper may be returned instead of a raw ctypes
990 array.
991
992 If *lock* is ``True`` (the default) then a new lock object is created to
993 synchronize access to the value. If *lock* is a :class:`Lock` or
994 :class:`RLock` object then that will be used to synchronize access to the
995 value. If *lock* is ``False`` then access to the returned object will not be
996 automatically protected by a lock, so it will not necessarily be
997 "process-safe".
998
999 Note that *lock* is a keyword-only argument.
1000
1001.. function:: Value(typecode_or_type, *args[, lock])
1002
1003 The same as :func:`RawValue` except that depending on the value of *lock* a
1004 process-safe synchronization wrapper may be returned instead of a raw ctypes
1005 object.
1006
1007 If *lock* is ``True`` (the default) then a new lock object is created to
1008 synchronize access to the value. If *lock* is a :class:`Lock` or
1009 :class:`RLock` object then that will be used to synchronize access to the
1010 value. If *lock* is ``False`` then access to the returned object will not be
1011 automatically protected by a lock, so it will not necessarily be
1012 "process-safe".
1013
1014 Note that *lock* is a keyword-only argument.
1015
1016.. function:: copy(obj)
1017
1018 Return a ctypes object allocated from shared memory which is a copy of the
1019 ctypes object *obj*.
1020
1021.. function:: synchronized(obj[, lock])
1022
1023 Return a process-safe wrapper object for a ctypes object which uses *lock* to
1024 synchronize access. If *lock* is ``None`` (the default) then a
1025 :class:`multiprocessing.RLock` object is created automatically.
1026
1027 A synchronized wrapper will have two methods in addition to those of the
1028 object it wraps: :meth:`get_obj()` returns the wrapped object and
1029 :meth:`get_lock()` returns the lock object used for synchronization.
1030
1031 Note that accessing the ctypes object through the wrapper can be a lot slower
   than accessing the raw ctypes object.
1033
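The following sketch combines :func:`copy` and :func:`synchronized`, under the
assumption that a small structure is the object being shared.  ``Point`` is a
made-up type for this illustration.  Everything runs in a single process, so it
only shows the relationship between the raw object, its copy and its wrapper:

```python
from ctypes import Structure, c_double
from multiprocessing.sharedctypes import RawValue, copy, synchronized

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

raw = RawValue(Point, 1.5, -2.0)   # shared memory, no lock attached

snapshot = copy(raw)               # separate object in shared memory
raw.x = 4.0                        # later changes do not reach the copy

safe = synchronized(raw)           # lock is None, so an RLock is created
with safe.get_lock():              # hold the lock across read and write
    safe.y = safe.y * 2

print(snapshot.x)                  # value captured before raw.x changed
print(safe.get_obj() is raw)       # the wrapper exposes the wrapped object
print(raw.y)                       # updated through the wrapper
```

Holding the lock around the read and the write works here because the
automatically created lock is recursive.  A non-recursive :class:`Lock` passed
in as *lock* would block when the wrapper tried to acquire it again.
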
1034
1035The table below compares the syntax for creating shared ctypes objects from
1036shared memory with the normal ctypes syntax. (In the table ``MyStruct`` is some
1037subclass of :class:`ctypes.Structure`.)
1038
==================== ========================== ===========================
ctypes               sharedctypes using type    sharedctypes using typecode
==================== ========================== ===========================
c_double(2.4)        RawValue(c_double, 2.4)    RawValue('d', 2.4)
MyStruct(4, 6)       RawValue(MyStruct, 4, 6)
(c_short * 7)()      RawArray(c_short, 7)       RawArray('h', 7)
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8))
==================== ========================== ===========================
1047
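The rows of the table can be tried out directly.  In this sketch ``MyStruct``
is given two integer fields, which is an assumption made only so that
``MyStruct(4, 6)`` is valid:

```python
from ctypes import Structure, c_double, c_int, c_short
from multiprocessing.sharedctypes import RawArray, RawValue

class MyStruct(Structure):
    _fields_ = [('a', c_int), ('b', c_int)]

# Each pair builds the same kind of object; only the allocation differs.
local_double, shared_double = c_double(2.4), RawValue('d', 2.4)
local_struct, shared_struct = MyStruct(4, 6), RawValue(MyStruct, 4, 6)
local_shorts, shared_shorts = (c_short * 7)(), RawArray('h', 7)
local_ints, shared_ints = (c_int * 3)(9, 2, 8), RawArray(c_int, (9, 2, 8))

print(shared_double.value == local_double.value)
print((shared_struct.a, shared_struct.b))
print(list(shared_shorts))
print(list(shared_ints) == list(local_ints))
```
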
1048
1049Below is an example where a number of ctypes objects are modified by a child
1050process::
1051
   from multiprocessing import Process, Lock
   from multiprocessing.sharedctypes import Value, Array
   from ctypes import Structure, c_double

   class Point(Structure):
       _fields_ = [('x', c_double), ('y', c_double)]

   def modify(n, x, s, A):
       n.value **= 2
       x.value **= 2
       s.value = s.value.upper()
       for a in A:
           a.x **= 2
           a.y **= 2

   if __name__ == '__main__':
       lock = Lock()

       n = Value('i', 7)
       x = Value(c_double, 1.0/3.0, lock=False)
       s = Array('c', 'hello world', lock=lock)
       A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

       p = Process(target=modify, args=(n, x, s, A))
       p.start()
       p.join()

       print n.value
       print x.value
       print s.value
       print [(a.x, a.y) for a in A]
1083
1084
1085.. highlightlang:: none
1086
1087The results printed are ::
1088
   49
   0.1111111111111111
   HELLO WORLD
   [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
1093
1094.. highlightlang:: python
1095
1096
1097.. _multiprocessing-managers:
1098
1099Managers
1100~~~~~~~~
1101
1102Managers provide a way to create data which can be shared between different
1103processes. A manager object controls a server process which manages *shared
1104objects*. Other processes can access the shared objects by using proxies.
1105
1106.. function:: multiprocessing.Manager()
1107
1108 Returns a started :class:`SyncManager` object which can be used for sharing
1109 objects between processes. The returned manager object corresponds to a
1110 spawned child process and has methods which will create shared objects and
1111 return corresponding proxies.
1112
1113.. module:: multiprocessing.managers
   :synopsis: Share data between processes with shared objects.
1115
Manager processes will be shut down as soon as they are garbage collected or
1117their parent process exits. The manager classes are defined in the
1118:mod:`multiprocessing.managers` module:
1119
1120.. class:: BaseManager([address[, authkey]])
1121
1122 Create a BaseManager object.
1123
1124 Once created one should call :meth:`start` or :meth:`serve_forever` to ensure
1125 that the manager object refers to a started manager process.
1126
1127 *address* is the address on which the manager process listens for new
1128 connections. If *address* is ``None`` then an arbitrary one is chosen.
1129
   *authkey* is the authentication key which will be used to check the validity
   of incoming connections to the server process.  If *authkey* is ``None`` then
   ``current_process().get_auth_key()`` is used.  Otherwise *authkey* is used
   and it must be a string.
1134
1135 .. method:: start()
1136
1137 Start a subprocess to start the manager.
1138
   .. method:: serve_forever()
1140
1141 Run the server in the current process.
1142
1143 .. method:: from_address(address, authkey)
1144
1145 A class method which creates a manager object referring to a pre-existing
1146 server process which is using the given address and authentication key.
1147
1148 .. method:: shutdown()
1149
      Stop the process used by the manager.  This is only available if
      :meth:`start` has been used to start the server process.
1152
1153 This can be called multiple times.
1154
1155 .. method:: register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
1156
1157 A classmethod which can be used for registering a type or callable with
1158 the manager class.
1159
1160 *typeid* is a "type identifier" which is used to identify a particular
1161 type of shared object. This must be a string.
1162
1163 *callable* is a callable used for creating objects for this type
1164 identifier. If a manager instance will be created using the
1165 :meth:`from_address()` classmethod or if the *create_method* argument is
1166 ``False`` then this can be left as ``None``.
1167
1168 *proxytype* is a subclass of :class:`multiprocessing.managers.BaseProxy`
1169 which is used to create proxies for shared objects with this *typeid*. If
1170 ``None`` then a proxy class is created automatically.
1171
1172 *exposed* is used to specify a sequence of method names which proxies for
1173 this typeid should be allowed to access using
      :meth:`BaseProxy._call_method`.  (If *exposed* is ``None`` then
1175 :attr:`proxytype._exposed_` is used instead if it exists.) In the case
1176 where no exposed list is specified, all "public methods" of the shared
1177 object will be accessible. (Here a "public method" means any attribute
1178 which has a ``__call__()`` method and whose name does not begin with
1179 ``'_'``.)
1180
1181 *method_to_typeid* is a mapping used to specify the return type of those
1182 exposed methods which should return a proxy. It maps method names to
1183 typeid strings. (If *method_to_typeid* is ``None`` then
1184 :attr:`proxytype._method_to_typeid_` is used instead if it exists.) If a
1185 method's name is not a key of this mapping or if the mapping is ``None``
1186 then the object returned by the method will be copied by value.
1187
1188 *create_method* determines whether a method should be created with name
1189 *typeid* which can be used to tell the server process to create a new
1190 shared object and return a proxy for it. By default it is ``True``.
1191
1192 :class:`BaseManager` instances also have one read-only property:
1193
1194 .. attribute:: address
1195
1196 The address used by the manager.
1197
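The effect of the *exposed* argument of :meth:`register` can be sketched as
follows.  ``Counter`` and ``CounterManager`` are made-up names for this
illustration.  Only ``increment`` is listed, so proxies for this typeid do not
offer ``reset`` even though the shared object itself has that method:

```python
from multiprocessing.managers import BaseManager

class Counter(object):
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1
        return self.count
    def reset(self):
        self.count = 0

class CounterManager(BaseManager):
    pass

# Only ``increment`` may be called through proxies for this typeid.
CounterManager.register('Counter', Counter, exposed=['increment'])

if __name__ == '__main__':
    manager = CounterManager()
    manager.start()
    counter = manager.Counter()       # created because create_method is True
    print(counter.increment())
    print(counter.increment())
    print(hasattr(counter, 'reset'))  # ``reset`` was not exposed
    manager.shutdown()
```

Leaving *exposed* out would make every public method of ``Counter`` available
through the proxy.
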
1198
1199.. class:: SyncManager
1200
1201 A subclass of :class:`BaseManager` which can be used for the synchronization
1202 of processes. Objects of this type are returned by
1203 :func:`multiprocessing.Manager()`.
1204
1205 It also supports creation of shared lists and dictionaries.
1206
1207 .. method:: BoundedSemaphore([value])
1208
1209 Create a shared :class:`threading.BoundedSemaphore` object and return a
1210 proxy for it.
1211
1212 .. method:: Condition([lock])
1213
1214 Create a shared :class:`threading.Condition` object and return a proxy for
1215 it.
1216
1217 If *lock* is supplied then it should be a proxy for a
1218 :class:`threading.Lock` or :class:`threading.RLock` object.
1219
1220 .. method:: Event()
1221
1222 Create a shared :class:`threading.Event` object and return a proxy for it.
1223
1224 .. method:: Lock()
1225
1226 Create a shared :class:`threading.Lock` object and return a proxy for it.
1227
1228 .. method:: Namespace()
1229
1230 Create a shared :class:`Namespace` object and return a proxy for it.
1231
1232 .. method:: Queue([maxsize])
1233
      Create a shared :class:`Queue.Queue` object and return a proxy for it.
1235
1236 .. method:: RLock()
1237
1238 Create a shared :class:`threading.RLock` object and return a proxy for it.
1239
1240 .. method:: Semaphore([value])
1241
1242 Create a shared :class:`threading.Semaphore` object and return a proxy for
1243 it.
1244
1245 .. method:: Array(typecode, sequence)
1246
      Create an array and return a proxy for it.
1248
1249 .. method:: Value(typecode, value)
1250
1251 Create an object with a writable ``value`` attribute and return a proxy
1252 for it.
1253
1254 .. method:: dict()
1255 dict(mapping)
1256 dict(sequence)
1257
1258 Create a shared ``dict`` object and return a proxy for it.
1259
1260 .. method:: list()
1261 list(sequence)
1262
1263 Create a shared ``list`` object and return a proxy for it.
1264
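A short sketch of a :class:`SyncManager` in use: several processes update a
shared ``dict`` and a shared ``list`` through proxies, with a shared lock
keeping each pair of updates together.  The function name ``record`` and the
number of workers are arbitrary choices for this illustration:

```python
from multiprocessing import Manager, Process

def record(results, log, lock, n):
    """Store n squared in the shared dict and note n in the shared list."""
    with lock:                    # keep the two updates together
        results[n] = n * n
        log.append(n)

if __name__ == '__main__':
    manager = Manager()
    results = manager.dict()
    log = manager.list()
    lock = manager.Lock()

    workers = [Process(target=record, args=(results, log, lock, n))
               for n in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    print(sorted(results.items()))
    print(sorted(log))
    manager.shutdown()
```

Every operation on a proxy is a round trip to the manager's process, so this
is more flexible but slower than using shared memory.
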
1265
1266Namespace objects
1267>>>>>>>>>>>>>>>>>
1268
1269A namespace object has no public methods, but does have writable attributes.
1270Its representation shows the values of its attributes.
1271
1272However, when using a proxy for a namespace object, an attribute beginning with
1273``'_'`` will be an attribute of the proxy and not an attribute of the referent::
1274
   >>> import multiprocessing
   >>> manager = multiprocessing.Manager()
   >>> Global = manager.Namespace()
   >>> Global.x = 10
   >>> Global.y = 'hello'
   >>> Global._z = 12.3    # this is an attribute of the proxy
   >>> print Global
   Namespace(x=10, y='hello')
1282
1283
1284Customized managers
1285>>>>>>>>>>>>>>>>>>>
1286
To create one's own manager, one creates a subclass of :class:`BaseManager` and
uses the :meth:`register` classmethod to register new types or callables with
1289the manager class. For example::
1290
   from multiprocessing.managers import BaseManager

   class MathsClass(object):
       def add(self, x, y):
           return x + y
       def mul(self, x, y):
           return x * y

   class MyManager(BaseManager):
       pass

   MyManager.register('Maths', MathsClass)

   if __name__ == '__main__':
       manager = MyManager()
       manager.start()
       maths = manager.Maths()
       print maths.add(4, 3)         # prints 7
       print maths.mul(7, 8)         # prints 56
1310
1311
1312Using a remote manager
1313>>>>>>>>>>>>>>>>>>>>>>
1314
1315It is possible to run a manager server on one machine and have clients use it
1316from other machines (assuming that the firewalls involved allow it).
1317
1318Running the following commands creates a server for a single shared queue which
1319remote clients can access::
1320
   >>> from multiprocessing.managers import BaseManager
   >>> import Queue
   >>> queue = Queue.Queue()
   >>> class QueueManager(BaseManager): pass
   ...
   >>> QueueManager.register('getQueue', callable=lambda:queue)
   >>> m = QueueManager(address=('', 50000), authkey='abracadabra')
   >>> m.serve_forever()
1329
1330One client can access the server as follows::
1331
   >>> from multiprocessing.managers import BaseManager
   >>> class QueueManager(BaseManager): pass
   ...
   >>> QueueManager.register('getQueue')
   >>> m = QueueManager.from_address(address=('foo.bar.org', 50000),
   ...                               authkey='abracadabra')
   >>> queue = m.getQueue()
   >>> queue.put('hello')
1340
1341Another client can also use it::
1342
   >>> from multiprocessing.managers import BaseManager
   >>> class QueueManager(BaseManager): pass
   ...
   >>> QueueManager.register('getQueue')
   >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='abracadabra')
   >>> queue = m.getQueue()
   >>> queue.get()
   'hello'
1351
1352
1353Proxy Objects
1354~~~~~~~~~~~~~
1355
1356A proxy is an object which *refers* to a shared object which lives (presumably)
1357in a different process. The shared object is said to be the *referent* of the
1358proxy. Multiple proxy objects may have the same referent.
1359
1360A proxy object has methods which invoke corresponding methods of its referent
1361(although not every method of the referent will necessarily be available through
1362the proxy). A proxy can usually be used in most of the same ways that its
1363referent can::
1364
   >>> from multiprocessing import Manager
   >>> manager = Manager()
   >>> l = manager.list([i*i for i in range(10)])
   >>> print l
   [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
   >>> print repr(l)
   <ListProxy object, typeid 'list' at 0xb799974c>
   >>> l[4]
   16
   >>> l[2:5]
   [4, 9, 16]
1376
1377Notice that applying :func:`str` to a proxy will return the representation of
1378the referent, whereas applying :func:`repr` will return the representation of
1379the proxy.
1380
1381An important feature of proxy objects is that they are picklable so they can be
1382passed between processes. Note, however, that if a proxy is sent to the
1383corresponding manager's process then unpickling it will produce the referent
1384itself. This means, for example, that one shared object can contain a second::
1385
   >>> a = manager.list()
   >>> b = manager.list()
   >>> a.append(b)         # referent of `a` now contains referent of `b`
   >>> print a, b
   [[]] []
   >>> b.append('hello')
   >>> print a, b
   [['hello']] ['hello']
1394
1395.. note::
1396
1397 The proxy types in :mod:`multiprocessing` do nothing to support comparisons
1398 by value. So, for instance, ::
1399
1400 manager.list([1,2,3]) == [1,2,3]
1401
1402 will return ``False``. One should just use a copy of the referent instead
1403 when making comparisons.
1404
1405.. class:: BaseProxy
1406
1407 Proxy objects are instances of subclasses of :class:`BaseProxy`.
1408
1409 .. method:: _call_method(methodname[, args[, kwds]])
1410
1411 Call and return the result of a method of the proxy's referent.
1412
1413 If ``proxy`` is a proxy whose referent is ``obj`` then the expression ::
1414
1415 proxy._call_method(methodname, args, kwds)
1416
1417 will evaluate the expression ::
1418
1419 getattr(obj, methodname)(*args, **kwds)
1420
1421 in the manager's process.
1422
1423 The returned value will be a copy of the result of the call or a proxy to
1424 a new shared object -- see documentation for the *method_to_typeid*
1425 argument of :meth:`BaseManager.register`.
1426
1427 If an exception is raised by the call, then then is re-raised by
1428 :meth:`_call_method`. If some other exception is raised in the manager's
1429 process then this is converted into a :exc:`RemoteError` exception and is
1430 raised by :meth:`_call_method`.
1431
1432 Note in particular that an exception will be raised if *methodname* has
1433 not been *exposed*
1434
1435 An example of the usage of :meth:`_call_method()`::
1436
         >>> l = manager.list(range(10))
         >>> l._call_method('__len__')
         10
         >>> l._call_method('__getslice__', (2, 7))   # equiv to `l[2:7]`
         [2, 3, 4, 5, 6]
         >>> l._call_method('__getitem__', (20,))     # equiv to `l[20]`
         Traceback (most recent call last):
         ...
         IndexError: list index out of range
1446
1447 .. method:: _get_value()
1448
1449 Return a copy of the referent.
1450
1451 If the referent is unpicklable then this will raise an exception.
1452
1453 .. method:: __repr__
1454
1455 Return a representation of the proxy object.
1456
1457 .. method:: __str__
1458
1459 Return the representation of the referent.
1460
1461
1462Cleanup
1463>>>>>>>
1464
1465A proxy object uses a weakref callback so that when it gets garbage collected it
1466deregisters itself from the manager which owns its referent.
1467
1468A shared object gets deleted from the manager process when there are no longer
1469any proxies referring to it.
1470
1471
1472Process Pools
1473~~~~~~~~~~~~~
1474
1475.. module:: multiprocessing.pool
1476 :synopsis: Create pools of processes.
1477
1478One can create a pool of processes which will carry out tasks submitted to it
with the :class:`Pool` class in :mod:`multiprocessing.pool`.
1480
1481.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]])
1482
1483 A process pool object which controls a pool of worker processes to which jobs
1484 can be submitted. It supports asynchronous results with timeouts and
1485 callbacks and has a parallel map implementation.
1486
1487 *processes* is the number of worker processes to use. If *processes* is
1488 ``None`` then the number returned by :func:`cpu_count` is used. If
1489 *initializer* is not ``None`` then each worker process will call
1490 ``initializer(*initargs)`` when it starts.
1491
1492 .. method:: apply(func[, args[, kwds]])
1493
1494 Equivalent of the :func:`apply` builtin function. It blocks till the
1495 result is ready.
1496
1497 .. method:: apply_async(func[, args[, kwds[, callback]]])
1498
1499 A variant of the :meth:`apply` method which returns a result object.
1500
1501 If *callback* is specified then it should be a callable which accepts a
1502 single argument. When the result becomes ready *callback* is applied to
1503 it (unless the call failed). *callback* should complete immediately since
1504 otherwise the thread which handles the results will get blocked.
1505
1506 .. method:: map(func, iterable[, chunksize])
1507
1508 A parallel equivalent of the :func:`map` builtin function. It blocks till
1509 the result is ready.
1510
1511 This method chops the iterable into a number of chunks which it submits to
1512 the process pool as separate tasks. The (approximate) size of these
1513 chunks can be specified by setting *chunksize* to a positive integer.
1514
1515 .. method:: map_async(func, iterable[, chunksize[, callback]])
1516
1517 A variant of the :meth:`.map` method which returns a result object.
1518
1519 If *callback* is specified then it should be a callable which accepts a
1520 single argument. When the result becomes ready *callback* is applied to
1521 it (unless the call failed). *callback* should complete immediately since
1522 otherwise the thread which handles the results will get blocked.
1523
1524 .. method:: imap(func, iterable[, chunksize])
1525
1526 An equivalent of :func:`itertools.imap`.
1527
      The *chunksize* argument is the same as the one used by the :meth:`.map`
      method.  For very long iterables using a large value for *chunksize* can
      make the job complete **much** faster than using the default value of
      ``1``.
1532
1533 Also if *chunksize* is ``1`` then the :meth:`next` method of the iterator
1534 returned by the :meth:`imap` method has an optional *timeout* parameter:
1535 ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the
1536 result cannot be returned within *timeout* seconds.
1537
1538 .. method:: imap_unordered(func, iterable[, chunksize])
1539
1540 The same as :meth:`imap` except that the ordering of the results from the
1541 returned iterator should be considered arbitrary. (Only when there is
1542 only one worker process is the order guaranteed to be "correct".)
1543
1544 .. method:: close()
1545
1546 Prevents any more tasks from being submitted to the pool. Once all the
1547 tasks have been completed the worker processes will exit.
1548
1549 .. method:: terminate()
1550
1551 Stops the worker processes immediately without completing outstanding
1552 work. When the pool object is garbage collected :meth:`terminate` will be
1553 called immediately.
1554
1555 .. method:: join()
1556
1557 Wait for the worker processes to exit. One must call :meth:`close` or
1558 :meth:`terminate` before using :meth:`join`.
1559
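The *initializer* argument and the :meth:`close`/:meth:`join` sequence
described above can be sketched as follows.  The names ``init_worker`` and
``scaled`` and the module-level variable ``_scale`` are assumptions made for
this illustration.  Each worker process calls ``init_worker(10)`` once when it
starts, and every later task in that worker sees the value it stored:

```python
from multiprocessing import Pool

_scale = None   # set separately inside every worker process

def init_worker(scale):
    """Run once in each worker as ``initializer(*initargs)``."""
    global _scale
    _scale = scale

def scaled(x):
    return x * _scale

if __name__ == '__main__':
    pool = Pool(processes=2, initializer=init_worker, initargs=(10,))

    # Results arrive in arbitrary order, so sort them before printing.
    # The third argument is the chunk size.
    print(sorted(pool.imap_unordered(scaled, range(6), 2)))

    pool.close()   # no further tasks may be submitted
    pool.join()    # wait for the workers to finish and exit
```
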
1560
1561.. class:: AsyncResult
1562
1563 The class of the result returned by :meth:`Pool.apply_async` and
1564 :meth:`Pool.map_async`.
1565
   .. method:: get([timeout])
1567
1568 Return the result when it arrives. If *timeout* is not ``None`` and the
1569 result does not arrive within *timeout* seconds then
1570 :exc:`multiprocessing.TimeoutError` is raised. If the remote call raised
1571 an exception then that exception will be reraised by :meth:`get`.
1572
1573 .. method:: wait([timeout])
1574
1575 Wait until the result is available or until *timeout* seconds pass.
1576
1577 .. method:: ready()
1578
1579 Return whether the call has completed.
1580
1581 .. method:: successful()
1582
1583 Return whether the call completed without raising an exception. Will
1584 raise :exc:`AssertionError` if the result is not ready.
1585
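As a hedged sketch of the :class:`AsyncResult` methods together with the
*callback* argument, the code below waits for a result with a timeout before
asking for it.  The function ``cube`` and the list ``collected`` are made up
for this illustration:

```python
from multiprocessing import Pool, TimeoutError

def cube(x):
    return x ** 3

collected = []   # filled in by the callback, in the parent process

if __name__ == '__main__':
    pool = Pool(processes=2)

    # The callback runs in the thread which handles results, so it
    # should return quickly.
    single = pool.apply_async(cube, (3,), callback=collected.append)
    single.wait(5)                        # wait at most 5 seconds
    if single.ready() and single.successful():
        print(single.get())               # already available: no blocking
        print(collected)

    batch = pool.map_async(cube, range(4))
    try:
        print(batch.get(timeout=5))
    except TimeoutError:
        print('no result within 5 seconds')

    pool.close()
    pool.join()
```

Checking ``ready()`` before ``successful()`` avoids the :exc:`AssertionError`
which ``successful()`` raises for a result that has not arrived yet.
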
1586The following example demonstrates the use of a pool::
1587
   from multiprocessing import Pool

   def f(x):
       return x*x

   if __name__ == '__main__':
       pool = Pool(processes=4)              # start 4 worker processes

       result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
       print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

       print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

       it = pool.imap(f, range(10))
       print it.next()                       # prints "0"
       print it.next()                       # prints "1"
       print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

       import time
       result = pool.apply_async(time.sleep, (10,))
       print result.get(timeout=1)           # raises TimeoutError
1609
1610
1611.. _multiprocessing-listeners-clients:
1612
1613Listeners and Clients
1614~~~~~~~~~~~~~~~~~~~~~
1615
1616.. module:: multiprocessing.connection
1617 :synopsis: API for dealing with sockets.
1618
1619Usually message passing between processes is done using queues or by using
1620:class:`Connection` objects returned by :func:`Pipe`.
1621
1622However, the :mod:`multiprocessing.connection` module allows some extra
1623flexibility. It basically gives a high level message oriented API for dealing
1624with sockets or Windows named pipes, and also has support for *digest
1625authentication* using the :mod:`hmac` module from the standard library.
1626
1627
1628.. function:: deliver_challenge(connection, authkey)
1629
1630 Send a randomly generated message to the other end of the connection and wait
1631 for a reply.
1632
1633 If the reply matches the digest of the message using *authkey* as the key
1634 then a welcome message is sent to the other end of the connection. Otherwise
1635 :exc:`AuthenticationError` is raised.
1636
.. function:: answer_challenge(connection, authkey)
1638
1639 Receive a message, calculate the digest of the message using *authkey* as the
1640 key, and then send the digest back.
1641
1642 If a welcome message is not received, then :exc:`AuthenticationError` is
1643 raised.
1644
1645.. function:: Client(address[, family[, authenticate[, authkey]]])
1646
1647 Attempt to set up a connection to the listener which is using address
1648 *address*, returning a :class:`Connection`.
1649
1650 The type of the connection is determined by *family* argument, but this can
1651 generally be omitted since it can usually be inferred from the format of
1652 *address*. (See :ref:`multiprocessing-address-formats`)
1653
   If *authenticate* is ``True`` or *authkey* is a string then digest
1655 authentication is used. The key used for authentication will be either
1656 *authkey* or ``current_process().get_auth_key()`` if *authkey* is ``None``.
1657 If authentication fails then :exc:`AuthenticationError` is raised. See
1658 :ref:`multiprocessing-auth-keys`.
1659
1660.. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]])
1661
1662 A wrapper for a bound socket or Windows named pipe which is 'listening' for
1663 connections.
1664
1665 *address* is the address to be used by the bound socket or named pipe of the
1666 listener object.
1667
1668 *family* is the type of socket (or named pipe) to use. This can be one of
1669 the strings ``'AF_INET'`` (for a TCP socket), ``'AF_UNIX'`` (for a Unix
1670 domain socket) or ``'AF_PIPE'`` (for a Windows named pipe). Of these only
1671 the first is guaranteed to be available. If *family* is ``None`` then the
1672 family is inferred from the format of *address*. If *address* is also
1673 ``None`` then a default is chosen. This default is the family which is
1674 assumed to be the fastest available. See
1675 :ref:`multiprocessing-address-formats`. Note that if *family* is
1676 ``'AF_UNIX'`` and address is ``None`` then the socket will be created in a
1677 private temporary directory created using :func:`tempfile.mkstemp`.
1678
1679 If the listener object uses a socket then *backlog* (1 by default) is passed
1680 to the :meth:`listen` method of the socket once it has been bound.
1681
1682 If *authenticate* is ``True`` (``False`` by default) or *authkey* is not
1683 ``None`` then digest authentication is used.
1684
1685 If *authkey* is a string then it will be used as the authentication key;
   otherwise it must be ``None``.
1687
1688 If *authkey* is ``None`` and *authenticate* is ``True`` then
1689 ``current_process().get_auth_key()`` is used as the authentication key. If
   *authkey* is ``None`` and *authenticate* is ``False`` then no
1691 authentication is done. If authentication fails then
1692 :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`.
1693
1694 .. method:: accept()
1695
1696 Accept a connection on the bound socket or named pipe of the listener
1697 object and return a :class:`Connection` object. If authentication is
1698 attempted and fails, then :exc:`AuthenticationError` is raised.
1699
1700 .. method:: close()
1701
1702 Close the bound socket or named pipe of the listener object. This is
1703 called automatically when the listener is garbage collected. However it
1704 is advisable to call it explicitly.
1705
1706 Listener objects have the following read-only properties:
1707
1708 .. attribute:: address
1709
1710 The address which is being used by the Listener object.
1711
1712 .. attribute:: last_accepted
1713
1714 The address from which the last accepted connection came. If this is
1715 unavailable then it is ``None``.
1716
1717
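As a rough sketch of the listener lifecycle described above (an illustration,
not one of the module's own examples), the following binds to port 0 so that
the operating system picks a free port, accepts one unauthenticated connection
made by a helper thread, and closes the listener explicitly. The function name
``accept_one`` is hypothetical, and the code assumes a current Python 3
interpreter.

```python
import threading
from multiprocessing.connection import Client, Listener


def accept_one():
    """Accept one local connection and report the addresses involved."""
    # Port 0 asks the operating system for any free TCP port.
    listener = Listener(('localhost', 0), family='AF_INET')
    try:
        bound = listener.address  # the (host, port) actually in use

        def connect_and_close():
            Client(bound).close()

        client = threading.Thread(target=connect_and_close)
        client.start()

        conn = listener.accept()
        peer = listener.last_accepted  # address the client connected from
        conn.close()
        client.join()
    finally:
        # close() would also run on garbage collection; being explicit is safer.
        listener.close()
    return bound, peer
```

Calling ``accept_one()`` returns the bound address and the peer address, both
as ``(host, port)`` tuples for an ``'AF_INET'`` listener.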
The module defines two exceptions:

.. exception:: AuthenticationError

   Exception raised when there is an authentication error.

.. exception:: BufferTooShort

   Exception raised by the :meth:`Connection.recv_bytes_into` method of a
   connection object when the supplied buffer object is too small for the
   message read.

   If *e* is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
   the message as a byte string.


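The behaviour of :exc:`BufferTooShort` can be sketched as follows. This is a
minimal illustration that assumes a current Python 3 interpreter and uses a
local one-way pipe; the helper name ``recv_into_small_buffer`` is hypothetical.

```python
from multiprocessing import BufferTooShort, Pipe


def recv_into_small_buffer(message, size):
    """Try to receive *message* into a buffer of *size* bytes."""
    receiver, sender = Pipe(duplex=False)
    try:
        sender.send_bytes(message)
        buf = bytearray(size)
        try:
            count = receiver.recv_bytes_into(buf)
        except BufferTooShort as e:
            # The complete message is still available from the exception.
            return 'too short', e.args[0]
        return 'ok', bytes(buf[:count])
    finally:
        sender.close()
        receiver.close()
```

When the buffer is large enough the message is read into it; otherwise the
message is recovered from ``e.args[0]`` rather than being lost.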
**Examples**

The following server code creates a listener which uses ``'secret password'`` as
an authentication key. It then waits for a connection and sends some data to
the client::

   from multiprocessing.connection import Listener
   from array import array

   address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
   listener = Listener(address, authkey='secret password')

   conn = listener.accept()
   print 'connection accepted from', listener.last_accepted

   conn.send([2.25, None, 'junk', float])

   conn.send_bytes('hello')

   conn.send_bytes(array('i', [42, 1729]))

   conn.close()
   listener.close()

The following code connects to the server and receives some data from the
server::

   from multiprocessing.connection import Client
   from array import array

   address = ('localhost', 6000)
   conn = Client(address, authkey='secret password')

   print conn.recv()                 # => [2.25, None, 'junk', float]

   print conn.recv_bytes()           # => 'hello'

   arr = array('i', [0, 0, 0, 0, 0])
   print conn.recv_bytes_into(arr)   # => 8
   print arr                         # => array('i', [42, 1729, 0, 0, 0])

   conn.close()


.. _multiprocessing-address-formats:

Address Formats
>>>>>>>>>>>>>>>

* An ``'AF_INET'`` address is a tuple of the form ``(hostname, port)`` where
  *hostname* is a string and *port* is an integer.

* An ``'AF_UNIX'`` address is a string representing a filename on the
  filesystem.

* An ``'AF_PIPE'`` address is a string of the form
  ``r'\\.\pipe\PipeName'``. To use :func:`Client` to connect to a named
  pipe on a remote computer called *ServerName* one should use an address of the
  form ``r'\\ServerName\pipe\PipeName'`` instead.

Note that any string beginning with two backslashes is assumed by default to be
an ``'AF_PIPE'`` address rather than an ``'AF_UNIX'`` address.


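The inference rules above can be restated as a small helper. This is only an
illustration of the rules as listed, not the module's implementation, and the
name ``guess_family`` is hypothetical.

```python
def guess_family(address):
    """Classify *address* using the address-format rules listed above."""
    if isinstance(address, tuple):
        # (hostname, port) pairs are TCP addresses.
        return 'AF_INET'
    if isinstance(address, str):
        # Two leading backslashes mark a Windows named pipe.
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        # Any other string is treated as a filesystem path.
        return 'AF_UNIX'
    raise ValueError('unrecognised address: %r' % (address,))
```

For example, ``guess_family(('localhost', 6000))`` gives ``'AF_INET'``, while
``guess_family(r'\\.\pipe\PipeName')`` gives ``'AF_PIPE'``.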
.. _multiprocessing-auth-keys:

Authentication keys
~~~~~~~~~~~~~~~~~~~

When one uses :meth:`Connection.recv`, the data received is automatically
unpickled. Unfortunately unpickling data from an untrusted source is a security
risk. Therefore :class:`Listener` and :func:`Client` use the :mod:`hmac` module
to provide digest authentication.

An authentication key is a string which can be thought of as a password: once a
connection is established both ends will demand proof that the other knows the
authentication key. (Demonstrating that both ends are using the same key does
**not** involve sending the key over the connection.)

If authentication is requested but no authentication key is specified then the
return value of ``current_process().get_auth_key()`` is used (see
:class:`Process`). This value will be automatically inherited by any
:class:`Process` object that the current process creates. This means that (by
default) all processes of a multi-process program will share a single
authentication key which can be used when setting up connections between
themselves.

Suitable authentication keys can also be generated by using :func:`os.urandom`.


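A hedged sketch of an authenticated exchange using a key from
:func:`os.urandom` is shown below. It assumes a current Python 3 interpreter
(where the key must be a byte string), runs the client in a thread purely to
keep the example in one process, and uses the hypothetical helper name
``exchange_with_random_key``.

```python
import os
import threading
from multiprocessing.connection import Client, Listener


def exchange_with_random_key(message):
    """Send *message* over an authenticated local connection."""
    authkey = os.urandom(32)  # random key known to both ends
    listener = Listener(('localhost', 0), authkey=authkey)
    received = []

    def client():
        conn = Client(listener.address, authkey=authkey)
        received.append(conn.recv())
        conn.close()

    worker = threading.Thread(target=client)
    worker.start()
    try:
        conn = listener.accept()  # performs the digest handshake
        conn.send(message)
        conn.close()
        worker.join()
    finally:
        listener.close()
    return received[0]
```

Only proof of knowledge of the key crosses the connection; the key itself is
passed to both ends out of band (here, through a shared variable).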
Logging
~~~~~~~

Some support for logging is available. Note, however, that the :mod:`logging`
package does not use process shared locks so it is possible (depending on the
handler type) for messages from different processes to get mixed up.

.. currentmodule:: multiprocessing
.. function:: get_logger()

   Returns the logger used by :mod:`multiprocessing`. If necessary, a new one
   will be created.

   When first created the logger has level :data:`logging.NOTSET` and has a
   handler which sends output to :data:`sys.stderr` using format
   ``'[%(levelname)s/%(processName)s] %(message)s'``. (The logger allows use of
   the non-standard ``'%(processName)s'`` format.) Messages sent to this logger
   will not by default propagate to the root logger.

   Note that on Windows child processes will only inherit the level of the
   parent process's logger -- any other customization of the logger will not be
   inherited.

Below is an example session with logging turned on::

    >>> import multiprocessing, logging
    >>> logger = multiprocessing.get_logger()
    >>> logger.setLevel(logging.INFO)
    >>> logger.warning('doomed')
    [WARNING/MainProcess] doomed
    >>> m = multiprocessing.Manager()
    [INFO/SyncManager-1] child process calling self.run()
    [INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-2776-0-lj0tfa'
    >>> del m
    [INFO/MainProcess] sending shutdown message to manager
    [INFO/SyncManager-1] manager exiting with exitcode 0


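To inspect what the logger emits without relying on its default handler, one
can attach a handler of one's own. The sketch below assumes a current Python 3
interpreter and captures a single record in memory using the format string
quoted above; the helper name ``capture_log_line`` is hypothetical.

```python
import io
import logging
import multiprocessing


def capture_log_line(text):
    """Log *text* as a warning and return the formatted output."""
    logger = multiprocessing.get_logger()
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter('[%(levelname)s/%(processName)s] %(message)s'))
    old_level = logger.level
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    try:
        logger.warning(text)
    finally:
        # Restore the logger so that other code sees it unchanged.
        logger.setLevel(old_level)
        logger.removeHandler(handler)
    return stream.getvalue()
```

The returned line has the ``[LEVEL/process name] message`` shape shown in the
session above.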
The :mod:`multiprocessing.dummy` module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. module:: multiprocessing.dummy
   :synopsis: Dumb wrapper around threading.

:mod:`multiprocessing.dummy` replicates the API of :mod:`multiprocessing` but is
no more than a wrapper around the :mod:`threading` module.


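Because the interface is the same, code written against a process pool can be
pointed at threads by changing only the import. A minimal sketch, in which the
function names ``square`` and ``squares_with_threads`` are hypothetical:

```python
from multiprocessing.dummy import Pool  # thread-backed, same interface


def square(x):
    return x * x


def squares_with_threads(values):
    """Map square() over *values* using worker threads."""
    pool = Pool(4)  # four worker threads rather than four processes
    try:
        return pool.map(square, values)
    finally:
        pool.close()
        pool.join()
```

This can be convenient for I/O-bound work or for debugging, since nothing has
to be pickled and sent to another process.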
.. _multiprocessing-programming:

Programming guidelines
----------------------

There are certain guidelines and idioms which should be adhered to when using
:mod:`multiprocessing`.


All platforms
~~~~~~~~~~~~~

Avoid shared state

    As far as possible one should try to avoid shifting large amounts of data
    between processes.

    It is probably best to stick to using queues or pipes for communication
    between processes rather than using the lower level synchronization
    primitives from the :mod:`threading` module.

Picklability

    Ensure that the arguments to the methods of proxies are picklable.

Thread safety of proxies

    Do not use a proxy object from more than one thread unless you protect it
    with a lock.

    (There is never a problem with different processes using the *same* proxy.)

Joining zombie processes

    On Unix when a process finishes but has not been joined it becomes a zombie.
    There should never be very many because each time a new process starts (or
    :func:`active_children` is called) all completed processes which have not
    yet been joined will be joined. Also calling a finished process's
    :meth:`Process.is_alive` will join the process. Even so it is probably good
    practice to explicitly join all the processes that you start.

Better to inherit than pickle/unpickle

    On Windows many types from :mod:`multiprocessing` need to be picklable so
    that child processes can use them. However, one should generally avoid
    sending shared objects to other processes using pipes or queues. Instead
    you should arrange the program so that a process which needs access to a
    shared resource created elsewhere can inherit it from an ancestor process.

Avoid terminating processes

    Using the :meth:`Process.terminate` method to stop a process is liable to
    cause any shared resources (such as locks, semaphores, pipes and queues)
    currently being used by the process to become broken or unavailable to other
    processes.

    Therefore it is probably best to only consider using
    :meth:`Process.terminate` on processes which never use any shared
    resources.

Joining processes that use queues

    Bear in mind that a process that has put items in a queue will wait before
    terminating until all the buffered items are fed by the "feeder" thread to
    the underlying pipe. (The child process can call the
    :meth:`Queue.cancel_join` method of the queue to avoid this behaviour.)

    This means that whenever you use a queue you need to make sure that all
    items which have been put on the queue will eventually be removed before the
    process is joined. Otherwise you cannot be sure that processes which have
    put items on the queue will terminate. Remember also that non-daemonic
    processes will be joined automatically.

    An example which will deadlock is the following::

        from multiprocessing import Process, Queue

        def f(q):
            q.put('X' * 1000000)

        if __name__ == '__main__':
            queue = Queue()
            p = Process(target=f, args=(queue,))
            p.start()
            p.join()                    # this deadlocks
            obj = queue.get()

    A fix here would be to swap the last two lines round (or simply remove the
    ``p.join()`` line).

Explicitly pass resources to child processes

    On Unix a child process can make use of a shared resource created in a
    parent process using a global resource. However, it is better to pass the
    object as an argument to the constructor for the child process.

    Apart from making the code (potentially) compatible with Windows this also
    ensures that as long as the child process is still alive the object will not
    be garbage collected in the parent process. This might be important if some
    resource is freed when the object is garbage collected in the parent
    process.

    So for instance ::

        from multiprocessing import Process, Lock

        def f():
            # ... do something using the global "lock" ...
            pass

        if __name__ == '__main__':
            lock = Lock()
            for i in range(10):
                Process(target=f).start()

    should be rewritten as ::

        from multiprocessing import Process, Lock

        def f(l):
            # ... do something using "l" ...
            pass

        if __name__ == '__main__':
            lock = Lock()
            for i in range(10):
                Process(target=f, args=(lock,)).start()


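The picklability guideline above can be checked ahead of time with a small
helper. This is a sketch rather than anything provided by
:mod:`multiprocessing`; the name ``is_picklable`` is hypothetical.

```python
import pickle


def is_picklable(obj):
    """Return True if *obj* can be pickled, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # Different unpicklable objects fail with different exception types.
        return False
    return True
```

Plain data such as ``{'a': [1, 2]}`` passes the check, whereas a ``lambda``
does not, so it should not be passed to a proxy method.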
Windows
~~~~~~~

Since Windows lacks :func:`os.fork` it has a few extra restrictions:

More picklability

    Ensure that all arguments to :meth:`Process.__init__` are picklable. This
    means, in particular, that bound or unbound methods cannot be used directly
    as the ``target`` argument on Windows --- just define a function and use
    that instead.

    Also, if you subclass :class:`Process` then make sure that instances will be
    picklable when the :meth:`Process.start` method is called.

Global variables

    Bear in mind that if code run in a child process tries to access a global
    variable, then the value it sees (if any) may not be the same as the value
    in the parent process at the time that :meth:`Process.start` was called.

    However, global variables which are just module level constants cause no
    problems.

Safe importing of main module

    Make sure that the main module can be safely imported by a new Python
    interpreter without causing unintended side effects (such as starting a new
    process).

    For example, under Windows running the following module would fail with a
    :exc:`RuntimeError`::

        from multiprocessing import Process

        def foo():
            print 'hello'

        p = Process(target=foo)
        p.start()

    Instead one should protect the "entry point" of the program by using ``if
    __name__ == '__main__':`` as follows::

       from multiprocessing import Process, freeze_support

       def foo():
           print 'hello'

       if __name__ == '__main__':
           freeze_support()
           p = Process(target=foo)
           p.start()

    (The :func:`freeze_support` line can be omitted if the program will be run
    normally instead of frozen.)

    This allows the newly spawned Python interpreter to safely import the module
    and then run the module's ``foo()`` function.

    Similar restrictions apply if a pool or manager is created in the main
    module.


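The advice to "just define a function" instead of passing a method as
``target`` might look like the following sketch. The class ``Greeter`` and the
function ``run_greeting`` are hypothetical names chosen for illustration.

```python
from multiprocessing import Process


class Greeter(object):
    def __init__(self, name):
        self.name = name

    def greeting(self):
        return 'hello ' + self.name


def run_greeting(name):
    # A module-level function is pickled by reference, so it is a safe
    # ``target`` even where methods cannot be pickled.
    print(Greeter(name).greeting())


if __name__ == '__main__':
    # Guarded entry point, as recommended above.
    p = Process(target=run_greeting, args=('world',))
    p.start()
    p.join()
```

Only the picklable argument ``'world'`` crosses the process boundary; the
``Greeter`` instance is built inside the child.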
.. _multiprocessing-examples:

Examples
--------

Demonstration of how to create and use customized managers and proxies:

.. literalinclude:: ../includes/mp_newtype.py


Using :class:`Pool`:

.. literalinclude:: ../includes/mp_pool.py


Synchronization types like locks, conditions and queues:

.. literalinclude:: ../includes/mp_synchronize.py


An example showing how to use queues to feed tasks to a collection of worker
processes and collect the results:

.. literalinclude:: ../includes/mp_workers.py


An example of how a pool of worker processes can each run a
:class:`SimpleHTTPServer.HttpServer` instance while sharing a single listening
socket.

.. literalinclude:: ../includes/mp_webserver.py


Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:

.. literalinclude:: ../includes/mp_benchmarks.py

An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process`
and others to build a system which can distribute processes and work via a
distributed queue to a "cluster" of machines on a network, accessible via SSH.
You will need to have private key authentication for all hosts configured for
this to work.

.. literalinclude:: ../includes/mp_distributing.py