|  | #!/usr/bin/env python | 
|  |  | 
|  | """ | 
|  | This is a generic fuzz testing tool, see --help for more information. | 
|  | """ | 
|  |  | 
|  | import os | 
|  | import sys | 
|  | import random | 
|  | import subprocess | 
|  | import itertools | 
|  |  | 
|  | class TestGenerator: | 
|  | def __init__(self, inputs, delete, insert, replace, | 
|  | insert_strings, pick_input): | 
|  | self.inputs = [(s, open(s).read()) for s in inputs] | 
|  |  | 
|  | self.delete = bool(delete) | 
|  | self.insert = bool(insert) | 
|  | self.replace = bool(replace) | 
|  | self.pick_input = bool(pick_input) | 
|  | self.insert_strings = list(insert_strings) | 
|  |  | 
|  | self.num_positions = sum([len(d) for _,d in self.inputs]) | 
|  | self.num_insert_strings = len(insert_strings) | 
|  | self.num_tests = ((delete + (insert + replace)*self.num_insert_strings) | 
|  | * self.num_positions) | 
|  | self.num_tests += 1 | 
|  |  | 
|  | if self.pick_input: | 
|  | self.num_tests *= self.num_positions | 
|  |  | 
|  | def position_to_source_index(self, position): | 
|  | for i,(s,d) in enumerate(self.inputs): | 
|  | n = len(d) | 
|  | if position < n: | 
|  | return (i,position) | 
|  | position -= n | 
|  | raise ValueError,'Invalid position.' | 
|  |  | 
|  | def get_test(self, index): | 
|  | assert 0 <= index < self.num_tests | 
|  |  | 
|  | picked_position = None | 
|  | if self.pick_input: | 
|  | index,picked_position = divmod(index, self.num_positions) | 
|  | picked_position = self.position_to_source_index(picked_position) | 
|  |  | 
|  | if index == 0: | 
|  | return ('nothing', None, None, picked_position) | 
|  |  | 
|  | index -= 1 | 
|  | index,position = divmod(index, self.num_positions) | 
|  | position = self.position_to_source_index(position) | 
|  | if self.delete: | 
|  | if index == 0: | 
|  | return ('delete', position, None, picked_position) | 
|  | index -= 1 | 
|  |  | 
|  | index,insert_index = divmod(index, self.num_insert_strings) | 
|  | insert_str = self.insert_strings[insert_index] | 
|  | if self.insert: | 
|  | if index == 0: | 
|  | return ('insert', position, insert_str, picked_position) | 
|  | index -= 1 | 
|  |  | 
|  | assert self.replace | 
|  | assert index == 0 | 
|  | return ('replace', position, insert_str, picked_position) | 
|  |  | 
|  | class TestApplication: | 
|  | def __init__(self, tg, test): | 
|  | self.tg = tg | 
|  | self.test = test | 
|  |  | 
|  | def apply(self): | 
|  | if self.test[0] == 'nothing': | 
|  | pass | 
|  | else: | 
|  | i,j = self.test[1] | 
|  | name,data = self.tg.inputs[i] | 
|  | if self.test[0] == 'delete': | 
|  | data = data[:j] + data[j+1:] | 
|  | elif self.test[0] == 'insert': | 
|  | data = data[:j] + self.test[2] + data[j:] | 
|  | elif self.test[0] == 'replace': | 
|  | data = data[:j] + self.test[2] + data[j+1:] | 
|  | else: | 
|  | raise ValueError,'Invalid test %r' % self.test | 
|  | open(name,'wb').write(data) | 
|  |  | 
|  | def revert(self): | 
|  | if self.test[0] != 'nothing': | 
|  | i,j = self.test[1] | 
|  | name,data = self.tg.inputs[i] | 
|  | open(name,'wb').write(data) | 
|  |  | 
|  | def quote(str): | 
|  | return '"' + str + '"' | 
|  |  | 
|  | def run_one_test(test_application, index, input_files, args): | 
|  | test = test_application.test | 
|  |  | 
|  | # Interpolate arguments. | 
|  | options = { 'index' : index, | 
|  | 'inputs' : ' '.join(quote(f) for f in input_files) } | 
|  |  | 
|  | # Add picked input interpolation arguments, if used. | 
|  | if test[3] is not None: | 
|  | pos = test[3][1] | 
|  | options['picked_input'] = input_files[test[3][0]] | 
|  | options['picked_input_pos'] = pos | 
|  | # Compute the line and column. | 
|  | file_data = test_application.tg.inputs[test[3][0]][1] | 
|  | line = column = 1 | 
|  | for i in range(pos): | 
|  | c = file_data[i] | 
|  | if c == '\n': | 
|  | line += 1 | 
|  | column = 1 | 
|  | else: | 
|  | column += 1 | 
|  | options['picked_input_line'] = line | 
|  | options['picked_input_col'] = column | 
|  |  | 
|  | test_args = [a % options for a in args] | 
|  | if opts.verbose: | 
|  | print '%s: note: executing %r' % (sys.argv[0], test_args) | 
|  |  | 
|  | stdout = None | 
|  | stderr = None | 
|  | if opts.log_dir: | 
|  | stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index) | 
|  | stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index) | 
|  | stdout = open(stdout_log_path, 'wb') | 
|  | stderr = open(stderr_log_path, 'wb') | 
|  | else: | 
|  | sys.stdout.flush() | 
|  | p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr) | 
|  | p.communicate() | 
|  | exit_code = p.wait() | 
|  |  | 
|  | test_result = (exit_code == opts.expected_exit_code or | 
|  | exit_code in opts.extra_exit_codes) | 
|  |  | 
|  | if stdout is not None: | 
|  | stdout.close() | 
|  | stderr.close() | 
|  |  | 
|  | # Remove the logs for passes, unless logging all results. | 
|  | if not opts.log_all and test_result: | 
|  | os.remove(stdout_log_path) | 
|  | os.remove(stderr_log_path) | 
|  |  | 
|  | if not test_result: | 
|  | print 'FAIL: %d' % index | 
|  | elif not opts.succinct: | 
|  | print 'PASS: %d' % index | 
|  | return test_result | 
|  |  | 
|  | def main(): | 
|  | global opts | 
|  | from optparse import OptionParser, OptionGroup | 
|  | parser = OptionParser("""%prog [options] ... test command args ... | 
|  |  | 
|  | %prog is a tool for fuzzing inputs and testing them. | 
|  |  | 
|  | The most basic usage is something like: | 
|  |  | 
|  | $ %prog --file foo.txt ./test.sh | 
|  |  | 
|  | which will run a default list of fuzzing strategies on the input. For each | 
|  | fuzzed input, it will overwrite the input files (in place), run the test script, | 
|  | then restore the files back to their original contents. | 
|  |  | 
|  | NOTE: You should make sure you have a backup copy of your inputs, in case | 
|  | something goes wrong!!! | 
|  |  | 
|  | You can cause the fuzzing to not restore the original files with | 
|  | '--no-revert'. Generally this is used with '--test <index>' to run one failing | 
|  | test and then leave the fuzzed inputs in place to examine the failure. | 
|  |  | 
|  | For each fuzzed input, %prog will run the test command given on the command | 
|  | line. Each argument in the command is subject to string interpolation before | 
|  | being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard | 
|  | printf format, and VARIABLE is one of: | 
|  |  | 
|  | 'index' - the test index being run | 
|  | 'inputs' - the full list of test inputs | 
|  | 'picked_input'      - (with --pick-input) the selected input file | 
|  | 'picked_input_pos'  - (with --pick-input) the selected input position | 
|  | 'picked_input_line' - (with --pick-input) the selected input line | 
|  | 'picked_input_col'  - (with --pick-input) the selected input column | 
|  |  | 
|  | By default, the script will run forever continually picking new tests to | 
|  | run. You can limit the number of tests that are run with '--max-tests <number>', | 
|  | and you can run a particular test with '--test <index>'. | 
|  |  | 
|  | You can specify '--stop-on-fail' to stop the script on the first failure | 
|  | without reverting the changes. | 
|  |  | 
|  | """) | 
|  | parser.add_option("-v", "--verbose", help="Show more output", | 
|  | action='store_true', dest="verbose", default=False) | 
|  | parser.add_option("-s", "--succinct",  help="Reduce amount of output", | 
|  | action="store_true", dest="succinct", default=False) | 
|  |  | 
|  | group = OptionGroup(parser, "Test Execution") | 
|  | group.add_option("", "--expected-exit-code", help="Set expected exit code", | 
|  | type=int, dest="expected_exit_code", | 
|  | default=0) | 
|  | group.add_option("", "--extra-exit-code", | 
|  | help="Set additional expected exit code", | 
|  | type=int, action="append", dest="extra_exit_codes", | 
|  | default=[]) | 
|  | group.add_option("", "--log-dir", | 
|  | help="Capture test logs to an output directory", | 
|  | type=str, dest="log_dir", | 
|  | default=None) | 
|  | group.add_option("", "--log-all", | 
|  | help="Log all outputs (not just failures)", | 
|  | action="store_true", dest="log_all", default=False) | 
|  | parser.add_option_group(group) | 
|  |  | 
|  | group = OptionGroup(parser, "Input Files") | 
|  | group.add_option("", "--file", metavar="PATH", | 
|  | help="Add an input file to fuzz", | 
|  | type=str, action="append", dest="input_files", default=[]) | 
|  | group.add_option("", "--filelist", metavar="LIST", | 
|  | help="Add a list of inputs files to fuzz (one per line)", | 
|  | type=str, action="append", dest="filelists", default=[]) | 
|  | parser.add_option_group(group) | 
|  |  | 
|  | group = OptionGroup(parser, "Fuzz Options") | 
|  | group.add_option("", "--replacement-chars", dest="replacement_chars", | 
|  | help="Characters to insert/replace", | 
|  | default="0{}[]<>\;@#$^%& ") | 
|  | group.add_option("", "--replacement-string", dest="replacement_strings", | 
|  | action="append", help="Add a replacement string to use", | 
|  | default=[]) | 
|  | group.add_option("", "--replacement-list", dest="replacement_lists", | 
|  | help="Add a list of replacement strings (one per line)", | 
|  | action="append", default=[]) | 
|  | group.add_option("", "--no-delete", help="Don't delete characters", | 
|  | action='store_false', dest="enable_delete", default=True) | 
|  | group.add_option("", "--no-insert", help="Don't insert strings", | 
|  | action='store_false', dest="enable_insert", default=True) | 
|  | group.add_option("", "--no-replace", help="Don't replace strings", | 
|  | action='store_false', dest="enable_replace", default=True) | 
|  | group.add_option("", "--no-revert", help="Don't revert changes", | 
|  | action='store_false', dest="revert", default=True) | 
|  | group.add_option("", "--stop-on-fail", help="Stop on first failure", | 
|  | action='store_true', dest="stop_on_fail", default=False) | 
|  | parser.add_option_group(group) | 
|  |  | 
|  | group = OptionGroup(parser, "Test Selection") | 
|  | group.add_option("", "--test", help="Run a particular test", | 
|  | type=int, dest="test", default=None, metavar="INDEX") | 
|  | group.add_option("", "--max-tests", help="Maximum number of tests", | 
|  | type=int, dest="max_tests", default=None, metavar="COUNT") | 
|  | group.add_option("", "--pick-input", | 
|  | help="Randomly select an input byte as well as fuzzing", | 
|  | action='store_true', dest="pick_input", default=False) | 
|  | parser.add_option_group(group) | 
|  |  | 
|  | parser.disable_interspersed_args() | 
|  |  | 
|  | (opts, args) = parser.parse_args() | 
|  |  | 
|  | if not args: | 
|  | parser.error("Invalid number of arguments") | 
|  |  | 
|  | # Collect the list of inputs. | 
|  | input_files = list(opts.input_files) | 
|  | for filelist in opts.filelists: | 
|  | f = open(filelist) | 
|  | try: | 
|  | for ln in f: | 
|  | ln = ln.strip() | 
|  | if ln: | 
|  | input_files.append(ln) | 
|  | finally: | 
|  | f.close() | 
|  | input_files.sort() | 
|  |  | 
|  | if not input_files: | 
|  | parser.error("No input files!") | 
|  |  | 
|  | print '%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)) | 
|  |  | 
|  | # Make sure the log directory exists if used. | 
|  | if opts.log_dir: | 
|  | if not os.path.exists(opts.log_dir): | 
|  | try: | 
|  | os.mkdir(opts.log_dir) | 
|  | except OSError: | 
|  | print "%s: error: log directory couldn't be created!" % ( | 
|  | sys.argv[0],) | 
|  | raise SystemExit,1 | 
|  |  | 
|  | # Get the list if insert/replacement strings. | 
|  | replacements = list(opts.replacement_chars) | 
|  | replacements.extend(opts.replacement_strings) | 
|  | for replacement_list in opts.replacement_lists: | 
|  | f = open(replacement_list) | 
|  | try: | 
|  | for ln in f: | 
|  | ln = ln[:-1] | 
|  | if ln: | 
|  | replacements.append(ln) | 
|  | finally: | 
|  | f.close() | 
|  |  | 
|  | # Unique and order the replacement list. | 
|  | replacements = list(set(replacements)) | 
|  | replacements.sort() | 
|  |  | 
|  | # Create the test generator. | 
|  | tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert, | 
|  | opts.enable_replace, replacements, opts.pick_input) | 
|  |  | 
|  | print '%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions) | 
|  | print '%s: note: %d total tests.' % (sys.argv[0], tg.num_tests) | 
|  | if opts.test is not None: | 
|  | it = [opts.test] | 
|  | elif opts.max_tests is not None: | 
|  | it = itertools.imap(random.randrange, | 
|  | itertools.repeat(tg.num_tests, opts.max_tests)) | 
|  | else: | 
|  | it = itertools.imap(random.randrange, itertools.repeat(tg.num_tests)) | 
|  | for test in it: | 
|  | t = tg.get_test(test) | 
|  |  | 
|  | if opts.verbose: | 
|  | print '%s: note: running test %d: %r' % (sys.argv[0], test, t) | 
|  | ta = TestApplication(tg, t) | 
|  | try: | 
|  | ta.apply() | 
|  | test_result = run_one_test(ta, test, input_files, args) | 
|  | if not test_result and opts.stop_on_fail: | 
|  | opts.revert = False | 
|  | sys.exit(1) | 
|  | finally: | 
|  | if opts.revert: | 
|  | ta.revert() | 
|  |  | 
|  | sys.stdout.flush() | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | main() |