blob: db5c560c03c6ef41561f7400bfa877972edc12ef [file] [log] [blame]
#!/usr/bin/python
"""
MultiTestRunner - Harness for running multiple tests in the simple clang style.
TODO
--
- Fix Ctrl-c issues
- Use a timeout
- Detect signalled failures (abort)
- Better support for finding tests
"""
# TODO: see the outstanding items listed in the module docstring.
import os, sys, re, random, time
import threading
import ProgressBar
import TestRunner
from TestRunner import TestStatus
from Queue import Queue
# File extensions that mark a file as a test when a directory is scanned.
kTestFileExtensions = set((
    '.mi', '.i', '.c', '.cpp', '.m', '.mm', '.ll',
))

# Matches "<file>:<line>:<column>: error: <message>"; the four groups are
# the file, line, column and message.
kClangErrorRE = re.compile(
    '(.*):([0-9]+):([0-9]+): error: (.*)')

# Same shape as kClangErrorRE, but for "warning:" lines.
kClangWarningRE = re.compile(
    '(.*):([0-9]+):([0-9]+): warning: (.*)')

# Matches an "Assertion failed: ..." line; group 1 is everything after the
# "Assertion failed: " prefix, up to and including the final '.'.
kAssertionRE = re.compile(
    'Assertion failed: (.*, function .*, file .*, line [0-9]+\\.)')
def getTests(inputs):
    """
    Generate the paths of the tests named by inputs.

    Each entry of inputs is handled as follows:
     - a path that does not exist is reported on stderr and skipped;
     - a directory is walked with os.walk. In a directory that contains a
       '.tests' file, every non-blank line of that file is yielded, joined
       onto the directory. In any other directory, every file whose
       extension is in kTestFileExtensions is yielded and the 'Output'
       subdirectory is not descended into;
     - any other existing path is yielded unchanged.
    """
    for path in inputs:
        if not os.path.exists(path):
            sys.stderr.write("WARNING: Invalid test \"%s\""%(path,) + '\n')
            continue

        if not os.path.isdir(path):
            yield path
            continue

        for dirpath,dirnames,filenames in os.walk(path):
            dotTests = os.path.join(dirpath,'.tests')
            if os.path.exists(dotTests):
                # Read the whole manifest and close it before yielding, so
                # the handle is not left open while the generator is
                # suspended (or if it is never resumed).
                manifest = open(dotTests)
                try:
                    lines = manifest.readlines()
                finally:
                    manifest.close()
                for ln in lines:
                    name = ln.strip()
                    if name:
                        yield os.path.join(dirpath,name)
            else:
                # FIXME: This doesn't belong here
                if 'Output' in dirnames:
                    # Pruning dirnames in place stops os.walk descending.
                    dirnames.remove('Output')
                for f in filenames:
                    ext = os.path.splitext(f)[1]
                    if ext in kTestFileExtensions:
                        yield os.path.join(dirpath,f)
