:mod:`multiprocessing` --- Process-based "threading" interface
==============================================================
3
4.. module:: multiprocessing
5 :synopsis: Process-based "threading" interface.
6
7.. versionadded:: 2.6
8
:mod:`multiprocessing` is a package that supports spawning processes using an
API similar to that of the :mod:`threading` module.  It runs on both Unix and
Windows.
12
The :mod:`multiprocessing` module offers both local and remote concurrency,
effectively side-stepping the Global Interpreter Lock by using subprocesses for
"threads".  Due to this, the :mod:`multiprocessing` module allows the
programmer to fully leverage multiple processors on a given machine.
17
18
19Introduction
20------------
21
22
23Threads, processes and the GIL
24~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
25
26To run more than one piece of code at the same time on the same computer one has
27the choice of either using multiple processes or multiple threads.
28
29Although a program can be made up of multiple processes, these processes are in
30effect completely independent of one another: different processes are not able
31to cooperate with one another unless one sets up some means of communication
32between them (such as by using sockets). If a lot of data must be transferred
33between processes then this can be inefficient.
34
35On the other hand, multiple threads within a single process are intimately
36connected: they share their data but often can interfere badly with one another.
37It is often argued that the only way to make multithreaded programming "easy" is
38to avoid relying on any shared state and for the threads to only communicate by
39passing messages to each other.
40
41CPython has a *Global Interpreter Lock* (GIL) which in many ways makes threading
42easier than it is in most languages by making sure that only one thread can
43manipulate the interpreter's objects at a time. As a result, it is often safe
44to let multiple threads access data without using any additional locking as one
45would need to in a language such as C.
46
One downside of the GIL is that on multi-processor (or multi-core) systems a
multithreaded Python program can only make use of one processor at a time,
unless the application makes heavy use of I/O, which effectively side-steps the
lock.  This problem can be overcome by using multiple processes instead.
51
52This package allows one to write multi-process programs using much the same API
53that one uses for writing threaded programs.
54
55
56Forking and spawning
57~~~~~~~~~~~~~~~~~~~~
58
59There are two ways of creating a new process in Python:
60
61* The current process can *fork* a new child process by using the
62 :func:`os.fork` function. This effectively creates an identical copy of the
63 current process which is now able to go off and perform some task set by the
64 parent process. This means that the child process inherits *copies* of all
65 variables that the parent process had. However, :func:`os.fork` is not
66 available on every platform: in particular Windows does not support it.
67
68* Alternatively, the current process can spawn a completely new Python
69 interpreter by using the :mod:`subprocess` module or one of the
  :func:`os.spawn*` functions.  Getting this new interpreter into a fit state
  to perform the task set for it by its parent process is, however, a bit of a
  challenge.
73
74The :mod:`multiprocessing` module uses :func:`os.fork` if it is available since
75it makes life a lot simpler. Forking the process is also more efficient in
76terms of memory usage and the time needed to create the new process.
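
For illustration, here is a minimal sketch of the fork approach on Unix (it
will not run on Windows); :mod:`multiprocessing` takes care of these details
for you::

   import os

   pid = os.fork()                  # returns 0 in the child, the child's pid in the parent
   if pid == 0:
       # child process: starts with a copy of the parent's state
       print 'child ', os.getpid()
       os._exit(0)                  # exit the child without running cleanup handlers
   else:
       os.waitpid(pid, 0)           # parent: wait for the child to finish
       print 'parent', os.getpid()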
77
78
79The :class:`Process` class
80~~~~~~~~~~~~~~~~~~~~~~~~~~
81
82In :mod:`multiprocessing`, processes are spawned by creating a :class:`Process`
83object and then calling its :meth:`Process.start` method. :class:`Process`
84follows the API of :class:`threading.Thread`. A trivial example of a
85multiprocess program is ::
86
87 from multiprocessing import Process
88
89 def f(name):
90 print 'hello', name
91
92 if __name__ == '__main__':
93 p = Process(target=f, args=('bob',))
94 p.start()
95 p.join()
96
97Here the function ``f`` is run in a child process.
98
99For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
100necessary, see :ref:`multiprocessing-programming`.
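
The same program can also be written by subclassing :class:`Process` and
overriding its :meth:`run` method (a minimal sketch; the ``HelloProcess`` name
is just for illustration)::

   from multiprocessing import Process

   class HelloProcess(Process):
       def __init__(self, who):
           Process.__init__(self)       # always call the base constructor first
           self.who = who

       def run(self):
           print 'hello', self.who

   if __name__ == '__main__':
       p = HelloProcess('bob')
       p.start()
       p.join()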
101
102
103
104Exchanging objects between processes
105~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
106
107:mod:`multiprocessing` supports two types of communication channel between
108processes:
109
110**Queues**
111
112 The :class:`Queue` class is a near clone of :class:`Queue.Queue`. For
113 example::
114
115 from multiprocessing import Process, Queue
116
117 def f(q):
118 q.put([42, None, 'hello'])
119
120 if __name__ == '__main__':
121 q = Queue()
122 p = Process(target=f, args=(q,))
123 p.start()
124 print q.get() # prints "[42, None, 'hello']"
125 p.join()
126
127 Queues are thread and process safe.
128
129**Pipes**
130
131 The :func:`Pipe` function returns a pair of connection objects connected by a
132 pipe which by default is duplex (two-way). For example::
133
134 from multiprocessing import Process, Pipe
135
136 def f(conn):
137 conn.send([42, None, 'hello'])
138 conn.close()
139
140 if __name__ == '__main__':
141 parent_conn, child_conn = Pipe()
142 p = Process(target=f, args=(child_conn,))
143 p.start()
144 print parent_conn.recv() # prints "[42, None, 'hello']"
145 p.join()
146
147 The two connection objects returned by :func:`Pipe` represent the two ends of
148 the pipe. Each connection object has :meth:`send` and :meth:`recv` methods
149 (among others). Note that data in a pipe may become corrupted if two
150 processes (or threads) try to read from or write to the *same* end of the
151 pipe at the same time. Of course there is no risk of corruption from
152 processes using different ends of the pipe at the same time.
153
154
155Synchronization between processes
156~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
157
158:mod:`multiprocessing` contains equivalents of all the synchronization
159primitives from :mod:`threading`. For instance one can use a lock to ensure
160that only one process prints to standard output at a time::
161
162 from multiprocessing import Process, Lock
163
164 def f(l, i):
165 l.acquire()
166 print 'hello world', i
167 l.release()
168
169 if __name__ == '__main__':
170 lock = Lock()
171
172 for num in range(10):
173 Process(target=f, args=(lock, num)).start()
174
175Without using the lock output from the different processes is liable to get all
176mixed up.
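
A slightly more robust variant of ``f`` releases the lock even if an exception
is raised while it is held (a sketch of the usual acquire/release pattern)::

   from multiprocessing import Process, Lock

   def f(l, i):
       l.acquire()
       try:
           print 'hello world', i
       finally:
           l.release()              # always release the lock, even on error

   if __name__ == '__main__':
       lock = Lock()

       for num in range(10):
           Process(target=f, args=(lock, num)).start()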
177
178
179Sharing state between processes
180~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
181
182As mentioned above, when doing concurrent programming it is usually best to
183avoid using shared state as far as possible. This is particularly true when
184using multiple processes.
185
186However, if you really do need to use some shared data then
187:mod:`multiprocessing` provides a couple of ways of doing so.
188
189**Shared memory**
190
191 Data can be stored in a shared memory map using :class:`Value` or
192 :class:`Array`. For example, the following code ::
193
194 from multiprocessing import Process, Value, Array
195
196 def f(n, a):
197 n.value = 3.1415927
198 for i in range(len(a)):
199 a[i] = -a[i]
200
201 if __name__ == '__main__':
202 num = Value('d', 0.0)
203 arr = Array('i', range(10))
204
205 p = Process(target=f, args=(num, arr))
206 p.start()
207 p.join()
208
209 print num.value
210 print arr[:]
211
212 will print ::
213
214 3.1415927
215 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
216
217 The ``'d'`` and ``'i'`` arguments used when creating ``num`` and ``arr`` are
218 typecodes of the kind used by the :mod:`array` module: ``'d'`` indicates a
   double precision float and ``'i'`` indicates a signed integer.  These shared
220 objects will be process and thread safe.
221
222 For more flexibility in using shared memory one can use the
223 :mod:`multiprocessing.sharedctypes` module which supports the creation of
224 arbitrary ctypes objects allocated from shared memory.
225
226**Server process**
227
228 A manager object returned by :func:`Manager` controls a server process which
   holds Python objects and allows other processes to manipulate them using
230 proxies.
231
232 A manager returned by :func:`Manager` will support types :class:`list`,
233 :class:`dict`, :class:`Namespace`, :class:`Lock`, :class:`RLock`,
234 :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`,
235 :class:`Event`, :class:`Queue`, :class:`Value` and :class:`Array`. For
236 example, ::
237
238 from multiprocessing import Process, Manager
239
240 def f(d, l):
241 d[1] = '1'
242 d['2'] = 2
243 d[0.25] = None
244 l.reverse()
245
246 if __name__ == '__main__':
247 manager = Manager()
248
249 d = manager.dict()
250 l = manager.list(range(10))
251
252 p = Process(target=f, args=(d, l))
253 p.start()
254 p.join()
255
256 print d
257 print l
258
259 will print ::
260
261 {0.25: None, 1: '1', '2': 2}
262 [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
263
264 Server process managers are more flexible than using shared memory objects
265 because they can be made to support arbitrary object types. Also, a single
266 manager can be shared by processes on different computers over a network.
267 They are, however, slower than using shared memory.
268
269
270Using a pool of workers
271~~~~~~~~~~~~~~~~~~~~~~~
272
The :class:`Pool` class represents a pool of worker processes.  It has methods
which allow tasks to be offloaded to the worker processes in a few different
ways.
276
277For example::
278
279 from multiprocessing import Pool
280
281 def f(x):
282 return x*x
283
284 if __name__ == '__main__':
285 pool = Pool(processes=4) # start 4 worker processes
       result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
287 print result.get(timeout=1) # prints "100" unless your computer is *very* slow
288 print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
289
290
291Reference
292---------
293
294The :mod:`multiprocessing` package mostly replicates the API of the
295:mod:`threading` module.
296
297
298:class:`Process` and exceptions
299~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
300
301.. class:: Process([group[, target[, name[, args[, kwargs]]]]])
302
303 Process objects represent activity that is run in a separate process. The
304 :class:`Process` class has equivalents of all the methods of
305 :class:`threading.Thread`.
306
307 The constructor should always be called with keyword arguments. *group*
   should always be ``None``; it exists solely for compatibility with
   :class:`threading.Thread`.  *target* is the callable object to be invoked by
   the :meth:`run()` method.  It defaults to ``None``, meaning nothing is
311 called. *name* is the process name. By default, a unique name is constructed
312 of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\
313 :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length
314 is determined by the *generation* of the process. *args* is the argument
315 tuple for the target invocation. *kwargs* is a dictionary of keyword
316 arguments for the target invocation. By default, no arguments are passed to
317 *target*.
318
319 If a subclass overrides the constructor, it must make sure it invokes the
320 base class constructor (:meth:`Process.__init__`) before doing anything else
321 to the process.
322
323 .. method:: run()
324
325 Method representing the process's activity.
326
327 You may override this method in a subclass. The standard :meth:`run`
328 method invokes the callable object passed to the object's constructor as
329 the target argument, if any, with sequential and keyword arguments taken
330 from the *args* and *kwargs* arguments, respectively.
331
332 .. method:: start()
333
334 Start the process's activity.
335
336 This must be called at most once per process object. It arranges for the
337 object's :meth:`run` method to be invoked in a separate process.
338
339 .. method:: join([timeout])
340
341 Block the calling thread until the process whose :meth:`join` method is
342 called terminates or until the optional timeout occurs.
343
344 If *timeout* is ``None`` then there is no timeout.
345
346 A process can be joined many times.
347
348 A process cannot join itself because this would cause a deadlock. It is
349 an error to attempt to join a process before it has been started.
350
351 .. method:: get_name()
352
353 Return the process's name.
354
355 .. method:: set_name(name)
356
357 Set the process's name.
358
359 The name is a string used for identification purposes only. It has no
360 semantics. Multiple processes may be given the same name. The initial
361 name is set by the constructor.
362
363 .. method:: is_alive()
364
365 Return whether the process is alive.
366
367 Roughly, a process object is alive from the moment the :meth:`start`
368 method returns until the child process terminates.
369
370 .. method:: is_daemon()
371
372 Return the process's daemon flag.
373
374 .. method:: set_daemon(daemonic)
375
376 Set the process's daemon flag to the Boolean value *daemonic*. This must
377 be called before :meth:`start` is called.
378
379 The initial value is inherited from the creating process.
380
381 When a process exits, it attempts to terminate all of its daemonic child
382 processes.
383
384 Note that a daemonic process is not allowed to create child processes.
385 Otherwise a daemonic process would leave its children orphaned if it gets
386 terminated when its parent process exits.
387
388 In addition process objects also support the following methods:
389
390 .. method:: get_pid()
391
392 Return the process ID. Before the process is spawned, this will be
393 ``None``.
394
395 .. method:: get_exit_code()
396
397 Return the child's exit code. This will be ``None`` if the process has
398 not yet terminated. A negative value *-N* indicates that the child was
399 terminated by signal *N*.
400
401 .. method:: get_auth_key()
402
403 Return the process's authentication key (a byte string).
404
      When :mod:`multiprocessing` is initialized the main process is assigned a
      random string using :func:`os.urandom`.
407
408 When a :class:`Process` object is created, it will inherit the
409 authentication key of its parent process, although this may be changed
410 using :meth:`set_auth_key` below.
411
412 See :ref:`multiprocessing-auth-keys`.
413
414 .. method:: set_auth_key(authkey)
415
416 Set the process's authentication key which must be a byte string.
417
   .. method:: terminate()
419
420 Terminate the process. On Unix this is done using the ``SIGTERM`` signal,
421 on Windows ``TerminateProcess()`` is used. Note that exit handlers and
422 finally clauses etc will not be executed.
423
424 Note that descendant processes of the process will *not* be terminated --
425 they will simply become orphaned.
426
427 .. warning::
428
429 If this method is used when the associated process is using a pipe or
430 queue then the pipe or queue is liable to become corrupted and may
         become unusable by other processes.  Similarly, if the process has
432 acquired a lock or semaphore etc. then terminating it is liable to
433 cause other processes to deadlock.
434
435 Note that the :meth:`start`, :meth:`join`, :meth:`is_alive` and
436 :meth:`get_exit_code` methods should only be called by the process that
437 created the process object.
438
439 Example usage of some of the methods of :class:`Process`::
440
      >>> import multiprocessing, time, signal
      >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
443 >>> print p, p.is_alive()
444 <Process(Process-1, initial)> False
445 >>> p.start()
446 >>> print p, p.is_alive()
447 <Process(Process-1, started)> True
448 >>> p.terminate()
449 >>> print p, p.is_alive()
450 <Process(Process-1, stopped[SIGTERM])> False
451 >>> p.get_exit_code() == -signal.SIGTERM
452 True
453
454
455.. exception:: BufferTooShort
456
457 Exception raised by :meth:`Connection.recv_bytes_into()` when the supplied
458 buffer object is too small for the message read.
459
460 If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
461 the message as a byte string.
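
   A minimal sketch of how this exception can arise (the buffer sizes are
   arbitrary)::

      from multiprocessing import Pipe, BufferTooShort
      import array

      a, b = Pipe()
      a.send_bytes('x' * 100)               # a 100 byte message
      buf = array.array('c', ' ' * 10)      # a buffer that is too small

      try:
          b.recv_bytes_into(buf)
      except BufferTooShort, e:
          print 'a message of %d bytes did not fit' % len(e.args[0])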
462
463
464Pipes and Queues
465~~~~~~~~~~~~~~~~
466
467When using multiple processes, one generally uses message passing for
468communication between processes and avoids having to use any synchronization
469primitives like locks.
470
471For passing messages one can use :func:`Pipe` (for a connection between two
472processes) or a queue (which allows multiple producers and consumers).
473
474The :class:`Queue` and :class:`JoinableQueue` types are multi-producer,
475multi-consumer FIFO queues modelled on the :class:`Queue.Queue` class in the
476standard library. They differ in that :class:`Queue` lacks the
477:meth:`task_done` and :meth:`join` methods introduced into Python 2.5's
478:class:`Queue.Queue` class.
479
480If you use :class:`JoinableQueue` then you **must** call
481:meth:`JoinableQueue.task_done` for each task removed from the queue or else the
482semaphore used to count the number of unfinished tasks may eventually overflow
483raising an exception.
484
485.. note::
486
487 :mod:`multiprocessing` uses the usual :exc:`Queue.Empty` and
488 :exc:`Queue.Full` exceptions to signal a timeout. They are not available in
489 the :mod:`multiprocessing` namespace so you need to import them from
490 :mod:`Queue`.
491
492
493.. warning::
494
495 If a process is killed using :meth:`Process.terminate` or :func:`os.kill`
496 while it is trying to use a :class:`Queue`, then the data in the queue is
497 likely to become corrupted. This may cause any other processes to get an
498 exception when it tries to use the queue later on.
499
500.. warning::
501
502 As mentioned above, if a child process has put items on a queue (and it has
503 not used :meth:`JoinableQueue.cancel_join_thread`), then that process will
504 not terminate until all buffered items have been flushed to the pipe.
505
506 This means that if you try joining that process you may get a deadlock unless
507 you are sure that all items which have been put on the queue have been
508 consumed. Similarly, if the child process is non-daemonic then the parent
   process may hang on exit when it tries to join all its non-daemonic children.
510
511 Note that a queue created using a manager does not have this issue. See
512 :ref:`multiprocessing-programming`.
513
514Note that one can also create a shared queue by using a manager object -- see
515:ref:`multiprocessing-managers`.
516
517For an example of the usage of queues for interprocess communication see
518:ref:`multiprocessing-examples`.
519
520
521.. function:: Pipe([duplex])
522
523 Returns a pair ``(conn1, conn2)`` of :class:`Connection` objects representing
524 the ends of a pipe.
525
526 If *duplex* is ``True`` (the default) then the pipe is bidirectional. If
527 *duplex* is ``False`` then the pipe is unidirectional: ``conn1`` can only be
528 used for receiving messages and ``conn2`` can only be used for sending
529 messages.
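
   For example, a short sketch of a one-way pipe (*duplex* set to ``False``)::

      from multiprocessing import Process, Pipe

      def sender(conn):
          conn.send('ping')
          conn.close()

      if __name__ == '__main__':
          recv_conn, send_conn = Pipe(duplex=False)   # recv_conn receives, send_conn sends
          p = Process(target=sender, args=(send_conn,))
          p.start()
          print recv_conn.recv()                      # prints "ping"
          p.join()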
530
531
532.. class:: Queue([maxsize])
533
534 Returns a process shared queue implemented using a pipe and a few
535 locks/semaphores. When a process first puts an item on the queue a feeder
536 thread is started which transfers objects from a buffer into the pipe.
537
538 The usual :exc:`Queue.Empty` and :exc:`Queue.Full` exceptions from the
539 standard library's :mod:`Queue` module are raised to signal timeouts.
540
541 :class:`Queue` implements all the methods of :class:`Queue.Queue` except for
542 :meth:`task_done` and :meth:`join`.
543
544 .. method:: qsize()
545
546 Return the approximate size of the queue. Because of
547 multithreading/multiprocessing semantics, this number is not reliable.
548
549 Note that this may raise :exc:`NotImplementedError` on Unix platforms like
550 MacOS X where ``sem_getvalue()`` is not implemented.
551
552 .. method:: empty()
553
554 Return ``True`` if the queue is empty, ``False`` otherwise. Because of
555 multithreading/multiprocessing semantics, this is not reliable.
556
557 .. method:: full()
558
559 Return ``True`` if the queue is full, ``False`` otherwise. Because of
560 multithreading/multiprocessing semantics, this is not reliable.
561
   .. method:: put(item[, block[, timeout]])
563
564 Put item into the queue. If optional args *block* is ``True`` (the
565 default) and *timeout* is ``None`` (the default), block if necessary until
566 a free slot is available. If *timeout* is a positive number, it blocks at
567 most *timeout* seconds and raises the :exc:`Queue.Full` exception if no
568 free slot was available within that time. Otherwise (*block* is
569 ``False``), put an item on the queue if a free slot is immediately
570 available, else raise the :exc:`Queue.Full` exception (*timeout* is
571 ignored in that case).
572
573 .. method:: put_nowait(item)
574
575 Equivalent to ``put(item, False)``.
576
577 .. method:: get([block[, timeout]])
578
579 Remove and return an item from the queue. If optional args *block* is
580 ``True`` (the default) and *timeout* is ``None`` (the default), block if
581 necessary until an item is available. If *timeout* is a positive number,
582 it blocks at most *timeout* seconds and raises the :exc:`Queue.Empty`
583 exception if no item was available within that time. Otherwise (block is
584 ``False``), return an item if one is immediately available, else raise the
585 :exc:`Queue.Empty` exception (*timeout* is ignored in that case).
586
   .. method:: get_nowait()
589
590 Equivalent to ``get(False)``.
591
592 :class:`multiprocessing.Queue` has a few additional methods not found in
593 :class:`Queue.Queue` which are usually unnecessary:
594
595 .. method:: close()
596
597 Indicate that no more data will be put on this queue by the current
598 process. The background thread will quit once it has flushed all buffered
599 data to the pipe. This is called automatically when the queue is garbage
600 collected.
601
602 .. method:: join_thread()
603
604 Join the background thread. This can only be used after :meth:`close` has
605 been called. It blocks until the background thread exits, ensuring that
606 all data in the buffer has been flushed to the pipe.
607
608 By default if a process is not the creator of the queue then on exit it
609 will attempt to join the queue's background thread. The process can call
610 :meth:`cancel_join_thread()` to make :meth:`join_thread()` do nothing.
611
612 .. method:: cancel_join_thread()
613
614 Prevent :meth:`join_thread` from blocking. In particular, this prevents
615 the background thread from being joined automatically when the process
616 exits -- see :meth:`join_thread()`.
617
618
619.. class:: JoinableQueue([maxsize])
620
621 :class:`JoinableQueue`, a :class:`Queue` subclass, is a queue which
622 additionally has :meth:`task_done` and :meth:`join` methods.
623
624 .. method:: task_done()
625
626 Indicate that a formerly enqueued task is complete. Used by queue consumer
627 threads. For each :meth:`get` used to fetch a task, a subsequent call to
628 :meth:`task_done` tells the queue that the processing on the task is
629 complete.
630
631 If a :meth:`join` is currently blocking, it will resume when all items
632 have been processed (meaning that a :meth:`task_done` call was received
633 for every item that had been :meth:`put` into the queue).
634
635 Raises a :exc:`ValueError` if called more times than there were items
636 placed in the queue.
637
638
639 .. method:: join()
640
641 Block until all items in the queue have been gotten and processed.
642
643 The count of unfinished tasks goes up whenever an item is added to the
644 queue. The count goes down whenever a consumer thread calls
645 :meth:`task_done` to indicate that the item was retrieved and all work on
646 it is complete. When the count of unfinished tasks drops to zero,
647 :meth:`join` unblocks.
648
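The following sketch shows the consumer pattern that :meth:`task_done` and
:meth:`join` support (the ``worker`` function is just for illustration)::

   from multiprocessing import Process, JoinableQueue

   def worker(q):
       while True:
           item = q.get()
           print 'processing', item
           q.task_done()               # mark this item as finished

   if __name__ == '__main__':
       q = JoinableQueue()
       p = Process(target=worker, args=(q,))
       p.set_daemon(True)              # the worker exits when the main process does
       p.start()

       for i in range(5):
           q.put(i)

       q.join()                        # block until every item has been marked done
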
649
650Miscellaneous
651~~~~~~~~~~~~~
652
653.. function:: active_children()
654
655 Return list of all live children of the current process.
656
   Calling this has the side effect of "joining" any processes which have
658 already finished.
659
660.. function:: cpu_count()
661
662 Return the number of CPUs in the system. May raise
663 :exc:`NotImplementedError`.
664
665.. function:: current_process()
666
667 Return the :class:`Process` object corresponding to the current process.
668
669 An analogue of :func:`threading.current_thread`.
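
   A short sketch of typical usage (the exact output will vary)::

      >>> from multiprocessing import Process, current_process, active_children
      >>> import time
      >>> current_process().get_name()
      'MainProcess'
      >>> p = Process(target=time.sleep, args=(2,))
      >>> p.start()
      >>> active_children()
      [<Process(Process-1, started)>]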
670
671.. function:: freeze_support()
672
673 Add support for when a program which uses :mod:`multiprocessing` has been
674 frozen to produce a Windows executable. (Has been tested with **py2exe**,
675 **PyInstaller** and **cx_Freeze**.)
676
677 One needs to call this function straight after the ``if __name__ ==
678 '__main__'`` line of the main module. For example::
679
680 from multiprocessing import Process, freeze_support
681
682 def f():
683 print 'hello world!'
684
685 if __name__ == '__main__':
686 freeze_support()
687 Process(target=f).start()
688
689 If the :func:`freeze_support()` line is missed out then trying to run the
690 frozen executable will raise :exc:`RuntimeError`.
691
692 If the module is being run normally by the Python interpreter then
693 :func:`freeze_support()` has no effect.
694
695.. function:: set_executable()
696
   Sets the path of the Python interpreter to use when starting a child process.
   (By default :data:`sys.executable` is used).  Embedders will probably need to
   do something like ::

      set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
702
703 before they can create child processes. (Windows only)
704
705
706.. note::
707
708 :mod:`multiprocessing` contains no analogues of
709 :func:`threading.active_count`, :func:`threading.enumerate`,
710 :func:`threading.settrace`, :func:`threading.setprofile`,
711 :class:`threading.Timer`, or :class:`threading.local`.
712
713
714Connection Objects
715~~~~~~~~~~~~~~~~~~
716
717Connection objects allow the sending and receiving of picklable objects or
718strings. They can be thought of as message oriented connected sockets.
719
Connection objects are usually created using :func:`Pipe()` -- see also
721:ref:`multiprocessing-listeners-clients`.
722
723.. class:: Connection
724
725 .. method:: send(obj)
726
727 Send an object to the other end of the connection which should be read
728 using :meth:`recv`.
729
730 The object must be picklable.
731
732 .. method:: recv()
733
734 Return an object sent from the other end of the connection using
735 :meth:`send`. Raises :exc:`EOFError` if there is nothing left to receive
736 and the other end was closed.
737
738 .. method:: fileno()
739
740 Returns the file descriptor or handle used by the connection.
741
742 .. method:: close()
743
744 Close the connection.
745
746 This is called automatically when the connection is garbage collected.
747
748 .. method:: poll([timeout])
749
750 Return whether there is any data available to be read.
751
752 If *timeout* is not specified then it will return immediately. If
753 *timeout* is a number then this specifies the maximum time in seconds to
754 block. If *timeout* is ``None`` then an infinite timeout is used.
755
756 .. method:: send_bytes(buffer[, offset[, size]])
757
758 Send byte data from an object supporting the buffer interface as a
759 complete message.
760
761 If *offset* is given then data is read from that position in *buffer*. If
762 *size* is given then that many bytes will be read from buffer.
763
764 .. method:: recv_bytes([maxlength])
765
766 Return a complete message of byte data sent from the other end of the
767 connection as a string. Raises :exc:`EOFError` if there is nothing left
768 to receive and the other end has closed.
769
770 If *maxlength* is specified and the message is longer than *maxlength*
771 then :exc:`IOError` is raised and the connection will no longer be
772 readable.
773
774 .. method:: recv_bytes_into(buffer[, offset])
775
776 Read into *buffer* a complete message of byte data sent from the other end
777 of the connection and return the number of bytes in the message. Raises
778 :exc:`EOFError` if there is nothing left to receive and the other end was
779 closed.
780
      *buffer* must be an object satisfying the writable buffer interface.  If
      *offset* is given then the message will be written into the buffer from
      that position.  *offset* must be a non-negative integer less than the
      length of *buffer* (in bytes).
785
786 If the buffer is too short then a :exc:`BufferTooShort` exception is
787 raised and the complete message is available as ``e.args[0]`` where ``e``
788 is the exception instance.
789
790
791For example:
792
793 >>> from multiprocessing import Pipe
794 >>> a, b = Pipe()
795 >>> a.send([1, 'hello', None])
796 >>> b.recv()
797 [1, 'hello', None]
798 >>> b.send_bytes('thank you')
799 >>> a.recv_bytes()
800 'thank you'
801 >>> import array
802 >>> arr1 = array.array('i', range(5))
803 >>> arr2 = array.array('i', [0] * 10)
804 >>> a.send_bytes(arr1)
805 >>> count = b.recv_bytes_into(arr2)
806 >>> assert count == len(arr1) * arr1.itemsize
807 >>> arr2
808 array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
809
810
811.. warning::
812
813 The :meth:`Connection.recv` method automatically unpickles the data it
814 receives, which can be a security risk unless you can trust the process
815 which sent the message.
816
   Therefore, unless the connection object was produced using :func:`Pipe()`,
   you should only use the :meth:`recv` and :meth:`send` methods after
   performing some sort of authentication.  See :ref:`multiprocessing-auth-keys`.
820
821.. warning::
822
823 If a process is killed while it is trying to read or write to a pipe then
824 the data in the pipe is likely to become corrupted, because it may become
825 impossible to be sure where the message boundaries lie.
826
827
828Synchronization primitives
829~~~~~~~~~~~~~~~~~~~~~~~~~~
830
831Generally synchronization primitives are not as necessary in a multiprocess
program as they are in a multithreaded program.  See the documentation for the
833standard library's :mod:`threading` module.
834
835Note that one can also create synchronization primitives by using a manager
836object -- see :ref:`multiprocessing-managers`.
837
838.. class:: BoundedSemaphore([value])
839
840 A bounded semaphore object: a clone of :class:`threading.BoundedSemaphore`.
841
   (On Mac OS X this is indistinguishable from :class:`Semaphore` because
843 ``sem_getvalue()`` is not implemented on that platform).
844
845.. class:: Condition([lock])
846
   A condition variable: a clone of :class:`threading.Condition`.
848
849 If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
850 object from :mod:`multiprocessing`.
851
852.. class:: Event()
853
854 A clone of :class:`threading.Event`.
855
856.. class:: Lock()
857
858 A non-recursive lock object: a clone of :class:`threading.Lock`.
859
860.. class:: RLock()
861
862 A recursive lock object: a clone of :class:`threading.RLock`.
863
864.. class:: Semaphore([value])
865
   A semaphore object: a clone of :class:`threading.Semaphore`.
867
868.. note::
869
870 The :meth:`acquire()` method of :class:`BoundedSemaphore`, :class:`Lock`,
871 :class:`RLock` and :class:`Semaphore` has a timeout parameter not supported
872 by the equivalents in :mod:`threading`. The signature is
873 ``acquire(block=True, timeout=None)`` with keyword parameters being
874 acceptable. If *block* is ``True`` and *timeout* is not ``None`` then it
875 specifies a timeout in seconds. If *block* is ``False`` then *timeout* is
876 ignored.
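
   For instance, a minimal sketch, assuming the usual boolean return value
   (``False`` meaning the timeout expired)::

      from multiprocessing import Lock

      lock = Lock()
      lock.acquire()
      # a second acquire on this non-recursive lock cannot succeed, so the
      # timeout expires and the call returns False
      if not lock.acquire(block=True, timeout=0.5):
          print 'timed out waiting for the lock'
      lock.release()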
877
878.. note::
879
880 If the SIGINT signal generated by Ctrl-C arrives while the main thread is
881 blocked by a call to :meth:`BoundedSemaphore.acquire`, :meth:`Lock.acquire`,
882 :meth:`RLock.acquire`, :meth:`Semaphore.acquire`, :meth:`Condition.acquire`
883 or :meth:`Condition.wait` then the call will be immediately interrupted and
884 :exc:`KeyboardInterrupt` will be raised.
885
886 This differs from the behaviour of :mod:`threading` where SIGINT will be
887 ignored while the equivalent blocking calls are in progress.
888
889
890Shared :mod:`ctypes` Objects
891~~~~~~~~~~~~~~~~~~~~~~~~~~~~
892
893It is possible to create shared objects using shared memory which can be
894inherited by child processes.
895
896.. function:: Value(typecode_or_type[, lock[, *args]])
897
898 Return a :mod:`ctypes` object allocated from shared memory. By default the
899 return value is actually a synchronized wrapper for the object.
900
901 *typecode_or_type* determines the type of the returned object: it is either a
902 ctypes type or a one character typecode of the kind used by the :mod:`array`
903 module. *\*args* is passed on to the constructor for the type.
904
905 If *lock* is ``True`` (the default) then a new lock object is created to
906 synchronize access to the value. If *lock* is a :class:`Lock` or
907 :class:`RLock` object then that will be used to synchronize access to the
908 value. If *lock* is ``False`` then access to the returned object will not be
909 automatically protected by a lock, so it will not necessarily be
910 "process-safe".
911
912 Note that *lock* is a keyword-only argument.
913
914.. function:: Array(typecode_or_type, size_or_initializer, *, lock=True)
915
916 Return a ctypes array allocated from shared memory. By default the return
917 value is actually a synchronized wrapper for the array.
918
919 *typecode_or_type* determines the type of the elements of the returned array:
920 it is either a ctypes type or a one character typecode of the kind used by
921 the :mod:`array` module. If *size_or_initializer* is an integer, then it
922 determines the length of the array, and the array will be initially zeroed.
923 Otherwise, *size_or_initializer* is a sequence which is used to initialize
924 the array and whose length determines the length of the array.
925
926 If *lock* is ``True`` (the default) then a new lock object is created to
927 synchronize access to the value. If *lock* is a :class:`Lock` or
928 :class:`RLock` object then that will be used to synchronize access to the
929 value. If *lock* is ``False`` then access to the returned object will not be
930 automatically protected by a lock, so it will not necessarily be
931 "process-safe".
932
933 Note that *lock* is a keyword only argument.
934
   Note that an array of :data:`ctypes.c_char` has ``value`` and ``raw``
   attributes which allow one to use it to store and retrieve strings.
937
938
939The :mod:`multiprocessing.sharedctypes` module
940>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
941
942.. module:: multiprocessing.sharedctypes
943 :synopsis: Allocate ctypes objects from shared memory.
944
945The :mod:`multiprocessing.sharedctypes` module provides functions for allocating
946:mod:`ctypes` objects from shared memory which can be inherited by child
947processes.
948
949.. note::
950
   Although it is possible to store a pointer in shared memory, remember that this
952 will refer to a location in the address space of a specific process.
953 However, the pointer is quite likely to be invalid in the context of a second
954 process and trying to dereference the pointer from the second process may
955 cause a crash.
956
957.. function:: RawArray(typecode_or_type, size_or_initializer)
958
959 Return a ctypes array allocated from shared memory.
960
961 *typecode_or_type* determines the type of the elements of the returned array:
962 it is either a ctypes type or a one character typecode of the kind used by
963 the :mod:`array` module. If *size_or_initializer* is an integer then it
964 determines the length of the array, and the array will be initially zeroed.
965 Otherwise *size_or_initializer* is a sequence which is used to initialize the
966 array and whose length determines the length of the array.
967
968 Note that setting and getting an element is potentially non-atomic -- use
969 :func:`Array` instead to make sure that access is automatically synchronized
970 using a lock.
971
972.. function:: RawValue(typecode_or_type, *args)
973
974 Return a ctypes object allocated from shared memory.
975
976 *typecode_or_type* determines the type of the returned object: it is either a
977 ctypes type or a one character typecode of the kind used by the :mod:`array`
   module.  *\*args* is passed on to the constructor for the type.
979
980 Note that setting and getting the value is potentially non-atomic -- use
981 :func:`Value` instead to make sure that access is automatically synchronized
982 using a lock.
983
   Note that an array of :data:`ctypes.c_char` has ``value`` and ``raw``
985 attributes which allow one to use it to store and retrieve strings -- see
986 documentation for :mod:`ctypes`.
987
988.. function:: Array(typecode_or_type, size_or_initializer[, lock[, *args]])
989
990 The same as :func:`RawArray` except that depending on the value of *lock* a
991 process-safe synchronization wrapper may be returned instead of a raw ctypes
992 array.
993
994 If *lock* is ``True`` (the default) then a new lock object is created to
995 synchronize access to the value. If *lock* is a :class:`Lock` or
996 :class:`RLock` object then that will be used to synchronize access to the
997 value. If *lock* is ``False`` then access to the returned object will not be
998 automatically protected by a lock, so it will not necessarily be
999 "process-safe".
1000
1001 Note that *lock* is a keyword-only argument.
1002
1003.. function:: Value(typecode_or_type, *args[, lock])
1004
1005 The same as :func:`RawValue` except that depending on the value of *lock* a
1006 process-safe synchronization wrapper may be returned instead of a raw ctypes
1007 object.
1008
1009 If *lock* is ``True`` (the default) then a new lock object is created to
1010 synchronize access to the value. If *lock* is a :class:`Lock` or
1011 :class:`RLock` object then that will be used to synchronize access to the
1012 value. If *lock* is ``False`` then access to the returned object will not be
1013 automatically protected by a lock, so it will not necessarily be
1014 "process-safe".
1015
1016 Note that *lock* is a keyword-only argument.
1017
1018.. function:: copy(obj)
1019
1020 Return a ctypes object allocated from shared memory which is a copy of the
1021 ctypes object *obj*.
1022
1023.. function:: synchronized(obj[, lock])
1024
1025 Return a process-safe wrapper object for a ctypes object which uses *lock* to
1026 synchronize access. If *lock* is ``None`` (the default) then a
1027 :class:`multiprocessing.RLock` object is created automatically.
1028
1029 A synchronized wrapper will have two methods in addition to those of the
1030 object it wraps: :meth:`get_obj()` returns the wrapped object and
1031 :meth:`get_lock()` returns the lock object used for synchronization.
1032
1033 Note that accessing the ctypes object through the wrapper can be a lot slower
   than accessing the raw ctypes object.
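
   A minimal sketch of wrapping a raw value by hand::

      from multiprocessing.sharedctypes import RawValue, synchronized
      from ctypes import c_int

      counter = synchronized(RawValue(c_int, 0))   # an RLock is created automatically

      lock = counter.get_lock()
      lock.acquire()
      try:
          counter.get_obj().value += 1             # guard a read-modify-write update
      finally:
          lock.release()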
1035
1036
1037The table below compares the syntax for creating shared ctypes objects from
1038shared memory with the normal ctypes syntax. (In the table ``MyStruct`` is some
1039subclass of :class:`ctypes.Structure`.)
1040
1041==================== ========================== ===========================
1042ctypes sharedctypes using type sharedctypes using typecode
1043==================== ========================== ===========================
1044c_double(2.4) RawValue(c_double, 2.4) RawValue('d', 2.4)
1045MyStruct(4, 6) RawValue(MyStruct, 4, 6)
1046(c_short * 7)() RawArray(c_short, 7) RawArray('h', 7)
1047(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8))
1048==================== ========================== ===========================
1049
1050
1051Below is an example where a number of ctypes objects are modified by a child
1052process::
1053
1054 from multiprocessing import Process, Lock
1055 from multiprocessing.sharedctypes import Value, Array
1056 from ctypes import Structure, c_double
1057
1058 class Point(Structure):
1059 _fields_ = [('x', c_double), ('y', c_double)]
1060
1061 def modify(n, x, s, A):
1062 n.value **= 2
1063 x.value **= 2
1064 s.value = s.value.upper()
1065 for a in A:
1066 a.x **= 2
1067 a.y **= 2
1068
1069 if __name__ == '__main__':
1070 lock = Lock()
1071
1072 n = Value('i', 7)
       x = Value(c_double, 1.0/3.0, lock=False)
1074 s = Array('c', 'hello world', lock=lock)
1075 A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
1076
1077 p = Process(target=modify, args=(n, x, s, A))
1078 p.start()
1079 p.join()
1080
1081 print n.value
1082 print x.value
1083 print s.value
1084 print [(a.x, a.y) for a in A]
1085
1086
1087.. highlightlang:: none
1088
1089The results printed are ::
1090
1091 49
1092 0.1111111111111111
1093 HELLO WORLD
1094 [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
1095
1096.. highlightlang:: python
1097
1098
1099.. _multiprocessing-managers:
1100
1101Managers
1102~~~~~~~~
1103
1104Managers provide a way to create data which can be shared between different
1105processes. A manager object controls a server process which manages *shared
1106objects*. Other processes can access the shared objects by using proxies.
1107
1108.. function:: multiprocessing.Manager()
1109
1110 Returns a started :class:`SyncManager` object which can be used for sharing
1111 objects between processes. The returned manager object corresponds to a
1112 spawned child process and has methods which will create shared objects and
1113 return corresponding proxies.
1114
1115.. module:: multiprocessing.managers
   :synopsis: Share data between processes with shared objects.
1117
Manager processes will be shut down as soon as they are garbage collected or
1119their parent process exits. The manager classes are defined in the
1120:mod:`multiprocessing.managers` module:
1121
1122.. class:: BaseManager([address[, authkey]])
1123
1124 Create a BaseManager object.
1125
1126 Once created one should call :meth:`start` or :meth:`serve_forever` to ensure
1127 that the manager object refers to a started manager process.
1128
1129 *address* is the address on which the manager process listens for new
1130 connections. If *address* is ``None`` then an arbitrary one is chosen.
1131
1132 *authkey* is the authentication key which will be used to check the validity
   of incoming connections to the server process.  If *authkey* is ``None``
   then ``current_process().get_auth_key()`` is used.  Otherwise *authkey* is
   used and it must be a string.
1136
1137 .. method:: start()
1138
      Start a subprocess in which the manager runs.
1140
   .. method:: serve_forever()
1142
1143 Run the server in the current process.
1144
1145 .. method:: from_address(address, authkey)
1146
1147 A class method which creates a manager object referring to a pre-existing
1148 server process which is using the given address and authentication key.
1149
1150 .. method:: shutdown()
1151
      Stop the process used by the manager.  This is only available if
      :meth:`start` has been used to start the server process.
1154
1155 This can be called multiple times.
1156
1157 .. method:: register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
1158
1159 A classmethod which can be used for registering a type or callable with
1160 the manager class.
1161
1162 *typeid* is a "type identifier" which is used to identify a particular
1163 type of shared object. This must be a string.
1164
1165 *callable* is a callable used for creating objects for this type
1166 identifier. If a manager instance will be created using the
1167 :meth:`from_address()` classmethod or if the *create_method* argument is
1168 ``False`` then this can be left as ``None``.
1169
1170 *proxytype* is a subclass of :class:`multiprocessing.managers.BaseProxy`
1171 which is used to create proxies for shared objects with this *typeid*. If
1172 ``None`` then a proxy class is created automatically.
1173
1174 *exposed* is used to specify a sequence of method names which proxies for
1175 this typeid should be allowed to access using
      :meth:`BaseProxy._call_method`.  (If *exposed* is ``None`` then
1177 :attr:`proxytype._exposed_` is used instead if it exists.) In the case
1178 where no exposed list is specified, all "public methods" of the shared
1179 object will be accessible. (Here a "public method" means any attribute
1180 which has a ``__call__()`` method and whose name does not begin with
1181 ``'_'``.)
1182
1183 *method_to_typeid* is a mapping used to specify the return type of those
1184 exposed methods which should return a proxy. It maps method names to
1185 typeid strings. (If *method_to_typeid* is ``None`` then
1186 :attr:`proxytype._method_to_typeid_` is used instead if it exists.) If a
1187 method's name is not a key of this mapping or if the mapping is ``None``
1188 then the object returned by the method will be copied by value.
1189
1190 *create_method* determines whether a method should be created with name
1191 *typeid* which can be used to tell the server process to create a new
1192 shared object and return a proxy for it. By default it is ``True``.
1193
1194 :class:`BaseManager` instances also have one read-only property:
1195
1196 .. attribute:: address
1197
1198 The address used by the manager.
1199
1200
1201.. class:: SyncManager
1202
1203 A subclass of :class:`BaseManager` which can be used for the synchronization
1204 of processes. Objects of this type are returned by
1205 :func:`multiprocessing.Manager()`.
1206
1207 It also supports creation of shared lists and dictionaries.
1208
1209 .. method:: BoundedSemaphore([value])
1210
1211 Create a shared :class:`threading.BoundedSemaphore` object and return a
1212 proxy for it.
1213
1214 .. method:: Condition([lock])
1215
1216 Create a shared :class:`threading.Condition` object and return a proxy for
1217 it.
1218
1219 If *lock* is supplied then it should be a proxy for a
1220 :class:`threading.Lock` or :class:`threading.RLock` object.
1221
1222 .. method:: Event()
1223
1224 Create a shared :class:`threading.Event` object and return a proxy for it.
1225
1226 .. method:: Lock()
1227
1228 Create a shared :class:`threading.Lock` object and return a proxy for it.
1229
1230 .. method:: Namespace()
1231
1232 Create a shared :class:`Namespace` object and return a proxy for it.
1233
1234 .. method:: Queue([maxsize])
1235
      Create a shared :class:`Queue.Queue` object and return a proxy for it.
1237
1238 .. method:: RLock()
1239
1240 Create a shared :class:`threading.RLock` object and return a proxy for it.
1241
1242 .. method:: Semaphore([value])
1243
1244 Create a shared :class:`threading.Semaphore` object and return a proxy for
1245 it.
1246
1247 .. method:: Array(typecode, sequence)
1248
      Create an array and return a proxy for it.
1250
1251 .. method:: Value(typecode, value)
1252
1253 Create an object with a writable ``value`` attribute and return a proxy
1254 for it.
1255
1256 .. method:: dict()
1257 dict(mapping)
1258 dict(sequence)
1259
1260 Create a shared ``dict`` object and return a proxy for it.
1261
1262 .. method:: list()
1263 list(sequence)
1264
1265 Create a shared ``list`` object and return a proxy for it.
1266
1267
1268Namespace objects
1269>>>>>>>>>>>>>>>>>
1270
1271A namespace object has no public methods, but does have writable attributes.
1272Its representation shows the values of its attributes.
1273
1274However, when using a proxy for a namespace object, an attribute beginning with
1275``'_'`` will be an attribute of the proxy and not an attribute of the referent::
1276
1277 >>> manager = multiprocessing.Manager()
1278 >>> Global = manager.Namespace()
1279 >>> Global.x = 10
1280 >>> Global.y = 'hello'
1281 >>> Global._z = 12.3 # this is an attribute of the proxy
1282 >>> print Global
1283 Namespace(x=10, y='hello')
1284
1285
1286Customized managers
1287>>>>>>>>>>>>>>>>>>>
1288
To create one's own manager, one creates a subclass of :class:`BaseManager` and
uses the :meth:`register` classmethod to register new types or callables with
the manager class.  For example::
1292
1293 from multiprocessing.managers import BaseManager
1294
1295 class MathsClass(object):
1296 def add(self, x, y):
1297 return x + y
1298 def mul(self, x, y):
1299 return x * y
1300
1301 class MyManager(BaseManager):
1302 pass
1303
1304 MyManager.register('Maths', MathsClass)
1305
1306 if __name__ == '__main__':
1307 manager = MyManager()
1308 manager.start()
1309 maths = manager.Maths()
1310 print maths.add(4, 3) # prints 7
1311 print maths.mul(7, 8) # prints 56
1312
1313
1314Using a remote manager
1315>>>>>>>>>>>>>>>>>>>>>>
1316
1317It is possible to run a manager server on one machine and have clients use it
1318from other machines (assuming that the firewalls involved allow it).
1319
1320Running the following commands creates a server for a single shared queue which
1321remote clients can access::
1322
1323 >>> from multiprocessing.managers import BaseManager
1324 >>> import Queue
1325 >>> queue = Queue.Queue()
1326 >>> class QueueManager(BaseManager): pass
1327 ...
1328 >>> QueueManager.register('getQueue', callable=lambda:queue)
1329 >>> m = QueueManager(address=('', 50000), authkey='abracadabra')
   >>> m.serve_forever()
1331
1332One client can access the server as follows::
1333
1334 >>> from multiprocessing.managers import BaseManager
1335 >>> class QueueManager(BaseManager): pass
1336 ...
1337 >>> QueueManager.register('getQueue')
   >>> m = QueueManager.from_address(address=('foo.bar.org', 50000),
   ...                               authkey='abracadabra')
1340 >>> queue = m.getQueue()
1341 >>> queue.put('hello')
1342
1343Another client can also use it::
1344
1345 >>> from multiprocessing.managers import BaseManager
1346 >>> class QueueManager(BaseManager): pass
1347 ...
1348 >>> QueueManager.register('getQueue')
1349 >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='abracadabra')
1350 >>> queue = m.getQueue()
1351 >>> queue.get()
1352 'hello'
1353
1354
1355Proxy Objects
1356~~~~~~~~~~~~~
1357
1358A proxy is an object which *refers* to a shared object which lives (presumably)
1359in a different process. The shared object is said to be the *referent* of the
1360proxy. Multiple proxy objects may have the same referent.
1361
1362A proxy object has methods which invoke corresponding methods of its referent
1363(although not every method of the referent will necessarily be available through
1364the proxy). A proxy can usually be used in most of the same ways that its
1365referent can::
1366
1367 >>> from multiprocessing import Manager
1368 >>> manager = Manager()
1369 >>> l = manager.list([i*i for i in range(10)])
1370 >>> print l
1371 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
1372 >>> print repr(l)
1373 <ListProxy object, typeid 'list' at 0xb799974c>
1374 >>> l[4]
1375 16
1376 >>> l[2:5]
1377 [4, 9, 16]
1378
1379Notice that applying :func:`str` to a proxy will return the representation of
1380the referent, whereas applying :func:`repr` will return the representation of
1381the proxy.
1382
1383An important feature of proxy objects is that they are picklable so they can be
1384passed between processes. Note, however, that if a proxy is sent to the
1385corresponding manager's process then unpickling it will produce the referent
1386itself. This means, for example, that one shared object can contain a second::
1387
1388 >>> a = manager.list()
1389 >>> b = manager.list()
1390 >>> a.append(b) # referent of `a` now contains referent of `b`
1391 >>> print a, b
1392 [[]] []
1393 >>> b.append('hello')
1394 >>> print a, b
1395 [['hello']] ['hello']
1396
1397.. note::
1398
1399 The proxy types in :mod:`multiprocessing` do nothing to support comparisons
1400 by value. So, for instance, ::
1401
1402 manager.list([1,2,3]) == [1,2,3]
1403
1404 will return ``False``. One should just use a copy of the referent instead
1405 when making comparisons.
1406
1407.. class:: BaseProxy
1408
1409 Proxy objects are instances of subclasses of :class:`BaseProxy`.
1410
1411 .. method:: _call_method(methodname[, args[, kwds]])
1412
1413 Call and return the result of a method of the proxy's referent.
1414
1415 If ``proxy`` is a proxy whose referent is ``obj`` then the expression ::
1416
1417 proxy._call_method(methodname, args, kwds)
1418
1419 will evaluate the expression ::
1420
1421 getattr(obj, methodname)(*args, **kwds)
1422
1423 in the manager's process.
1424
1425 The returned value will be a copy of the result of the call or a proxy to
1426 a new shared object -- see documentation for the *method_to_typeid*
1427 argument of :meth:`BaseManager.register`.
1428
      If an exception is raised by the call, then it is re-raised by
1430 :meth:`_call_method`. If some other exception is raised in the manager's
1431 process then this is converted into a :exc:`RemoteError` exception and is
1432 raised by :meth:`_call_method`.
1433
1434 Note in particular that an exception will be raised if *methodname* has
      not been *exposed*.
1436
1437 An example of the usage of :meth:`_call_method()`::
1438
1439 >>> l = manager.list(range(10))
1440 >>> l._call_method('__len__')
1441 10
1442 >>> l._call_method('__getslice__', (2, 7)) # equiv to `l[2:7]`
1443 [2, 3, 4, 5, 6]
1444 >>> l._call_method('__getitem__', (20,)) # equiv to `l[20]`
1445 Traceback (most recent call last):
1446 ...
1447 IndexError: list index out of range
1448
1449 .. method:: _get_value()
1450
1451 Return a copy of the referent.
1452
1453 If the referent is unpicklable then this will raise an exception.
1454
1455 .. method:: __repr__
1456
1457 Return a representation of the proxy object.
1458
1459 .. method:: __str__
1460
1461 Return the representation of the referent.
1462
1463
1464Cleanup
1465>>>>>>>
1466
1467A proxy object uses a weakref callback so that when it gets garbage collected it
1468deregisters itself from the manager which owns its referent.
1469
1470A shared object gets deleted from the manager process when there are no longer
1471any proxies referring to it.
1472
1473
1474Process Pools
1475~~~~~~~~~~~~~
1476
1477.. module:: multiprocessing.pool
1478 :synopsis: Create pools of processes.
1479
1480One can create a pool of processes which will carry out tasks submitted to it
with the :class:`Pool` class in :mod:`multiprocessing.pool`.
1482
1483.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]])
1484
1485 A process pool object which controls a pool of worker processes to which jobs
1486 can be submitted. It supports asynchronous results with timeouts and
1487 callbacks and has a parallel map implementation.
1488
1489 *processes* is the number of worker processes to use. If *processes* is
1490 ``None`` then the number returned by :func:`cpu_count` is used. If
1491 *initializer* is not ``None`` then each worker process will call
1492 ``initializer(*initargs)`` when it starts.
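
   A short sketch of using *initializer* and *initargs* (the ``init`` helper is
   just for illustration)::

      from multiprocessing import Pool, current_process

      def init(prefix):
          # runs once in each worker process as it starts
          print prefix, current_process().get_name()

      def f(x):
          return x*x

      if __name__ == '__main__':
          pool = Pool(processes=2, initializer=init, initargs=('starting',))
          print pool.map(f, range(5))      # prints "[0, 1, 4, 9, 16]"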
1493
1494 .. method:: apply(func[, args[, kwds]])
1495
1496 Equivalent of the :func:`apply` builtin function. It blocks till the
1497 result is ready.
1498
1499 .. method:: apply_async(func[, args[, kwds[, callback]]])
1500
1501 A variant of the :meth:`apply` method which returns a result object.
1502
1503 If *callback* is specified then it should be a callable which accepts a
1504 single argument. When the result becomes ready *callback* is applied to
1505 it (unless the call failed). *callback* should complete immediately since
1506 otherwise the thread which handles the results will get blocked.
1507
1508 .. method:: map(func, iterable[, chunksize])
1509
1510 A parallel equivalent of the :func:`map` builtin function. It blocks till
1511 the result is ready.
1512
1513 This method chops the iterable into a number of chunks which it submits to
1514 the process pool as separate tasks. The (approximate) size of these
1515 chunks can be specified by setting *chunksize* to a positive integer.
1516
1517 .. method:: map_async(func, iterable[, chunksize[, callback]])
1518
1519 A variant of the :meth:`.map` method which returns a result object.
1520
1521 If *callback* is specified then it should be a callable which accepts a
1522 single argument. When the result becomes ready *callback* is applied to
1523 it (unless the call failed). *callback* should complete immediately since
1524 otherwise the thread which handles the results will get blocked.
1525
1526 .. method:: imap(func, iterable[, chunksize])
1527
1528 An equivalent of :func:`itertools.imap`.
1529
1530 The *chunksize* argument is the same as the one used by the :meth:`.map`
1531 method. For very long iterables using a large value for *chunksize* can
      make the job complete **much** faster than using the default value of
1533 ``1``.
1534
1535 Also if *chunksize* is ``1`` then the :meth:`next` method of the iterator
1536 returned by the :meth:`imap` method has an optional *timeout* parameter:
1537 ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the
1538 result cannot be returned within *timeout* seconds.
1539
1540 .. method:: imap_unordered(func, iterable[, chunksize])
1541
1542 The same as :meth:`imap` except that the ordering of the results from the
1543 returned iterator should be considered arbitrary. (Only when there is
1544 only one worker process is the order guaranteed to be "correct".)
1545
1546 .. method:: close()
1547
1548 Prevents any more tasks from being submitted to the pool. Once all the
1549 tasks have been completed the worker processes will exit.
1550
1551 .. method:: terminate()
1552
1553 Stops the worker processes immediately without completing outstanding
1554 work. When the pool object is garbage collected :meth:`terminate` will be
1555 called immediately.
1556
1557 .. method:: join()
1558
1559 Wait for the worker processes to exit. One must call :meth:`close` or
1560 :meth:`terminate` before using :meth:`join`.
1561
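The difference between :meth:`imap` and :meth:`imap_unordered` can be seen in
the following sketch (the ``slow_identity`` function and the sleep times are
invented purely for illustration; the ordering of the unordered results will
vary from run to run)::

   from multiprocessing import Pool
   import time

   def slow_identity(x):
       time.sleep(0.01 * (5 - x))       # make later items finish sooner
       return x

   if __name__ == '__main__':
       pool = Pool(processes=4)
       print list(pool.imap(slow_identity, range(5)))            # always "[0, 1, 2, 3, 4]"
       print list(pool.imap_unordered(slow_identity, range(5)))  # order is arbitrary
       pool.close()
       pool.join()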
1562
1563.. class:: AsyncResult
1564
1565 The class of the result returned by :meth:`Pool.apply_async` and
1566 :meth:`Pool.map_async`.
1567
   .. method:: get([timeout])
1569
1570 Return the result when it arrives. If *timeout* is not ``None`` and the
1571 result does not arrive within *timeout* seconds then
1572 :exc:`multiprocessing.TimeoutError` is raised. If the remote call raised
1573 an exception then that exception will be reraised by :meth:`get`.
1574
1575 .. method:: wait([timeout])
1576
1577 Wait until the result is available or until *timeout* seconds pass.
1578
1579 .. method:: ready()
1580
1581 Return whether the call has completed.
1582
1583 .. method:: successful()
1584
1585 Return whether the call completed without raising an exception. Will
1586 raise :exc:`AssertionError` if the result is not ready.
1587
1588The following example demonstrates the use of a pool::
1589
   from multiprocessing import Pool

   def f(x):
       return x*x

   if __name__ == '__main__':
       pool = Pool(processes=4)              # start 4 worker processes

       result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
       print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

       print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

       it = pool.imap(f, range(10))
       print it.next()                       # prints "0"
       print it.next()                       # prints "1"
       print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

       import time
       result = pool.apply_async(time.sleep, (10,))
       print result.get(timeout=1)           # raises TimeoutError
1611
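The following additional sketch shows how a *callback* and the result objects
returned by :meth:`Pool.apply_async` and :meth:`Pool.map_async` fit together
(the ``square`` and ``collect`` helpers are invented for the illustration)::

   from multiprocessing import Pool

   def square(x):
       return x*x

   results = []                          # filled in by the callback

   def collect(value):
       # Runs in a thread of the parent process, so it should return quickly.
       results.append(value)

   if __name__ == '__main__':
       pool = Pool(processes=2)

       r = pool.map_async(square, range(5), callback=collect)
       r.wait()                          # block until the result is ready
       print r.ready()                   # prints "True"
       print r.successful()              # prints "True" -- no exception was raised
       print results                     # prints "[[0, 1, 4, 9, 16]]"

       bad = pool.apply_async(square, ('oops',))
       bad.wait()
       print bad.successful()            # prints "False"
       try:
           bad.get()                     # re-raises the exception from the worker
       except TypeError:
           print 'the worker raised TypeError'

       pool.close()
       pool.join()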
1612
1613.. _multiprocessing-listeners-clients:
1614
1615Listeners and Clients
1616~~~~~~~~~~~~~~~~~~~~~
1617
1618.. module:: multiprocessing.connection
1619 :synopsis: API for dealing with sockets.
1620
1621Usually message passing between processes is done using queues or by using
1622:class:`Connection` objects returned by :func:`Pipe`.
1623
1624However, the :mod:`multiprocessing.connection` module allows some extra
1625flexibility. It basically gives a high level message oriented API for dealing
1626with sockets or Windows named pipes, and also has support for *digest
1627authentication* using the :mod:`hmac` module from the standard library.
1628
1629
1630.. function:: deliver_challenge(connection, authkey)
1631
1632 Send a randomly generated message to the other end of the connection and wait
1633 for a reply.
1634
1635 If the reply matches the digest of the message using *authkey* as the key
1636 then a welcome message is sent to the other end of the connection. Otherwise
1637 :exc:`AuthenticationError` is raised.
1638
.. function:: answer_challenge(connection, authkey)
1640
1641 Receive a message, calculate the digest of the message using *authkey* as the
1642 key, and then send the digest back.
1643
1644 If a welcome message is not received, then :exc:`AuthenticationError` is
1645 raised.
1646
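As a minimal sketch of how these two functions cooperate, both ends of an
ordinary :func:`Pipe` can be driven from a single process, with the answering
side run in a background thread (the key used here is arbitrary)::

   import threading
   from multiprocessing import Pipe
   from multiprocessing.connection import deliver_challenge, answer_challenge

   left, right = Pipe()
   key = 'secret password'

   # answer_challenge() blocks until the challenge arrives, so run it in a
   # background thread while deliver_challenge() drives the other end.
   t = threading.Thread(target=answer_challenge, args=(right, key))
   t.start()
   deliver_challenge(left, key)    # raises AuthenticationError on a bad digest
   t.join()
   print 'authentication succeeded'
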
1647.. function:: Client(address[, family[, authenticate[, authkey]]])
1648
1649 Attempt to set up a connection to the listener which is using address
1650 *address*, returning a :class:`Connection`.
1651
   The type of the connection is determined by the *family* argument, but
   this can generally be omitted since it can usually be inferred from the
   format of *address*. (See :ref:`multiprocessing-address-formats`)
1655
   If *authenticate* is ``True`` or *authkey* is a string then digest
1657 authentication is used. The key used for authentication will be either
1658 *authkey* or ``current_process().get_auth_key()`` if *authkey* is ``None``.
1659 If authentication fails then :exc:`AuthenticationError` is raised. See
1660 :ref:`multiprocessing-auth-keys`.
1661
1662.. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]])
1663
1664 A wrapper for a bound socket or Windows named pipe which is 'listening' for
1665 connections.
1666
1667 *address* is the address to be used by the bound socket or named pipe of the
1668 listener object.
1669
1670 *family* is the type of socket (or named pipe) to use. This can be one of
1671 the strings ``'AF_INET'`` (for a TCP socket), ``'AF_UNIX'`` (for a Unix
1672 domain socket) or ``'AF_PIPE'`` (for a Windows named pipe). Of these only
1673 the first is guaranteed to be available. If *family* is ``None`` then the
1674 family is inferred from the format of *address*. If *address* is also
1675 ``None`` then a default is chosen. This default is the family which is
1676 assumed to be the fastest available. See
1677 :ref:`multiprocessing-address-formats`. Note that if *family* is
1678 ``'AF_UNIX'`` and address is ``None`` then the socket will be created in a
1679 private temporary directory created using :func:`tempfile.mkstemp`.
1680
1681 If the listener object uses a socket then *backlog* (1 by default) is passed
1682 to the :meth:`listen` method of the socket once it has been bound.
1683
1684 If *authenticate* is ``True`` (``False`` by default) or *authkey* is not
1685 ``None`` then digest authentication is used.
1686
1687 If *authkey* is a string then it will be used as the authentication key;
   otherwise it must be ``None``.
1689
1690 If *authkey* is ``None`` and *authenticate* is ``True`` then
1691 ``current_process().get_auth_key()`` is used as the authentication key. If
   *authkey* is ``None`` and *authenticate* is ``False`` then no
1693 authentication is done. If authentication fails then
1694 :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`.
1695
1696 .. method:: accept()
1697
1698 Accept a connection on the bound socket or named pipe of the listener
1699 object and return a :class:`Connection` object. If authentication is
1700 attempted and fails, then :exc:`AuthenticationError` is raised.
1701
1702 .. method:: close()
1703
1704 Close the bound socket or named pipe of the listener object. This is
1705 called automatically when the listener is garbage collected. However it
1706 is advisable to call it explicitly.
1707
1708 Listener objects have the following read-only properties:
1709
1710 .. attribute:: address
1711
1712 The address which is being used by the Listener object.
1713
1714 .. attribute:: last_accepted
1715
1716 The address from which the last accepted connection came. If this is
1717 unavailable then it is ``None``.
1718
1719
1720The module defines two exceptions:
1721
1722.. exception:: AuthenticationError
1723
1724 Exception raised when there is an authentication error.
1725
1726.. exception:: BufferTooShort
1727
   Exception raised by the :meth:`Connection.recv_bytes_into` method of a
1729 connection object when the supplied buffer object is too small for the
1730 message read.
1731
1732 If *e* is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
1733 the message as a byte string.
1734
1735
1736**Examples**
1737
1738The following server code creates a listener which uses ``'secret password'`` as
1739an authentication key. It then waits for a connection and sends some data to
1740the client::
1741
1742 from multiprocessing.connection import Listener
1743 from array import array
1744
1745 address = ('localhost', 6000) # family is deduced to be 'AF_INET'
1746 listener = Listener(address, authkey='secret password')
1747
1748 conn = listener.accept()
1749 print 'connection accepted from', listener.last_accepted
1750
1751 conn.send([2.25, None, 'junk', float])
1752
1753 conn.send_bytes('hello')
1754
1755 conn.send_bytes(array('i', [42, 1729]))
1756
1757 conn.close()
1758 listener.close()
1759
1760The following code connects to the server and receives some data from the
1761server::
1762
1763 from multiprocessing.connection import Client
1764 from array import array
1765
1766 address = ('localhost', 6000)
1767 conn = Client(address, authkey='secret password')
1768
1769 print conn.recv() # => [2.25, None, 'junk', float]
1770
1771 print conn.recv_bytes() # => 'hello'
1772
1773 arr = array('i', [0, 0, 0, 0, 0])
1774 print conn.recv_bytes_into(arr) # => 8
1775 print arr # => array('i', [42, 1729, 0, 0, 0])
1776
1777 conn.close()
1778
1779
1780.. _multiprocessing-address-formats:
1781
1782Address Formats
1783>>>>>>>>>>>>>>>
1784
* An ``'AF_INET'`` address is a tuple of the form ``(hostname, port)`` where
1786 *hostname* is a string and *port* is an integer.
1787
* An ``'AF_UNIX'`` address is a string representing a filename on the
1789 filesystem.
1790
1791* An ``'AF_PIPE'`` address is a string of the form
1792 ``r'\\\\.\\pipe\\PipeName'``. To use :func:`Client` to connect to a named
  pipe on a remote computer called *ServerName* one should use an address of the
1794 form ``r'\\\\ServerName\\pipe\\PipeName'`` instead.
1795
1796Note that any string beginning with two backslashes is assumed by default to be
1797an ``'AF_PIPE'`` address rather than an ``'AF_UNIX'`` address.
1798
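Because of these rules the *family* argument of :class:`Listener` and
:func:`Client` can usually be omitted, as in the following sketch (the port
number is arbitrary)::

   from multiprocessing.connection import Listener

   listener = Listener(('localhost', 6002))   # a tuple, so 'AF_INET' is inferred
   print listener.address                     # the bound address, e.g. ('127.0.0.1', 6002)
   listener.close()

   # A plain filename such as '/tmp/example-sock' would be treated as an
   # 'AF_UNIX' address (Unix only), and a string such as r'\\.\pipe\Example'
   # as an 'AF_PIPE' address (Windows only).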
1799
1800.. _multiprocessing-auth-keys:
1801
1802Authentication keys
1803~~~~~~~~~~~~~~~~~~~
1804
1805When one uses :meth:`Connection.recv`, the data received is automatically
1806unpickled. Unfortunately unpickling data from an untrusted source is a security
1807risk. Therefore :class:`Listener` and :func:`Client` use the :mod:`hmac` module
1808to provide digest authentication.
1809
1810An authentication key is a string which can be thought of as a password: once a
1811connection is established both ends will demand proof that the other knows the
1812authentication key. (Demonstrating that both ends are using the same key does
1813**not** involve sending the key over the connection.)
1814
If authentication is requested but no authentication key is specified then the
return value of ``current_process().get_auth_key()`` is used (see
:class:`Process`).  This value will automatically be inherited by any
:class:`Process` object that the current process creates.  This means that (by
default) all processes of a multi-process program will share a single
authentication key which can be used when setting up connections between
themselves.
1822
1823Suitable authentication keys can also be generated by using :func:`os.urandom`.
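
For instance, an explicit key generated with :func:`os.urandom` can be passed
to both ends of a connection, as in the following sketch (the port number is
arbitrary)::

   import os
   from multiprocessing import Process
   from multiprocessing.connection import Listener, Client

   def worker(address, authkey):
       # The child is given the key explicitly -- the key itself is never
       # sent over the connection.
       conn = Client(address, authkey=authkey)
       conn.send('hello from the child')
       conn.close()

   if __name__ == '__main__':
       authkey = os.urandom(20)          # 20 random bytes as the shared secret
       address = ('localhost', 6001)
       listener = Listener(address, authkey=authkey)

       p = Process(target=worker, args=(address, authkey))
       p.start()

       conn = listener.accept()          # AuthenticationError if the keys differ
       print conn.recv()                 # prints "hello from the child"

       conn.close()
       listener.close()
       p.join()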
1824
1825
1826Logging
1827~~~~~~~
1828
1829Some support for logging is available. Note, however, that the :mod:`logging`
1830package does not use process shared locks so it is possible (depending on the
1831handler type) for messages from different processes to get mixed up.
1832
1833.. currentmodule:: multiprocessing
1834.. function:: get_logger()
1835
1836 Returns the logger used by :mod:`multiprocessing`. If necessary, a new one
1837 will be created.
1838
1839 When first created the logger has level :data:`logging.NOTSET` and has a
1840 handler which sends output to :data:`sys.stderr` using format
1841 ``'[%(levelname)s/%(processName)s] %(message)s'``. (The logger allows use of
   the non-standard ``'%(processName)s'`` format.)  Messages sent to this logger
   will not by default propagate to the root logger.
1844
1845 Note that on Windows child processes will only inherit the level of the
1846 parent process's logger -- any other customization of the logger will not be
1847 inherited.
1848
1849Below is an example session with logging turned on::
1850
   >>> import multiprocessing, logging
   >>> logger = multiprocessing.get_logger()
   >>> logger.setLevel(logging.INFO)
   >>> logger.warning('doomed')
   [WARNING/MainProcess] doomed
   >>> m = multiprocessing.Manager()
   [INFO/SyncManager-1] child process calling self.run()
   [INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-2776-0-lj0tfa'
   >>> del m
   [INFO/MainProcess] sending shutdown message to manager
   [INFO/SyncManager-1] manager exiting with exitcode 0
1862
1863
1864The :mod:`multiprocessing.dummy` module
1865~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1866
1867.. module:: multiprocessing.dummy
1868 :synopsis: Dumb wrapper around threading.
1869
1870:mod:`multiprocessing.dummy` replicates the API of :mod:`multiprocessing` but is
no more than a wrapper around the :mod:`threading` module.
1872
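For example, the following sketch runs the familiar :class:`Pool` pattern with
worker *threads* instead of worker processes (the function used here is
invented for the illustration)::

   from multiprocessing.dummy import Pool as ThreadPool

   def word_length(word):
       return len(word)

   if __name__ == '__main__':
       pool = ThreadPool(4)              # four worker threads in this process
       print pool.map(word_length, ['a', 'bb', 'ccc'])   # prints "[1, 2, 3]"
       pool.close()
       pool.join()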
1873
1874.. _multiprocessing-programming:
1875
1876Programming guidelines
1877----------------------
1878
1879There are certain guidelines and idioms which should be adhered to when using
1880:mod:`multiprocessing`.
1881
1882
1883All platforms
1884~~~~~~~~~~~~~
1885
1886Avoid shared state
1887
1888 As far as possible one should try to avoid shifting large amounts of data
1889 between processes.
1890
1891 It is probably best to stick to using queues or pipes for communication
1892 between processes rather than using the lower level synchronization
1893 primitives from the :mod:`threading` module.
1894
1895Picklability
1896
1897 Ensure that the arguments to the methods of proxies are picklable.
1898
1899Thread safety of proxies
1900
1901 Do not use a proxy object from more than one thread unless you protect it
1902 with a lock.
1903
1904 (There is never a problem with different processes using the *same* proxy.)
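
    For example (a minimal sketch -- the helper names are invented for the
    illustration), threads sharing a single list proxy can serialize their
    access with a ``threading.Lock``::

        import threading
        from multiprocessing import Manager

        if __name__ == '__main__':
            manager = Manager()
            shared = manager.list()          # a single proxy object
            mutex = threading.Lock()         # protects "shared" within this process

            def append_items(start):
                for i in range(start, start + 5):
                    with mutex:              # one thread talks to the proxy at a time
                        shared.append(i)

            threads = [threading.Thread(target=append_items, args=(n,))
                       for n in (0, 100)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            print len(shared)                # prints "10"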
1905
1906Joining zombie processes
1907
1908 On Unix when a process finishes but has not been joined it becomes a zombie.
1909 There should never be very many because each time a new process starts (or
1910 :func:`active_children` is called) all completed processes which have not
1911 yet been joined will be joined. Also calling a finished process's
1912 :meth:`Process.is_alive` will join the process. Even so it is probably good
1913 practice to explicitly join all the processes that you start.
1914
1915Better to inherit than pickle/unpickle
1916
    On Windows many types from :mod:`multiprocessing` need to be picklable so
1918 that child processes can use them. However, one should generally avoid
1919 sending shared objects to other processes using pipes or queues. Instead
    you should arrange the program so that a process which needs access to a
1921 shared resource created elsewhere can inherit it from an ancestor process.
1922
1923Avoid terminating processes
1924
1925 Using the :meth:`Process.terminate` method to stop a process is liable to
1926 cause any shared resources (such as locks, semaphores, pipes and queues)
1927 currently being used by the process to become broken or unavailable to other
1928 processes.
1929
1930 Therefore it is probably best to only consider using
1931 :meth:`Process.terminate()` on processes which never use any shared
1932 resources.
1933
1934Joining processes that use queues
1935
1936 Bear in mind that a process that has put items in a queue will wait before
1937 terminating until all the buffered items are fed by the "feeder" thread to
1938 the underlying pipe. (The child process can call the
    :meth:`Queue.cancel_join_thread` method of the queue to avoid this behaviour.)
1940
1941 This means that whenever you use a queue you need to make sure that all
1942 items which have been put on the queue will eventually be removed before the
1943 process is joined. Otherwise you cannot be sure that processes which have
1944 put items on the queue will terminate. Remember also that non-daemonic
    processes will automatically be joined.
1946
1947 An example which will deadlock is the following::
1948
1949 from multiprocessing import Process, Queue
1950
1951 def f(q):
1952 q.put('X' * 1000000)
1953
1954 if __name__ == '__main__':
1955 queue = Queue()
1956 p = Process(target=f, args=(queue,))
1957 p.start()
1958 p.join() # this deadlocks
1959 obj = queue.get()
1960
1961 A fix here would be to swap the last two lines round (or simply remove the
1962 ``p.join()`` line).
1963
Explicitly pass resources to child processes
1965
    On Unix a child process can make use of a shared resource created in a
    parent process by referring to it as a global.  However, it is better to
    pass the object as an argument to the constructor for the child process.
1969
1970 Apart from making the code (potentially) compatible with Windows this also
1971 ensures that as long as the child process is still alive the object will not
1972 be garbage collected in the parent process. This might be important if some
1973 resource is freed when the object is garbage collected in the parent
1974 process.
1975
1976 So for instance ::
1977
1978 from multiprocessing import Process, Lock
1979
1980 def f():
1981 ... do something using "lock" ...
1982
1983 if __name__ == '__main__':
1984 lock = Lock()
1985 for i in range(10):
1986 Process(target=f).start()
1987
1988 should be rewritten as ::
1989
1990 from multiprocessing import Process, Lock
1991
1992 def f(l):
1993 ... do something using "l" ...
1994
1995 if __name__ == '__main__':
1996 lock = Lock()
1997 for i in range(10):
1998 Process(target=f, args=(lock,)).start()
1999
2000
2001Windows
2002~~~~~~~
2003
2004Since Windows lacks :func:`os.fork` it has a few extra restrictions:
2005
2006More picklability
2007
2008 Ensure that all arguments to :meth:`Process.__init__` are picklable. This
2009 means, in particular, that bound or unbound methods cannot be used directly
2010 as the ``target`` argument on Windows --- just define a function and use
2011 that instead.
2012
2013 Also, if you subclass :class:`Process` then make sure that instances will be
2014 picklable when the :meth:`Process.start` method is called.
2015
2016Global variables
2017
2018 Bear in mind that if code run in a child process tries to access a global
2019 variable, then the value it sees (if any) may not be the same as the value
2020 in the parent process at the time that :meth:`Process.start` was called.
2021
2022 However, global variables which are just module level constants cause no
2023 problems.
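
    As a minimal sketch of the pitfall (the names are invented for the
    illustration), the following prints ``0`` on Windows, because the child
    re-imports the module and never executes the ``if __name__ == '__main__'``
    block, whereas on Unix the forked child prints ``1``::

        from multiprocessing import Process

        counter = 0                # re-initialized in the child on Windows

        def show():
            print counter          # "0" on Windows, "1" on Unix

        if __name__ == '__main__':
            counter = 1
            p = Process(target=show)
            p.start()
            p.join()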
2024
2025Safe importing of main module
2026
2027 Make sure that the main module can be safely imported by a new Python
    interpreter without causing unintended side effects (such as starting a new
2029 process).
2030
2031 For example, under Windows running the following module would fail with a
2032 :exc:`RuntimeError`::
2033
2034 from multiprocessing import Process
2035
2036 def foo():
2037 print 'hello'
2038
2039 p = Process(target=foo)
2040 p.start()
2041
2042 Instead one should protect the "entry point" of the program by using ``if
2043 __name__ == '__main__':`` as follows::
2044
2045 from multiprocessing import Process, freeze_support
2046
2047 def foo():
2048 print 'hello'
2049
2050 if __name__ == '__main__':
2051 freeze_support()
2052 p = Process(target=foo)
2053 p.start()
2054
    (The :func:`freeze_support` line can be omitted if the program will be run
    normally instead of being frozen.)
2057
2058 This allows the newly spawned Python interpreter to safely import the module
2059 and then run the module's ``foo()`` function.
2060
2061 Similar restrictions apply if a pool or manager is created in the main
2062 module.
2063
2064
2065.. _multiprocessing-examples:
2066
2067Examples
2068--------
2069
2070Demonstration of how to create and use customized managers and proxies:
2071
2072.. literalinclude:: ../includes/mp_newtype.py
2073
2074
2075Using :class:`Pool`:
2076
2077.. literalinclude:: ../includes/mp_pool.py
2078
2079
2080Synchronization types like locks, conditions and queues:
2081
2082.. literalinclude:: ../includes/mp_synchronize.py
2083
2084
An example showing how to use queues to feed tasks to a collection of worker
processes and collect the results:
2087
2088.. literalinclude:: ../includes/mp_workers.py
2089
2090
2091An example of how a pool of worker processes can each run a
2092:class:`SimpleHTTPServer.HttpServer` instance while sharing a single listening
2093socket.
2094
2095.. literalinclude:: ../includes/mp_webserver.py
2096
2097
2098Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:
2099
2100.. literalinclude:: ../includes/mp_benchmarks.py
2101
2102An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process`
2103and others to build a system which can distribute processes and work via a
2104distributed queue to a "cluster" of machines on a network, accessible via SSH.
2105You will need to have private key authentication for all hosts configured for
2106this to work.
2107
2108.. literalinclude:: ../includes/mp_distributing.py