blob: ab1465a276092310cd85654aa51b1c1298c52f36 [file] [log] [blame]
Antoine Pitroua69ba652010-01-18 21:20:53 +00001# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
2
3from __future__ import division
4from __future__ import print_function
5
6"""
7ccbench, a Python concurrency benchmark.
8"""
9
10import time
11import os
12import sys
Antoine Pitroua69ba652010-01-18 21:20:53 +000013import itertools
14import threading
15import subprocess
16import socket
17from optparse import OptionParser, SUPPRESS_HELP
18import platform
19
# Compatibility shims so the same source runs on Python 2 and Python 3.
try:
    xrange
except NameError:
    # No builtin named xrange here: alias it to range.
    xrange = range

if hasattr(itertools, "imap"):
    # itertools.imap exists: use it in place of the builtin map.
    map = itertools.imap
30
31
# Durations below are compared against time.time() differences or handed to
# time.sleep(), so they are expressed in seconds.

# Throughput benchmark: minimum measurement time per run.
THROUGHPUT_DURATION = 2.0

# Latency benchmark: total ping time and the pause between two pings.
LATENCY_DURATION = 2.0
LATENCY_PING_INTERVAL = 0.1

# Bandwidth benchmark: base measurement time and the size each UDP packet
# is padded to.
BANDWIDTH_DURATION = 2.0
BANDWIDTH_PACKET_SIZE = 1024
39
Antoine Pitroua69ba652010-01-18 21:20:53 +000040
def task_pidigits():
    """Pi calculation (Python)"""
    # NOTE: the docstring above is printed as the task label; keep it as is.
    # Returns (callable, args): the callable computes the first n digits of
    # pi and the benchmark calls it with n == 50.
    lazy_map = map
    counter = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        # From http://shootout.alioth.debian.org/
        def term_stream():
            # Endless stream of 4-tuples, one per k = 1, 2, 3, ...
            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1), counter(1))

        def compose(left, right):
            # Product of two 2x2 integer matrices stored as flat 4-tuples.
            lq, lr, ls, lt = left
            rq, rr, rs, rt = right
            return (lq * rq,
                    lq * rr + lr * rt,
                    ls * rq + lt * rs,
                    ls * rr + lt * rt)

        def extract(state, j):
            # Integer part of the transformation `state` evaluated at j.
            q, r, s, t = state
            return (q*j + r) // (s*j + t)

        def digit_stream():
            state = (1, 0, 0, 1)
            terms = term_stream()
            while True:
                digit = extract(state, 3)
                # Consume terms until the evaluations at 3 and 4 agree;
                # only then is the next digit settled.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(take(digit_stream(), n))

    return calc_ndigits, (50, )
78
def task_regex():
    """regular expression (C)"""
    # NOTE: the docstring above is printed as the task label; keep it as is.
    # XXX this task gives horrendous latency results.
    import re

    # Subject text: the first 2000 characters of this very script.
    with open(__file__, "r") as source:
        text = source.read(2000)

    # Pattern taken from the `inspect` module.
    pattern = re.compile(
        r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)',
        re.MULTILINE,
    )
    return pattern.findall, (text, )
88
def task_sort():
    """list sorting (C)"""
    # NOTE: the docstring above is printed as the task label; keep it as is.
    def list_sort(items):
        # Sort a reversed copy so the caller's list is left untouched.
        descending = items[::-1]
        descending.sort()

    data = list(range(1000))
    return list_sort, (data, )
96
def task_compress_zlib():
    """zlib compression (C)"""
    # NOTE: the docstring above is printed as the task label; keep it as is.
    import zlib

    # Payload: the first 5000 bytes of this script, repeated three times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 3

    def compress(s):
        # Compress at level 5, then decompress; the result is discarded.
        packed = zlib.compress(s, 5)
        zlib.decompress(packed)

    return compress, (payload, )
106
def task_compress_bz2():
    """bz2 compression (C)"""
    # NOTE: the docstring above is printed as the task label; keep it as is.
    import bz2

    # Payload: the first 3000 bytes of this script, repeated twice.
    with open(__file__, "rb") as source:
        payload = source.read(3000) * 2

    def compress(s):
        # Only the compression work matters; the output is discarded.
        bz2.compress(s)

    return compress, (payload, )
116
def task_hashing():
    """SHA1 hashing (C)"""
    # NOTE: the docstring above is printed as the task label; keep it as is.
    import hashlib

    # Payload: the first 5000 bytes of this script, repeated thirty times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 30

    def compute(s):
        # Only the hashing work matters; the digest is discarded.
        hasher = hashlib.sha1(s)
        hasher.digest()

    return compute, (payload, )
