| # This file should be kept compatible with both Python 2.6 and Python >= 3.0. |
| |
| from __future__ import division |
| from __future__ import print_function |
| |
| """ |
| ccbench, a Python concurrency benchmark. |
| """ |
| |
| import time |
| import os |
| import sys |
| import itertools |
| import threading |
| import subprocess |
| import socket |
| from optparse import OptionParser, SUPPRESS_HELP |
| import platform |
| |
# Compatibility shims so the same source runs on Python 2 and Python 3.

# Python 3 has no `xrange` builtin (looking it up raises NameError), so
# alias it to `range` there; on Python 2 the builtin is left untouched.
try:
    xrange
except NameError:
    xrange = range

# Where `itertools.imap` exists (Python 2), use it as the module-level
# `map` so mapping is lazy.  Where it does not exist (AttributeError on
# Python 3), keep the builtin `map`.
try:
    map = itertools.imap
except AttributeError:
    pass
| |
| |
# Minimum wall-clock time (in seconds, as measured with time.time()) that
# each timed loop of the throughput test must run for.
THROUGHPUT_DURATION = 2.0

# Seconds the latency client sleeps between two consecutive pings.
LATENCY_PING_INTERVAL = 0.1
# Nominal length of one latency run, in seconds: the number of pings is
# LATENCY_DURATION / LATENCY_PING_INTERVAL, and the background CPU loops
# are asked to run for 1.5 times this value.
LATENCY_DURATION = 2.0

# Length (in ASCII characters, hence bytes) each bandwidth datagram is
# padded to, and the size passed to recv() on both ends.
BANDWIDTH_PACKET_SIZE = 1024
# Nominal length of one bandwidth run, in seconds: the client keeps
# sending for 2.0 times this value and the background CPU loops are asked
# to run for 1.5 times this value.
BANDWIDTH_DURATION = 2.0
| |
| |
def task_pidigits():
    """Pi calculation (Python)"""
    # Bind the helpers to locals once; the nested functions close over them.
    lazy_map = map
    counter = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        """Return the first *n* decimal digits of pi as a list of ints."""
        # From http://shootout.alioth.debian.org/

        def transforms():
            # Infinite stream of 2x2 matrices, each flattened to (q, r, s, t).
            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1), counter(1))

        def compose(left, right):
            # Product of two flattened 2x2 matrices.
            lq, lr, ls, lt = left
            rq, rr, rs, rt = right
            return (lq * rq,
                    lq * rr + lr * rt,
                    ls * rq + lt * rs,
                    ls * rr + lt * rt)

        def extract(matrix, j):
            # Integer part of the transformation applied to j.
            q, r, s, t = matrix
            return (q*j + r) // (s*j + t)

        def pi_digits():
            state = (1, 0, 0, 1)
            terms = transforms()
            while True:
                digit = extract(state, 3)
                # Consume terms until the next digit is unambiguous.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                # Shift the emitted digit out of the state.
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(take(pi_digits(), n))

    return calc_ndigits, (50, )
| |
def task_regex():
    """regular expression (C)"""
    # NOTE: the docstring above is printed verbatim as the task title by the
    # benchmark drivers, so it is intentionally terse.
    #
    # Returns a (callable, args) pair: the bound ``findall`` method of a
    # compiled pattern, and a 1-tuple holding the text to search (up to the
    # first 2000 characters of this very file).
    #
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    with open(__file__, "r") as f:
        arg = f.read(2000)

    # A leftover debugging wrapper that timed and printed every call used to
    # be defined here but was never returned; the compiled pattern's own
    # method is what gets benchmarked.
    return pat.findall, (arg, )
| |
def task_sort():
    """list sorting (C)"""
    ascending = list(range(1000))

    def list_sort(items):
        # Work on a reversed copy so the shared argument is never mutated
        # and every call sorts the same (descending) input.
        backwards = items[::-1]
        backwards.sort()

    return list_sort, (ascending, )
| |
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib

    # Payload: the first 5000 bytes of this file, repeated three times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 3

    def compress(s):
        # Compress at level 5, then decompress again; the result is discarded.
        packed = zlib.compress(s, 5)
        zlib.decompress(packed)

    return compress, (payload, )
| |
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2

    # Payload: the first 3000 bytes of this file, repeated twice.
    with open(__file__, "rb") as source:
        payload = source.read(3000) * 2

    def compress(s):
        # Only compression is exercised; the compressed data is discarded.
        bz2.compress(s)

    return compress, (payload, )
| |
def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib

    # Payload: the first 5000 bytes of this file, repeated thirty times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 30

    def compute(s):
        # Hash the payload and finalize the digest; the value is discarded.
        hasher = hashlib.sha1(s)
        hasher.digest()

    return compute, (payload, )
