blob: 9f7118f8e9704b97855a7db636094bb59600e40c [file] [log] [blame]
Antoine Pitroua69ba652010-01-18 21:20:53 +00001# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
2
3from __future__ import division
4from __future__ import print_function
5
6"""
7ccbench, a Python concurrency benchmark.
8"""
9
10import time
11import os
12import sys
13import functools
14import itertools
15import threading
16import subprocess
17import socket
18from optparse import OptionParser, SUPPRESS_HELP
19import platform
20
# Compatibility
# Python 3 dropped xrange(); fall back to the (already lazy) range().
try:
    xrange
except NameError:
    xrange = range

# On Python 2, use the lazy itertools.imap() in place of the list-building
# builtin map(); Python 3's itertools has no imap and map() is already lazy.
if hasattr(itertools, 'imap'):
    map = itertools.imap
31
32
# Minimum wall-clock time, in seconds, that each throughput measurement runs
# for (passed to TimedLoop as min_duration).
THROUGHPUT_DURATION = 2.0

# The latency client sleeps LATENCY_PING_INTERVAL seconds between pings and
# sends int(LATENCY_DURATION / LATENCY_PING_INTERVAL) pings in total.
LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

# Length of each padded UDP packet in the bandwidth test, and that test's base
# duration in seconds (the client sends for twice this long, background CPU
# threads run for up to 1.5 times this long).
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0
40
Antoine Pitroua69ba652010-01-18 21:20:53 +000041
def task_pidigits():
    """Pi calculation (Python)"""
    # Pure-Python CPU work: returns (callable, args) computing pi's digits.
    count_from = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        # Spigot algorithm, from http://shootout.alioth.debian.org/
        def mul(m1, m2):
            # Product of two 2x2 integer matrices flattened as (q, r, s, t).
            q1, r1, s1, t1 = m1
            q2, r2, s2, t2 = m2
            return (q1 * q2,
                    q1 * r2 + r1 * t2,
                    s1 * q2 + t1 * s2,
                    s1 * r2 + t1 * t2)

        def floor_at(m, j):
            # Floor of the linear fractional transform m evaluated at j.
            q, r, s, t = m
            return (q * j + r) // (s * j + t)

        def digits():
            state = (1, 0, 0, 1)
            terms = ((k, 4 * k + 2, 0, 2 * k + 1) for k in count_from(1))
            while True:
                digit = floor_at(state, 3)
                # Consume terms until the next digit is pinned down.
                while digit != floor_at(state, 4):
                    state = mul(state, next(terms))
                    digit = floor_at(state, 3)
                state = mul((10, -10 * digit, 0, 1), state)
                yield digit

        return list(take(digits(), n))

    return calc_ndigits, (50, )
79
def task_regex():
    """regular expression (C)"""
    # Returns (callable, args): a compiled regex's findall() applied to the
    # first 2000 characters of this source file.
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    with open(__file__, "r") as f:
        arg = f.read(2000)
    # The bound method is benchmarked directly; no timing wrapper is needed
    # (a printing wrapper would also perturb the measurements).
    return pat.findall, (arg, )
96
def task_sort():
    """list sorting (C)"""
    # Returns (callable, args): sorting a reversed copy of 0..999.
    ascending = list(range(1000))

    def sort_reversed_copy(seq):
        # Work on a reversed copy so the shared input list is left untouched.
        work = seq[::-1]
        work.sort()

    return sort_reversed_copy, (ascending, )
104
def task_compress_zlib():
    """zlib compression (C)"""
    # Returns (callable, args): a zlib compress/decompress round trip over
    # up to 5000 bytes of this source file, repeated three times.
    import zlib
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 3

    def zlib_roundtrip(data):
        # Compress at level 5, then decompress; the output is discarded.
        packed = zlib.compress(data, 5)
        zlib.decompress(packed)

    return zlib_roundtrip, (payload, )
114
def task_compress_bz2():
    """bz2 compression (C)"""
    # Returns (callable, args): bz2 compression of up to 3000 bytes of this
    # source file, repeated twice.
    import bz2
    with open(__file__, "rb") as source:
        payload = source.read(3000) * 2

    def bz2_compress(data):
        # The compressed result is discarded.
        bz2.compress(data)

    return bz2_compress, (payload, )
124
def task_hashing():
    """SHA1 hashing (C)"""
    # Returns (callable, args): SHA-1 over up to 5000 bytes of this source
    # file, repeated 30 times.
    import hashlib
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 30

    def sha1_digest(data):
        # The digest is computed and discarded.
        hashlib.sha1(data).digest()

    return sha1_digest, (payload, )
134
135
throughput_tasks = [task_pidigits, task_regex]

