# blob: ccec9fedc4460d74feb7a503990776b39fbb633c [file] [log] [blame]
"""This script contains the actual auditing tests.

It should not be imported directly, but should be run by the test_audit
module with arguments identifying each test.

"""
7
import contextlib
import os
import sys
11
12
class TestHook:
    """Used in standard hook tests to collect any logged events.

    Should be used in a with block to ensure that it has no impact
    after the test completes.

    Entering the block registers the instance via sys.addaudithook().
    Leaving the block does not unregister anything: it only sets
    ``closed``, after which ``__call__`` returns without recording.

    Parameters:
        raise_on_events: event name, or container of event names, for
            which the hook raises after recording the event.  A single
            string is treated as exactly one event name.
        exc_type: exception class raised for those events.
    """

    def __init__(self, raise_on_events=None, exc_type=RuntimeError):
        # ``event in "some.name"`` would be a substring test, so a bare
        # string is wrapped to keep the check an exact name match.
        if isinstance(raise_on_events, str):
            raise_on_events = (raise_on_events,)
        self.raise_on_events = raise_on_events or ()
        self.exc_type = exc_type
        # List of (event, args) pairs in the order they were received.
        self.seen = []
        self.closed = False

    def __enter__(self, *a):
        sys.addaudithook(self)
        return self

    def __exit__(self, *a):
        self.close()

    def close(self):
        """Stop recording and raising; the hook itself stays registered."""
        self.closed = True

    @property
    def seen_events(self):
        """Return just the event names recorded so far, in order."""
        return [i[0] for i in self.seen]

    def __call__(self, event, args):
        """Record the event, then raise if it is in ``raise_on_events``."""
        if self.closed:
            return
        self.seen.append((event, args))
        if event in self.raise_on_events:
            raise self.exc_type("saw event " + event)
46
47
# Simple helpers, since we are not in unittest here
def assertEqual(x, y):
    """Raise AssertionError unless ``x == y``."""
    if x != y:
        raise AssertionError(f"{x!r} should equal {y!r}")
52
53
def assertIn(el, series):
    """Raise AssertionError unless ``el in series``."""
    if el not in series:
        raise AssertionError(f"{el!r} should be in {series!r}")
57
58
def assertNotIn(el, series):
    """Raise AssertionError if ``el in series``."""
    if el in series:
        raise AssertionError(f"{el!r} should not be in {series!r}")
62
63
def assertSequenceEqual(x, y):
    """Raise AssertionError unless ``x`` and ``y`` match element-wise.

    Both arguments must support len() and iteration.  Only lengths and
    paired elements are compared, so a list and a tuple holding the same
    items are considered equal.
    """
    if len(x) != len(y):
        raise AssertionError(f"{x!r} should equal {y!r}")
    if any(ix != iy for ix, iy in zip(x, y)):
        raise AssertionError(f"{x!r} should equal {y!r}")
69
70
@contextlib.contextmanager
def assertRaises(ex_type):
    """Context manager asserting that the body raises exactly ``ex_type``.

    The match is on the exact type (``type(ex) is ex_type``), so a
    subclass of ``ex_type`` does not satisfy it.  An AssertionError raised
    by the body is always propagated unchanged.

    Raises:
        AssertionError: if the body raises nothing, or raises an exception
            whose type is not exactly ``ex_type``.
    """
    try:
        yield
    except AssertionError:
        raise
    except BaseException as ex:
        # Explicit raise rather than ``assert`` so the check still runs
        # when the interpreter is started with -O.
        if type(ex) is not ex_type:
            raise AssertionError(f"{ex} should be {ex_type}")
    else:
        raise AssertionError(f"expected {ex_type}")
80
81
def test_basic():
    """Check that sys.audit() delivers the event name and args to a hook."""
    with TestHook() as hook:
        sys.audit("test_event", 1, 2, 3)
        assertEqual(hook.seen[0][0], "test_event")
        assertEqual(hook.seen[0][1], (1, 2, 3))