| |
| |
# Task factories exercised by the throughput benchmark; one C-level task is
# appended below depending on which modules are available.
throughput_tasks = [task_pidigits, task_regex]

# bz2 and hashlib are optional: bind each name at module level to the
# imported module, or to None when the import fails.
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import hashlib
except ImportError:
    hashlib = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

# The latency benchmark shares the very same list object; the bandwidth
# benchmark only uses the pure-Python task as background load.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
| |
| |
class TimedLoop:
    """Callable that runs ``func(*args)`` repeatedly for a minimum duration.

    Calling an instance returns ``(niters, duration)``: the number of
    completed calls and the elapsed time (``time.time()`` minus
    *start_time*) at the last valid measurement.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        # Local aliases for everything used inside the loop.
        now = time.time
        pause = time.sleep
        call = self.func
        call_args = self.args

        batch = 20          # calls made between two clock readings
        completed = 0
        elapsed = 0.0
        previous_check = start_time
        while True:
            for _ in range(batch):
                call(*call_args)
            checkpoint = now()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return completed, elapsed
            completed += batch
            elapsed = checkpoint - start_time
            if elapsed >= min_duration:
                # Signal every other loop sharing this list that we are done.
                end_event.append(None)
                return completed, elapsed
            if checkpoint - previous_check < 0.01:
                # Minimize interference of measurement on overall runtime
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                pause(0.0001)
            previous_check = checkpoint
| |
| |
def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` in *nthreads* concurrent timed loops.

    Each loop runs for at least THROUGHPUT_DURATION seconds (the first one
    to reach it makes the others stop).  With a single thread the loop runs
    directly in the calling thread.

    Returns a list with one ``(niters, duration)`` tuple per loop, as
    produced by TimedLoop.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Report readiness, then block until the main thread has published
        # `start_time` and flipped `started` (both read through the closure).
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for i in range(nthreads):
        threads.append(threading.Thread(target=run))
    for t in threads:
        # Thread.setDaemon() is deprecated; the `daemon` attribute is the
        # supported spelling and exists since Python 2.6.
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results
| |
def run_throughput_tests(max_threads):
    """Run every throughput task with 1..max_threads threads and print results.

    The single-threaded speed of each task is the baseline; runs with more
    threads are reported as a percentage of it.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            measurements = run_throughput_test(func, args, nthreads)
            total_iterations = sum(m[0] for m in measurements)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            longest = max(m[1] for m in measurements)
            speed = total_iterations / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" iterations/s.")
                baseline_speed = speed
        print()
| |
| |
| LAT_END = "END" |
| |
| def _sendto(sock, s, addr): |
| sock.sendto(s.encode('ascii'), addr) |
| |
| def _recv(sock, n): |
| return sock.recv(n).decode('ascii') |
| |
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to *addr*.

    One initial ping is sent as a readiness signal, followed after a
    one-second pause by *nb_pings* pings spaced *interval* seconds apart,
    and finally the LAT_END marker.  Each ping is the repr() of the current
    time.time() followed by a newline.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        now = time.time
        pause = time.sleep

        def send_timestamp():
            _sendto(sock, "%r\n" % now(), addr)

        # The first ping signals the parent process that we are ready.
        send_timestamp()
        # We give the parent a bit of time to notice.
        pause(1.0)
        for _ in range(nb_pings):
            pause(interval)
            send_timestamp()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
| |
def run_latency_client(**kwargs):
    """Start this script in a child process acting as the latency client.

    The keyword arguments are passed to the child as the repr() of their
    dict, through the hidden ``--latclient`` option.  Returns the
    subprocess.Popen object.
    """
    script = os.path.abspath(__file__)
    argv = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(argv)