class TestingProgressDisplay:
    """
    Writes test results to the terminal as they are reported.

    A passing result is shown by advancing the progress bar (when one was
    supplied), by a single '.' (succinct mode), or by a full line. A failing
    result always gets the full line followed by a breakdown of its
    warnings, errors and assertions.
    """

    def __init__(self, opts, numTests, progressBar=None):
        self.opts = opts
        self.progressBar = progressBar
        self.numTests = numTests
        # Field width used when printing "<index>/<numTests>".
        self.digits = len(str(numTests))
        self.progress = 0.
        self.current = None
        # Held around handleUpdate so concurrent reports do not interleave.
        self.lock = threading.Lock()

    def update(self, index, tr):
        """
        Report result tr for the test at position index (zero-based).
        """
        # Avoid locking overhead in quiet mode
        if self.opts.quiet:
            if not tr.failed():
                return

        # Output lock
        self.lock.acquire()
        try:
            self.handleUpdate(index, tr)
        finally:
            self.lock.release()

    def finish(self):
        """
        Tidy up the display once all results have been reported.
        """
        bar = self.progressBar
        if bar:
            bar.clear()
            return
        if self.opts.succinct:
            sys.stdout.write('\n')

    def handleUpdate(self, index, tr):
        """
        Render a single result; update() calls this with the lock held.
        """
        failed = tr.failed()
        bar = self.progressBar

        if bar:
            if not failed:
                # Force monotonicity
                self.progress = max(self.progress,
                                    float(index)/self.numTests)
                bar.update(self.progress, tr.path)
                return
            bar.clear()
        elif self.opts.succinct:
            if failed:
                sys.stdout.write('\n')
            else:
                sys.stdout.write('.')
                sys.stdout.flush()
                return

        # One line naming the test, with a suffix describing its status.
        if tr.code==TestStatus.Invalid:
            extra = ' - (Invalid test)'
        elif tr.code==TestStatus.NoRunLine:
            extra = ' - (No RUN line)'
        elif failed:
            extra = ' - %s'%(TestStatus.getName(tr.code).upper(),)
        else:
            extra = ''
        headline = '%*d/%*d - %s%s'%(self.digits, index+1, self.digits,
                                     self.numTests, tr.path, extra)
        sys.stdout.write(headline + '\n')

        if not failed:
            return

        # Counts of each kind of diagnostic found in the test output.
        msgs = []
        for found,fmt in ((tr.warnings, '%d warnings'),
                          (tr.errors, '%d errors'),
                          (tr.assertions, '%d assertions')):
            if found:
                msgs.append(fmt%(len(found),))
        if msgs:
            sys.stdout.write('\tFAIL (%s)'%(', '.join(msgs)) + '\n')

        # Distinct error messages, cut off once the list gets long.
        distinctErrors = set([e for (_,_,_,e) in tr.errors])
        for i,error in enumerate(distinctErrors):
            sys.stdout.write('\t\tERROR: %s'%(error,) + '\n')
            if i>20:
                sys.stdout.write('\t\t\t(too many errors, skipping)' + '\n')
                break

        for assertion in set(tr.assertions):
            sys.stdout.write('\t\tASSERTION: %s'%(assertion,) + '\n')

        if self.opts.showOutput:
            TestRunner.cat(tr.testResults, sys.stdout)
class TestResult:
    """
    Outcome of a single test.

    Attributes:
      path        - the test's path.
      code        - the status code reported for the test.
      testResults - path of the file holding the test's output.
      warnings    - groups of every kClangWarningRE match in that file.
      errors      - groups of every kClangErrorRE match in that file.
      assertions  - first group of every kAssertionRE match in that file.

    The three diagnostic lists are only filled in for failed tests; for any
    other result they stay empty and the output file is not opened.
    """

    def __init__(self, path, code, testResults):
        self.path = path
        self.code = code
        self.testResults = testResults
        self.warnings = []
        self.errors = []
        self.assertions = []

        if self.failed():
            # Close the file even when read() raises, so a failed read
            # does not leak the handle.
            f = open(self.testResults)
            try:
                data = f.read()
            finally:
                f.close()
            self.warnings = [m.groups() for m in kClangWarningRE.finditer(data)]
            self.errors = [m.groups() for m in kClangErrorRE.finditer(data)]
            self.assertions = [m.group(1) for m in kAssertionRE.finditer(data)]

    def failed(self):
        """
        Return True when the status code is Fail or XPass.
        """
        return self.code in (TestStatus.Fail,TestStatus.XPass)
class TestProvider:
    """
    Shared source of work for the tester threads.

    Hands out (test, index) pairs one at a time via get() and collects the
    corresponding results, by index, via setResult().
    """

    def __init__(self, opts, tests, display):
        self.opts = opts
        self.tests = tests
        self.progress = display
        # One result slot per test; None until that test reports.
        self.results = [None]*len(self.tests)
        # Position of the next test to hand out.
        self.index = 0
        self.lock = threading.Lock()
        self.startTime = time.time()

    def get(self):
        """
        Return the next (test, index) pair, or None when there is no more
        work: either every test has been handed out or opts.maxTime
        seconds have elapsed since this provider was created.
        """
        self.lock.acquire()
        try:
            limit = self.opts.maxTime
            if limit is not None and time.time() - self.startTime > limit:
                return None

            position = self.index
            if position >= len(self.tests):
                return None

            self.index = position + 1
            return self.tests[position],position
        finally:
            self.lock.release()

    def setResult(self, index, result):
        """
        Store result for the test at index and pass it to the display.
        """
        self.results[index] = result
        self.progress.update(index, result)
