| # |
| # A test file for the `multiprocessing` package |
| # |
| # Copyright (c) 2006-2008, R Oudkerk |
| # All rights reserved. |
| # |
| |
| import time |
| import sys |
| import random |
| from queue import Empty |
| |
| import multiprocessing # may get overwritten |
| |
| |
| #### TEST_VALUE |
| |
| def value_func(running, mutex): |
| random.seed() |
| time.sleep(random.random()*4) |
| |
| mutex.acquire() |
| print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished') |
| running.value -= 1 |
| mutex.release() |
| |
| def test_value(): |
| TASKS = 10 |
| running = multiprocessing.Value('i', TASKS) |
| mutex = multiprocessing.Lock() |
| |
| for i in range(TASKS): |
| p = multiprocessing.Process(target=value_func, args=(running, mutex)) |
| p.start() |
| |
| while running.value > 0: |
| time.sleep(0.08) |
| mutex.acquire() |
| print(running.value, end=' ') |
| sys.stdout.flush() |
| mutex.release() |
| |
| print() |
| print('No more running processes') |
| |
| |
| #### TEST_QUEUE |
| |
| def queue_func(queue): |
| for i in range(30): |
| time.sleep(0.5 * random.random()) |
| queue.put(i*i) |
| queue.put('STOP') |
| |
| def test_queue(): |
| q = multiprocessing.Queue() |
| |
| p = multiprocessing.Process(target=queue_func, args=(q,)) |
| p.start() |
| |
| o = None |
| while o != 'STOP': |
| try: |
| o = q.get(timeout=0.3) |
| print(o, end=' ') |
| sys.stdout.flush() |
| except Empty: |
| print('TIMEOUT') |
| |
| print() |
| |
| |
| #### TEST_CONDITION |
| |
| def condition_func(cond): |
| cond.acquire() |
| print('\t' + str(cond)) |
| time.sleep(2) |
| print('\tchild is notifying') |
| print('\t' + str(cond)) |
| cond.notify() |
| cond.release() |
| |
| def test_condition(): |
| cond = multiprocessing.Condition() |
| |
| p = multiprocessing.Process(target=condition_func, args=(cond,)) |
| print(cond) |
| |
| cond.acquire() |
| print(cond) |
| cond.acquire() |
| print(cond) |
| |
| p.start() |
| |
| print('main is waiting') |
| cond.wait() |
| print('main has woken up') |
| |
| print(cond) |
| cond.release() |
| print(cond) |
| cond.release() |
| |
| p.join() |
| print(cond) |
| |
| |
| #### TEST_SEMAPHORE |
| |
| def semaphore_func(sema, mutex, running): |
| sema.acquire() |
| |
| mutex.acquire() |
| running.value += 1 |
| print(running.value, 'tasks are running') |
| mutex.release() |
| |
| random.seed() |
| time.sleep(random.random()*2) |
| |
| mutex.acquire() |
| running.value -= 1 |
| print('%s has finished' % multiprocessing.current_process()) |
| mutex.release() |
| |
| sema.release() |
| |
| def test_semaphore(): |
| sema = multiprocessing.Semaphore(3) |
| mutex = multiprocessing.RLock() |
| running = multiprocessing.Value('i', 0) |
| |
| processes = [ |
| multiprocessing.Process(target=semaphore_func, |
| args=(sema, mutex, running)) |
| for i in range(10) |
| ] |
| |
| for p in processes: |
| p.start() |
| |
| for p in processes: |
| p.join() |
| |
| |
| #### TEST_JOIN_TIMEOUT |
| |
| def join_timeout_func(): |
| print('\tchild sleeping') |
| time.sleep(5.5) |
| print('\n\tchild terminating') |
| |
| def test_join_timeout(): |
| p = multiprocessing.Process(target=join_timeout_func) |
| p.start() |
| |
| print('waiting for process to finish') |
| |
| while 1: |
| p.join(timeout=1) |
| if not p.is_alive(): |
| break |
| print('.', end=' ') |
| sys.stdout.flush() |
| |
| |
| #### TEST_EVENT |
| |
| def event_func(event): |
| print('\t%r is waiting' % multiprocessing.current_process()) |
| event.wait() |
| print('\t%r has woken up' % multiprocessing.current_process()) |
| |
| def test_event(): |
| event = multiprocessing.Event() |
| |
| processes = [multiprocessing.Process(target=event_func, args=(event,)) |
| for i in range(5)] |
| |
| for p in processes: |
| p.start() |
| |
| print('main is sleeping') |
| time.sleep(2) |
| |
| print('main is setting event') |
| event.set() |
| |
| for p in processes: |
| p.join() |
| |
| |
| #### TEST_SHAREDVALUES |
| |
| def sharedvalues_func(values, arrays, shared_values, shared_arrays): |
| for i in range(len(values)): |
| v = values[i][1] |
| sv = shared_values[i].value |
| assert v == sv |
| |
| for i in range(len(values)): |
| a = arrays[i][1] |
| sa = list(shared_arrays[i][:]) |
| assert a == sa |
| |
| print('Tests passed') |
| |
| def test_sharedvalues(): |
| values = [ |
| ('i', 10), |
| ('h', -2), |
| ('d', 1.25) |
| ] |
| arrays = [ |
| ('i', list(range(100))), |
| ('d', [0.25 * i for i in range(100)]), |
| ('H', list(range(1000))) |
| ] |
| |
| shared_values = [multiprocessing.Value(id, v) for id, v in values] |
| shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] |
| |
| p = multiprocessing.Process( |
| target=sharedvalues_func, |
| args=(values, arrays, shared_values, shared_arrays) |
| ) |
| p.start() |
| p.join() |
| |
| assert p.exitcode == 0 |
| |
| |
| #### |
| |
| def test(namespace=multiprocessing): |
| global multiprocessing |
| |
| multiprocessing = namespace |
| |
| for func in [test_value, test_queue, test_condition, |
| test_semaphore, test_join_timeout, test_event, |
| test_sharedvalues]: |
| |
| print('\n\t######## %s\n' % func.__name__) |
| func() |
| |
| ignore = multiprocessing.active_children() # cleanup any old processes |
| if hasattr(multiprocessing, '_debug_info'): |
| info = multiprocessing._debug_info() |
| if info: |
| print(info) |
| raise ValueError('there should be no positive refcounts left') |
| |
| |
| if __name__ == '__main__': |
| multiprocessing.freeze_support() |
| |
| assert len(sys.argv) in (1, 2) |
| |
| if len(sys.argv) == 1 or sys.argv[1] == 'processes': |
| print(' Using processes '.center(79, '-')) |
| namespace = multiprocessing |
| elif sys.argv[1] == 'manager': |
| print(' Using processes and a manager '.center(79, '-')) |
| namespace = multiprocessing.Manager() |
| namespace.Process = multiprocessing.Process |
| namespace.current_process = multiprocessing.current_process |
| namespace.active_children = multiprocessing.active_children |
| elif sys.argv[1] == 'threads': |
| print(' Using threads '.center(79, '-')) |
| import multiprocessing.dummy as namespace |
| else: |
| print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]) |
| raise SystemExit(2) |
| |
| test(namespace) |