87
88
def test_block_add_hook():
    """Check that a hook raising on "sys.addaudithook" blocks later hooks."""
    # Raising an exception should prevent a new hook from being added,
    # but will not propagate out.
    with TestHook(raise_on_events="sys.addaudithook") as hook1:
        with TestHook() as hook2:
            sys.audit("test_event")
            # Only the first hook is expected to have received the event.
            assertIn("test_event", hook1.seen_events)
            assertNotIn("test_event", hook2.seen_events)
97
98
def test_block_add_hook_baseexception():
    """Check that a BaseException from a hook escapes sys.addaudithook()."""
    # Raising BaseException will propagate out when adding a hook
    with assertRaises(BaseException):
        with TestHook(
            raise_on_events="sys.addaudithook", exc_type=BaseException
        ) as hook1:
            # Adding this next hook should raise BaseException
            with TestHook() as hook2:
                pass
108
109
def test_marshal():
    """Check the events recorded for marshal dumps/loads/dump/load.

    Writes and then removes ``test-marshal.bin`` in the current
    working directory.
    """
    import marshal
    o = ("a", "b", "c", 1, 2, 3)
    # Computed before the hook is installed, so it is not recorded.
    payload = marshal.dumps(o)

    with TestHook() as hook:
        assertEqual(o, marshal.loads(marshal.dumps(o)))

        try:
            with open("test-marshal.bin", "wb") as f:
                marshal.dump(o, f)
            with open("test-marshal.bin", "rb") as f:
                assertEqual(o, marshal.load(f))
        finally:
            os.unlink("test-marshal.bin")

    # Two "marshal.dumps" events are expected: one for dumps() and one
    # for dump() above.
    actual = [(a[0], a[1]) for e, a in hook.seen if e == "marshal.dumps"]
    assertSequenceEqual(actual, [(o, marshal.version)] * 2)

    actual = [a[0] for e, a in hook.seen if e == "marshal.loads"]
    assertSequenceEqual(actual, [payload])

    actual = [e for e, a in hook.seen if e == "marshal.load"]
    assertSequenceEqual(actual, ["marshal.load"])
134
135
def test_pickle():
    """Check that a hook raising on "pickle.find_class" blocks globals.

    A pickle whose reduction references a global (``str``) must fail to
    load while the hook is active; a pickle of plain data must still load.
    """
    import pickle

    class PicklePrint:
        def __reduce_ex__(self, p):
            return str, ("Pwned!",)

    payload_1 = pickle.dumps(PicklePrint())
    payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3))

    # Before we add the hook, ensure our malicious pickle loads
    assertEqual("Pwned!", pickle.loads(payload_1))

    with TestHook(raise_on_events="pickle.find_class") as hook:
        with assertRaises(RuntimeError):
            # With the hook enabled, loading globals is not allowed
            pickle.loads(payload_1)
        # pickles with no globals are okay
        pickle.loads(payload_2)
155
156
def test_monkeypatch():
    """Check which class/instance mutations raise "object.__setattr__".

    Of the six assignments below, four are expected to be recorded:
    ``__name__``, both ``__bases__`` writes, and ``__class__``.
    """
    class A:
        pass

    class B:
        pass

    class C(A):
        pass

    a = A()

    with TestHook() as hook:
        # Catch name changes
        C.__name__ = "X"
        # Catch type changes
        C.__bases__ = (B,)
        # Ensure bypassing __setattr__ is still caught
        type.__dict__["__bases__"].__set__(C, (B,))
        # Catch attribute replacement
        C.__init__ = B.__init__
        # Catch attribute addition
        C.new_attr = 123
        # Catch class changes
        a.__class__ = B

    # Keep (target object, attribute name) for each recorded event.
    actual = [
        (args[0], args[1])
        for event, args in hook.seen
        if event == "object.__setattr__"
    ]
    assertSequenceEqual(
        [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")], actual
    )
187
188
def test_open():
    """Check that several ways of opening a file raise the "open" event.

    ``sys.argv[2]`` is passed to open(), so the caller is expected to
    supply a file path there.  Every attempt must fail with the
    RuntimeError raised by the hook, and every recorded event must carry
    a mode string rather than flags.
    """
    # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open()
    try:
        import ssl

        load_dh_params = ssl.create_default_context().load_dh_params
    except ImportError:
        load_dh_params = None

    # Try a range of "open" functions.
    # All of them should fail
    with TestHook(raise_on_events={"open"}) as hook:
        for fn, *args in [
            (open, sys.argv[2], "r"),
            (open, sys.executable, "rb"),
            (open, 3, "wb"),
            (open, sys.argv[2], "w", -1, None, None, None, False, lambda *a: 1),
            (load_dh_params, sys.argv[2]),
        ]:
            # load_dh_params is None when ssl could not be imported.
            if not fn:
                continue
            with assertRaises(RuntimeError):
                fn(*args)

    # Split events by whether the mode argument (a[1]) is truthy.
    actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]]
    actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]]
    assertSequenceEqual(
        [
            i
            for i in [
                (sys.argv[2], "r"),
                (sys.executable, "r"),
                (3, "w"),
                (sys.argv[2], "w"),
                (sys.argv[2], "rb") if load_dh_params else None,
            ]
            if i is not None
        ],
        actual_mode,
    )
    assertSequenceEqual([], actual_flag)
