# blob: 3763ea94484e19d7f977863ba940808a4e44a5f7 [file] [log] [blame]
#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time
import multiprocessing
import threading
import queue
import gc
# Clock used for every timing in this file; elapsed time is always computed
# as the difference between two _timer() readings.
_timer = time.perf_counter
# Threshold for the benchmark loops: each test keeps doubling its iteration
# count while the elapsed time of a timed round is below this value.
delta = 1
#### TEST_QUEUESPEED
def queuespeed_func(q, c, iterations):
    """Producer side of the queue benchmark.

    Signals on condition ``c`` that the producer has started, then puts
    ``iterations`` copies of a 256-character string on ``q`` followed by
    the sentinel string ``'STOP'``.

    Args:
        q: Queue-like object providing ``put()``.
        c: Condition-like object that supports the ``with`` statement and
            provides ``notify()``.
        iterations: Number of payload objects to put before the sentinel.
    """
    payload = '0' * 256

    # Wake the consumer that is waiting on the condition.  Using ``with``
    # guarantees the underlying lock is released even if notify() raises.
    with c:
        c.notify()

    for _ in range(iterations):
        q.put(payload)

    # Sentinel that tells the consumer to stop reading.
    q.put('STOP')
def test_queuespeed(Process, q, c):
    """Measure how quickly objects can be passed through the queue ``q``.

    Each round starts a producer running ``queuespeed_func`` and times how
    long it takes to read everything up to the ``'STOP'`` sentinel.  The
    number of objects doubles every round until a round takes at least
    ``delta``; the figures of that last round are printed.

    Args:
        Process: Callable accepting ``target=`` and ``args=`` and returning
            an object with ``start()`` and ``join()``.
        q: Queue-like object shared with the producer (``put()``/``get()``).
        c: Condition-like object shared with the producer; used for the
            start-up handshake.
    """
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))

        # The condition is held before the producer is started, so the
        # producer cannot notify until wait() has released the lock.  The
        # ``with`` block also releases the lock if start()/wait() raises.
        with c:
            p.start()
            c.wait()

        result = None
        t = _timer()

        # Timed section: drain the queue up to and including the sentinel.
        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print(iterations, 'objects passed through the queue in', elapsed,
          'seconds')
    print('average number/sec:', iterations/elapsed)
#### TEST_PIPESPEED
def pipe_func(c, cond, iterations):
    """Producer side of the pipe benchmark.

    Signals on ``cond`` that the producer has started, then sends
    ``iterations`` copies of a 256-character string over ``c`` followed by
    the sentinel string ``'STOP'``.

    Args:
        c: Connection-like object providing ``send()``.
        cond: Condition-like object that supports the ``with`` statement
            and provides ``notify()``.
        iterations: Number of payload objects to send before the sentinel.
    """
    payload = '0' * 256

    # Wake the receiver waiting on the condition.  ``with`` guarantees the
    # underlying lock is released even if notify() raises.
    with cond:
        cond.notify()

    for _ in range(iterations):
        c.send(payload)

    # Sentinel that tells the receiver to stop reading.
    c.send('STOP')
def test_pipespeed():
    """Measure how quickly objects can be sent through a multiprocessing Pipe.

    Each round starts a ``multiprocessing.Process`` running ``pipe_func``
    on one end of the pipe and times how long it takes to receive
    everything up to the ``'STOP'`` sentinel on the other end.  The number
    of objects doubles every round until a round takes at least ``delta``;
    the figures of that last round are printed.
    """
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    try:
        while elapsed < delta:
            iterations *= 2

            p = multiprocessing.Process(target=pipe_func,
                                        args=(d, cond, iterations))

            # The condition is held before the sender is started, so the
            # sender cannot notify until wait() has released the lock.
            # ``with`` also releases the lock if start()/wait() raises.
            with cond:
                p.start()
                cond.wait()

            result = None
            t = _timer()

            # Timed section: receive up to and including the sentinel.
            while result != 'STOP':
                result = c.recv()

            elapsed = _timer() - t

            p.join()
    finally:
        # Release both ends of the pipe once the benchmark is finished
        # (or has failed) instead of leaving them open.
        c.close()
        d.close()

    print(iterations, 'objects passed through connection in', elapsed,
          'seconds')
    print('average number/sec:', iterations/elapsed)
#### TEST_SEQSPEED
def test_seqspeed(seq):
    """Measure how quickly element 5 of ``seq`` can be read.

    The read is repeated in a timed loop whose length doubles every round
    until a round takes at least ``delta``; the figures of that last round
    are printed.

    Args:
        seq: Indexable object with at least six elements.
    """
    iterations, elapsed = 1, 0

    while elapsed < delta:
        iterations = iterations * 2

        started = _timer()
        for _ in range(iterations):
            # The indexing operation is what is being measured; the value
            # itself is not used.
            _item = seq[5]
        elapsed = _timer() - started

    rate = iterations / elapsed
    print(iterations, 'iterations in', elapsed, 'seconds')
    print('average number/sec:', rate)
