blob: b6950c92a11f1b274d9a3c60ca85ea932f11effd [file] [log] [blame]
Tor Norbye3a2425a2013-11-04 10:16:08 -08001import traceback, sys
2from unittest import TestResult
3import datetime
4
5from tcmessages import TeamcityServiceMessages
6
7PYTHON_VERSION_MAJOR = sys.version_info[0]
8
9def strclass(cls):
10 if not cls.__name__:
11 return cls.__module__
12 return "%s.%s" % (cls.__module__, cls.__name__)
13
14def smart_str(s):
15 encoding='utf-8'
16 errors='strict'
17 if PYTHON_VERSION_MAJOR < 3:
18 is_string = isinstance(s, basestring)
19 else:
20 is_string = isinstance(s, str)
21 if not is_string:
22 try:
23 return str(s)
24 except UnicodeEncodeError:
25 if isinstance(s, Exception):
26 # An Exception subclass containing non-ASCII data that doesn't
27 # know how to print itself properly. We shouldn't raise a
28 # further exception.
29 return ' '.join([smart_str(arg) for arg in s])
30 return unicode(s).encode(encoding, errors)
31 elif isinstance(s, unicode):
32 return s.encode(encoding, errors)
33 else:
34 return s
35
36class TeamcityTestResult(TestResult):
37 def __init__(self, stream=sys.stdout, *args, **kwargs):
38 TestResult.__init__(self)
39 for arg, value in kwargs.items():
40 setattr(self, arg, value)
41 self.output = stream
42 self.messages = TeamcityServiceMessages(self.output, prepend_linebreak=True)
43 self.messages.testMatrixEntered()
44 self.current_suite = None
45
46 def find_first(self, val):
47 quot = val[0]
48 count = 1
49 quote_ind = val[count:].find(quot)
Tor Norbye8668e1b2013-12-20 09:14:04 -080050 while quote_ind != -1 and val[count+quote_ind-1] == "\\":
Tor Norbye3a2425a2013-11-04 10:16:08 -080051 count = count + quote_ind + 1
52 quote_ind = val[count:].find(quot)
53
54 return val[0:quote_ind+count+1]
55
56 def find_second(self, val):
57 val_index = val.find("!=")
58 if val_index != -1:
59 count = 1
60 val = val[val_index+2:].strip()
61 quot = val[0]
62 quote_ind = val[count:].find(quot)
Tor Norbye8668e1b2013-12-20 09:14:04 -080063 while quote_ind != -1 and val[count+quote_ind-1] == "\\":
Tor Norbye3a2425a2013-11-04 10:16:08 -080064 count = count + quote_ind + 1
65 quote_ind = val[count:].find(quot)
66 return val[0:quote_ind+count+1]
67
68 else:
69 quot = val[-1]
Tor Norbye8668e1b2013-12-20 09:14:04 -080070 quote_ind = val[:len(val)-1].rfind(quot)
71 while quote_ind != -1 and val[quote_ind-1] == "\\":
Tor Norbye3a2425a2013-11-04 10:16:08 -080072 quote_ind = val[:quote_ind-1].rfind(quot)
73 return val[quote_ind:]
74
75 def formatErr(self, err):
76 exctype, value, tb = err
77 return ''.join(traceback.format_exception(exctype, value, tb))
78
79 def getTestName(self, test):
80 if hasattr(test, '_testMethodName'):
81 if test._testMethodName == "runTest":
82 return str(test)
83 return test._testMethodName
84 else:
85 test_name = str(test)
86 whitespace_index = test_name.index(" ")
87 if whitespace_index != -1:
88 test_name = test_name[:whitespace_index]
89 return test_name
90
91 def getTestId(self, test):
92 return test.id
93
94 def addSuccess(self, test):
95 TestResult.addSuccess(self, test)
96
97 def addError(self, test, err):
98 TestResult.addError(self, test, err)
99
100 err = self._exc_info_to_string(err, test)
101
102 self.messages.testError(self.getTestName(test),
103 message='Error', details=err)
104
105 def find_error_value(self, err):
106 error_value = traceback.extract_tb(err)
107 error_value = error_value[-1][-1]
108 return error_value.split('assert')[-1].strip()
109
110 def addFailure(self, test, err):
111 TestResult.addFailure(self, test, err)
112
113 error_value = smart_str(err[1])
114 if not len(error_value):
115 # means it's test function and we have to extract value from traceback
116 error_value = self.find_error_value(err[2])
117
118 self_find_first = self.find_first(error_value)
119 self_find_second = self.find_second(error_value)
120 quotes = ["'", '"']
121 if (self_find_first[0] == self_find_first[-1] and self_find_first[0] in quotes and
122 self_find_second[0] == self_find_second[-1] and self_find_second[0] in quotes):
123 # let's unescape strings to show sexy multiline diff in PyCharm.
124 # By default all caret return chars are escaped by testing framework
125 first = self._unescape(self_find_first)
126 second = self._unescape(self_find_second)
127 else:
128 first = second = ""
129 err = self._exc_info_to_string(err, test)
130
131 self.messages.testFailed(self.getTestName(test),
132 message='Failure', details=err, expected=first, actual=second)
133
134 def addSkip(self, test, reason):
135 self.messages.testIgnored(self.getTestName(test), message=reason)
136
137 def __getSuite(self, test):
138 if hasattr(test, "suite"):
139 suite = strclass(test.suite)
140 suite_location = test.suite.location
141 location = test.suite.abs_location
142 if hasattr(test, "lineno"):
143 location = location + ":" + str(test.lineno)
144 else:
145 location = location + ":" + str(test.test.lineno)
146 else:
147 import inspect
148
149 try:
150 source_file = inspect.getsourcefile(test.__class__)
151 if source_file:
152 source_dir_splitted = source_file.split("/")[:-1]
153 source_dir = "/".join(source_dir_splitted) + "/"
154 else:
155 source_dir = ""
156 except TypeError:
157 source_dir = ""
158
159 suite = strclass(test.__class__)
160 suite_location = "python_uttestid://" + source_dir + suite
161 location = "python_uttestid://" + source_dir + str(test.id())
162
163 return (suite, location, suite_location)
164
165 def startTest(self, test):
166 suite, location, suite_location = self.__getSuite(test)
167 if suite != self.current_suite:
168 if self.current_suite:
169 self.messages.testSuiteFinished(self.current_suite)
170 self.current_suite = suite
171 self.messages.testSuiteStarted(self.current_suite, location=suite_location)
172 setattr(test, "startTime", datetime.datetime.now())
173 self.messages.testStarted(self.getTestName(test), location=location)
174
175 def stopTest(self, test):
176 start = getattr(test, "startTime", datetime.datetime.now())
177 d = datetime.datetime.now() - start
178 duration=d.microseconds / 1000 + d.seconds * 1000 + d.days * 86400000
179 self.messages.testFinished(self.getTestName(test), duration=int(duration))
180
181 def endLastSuite(self):
182 if self.current_suite:
183 self.messages.testSuiteFinished(self.current_suite)
184 self.current_suite = None
185
186 def _unescape(self, text):
187 # do not use text.decode('string_escape'), it leads to problems with different string encodings given
188 return text.replace("\\n", "\n")
189
190class TeamcityTestRunner(object):
191 def __init__(self, stream=sys.stdout):
192 self.stream = stream
193
194 def _makeResult(self, **kwargs):
195 return TeamcityTestResult(self.stream, **kwargs)
196
197 def run(self, test, **kwargs):
198 result = self._makeResult(**kwargs)
199 result.messages.testCount(test.countTestCases())
200 test(result)
201 result.endLastSuite()
202 return result