| Tor Norbye | 3a2425a | 2013-11-04 10:16:08 -0800 | [diff] [blame^] | 1 | import sys, os |
| 2 | import imp |
| 3 | |
| 4 | helpers_dir = os.getenv("PYCHARM_HELPERS_DIR", sys.path[0]) |
| 5 | if sys.path[0] != helpers_dir: |
| 6 | sys.path.insert(0, helpers_dir) |
| 7 | |
| 8 | from tcunittest import TeamcityTestResult |
| 9 | |
| 10 | from pycharm_run_utils import import_system_module |
| 11 | from pycharm_run_utils import adjust_sys_path |
| 12 | from pycharm_run_utils import debug, getModuleName |
| 13 | |
| 14 | adjust_sys_path() |
| 15 | |
| 16 | re = import_system_module("re") |
| 17 | inspect = import_system_module("inspect") |
| 18 | |
# attest is a third-party dependency of this runner.  Only a failed import
# is translated into the "please install" hint; any other error raised
# while importing attest (or Ctrl-C) propagates unchanged so the real
# cause is not hidden.  NameError is kept as the raised type for
# backward compatibility.
try:
    from attest.reporters import AbstractReporter
    from attest.collectors import Tests
    from attest import TestBase
except ImportError:
    raise NameError("Please, install attests")
| 25 | |
class TeamCityReporter(AbstractReporter, TeamcityTestResult):
    """Teamcity reporter for attests.

    Translates the attest reporter callbacks (``begin``, ``success``,
    ``failure``, ``finished``) into TeamCity service messages emitted
    through ``self.messages``.

    ``self.messages`` and ``self.current_suite`` are not assigned in this
    class; presumably ``TeamcityTestResult.__init__`` provides them
    (NOTE(review): confirm against tcunittest).
    """

    def __init__(self, prefix):
        """Create the reporter.

        ``prefix`` is the folder path tests were collected from (may be
        empty).  When set, suite names are derived from the test file's
        path relative to it instead of from the module name.
        """
        TeamcityTestResult.__init__(self)
        self.prefix = prefix
        # Enclosing suites that are currently open, outermost first.
        # Created here as well as in begin() so that finished() is safe
        # even when begin() was never called.
        self.suite_stack = []

    def begin(self, tests):
        """initialize suite stack and count tests"""
        self.total = len(tests)
        self.suite_stack = []
        self.messages.testCount(self.total)

    def success(self, result):
        """called when test finished successfully"""
        suite = self.get_suite_name(result.test)
        self.start_suite(suite)
        name = self.get_test_name(result)
        self.start_test(result, name)
        self.messages.testFinished(name)

    def failure(self, result):
        """called when test failed

        Reports an ``AssertionError`` as a test failure (with
        expected/actual values when they can be extracted from the
        failing expression) and any other exception as a test error.
        """
        suite = self.get_suite_name(result.test)
        self.start_suite(suite)
        name = self.get_test_name(result)
        self.start_test(result, name)

        tb = result.exc_info[2]
        error_value = self.find_error_value(tb)
        quotes = ("'", '"')
        # Expected/actual are only extracted when the error value is a
        # quoted string on both ends.
        if error_value.startswith(quotes) and error_value.endswith(quotes):
            first = self._unescape(self.find_first(error_value))
            second = self._unescape(self.find_second(error_value))
        else:
            first = second = ""

        err = self.formatErr(result.exc_info)
        if isinstance(result.error, AssertionError):
            self.messages.testFailed(name, message='Failure',
                                     details=err,
                                     expected=first, actual=second)
        else:
            self.messages.testError(name, message='Error',
                                    details=err)
        # Every testStarted must be closed by a testFinished, also for
        # failed tests.
        self.messages.testFinished(name)

    def finished(self):
        """called when all tests finished"""
        self.end_last_suite()
        # Close the remaining enclosing suites, innermost first.
        while self.suite_stack:
            self.messages.testSuiteFinished(self.suite_stack.pop())

    def get_test_name(self, result):
        """Return ``result.test_name`` without the unique module suffix.

        Everything from the first "%" up to (not including) the next "."
        is dropped; when no "." follows, everything from "%" on is
        dropped.
        """
        name = result.test_name
        marker = name.find("%")  # remove unique module prefix
        if marker != -1:
            dot = name.find(".", marker)
            # Without a following "." there is no tail to keep; slicing
            # with -1 here would wrongly re-append the last character.
            tail = name[dot:] if dot != -1 else ""
            name = name[:marker] + tail
        return name

    def end_last_suite(self):
        """Emit testSuiteFinished for the current suite, if any."""
        if self.current_suite:
            self.messages.testSuiteFinished(self.current_suite)
            self.current_suite = None

    def get_suite_name(self, test):
        """Return ``(suite_name, location)`` for ``test``.

        ``suite_name`` is the module name, or - when ``self.prefix`` is
        set and occurs in the source path - the path after the prefix
        with "/" replaced by "." and the extension dropped.  The owning
        class name is appended for methods.  ``location`` is
        ``"<source file>:<line>"`` where line is the class's first line,
        or 1 when there is no class.
        """
        module = inspect.getmodule(test)
        klass = getattr(test, "im_class", None)
        if klass is None:
            # Python 3 bound methods have no im_class; the owner is only
            # reachable through __self__.
            owner = getattr(test, "__self__", None)
            if owner is not None and not inspect.ismodule(owner):
                klass = owner if inspect.isclass(owner) else owner.__class__

        path = module.__file__
        if path.endswith((".pyc", ".pyo")):
            # Point at the source file rather than the compiled one.
            path = path[:-1]

        suite = module.__name__
        if self.prefix:
            stem = os.path.splitext(path)[0]
            # maxsplit=1 keeps the whole remainder even if the prefix
            # text occurs again deeper in the path.
            pieces = stem.split(self.prefix, 1)
            if len(pieces) == 2:
                suite = pieces[1].replace("/", ".")
            # Otherwise the file is outside the prefix: keep module name.
        if klass:
            suite += "." + klass.__name__
            lineno = inspect.getsourcelines(klass)[1]
        else:
            lineno = 1

        return (suite, path + ":" + str(lineno))

    def start_suite(self, suite_info):
        """finish previous suite and put current suite
        to stack

        ``suite_info`` is the ``(suite_name, location)`` pair returned by
        ``get_suite_name``.  Nothing happens when the suite is already
        the current one.
        """
        suite, location = suite_info
        if suite == self.current_suite:
            return

        if self.current_suite:
            if suite.startswith(self.current_suite + "."):
                # The new suite is nested in the current one: keep the
                # current one open and remember it.
                self.suite_stack.append(self.current_suite)
            else:
                self.messages.testSuiteFinished(self.current_suite)
                # Close enclosing suites innermost-first until one that
                # also encloses the new suite is found.  Popping keeps
                # finished() from closing them a second time.
                while (self.suite_stack and
                       not suite.startswith(self.suite_stack[-1] + ".")):
                    self.messages.testSuiteFinished(self.suite_stack.pop())

        self.current_suite = suite
        self.messages.testSuiteStarted(self.current_suite,
                                       location="file://" + location)

    def start_test(self, result, name):
        """trying to find test location

        Emits testStarted for ``name`` with the source file and line of
        the function behind ``result.test``.
        """
        test = result.test
        # __closure__ is the portable spelling; func_closure exists only
        # on Python 2.
        closure = (getattr(test, "__closure__", None) or
                   getattr(test, "func_closure", None))
        # The first closure cell is taken to hold the real test function
        # (as the original code did); a callable without a closure is
        # used directly.
        real_func = closure[0].cell_contents if closure else test
        lineno = inspect.getsourcelines(real_func)[1]
        path = inspect.getsourcefile(real_func)
        self.messages.testStarted(name, "file://" + path + ":" + str(lineno))