#### TEST_LOCK
def test_lockspeed(l):
    """Measure how quickly ``l`` can be acquired and released.

    An uncontended acquire()/release() pair is repeated in a timed loop
    whose length doubles every round until a round takes at least
    ``delta``; the figures of that last round are printed.

    Args:
        l: Lock-like object providing ``acquire()`` and ``release()``.
    """
    iterations, elapsed = 1, 0

    while elapsed < delta:
        iterations = iterations * 2

        started = _timer()
        for _ in range(iterations):
            # The explicit acquire/release pair is the operation under
            # test, so it is deliberately not wrapped in ``with``.
            l.acquire()
            l.release()
        elapsed = _timer() - started

    rate = iterations / elapsed
    print(iterations, 'iterations in', elapsed, 'seconds')
    print('average number/sec:', rate)
#### TEST_CONDITION
def conditionspeed_func(c, N):
    """Worker side of the condition ping-pong benchmark.

    While holding ``c``, sends one initial notify() and then performs
    ``N`` rounds of wait() followed by notify().

    Args:
        c: Condition-like object that supports the ``with`` statement and
            provides ``wait()`` and ``notify()``.
        N: Number of wait/notify rounds to perform.
    """
    # ``with`` guarantees the condition's lock is released even if one of
    # the wait()/notify() calls raises.
    with c:
        # Initial signal telling the other side that the worker is running.
        c.notify()

        for _ in range(N):
            c.wait()
            c.notify()
def test_conditionspeed(Process, c):
    """Measure how quickly two parties can ping-pong on condition ``c``.

    Each round starts a worker running ``conditionspeed_func`` and times
    ``iterations`` notify()/wait() exchanges with it.  The number of
    exchanges doubles every round until a round takes at least ``delta``;
    the figures of that last round are printed (two waits are counted per
    exchange, one on each side).

    Args:
        Process: Callable accepting ``target=`` and ``args=`` and returning
            an object with ``start()`` and ``join()``.
        c: Condition-like object shared with the worker.
    """
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        # The condition is held for the whole exchange; ``with`` makes sure
        # it is released again even if start(), wait() or notify() raises.
        with c:
            p = Process(target=conditionspeed_func, args=(c, iterations))
            p.start()

            # Wait for the worker's initial notify() before timing starts.
            c.wait()

            t = _timer()

            for _ in range(iterations):
                c.notify()
                c.wait()

            elapsed = _timer() - t

        p.join()

    print(iterations * 2, 'waits in', elapsed, 'seconds')
    print('average number/sec:', iterations * 2 / elapsed)
####
def test():
    """Run every benchmark in this file and print the results.

    The queue, pipe, sequence, lock and condition benchmarks are run in
    turn against the ``threading``/``queue`` objects, the plain
    ``multiprocessing`` objects and the objects served by a
    ``multiprocessing.Manager`` server process.

    The garbage collector is disabled while the benchmarks run
    (presumably to keep collector pauses out of the timings) and is
    restored to its previous state afterwards, even if a benchmark raises.
    The manager's server process is shut down when the function returns.
    """
    # The ``with`` block shuts the manager's server process down on exit.
    with multiprocessing.Manager() as manager:
        gc_was_enabled = gc.isenabled()
        gc.disable()
        try:
            # The label names the module as it is spelled in Python 3
            # (``queue``), matching the object actually benchmarked.
            print('\n\t######## testing queue.Queue\n')
            test_queuespeed(threading.Thread, queue.Queue(),
                            threading.Condition())
            print('\n\t######## testing multiprocessing.Queue\n')
            test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                            multiprocessing.Condition())
            print('\n\t######## testing Queue managed by server process\n')
            test_queuespeed(multiprocessing.Process, manager.Queue(),
                            manager.Condition())
            print('\n\t######## testing multiprocessing.Pipe\n')
            test_pipespeed()

            print()

            print('\n\t######## testing list\n')
            test_seqspeed(list(range(10)))
            print('\n\t######## testing list managed by server process\n')
            test_seqspeed(manager.list(list(range(10))))
            print('\n\t######## testing Array("i", ..., lock=False)\n')
            test_seqspeed(multiprocessing.Array('i', list(range(10)),
                                                lock=False))
            print('\n\t######## testing Array("i", ..., lock=True)\n')
            test_seqspeed(multiprocessing.Array('i', list(range(10)),
                                                lock=True))

            print()

            print('\n\t######## testing threading.Lock\n')
            test_lockspeed(threading.Lock())
            print('\n\t######## testing threading.RLock\n')
            test_lockspeed(threading.RLock())
            print('\n\t######## testing multiprocessing.Lock\n')
            test_lockspeed(multiprocessing.Lock())
            print('\n\t######## testing multiprocessing.RLock\n')
            test_lockspeed(multiprocessing.RLock())
            print('\n\t######## testing lock managed by server process\n')
            test_lockspeed(manager.Lock())
            print('\n\t######## testing rlock managed by server process\n')
            test_lockspeed(manager.RLock())

            print()

            print('\n\t######## testing threading.Condition\n')
            test_conditionspeed(threading.Thread, threading.Condition())
            print('\n\t######## testing multiprocessing.Condition\n')
            test_conditionspeed(multiprocessing.Process,
                                multiprocessing.Condition())
            print('\n\t######## testing condition managed by a server '
                  'process\n')
            test_conditionspeed(multiprocessing.Process, manager.Condition())
        finally:
            # Never leave the collector switched off because of a failure.
            if gc_was_enabled:
                gc.enable()
# Script entry point: only run the benchmarks when executed directly, not
# when the module is imported (e.g. by a child process).
if __name__ == '__main__':
    # Called first, before any processes are created; see the
    # multiprocessing documentation for when freeze_support() is required.
    multiprocessing.freeze_support()
    test()