blob: 569ec217e4286880e527aa68e8fe33af12a98f61 [file] [log] [blame]
Antoine Pitroua69ba652010-01-18 21:20:53 +00001# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
2
3from __future__ import division
4from __future__ import print_function
5
6"""
7ccbench, a Python concurrency benchmark.
8"""
9
10import time
11import os
12import sys
13import functools
14import itertools
15import threading
16import subprocess
17import socket
18from optparse import OptionParser, SUPPRESS_HELP
19import platform
20
# Compatibility shims so the same source runs unchanged on Python 2 and 3.
try:
    xrange
except NameError:
    # Python 3: the built-in range() is already lazy.
    xrange = range

try:
    # Python 2: rebind map to the lazy itertools.imap; task_pidigits maps
    # over an infinite itertools.count(), which must not be materialized.
    from itertools import imap as map
except ImportError:
    # Python 3: the built-in map() is already lazy.
    pass
31
32
# Minimum wall-clock time (seconds) each throughput measurement runs for;
# passed as min_duration to TimedLoop by run_throughput_test().
THROUGHPUT_DURATION = 2.0

# Seconds the latency client sleeps before each ping.
LATENCY_PING_INTERVAL = 0.1
# Length (seconds) of the latency ping phase; the client sends
# int(LATENCY_DURATION / LATENCY_PING_INTERVAL) pings after the ready ping.
LATENCY_DURATION = 2.0

# Width in characters (ASCII, hence bytes) each bandwidth-test datagram is
# padded to, and the receive buffer size used for them.
BANDWIDTH_PACKET_SIZE = 1024
# Base duration (seconds) of a bandwidth run; the client and the background
# CPU threads run for multiples of it.
BANDWIDTH_DURATION = 2.0
40
Antoine Pitroua69ba652010-01-18 21:20:53 +000041
def task_pidigits():
    """Pi calculation (Python)"""
    # Pure-Python CPU workload: returns (callable, args) where the callable
    # computes the first n decimal digits of pi as a list of ints.
    lazy_map = map
    counter = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        # Spigot algorithm from the Computer Language Benchmarks Game
        # (formerly http://shootout.alioth.debian.org/).
        def extract(state, j):
            q, r, s, t = state
            return (q*j + r) // (s*j + t)

        def compose(left, right):
            lq, lr, ls, lt = left
            rq, rr, rs, rt = right
            return (lq * rq,
                    lq * rr + lr * rt,
                    ls * rq + lt * rs,
                    ls * rr + lt * rt)

        def terms():
            # Infinite, lazily produced stream of linear fractional terms.
            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1), counter(1))

        def digits():
            state = (1, 0, 0, 1)
            source = terms()
            while True:
                digit = extract(state, 3)
                # Absorb more terms until the next digit is determined.
                while digit != extract(state, 4):
                    state = compose(state, next(source))
                    digit = extract(state, 3)
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(take(digits(), n))

    return calc_ndigits, (50, )
79
def task_regex():
    """regular expression (C)"""
    # C-level workload: returns (callable, args) where the callable runs a
    # compiled regex's findall() over a fixed chunk of text.
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # Benchmark input: the first 2000 characters of this very script.
    with open(__file__, "r") as f:
        arg = f.read(2000)

    # The bound findall method is benchmarked directly, with no Python-level
    # wrapper, so the measured work stays in C.
    return pat.findall, (arg, )
96
def task_sort():
    """list sorting (C)"""
    # C-level workload: returns (callable, args) where the callable sorts a
    # reversed copy of a 1000-element list.
    def list_sort(seq):
        # Work on a reversed copy so the caller's list is never mutated.
        reversed_copy = seq[::-1]
        reversed_copy.sort()

    payload = list(range(1000))
    return list_sort, (payload, )