230
231
def test_cantrace():
    """Check that a hook is only traced while ``__cantrace__`` is true.

    A trace function records each event whose frame runs
    ``TestHook.__call__``; four "call" events are expected in total.
    """
    traced = []

    def trace(frame, event, *args):
        if frame.f_code == TestHook.__call__.__code__:
            traced.append(event)

    # sys.settrace() returns None, so the previous trace function has to
    # be read with sys.gettrace() for the restore in ``finally`` to work.
    old = sys.gettrace()
    sys.settrace(trace)
    try:
        with TestHook() as hook:
            # No traced call
            eval("1")

            # No traced call
            hook.__cantrace__ = False
            eval("2")

            # One traced call
            hook.__cantrace__ = True
            eval("3")

            # Two traced calls (writing to private member, eval)
            hook.__cantrace__ = 1
            eval("4")

            # One traced call (writing to private member)
            hook.__cantrace__ = 0
    finally:
        sys.settrace(old)

    assertSequenceEqual(["call"] * 4, traced)
263
264
def test_mmap():
    """Check that creating an mmap raises an event starting with (-1, 8)."""
    import mmap

    with TestHook() as hook:
        # The context manager closes the mapping once the check is done.
        with mmap.mmap(-1, 8):
            assertEqual(hook.seen[0][1][:2], (-1, 8))
271
272
def test_excepthook():
    """Install an audit hook and excepthook, then raise RuntimeError.

    The audit hook validates the "sys.excepthook" event arguments and
    prints the event name with the repr of ``args[2]``.  The final
    ``raise`` is intentional: this function always exits by raising
    RuntimeError("fatal-error").
    """
    def excepthook(exc_type, exc_value, exc_tb):
        # Stay silent for RuntimeError; defer to the default otherwise.
        if exc_type is not RuntimeError:
            sys.__excepthook__(exc_type, exc_value, exc_tb)

    def hook(event, args):
        if event == "sys.excepthook":
            if not isinstance(args[2], args[1]):
                raise TypeError(f"Expected isinstance({args[2]!r}, {args[1]!r})")
            if args[0] != excepthook:
                raise ValueError(f"Expected {args[0]} == {excepthook}")
            print(event, repr(args[2]))

    sys.addaudithook(hook)
    sys.excepthook = excepthook
    raise RuntimeError("fatal-error")
289
290
def test_unraisablehook():
    """Install an audit hook and unraisablehook, then report an exception.

    The audit hook checks that ``args[0]`` of the "sys.unraisablehook"
    event is the installed hook, then prints the event name, the repr of
    the exception value and the error message.
    """
    from _testcapi import write_unraisable_exc

    def unraisablehook(hookargs):
        pass

    def hook(event, args):
        if event == "sys.unraisablehook":
            if args[0] != unraisablehook:
                raise ValueError(f"Expected {args[0]} == {unraisablehook}")
            print(event, repr(args[1].exc_value), args[1].err_msg)

    sys.addaudithook(hook)
    sys.unraisablehook = unraisablehook
    write_unraisable_exc(RuntimeError("nonfatal-error"), "for audit hook test", None)