| 134 | |
def get_subclasses(module, base_class=TestBase):
    """Return the attributes of ``module`` that are subclasses of ``base_class``.

    Attributes are visited in ``dir(module)`` order.  Anything that
    ``issubclass`` rejects with ``TypeError`` (i.e. non-class objects) is
    skipped.
    """
    found = []
    for attr_name in dir(module):
        candidate = getattr(module, attr_name)
        try:
            matches = issubclass(candidate, base_class)
        except TypeError:
            # 'candidate' is not a class
            continue
        if matches:
            found.append(candidate)
    return found
| 145 | |
def get_module(file_name):
    """Load the source file ``file_name`` and return the resulting module.

    The module is named after the file's base name with its extension
    removed.
    """
    stem, _extension = os.path.splitext(os.path.basename(file_name))
    return imp.load_source(stem, file_name)
| 149 | |
# Modules loaded by loadSource, keyed by the name they were loaded under.
modules = {}


def getModuleName(prefix, cnt):
    """Return ``prefix`` followed by "%" and ``cnt``.

    The numeric suffix makes the name unique so that two source files
    with the same base name do not collide.
    """
    unique_suffix = "%" + str(cnt)
    return prefix + unique_suffix
| 154 | |
def loadSource(fileName):
    """Load ``fileName`` as a module, record it in ``modules`` and return it.

    The module name is the file's base name without extension.  If that
    name is already present in ``modules``, the first free name of the
    form produced by ``getModuleName(base, n)`` with n = 2, 3, ... is
    used instead.
    """
    stem = os.path.splitext(os.path.basename(fileName))[0]

    moduleName = stem
    if stem in modules:
        # Name already taken: probe numbered variants until one is free.
        suffix = 2
        moduleName = getModuleName(stem, suffix)
        while moduleName in modules:
            suffix += 1
            moduleName = getModuleName(stem, suffix)

    debug("/ Loading " + fileName + " as " + moduleName)
    loaded = imp.load_source(moduleName, fileName)
    modules[moduleName] = loaded
    return loaded
