# blob: 4f77a65f4d779ba34f659c2c87a7c6345aad01da
# This file should be kept compatible with both Python 2.6 and Python >= 3.0.

"""
ccbench, a Python concurrency benchmark.
"""

from __future__ import division
from __future__ import print_function

import ast
import itertools
import os
import platform
import socket
import subprocess
import sys
import threading
import time
from optparse import OptionParser, SUPPRESS_HELP
19
# Compatibility
try:
    xrange
except NameError:
    # No builtin xrange(): fall back to range().
    xrange = range

# When itertools provides imap(), use it in place of the builtin map();
# otherwise the builtin map() is left untouched.
if hasattr(itertools, "imap"):
    map = itertools.imap
30
31
# Minimum duration (in seconds) of each throughput measurement.
THROUGHPUT_DURATION = 2.0

# Latency test: delay (in seconds) between two pings, and the overall
# duration from which the number of pings is derived.
LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

# Bandwidth test: size of each UDP packet and base duration (in seconds).
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0
def task_pidigits():
    """Pi calculation (Python)"""
    # Bind the helpers once so the nested functions use closure lookups.
    lazy_map = map
    counter = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        # From http://shootout.alioth.debian.org/
        # Each 4-tuple (q, r, s, t) is treated as a 2x2 integer matrix.

        def gen_x():
            # Endless stream of the per-term matrices, for k = 1, 2, 3, ...
            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1), counter(1))

        def compose(a, b):
            # Matrix product a * b.
            aq, ar, as_, at = a
            bq, br, bs, bt = b
            top_left = aq * bq
            top_right = aq * br + ar * bt
            bottom_left = as_ * bq + at * bs
            bottom_right = as_ * br + at * bt
            return (top_left, top_right, bottom_left, bottom_right)

        def extract(z, j):
            # Integer part of the transformation z applied to j.
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            state = (1, 0, 0, 1)
            terms = gen_x()
            while True:
                digit = extract(state, 3)
                # Consume terms until the next digit is unambiguous.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                # Shift the emitted digit out of the state.
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(take(pi_digits(), n))

    return calc_ndigits, (50, )
78
def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # The benchmark input is the beginning of this very source file.
    with open(__file__, "r") as f:
        arg = f.read(2000)

    # The bound method is returned directly so that the timed loop measures
    # the regex engine without any Python-level wrapper around it.
    return pat.findall, (arg, )
95
def task_sort():
    """list sorting (C)"""
    ascending = list(range(1000))

    def list_sort(seq):
        # Work on a reversed copy so the caller's list is never mutated and
        # every call sorts the same (descending) input.
        descending = seq[::-1]
        descending.sort()

    return list_sort, (ascending, )
103
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib

    # The payload is the start of this source file, repeated three times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 3

    def compress(data):
        # Compress at level 5, then decompress; the result is discarded.
        packed = zlib.compress(data, 5)
        zlib.decompress(packed)

    return compress, (payload, )
113
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2

    # The payload is the start of this source file, repeated twice.
    with open(__file__, "rb") as source:
        payload = source.read(3000) * 2

    def compress(data):
        # Only the compression work matters; the output is discarded.
        bz2.compress(data)

    return compress, (payload, )
123
def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib

    # The payload is the start of this source file, repeated thirty times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 30

    def compute(data):
        # Hash the whole payload in one call; the digest is discarded.
        hasher = hashlib.sha1(data)
        hasher.digest()

    return compute, (payload, )
133
134
# Tasks exercised by the throughput benchmark; a third one is appended below
# depending on which optional modules can be imported.
throughput_tasks = [task_pidigits, task_regex]

# Bind `bz2` and `hashlib` as module globals: the imported module when
# available, None otherwise.
for mod in ('bz2', 'hashlib'):
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