306
307
def test_winreg():
    """Print every "winreg.*" event raised by a few registry calls.

    Opens HKEY_LOCAL_MACHINE\\Software, enumerates one valid and one
    out-of-range subkey index, then detaches and closes the handle.

    Raises:
        RuntimeError: if EnumKey(k, 10000) unexpectedly succeeds.
    """
    from winreg import OpenKey, EnumKey, CloseKey, HKEY_LOCAL_MACHINE

    def hook(event, args):
        if not event.startswith("winreg."):
            return
        print(event, *args)

    sys.addaudithook(hook)

    k = OpenKey(HKEY_LOCAL_MACHINE, "Software")
    EnumKey(k, 0)
    try:
        EnumKey(k, 10000)
    except OSError:
        pass
    else:
        raise RuntimeError("Expected EnumKey(HKLM, 10000) to fail")

    # Detach() hands back the raw handle, which is then closed explicitly.
    kv = k.Detach()
    CloseKey(kv)
329
330
def test_socket():
    """Print every "socket.*" event raised by a few socket calls.

    Calls gethostname(), creates a TCP socket and tries to bind it to
    127.0.0.1:8080.  The socket is always closed.
    """
    import socket

    def hook(event, args):
        if event.startswith("socket."):
            print(event, *args)

    sys.addaudithook(hook)

    socket.gethostname()

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Don't care if this fails, we just want the audit message
        sock.bind(('127.0.0.1', 8080))
    except Exception:
        pass
    finally:
        sock.close()
351
352
def test_gc():
    """Print every "gc.*" event raised by three gc introspection calls."""
    import gc

    def hook(event, args):
        if event.startswith("gc."):
            print(event, *args)

    sys.addaudithook(hook)

    gc.get_objects(generation=1)

    x = object()
    y = [x]

    gc.get_referrers(x)
    gc.get_referents(y)
369
370
def test_http_client():
    """Print every "http.client.*" event raised by one GET request.

    Attempts a request to www.python.org.  If that raises OSError, a
    placeholder "http.client.send [cannot send]" line is printed instead.
    The connection is always closed.
    """
    import http.client

    def hook(event, args):
        if event.startswith("http.client."):
            # The first argument is skipped (presumably the connection
            # object; confirm against the http.client audit events).
            print(event, *args[1:])

    sys.addaudithook(hook)

    conn = http.client.HTTPConnection('www.python.org')
    try:
        conn.request('GET', '/')
    except OSError:
        print('http.client.send', '[cannot send]')
    finally:
        conn.close()
387
388
def test_sqlite3():
    """Print every "sqlite3.*" event raised by connect/extension calls.

    Opens two in-memory connections.  When the build exposes
    ``enable_load_extension``, extension loading is disabled and
    load_extension() is expected to fail.  Both connections are closed
    before returning.

    Raises:
        RuntimeError: if load_extension("test") unexpectedly succeeds.
    """
    import sqlite3

    # NOTE: the hook is called as hook(event, args), so ``*args`` here is
    # a 1-tuple wrapping the event's argument tuple and the whole tuple
    # is printed.  Kept as-is because it determines the printed output.
    def hook(event, *args):
        if event.startswith("sqlite3."):
            print(event, *args)

    sys.addaudithook(hook)
    cx1 = sqlite3.connect(":memory:")
    cx2 = sqlite3.Connection(":memory:")

    try:
        # The attribute is only checked for, not assumed: it is absent
        # when configured without --enable-loadable-sqlite-extensions.
        if hasattr(sqlite3.Connection, "enable_load_extension"):
            cx1.enable_load_extension(False)
            try:
                cx1.load_extension("test")
            except sqlite3.OperationalError:
                pass
            else:
                raise RuntimeError("Expected sqlite3.load_extension to fail")
    finally:
        cx1.close()
        cx2.close()
409
410
if __name__ == "__main__":
    from test.support import suppress_msvcrt_asserts

    suppress_msvcrt_asserts()

    # sys.argv[1] names the module-level test function to run.
    test = sys.argv[1]
    globals()[test]()