Daniel Dunbar | 8035880 | 2010-05-30 22:27:52 +0000 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | |
| 3 | """ |
| 4 | This is a generic fuzz testing tool, see --help for more information. |
| 5 | """ |
| 6 | |
| 7 | import os |
| 8 | import sys |
| 9 | import random |
| 10 | import subprocess |
| 11 | import itertools |
| 12 | |
class TestGenerator:
    """Enumerate the single-edit fuzz tests available for a set of input files.

    Every test is identified by an integer in ``range(num_tests)`` and is
    decoded by `get_test` into a tuple::

        (kind, position, insert_str, picked_position)

    where ``kind`` is one of ``'nothing'``, ``'delete'``, ``'insert'`` or
    ``'replace'``; ``position`` and ``picked_position`` are
    ``(input index, offset)`` pairs or ``None``; and ``insert_str`` is the
    string used by insert/replace tests (``None`` otherwise).
    """

    def __init__(self, inputs, delete, insert, replace,
                 insert_strings, pick_input):
        """Load the input files and compute the size of the test space.

        inputs         -- iterable of paths of the files to fuzz.
        delete         -- truthy to enable 'delete' tests.
        insert         -- truthy to enable 'insert' tests.
        replace        -- truthy to enable 'replace' tests.
        insert_strings -- iterable of strings used by insert/replace tests.
        pick_input     -- truthy to also select an input position per test.
        """
        # Read in binary mode: the fuzzed and reverted contents are written
        # back in binary mode, so the pristine copy must be the exact bytes.
        self.inputs = []
        for path in inputs:
            with open(path, 'rb') as f:
                self.inputs.append((path, f.read()))

        self.delete = bool(delete)
        self.insert = bool(insert)
        self.replace = bool(replace)
        self.pick_input = bool(pick_input)
        self.insert_strings = list(insert_strings)

        self.num_positions = sum(len(d) for _, d in self.inputs)
        self.num_insert_strings = len(self.insert_strings)

        # Use the normalized flags so that any truthy value counts exactly
        # once, matching the decoding performed by get_test().
        tests_per_position = (
            self.delete +
            (self.insert + self.replace) * self.num_insert_strings)
        # The extra test is the 'nothing' test (index 0).
        self.num_tests = tests_per_position * self.num_positions + 1

        if self.pick_input:
            self.num_tests *= self.num_positions

    def position_to_source_index(self, position):
        """Map a global position to an ``(input index, offset)`` pair.

        Positions count through the loaded inputs in order. Raises ValueError
        if the position is negative or past the end of the last input.
        """
        if position >= 0:
            for i, (_, d) in enumerate(self.inputs):
                n = len(d)
                if position < n:
                    return (i, position)
                position -= n
        raise ValueError('Invalid position.')

    def get_test(self, index):
        """Decode a test index into ``(kind, position, insert_str, picked)``.

        Raises ValueError if index is outside ``range(self.num_tests)``.
        """
        if not 0 <= index < self.num_tests:
            raise ValueError('Invalid test index %r (expected 0 <= index < %d).'
                             % (index, self.num_tests))

        picked_position = None
        if self.pick_input:
            # The low "digit" of the index selects the picked position.
            index, picked_position = divmod(index, self.num_positions)
            picked_position = self.position_to_source_index(picked_position)

        if index == 0:
            return ('nothing', None, None, picked_position)

        index -= 1
        index, position = divmod(index, self.num_positions)
        position = self.position_to_source_index(position)
        if self.delete:
            if index == 0:
                return ('delete', position, None, picked_position)
            index -= 1

        index, insert_index = divmod(index, self.num_insert_strings)
        insert_str = self.insert_strings[insert_index]
        if self.insert:
            if index == 0:
                return ('insert', position, insert_str, picked_position)
            index -= 1

        # Internal invariants: only 'replace' can remain at this point.
        assert self.replace
        assert index == 0
        return ('replace', position, insert_str, picked_position)