| 169 | |
| 170 | |
def register_tests_from_module(module, tests):
    """add tests from module to main test suite

    Every module attribute that is a ``Tests`` instance is registered by
    its dotted name ``"<module name>.<attribute>"``; afterwards an
    instance of every class returned by ``get_subclasses(module)`` is
    registered.
    """
    collection_names = [
        attr_name for attr_name in dir(module)
        if isinstance(getattr(module, attr_name), Tests)
    ]

    for attr_name in collection_names:
        tests.register(module.__name__ + "." + attr_name)

    for test_class in get_subclasses(module):
        tests.register(test_class())
| 187 | |
| 188 | |
def register_tests_from_folder(tests, folder, pattern=None):
    """add tests from folder to main test suite

    Walks ``folder`` recursively, loads every ``.py`` file with
    ``loadSource`` and registers its tests in ``tests``.

    ``pattern`` is an optional comma-separated list of regular
    expressions.  When given, only files whose name matches at least one
    of them (``re.match``, i.e. anchored at the start) are loaded.
    Sub-folders are always descended into, whether or not they match.
    """
    # Normalise first so that every path built below is
    # "<folder>/<entry>".  The directory checks depend on this, also on
    # recursive calls, which pass the sub-folder without a trailing slash.
    if folder and not folder.endswith("/"):
        folder += "/"

    listing = os.listdir(folder)
    if pattern:  # get files matched given pattern
        matchers = [re.compile(pat.strip()) for pat in pattern.split(',')]
        # Each entry is kept at most once, even if it is a directory that
        # also matches or a file that matches several patterns.
        entries = [
            entry for entry in listing
            if os.path.isdir(folder + entry)
            or any(matcher.match(entry) for matcher in matchers)
        ]
    else:
        entries = listing

    for entry in entries:
        path = folder + entry
        if os.path.isdir(path):
            register_tests_from_folder(tests, path, pattern)
            # A directory is never a module itself, whatever its name.
            continue
        if not entry.endswith(".py"):
            continue

        module = loadSource(path)
        register_tests_from_module(module, tests)
| 213 | |
def process_args():
    """Collect the tests named by ``sys.argv[1]`` and run them.

    Supported forms of the argument (parts are separated by "::"):

    * ``folder/``             - all tests below the folder
    * ``folder/;pattern``     - tests below the folder, in files matching
      the comma-separated regular expressions in ``pattern``
    * ``file``                - all tests of one source file
    * ``file::Class``         - one test class
    * ``file::Class::method`` - one test method
    * ``file::::function``    - one test function (empty class part)

    Returns without running anything when the argument is missing or
    blank.  Results are reported through ``TeamCityReporter``.
    """
    # sys.argv[0] is the script itself, so the argument is only present
    # when there are at least two entries.
    if len(sys.argv) < 2:
        return

    arg = sys.argv[1].strip()
    if not arg:
        return

    tests = Tests()
    prefix = ""

    argument_list = arg.split("::")
    if len(argument_list) == 1:
        # From module or folder
        a_splitted = argument_list[0].split(";")
        if len(a_splitted) != 1:
            # means we have pattern to match against
            if a_splitted[0].endswith("/"):
                debug("/ from folder " + a_splitted[0] + ". Use pattern: " + a_splitted[1])
                prefix = a_splitted[0]
                register_tests_from_folder(tests, a_splitted[0], a_splitted[1])
        else:
            if argument_list[0].endswith("/"):
                debug("/ from folder " + argument_list[0])
                prefix = a_splitted[0]
                register_tests_from_folder(tests, argument_list[0])
            else:
                debug("/ from file " + argument_list[0])
                module = get_module(argument_list[0])
                register_tests_from_module(module, tests)

    elif len(argument_list) == 2:
        # From testcase
        debug("/ from test class " + argument_list[1] + " in " + argument_list[0])
        module = get_module(argument_list[0])
        klass = getattr(module, argument_list[1])
        tests.register(klass())
    else:
        # From method in class or from function
        module = get_module(argument_list[0])
        if argument_list[1] == "":
            debug("/ from function " + argument_list[2] + " in " + argument_list[0])
            # test function, not method
            test = getattr(module, argument_list[2])
        else:
            debug("/ from method " + argument_list[2] + " in class " + argument_list[1] + " in " + argument_list[0])
            klass = getattr(module, argument_list[1])
            test = getattr(klass(), argument_list[2])
        tests.register([test])

    tests.run(reporter=TeamCityReporter(prefix))


if __name__ == "__main__":
    process_args()