| #!/usr/bin/env python |
| |
| """ |
| This is a generic fuzz testing tool, see --help for more information. |
| """ |
| |
| import os |
| import sys |
| import random |
| import subprocess |
| import itertools |
| |
class TestGenerator:
    """Enumerate every single-edit fuzz test over a set of input files.

    A test is identified by an integer in ``range(num_tests)`` and decoded by
    get_test() into a tuple ``(kind, position, insert_str, picked_position)``:

      kind            - 'nothing', 'delete', 'insert' or 'replace'
      position        - (input index, offset in that input), or None for
                        'nothing'
      insert_str      - the string to insert/replace with, or None
      picked_position - (input index, offset) of the extra picked position
                        when pick_input is enabled, otherwise None

    Attributes:
      inputs             - list of (path, original contents) pairs
      num_positions      - total length of all input contents
      num_insert_strings - number of insert/replacement strings
      num_tests          - number of distinct test indices
    """

    def __init__(self, inputs, delete, insert, replace,
                 insert_strings, pick_input):
        # Read the inputs as raw bytes: TestApplication writes them back in
        # binary mode, so a text-mode read could change the line endings of
        # the "restored" file on platforms that translate newlines.
        self.inputs = []
        for path in inputs:
            with open(path, 'rb') as f:
                self.inputs.append((path, f.read()))

        self.delete = bool(delete)
        self.insert = bool(insert)
        self.replace = bool(replace)
        self.pick_input = bool(pick_input)
        self.insert_strings = list(insert_strings)

        self.num_positions = sum(len(d) for _, d in self.inputs)
        # Count the materialized list (the argument may be a one-shot
        # iterator) and use the normalized flags so that any truthy value
        # counts exactly once.
        self.num_insert_strings = len(self.insert_strings)
        per_position = (self.delete +
                        (self.insert + self.replace) * self.num_insert_strings)
        # One extra test for the unmodified ('nothing') case.
        self.num_tests = per_position * self.num_positions + 1

        if self.pick_input:
            self.num_tests *= self.num_positions

    def position_to_source_index(self, position):
        """Map a global position to an (input index, offset) pair.

        Positions number the contents of all inputs consecutively, in the
        order of self.inputs. Raises ValueError if position is out of range.
        """
        if position >= 0:
            for i, (_, d) in enumerate(self.inputs):
                n = len(d)
                if position < n:
                    return (i, position)
                position -= n
        raise ValueError('Invalid position.')

    def get_test(self, index):
        """Decode a test index into its (kind, position, insert_str,
        picked_position) description.

        Raises ValueError if index is not in range(self.num_tests).
        """
        if not 0 <= index < self.num_tests:
            raise ValueError('Invalid test index %r (expected 0 <= index < %d).'
                             % (index, self.num_tests))

        # The lowest "digit" of the index selects the picked position.
        picked_position = None
        if self.pick_input:
            index, picked_position = divmod(index, self.num_positions)
            picked_position = self.position_to_source_index(picked_position)

        if index == 0:
            return ('nothing', None, None, picked_position)

        # The next digit selects the position being fuzzed.
        index -= 1
        index, position = divmod(index, self.num_positions)
        position = self.position_to_source_index(position)
        if self.delete:
            if index == 0:
                return ('delete', position, None, picked_position)
            index -= 1

        # What remains selects the string, then insert versus replace.
        index, insert_index = divmod(index, self.num_insert_strings)
        insert_str = self.insert_strings[insert_index]
        if self.insert:
            if index == 0:
                return ('insert', position, insert_str, picked_position)
            index -= 1

        # Internal invariants: the bounds check above guarantees these.
        assert self.replace
        assert index == 0
        return ('replace', position, insert_str, picked_position)