class TestApplication:
    """Apply one decoded fuzz test to its input file, and undo it again.

    tg   -- object whose ``inputs`` attribute is a list of
            ``(path, original contents)`` pairs.
    test -- tuple ``(kind, position, insert_str, picked_position)`` where
            ``position`` is an ``(input index, offset)`` pair.
    """

    def __init__(self, tg, test):
        self.tg = tg
        self.test = test

    @staticmethod
    def _matching(data, text):
        """Return text in a form that can be concatenated with data."""
        # When the file contents are bytes, a text insert string has to be
        # encoded before it can be spliced in.
        if isinstance(data, bytes) and not isinstance(text, bytes):
            return text.encode('utf-8')
        return text

    @staticmethod
    def _write(name, data):
        """Overwrite the file at name with data."""
        # Close explicitly so the contents are on disk before the test
        # command is run against the file.
        with open(name, 'wb') as f:
            f.write(data)

    def apply(self):
        """Overwrite the targeted input file with its fuzzed contents.

        A 'nothing' test leaves every file untouched. Raises ValueError for
        an unknown test kind, before anything is written.
        """
        kind = self.test[0]
        if kind == 'nothing':
            return

        i, j = self.test[1]
        name, data = self.tg.inputs[i]
        if kind == 'delete':
            data = data[:j] + data[j+1:]
        elif kind == 'insert':
            data = data[:j] + self._matching(data, self.test[2]) + data[j:]
        elif kind == 'replace':
            data = data[:j] + self._matching(data, self.test[2]) + data[j+1:]
        else:
            # Wrap in a 1-tuple: self.test is itself a tuple and would
            # otherwise be unpacked as the format arguments.
            raise ValueError('Invalid test %r' % (self.test,))
        self._write(name, data)

    def revert(self):
        """Restore the targeted input file to its original contents."""
        if self.test[0] != 'nothing':
            i, _ = self.test[1]
            name, data = self.tg.inputs[i]
            self._write(name, data)
def quote(str):
    """Return str wrapped in a pair of double-quote characters.

    Quote characters already present inside str are left as they are.
    """
    # The parameter name shadows the builtin, but it is part of the
    # function's signature and is therefore left unchanged.
    return ''.join(('"', str, '"'))
| 100 | |
def run_one_test(test_application, index, input_files, args):
    """Run the test command for one applied fuzz test and report PASS/FAIL.

    test_application -- object providing ``test`` (the decoded test tuple)
                        and ``tg.inputs`` (list of ``(path, contents)``).
    index            -- the test index; interpolated as ``%(index)s`` and
                        used to name the log files.
    input_files      -- list of input file paths.
    args             -- the command to run; every argument is interpolated
                        with the ``%(NAME)FORMAT`` variables built below.

    Reads the module-level ``opts`` (set by main) for verbosity, logging and
    the accepted exit codes.
    """
    test = test_application.test

    # Interpolate arguments.
    options = {'index': index,
               'inputs': ' '.join(quote(f) for f in input_files)}

    # Add picked input interpolation arguments, if used.
    picked = test[3]
    if picked is not None:
        file_index, pos = picked
        file_data = test_application.tg.inputs[file_index][1]
        # Search with a newline of the same type as the file contents.
        newline = b'\n' if isinstance(file_data, bytes) else '\n'
        # Offset of the first character on the line holding pos; rfind
        # returns -1 when pos is on the first line, giving offset 0.
        line_start = file_data.rfind(newline, 0, pos) + 1
        options['picked_input'] = input_files[file_index]
        options['picked_input_pos'] = pos
        # Lines and columns are 1-based.
        options['picked_input_line'] = file_data.count(newline, 0, pos) + 1
        options['picked_input_col'] = pos - line_start + 1

    test_args = [a % options for a in args]
    if opts.verbose:
        print('%s: note: executing %r' % (sys.argv[0], test_args))

    stdout = None
    stderr = None
    stdout_log_path = None
    stderr_log_path = None
    try:
        if opts.log_dir:
            stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
            stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
            stdout = open(stdout_log_path, 'wb')
            stderr = open(stderr_log_path, 'wb')
        else:
            # The child inherits our stdout; flush first so that its output
            # is not emitted ahead of text we have already printed.
            sys.stdout.flush()
        p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr)
        p.communicate()
        exit_code = p.wait()
    finally:
        # Close the logs even if the command could not be started.
        for log in (stdout, stderr):
            if log is not None:
                log.close()

    test_result = (exit_code == opts.expected_exit_code or
                   exit_code in opts.extra_exit_codes)

    # Remove the logs for passes, unless logging all results.
    if stdout_log_path is not None and test_result and not opts.log_all:
        os.remove(stdout_log_path)
        os.remove(stderr_log_path)

    if not test_result:
        print('FAIL: %d' % index)
    elif not opts.succinct:
        print('PASS: %d' % index)