# Optional modules: bind each name to the module, or to None when it is not
# available. Plain import statements keep the names visible to static
# analysis and do not leak a loop variable into the module namespace.
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import hashlib
except ImportError:
    hashlib = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

# The latency tests use the very same list object as the throughput tests.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
Antoine Pitroua69ba652010-01-18 21:20:53 +0000155
156
class TimedLoop:
    """Repeatedly call ``func(*args)`` until a minimum duration has elapsed.

    Calls are made in batches whose size grows while batches are short, so
    reading the clock stays cheap compared to the work being measured.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the loop and return ``(niters, duration)``.

        ``end_event`` is a list shared between concurrent loops: the first
        loop to reach ``min_duration`` appends to it, and any loop that finds
        it non-empty stops and returns its last completed measurement.
        When ``do_yield`` is true, a tiny sleep after each long batch forces
        a thread switch.
        """
        now = time.time
        pause = time.sleep
        call = self.func
        call_args = self.args

        batch = 20
        done = 0
        elapsed = 0.0
        previous = start_time
        while True:
            for _ in range(batch):
                call(*call_args)
            current = now()
            if end_event:
                # Another loop has finished: the batch just run no longer
                # overlapped fully with it, so report the previous figures.
                return done, elapsed
            done += batch
            elapsed = current - start_time
            if elapsed >= min_duration:
                end_event.append(None)
                return done, elapsed
            if current - previous < 0.01:
                # Batch too short: enlarge it so timing overhead stays small.
                batch = batch * 3 // 2
            elif do_yield:
                # OS thread scheduling can be poor enough that we must force
                # a switch ourselves to get meaningful results.
                pause(0.0001)
            previous = current
