blob: d41e94b07f4309ea30a7316e6eb30dbb37153c03 [file] [log] [blame]
Vladimir Matveevc24c6c22019-01-08 01:58:25 -08001import errno
Victor Stinnerd6debb22017-03-27 16:05:26 +02002import os
Antoine Pitrouc08177a2017-06-28 23:29:29 +02003import random
Guido van Rossum4f17e3e1995-03-16 15:07:38 +00004import signal
Victor Stinner11517102014-07-29 23:31:34 +02005import socket
Antoine Pitrouf7d090c2017-06-29 16:40:14 +02006import statistics
Christian Heimes4fbc72b2008-03-22 00:47:35 +00007import subprocess
Victor Stinnerd6debb22017-03-27 16:05:26 +02008import sys
9import time
10import unittest
11from test import support
Berker Peksagce643912015-05-06 06:33:17 +030012from test.support.script_helper import assert_python_ok, spawn_python
Victor Stinnerb3e72192011-05-08 01:46:11 +020013try:
Victor Stinner56e8c292014-07-21 12:30:22 +020014 import _testcapi
15except ImportError:
16 _testcapi = None
Christian Heimesc06950e2008-02-28 21:17:00 +000017
Guido van Rossumcc5a91d1997-04-16 00:29:15 +000018
class GenericTests(unittest.TestCase):
    """Platform-independent checks on the constants exported by ``signal``."""

    def test_enums(self):
        """Each signal-related constant must be an instance of its enum class."""
        handler_names = {'SIG_DFL', 'SIG_IGN'}
        mask_names = {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}
        for attr_name in dir(signal):
            value = getattr(signal, attr_name)
            is_ctrl_event = False
            if attr_name in handler_names:
                expected_type = signal.Handlers
            elif attr_name in mask_names:
                expected_type = signal.Sigmasks
            elif attr_name.startswith('SIG') and not attr_name.startswith('SIG_'):
                expected_type = signal.Signals
            elif attr_name.startswith('CTRL_'):
                expected_type = signal.Signals
                is_ctrl_event = True
            else:
                # Not a signal constant: nothing to verify.
                continue
            self.assertIsInstance(value, expected_type)
            if is_ctrl_event:
                # CTRL_* names are only expected to exist on Windows.
                self.assertEqual(sys.platform, "win32")