104
def task_compress_zlib():
    """zlib compression (C)"""
    # C-level workload: returns (callable, args) where the callable
    # compresses (level 5) and decompresses a fixed byte string.
    import zlib
    with open(__file__, "rb") as source:
        sample = source.read(5000)

    def compress(data):
        # Round-trip so both directions of the codec are exercised.
        packed = zlib.compress(data, 5)
        zlib.decompress(packed)

    return compress, (sample * 3, )
114
def task_compress_bz2():
    """bz2 compression (C)"""
    # C-level workload: returns (callable, args) where the callable
    # bz2-compresses a fixed byte string.
    import bz2
    with open(__file__, "rb") as source:
        sample = source.read(3000)

    def compress(data):
        # Only compression is exercised; the output is discarded.
        bz2.compress(data)

    return compress, (sample * 2, )
124
def task_hashing():
    """SHA1 hashing (C)"""
    # C-level workload: returns (callable, args) where the callable computes
    # the SHA-1 digest of a fixed byte string.
    import hashlib
    with open(__file__, "rb") as source:
        sample = source.read(5000)

    def compute(data):
        # The digest itself is discarded; only the hashing work matters.
        hashlib.sha1(data).digest()

    return compute, (sample * 30, )
134
135
# Probe for the optional modules: each name is bound at module level to the
# imported module, or to None when it cannot be imported.
for mod in ('bz2', 'hashlib'):
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
throughput_tasks = [
    task_pidigits,
    task_regex,
    (task_compress_bz2 if bz2 is not None
     else task_hashing if hashlib is not None
     else task_compress_zlib),
]

# The latency benchmark reuses the very same list object.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
Antoine Pitroua69ba652010-01-18 21:20:53 +0000155
156
class TimedLoop:
    """Call ``func(*args)`` repeatedly until a minimum duration has elapsed.

    Concurrent calls (e.g. from several threads) may share one ``end_event``
    list: whichever call reaches its minimum duration first appends to it,
    and the other calls then stop after their current batch.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run batches of calls and return ``(iterations, elapsed_seconds)``.

        ``elapsed_seconds`` is measured from *start_time*.  When *do_yield*
        is true, the loop briefly sleeps between batches to force thread
        switches.
        """
        clock = time.time
        pause = time.sleep
        target = self.func
        target_args = self.args
        batch = 20
        done = 0
        elapsed = 0.0
        last_check = start_time
        while True:
            for _ in range(batch):
                target(*target_args)
            now = clock()
            if end_event:
                # Another loop already finished, so this batch overlapped a
                # partial run: report the last complete measurement instead.
                return done, elapsed
            done += batch
            elapsed = now - start_time
            if elapsed >= min_duration:
                end_event.append(None)
                return done, elapsed
            if now - last_check < 0.01:
                # Batches are very short: grow them so the timing calls stay a
                # negligible share of the measured work.
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                pause(0.0001)
            last_check = now
193
194
def run_throughput_test(func, args, nthreads):
    """Measure how fast *nthreads* threads can repeatedly call ``func(*args)``.

    Each thread runs the shared TimedLoop for at least THROUGHPUT_DURATION
    seconds; the first thread to finish signals the others to stop through
    a shared end_event list.  Returns a list of ``(iterations, duration)``
    pairs, one per thread.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Announce readiness, then block until the main thread has set
        # start_time (read through the closure once `started` is True).
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for i in range(nthreads):
        threads.append(threading.Thread(target=run))
    for t in threads:
        # The `daemon` attribute replaces the deprecated setDaemon() method
        # and is available on every Python this file supports (2.6+).
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify_all()
    for t in threads:
        t.join()

    return results
247
def run_throughput_tests(max_threads):
    """Print throughput for every task with 1 to *max_threads* threads.

    The single-thread speed is printed in iterations per second; each larger
    thread count is printed as a percentage of that baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            total_iterations = sum(iters for iters, _ in results)
            longest = max(elapsed for _, elapsed in results)
            speed = total_iterations / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" iterations/s.")
                baseline_speed = speed
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()
268
269
# Sentinel line the latency client sends after its last ping; the server
# keeps receiving until a received chunk contains it.
LAT_END = "END"
271
def _sendto(sock, s, addr):
    """Encode the text *s* as ASCII and send it to *addr* through *sock*."""
    payload = s.encode('ascii')
    sock.sendto(payload, addr)
