#!/usr/bin/env python

"""
This is a generic fuzz testing tool; see --help for more information.
"""
 |  | 
 | import os | 
 | import sys | 
 | import random | 
 | import subprocess | 
 | import itertools | 
 |  | 
 | class TestGenerator: | 
 |     def __init__(self, inputs, delete, insert, replace, | 
 |                  insert_strings, pick_input): | 
 |         self.inputs = [(s, open(s).read()) for s in inputs] | 
 |  | 
 |         self.delete = bool(delete) | 
 |         self.insert = bool(insert) | 
 |         self.replace = bool(replace) | 
 |         self.pick_input = bool(pick_input) | 
 |         self.insert_strings = list(insert_strings) | 
 |  | 
 |         self.num_positions = sum([len(d) for _,d in self.inputs]) | 
 |         self.num_insert_strings = len(insert_strings) | 
 |         self.num_tests = ((delete + (insert + replace)*self.num_insert_strings) | 
 |                           * self.num_positions) | 
 |         self.num_tests += 1 | 
 |  | 
 |         if self.pick_input: | 
 |             self.num_tests *= self.num_positions | 
 |  | 
 |     def position_to_source_index(self, position): | 
 |         for i,(s,d) in enumerate(self.inputs): | 
 |             n = len(d) | 
 |             if position < n: | 
 |                 return (i,position) | 
 |             position -= n | 
 |         raise ValueError,'Invalid position.' | 
 |  | 
 |     def get_test(self, index): | 
 |         assert 0 <= index < self.num_tests | 
 |  | 
 |         picked_position = None | 
 |         if self.pick_input: | 
 |             index,picked_position = divmod(index, self.num_positions) | 
 |             picked_position = self.position_to_source_index(picked_position) | 
 |  | 
 |         if index == 0: | 
 |             return ('nothing', None, None, picked_position) | 
 |  | 
 |         index -= 1 | 
 |         index,position = divmod(index, self.num_positions) | 
 |         position = self.position_to_source_index(position) | 
 |         if self.delete: | 
 |             if index == 0: | 
 |                 return ('delete', position, None, picked_position) | 
 |             index -= 1 | 
 |  | 
 |         index,insert_index = divmod(index, self.num_insert_strings) | 
 |         insert_str = self.insert_strings[insert_index] | 
 |         if self.insert: | 
 |             if index == 0: | 
 |                 return ('insert', position, insert_str, picked_position) | 
 |             index -= 1 | 
 |  | 
 |         assert self.replace | 
 |         assert index == 0 | 
 |         return ('replace', position, insert_str, picked_position) | 
 |  | 
 | class TestApplication: | 
 |     def __init__(self, tg, test): | 
 |         self.tg = tg | 
 |         self.test = test | 
 |  | 
 |     def apply(self): | 
 |         if self.test[0] == 'nothing': | 
 |             pass | 
 |         else: | 
 |             i,j = self.test[1] | 
 |             name,data = self.tg.inputs[i] | 
 |             if self.test[0] == 'delete': | 
 |                 data = data[:j] + data[j+1:] | 
 |             elif self.test[0] == 'insert': | 
 |                 data = data[:j] + self.test[2] + data[j:] | 
 |             elif self.test[0] == 'replace': | 
 |                 data = data[:j] + self.test[2] + data[j+1:] | 
 |             else: | 
 |                 raise ValueError,'Invalid test %r' % self.test | 
 |             open(name,'wb').write(data) | 
 |  | 
 |     def revert(self): | 
 |         if self.test[0] != 'nothing': | 
 |             i,j = self.test[1] | 
 |             name,data = self.tg.inputs[i] | 
 |             open(name,'wb').write(data) | 
 |  | 
 | def quote(str): | 
 |     return '"' + str + '"' | 
 |          | 
 | def run_one_test(test_application, index, input_files, args): | 
 |     test = test_application.test | 
 |  | 
 |     # Interpolate arguments. | 
 |     options = { 'index' : index, | 
 |                 'inputs' : ' '.join(quote(f) for f in input_files) } | 
 |  | 
 |     # Add picked input interpolation arguments, if used. | 
 |     if test[3] is not None: | 
 |         pos = test[3][1] | 
 |         options['picked_input'] = input_files[test[3][0]] | 
 |         options['picked_input_pos'] = pos | 
 |         # Compute the line and column. | 
 |         file_data = test_application.tg.inputs[test[3][0]][1] | 
 |         line = column = 1 | 
 |         for i in range(pos): | 
 |             c = file_data[i] | 
 |             if c == '\n': | 
 |                 line += 1 | 
 |                 column = 1 | 
 |             else: | 
 |                 column += 1 | 
 |         options['picked_input_line'] = line | 
 |         options['picked_input_col'] = column | 
 |          | 
 |     test_args = [a % options for a in args] | 
 |     if opts.verbose: | 
 |         print '%s: note: executing %r' % (sys.argv[0], test_args) | 
 |  | 
 |     stdout = None | 
 |     stderr = None | 
 |     if opts.log_dir: | 
 |         stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index) | 
 |         stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index) | 
 |         stdout = open(stdout_log_path, 'wb') | 
 |         stderr = open(stderr_log_path, 'wb') | 
 |     else: | 
 |         sys.stdout.flush() | 
 |     p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr) | 
 |     p.communicate() | 
 |     exit_code = p.wait() | 
 |  | 
 |     test_result = (exit_code == opts.expected_exit_code or | 
 |                    exit_code in opts.extra_exit_codes) | 
 |  | 
 |     if stdout is not None: | 
 |         stdout.close() | 
 |         stderr.close() | 
 |  | 
 |         # Remove the logs for passes, unless logging all results. | 
 |         if not opts.log_all and test_result: | 
 |             os.remove(stdout_log_path) | 
 |             os.remove(stderr_log_path) | 
 |  | 
 |     if not test_result: | 
 |         print 'FAIL: %d' % index | 
 |     elif not opts.succinct: | 
 |         print 'PASS: %d' % index | 
 |     return test_result | 
 |  | 
 | def main(): | 
 |     global opts | 
 |     from optparse import OptionParser, OptionGroup | 
 |     parser = OptionParser("""%prog [options] ... test command args ... | 
 |  | 
 | %prog is a tool for fuzzing inputs and testing them. | 
 |  | 
 | The most basic usage is something like: | 
 |  | 
 |   $ %prog --file foo.txt ./test.sh | 
 |  | 
 | which will run a default list of fuzzing strategies on the input. For each | 
 | fuzzed input, it will overwrite the input files (in place), run the test script, | 
 | then restore the files back to their original contents. | 
 |  | 
 | NOTE: You should make sure you have a backup copy of your inputs, in case | 
 | something goes wrong!!! | 
 |  | 
 | You can cause the fuzzing to not restore the original files with | 
 | '--no-revert'. Generally this is used with '--test <index>' to run one failing | 
 | test and then leave the fuzzed inputs in place to examine the failure. | 
 |  | 
 | For each fuzzed input, %prog will run the test command given on the command | 
 | line. Each argument in the command is subject to string interpolation before | 
 | being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard | 
 | printf format, and VARIABLE is one of: | 
 |  | 
 |   'index' - the test index being run | 
 |   'inputs' - the full list of test inputs | 
 |   'picked_input'      - (with --pick-input) the selected input file | 
 |   'picked_input_pos'  - (with --pick-input) the selected input position | 
 |   'picked_input_line' - (with --pick-input) the selected input line | 
 |   'picked_input_col'  - (with --pick-input) the selected input column | 
 |  | 
 | By default, the script will run forever continually picking new tests to | 
 | run. You can limit the number of tests that are run with '--max-tests <number>', | 
 | and you can run a particular test with '--test <index>'. | 
 |  | 
 | You can specify '--stop-on-fail' to stop the script on the first failure | 
 | without reverting the changes. | 
 |  | 
 | """) | 
 |     parser.add_option("-v", "--verbose", help="Show more output", | 
 |                       action='store_true', dest="verbose", default=False) | 
 |     parser.add_option("-s", "--succinct",  help="Reduce amount of output", | 
 |                       action="store_true", dest="succinct", default=False) | 
 |  | 
 |     group = OptionGroup(parser, "Test Execution") | 
 |     group.add_option("", "--expected-exit-code", help="Set expected exit code", | 
 |                      type=int, dest="expected_exit_code", | 
 |                      default=0) | 
 |     group.add_option("", "--extra-exit-code", | 
 |                      help="Set additional expected exit code", | 
 |                      type=int, action="append", dest="extra_exit_codes", | 
 |                      default=[]) | 
 |     group.add_option("", "--log-dir", | 
 |                      help="Capture test logs to an output directory", | 
 |                      type=str, dest="log_dir", | 
 |                      default=None) | 
 |     group.add_option("", "--log-all", | 
 |                      help="Log all outputs (not just failures)", | 
 |                      action="store_true", dest="log_all", default=False) | 
 |     parser.add_option_group(group) | 
 |  | 
 |     group = OptionGroup(parser, "Input Files") | 
 |     group.add_option("", "--file", metavar="PATH", | 
 |                      help="Add an input file to fuzz", | 
 |                      type=str, action="append", dest="input_files", default=[]) | 
 |     group.add_option("", "--filelist", metavar="LIST", | 
 |                      help="Add a list of inputs files to fuzz (one per line)", | 
 |                      type=str, action="append", dest="filelists", default=[]) | 
 |     parser.add_option_group(group) | 
 |  | 
 |     group = OptionGroup(parser, "Fuzz Options") | 
 |     group.add_option("", "--replacement-chars", dest="replacement_chars", | 
 |                      help="Characters to insert/replace", | 
 |                      default="0{}[]<>\;@#$^%& ") | 
 |     group.add_option("", "--replacement-string", dest="replacement_strings", | 
 |                      action="append", help="Add a replacement string to use", | 
 |                      default=[]) | 
 |     group.add_option("", "--replacement-list", dest="replacement_lists", | 
 |                      help="Add a list of replacement strings (one per line)", | 
 |                      action="append", default=[]) | 
 |     group.add_option("", "--no-delete", help="Don't delete characters", | 
 |                      action='store_false', dest="enable_delete", default=True) | 
 |     group.add_option("", "--no-insert", help="Don't insert strings", | 
 |                      action='store_false', dest="enable_insert", default=True) | 
 |     group.add_option("", "--no-replace", help="Don't replace strings", | 
 |                      action='store_false', dest="enable_replace", default=True) | 
 |     group.add_option("", "--no-revert", help="Don't revert changes", | 
 |                      action='store_false', dest="revert", default=True) | 
 |     group.add_option("", "--stop-on-fail", help="Stop on first failure", | 
 |                      action='store_true', dest="stop_on_fail", default=False) | 
 |     parser.add_option_group(group) | 
 |  | 
 |     group = OptionGroup(parser, "Test Selection") | 
 |     group.add_option("", "--test", help="Run a particular test", | 
 |                      type=int, dest="test", default=None, metavar="INDEX") | 
 |     group.add_option("", "--max-tests", help="Maximum number of tests", | 
 |                      type=int, dest="max_tests", default=None, metavar="COUNT") | 
 |     group.add_option("", "--pick-input", | 
 |                      help="Randomly select an input byte as well as fuzzing", | 
 |                      action='store_true', dest="pick_input", default=False) | 
 |     parser.add_option_group(group) | 
 |  | 
 |     parser.disable_interspersed_args() | 
 |  | 
 |     (opts, args) = parser.parse_args() | 
 |  | 
 |     if not args: | 
 |         parser.error("Invalid number of arguments") | 
 |  | 
 |     # Collect the list of inputs. | 
 |     input_files = list(opts.input_files) | 
 |     for filelist in opts.filelists: | 
 |         f = open(filelist) | 
 |         try: | 
 |             for ln in f: | 
 |                 ln = ln.strip() | 
 |                 if ln: | 
 |                     input_files.append(ln) | 
 |         finally: | 
 |             f.close() | 
 |     input_files.sort() | 
 |  | 
 |     if not input_files: | 
 |         parser.error("No input files!") | 
 |  | 
 |     print '%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)) | 
 |  | 
 |     # Make sure the log directory exists if used. | 
 |     if opts.log_dir: | 
 |         if not os.path.exists(opts.log_dir): | 
 |             try: | 
 |                 os.mkdir(opts.log_dir) | 
 |             except OSError: | 
 |                 print "%s: error: log directory couldn't be created!" % ( | 
 |                     sys.argv[0],) | 
 |                 raise SystemExit,1 | 
 |  | 
 |     # Get the list if insert/replacement strings. | 
 |     replacements = list(opts.replacement_chars) | 
 |     replacements.extend(opts.replacement_strings) | 
 |     for replacement_list in opts.replacement_lists: | 
 |         f = open(replacement_list) | 
 |         try: | 
 |             for ln in f: | 
 |                 ln = ln[:-1] | 
 |                 if ln: | 
 |                     replacements.append(ln) | 
 |         finally: | 
 |             f.close() | 
 |  | 
 |     # Unique and order the replacement list. | 
 |     replacements = list(set(replacements)) | 
 |     replacements.sort() | 
 |  | 
 |     # Create the test generator. | 
 |     tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert, | 
 |                        opts.enable_replace, replacements, opts.pick_input) | 
 |  | 
 |     print '%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions) | 
 |     print '%s: note: %d total tests.' % (sys.argv[0], tg.num_tests) | 
 |     if opts.test is not None: | 
 |         it = [opts.test] | 
 |     elif opts.max_tests is not None: | 
 |         it = itertools.imap(random.randrange, | 
 |                             itertools.repeat(tg.num_tests, opts.max_tests)) | 
 |     else: | 
 |         it = itertools.imap(random.randrange, itertools.repeat(tg.num_tests)) | 
 |     for test in it: | 
 |         t = tg.get_test(test) | 
 |  | 
 |         if opts.verbose: | 
 |             print '%s: note: running test %d: %r' % (sys.argv[0], test, t) | 
 |         ta = TestApplication(tg, t) | 
 |         try: | 
 |             ta.apply() | 
 |             test_result = run_one_test(ta, test, input_files, args) | 
 |             if not test_result and opts.stop_on_fail: | 
 |                 opts.revert = False | 
 |                 sys.exit(1) | 
 |         finally: | 
 |             if opts.revert: | 
 |                 ta.revert() | 
 |  | 
 |         sys.stdout.flush() | 
 |  | 
# Run the fuzzer only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()