Merged revisions 64104,64117 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r64104 | benjamin.peterson | 2008-06-10 21:40:25 -0500 (Tue, 10 Jun 2008) | 2 lines

  add the multiprocessing package to fulfill PEP 371
........
  r64117 | benjamin.peterson | 2008-06-11 07:26:31 -0500 (Wed, 11 Jun 2008) | 2 lines

  fix import of multiprocessing by juggling imports
........
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py
new file mode 100644
index 0000000..8cf11bd
--- /dev/null
+++ b/Doc/includes/mp_synchronize.py
@@ -0,0 +1,273 @@
+#
+# A test file for the `multiprocessing` package
+#
+
+import time, sys, random
+from Queue import Empty
+
+import multiprocessing               # may get overwritten
+
+
+#### TEST_VALUE
+
+def value_func(running, mutex):
+    random.seed()
+    time.sleep(random.random()*4)
+
+    mutex.acquire()
+    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
+    running.value -= 1
+    mutex.release()
+
+def test_value():
+    TASKS = 10
+    running = multiprocessing.Value('i', TASKS)
+    mutex = multiprocessing.Lock()
+
+    for i in range(TASKS):
+        p = multiprocessing.Process(target=value_func, args=(running, mutex))
+        p.start()
+
+    while running.value > 0:
+        time.sleep(0.08)
+        mutex.acquire()
+        print running.value,
+        sys.stdout.flush()
+        mutex.release()
+
+    print
+    print 'No more running processes'
+
+
+#### TEST_QUEUE
+
+def queue_func(queue):
+    for i in range(30):
+        time.sleep(0.5 * random.random())
+        queue.put(i*i)
+    queue.put('STOP')
+
+def test_queue():
+    q = multiprocessing.Queue()
+
+    p = multiprocessing.Process(target=queue_func, args=(q,))
+    p.start()
+
+    o = None
+    while o != 'STOP':
+        try:
+            o = q.get(timeout=0.3)
+            print o,
+            sys.stdout.flush()
+        except Empty:
+            print 'TIMEOUT'
+
+    print
+
+
+#### TEST_CONDITION
+
+def condition_func(cond):
+    cond.acquire()
+    print '\t' + str(cond)
+    time.sleep(2)
+    print '\tchild is notifying'
+    print '\t' + str(cond)
+    cond.notify()
+    cond.release()
+
+def test_condition():
+    cond = multiprocessing.Condition()
+
+    p = multiprocessing.Process(target=condition_func, args=(cond,))
+    print cond
+
+    cond.acquire()
+    print cond
+    cond.acquire()
+    print cond
+
+    p.start()
+
+    print 'main is waiting'
+    cond.wait()
+    print 'main has woken up'
+
+    print cond
+    cond.release()
+    print cond
+    cond.release()
+
+    p.join()
+    print cond
+
+
+#### TEST_SEMAPHORE
+
+def semaphore_func(sema, mutex, running):
+    sema.acquire()
+
+    mutex.acquire()
+    running.value += 1
+    print running.value, 'tasks are running'
+    mutex.release()
+
+    random.seed()
+    time.sleep(random.random()*2)
+
+    mutex.acquire()
+    running.value -= 1
+    print '%s has finished' % multiprocessing.current_process()
+    mutex.release()
+
+    sema.release()
+
+def test_semaphore():
+    sema = multiprocessing.Semaphore(3)
+    mutex = multiprocessing.RLock()
+    running = multiprocessing.Value('i', 0)
+
+    processes = [
+        multiprocessing.Process(target=semaphore_func,
+                                args=(sema, mutex, running))
+        for i in range(10)
+        ]
+
+    for p in processes:
+        p.start()
+
+    for p in processes:
+        p.join()
+
+
+#### TEST_JOIN_TIMEOUT
+
+def join_timeout_func():
+    print '\tchild sleeping'
+    time.sleep(5.5)
+    print '\n\tchild terminating'
+
+def test_join_timeout():
+    p = multiprocessing.Process(target=join_timeout_func)
+    p.start()
+
+    print 'waiting for process to finish'
+
+    while 1:
+        p.join(timeout=1)
+        if not p.is_alive():
+            break
+        print '.',
+        sys.stdout.flush()
+
+
+#### TEST_EVENT
+
+def event_func(event):
+    print '\t%r is waiting' % multiprocessing.current_process()
+    event.wait()
+    print '\t%r has woken up' % multiprocessing.current_process()
+
+def test_event():
+    event = multiprocessing.Event()
+
+    processes = [multiprocessing.Process(target=event_func, args=(event,))
+                 for i in range(5)]
+
+    for p in processes:
+        p.start()
+
+    print 'main is sleeping'
+    time.sleep(2)
+
+    print 'main is setting event'
+    event.set()
+
+    for p in processes:
+        p.join()
+
+
+#### TEST_SHAREDVALUES
+
+def sharedvalues_func(values, arrays, shared_values, shared_arrays):
+    for i in range(len(values)):
+        v = values[i][1]
+        sv = shared_values[i].value
+        assert v == sv
+
+    for i in range(len(values)):
+        a = arrays[i][1]
+        sa = list(shared_arrays[i][:])
+        assert a == sa
+
+    print 'Tests passed'
+
+def test_sharedvalues():
+    values = [
+        ('i', 10),
+        ('h', -2),
+        ('d', 1.25)
+        ]
+    arrays = [
+        ('i', range(100)),
+        ('d', [0.25 * i for i in range(100)]),
+        ('H', range(1000))
+        ]
+
+    shared_values = [multiprocessing.Value(id, v) for id, v in values]
+    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
+
+    p = multiprocessing.Process(
+        target=sharedvalues_func,
+        args=(values, arrays, shared_values, shared_arrays)
+        )
+    p.start()
+    p.join()
+
+    assert p.get_exitcode() == 0
+
+
+####
+
+def test(namespace=multiprocessing):
+    global multiprocessing
+
+    multiprocessing = namespace
+
+    for func in [ test_value, test_queue, test_condition,
+                  test_semaphore, test_join_timeout, test_event,
+                  test_sharedvalues ]:
+
+        print '\n\t######## %s\n' % func.__name__
+        func()
+
+    ignore = multiprocessing.active_children()      # cleanup any old processes
+    if hasattr(multiprocessing, '_debug_info'):
+        info = multiprocessing._debug_info()
+        if info:
+            print info
+            raise ValueError, 'there should be no positive refcounts left'
+
+
+if __name__ == '__main__':
+    multiprocessing.freeze_support()
+
+    assert len(sys.argv) in (1, 2)
+
+    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
+        print ' Using processes '.center(79, '-')
+        namespace = multiprocessing
+    elif sys.argv[1] == 'manager':
+        print ' Using processes and a manager '.center(79, '-')
+        namespace = multiprocessing.Manager()
+        namespace.Process = multiprocessing.Process
+        namespace.current_process = multiprocessing.current_process
+        namespace.active_children = multiprocessing.active_children
+    elif sys.argv[1] == 'threads':
+        print ' Using threads '.center(79, '-')
+        import multiprocessing.dummy as namespace
+    else:
+        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
+        raise SystemExit, 2
+
+    test(namespace)