blob: 795e6cb9bf8595ca300e51f2416831d736eaa9ce [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Simple example which uses a pool of workers to carry out some tasks.
3#
# Notice that the results will probably not come out of the output
# queue in the same order as the corresponding tasks were put on the
# input queue. If it is important to get the results back in the
# original order then consider using `Pool.map()` or `Pool.imap()`
# (which will save on the amount of code needed anyway).
9#
10
11import time
12import random
13
14from multiprocessing import Process, Queue, current_process, freeze_support
15
16#
17# Function run by worker processes
18#
19
def worker(input, output):
    """Consume tasks from ``input`` until the 'STOP' sentinel arrives.

    Each task is a ``(func, args)`` pair; it is handed to ``calculate``
    and the value that comes back is put on ``output``.
    """
    while True:
        task = input.get()
        if task == 'STOP':
            # Sentinel received: this worker is done.
            break
        func, args = task
        output.put(calculate(func, args))
24
25#
26# Function used to calculate result
27#
28
def calculate(func, args):
    """Apply ``func`` to ``args`` and describe the outcome as a string.

    func -- callable invoked as ``func(*args)``
    args -- sequence of positional arguments for ``func``

    Returns a string of the form
    ``'<process name> says that <func name><args> = <result>'``.
    """
    result = func(*args)
    # Process objects expose their name through the ``name`` attribute;
    # ``get_name()`` was only present in pre-release versions of the
    # multiprocessing package and raises AttributeError otherwise.
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)
33
34#
35# Functions referenced by tasks
36#
37
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5s."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product
41
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5s."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    total = a + b
    return total
45
46#
47#
48#
49
def test():
    """Push two batches of tasks through a small pool of worker processes.

    The first batch (``mul`` tasks) is queued before the workers start;
    the second batch (``plus`` tasks) is queued while they are running.
    One result line is printed per task, in whatever order the workers
    finish.  Finally each worker is sent a 'STOP' sentinel and joined.
    """
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes, keeping the handles so they can be joined
    workers = []
    for _ in range(NUMBER_OF_PROCESSES):
        process = Process(target=worker, args=(task_queue, done_queue))
        process.start()
        workers.append(process)

    # Get and print results.  print is called with a single pre-formatted
    # argument so the statement parses under both Python 2 and Python 3.
    print('Unordered results:')
    for _ in TASKS1:
        print('\t%s' % done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for _ in TASKS2:
        print('\t%s' % done_queue.get())

    # Tell child processes to stop: one sentinel per worker, because each
    # worker exits after consuming a single 'STOP'.
    for _ in workers:
        task_queue.put('STOP')

    # Wait for every worker to exit before returning
    for process in workers:
        process.join()
83
84
# Run the demo only when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    # freeze_support() is called first, before any worker is started; per
    # the multiprocessing docs it is needed when the program is frozen
    # into a Windows executable and has no effect otherwise.
    freeze_support()
    test()