blob: 8612e276d765884634bf3a033542a4fb14021f69 [file] [log] [blame]
Łukasz Langaa785c872016-09-09 17:37:37 -07001import dis
2import os.path
3import re
4import subprocess
5import sys
6import types
7import unittest
8
9from test.support import findfile, run_unittest
10
11
def abspath(filename):
    """Return the absolute path of a file looked up under "dtracedata".

    The lookup itself is delegated to ``findfile`` with
    ``subdir="dtracedata"``; the result is then made absolute.
    """
    located = findfile(filename, subdir="dtracedata")
    return os.path.abspath(located)
14
15
def normalize_trace_output(output):
    """Normalize DTrace output for comparison.

    DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
    are concatenated. So if the operating system moves our thread around, the
    straight result can be "non-causal". So we add timestamps to the probe
    firing, sort by that field, then strip it from the output.

    Every non-empty line that does not start with '#' is split on tab
    characters; the first field must be an integer timestamp and the second
    field is the text that is kept.

    Returns the kept fields joined with newlines, in ascending timestamp
    order (an empty string when there are no data lines).

    Raises AssertionError, chained to the underlying parsing error, when a
    data line has no second field or its first field is not an integer.
    """

    # When compiling with '--with-pydebug', strip '[# refs]' debug output.
    output = re.sub(r"\[[0-9]+ refs\]", "", output)
    try:
        rows = [
            row.split("\t")
            for row in output.splitlines()
            if row and not row.startswith("#")
        ]
        rows.sort(key=lambda row: int(row[0]))
        return "\n".join([row[1] for row in rows])
    except (IndexError, ValueError) as exc:
        # Chain explicitly so the traceback shows the parse error as the
        # direct cause rather than as an unrelated secondary exception.
        raise AssertionError(
            "tracer produced unparseable output:\n{}".format(output)
        ) from exc
39
40
class TraceBackend:
    """Common driver for running an external tracing tool.

    Subclasses provide ``EXTENSION`` (the suffix of the tracer script files)
    and ``COMMAND`` (the argv prefix used to launch the tracer).
    """

    EXTENSION = None
    COMMAND = None
    COMMAND_ARGS = []

    def run_case(self, name, optimize_python=None):
        """Trace the sample called *name*; return ``(expected, actual)``.

        The actual output comes from tracing ``name + ".py"`` with the script
        ``name + EXTENSION`` and is normalized; the expected output is read
        from ``name + EXTENSION + ".expected"`` with trailing whitespace
        stripped.
        """
        raw_output = self.trace_python(
            script_file=abspath(name + self.EXTENSION),
            python_file=abspath(name + ".py"),
            optimize_python=optimize_python,
        )
        actual_output = normalize_trace_output(raw_output)

        expected_path = abspath(name + self.EXTENSION + ".expected")
        with open(expected_path) as expected_file:
            expected_output = expected_file.read().rstrip()

        return (expected_output, actual_output)

    def generate_trace_command(self, script_file, subcommand=None):
        """Build the tracer argv, adding ``-c subcommand`` when it is given."""
        argv = self.COMMAND + [script_file]
        if not subcommand:
            return argv
        return argv + ["-c", subcommand]

    def trace(self, script_file, subcommand=None):
        """Run the tracer and return its stdout and stderr as one string."""
        argv = self.generate_trace_command(script_file, subcommand)
        tracer = subprocess.Popen(
            argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # fold stderr into the captured text
            universal_newlines=True,
        )
        captured, _unused = tracer.communicate()
        return captured

    def trace_python(self, script_file, python_file, optimize_python=None):
        """Trace ``sys.executable`` running *python_file*.

        One ``-O`` flag is passed per level of *optimize_python*.
        """
        optimize_flags = ["-O"] * optimize_python if optimize_python else []
        argv = [sys.executable] + optimize_flags + [python_file]
        return self.trace(script_file, " ".join(argv))

    def assert_usable(self):
        """Raise ``unittest.SkipTest`` unless the tracer's probe succeeds."""
        try:
            probe_script = abspath("assert_usable" + self.EXTENSION)
            output = self.trace(probe_script).strip()
        except (FileNotFoundError, NotADirectoryError, PermissionError) as exc:
            # The tracer could not be launched; report the reason instead.
            output = str(exc)
        if output == "probe: success":
            return
        raise unittest.SkipTest(
            "{}(1) failed: {}".format(self.COMMAND[0], output)
        )