# The latency benchmark shares the very same list object; the bandwidth
# benchmark only uses the pure-Python task.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
155
class TimedLoop:
    # Callable that runs `func(*args)` repeatedly in batches until a minimum
    # duration has elapsed or another loop has signalled termination.

    def __init__(self, func, args):
        self.func, self.args = func, args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        # Returns (iterations, duration).  `end_event` is a shared list: it
        # is empty while loops are running, and the first loop to reach
        # `min_duration` appends an item to make the other loops stop.
        now = time.time
        pause = time.sleep
        call = self.func
        call_args = self.args

        batch = 20
        completed = 0
        elapsed = 0.0
        previous = start_time
        while True:
            for _ in range(batch):
                call(*call_args)
            current = now()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return completed, elapsed
            completed += batch
            elapsed = current - start_time
            if elapsed >= min_duration:
                end_event.append(None)
                return completed, elapsed
            if current - previous < 0.01:
                # Minimize interference of measurement on overall runtime
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                pause(0.0001)
            previous = current
192
193
def run_throughput_test(func, args, nthreads):
    """Measure how many times `func(*args)` runs on `nthreads` threads.

    Returns a list of (iterations, duration) tuples as produced by
    TimedLoop, one for each thread that ran the loop.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Announce readiness, then block until the main thread has recorded
        # the common start time.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = [threading.Thread(target=run) for _ in range(nthreads)]
    for t in threads:
        # The `daemon` attribute replaces the deprecated setDaemon().
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results
246
def run_throughput_tests(max_threads):
    """Print throughput results of every task for 1..max_threads threads.

    The single-thread speed is printed in iterations/s.; the speed of each
    subsequent thread count is followed by its percentage of that baseline.
    """
    for make_task in throughput_tasks:
        print(make_task.__doc__)
        print()
        func, args = make_task()
        baseline_speed = None
        nthreads = 1
        while nthreads <= max_threads:
            per_thread = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            total_iterations = sum(niters for niters, _ in per_thread)
            longest = max(elapsed for _, elapsed in per_thread)
            speed = total_iterations / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" iterations/s.")
                baseline_speed = speed
            nthreads += 1
        print()
267
268
# Marker sent by the latency client after its last ping.
LAT_END = "END"
270
271def _sendto(sock, s, addr):
272 sock.sendto(s.encode('ascii'), addr)
273
274def _recv(sock, n):
275 return sock.recv(n).decode('ascii')
276
277def latency_client(addr, nb_pings, interval):
Antoine Pitrou375ff582011-03-11 20:57:11 +0100278 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
279 try:
Antoine Pitrou126c8b42011-01-15 11:39:23 +0000280 _time = time.time
281 _sleep = time.sleep
282 def _ping():
283 _sendto(sock, "%r\n" % _time(), addr)
284 # The first ping signals the parent process that we are ready.
Antoine Pitroua69ba652010-01-18 21:20:53 +0000285 _ping()
Antoine Pitrou126c8b42011-01-15 11:39:23 +0000286 # We give the parent a bit of time to notice.
287 _sleep(1.0)
288 for i in range(nb_pings):
289 _sleep(interval)
290 _ping()
291 _sendto(sock, LAT_END + "\n", addr)
Antoine Pitrou375ff582011-03-11 20:57:11 +0100292 finally:
293 sock.close()
Antoine Pitroua69ba652010-01-18 21:20:53 +0000294
def run_latency_client(**kwargs):
    """Spawn this script as a latency client and return the Popen object.

    The keyword arguments are passed to the child as the repr() of the
    kwargs dict, through the hidden --latclient option.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