274
def _recv(sock, n):
    """Receive at most *n* bytes from *sock* and return them as ASCII text."""
    data = sock.recv(n)
    return data.decode('ascii')
277
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to *addr* (runs in the client process).

    Sends one initial "ready" ping, waits one second, then sends *nb_pings*
    more pings, sleeping *interval* seconds before each, and finally a
    LAT_END line.  Each ping is the ``repr()`` of its send time followed by
    a newline.  The socket is closed on exit, even if sending fails.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    _time = time.time
    _sleep = time.sleep

    def _ping():
        _sendto(sock, "%r\n" % _time(), addr)

    try:
        # The first ping signals the parent process that we are ready.
        _ping()
        # We give the parent a bit of time to notice.
        _sleep(1.0)
        for i in range(nb_pings):
            _sleep(interval)
            _ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
292
def run_latency_client(**kwargs):
    """Spawn this script as a latency ping client in a child process.

    *kwargs* are passed on the command line as ``repr(kwargs)`` behind the
    hidden ``--latclient`` option.  Returns the ``subprocess.Popen`` object;
    the child's standard streams are inherited.
    """
    script = os.path.abspath(__file__)
    return subprocess.Popen([sys.executable, '-E', script,
                             '--latclient', repr(kwargs)])
298
def run_latency_test(func, args, nthreads):
    """Measure ping latency while *nthreads* threads run ``func(*args)``.

    A child process started by run_latency_client() sends timestamped UDP
    pings to a local socket, and this function records when each one
    arrives.  Returns a list of ``(send_time, recv_time)`` pairs, both taken
    from time.time().  The listening socket is always closed.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        results = []
        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                # Runs until duration * 1.5 seconds have elapsed or until
                # end_event is set, whichever comes first.
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                # `daemon` attribute instead of the deprecated setDaemon().
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify_all()

        while LAT_END not in s:
            s = _recv(sock, 4096)
            t = _time()
            chunks.append((t, s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        sock.close()

    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # Each ping is the repr() of a float timestamp.  float()
                # parses it without eval()'s risk of executing whatever
                # arrived on the socket.
                send_time = float(line)
                results.append((send_time, recv_time))

    return results
378
def run_latency_tests(max_threads):
    """Print ping latency for each task with 0 to *max_threads* CPU threads.

    Latencies (``recv_time - send_time``) are reported in milliseconds as
    the mean and population standard deviation.  A run in which no ping was
    received is reported as such instead of crashing.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            if n == 0:
                # UDP gives no delivery guarantee; with no samples there is
                # nothing to average (and dividing by n would fail).
                print("CPU threads=%d: no samples received" % nthreads)
                continue
            # We print out milliseconds
            lats = [1000 * (t2 - t1) for (t1, t2) in results]
            avg = sum(lats) / n
            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)"
                  % (nthreads, avg, dev))
        print()
