blob: 2bbd84f4e2edbd9360b97dded717c77d1c7b66b8 [file] [log] [blame]
Tor Norbye3a2425a2013-11-04 10:16:08 -08001import sys, os
2import imp
3
4helpers_dir = os.getenv("PYCHARM_HELPERS_DIR", sys.path[0])
5if sys.path[0] != helpers_dir:
6 sys.path.insert(0, helpers_dir)
7
8from tcunittest import TeamcityTestResult
9
10from pycharm_run_utils import import_system_module
11from pycharm_run_utils import adjust_sys_path
12from pycharm_run_utils import debug, getModuleName
13
14adjust_sys_path()
15
16re = import_system_module("re")
17inspect = import_system_module("inspect")
18
19try:
20 from attest.reporters import AbstractReporter
21 from attest.collectors import Tests
22 from attest import TestBase
23except:
24 raise NameError("Please, install attests")
25
26class TeamCityReporter(AbstractReporter, TeamcityTestResult):
27 """Teamcity reporter for attests."""
28
29 def __init__(self, prefix):
30 TeamcityTestResult.__init__(self)
31 self.prefix = prefix
32
33 def begin(self, tests):
34 """initialize suite stack and count tests"""
35 self.total = len(tests)
36 self.suite_stack = []
37 self.messages.testCount(self.total)
38
39 def success(self, result):
40 """called when test finished successfully"""
41 suite = self.get_suite_name(result.test)
42 self.start_suite(suite)
43 name = self.get_test_name(result)
44 self.start_test(result, name)
45 self.messages.testFinished(name)
46
47 def failure(self, result):
48 """called when test failed"""
49 suite = self.get_suite_name(result.test)
50 self.start_suite(suite)
51 name = self.get_test_name(result)
52 self.start_test(result, name)
53 exctype, value, tb = result.exc_info
54 error_value = self.find_error_value(tb)
55 if (error_value.startswith("'") or error_value.startswith('"')) and\
56 (error_value.endswith("'") or error_value.endswith('"')):
57 first = self._unescape(self.find_first(error_value))
58 second = self._unescape(self.find_second(error_value))
59 else:
60 first = second = ""
61
62 err = self.formatErr(result.exc_info)
63 if isinstance(result.error, AssertionError):
64 self.messages.testFailed(name, message='Failure',
65 details=err,
66 expected=first, actual=second)
67 else:
68 self.messages.testError(name, message='Error',
69 details=err)
70
71 def finished(self):
72 """called when all tests finished"""
73 self.end_last_suite()
74 for suite in self.suite_stack[::-1]:
75 self.messages.testSuiteFinished(suite)
76
77 def get_test_name(self, result):
78 name = result.test_name
79 ind = name.find("%") #remove unique module prefix
80 if ind != -1:
81 name = name[:ind]+name[name.find(".", ind):]
82 return name
83
84 def end_last_suite(self):
85 if self.current_suite:
86 self.messages.testSuiteFinished(self.current_suite)
87 self.current_suite = None
88
89 def get_suite_name(self, test):
90 module = inspect.getmodule(test)
91 klass = getattr(test, "im_class", None)
92 file = module.__file__
93 if file.endswith("pyc"):
94 file = file[:-1]
95
96 suite = module.__name__
97 if self.prefix:
98 tmp = file[:-3]
99 ind = tmp.split(self.prefix)[1]
100 suite = ind.replace("/", ".")
101 if klass:
102 suite += "." + klass.__name__
103 lineno = inspect.getsourcelines(klass)
104 else:
105 lineno = ("", 1)
106
107 return (suite, file+":"+str(lineno[1]))
108
109 def start_suite(self, suite_info):
110 """finish previous suite and put current suite
111 to stack"""
112 suite, file = suite_info
113 if suite != self.current_suite:
114 if self.current_suite:
115 if suite.startswith(self.current_suite+"."):
116 self.suite_stack.append(self.current_suite)
117 else:
118 self.messages.testSuiteFinished(self.current_suite)
119 for s in self.suite_stack:
120 if not suite.startswith(s+"."):
121 self.current_suite = s
122 self.messages.testSuiteFinished(self.current_suite)
123 else:
124 break
125 self.current_suite = suite
126 self.messages.testSuiteStarted(self.current_suite, location="file://" + file)
127
128 def start_test(self, result, name):
129 """trying to find test location """
130 real_func = result.test.func_closure[0].cell_contents
131 lineno = inspect.getsourcelines(real_func)
132 file = inspect.getsourcefile(real_func)
133 self.messages.testStarted(name, "file://"+file+":"+str(lineno[1]))
134
135def get_subclasses(module, base_class=TestBase):
136 test_classes = []
137 for name in dir(module):
138 obj = getattr(module, name)
139 try:
140 if issubclass(obj, base_class):
141 test_classes.append(obj)
142 except TypeError: # If 'obj' is not a class
143 pass
144 return test_classes
145
146def get_module(file_name):
147 baseName = os.path.splitext(os.path.basename(file_name))[0]
148 return imp.load_source(baseName, file_name)
149
150modules = {}
151def getModuleName(prefix, cnt):
152 """ adds unique number to prevent name collisions"""
153 return prefix + "%" + str(cnt)
154
155def loadSource(fileName):
156 baseName = os.path.basename(fileName)
157 moduleName = os.path.splitext(baseName)[0]
158
159 if moduleName in modules:
160 cnt = 2
161 prefix = moduleName
162 while getModuleName(prefix, cnt) in modules:
163 cnt += 1
164 moduleName = getModuleName(prefix, cnt)
165 debug("/ Loading " + fileName + " as " + moduleName)
166 module = imp.load_source(moduleName, fileName)
167 modules[moduleName] = module
168 return module
169
170
171def register_tests_from_module(module, tests):
172 """add tests from module to main test suite"""
173 tests_to_register = []
174
175 for i in dir(module):
176 obj = getattr(module, i)
177 if isinstance(obj, Tests):
178 tests_to_register.append(i)
179
180 for i in tests_to_register:
181 baseName = module.__name__+"."+i
182 tests.register(baseName)
183 test_subclasses = get_subclasses(module)
184 if test_subclasses:
185 for subclass in test_subclasses:
186 tests.register(subclass())
187
188
189def register_tests_from_folder(tests, folder, pattern=None):
190 """add tests from folder to main test suite"""
191 listing = os.listdir(folder)
192 files = listing
193 if pattern: #get files matched given pattern
194 prog_list = [re.compile(pat.strip()) for pat in pattern.split(',')]
195 files = []
196 for fileName in listing:
197 if os.path.isdir(folder+fileName):
198 files.append(fileName)
199 for prog in prog_list:
200 if prog.match(fileName):
201 files.append(fileName)
202
203 if not folder.endswith("/"):
204 folder += "/"
205 for fileName in files:
206 if os.path.isdir(folder+fileName):
207 register_tests_from_folder(tests, folder+fileName, pattern)
208 if not fileName.endswith("py"):
209 continue
210
211 module = loadSource(folder+fileName)
212 register_tests_from_module(module, tests)
213
214def process_args():
215 tests = Tests()
216 prefix = ""
217 if not sys.argv:
218 return
219
220 arg = sys.argv[1].strip()
221 if not len(arg):
222 return
223
224 argument_list = arg.split("::")
225 if len(argument_list) == 1:
226 # From module or folder
227 a_splitted = argument_list[0].split(";")
228 if len(a_splitted) != 1:
229 # means we have pattern to match against
230 if a_splitted[0].endswith("/"):
231 debug("/ from folder " + a_splitted[0] + ". Use pattern: " + a_splitted[1])
232 prefix = a_splitted[0]
233 register_tests_from_folder(tests, a_splitted[0], a_splitted[1])
234 else:
235 if argument_list[0].endswith("/"):
236 debug("/ from folder " + argument_list[0])
237 prefix = a_splitted[0]
238 register_tests_from_folder(tests, argument_list[0])
239 else:
240 debug("/ from file " + argument_list[0])
241 module = get_module(argument_list[0])
242 register_tests_from_module(module, tests)
243
244 elif len(argument_list) == 2:
245 # From testcase
246 debug("/ from test class " + argument_list[1] + " in " + argument_list[0])
247 module = get_module(argument_list[0])
248 klass = getattr(module, argument_list[1])
249 tests.register(klass())
250 else:
251 # From method in class or from function
252 module = get_module(argument_list[0])
253 if argument_list[1] == "":
254 debug("/ from function " + argument_list[2] + " in " + argument_list[0])
255 # test function, not method
256 test = getattr(module, argument_list[2])
257 else:
258 debug("/ from method " + argument_list[2] + " in class " + argument_list[1] + " in " + argument_list[0])
259 klass = getattr(module, argument_list[1])
260 test = getattr(klass(), argument_list[2])
261 tests.register([test])
262
263 tests.run(reporter=TeamCityReporter(prefix))
264
265if __name__ == "__main__":
266 process_args()