300
def run_latency_test(func, args, nthreads):
    """Measure UDP ping latency while `nthreads` threads run `func(*args)`.

    A client subprocess sends timestamped pings to a local UDP socket.
    Returns a list of (send_time, recv_time) tuples of floats, one per ping
    received after the initial "ready" ping.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Announce readiness, then block until the first ping arrived.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for _ in range(nthreads):
                threads.append(threading.Thread(target=run))
            for thread in threads:
                # The `daemon` attribute replaces the deprecated setDaemon().
                thread.daemon = True
                thread.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            recv_time = _time()
            chunks.append((recv_time, s))

        # Tell the background threads to stop.
        end_event.append(None)
        for thread in threads:
            thread.join()
        process.wait()
    finally:
        # Release the socket even when the test is interrupted or fails.
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # Each line is the repr() of a float.  It is parsed with
                # float() rather than eval() so that a datagram received on
                # the socket can never execute arbitrary code.
                send_time = float(line)
                results.append((send_time, recv_time))

    return results
381
def run_latency_tests(max_threads):
    """Print ping latency statistics for 0..max_threads background threads.

    For every task, the average latency and its standard deviation are
    printed in milliseconds.
    """
    for make_task in latency_tasks:
        print("Background CPU task:", make_task.__doc__)
        print()
        func, args = make_task()
        nthreads = 0
        while nthreads <= max_threads:
            samples = run_latency_test(func, args, nthreads)
            count = len(samples)
            # We print out milliseconds
            lats = [1000 * (received - sent) for (sent, received) in samples]
            avg = sum(lats) / count
            variance = sum((lat - avg) ** 2 for lat in lats) / count
            dev = variance ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev))
            nthreads += 1
        print()
401
402
# Marker sent by the bandwidth client in its last packet.
BW_END = "END"
404
def bandwidth_client(addr, packet_size, duration):
    """Exchange UDP packets with the server at `addr` as fast as possible.

    For `duration * 2.0` seconds, a numbered packet of `packet_size`
    characters is sent and the reply awaited; a final packet carrying BW_END
    is then sent.  Every packet starts with the repr() of the client's own
    address so that the server knows where to reply.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Everything after the socket creation is protected so that the socket
    # is closed even when bind() fails or the wait is interrupted.
    try:
        sock.bind(("127.0.0.1", 0))
        local_addr = sock.getsockname()
        _time = time.time

        def _send_chunk(msg):
            # Left-pad the message so that every packet has the same size.
            _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)

        # We give the parent some time to be ready.
        time.sleep(1.0)
        end_time = _time() + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()
427
def run_bandwidth_client(**kwargs):
    """Spawn this script as a bandwidth client and return the Popen object.

    The keyword arguments are passed to the child as the repr() of the
    kwargs dict, through the hidden --bwclient option.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
433
def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo bandwidth while `nthreads` threads run `func(*args)`.

    A client subprocess sends packets which are echoed back to it until it
    sends BW_END or a background thread finishes its timed loop.  Returns
    the number of packets per second (a float).
    """
    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Announce readiness, then block until the first packet
                # has arrived.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for _ in range(nthreads):
                threads.append(threading.Thread(target=run))
            for thread in threads:
                # The `daemon` attribute replaces the deprecated setDaemon().
                thread.daemon = True
                thread.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This blocks until the client is up and has sent its first packet.
        s = _recv(sock, packet_size)
        # The packet starts with the repr() of the client's address tuple.
        # It is parsed with ast.literal_eval() rather than eval() so that a
        # datagram can never execute arbitrary code.  The left padding is
        # stripped explicitly because literal_eval() only tolerates leading
        # whitespace on recent Python versions.
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()

        end_event.append(None)
        for thread in threads:
            thread.join()
        process.kill()
        # Reap the child so that it does not linger as a zombie process.
        process.wait()

    return (n - 1) / (end_time - first_time)
508
def run_bandwidth_tests(max_threads):
    """Print I/O bandwidth results for 0..max_threads background threads.

    The speed without background threads is printed in packets/s.; the
    speed of each subsequent thread count is followed by its percentage of
    that baseline.
    """
    for make_task in bandwidth_tasks:
        print("Background CPU task:", make_task.__doc__)
        print()
        func, args = make_task()
        baseline_speed = None
        nthreads = 0
        while nthreads <= max_threads:
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" packets/s.")
                baseline_speed = speed
            nthreads += 1
        print()
528
529
def main():
    """Parse the command line and run the selected benchmarks.

    Without -t, -l or -b, all three benchmark families are run.  The hidden
    --latclient and --bwclient options turn the process into the client
    side of the latency and bandwidth tests respectively.
    """
    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value "
                           "(Python 3.8 and older)")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value "
                           "(Python 3.2 and newer)")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client options carry the repr() of a dict of plain literals; they
    # are parsed with ast.literal_eval() rather than eval() so that a command
    # line can never execute arbitrary code.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    # Neither tuning function exists on every Python version: report a usage
    # error instead of dying with an AttributeError.
    if options.check_interval:
        if not hasattr(sys, "setcheckinterval"):
            parser.error("-i/--interval is not supported by this Python: "
                         "sys.setcheckinterval() is not available")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        if not hasattr(sys, "setswitchinterval"):
            parser.error("-I/--switch-interval is not supported by this "
                         "Python: sys.setswitchinterval() is not available")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)
611
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()