| # | 
 | # A test file for the `multiprocessing` package | 
 | # | 
 | # Copyright (c) 2006-2008, R Oudkerk | 
 | # All rights reserved. | 
 | # | 
 |  | 
 | import time | 
 | import sys | 
 | import random | 
 | from queue import Empty | 
 |  | 
 | import multiprocessing               # may get overwritten | 
 |  | 
 |  | 
 | #### TEST_VALUE | 
 |  | 
 | def value_func(running, mutex): | 
 |     random.seed() | 
 |     time.sleep(random.random()*4) | 
 |  | 
 |     mutex.acquire() | 
 |     print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished') | 
 |     running.value -= 1 | 
 |     mutex.release() | 
 |  | 
 | def test_value(): | 
 |     TASKS = 10 | 
 |     running = multiprocessing.Value('i', TASKS) | 
 |     mutex = multiprocessing.Lock() | 
 |  | 
 |     for i in range(TASKS): | 
 |         p = multiprocessing.Process(target=value_func, args=(running, mutex)) | 
 |         p.start() | 
 |  | 
 |     while running.value > 0: | 
 |         time.sleep(0.08) | 
 |         mutex.acquire() | 
 |         print(running.value, end=' ') | 
 |         sys.stdout.flush() | 
 |         mutex.release() | 
 |  | 
 |     print() | 
 |     print('No more running processes') | 
 |  | 
 |  | 
 | #### TEST_QUEUE | 
 |  | 
 | def queue_func(queue): | 
 |     for i in range(30): | 
 |         time.sleep(0.5 * random.random()) | 
 |         queue.put(i*i) | 
 |     queue.put('STOP') | 
 |  | 
 | def test_queue(): | 
 |     q = multiprocessing.Queue() | 
 |  | 
 |     p = multiprocessing.Process(target=queue_func, args=(q,)) | 
 |     p.start() | 
 |  | 
 |     o = None | 
 |     while o != 'STOP': | 
 |         try: | 
 |             o = q.get(timeout=0.3) | 
 |             print(o, end=' ') | 
 |             sys.stdout.flush() | 
 |         except Empty: | 
 |             print('TIMEOUT') | 
 |  | 
 |     print() | 
 |  | 
 |  | 
 | #### TEST_CONDITION | 
 |  | 
 | def condition_func(cond): | 
 |     cond.acquire() | 
 |     print('\t' + str(cond)) | 
 |     time.sleep(2) | 
 |     print('\tchild is notifying') | 
 |     print('\t' + str(cond)) | 
 |     cond.notify() | 
 |     cond.release() | 
 |  | 
 | def test_condition(): | 
 |     cond = multiprocessing.Condition() | 
 |  | 
 |     p = multiprocessing.Process(target=condition_func, args=(cond,)) | 
 |     print(cond) | 
 |  | 
 |     cond.acquire() | 
 |     print(cond) | 
 |     cond.acquire() | 
 |     print(cond) | 
 |  | 
 |     p.start() | 
 |  | 
 |     print('main is waiting') | 
 |     cond.wait() | 
 |     print('main has woken up') | 
 |  | 
 |     print(cond) | 
 |     cond.release() | 
 |     print(cond) | 
 |     cond.release() | 
 |  | 
 |     p.join() | 
 |     print(cond) | 
 |  | 
 |  | 
 | #### TEST_SEMAPHORE | 
 |  | 
 | def semaphore_func(sema, mutex, running): | 
 |     sema.acquire() | 
 |  | 
 |     mutex.acquire() | 
 |     running.value += 1 | 
 |     print(running.value, 'tasks are running') | 
 |     mutex.release() | 
 |  | 
 |     random.seed() | 
 |     time.sleep(random.random()*2) | 
 |  | 
 |     mutex.acquire() | 
 |     running.value -= 1 | 
 |     print('%s has finished' % multiprocessing.current_process()) | 
 |     mutex.release() | 
 |  | 
 |     sema.release() | 
 |  | 
 | def test_semaphore(): | 
 |     sema = multiprocessing.Semaphore(3) | 
 |     mutex = multiprocessing.RLock() | 
 |     running = multiprocessing.Value('i', 0) | 
 |  | 
 |     processes = [ | 
 |         multiprocessing.Process(target=semaphore_func, | 
 |                                 args=(sema, mutex, running)) | 
 |         for i in range(10) | 
 |         ] | 
 |  | 
 |     for p in processes: | 
 |         p.start() | 
 |  | 
 |     for p in processes: | 
 |         p.join() | 
 |  | 
 |  | 
 | #### TEST_JOIN_TIMEOUT | 
 |  | 
 | def join_timeout_func(): | 
 |     print('\tchild sleeping') | 
 |     time.sleep(5.5) | 
 |     print('\n\tchild terminating') | 
 |  | 
 | def test_join_timeout(): | 
 |     p = multiprocessing.Process(target=join_timeout_func) | 
 |     p.start() | 
 |  | 
 |     print('waiting for process to finish') | 
 |  | 
 |     while 1: | 
 |         p.join(timeout=1) | 
 |         if not p.is_alive(): | 
 |             break | 
 |         print('.', end=' ') | 
 |         sys.stdout.flush() | 
 |  | 
 |  | 
 | #### TEST_EVENT | 
 |  | 
 | def event_func(event): | 
 |     print('\t%r is waiting' % multiprocessing.current_process()) | 
 |     event.wait() | 
 |     print('\t%r has woken up' % multiprocessing.current_process()) | 
 |  | 
 | def test_event(): | 
 |     event = multiprocessing.Event() | 
 |  | 
 |     processes = [multiprocessing.Process(target=event_func, args=(event,)) | 
 |                  for i in range(5)] | 
 |  | 
 |     for p in processes: | 
 |         p.start() | 
 |  | 
 |     print('main is sleeping') | 
 |     time.sleep(2) | 
 |  | 
 |     print('main is setting event') | 
 |     event.set() | 
 |  | 
 |     for p in processes: | 
 |         p.join() | 
 |  | 
 |  | 
 | #### TEST_SHAREDVALUES | 
 |  | 
 | def sharedvalues_func(values, arrays, shared_values, shared_arrays): | 
 |     for i in range(len(values)): | 
 |         v = values[i][1] | 
 |         sv = shared_values[i].value | 
 |         assert v == sv | 
 |  | 
 |     for i in range(len(values)): | 
 |         a = arrays[i][1] | 
 |         sa = list(shared_arrays[i][:]) | 
 |         assert a == sa | 
 |  | 
 |     print('Tests passed') | 
 |  | 
 | def test_sharedvalues(): | 
 |     values = [ | 
 |         ('i', 10), | 
 |         ('h', -2), | 
 |         ('d', 1.25) | 
 |         ] | 
 |     arrays = [ | 
 |         ('i', list(range(100))), | 
 |         ('d', [0.25 * i for i in range(100)]), | 
 |         ('H', list(range(1000))) | 
 |         ] | 
 |  | 
 |     shared_values = [multiprocessing.Value(id, v) for id, v in values] | 
 |     shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] | 
 |  | 
 |     p = multiprocessing.Process( | 
 |         target=sharedvalues_func, | 
 |         args=(values, arrays, shared_values, shared_arrays) | 
 |         ) | 
 |     p.start() | 
 |     p.join() | 
 |  | 
 |     assert p.exitcode == 0 | 
 |  | 
 |  | 
 | #### | 
 |  | 
 | def test(namespace=multiprocessing): | 
 |     global multiprocessing | 
 |  | 
 |     multiprocessing = namespace | 
 |  | 
 |     for func in [test_value, test_queue, test_condition, | 
 |                  test_semaphore, test_join_timeout, test_event, | 
 |                  test_sharedvalues]: | 
 |  | 
 |         print('\n\t######## %s\n' % func.__name__) | 
 |         func() | 
 |  | 
 |     ignore = multiprocessing.active_children()      # cleanup any old processes | 
 |     if hasattr(multiprocessing, '_debug_info'): | 
 |         info = multiprocessing._debug_info() | 
 |         if info: | 
 |             print(info) | 
 |             raise ValueError('there should be no positive refcounts left') | 
 |  | 
 |  | 
 | if __name__ == '__main__': | 
 |     multiprocessing.freeze_support() | 
 |  | 
 |     assert len(sys.argv) in (1, 2) | 
 |  | 
 |     if len(sys.argv) == 1 or sys.argv[1] == 'processes': | 
 |         print(' Using processes '.center(79, '-')) | 
 |         namespace = multiprocessing | 
 |     elif sys.argv[1] == 'manager': | 
 |         print(' Using processes and a manager '.center(79, '-')) | 
 |         namespace = multiprocessing.Manager() | 
 |         namespace.Process = multiprocessing.Process | 
 |         namespace.current_process = multiprocessing.current_process | 
 |         namespace.active_children = multiprocessing.active_children | 
 |     elif sys.argv[1] == 'threads': | 
 |         print(' Using threads '.center(79, '-')) | 
 |         import multiprocessing.dummy as namespace | 
 |     else: | 
 |         print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]) | 
 |         raise SystemExit(2) | 
 |  | 
 |     test(namespace) |