398
399
# Marker the bandwidth client sends in its final datagram; the server stops
# echoing packets once a received packet contains it.
BW_END = "END"
401
def bandwidth_client(addr, packet_size, duration):
    """Ping-pong fixed-size UDP datagrams with the server at *addr*.

    Runs in the client process.  Each datagram is ``"<repr of local
    address>#<msg>\\n"`` left-padded with spaces to *packet_size* characters,
    so the server knows where to reply.  After waiting one second, the
    client alternates send/receive for ``duration * 2.0`` seconds, then
    sends a final BW_END datagram.  The socket is closed on exit.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    local_addr = sock.getsockname()
    clock = time.time

    def send_chunk(msg):
        text = "%r#%s\n" % (local_addr, msg)
        _sendto(sock, text.rjust(packet_size), addr)

    # We give the parent some time to be ready.
    time.sleep(1.0)
    try:
        deadline = clock() + duration * 2.0
        seq = 0
        while clock() < deadline:
            send_chunk(str(seq))
            # The server echoes each datagram back unchanged.
            reply = _recv(sock, packet_size)
            assert len(reply) == packet_size
            seq += 1
        send_chunk(BW_END)
    finally:
        sock.close()
424
def run_bandwidth_client(**kwargs):
    """Spawn this script as a bandwidth client in a child process.

    *kwargs* are passed on the command line as ``repr(kwargs)`` behind the
    hidden ``--bwclient`` option.  Returns the ``subprocess.Popen`` object;
    the child's standard streams are inherited.
    """
    script = os.path.abspath(__file__)
    return subprocess.Popen([sys.executable, '-E', script,
                             '--bwclient', repr(kwargs)])
430
def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo throughput while *nthreads* threads run ``func(*args)``.

    A child process started by run_bandwidth_client() sends packets that
    this function echoes back one at a time.  Returns round trips per second,
    counted from the first echoed reply.  Returns 0.0 when no interval could
    be measured (no reply arrived, or the clock did not advance).  The socket
    is always closed, and the child is always killed and reaped.
    """
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                # `daemon` attribute instead of the deprecated setDaemon().
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        try:
            _time = time.time
            # This will also wait for the parent to be ready
            s = _recv(sock, packet_size)
            # The packet starts with the space-padded repr() of the client's
            # address tuple.  literal_eval() parses it without executing
            # arbitrary code received from the socket.
            remote_addr = ast.literal_eval(s.partition('#')[0].strip())

            with start_cond:
                start_time = _time()
                started = True
                start_cond.notify_all()

            n = 0
            first_time = None
            while not end_event and BW_END not in s:
                _sendto(sock, s, remote_addr)
                s = _recv(sock, packet_size)
                if first_time is None:
                    first_time = _time()
                n += 1
            end_time = _time()

            end_event.append(None)
            for t in threads:
                t.join()
        finally:
            # The client may be blocked waiting for an echo that will never
            # come; kill it, then reap it so no zombie process is left.
            process.kill()
            process.wait()
    finally:
        sock.close()

    if first_time is None or end_time <= first_time:
        return 0.0
    return (n - 1) / (end_time - first_time)
505
def run_bandwidth_tests(max_threads):
    """Print echo throughput for each task with 0 to *max_threads* CPU threads.

    The first run (no background threads) is printed in packets per second.
    Later runs are printed as a percentage of it, unless that baseline is 0,
    in which case no ratio is computed.
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" packets/s.")
                baseline_speed = speed
            elif baseline_speed:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                # A zero baseline has no meaningful ratio (and would divide
                # by zero).
                print(" (baseline was 0)")
        print()
525
526
def main():
    """Parse the command line and run the requested benchmarks.

    Also serves as the entry point of the client subprocesses, which are
    started with the hidden --latclient / --bwclient options.  When none of
    -t, -l and -b is given, all three benchmark families run.
    """
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client options carry a repr()'d dict of keyword arguments.
    # literal_eval() accepts exactly such literals without evaluating
    # arbitrary expressions from the command line.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    # Explicit None checks so that 0 is passed through rather than ignored.
    # The setters are not available on every supported Python
    # (setcheckinterval was removed in 3.9; setswitchinterval appeared in
    # 3.2), so report a clean usage error instead of an AttributeError.
    if options.check_interval is not None:
        if not hasattr(sys, "setcheckinterval"):
            parser.error("-i/--interval is not supported by this Python "
                         "(sys.setcheckinterval() is unavailable)")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval is not None:
        if not hasattr(sys, "setswitchinterval"):
            parser.error("-I/--switch-interval is not supported by this Python "
                         "(sys.setswitchinterval() is unavailable)")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)
606
# Run main() only when executed as a script.  The latency and bandwidth
# client subprocesses also start this way, re-invoking this file with the
# hidden --latclient / --bwclient options.
if __name__ == "__main__":
    main()