| |
def run_latency_test(func, args, nthreads):
    """Measure UDP ping latency while *nthreads* threads run ``func(*args)``.

    A client subprocess sends timestamped pings to a local UDP socket owned
    by this function.  The background threads are released once the first
    ping (the client's readiness signal) has arrived and are stopped after
    the LAT_END marker is received.

    Returns a list of ``(send_time, recv_time)`` float pairs, one per ping
    received after the readiness signal.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # try/finally (rather than `with`) keeps this usable on interpreters
    # whose socket objects are not context managers, and guarantees the
    # socket is closed even if something below raises.
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Report readiness, then wait until the main thread has
                # published `start_time` and flipped `started`.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for thread in threads:
                # Thread.setDaemon() is deprecated; use the attribute.
                thread.daemon = True
                thread.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            chunks.append((_time(), s))

        # Tell the background threads to stop.
        end_event.append(None)
        for thread in threads:
            thread.join()
        process.wait()
    finally:
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # SECURITY: this text comes off a UDP socket that any local
                # process can write to, so it is parsed with float() instead
                # of being passed to eval().  The client sends repr() of a
                # float, which float() parses back.
                send_time = float(line)
                results.append((send_time, recv_time))

    return results
| |
def run_latency_tests(max_threads):
    """Run the latency test for every task with 0..max_threads CPU threads.

    For each run, prints the mean ping latency and its (population)
    standard deviation, both in milliseconds.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            samples = run_latency_test(func, args, nthreads)
            count = len(samples)
            # We print out milliseconds
            lats = [1000 * (received - sent) for (sent, received) in samples]
            avg = sum(lats) / count
            variance = sum((lat - avg) ** 2 for lat in lats) / count
            dev = variance ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev))
        print()
| |
| |
# Marker sent by the bandwidth client in its final packet.
BW_END = "END"

def bandwidth_client(addr, packet_size, duration):
    """Exchange fixed-size UDP packets with the server at *addr*.

    After a one-second pause, packets are sent one at a time for
    ``duration * 2.0`` seconds, each send waiting for the server's reply
    before the next.  Every packet is the repr() of this client's own
    address, a '#', and a message (a counter, or BW_END for the last one),
    right-justified to *packet_size* characters.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Everything after socket creation, including bind(), runs under the
    # try so the socket is closed on any failure.
    try:
        sock.bind(("127.0.0.1", 0))
        local_addr = sock.getsockname()
        _time = time.time
        _sleep = time.sleep

        def _send_chunk(msg):
            _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)

        # We give the parent some time to be ready.
        _sleep(1.0)
        start_time = _time()
        end_time = start_time + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()
| |
def run_bandwidth_client(**kwargs):
    """Start this script in a child process acting as the bandwidth client.

    The keyword arguments are passed to the child as the repr() of their
    dict, through the hidden ``--bwclient`` option.  Returns the
    subprocess.Popen object.
    """
    script = os.path.abspath(__file__)
    argv = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    return subprocess.Popen(argv)
| |
def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo bandwidth while *nthreads* threads run ``func(*args)``.

    A client subprocess sends fixed-size packets to a local UDP socket owned
    by this function; every packet is echoed back to the address embedded in
    it.  The exchange stops when a background loop finishes or the client
    sends the BW_END marker.

    Returns the echo rate in packets per second, measured from the first
    reply received inside the echo loop.
    """
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Report readiness, then wait until the main thread has
                # published `start_time` and flipped `started`.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                # Thread.setDaemon() is deprecated; use the attribute.
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This will also wait for the parent to be ready
        s = _recv(sock, packet_size)
        # SECURITY: the packet comes off a UDP socket that any local process
        # can write to, so the embedded address tuple is parsed with
        # ast.literal_eval() rather than eval().  The packet is left-padded
        # with spaces, which older literal_eval() versions reject, hence the
        # strip().
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()

    end_event.append(None)
    for t in threads:
        t.join()
    process.kill()
    # Reap the killed child so it does not linger as a zombie (and so Popen
    # does not warn about a still-running subprocess).
    process.wait()

    return (n - 1) / (end_time - first_time)
| |
def run_bandwidth_tests(max_threads):
    """Run the bandwidth test for every task with 0..max_threads CPU threads.

    The rate measured without any background thread is the baseline; later
    runs are reported as a percentage of it.
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" packets/s.")
                baseline_speed = speed
        print()
| |
| |
def main():
    """Parse the command line and run the selected benchmarks.

    With none of -t/-l/-b given, all three benchmark families run.  The
    hidden --latclient/--bwclient options turn this process into the
    corresponding client helper, which is how the benchmarks spawn their
    child processes.
    """
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client options carry repr() of a kwargs dict made only of
    # literals (strings, numbers, tuples); parse it with literal_eval()
    # rather than eval() so no arbitrary expression is ever executed.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    # Not every interpreter provides both tuning functions (for example
    # sys.setcheckinterval() is gone from recent Python 3 releases and
    # sys.setswitchinterval() does not exist on Python 2); report that as a
    # usage error instead of an AttributeError traceback.
    if options.check_interval:
        if not hasattr(sys, "setcheckinterval"):
            parser.error("-i/--interval is not supported by this interpreter "
                         "(no sys.setcheckinterval())")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        if not hasattr(sys, "setswitchinterval"):
            parser.error("-I/--switch-interval is not supported by this "
                         "interpreter (no sys.setswitchinterval())")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)

if __name__ == "__main__":
    main()