193
194
def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` in a TimedLoop on ``nthreads`` threads.

    Returns a list holding one ``(niters, duration)`` tuple per thread.
    With a single thread the loop runs in the calling thread, with no
    thread switching or synchronization overhead.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Announce readiness, then block until the main thread sets
        # start_time and flips `started`.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for i in range(nthreads):
        threads.append(threading.Thread(target=run))
    for t in threads:
        # Thread.setDaemon() is deprecated (Python 3.10+); the `daemon`
        # attribute works on every supported version, including 2.6.
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results
247
def run_throughput_tests(max_threads):
    """Print each throughput task's speed for 1..max_threads threads.

    The one-thread figure is printed in iterations per second; every later
    figure is printed as a percentage of it.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            total_iters = sum(niters for niters, _ in results)
            # Dividing by the slowest thread's duration rather than the
            # average gives pessimistic rather than optimistic results.
            slowest = max(elapsed for _, elapsed in results)
            speed = total_iters / slowest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                baseline_speed = speed
                print(" iterations/s.")
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()
268
269
# Sentinel line the latency client sends after its last ping; the parent
# keeps receiving until a chunk contains it.
LAT_END = "END"
271
272def _sendto(sock, s, addr):
273 sock.sendto(s.encode('ascii'), addr)
274
275def _recv(sock, n):
276 return sock.recv(n).decode('ascii')
277
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to addr, then LAT_END.

    Each ping is repr(time.time()) plus a newline. One extra initial ping
    tells the parent we are ready; after a one-second grace period,
    nb_pings more follow, each preceded by an interval-second sleep.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        now = time.time
        sleep = time.sleep

        def send_ping():
            _sendto(sock, "%r\n" % now(), addr)

        # The first ping signals the parent process that we are ready;
        # we then give the parent a bit of time to notice.
        send_ping()
        sleep(1.0)
        for _ in range(nb_pings):
            sleep(interval)
            send_ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
Antoine Pitroua69ba652010-01-18 21:20:53 +0000295
def run_latency_client(**kwargs):
    """Spawn this script as a latency-client subprocess; return the Popen.

    kwargs are passed as repr() on the hidden --latclient option, which
    main() turns back into keyword arguments for latency_client().
    """
    script = os.path.abspath(__file__)
    # -E: the child interpreter ignores PYTHON* environment variables.
    argv = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(argv)
301
def run_latency_test(func, args, nthreads):
    """Measure ping latency while ``nthreads`` threads run ``func(*args)``.

    A client subprocess (see run_latency_client) sends timestamped UDP
    pings to a socket owned by this process. Each ping's send time is paired
    with the time this process received it.

    Returns a list of ``(send_time, recv_time)`` tuples from time.time().
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                # Thread.setDaemon() is deprecated (Python 3.10+).
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        # The readiness ping(s) in the first chunk are deliberately not kept.
        while LAT_END not in s:
            s = _recv(sock, 4096)
            t = _time()
            chunks.append((t, s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # The client sends repr() of a float. Parse it with float()
                # rather than eval(): any local process can send datagrams
                # to this port.
                send_time = float(line)
                results.append((send_time, recv_time))

    return results
382
def run_latency_tests(max_threads):
    """Print ping latency with 0..max_threads background CPU threads.

    For every latency task, reports the mean and the (population) standard
    deviation of the ping latencies, truncated to whole milliseconds.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            # Convert each (send, receive) pair into milliseconds.
            lats = [1000 * (recv - sent) for sent, recv in results]
            n = len(lats)
            avg = sum(lats) / n
            variance = sum((lat - avg) ** 2 for lat in lats) / n
            dev = variance ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev))
        print()
402
403
# Message the bandwidth client sends after its last packet; the parent's
# echo loop stops once a received packet contains it.
BW_END = "END"
405
def bandwidth_client(addr, packet_size, duration):
    """Ping-pong fixed-size UDP packets with the parent listening at addr.

    After a one-second grace period, the client repeatedly sends a packet
    and waits for its echo, for ``duration * 2.0`` seconds. It then sends
    BW_END. Each packet is ``"<repr(local_addr)>#<msg>\\n"``, left-padded
    with spaces to ``packet_size`` characters, so the parent knows where to
    send its replies.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Everything after socket creation is inside the try, so the socket is
    # closed even if bind() or the initial sleep raises.
    try:
        sock.bind(("127.0.0.1", 0))
        local_addr = sock.getsockname()
        _time = time.time
        _sleep = time.sleep

        def _send_chunk(msg):
            _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)

        # We give the parent some time to be ready.
        _sleep(1.0)
        start_time = _time()
        end_time = start_time + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()
428
def run_bandwidth_client(**kwargs):
    """Spawn this script as a bandwidth-client subprocess; return the Popen.

    kwargs are passed as repr() on the hidden --bwclient option, which
    main() turns back into keyword arguments for bandwidth_client().
    """
    script = os.path.abspath(__file__)
    # -E: the child interpreter ignores PYTHON* environment variables.
    argv = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    return subprocess.Popen(argv)
434
def run_bandwidth_test(func, args, nthreads):
    """Measure the UDP echo rate while ``nthreads`` threads run ``func(*args)``.

    A client subprocess (see run_bandwidth_client) sends padded packets of
    BANDWIDTH_PACKET_SIZE characters. This process echoes each one back to
    the address embedded in it.

    Returns the echo rate in packets per second. This rate is measured from
    the first echoed round trip until the loop stops, which happens when a
    background thread finishes or the client sends BW_END.
    """
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                # Thread.setDaemon() is deprecated (Python 3.10+).
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        try:
            _time = time.time
            # Blocks until the client's first packet arrives.
            s = _recv(sock, packet_size)
            # The packet starts with the space-padded repr() of the client's
            # address. Parse it as a literal instead of eval()ing it, since
            # any local process can send datagrams to this port. Strip the
            # padding because older literal_eval() rejects leading spaces.
            remote_addr = ast.literal_eval(s.partition('#')[0].strip())

            with start_cond:
                start_time = _time()
                started = True
                start_cond.notify(nthreads)

            n = 0
            first_time = None
            while not end_event and BW_END not in s:
                _sendto(sock, s, remote_addr)
                s = _recv(sock, packet_size)
                if first_time is None:
                    first_time = _time()
                n += 1
            end_time = _time()

            end_event.append(None)
            for t in threads:
                t.join()
        finally:
            # Stop the client and reap it so no zombie process is left behind.
            process.kill()
            process.wait()
    finally:
        sock.close()

    # The first echoed round trip only starts the clock, hence n - 1.
    return (n - 1) / (end_time - first_time)
509
def run_bandwidth_tests(max_threads):
    """Print the UDP echo rate with 0..max_threads background CPU threads.

    The zero-thread figure is printed in packets per second; every later
    figure is printed as a percentage of it.
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is None:
                baseline_speed = speed
                print(" packets/s.")
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()
529
530
def main():
    """Command-line entry point.

    Runs the selected benchmark families, or all of them when none is
    selected. When a hidden --latclient/--bwclient option is given, this
    process instead acts as the client subprocess for the latency or
    bandwidth test.
    """
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client options carry repr() of a dict of plain literals, built by
    # run_latency_client() / run_bandwidth_client(). literal_eval() parses
    # them without executing arbitrary code, unlike eval().
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    if options.check_interval:
        # sys.setcheckinterval() was removed in Python 3.9.
        if not hasattr(sys, 'setcheckinterval'):
            parser.error("--interval is not supported by this Python version")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        # sys.setswitchinterval() only exists on Python 3.2 and later.
        if not hasattr(sys, 'setswitchinterval'):
            parser.error("--switch-interval is not supported by this Python version")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)
610
# When executed as a script, run main(), which runs the benchmarks or acts as
# a client subprocess depending on the command-line options.
if __name__ == "__main__":
    main()