Benjamin Peterson | e711caf | 2008-06-11 16:44:04 +0000 | [diff] [blame] | 1 | # |
| 2 | # Simple example which uses a pool of workers to carry out some tasks. |
| 3 | # |
| 4 | # Notice that the results will probably not come out of the output |
| 5 | # queue in the same order as the corresponding tasks were |
| 6 | # put on the input queue. If it is important to get the results back |
| 7 | # in the original order then consider using `Pool.map()` or |
| 8 | # `Pool.imap()` (which will save on the amount of code needed anyway). |
| 9 | # |
| 10 | |
| 11 | import time |
| 12 | import random |
| 13 | |
| 14 | from multiprocessing import Process, Queue, current_process, freeze_support |
| 15 | |
| 16 | # |
| 17 | # Function run by worker processes |
| 18 | # |
| 19 | |
def worker(input, output):
    """Consume tasks from `input` until the 'STOP' sentinel is received.

    Each task is a `(func, args)` pair.  It is handed to `calculate()`
    and the returned value is put on `output`.
    """
    while True:
        task = input.get()
        if task == 'STOP':
            # Sentinel: no more work for this worker.
            break
        func, args = task
        output.put(calculate(func, args))
| 24 | |
| 25 | # |
| 26 | # Function used to calculate result |
| 27 | # |
| 28 | |
def calculate(func, args):
    """Call `func(*args)` and return a one-line description of the result.

    The returned string has the form
    `'<process name> says that <func name><args> = <result>'`, where the
    process name is that of the process executing this call.
    """
    result = func(*args)
    # The process name is exposed as the `name` attribute; the old
    # `get_name()` accessor does not exist on released Python versions.
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)
| 33 | |
| 34 | # |
| 35 | # Functions referenced by tasks |
| 36 | # |
| 37 | |
def mul(a, b):
    """Sleep for a random time below half a second, then return `a * b`."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product
| 41 | |
def plus(a, b):
    """Sleep for a random time below half a second, then return `a + b`."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    total = a + b
    return total
| 45 | |
| 46 | # |
| 47 | # |
| 48 | # |
| 49 | |
def test():
    """Run the demo: farm two batches of tasks out to worker processes.

    Twenty `mul` tasks are queued before the workers start, their results
    are printed as they arrive, then ten `plus` tasks are added while the
    workers are still running.  Finally one 'STOP' sentinel per worker is
    queued and the workers are joined.
    """
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes, keeping the handles so they can be joined
    workers = []
    for _ in range(NUMBER_OF_PROCESSES):
        proc = Process(target=worker, args=(task_queue, done_queue))
        proc.start()
        workers.append(proc)

    # Get and print results.  A single pre-formatted string argument makes
    # the print call produce identical output on Python 2 and Python 3.
    print('Unordered results:')
    for _ in TASKS1:
        print('\t %s' % (done_queue.get(),))

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for _ in TASKS2:
        print('\t %s' % (done_queue.get(),))

    # Tell child processes to stop
    for _ in workers:
        task_queue.put('STOP')

    # Every result has already been read from done_queue, so the workers
    # have nothing left to flush and joining them cannot block on the queue.
    for proc in workers:
        proc.join()
| 83 | |
| 84 | |
# Script entry point: only run the demo when executed directly, so that
# child processes importing this module do not start the demo again.
if __name__ == '__main__':
    # Called before anything else in the main section; see the
    # multiprocessing documentation on frozen executables.
    freeze_support()
    test()