class Tester(threading.Thread):
    """
    Worker thread that runs tests handed out by a provider.

    The thread repeatedly asks the provider for a (path, index) item, runs
    that test and reports a TestResult back to the provider, until the
    provider returns None.
    """

    def __init__(self, provider):
        threading.Thread.__init__(self)
        self.provider = provider

    def run(self):
        while 1:
            item = self.provider.get()
            if item is None:
                break
            self.runTest(item)

    def runTest(self, item):
        """
        Run one test and hand its result to the provider.

        item is the (path, index) pair returned by the provider's get().
        """
        path,index = item
        command = path
        # Use hand concatenation here because we want to override
        # absolute paths.
        output = 'Output/' + path + '.out'
        testname = path
        testresults = 'Output/' + path + '.testresults'
        TestRunner.mkdir_p(os.path.dirname(testresults))

        opts = self.provider.opts
        # Stays None when the test is skipped via opts.debugDoNotTest.
        code = None
        try:
            if not opts.debugDoNotTest:
                resultsFile = open(testresults,'w')
                try:
                    code = TestRunner.runOneTest(path, command, output,
                                                 testname, opts.clang,
                                                 useValgrind=opts.useValgrind,
                                                 useDGCompat=opts.useDGCompat,
                                                 useScript=opts.testScript,
                                                 output=resultsFile)
                finally:
                    # Close explicitly so the results are on disk before
                    # TestResult reads them back.
                    resultsFile.close()
        except KeyboardInterrupt:
            # This is a sad hack. Unfortunately subprocess goes
            # bonkers with ctrl-c and we start forking merrily.
            sys.stdout.write('Ctrl-C detected, goodbye.' + '\n')
            os.kill(0,9)

        self.provider.setResult(index, TestResult(path, code, testresults))
def _parsePositiveCount(text):
    """
    Return text converted to an int, or 0 when it is not an integer.
    """
    try:
        return int(text)
    except ValueError:
        return 0

def detectCPUs():
    """
    Detects the number of CPUs on a system. Cribbed from pp.

    Returns a positive int, falling back to 1 when no usable count can be
    obtained from sysconf, sysctl or the NUMBER_OF_PROCESSORS variable.
    """
    # Linux, Unix and MacOS:
    if hasattr(os, "sysconf"):
        if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
            # Linux & Unix:
            ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
            if isinstance(ncpus, int) and ncpus > 0:
                return ncpus
        else: # OSX:
            # Read sysctl's answer and close the pipe; output that is
            # empty or not a number falls through to the checks below
            # instead of raising.
            pipe = os.popen("sysctl -n hw.ncpu")
            try:
                answer = pipe.read()
            finally:
                pipe.close()
            ncpus = _parsePositiveCount(answer)
            if ncpus > 0:
                return ncpus
    # Windows:
    if "NUMBER_OF_PROCESSORS" in os.environ:
        ncpus = _parsePositiveCount(os.environ["NUMBER_OF_PROCESSORS"])
        if ncpus > 0:
            return ncpus
    return 1 # Default