126
127
throughput_tasks = [task_pidigits, task_regex]

# Bind each optional module as a module-level global, or None when it cannot
# be imported, so the selection below can test for availability.
for mod in ('bz2', 'hashlib'):
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is None and hashlib is None:
    throughput_tasks.append(task_compress_zlib)
elif bz2 is None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_bz2)

# The bandwidth benchmark uses only the pure-Python task; the latency
# benchmark shares the very same list object as the throughput benchmark.
bandwidth_tasks = [task_pidigits]
latency_tasks = throughput_tasks
Antoine Pitroua69ba652010-01-18 21:20:53 +0000147
148
class TimedLoop:
    """Callable that runs ``func(*args)`` in batches until a deadline.

    The instance stores the workload; calling it performs one measurement.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the workload and return ``(niters, duration)``.

        start_time -- time.time() value the measurement is counted from.
        min_duration -- seconds after which the measurement is complete.
        end_event -- shared list; a non-empty list means "stop".  This loop
            appends to it when it reaches min_duration itself.
        do_yield -- when true, sleep briefly after a batch that took at
            least 10 ms, to force a thread switch.
        """
        now = time.time
        pause = time.sleep
        work = self.func
        work_args = self.args

        batch = 20
        completed = 0
        elapsed = 0.0
        previous = start_time
        while True:
            for _ in range(batch):
                work(*work_args)
            current = now()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return completed, elapsed
            completed += batch
            elapsed = current - start_time
            if elapsed >= min_duration:
                end_event.append(None)
                return completed, elapsed
            if current - previous < 0.01:
                # Minimize interference of measurement on overall runtime
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                pause(0.0001)
            previous = current