| |
class TestApplication:
    """Apply one fuzz test to the input files on disk, and undo it again.

    tg is the generator that owns the original file contents (its ``inputs``
    attribute is a list of (path, contents) pairs) and test is a tuple
    ``(kind, position, insert_str, picked_position)`` as produced by the
    generator's get_test().
    """

    def __init__(self, tg, test):
        self.tg = tg
        self.test = test

    def _insertion_for(self, data):
        """Return the insertion string in a form that can be joined to data."""
        insert_str = self.test[2]
        # When the contents are bytes, a text insertion string has to be
        # encoded before it can be concatenated. UTF-8 is an assumption here;
        # on Python 2 plain strings are already bytes and pass through as is.
        if isinstance(data, bytes) and not isinstance(insert_str, bytes):
            insert_str = insert_str.encode('utf-8')
        return insert_str

    def _write(self, name, data):
        """Overwrite the file name with data, closing it before returning."""
        # Close explicitly so the new contents are on disk before the test
        # command runs, whatever the interpreter's garbage collection does.
        with open(name, 'wb') as f:
            f.write(data)

    def apply(self):
        """Write the fuzzed contents of the targeted input file.

        Does nothing for a 'nothing' test. Raises ValueError for an unknown
        test kind.
        """
        kind = self.test[0]
        if kind == 'nothing':
            return

        i, j = self.test[1]
        name, data = self.tg.inputs[i]
        if kind == 'delete':
            data = data[:j] + data[j+1:]
        elif kind == 'insert':
            data = data[:j] + self._insertion_for(data) + data[j:]
        elif kind == 'replace':
            data = data[:j] + self._insertion_for(data) + data[j+1:]
        else:
            # self.test is itself a tuple, so wrap it to format it as one value.
            raise ValueError('Invalid test %r' % (self.test,))
        self._write(name, data)

    def revert(self):
        """Restore the targeted input file to its original contents."""
        if self.test[0] != 'nothing':
            i, j = self.test[1]
            name, data = self.tg.inputs[i]
            self._write(name, data)
| |
def quote(str):
    """Return str wrapped in a pair of double quotes (nothing is escaped)."""
    return ''.join(('"', str, '"'))
| |
def run_one_test(test_application, index, input_files, args):
    """Run the test command once and report whether it passed.

    Each element of args is %-interpolated with a dictionary holding 'index'
    and 'inputs' and, when the test carries a picked position, also
    'picked_input', 'picked_input_pos', 'picked_input_line' and
    'picked_input_col' (line and column are 1-based). The resulting list is
    executed as a command.

    The run passes when the exit code equals opts.expected_exit_code or is in
    opts.extra_exit_codes. When opts.log_dir is set the command's stdout and
    stderr are captured to '<index>.out' and '<index>.err' in that directory;
    the logs of passing runs are removed unless opts.log_all is set.

    Prints a 'FAIL: <index>' line, or a 'PASS: <index>' line unless
    opts.succinct is set, and returns True for a pass and False for a failure.
    Reads the module-level 'opts' set by main().
    """
    test = test_application.test

    # Interpolate arguments.
    options = {'index': index,
               'inputs': ' '.join(quote(f) for f in input_files)}

    # Add picked input interpolation arguments, if used.
    if test[3] is not None:
        picked_file, pos = test[3]
        options['picked_input'] = input_files[picked_file]
        options['picked_input_pos'] = pos
        # Compute the line and column from the newlines preceding pos.
        file_data = test_application.tg.inputs[picked_file][1]
        newline = b'\n' if isinstance(file_data, bytes) else '\n'
        options['picked_input_line'] = file_data.count(newline, 0, pos) + 1
        # rfind() returns -1 when there is no earlier newline, which makes
        # the column pos + 1 on the first line.
        options['picked_input_col'] = pos - file_data.rfind(newline, 0, pos)

    test_args = [a % options for a in args]
    if opts.verbose:
        print('%s: note: executing %r' % (sys.argv[0], test_args))

    stdout = None
    stderr = None
    try:
        if opts.log_dir:
            stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
            stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
            stdout = open(stdout_log_path, 'wb')
            stderr = open(stderr_log_path, 'wb')
        else:
            # The child shares our stdout; flush first to keep output ordered.
            sys.stdout.flush()
        exit_code = subprocess.call(test_args, stdout=stdout, stderr=stderr)
    finally:
        # Close the logs even when the command could not be started.
        for log in (stdout, stderr):
            if log is not None:
                log.close()

    test_result = (exit_code == opts.expected_exit_code or
                   exit_code in opts.extra_exit_codes)

    # Remove the logs for passes, unless logging all results.
    if stdout is not None and test_result and not opts.log_all:
        os.remove(stdout_log_path)
        os.remove(stderr_log_path)

    if not test_result:
        print('FAIL: %d' % index)
    elif not opts.succinct:
        print('PASS: %d' % index)

    return test_result