def main():
    """
    Command line entry point.

    Parses the options, collects the tests named by the positional
    arguments (optionally shuffled and truncated), runs them on
    opts.numThreads Tester threads, and prints a summary of the expected
    failures, unexpected passes and failures.
    """
    from optparse import OptionParser

    def say(text):
        # Write one line to stdout.
        sys.stdout.write(text + '\n')

    parser = OptionParser("usage: %prog [options] {inputs}")
    parser.add_option("-j", "--threads", dest="numThreads",
                      help="Number of testing threads",
                      type=int, action="store",
                      default=detectCPUs())
    parser.add_option("", "--clang", dest="clang",
                      help="Program to use as \"clang\"",
                      action="store", default="clang")
    parser.add_option("", "--vg", dest="useValgrind",
                      help="Run tests under valgrind",
                      action="store_true", default=False)
    parser.add_option("", "--dg", dest="useDGCompat",
                      help="Use llvm dejagnu compatibility mode",
                      action="store_true", default=False)
    parser.add_option("", "--script", dest="testScript",
                      help="Default script to use",
                      action="store", default=None)
    parser.add_option("-v", "--verbose", dest="showOutput",
                      help="Show all test output",
                      action="store_true", default=False)
    parser.add_option("-q", "--quiet", dest="quiet",
                      help="Suppress no error output",
                      action="store_true", default=False)
    parser.add_option("-s", "--succinct", dest="succinct",
                      help="Reduce amount of output",
                      action="store_true", default=False)
    parser.add_option("", "--max-tests", dest="maxTests",
                      help="Maximum number of tests to run",
                      action="store", type=int, default=None)
    parser.add_option("", "--max-time", dest="maxTime",
                      help="Maximum time to spend testing (in seconds)",
                      action="store", type=float, default=None)
    parser.add_option("", "--shuffle", dest="shuffle",
                      help="Run tests in random order",
                      action="store_true", default=False)
    parser.add_option("", "--seed", dest="seed",
                      help="Seed for random number generator (default: random).",
                      action="store", default=None)
    parser.add_option("", "--no-progress-bar", dest="useProgressBar",
                      help="Do not use curses based progress bar",
                      action="store_false", default=True)
    parser.add_option("", "--debug-do-not-test", dest="debugDoNotTest",
                      help="DEBUG: Skip running actual test script",
                      action="store_true", default=False)
    (opts, args) = parser.parse_args()

    if not args:
        parser.error('No inputs specified')

    allTests = list(getTests(args))
    allTests.sort()

    tests = allTests
    if opts.seed is not None:
        # Only a malformed number is a usage error; anything else (such as
        # an interrupt) must not be swallowed here.
        try:
            seed = int(opts.seed)
        except ValueError:
            parser.error('--seed argument should be an integer')
        random.seed(seed)
    if opts.shuffle:
        random.shuffle(tests)
    if opts.maxTests is not None:
        tests = tests[:opts.maxTests]

    extra = ''
    if len(tests) != len(allTests):
        extra = ' of %d'%(len(allTests),)
    header = '-- Testing: %d%s tests, %d threads --'%(len(tests),extra,opts.numThreads)

    progressBar = None
    if not opts.quiet:
        if opts.useProgressBar:
            # Fall back to plain output when the progress bar cannot be
            # set up (signalled here by ValueError).
            try:
                tc = ProgressBar.TerminalController()
                progressBar = ProgressBar.ProgressBar(tc, header)
            except ValueError:
                pass

        if not progressBar:
            say(header)

    display = TestingProgressDisplay(opts, len(tests), progressBar)
    provider = TestProvider(opts, tests, display)

    testers = [Tester(provider) for i in range(opts.numThreads)]
    startTime = time.time()
    for t in testers:
        t.start()
    try:
        for t in testers:
            t.join()
    except KeyboardInterrupt:
        sys.exit(1)

    display.finish()

    if not opts.quiet:
        say('Testing Time: %.2fs'%(time.time() - startTime))

    # Slots in provider.results stay None for tests that never reported,
    # hence the "i and" guard in each filter below.
    xfails = [i for i in provider.results if i and i.code==TestStatus.XFail]
    if xfails:
        say('*'*20)
        say('Expected Failures (%d):' % len(xfails))
        for tr in xfails:
            say('\t%s'%(tr.path,))

    xpasses = [i for i in provider.results if i and i.code==TestStatus.XPass]
    if xpasses:
        say('*'*20)
        say('Unexpected Passing Tests (%d):' % len(xpasses))
        for tr in xpasses:
            say('\t%s'%(tr.path,))

    failures = [i for i in provider.results if i and i.code==TestStatus.Fail]
    if failures:
        say('*'*20)
        say('Failing Tests (%d):' % len(failures))
        for tr in failures:
            say('\t%s'%(tr.path,))

        say('\nFailures: %d'%(len(failures),))

        # Tally how often each error / assertion message occurs across
        # the failing tests.
        assertions = {}
        errors = {}
        errorFree = []
        for tr in failures:
            if not tr.errors and not tr.assertions:
                errorFree.append(tr)
            for (_,_,_,error) in tr.errors:
                errors[error] = errors.get(error,0) + 1
            for assertion in tr.assertions:
                assertions[assertion] = assertions.get(assertion,0) + 1

        if errorFree:
            say('Failures w/o Errors (%d):' % len(errorFree))
            for tr in errorFree:
                say('\t%s'%(tr.path,))

        if errors:
            say('Error Summary (%d):' % sum(errors.values()))
            # Most frequent first.
            items = sorted(errors.items(), key=lambda item: -item[1])
            for i,(error,count) in enumerate(items):
                say('\t%3d: %s'%(count,error))
                if i>100:
                    say('\t\t(too many errors, skipping)')
                    break

        if assertions:
            say('Assertion Summary (%d):' % sum(assertions.values()))
            items = sorted(assertions.items(), key=lambda item: -item[1])
            for assertion,count in items:
                say('\t%3d: %s'%(count,assertion))
# Run the harness only when this file is executed as a script.
if __name__ == '__main__':
    main()