| 159 | |
def main():
    """Parse the command line and run the selected fuzz tests.

    Stores the parsed options in the module-level ``opts`` (read by
    run_one_test), collects the input files and replacement strings, builds
    a TestGenerator, then applies, runs and (unless --no-revert was given)
    reverts each selected test.
    """
    global opts
    from optparse import OptionParser, OptionGroup
    parser = OptionParser("""%prog [options] ... test command args ...

%prog is a tool for fuzzing inputs and testing them.

The most basic usage is something like:

  $ %prog --file foo.txt ./test.sh

which will run a default list of fuzzing strategies on the input. For each
fuzzed input, it will overwrite the input files (in place), run the test script,
then restore the files back to their original contents.

NOTE: You should make sure you have a backup copy of your inputs, in case
something goes wrong!!!

You can cause the fuzzing to not restore the original files with
'--no-revert'. Generally this is used with '--test <index>' to run one failing
test and then leave the fuzzed inputs in place to examine the failure.

For each fuzzed input, %prog will run the test command given on the command
line. Each argument in the command is subject to string interpolation before
being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
printf format, and VARIABLE is one of:

  'index' - the test index being run
  'inputs' - the full list of test inputs
  'picked_input' - (with --pick-input) the selected input file
  'picked_input_pos' - (with --pick-input) the selected input position
  'picked_input_line' - (with --pick-input) the selected input line
  'picked_input_col' - (with --pick-input) the selected input column

By default, the script runs 10 randomly picked tests. You can change the
number of tests that are run with '--max-tests <number>', and you can run a
particular test with '--test <index>'.
""")
    parser.add_option("-v", "--verbose", help="Show more output",
                      action='store_true', dest="verbose", default=False)
    parser.add_option("-s", "--succinct", help="Reduce amount of output",
                      action="store_true", dest="succinct", default=False)

    group = OptionGroup(parser, "Test Execution")
    group.add_option("", "--expected-exit-code", help="Set expected exit code",
                     type=int, dest="expected_exit_code",
                     default=0)
    group.add_option("", "--extra-exit-code",
                     help="Set additional expected exit code",
                     type=int, action="append", dest="extra_exit_codes",
                     default=[])
    group.add_option("", "--log-dir",
                     help="Capture test logs to an output directory",
                     type=str, dest="log_dir",
                     default=None)
    group.add_option("", "--log-all",
                     help="Log all outputs (not just failures)",
                     action="store_true", dest="log_all", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Input Files")
    group.add_option("", "--file", metavar="PATH",
                     help="Add an input file to fuzz",
                     type=str, action="append", dest="input_files", default=[])
    # The value is a path that is opened below, so it must be a string.
    group.add_option("", "--filelist", metavar="LIST",
                     help="Add a list of inputs files to fuzz (one per line)",
                     type=str, action="append", dest="filelists", default=[])
    parser.add_option_group(group)

    group = OptionGroup(parser, "Fuzz Options")
    group.add_option("", "--replacement-chars", dest="replacement_chars",
                     help="Characters to insert/replace",
                     default="0{}[]<>\\;@#$^%& ")
    group.add_option("", "--replacement-string", dest="replacement_strings",
                     action="append", help="Add a replacement string to use",
                     default=[])
    group.add_option("", "--replacement-list", dest="replacement_lists",
                     help="Add a list of replacement strings (one per line)",
                     action="append", default=[])
    group.add_option("", "--no-delete", help="Don't delete characters",
                     action='store_false', dest="enable_delete", default=True)
    group.add_option("", "--no-insert", help="Don't insert strings",
                     action='store_false', dest="enable_insert", default=True)
    group.add_option("", "--no-replace", help="Don't replace strings",
                     action='store_false', dest="enable_replace", default=True)
    group.add_option("", "--no-revert", help="Don't revert changes",
                     action='store_false', dest="revert", default=True)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Test Selection")
    group.add_option("", "--test", help="Run a particular test",
                     type=int, dest="test", default=None, metavar="INDEX")
    group.add_option("", "--max-tests", help="Maximum number of tests",
                     type=int, dest="max_tests", default=10, metavar="COUNT")
    group.add_option("", "--pick-input",
                     help="Randomly select an input byte as well as fuzzing",
                     action='store_true', dest="pick_input", default=False)
    parser.add_option_group(group)

    # Everything after the first positional argument belongs to the test
    # command, including anything that looks like an option.
    parser.disable_interspersed_args()

    (opts, args) = parser.parse_args()

    if not args:
        parser.error("Invalid number of arguments")

    # Collect the list of inputs.
    input_files = list(opts.input_files)
    for filelist in opts.filelists:
        with open(filelist) as f:
            for ln in f:
                ln = ln.strip()
                if ln:
                    input_files.append(ln)
    input_files.sort()

    if not input_files:
        parser.error("No input files!")

    print('%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)))

    # Make sure the log directory exists if used.
    if opts.log_dir:
        if not os.path.exists(opts.log_dir):
            try:
                os.mkdir(opts.log_dir)
            except OSError:
                print("%s: error: log directory couldn't be created!" % (
                    sys.argv[0],))
                raise SystemExit(1)

    # Get the list of insert/replacement strings.
    replacements = list(opts.replacement_chars)
    replacements.extend(opts.replacement_strings)
    for replacement_list in opts.replacement_lists:
        with open(replacement_list) as f:
            for ln in f:
                # Drop only the line terminator: other whitespace may be a
                # deliberate part of the replacement string, and the last
                # line may not end with a newline at all.
                ln = ln.rstrip('\n')
                if ln:
                    replacements.append(ln)

    # Unique and order the replacement list.
    replacements = sorted(set(replacements))

    # Create the test generator.
    tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
                       opts.enable_replace, replacements, opts.pick_input)

    print('%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions))
    print('%s: note: %d total tests.' % (sys.argv[0], tg.num_tests))
    if opts.test is not None:
        if not 0 <= opts.test < tg.num_tests:
            parser.error("Invalid test index %d (there are %d tests)" % (
                opts.test, tg.num_tests))
        it = [opts.test]
    elif opts.max_tests is not None:
        it = (random.randrange(tg.num_tests)
              for _ in itertools.repeat(None, opts.max_tests))
    else:
        # NOTE: not reachable while --max-tests has an integer default; kept
        # so that a default of None means "run until interrupted".
        it = (random.randrange(tg.num_tests)
              for _ in itertools.repeat(None))
    for test in it:
        t = tg.get_test(test)

        if opts.verbose:
            print('%s: note: running test %d: %r' % (sys.argv[0], test, t))
        ta = TestApplication(tg, t)
        try:
            ta.apply()
            run_one_test(ta, test, input_files, args)
        finally:
            # Restore the original file even if the test run raised.
            if opts.revert:
                ta.revert()

        sys.stdout.flush()
| 338 | |
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()