185
186
def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` repeatedly in *nthreads* threads and time it.

    Each measurement lasts at least THROUGHPUT_DURATION seconds.  With a
    single thread the loop runs directly in the calling thread; otherwise
    one thread per loop is started and all loops share the same start time.

    Returns a list of ``(niters, duration)`` tuples as produced by
    TimedLoop, one per loop that was run.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Report readiness, then block until the main thread gives the go.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = [threading.Thread(target=run) for _ in range(nthreads)]
    for t in threads:
        # Thread.setDaemon() is deprecated; the `daemon` attribute is the
        # supported spelling and has the same effect.
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results
239
def run_throughput_tests(max_threads):
    """Print throughput figures for every task, for 1..max_threads threads.

    The single-thread speed of each task is printed in iterations/s. and
    serves as the baseline; later lines add a percentage of that baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            total_iterations = sum(r[0] for r in results)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            longest_duration = max(r[1] for r in results)
            speed = total_iterations / longest_duration
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" iterations/s.")
                baseline_speed = speed
        print()
260
261
# Marker the latency client sends once it has finished pinging.
LAT_END = "END"


def _sendto(sock, s, addr):
    """Send the text *s* to *addr* through *sock*, encoded as ASCII."""
    payload = s.encode('ascii')
    sock.sendto(payload, addr)


def _recv(sock, n):
    """Receive up to *n* bytes from *sock* and return them as ASCII text."""
    raw = sock.recv(n)
    return raw.decode('ascii')
269
270def latency_client(addr, nb_pings, interval):
Antoine Pitrou375ff582011-03-11 20:57:11 +0100271 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
272 try:
Antoine Pitrou126c8b42011-01-15 11:39:23 +0000273 _time = time.time
274 _sleep = time.sleep
275 def _ping():
276 _sendto(sock, "%r\n" % _time(), addr)
277 # The first ping signals the parent process that we are ready.
Antoine Pitroua69ba652010-01-18 21:20:53 +0000278 _ping()
Antoine Pitrou126c8b42011-01-15 11:39:23 +0000279 # We give the parent a bit of time to notice.
280 _sleep(1.0)
281 for i in range(nb_pings):
282 _sleep(interval)
283 _ping()
284 _sendto(sock, LAT_END + "\n", addr)
Antoine Pitrou375ff582011-03-11 20:57:11 +0100285 finally:
286 sock.close()
Antoine Pitroua69ba652010-01-18 21:20:53 +0000287
def run_latency_client(**kwargs):
    """Start this script as a child process in latency-client mode.

    The keyword arguments travel as their repr() after the hidden
    --latclient option.  Returns the subprocess.Popen object.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
293
def run_latency_test(func, args, nthreads):
    """Measure UDP ping latency while *nthreads* threads run ``func(*args)``.

    A child process (see run_latency_client) sends timestamped pings to a
    local UDP socket; this function records when each chunk arrives.

    Returns a list of ``(send_time, recv_time)`` tuples of time.time()
    values.  Raises ValueError if a received line is not a float literal.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Report readiness, then block until the first ping arrived.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            threads.extend(threading.Thread(target=run)
                           for _ in range(nthreads))
            for t in threads:
                # Thread.setDaemon() is deprecated; set the attribute.
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            chunks.append((_time(), s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        # Release the socket even if the measurement failed half-way.
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # The client sends repr(time.time()).  Parse it with float()
                # rather than eval(): any local process can send datagrams to
                # this port, and received text must never be executed.
                send_time = float(line)
                results.append((send_time, recv_time))

    return results
374
def run_latency_tests(max_threads):
    """Print latency figures for every task, for 0..max_threads CPU threads.

    For each thread count the mean and the standard deviation of the
    measured latencies are printed in milliseconds.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            samples = run_latency_test(func, args, nthreads)
            count = len(samples)
            # We print out milliseconds
            latencies = [1000 * (received - sent)
                         for (sent, received) in samples]
            mean = sum(latencies) / count
            variance = sum((ms - mean) ** 2 for ms in latencies) / count
            deviation = variance ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)"
                  % (nthreads, mean, deviation))
        print()
394
395
# Marker the bandwidth client sends once it has finished.
BW_END = "END"


def bandwidth_client(addr, packet_size, duration):
    """Exchange UDP packets with the server at *addr* for 2 * duration s.

    Every packet holds the repr() of this client's own address, a '#', and
    a message, right-justified to *packet_size* characters.  After each
    packet the client waits for a reply of exactly *packet_size*
    characters.  BW_END is sent as the final message.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    local_addr = sock.getsockname()
    now = time.time
    pause = time.sleep

    def send_message(msg):
        text = "%r#%s\n" % (local_addr, msg)
        _sendto(sock, text.rjust(packet_size), addr)

    # We give the parent some time to be ready.
    pause(1.0)
    try:
        deadline = now() + duration * 2.0
        sequence = 0
        while now() < deadline:
            send_message(str(sequence))
            reply = _recv(sock, packet_size)
            assert len(reply) == packet_size
            sequence += 1
        send_message(BW_END)
    finally:
        sock.close()
420
def run_bandwidth_client(**kwargs):
    """Start this script as a child process in bandwidth-client mode.

    The keyword arguments travel as their repr() after the hidden
    --bwclient option.  Returns the subprocess.Popen object.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
426
def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo rate while *nthreads* threads run ``func(*args)``.

    A child process (see run_bandwidth_client) sends packets to a local UDP
    socket and this function echoes each one back.  The exchange stops when
    a background loop signals completion through end_event or when the
    client sends BW_END.

    Returns the number of packets received per second, counted from the
    arrival of the first echoed reply.
    """
    # Imported locally so this function carries its own dependency.
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Report readiness, then block until the first packet arrived.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            threads.extend(threading.Thread(target=run)
                           for _ in range(nthreads))
            for t in threads:
                # Thread.setDaemon() is deprecated; set the attribute.
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This will also wait for the parent to be ready
        s = _recv(sock, packet_size)
        # The text before '#' is the repr() of the client's address tuple,
        # left-padded with spaces.  Parse it as a literal instead of eval():
        # any local process can send datagrams to this port, and received
        # text must never be executed.  strip() is needed because older
        # literal_eval() versions reject leading whitespace.
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()

    end_event.append(None)
    for t in threads:
        t.join()
    process.kill()
    # Reap the child so it does not linger as a zombie process.
    process.wait()

    return (n - 1) / (end_time - first_time)
501
def run_bandwidth_tests(max_threads):
    """Print bandwidth figures for every task, for 0..max_threads threads.

    The first measurement of each task is printed in packets/s. and serves
    as the baseline; later lines add a percentage of that baseline.
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" packets/s.")
                baseline_speed = speed
        print()
521
522
def main():
    """Parse the command line and run the selected benchmarks.

    Without -t, -l or -b, all three benchmark families are run.  The hidden
    --latclient / --bwclient options turn this process into the helper
    client started by run_latency_client() / run_bandwidth_client().
    """
    # Imported locally so this function carries its own dependency.
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value "
                           "(Python 3.8 and older)")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value "
                           "(Python 3.2 and newer)")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client payloads are the repr() of a keyword-argument dict, so they
    # are parsed as literals; nothing from the command line is executed.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    if options.check_interval:
        # Report a usage error instead of an AttributeError traceback when
        # this interpreter does not provide the function.
        if not hasattr(sys, "setcheckinterval"):
            parser.error("sys.setcheckinterval() is not available "
                         "on this Python version")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        if not hasattr(sys, "setswitchinterval"):
            parser.error("sys.setswitchinterval() is not available "
                         "on this Python version")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)


if __name__ == "__main__":
    main()