88
89
class DTraceBackend(TraceBackend):
    """Backend that runs ``.d`` scripts through ``dtrace -q -s``."""

    COMMAND = ["dtrace", "-q", "-s"]
    EXTENSION = ".d"
93
94
class SystemTapBackend(TraceBackend):
    """Backend that runs ``.stp`` scripts through ``stap -g``."""

    COMMAND = ["stap", "-g"]
    EXTENSION = ".stp"
98
99
class TraceTests(unittest.TestCase):
    """Test cases shared by every tracing backend.

    Concrete subclasses set ``backend`` to a backend instance and
    ``optimize_python`` to the optimization level passed to both the traced
    interpreter and ``compile()``.
    """

    # unittest.TestCase options
    maxDiff = None

    # TraceTests options
    backend = None
    optimize_python = 0

    @classmethod
    def setUpClass(cls):
        """Skip the whole class unless its tracing backend is usable."""
        # This base class has no backend of its own; skip it instead of
        # failing with AttributeError when a runner collects it directly.
        if cls.backend is None:
            raise unittest.SkipTest("no tracing backend configured")
        cls.backend.assert_usable()

    def run_case(self, name):
        """Trace the sample called *name* and compare it to the expected output."""
        # The backend returns the pair as (expected, actual), in that order.
        expected_output, actual_output = self.backend.run_case(
            name, optimize_python=self.optimize_python)
        self.assertEqual(actual_output, expected_output)

    def test_function_entry_return(self):
        self.run_case("call_stack")

    def test_verify_call_opcodes(self):
        """Ensure our call stack test hits all function call opcodes"""

        opcodes = {"CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"}

        with open(abspath("call_stack.py")) as f:
            code_string = f.read()

        def get_function_instructions(funcname):
            # Recompile with appropriate optimization setting
            code = compile(source=code_string,
                           filename="<string>",
                           mode="exec",
                           optimize=self.optimize_python)

            # Only code objects stored as module-level constants are searched.
            for c in code.co_consts:
                if isinstance(c, types.CodeType) and c.co_name == funcname:
                    return dis.get_instructions(c)
            return []

        for instruction in get_function_instructions('start'):
            opcodes.discard(instruction.opname)

        # Whatever is left was never emitted by the 'start' function.
        self.assertEqual(set(), opcodes)

    def test_gc(self):
        self.run_case("gc")

    def test_line(self):
        self.run_case("line")
150
151
class DTraceNormalTests(TraceTests):
    """Run the shared trace tests with DTrace at optimization level 0."""

    optimize_python = 0
    backend = DTraceBackend()
155
156
class DTraceOptimizedTests(TraceTests):
    """Run the shared trace tests with DTrace at optimization level 2."""

    optimize_python = 2
    backend = DTraceBackend()
160
161
class SystemTapNormalTests(TraceTests):
    """Run the shared trace tests with SystemTap at optimization level 0."""

    optimize_python = 0
    backend = SystemTapBackend()
165
166
class SystemTapOptimizedTests(TraceTests):
    """Run the shared trace tests with SystemTap at optimization level 2."""

    optimize_python = 2
    backend = SystemTapBackend()
170
171
def test_main():
    """Run all DTrace and SystemTap test classes through ``run_unittest``."""
    test_classes = (
        DTraceNormalTests,
        DTraceOptimizedTests,
        SystemTapNormalTests,
        SystemTapOptimizedTests,
    )
    run_unittest(*test_classes)
175
176
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()