33
34
Brian Curtin3f004b12010-08-06 19:34:52 +000035@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
Victor Stinnerb3e72192011-05-08 01:46:11 +020036class PosixTests(unittest.TestCase):
Christian Heimes4fbc72b2008-03-22 00:47:35 +000037 def trivial_signal_handler(self, *args):
38 pass
39
Thomas Woutersed03b412007-08-28 21:37:11 +000040 def test_out_of_range_signal_number_raises_error(self):
41 self.assertRaises(ValueError, signal.getsignal, 4242)
42
Thomas Woutersed03b412007-08-28 21:37:11 +000043 self.assertRaises(ValueError, signal.signal, 4242,
Christian Heimes4fbc72b2008-03-22 00:47:35 +000044 self.trivial_signal_handler)
Thomas Woutersed03b412007-08-28 21:37:11 +000045
Antoine Pietri5d2a27d2018-03-12 14:42:34 +010046 self.assertRaises(ValueError, signal.strsignal, 4242)
47
Thomas Woutersed03b412007-08-28 21:37:11 +000048 def test_setting_signal_handler_to_none_raises_error(self):
49 self.assertRaises(TypeError, signal.signal,
50 signal.SIGUSR1, None)
51
Christian Heimes4fbc72b2008-03-22 00:47:35 +000052 def test_getsignal(self):
53 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
Giampaolo Rodola'e09fb712014-04-04 15:34:17 +020054 self.assertIsInstance(hup, signal.Handlers)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +000055 self.assertEqual(signal.getsignal(signal.SIGHUP),
56 self.trivial_signal_handler)
Christian Heimes4fbc72b2008-03-22 00:47:35 +000057 signal.signal(signal.SIGHUP, hup)
Ezio Melotti19f2aeb2010-11-21 01:30:29 +000058 self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
Christian Heimes4fbc72b2008-03-22 00:47:35 +000059
Antoine Pietri5d2a27d2018-03-12 14:42:34 +010060 def test_strsignal(self):
Antoine Pietri019f5b32018-03-12 20:03:14 +010061 self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
62 self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
Michael Osipov48ce4892018-08-23 15:27:19 +020063 self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
Antoine Pietri5d2a27d2018-03-12 14:42:34 +010064
Victor Stinner32eb8402016-03-15 11:12:35 +010065 # Issue 3864, unknown if this affects earlier versions of freebsd also
Victor Stinner32eb8402016-03-15 11:12:35 +010066 def test_interprocess_signal(self):
67 dirname = os.path.dirname(__file__)
68 script = os.path.join(dirname, 'signalinterproctester.py')
69 assert_python_ok(script)
70
Antoine Pitrou9d3627e2018-05-04 13:00:50 +020071 def test_valid_signals(self):
72 s = signal.valid_signals()
73 self.assertIsInstance(s, set)
74 self.assertIn(signal.Signals.SIGINT, s)
75 self.assertIn(signal.Signals.SIGALRM, s)
76 self.assertNotIn(0, s)
77 self.assertNotIn(signal.NSIG, s)
78 self.assertLess(len(s), signal.NSIG)
79
Gregory P. Smith38f11cc2019-02-16 12:57:40 -080080 @unittest.skipUnless(sys.executable, "sys.executable required.")
81 def test_keyboard_interrupt_exit_code(self):
82 """KeyboardInterrupt triggers exit via SIGINT."""
83 process = subprocess.run(
84 [sys.executable, "-c",
Gregory P. Smith414c6252019-02-16 17:22:39 -080085 "import os, signal, time\n"
86 "os.kill(os.getpid(), signal.SIGINT)\n"
87 "for _ in range(999): time.sleep(0.01)"],
Gregory P. Smith38f11cc2019-02-16 12:57:40 -080088 stderr=subprocess.PIPE)
89 self.assertIn(b"KeyboardInterrupt", process.stderr)
90 self.assertEqual(process.returncode, -signal.SIGINT)
Gregory P. Smith414c6252019-02-16 17:22:39 -080091 # Caveat: The exit code is insufficient to guarantee we actually died
92 # via a signal. POSIX shells do more than look at the 8 bit value.
93 # Writing an automation friendly test of an interactive shell
94 # to confirm that our process died via a SIGINT proved too complex.
Gregory P. Smith38f11cc2019-02-16 12:57:40 -080095
Christian Heimes4fbc72b2008-03-22 00:47:35 +000096
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    """Signal behaviour that is only exercised on Windows."""

    def test_valid_signals(self):
        """valid_signals() is a set of at least six signals, without 0 or NSIG."""
        valid = signal.valid_signals()
        self.assertIsInstance(valid, set)
        self.assertGreaterEqual(len(valid), 6)
        self.assertIn(signal.Signals.SIGINT, valid)
        for excluded in (0, signal.NSIG):
            self.assertNotIn(excluded, valid)
        self.assertLess(len(valid), signal.NSIG)

    def test_issue9324(self):
        """Handlers can be set and reset; invalid signal numbers are rejected."""
        # Updated for issue #10003, adding SIGBREAK
        def handler(x, y):
            return None

        candidates = (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                      signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                      signal.SIGTERM)
        exercised = set()
        for signum in candidates:
            # Set and then reset a handler for signals that work on windows.
            # Issue #18396, only for signals without a C-level handler.
            if signal.getsignal(signum) is None:
                continue
            previous = signal.signal(signum, handler)
            signal.signal(signum, previous)
            exercised.add(signum)
        # Issue #18396: Ensure the above loop at least tested *something*
        self.assertTrue(exercised)

        for invalid_signum in (-1, 7):
            with self.assertRaises(ValueError):
                signal.signal(invalid_signum, handler)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
        # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
        # as that requires setting up a console control handler in a child
        # in its own process group.  Doable, but quite complicated.  (see
        # @eryksun on https://github.com/python/cpython/pull/11862)
        STATUS_CONTROL_C_EXIT = 0xC000013A
        command = [sys.executable, "-c", "raise KeyboardInterrupt"]
        process = subprocess.run(command, stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", process.stderr)
        self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
143
Brian Curtin3f004b12010-08-06 19:34:52 +0000144
class WakeupFDTests(unittest.TestCase):
    """Argument validation and return values of ``signal.set_wakeup_fd()``."""

    def test_invalid_call(self):
        """Misplaced arguments are rejected with TypeError."""
        # First parameter is positional-only
        with self.assertRaises(TypeError):
            signal.set_wakeup_fd(signum=signal.SIGINT)

        # warn_on_full_buffer is a keyword-only parameter
        with self.assertRaises(TypeError):
            signal.set_wakeup_fd(signal.SIGINT, False)

    def test_invalid_fd(self):
        """A bad file descriptor is rejected with ValueError or OSError."""
        fd = support.make_bad_fd()
        self.assertRaises((ValueError, OSError),
                          signal.set_wakeup_fd, fd)

    def test_invalid_socket(self):
        """The fd of an already closed socket is rejected."""
        sock = socket.socket()
        fd = sock.fileno()
        sock.close()
        self.assertRaises((ValueError, OSError),
                          signal.set_wakeup_fd, fd)

    def test_set_wakeup_fd_result(self):
        """set_wakeup_fd() returns the previously installed fd (pipes)."""
        r1, w1 = os.pipe()
        self.addCleanup(os.close, r1)
        self.addCleanup(os.close, w1)
        r2, w2 = os.pipe()
        self.addCleanup(os.close, r2)
        self.addCleanup(os.close, w2)
        # Cleanups run last-in first-out, so the wakeup fd is detached
        # before the pipes are closed, even if an assertion below fails.
        self.addCleanup(signal.set_wakeup_fd, -1)

        if hasattr(os, 'set_blocking'):
            os.set_blocking(w1, False)
            os.set_blocking(w2, False)

        signal.set_wakeup_fd(w1)
        self.assertEqual(signal.set_wakeup_fd(w2), w1)
        self.assertEqual(signal.set_wakeup_fd(-1), w2)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    def test_set_wakeup_fd_socket_result(self):
        """set_wakeup_fd() returns the previously installed fd (sockets)."""
        sock1 = socket.socket()
        self.addCleanup(sock1.close)
        sock1.setblocking(False)
        fd1 = sock1.fileno()

        sock2 = socket.socket()
        self.addCleanup(sock2.close)
        sock2.setblocking(False)
        fd2 = sock2.fileno()

        # Registered last so that it runs first: the wakeup fd is detached
        # before the sockets are closed, even if an assertion below fails.
        self.addCleanup(signal.set_wakeup_fd, -1)

        signal.set_wakeup_fd(fd1)
        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    # On Windows, files are always blocking and Windows does not provide a
    # function to test if a socket is in non-blocking mode.
    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
    def test_set_wakeup_fd_blocking(self):
        """A blocking fd is refused; the same fd is accepted once non-blocking."""
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        # Runs before the pipe is closed; protects against the blocking fd
        # being accepted unexpectedly and staying installed.
        self.addCleanup(signal.set_wakeup_fd, -1)

        # fd must be non-blocking
        os.set_blocking(wfd, True)
        with self.assertRaises(ValueError) as cm:
            signal.set_wakeup_fd(wfd)
        self.assertEqual(str(cm.exception),
                         "the fd %s must be in non-blocking mode" % wfd)

        # non-blocking is ok
        os.set_blocking(wfd, False)
        signal.set_wakeup_fd(wfd)
        signal.set_wakeup_fd(-1)
220
Benjamin Petersonc68a4a02013-01-18 00:10:24 -0500221
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    """Signal numbers written to the wakeup fd, checked in child interpreters."""

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        """Run *test_body* in a child interpreter and check the wakeup pipe.

        *test_body* is source text defining ``test()``.  The child installs a
        no-op SIGALRM handler, registers the non-blocking write end of a pipe
        as wakeup fd, calls ``test()`` and then compares the bytes read from
        the pipe with *signals*.  With ``ordered=False`` both sides are
        compared as sets.
        """
        # use a subprocess to have only one thread
        template = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """
        expected_signums = tuple(int(signum) for signum in signals)
        code = template.format(expected_signums, ordered, test_body)

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_wakeup_write_error(self):
        """A failing write to the wakeup fd is reported on stderr."""
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        code = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # Probe first: the scenario only makes sense when writing to the read
        # end of a pipe fails on this OS.
        probe_r, probe_w = os.pipe()
        try:
            try:
                os.write(probe_r, b'x')
            except OSError:
                write_rejected = True
            else:
                write_rejected = False
        finally:
            os.close(probe_r)
            os.close(probe_w)
        if not write_rejected:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")

        assert_python_ok('-c', code)

    def test_wakeup_fd_early(self):
        """The signal arrives before the select() on the wakeup pipe."""
        body = """def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """
        self.check_wakeup(body, signal.SIGALRM)

    def test_wakeup_fd_during(self):
        """The signal arrives while select() waits on the wakeup pipe."""
        body = """def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """
        self.check_wakeup(body, signal.SIGALRM)

    def test_signum(self):
        """Two raised signals are written to the wakeup fd in order."""
        body = """def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        """
        self.check_wakeup(body, signal.SIGUSR1, signal.SIGALRM)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        """Two blocked signals are both reported once they are unblocked."""
        body = """def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """
        self.check_wakeup(body, signal.SIGUSR1, signal.SIGUSR2, ordered=False)
Victor Stinnerc13ef662011-05-25 02:35:58 +0200402
Christian Heimes5fb7c2a2007-12-24 08:52:31 +0000403
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):
    """Wakeup fd behaviour when the fd is one end of a socket pair."""

    @staticmethod
    def _wakeup_io_verb():
        """Return the verb expected in the wakeup fd error message.

        The message checked by the tests reads "... trying to <verb> to the
        signal wakeup fd": 'send' when os.name is 'nt', 'write' otherwise.
        """
        return 'send' if os.name == 'nt' else 'write'

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        """A raised signal's number can be received from the socket pair."""
        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        """Writing to a closed wakeup socket is reported on stderr."""
        # Use a subprocess to have only one thread.
        action = self._wakeup_io_verb()
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """.format(action=action)
        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_warn_on_full_buffer(self):
        """warn_on_full_buffer controls the warning for a full wakeup socket."""
        # Use a subprocess to have only one thread.
        action = self._wakeup_io_verb()
        # NOTE: the template goes through str.format(), so the child program
        # must not contain literal braces other than {action}.
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        # Start with large chunk size to reduce the
        # number of send needed to fill the buffer.
        written = 0
        for chunk_size in (2 ** 16, 2 ** 8, 1):
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, socket.timeout):
                pass

        print("%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        """.format(action=action)
        assert_python_ok('-c', code)
594
Victor Stinner11517102014-07-29 23:31:34 +0200595
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
    """Effect of signal.siginterrupt() on a blocking read from a pipe."""

    def readpipe_interrupted(self, interrupt):
        """Report whether SIGALRM interrupts a blocking pipe read in a child.

        *interrupt* is handed to signal.siginterrupt() in the child unless it
        is None.  The child attempts the read twice: it exits with status 3
        when the handler's exception escaped both reads and with status 2 as
        soon as one read returns without it.

        Return True for status 3.  Return False for status 2, or when the
        child has not finished within the 5 second timeout (it is then
        killed).  Any other exit status raises Exception.
        """
        # use a subprocess to have only one thread, to have a timeout on the
        # blocking read and to not touch signal handling in this process
        child_source = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """ % (interrupt,)
        with spawn_python('-c', child_source) as child:
            try:
                # wait until the child process is loaded and has started
                ready_line = child.stdout.readline()

                remaining, _ = child.communicate(timeout=5.0)
            except subprocess.TimeoutExpired:
                child.kill()
                return False

            output = ready_line + remaining
            status = child.wait()
            if status in (2, 3):
                return status == 3
            raise Exception("Child error (exit code %s): %r"
                            % (status, output))

    def test_without_siginterrupt(self):
        """Without siginterrupt(), the signal interrupts the read."""
        # If a signal handler is installed and siginterrupt is not called
        # at all, when that signal arrives, it interrupts a syscall that's in
        # progress.
        self.assertTrue(self.readpipe_interrupted(None))

    def test_siginterrupt_on(self):
        """With siginterrupt(sig, True), the signal interrupts the read."""
        # If a signal handler is installed and siginterrupt is called with
        # a true value for the second argument, when that signal arrives, it
        # interrupts a syscall that's in progress.
        self.assertTrue(self.readpipe_interrupted(True))

    def test_siginterrupt_off(self):
        """With siginterrupt(sig, False), the read is not interrupted."""
        # If a signal handler is installed and siginterrupt is called with
        # a false value for the second argument, when that signal arrives, it
        # does not interrupt a syscall that's in progress.
        self.assertFalse(self.readpipe_interrupted(False))
Jean-Paul Calderone9c39bc72010-05-09 03:25:16 +0000679
680
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() and signal.getitimer()."""

    # Upper bound, in seconds, on the busy loops that wait for a CPU-time
    # interval timer to be disarmed by its signal handler.
    ITIMER_WAIT_TIMEOUT = 60.0

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Disarm the timer *before* restoring the previous SIGALRM handler,
        # so that a still-armed ITIMER_REAL cannot fire while the restored
        # (possibly default, process-terminating) disposition is in place.
        if self.itimer is not None:  # test_itimer_exc doesn't change this attr
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        # Install `handler` for `signum` and restore the previous handler
        # when the test finishes.
        previous = signal.signal(signum, handler)
        self.addCleanup(signal.signal, signum, previous)

    def _burn_cpu_until_disarmed(self):
        # Consume CPU time until the handler has disarmed self.itimer;
        # skip the test if that does not happen within the timeout.
        deadline = time.monotonic() + self.ITIMER_WAIT_TIMEOUT
        while time.monotonic() < deadline:
            # use up some CPU time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                return  # the signal handler stopped this itimer
        # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    def sig_alrm(self, *args):
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                                     "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # A negative interval is deliberately not checked: negative times
        # are treated as zero on some platforms, so
        # setitimer(ITIMER_REAL, -1) does not raise ItimerError everywhere.

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertTrue(self.hndl_called)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('netbsd5',),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        self._burn_cpu_until_disarmed()

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        self._burn_cpu_until_disarmed()

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_setitimer_tiny(self):
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertTrue(self.hndl_called)
781
Victor Stinnera9293352011-04-30 15:21:58 +0200782
class PendingSignalsTests(unittest.TestCase):
    """
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    """
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending_empty(self):
        self.assertEqual(signal.sigpending(), set())

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending(self):
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                         'need signal.pthread_kill()')
    def test_pthread_kill(self):
        code = """if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def wait_helper(self, blocked, test):
        """
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        """
        # NOTE: the child script is indented by 8 spaces so that the
        # (stripped) `test` source, whose body lines are indented by 12
        # spaces at the call sites, lines up once substituted for %s.
        code = '''if 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        ''' % (test.strip(), int(blocked))
        # int(blocked): embed the plain signal number so the generated
        # script does not depend on how the Signals enum is rendered by %s.

        # sig*wait* must be called with the signal blocked: since the current
        # process might have several threads running, use a subprocess to have
        # a single thread.
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    def test_sigwait(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_poll(self):
        # check that polling with sigtimedwait works
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_timeout(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_negative_timeout(self):
        signum = signal.SIGALRM
        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)

    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_sigwait_thread(self):
        # Check that calling sigwait() from a thread doesn't suspend the whole
        # process. A new interpreter is spawned to avoid problems when mixing
        # threads and fork(): only async-safe functions are allowed between
        # fork() and exec().
        assert_python_ok("-c", """if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        """)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask_arguments(self):
        self.assertRaises(TypeError, signal.pthread_sigmask)
        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
        with self.assertRaises(ValueError):
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
        with self.assertRaises(ValueError):
            signal.pthread_sigmask(signal.SIG_BLOCK, [0])
        with self.assertRaises(ValueError):
            signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask_valid_signals(self):
        s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
        # Get current blocked set
        s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
        self.assertLessEqual(s, signal.valid_signals())

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask(self):
        code = """if 1:
            import signal
            import os; import threading

            def handler(signum, frame):
                1/0

            def kill(signum):
                os.kill(os.getpid(), signum)

            def check_mask(mask):
                for sig in mask:
                    assert isinstance(sig, signal.Signals), repr(sig)

            def read_sigmask():
                sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
                check_mask(sigmask)
                return sigmask

            signum = signal.SIGUSR1

            # Install our signal handler
            old_handler = signal.signal(signum, handler)

            # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
            old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            check_mask(old_mask)
            try:
                kill(signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")

            # Block and then raise SIGUSR1. The signal is blocked: the signal
            # handler is not called, and the signal is now pending
            mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            check_mask(mask)
            kill(signum)

            # Check the new mask
            blocked = read_sigmask()
            check_mask(blocked)
            if signum not in blocked:
                raise Exception("%s not in %s" % (signum, blocked))
            if old_mask ^ blocked != {signum}:
                raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

            # Unblock SIGUSR1
            try:
                # unblock the pending signal calls immediately the signal handler
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
            try:
                kill(signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")

            # Check the new mask
            unblocked = read_sigmask()
            if signum in unblocked:
                raise Exception("%s in %s" % (signum, unblocked))
            if blocked ^ unblocked != {signum}:
                raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
            if old_mask != unblocked:
                raise Exception("%s != %s" % (old_mask, unblocked))
        """
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                         'need signal.pthread_kill()')
    def test_pthread_kill_main_thread(self):
        # Test that a signal can be sent to the main thread with pthread_kill()
        # before any other thread has been created (see issue #12392).
        code = """if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        """

        with spawn_python('-c', code) as process:
            stdout, stderr = process.communicate()
            exitcode = process.wait()
        # Exit code 3 means the handler ran; report anything else as a
        # regular test failure rather than raising a bare Exception.
        self.assertEqual(exitcode, 3,
                         "Child error (exit code %s): %s" %
                         (exitcode, stdout))
Victor Stinnera9293352011-04-30 15:21:58 +02001116
1117
class StressTest(unittest.TestCase):
    """
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    """

    def setsig(self, signum, handler):
        """Install *handler* for *signum*; restore the old one on cleanup."""
        old_handler = signal.signal(signum, handler)
        self.addCleanup(signal.signal, signum, old_handler)

    def _wait_for_signals(self, sigs, expected, deadline):
        # Sleep in tiny steps until `sigs` holds `expected` entries or the
        # time.monotonic() `deadline` has passed.
        while len(sigs) < expected and time.monotonic() < deadline:
            time.sleep(1e-5)

    def measure_itimer_resolution(self):
        """Return the median interval, in seconds, between 20 chained
        ITIMER_REAL expirations that are each requested 1 µs apart."""
        N = 20
        times = []

        def handler(signum=None, frame=None):
            if len(times) < N:
                times.append(time.perf_counter())
                # 1 µs is the smallest possible timer interval,
                # we want to measure what the concrete duration
                # will be on this platform
                signal.setitimer(signal.ITIMER_REAL, 1e-6)

        # Cleanups run last-in first-out: register the handler restore
        # first so the timer is disarmed *before* the previous SIGALRM
        # handler comes back.
        self.setsig(signal.SIGALRM, handler)
        self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
        handler()
        while len(times) < N:
            time.sleep(1e-3)

        durations = [later - earlier
                     for earlier, later in zip(times, times[1:])]
        med = statistics.median(durations)
        if support.verbose:
            print("detected median itimer() resolution: %.6f s." % (med,))
        return med

    def decide_itimer_count(self):
        """Return how many sequential timers to use, or skip the test."""
        # Some systems have poor setitimer() resolution (for example
        # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
        # number of sequential timers based on that.
        reso = self.measure_itimer_resolution()
        if reso <= 1e-4:
            return 10000
        elif reso <= 1e-2:
            return 100
        else:
            self.skipTest("detected itimer resolution (%.3f s.) too high "
                          "(> 10 ms.) on this platform (or system too busy)"
                          % (reso,))

    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_dependent(self):
        """
        This test uses dependent signal handlers.
        """
        N = self.decide_itimer_count()
        sigs = []

        def first_handler(signum, frame):
            # 1e-6 is the minimum non-zero value for `setitimer()`.
            # Choose a random delay so as to improve chances of
            # triggering a race condition.  Ideally the signal is received
            # when inside critical signal-handling routines such as
            # Py_MakePendingCalls().
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)

        def second_handler(signum=None, frame=None):
            sigs.append(signum)

        # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
        # ascending and descending sequences (SIGUSR1 then SIGALRM,
        # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
        self.setsig(signal.SIGPROF, first_handler)
        self.setsig(signal.SIGUSR1, first_handler)
        self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL

        expected_sigs = 0
        deadline = time.monotonic() + 15.0

        while expected_sigs < N:
            os.kill(os.getpid(), signal.SIGPROF)
            expected_sigs += 1
            # Wait for handlers to run to avoid signal coalescing
            self._wait_for_signals(sigs, expected_sigs, deadline)

            os.kill(os.getpid(), signal.SIGUSR1)
            expected_sigs += 1
            self._wait_for_signals(sigs, expected_sigs, deadline)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")

    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_simultaneous(self):
        """
        This test uses simultaneous signal handlers.
        """
        N = self.decide_itimer_count()
        sigs = []

        def handler(signum, frame):
            sigs.append(signum)

        self.setsig(signal.SIGUSR1, handler)
        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL

        expected_sigs = 0
        deadline = time.monotonic() + 15.0

        while expected_sigs < N:
            # Hopefully the SIGALRM will be received somewhere during
            # initial processing of SIGUSR1.
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
            os.kill(os.getpid(), signal.SIGUSR1)

            expected_sigs += 2
            # Wait for handlers to run to avoid signal coalescing
            self._wait_for_signals(sigs, expected_sigs, deadline)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")
1245
class RaiseSignalTest(unittest.TestCase):
    """Tests for signal.raise_signal()."""

    def test_sigint(self):
        # Install the default SIGINT handler explicitly: if the process was
        # started with SIGINT ignored, no KeyboardInterrupt would be raised
        # and the test would fail for reasons unrelated to raise_signal().
        old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
        self.addCleanup(signal.signal, signal.SIGINT, old_handler)

        with self.assertRaises(KeyboardInterrupt):
            signal.raise_signal(signal.SIGINT)

    @unittest.skipIf(sys.platform != "win32", "Windows specific test")
    def test_invalid_argument(self):
        SIGHUP = 1  # not supported on win32
        with self.assertRaises(OSError) as cm:
            signal.raise_signal(SIGHUP)
        # "Invalid argument" is the expected failure mode.
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_handler(self):
        is_ok = False

        def handler(a, b):
            nonlocal is_ok
            is_ok = True

        old_signal = signal.signal(signal.SIGINT, handler)
        self.addCleanup(signal.signal, signal.SIGINT, old_signal)

        signal.raise_signal(signal.SIGINT)
        self.assertTrue(is_ok)
1274
Antoine Pitrouc08177a2017-06-28 23:29:29 +02001275
def tearDownModule():
    """Module-level unittest teardown hook: call support.reap_children()."""
    support.reap_children()
Thomas Woutersed03b412007-08-28 21:37:11 +00001278
if __name__ == "__main__":
    # Run this module's tests when it is executed as a script.
    unittest.main()