| |
def _build_option_parser():
    """Create the command line parser with all of the tool's options."""
    from optparse import OptionParser, OptionGroup
    parser = OptionParser("""%prog [options] ... test command args ...

%prog is a tool for fuzzing inputs and testing them.

The most basic usage is something like:

$ %prog --file foo.txt ./test.sh

which will run a default list of fuzzing strategies on the input. For each
fuzzed input, it will overwrite the input files (in place), run the test script,
then restore the files back to their original contents.

NOTE: You should make sure you have a backup copy of your inputs, in case
something goes wrong!!!

You can cause the fuzzing to not restore the original files with
'--no-revert'. Generally this is used with '--test <index>' to run one failing
test and then leave the fuzzed inputs in place to examine the failure.

For each fuzzed input, %prog will run the test command given on the command
line. Each argument in the command is subject to string interpolation before
being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
printf format, and VARIABLE is one of:

'index' - the test index being run
'inputs' - the full list of test inputs
'picked_input' - (with --pick-input) the selected input file
'picked_input_pos' - (with --pick-input) the selected input position
'picked_input_line' - (with --pick-input) the selected input line
'picked_input_col' - (with --pick-input) the selected input column

By default, the script will run forever continually picking new tests to
run. You can limit the number of tests that are run with '--max-tests <number>',
and you can run a particular test with '--test <index>'.
""")
    parser.add_option("-v", "--verbose", help="Show more output",
                      action='store_true', dest="verbose", default=False)
    parser.add_option("-s", "--succinct", help="Reduce amount of output",
                      action="store_true", dest="succinct", default=False)

    group = OptionGroup(parser, "Test Execution")
    group.add_option("", "--expected-exit-code", help="Set expected exit code",
                     type=int, dest="expected_exit_code",
                     default=0)
    group.add_option("", "--extra-exit-code",
                     help="Set additional expected exit code",
                     type=int, action="append", dest="extra_exit_codes",
                     default=[])
    group.add_option("", "--log-dir",
                     help="Capture test logs to an output directory",
                     type=str, dest="log_dir",
                     default=None)
    group.add_option("", "--log-all",
                     help="Log all outputs (not just failures)",
                     action="store_true", dest="log_all", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Input Files")
    group.add_option("", "--file", metavar="PATH",
                     help="Add an input file to fuzz",
                     type=str, action="append", dest="input_files", default=[])
    group.add_option("", "--filelist", metavar="LIST",
                     help="Add a list of inputs files to fuzz (one per line)",
                     type=str, action="append", dest="filelists", default=[])
    parser.add_option_group(group)

    group = OptionGroup(parser, "Fuzz Options")
    # The backslash is written as an explicit escape; the default value is
    # unchanged (it contains a single backslash before the semicolon).
    group.add_option("", "--replacement-chars", dest="replacement_chars",
                     help="Characters to insert/replace",
                     default="0{}[]<>\\;@#$^%& ")
    group.add_option("", "--replacement-string", dest="replacement_strings",
                     action="append", help="Add a replacement string to use",
                     default=[])
    group.add_option("", "--replacement-list", dest="replacement_lists",
                     help="Add a list of replacement strings (one per line)",
                     action="append", default=[])
    group.add_option("", "--no-delete", help="Don't delete characters",
                     action='store_false', dest="enable_delete", default=True)
    group.add_option("", "--no-insert", help="Don't insert strings",
                     action='store_false', dest="enable_insert", default=True)
    group.add_option("", "--no-replace", help="Don't replace strings",
                     action='store_false', dest="enable_replace", default=True)
    group.add_option("", "--no-revert", help="Don't revert changes",
                     action='store_false', dest="revert", default=True)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Test Selection")
    group.add_option("", "--test", help="Run a particular test",
                     type=int, dest="test", default=None, metavar="INDEX")
    group.add_option("", "--max-tests", help="Maximum number of tests",
                     type=int, dest="max_tests", default=None, metavar="COUNT")
    group.add_option("", "--pick-input",
                     help="Randomly select an input byte as well as fuzzing",
                     action='store_true', dest="pick_input", default=False)
    parser.add_option_group(group)

    # Everything after the first positional argument is the test command.
    parser.disable_interspersed_args()
    return parser


def main():
    """Parse the command line, then apply and run fuzz tests.

    Stores the parsed options in the module-level 'opts' (read by
    run_one_test). Runs the single test given with --test, or --max-tests
    randomly chosen tests, or random tests forever. After each test the input
    file is restored unless --no-revert was given.
    """
    global opts
    parser = _build_option_parser()

    (opts, args) = parser.parse_args()

    if not args:
        parser.error("Invalid number of arguments")

    # Collect the list of inputs.
    input_files = list(opts.input_files)
    for filelist in opts.filelists:
        with open(filelist) as f:
            for ln in f:
                ln = ln.strip()
                if ln:
                    input_files.append(ln)
    input_files.sort()

    if not input_files:
        parser.error("No input files!")

    print('%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)))

    # Make sure the log directory exists if used.
    if opts.log_dir and not os.path.isdir(opts.log_dir):
        try:
            os.mkdir(opts.log_dir)
        except OSError:
            print("%s: error: log directory couldn't be created!" % (
                sys.argv[0],))
            raise SystemExit(1)

    # Get the list of insert/replacement strings.
    replacements = list(opts.replacement_chars)
    replacements.extend(opts.replacement_strings)
    for replacement_list in opts.replacement_lists:
        with open(replacement_list) as f:
            for ln in f:
                # Drop only the line terminator; a final line without one
                # must keep its last character.
                if ln.endswith('\n'):
                    ln = ln[:-1]
                if ln:
                    replacements.append(ln)

    # Unique and order the replacement list.
    replacements = sorted(set(replacements))

    # Create the test generator.
    tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
                       opts.enable_replace, replacements, opts.pick_input)

    print('%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions))
    print('%s: note: %d total tests.' % (sys.argv[0], tg.num_tests))

    if opts.test is not None:
        # Report a bad index as a usage error rather than a traceback.
        if not 0 <= opts.test < tg.num_tests:
            parser.error("Invalid test index %d (there are %d tests)" % (
                opts.test, tg.num_tests))
        it = [opts.test]
    else:
        # A bounded or endless stream of uniformly random test indices.
        if opts.max_tests is not None:
            ticks = itertools.repeat(None, opts.max_tests)
        else:
            ticks = itertools.repeat(None)
        it = (random.randrange(tg.num_tests) for _ in ticks)

    for test in it:
        t = tg.get_test(test)

        if opts.verbose:
            print('%s: note: running test %d: %r' % (sys.argv[0], test, t))
        ta = TestApplication(tg, t)
        try:
            ta.apply()
            run_one_test(ta, test, input_files, args)
        finally:
            # Restore the input even if the test run raised.
            if opts.revert:
                ta.revert()

        sys.stdout.flush()
| |
# Run the fuzzer only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()