| import unittest | 
 | from test import support | 
 | from contextlib import closing | 
 | import gc | 
 | import pickle | 
 | import select | 
 | import signal | 
 | import struct | 
 | import subprocess | 
 | import traceback | 
 | import sys, os, time, errno | 
 | from test.script_helper import assert_python_ok, spawn_python | 
 | try: | 
 |     import threading | 
 | except ImportError: | 
 |     threading = None | 
 |  | 
 | if sys.platform == 'riscos': | 
 |     raise unittest.SkipTest("Can't test signal on %s" % sys.platform) | 
 |  | 
 |  | 
 | class HandlerBCalled(Exception): | 
 |     pass | 
 |  | 
 |  | 
 | def exit_subprocess(): | 
 |     """Use os._exit(0) to exit the current subprocess. | 
 |  | 
 |     Otherwise, the test catches the SystemExit and continues executing | 
 |     in parallel with the original test, so you wind up with an | 
 |     exponential number of tests running concurrently. | 
 |     """ | 
 |     os._exit(0) | 
 |  | 
 |  | 
 | def ignoring_eintr(__func, *args, **kwargs): | 
 |     try: | 
 |         return __func(*args, **kwargs) | 
 |     except EnvironmentError as e: | 
 |         if e.errno != errno.EINTR: | 
 |             raise | 
 |         return None | 
 |  | 
 |  | 
 | @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | 
 | class InterProcessSignalTests(unittest.TestCase): | 
 |     MAX_DURATION = 20   # Entire test should last at most 20 sec. | 
 |  | 
 |     def setUp(self): | 
 |         self.using_gc = gc.isenabled() | 
 |         gc.disable() | 
 |  | 
 |     def tearDown(self): | 
 |         if self.using_gc: | 
 |             gc.enable() | 
 |  | 
 |     def format_frame(self, frame, limit=None): | 
 |         return ''.join(traceback.format_stack(frame, limit=limit)) | 
 |  | 
 |     def handlerA(self, signum, frame): | 
 |         self.a_called = True | 
 |  | 
 |     def handlerB(self, signum, frame): | 
 |         self.b_called = True | 
 |         raise HandlerBCalled(signum, self.format_frame(frame)) | 
 |  | 
 |     def wait(self, child): | 
 |         """Wait for child to finish, ignoring EINTR.""" | 
 |         while True: | 
 |             try: | 
 |                 child.wait() | 
 |                 return | 
 |             except OSError as e: | 
 |                 if e.errno != errno.EINTR: | 
 |                     raise | 
 |  | 
 |     def run_test(self): | 
 |         # Install handlers. This function runs in a sub-process, so we | 
 |         # don't worry about re-setting the default handlers. | 
 |         signal.signal(signal.SIGHUP, self.handlerA) | 
 |         signal.signal(signal.SIGUSR1, self.handlerB) | 
 |         signal.signal(signal.SIGUSR2, signal.SIG_IGN) | 
 |         signal.signal(signal.SIGALRM, signal.default_int_handler) | 
 |  | 
 |         # Variables the signals will modify: | 
 |         self.a_called = False | 
 |         self.b_called = False | 
 |  | 
 |         # Let the sub-processes know who to send signals to. | 
 |         pid = os.getpid() | 
 |  | 
 |         child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)]) | 
 |         if child: | 
 |             self.wait(child) | 
 |             if not self.a_called: | 
 |                 time.sleep(1)  # Give the signal time to be delivered. | 
 |         self.assertTrue(self.a_called) | 
 |         self.assertFalse(self.b_called) | 
 |         self.a_called = False | 
 |  | 
 |         # Make sure the signal isn't delivered while the previous | 
 |         # Popen object is being destroyed, because __del__ swallows | 
 |         # exceptions. | 
 |         del child | 
 |         try: | 
 |             child = subprocess.Popen(['kill', '-USR1', str(pid)]) | 
 |             # This wait should be interrupted by the signal's exception. | 
 |             self.wait(child) | 
 |             time.sleep(1)  # Give the signal time to be delivered. | 
 |             self.fail('HandlerBCalled exception not thrown') | 
 |         except HandlerBCalled: | 
 |             self.assertTrue(self.b_called) | 
 |             self.assertFalse(self.a_called) | 
 |  | 
 |         child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)]) | 
 |         if child: | 
 |             self.wait(child)  # Nothing should happen. | 
 |  | 
 |         try: | 
 |             signal.alarm(1) | 
 |             # The race condition in pause doesn't matter in this case, | 
 |             # since alarm is going to raise a KeyboardException, which | 
 |             # will skip the call. | 
 |             signal.pause() | 
 |             # But if another signal arrives before the alarm, pause | 
 |             # may return early. | 
 |             time.sleep(1) | 
 |         except KeyboardInterrupt: | 
 |             pass | 
 |         except: | 
 |             self.fail("Some other exception woke us from pause: %s" % | 
 |                       traceback.format_exc()) | 
 |         else: | 
 |             self.fail("pause returned of its own accord, and the signal" | 
 |                       " didn't arrive after another second.") | 
 |  | 
 |     # Issue 3864, unknown if this affects earlier versions of freebsd also | 
 |     @unittest.skipIf(sys.platform=='freebsd6', | 
 |         'inter process signals not reliable (do not mix well with threading) ' | 
 |         'on freebsd6') | 
 |     def test_main(self): | 
 |         # This function spawns a child process to insulate the main | 
 |         # test-running process from all the signals. It then | 
 |         # communicates with that child process over a pipe and | 
 |         # re-raises information about any exceptions the child | 
 |         # throws. The real work happens in self.run_test(). | 
 |         os_done_r, os_done_w = os.pipe() | 
 |         with closing(os.fdopen(os_done_r, 'rb')) as done_r, \ | 
 |              closing(os.fdopen(os_done_w, 'wb')) as done_w: | 
 |             child = os.fork() | 
 |             if child == 0: | 
 |                 # In the child process; run the test and report results | 
 |                 # through the pipe. | 
 |                 try: | 
 |                     done_r.close() | 
 |                     # Have to close done_w again here because | 
 |                     # exit_subprocess() will skip the enclosing with block. | 
 |                     with closing(done_w): | 
 |                         try: | 
 |                             self.run_test() | 
 |                         except: | 
 |                             pickle.dump(traceback.format_exc(), done_w) | 
 |                         else: | 
 |                             pickle.dump(None, done_w) | 
 |                 except: | 
 |                     print('Uh oh, raised from pickle.') | 
 |                     traceback.print_exc() | 
 |                 finally: | 
 |                     exit_subprocess() | 
 |  | 
 |             done_w.close() | 
 |             # Block for up to MAX_DURATION seconds for the test to finish. | 
 |             r, w, x = select.select([done_r], [], [], self.MAX_DURATION) | 
 |             if done_r in r: | 
 |                 tb = pickle.load(done_r) | 
 |                 if tb: | 
 |                     self.fail(tb) | 
 |             else: | 
 |                 os.kill(child, signal.SIGKILL) | 
 |                 self.fail('Test deadlocked after %d seconds.' % | 
 |                           self.MAX_DURATION) | 
 |  | 
 |  | 
 | @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | 
 | class PosixTests(unittest.TestCase): | 
 |     def trivial_signal_handler(self, *args): | 
 |         pass | 
 |  | 
 |     def test_out_of_range_signal_number_raises_error(self): | 
 |         self.assertRaises(ValueError, signal.getsignal, 4242) | 
 |  | 
 |         self.assertRaises(ValueError, signal.signal, 4242, | 
 |                           self.trivial_signal_handler) | 
 |  | 
 |     def test_setting_signal_handler_to_none_raises_error(self): | 
 |         self.assertRaises(TypeError, signal.signal, | 
 |                           signal.SIGUSR1, None) | 
 |  | 
 |     def test_getsignal(self): | 
 |         hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) | 
 |         self.assertEqual(signal.getsignal(signal.SIGHUP), | 
 |                          self.trivial_signal_handler) | 
 |         signal.signal(signal.SIGHUP, hup) | 
 |         self.assertEqual(signal.getsignal(signal.SIGHUP), hup) | 
 |  | 
 |  | 
 | @unittest.skipUnless(sys.platform == "win32", "Windows specific") | 
 | class WindowsSignalTests(unittest.TestCase): | 
 |     def test_issue9324(self): | 
 |         # Updated for issue #10003, adding SIGBREAK | 
 |         handler = lambda x, y: None | 
 |         for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, | 
 |                     signal.SIGILL, signal.SIGINT, signal.SIGSEGV, | 
 |                     signal.SIGTERM): | 
 |             # Set and then reset a handler for signals that work on windows | 
 |             signal.signal(sig, signal.signal(sig, handler)) | 
 |  | 
 |         with self.assertRaises(ValueError): | 
 |             signal.signal(-1, handler) | 
 |  | 
 |         with self.assertRaises(ValueError): | 
 |             signal.signal(7, handler) | 
 |  | 
 |  | 
 | @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | 
 | class WakeupSignalTests(unittest.TestCase): | 
 |     def check_wakeup(self, test_body, *signals, ordered=True): | 
 |         # use a subprocess to have only one thread | 
 |         code = """if 1: | 
 |         import fcntl | 
 |         import os | 
 |         import signal | 
 |         import struct | 
 |  | 
 |         signals = {!r} | 
 |  | 
 |         def handler(signum, frame): | 
 |             pass | 
 |  | 
 |         def check_signum(signals): | 
 |             data = os.read(read, len(signals)+1) | 
 |             raised = struct.unpack('%uB' % len(data), data) | 
 |             if not {!r}: | 
 |                 raised = set(raised) | 
 |                 signals = set(signals) | 
 |             if raised != signals: | 
 |                 raise Exception("%r != %r" % (raised, signals)) | 
 |  | 
 |         {} | 
 |  | 
 |         signal.signal(signal.SIGALRM, handler) | 
 |         read, write = os.pipe() | 
 |         for fd in (read, write): | 
 |             flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) | 
 |             flags = flags | os.O_NONBLOCK | 
 |             fcntl.fcntl(fd, fcntl.F_SETFL, flags) | 
 |         signal.set_wakeup_fd(write) | 
 |  | 
 |         test() | 
 |         check_signum(signals) | 
 |  | 
 |         os.close(read) | 
 |         os.close(write) | 
 |         """.format(signals, ordered, test_body) | 
 |  | 
 |         assert_python_ok('-c', code) | 
 |  | 
 |     def test_wakeup_fd_early(self): | 
 |         self.check_wakeup("""def test(): | 
 |             import select | 
 |             import time | 
 |  | 
 |             TIMEOUT_FULL = 10 | 
 |             TIMEOUT_HALF = 5 | 
 |  | 
 |             signal.alarm(1) | 
 |             before_time = time.time() | 
 |             # We attempt to get a signal during the sleep, | 
 |             # before select is called | 
 |             time.sleep(TIMEOUT_FULL) | 
 |             mid_time = time.time() | 
 |             dt = mid_time - before_time | 
 |             if dt >= TIMEOUT_HALF: | 
 |                 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) | 
 |             select.select([read], [], [], TIMEOUT_FULL) | 
 |             after_time = time.time() | 
 |             dt = after_time - mid_time | 
 |             if dt >= TIMEOUT_HALF: | 
 |                 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) | 
 |         """, signal.SIGALRM) | 
 |  | 
 |     def test_wakeup_fd_during(self): | 
 |         self.check_wakeup("""def test(): | 
 |             import select | 
 |             import time | 
 |  | 
 |             TIMEOUT_FULL = 10 | 
 |             TIMEOUT_HALF = 5 | 
 |  | 
 |             signal.alarm(1) | 
 |             before_time = time.time() | 
 |             # We attempt to get a signal during the select call | 
 |             try: | 
 |                 select.select([read], [], [], TIMEOUT_FULL) | 
 |             except select.error: | 
 |                 pass | 
 |             else: | 
 |                 raise Exception("select.error not raised") | 
 |             after_time = time.time() | 
 |             dt = after_time - before_time | 
 |             if dt >= TIMEOUT_HALF: | 
 |                 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) | 
 |         """, signal.SIGALRM) | 
 |  | 
 |     def test_signum(self): | 
 |         self.check_wakeup("""def test(): | 
 |             signal.signal(signal.SIGUSR1, handler) | 
 |             os.kill(os.getpid(), signal.SIGUSR1) | 
 |             os.kill(os.getpid(), signal.SIGALRM) | 
 |         """, signal.SIGUSR1, signal.SIGALRM) | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), | 
 |                          'need signal.pthread_sigmask()') | 
 |     def test_pending(self): | 
 |         self.check_wakeup("""def test(): | 
 |             signum1 = signal.SIGUSR1 | 
 |             signum2 = signal.SIGUSR2 | 
 |  | 
 |             signal.signal(signum1, handler) | 
 |             signal.signal(signum2, handler) | 
 |  | 
 |             signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) | 
 |             os.kill(os.getpid(), signum1) | 
 |             os.kill(os.getpid(), signum2) | 
 |             # Unblocking the 2 signals calls the C signal handler twice | 
 |             signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) | 
 |         """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False) | 
 |  | 
 |  | 
 | @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | 
 | class SiginterruptTest(unittest.TestCase): | 
 |  | 
 |     def readpipe_interrupted(self, interrupt): | 
 |         """Perform a read during which a signal will arrive.  Return True if the | 
 |         read is interrupted by the signal and raises an exception.  Return False | 
 |         if it returns normally. | 
 |         """ | 
 |         # use a subprocess to have only one thread, to have a timeout on the | 
 |         # blocking read and to not touch signal handling in this process | 
 |         code = """if 1: | 
 |             import errno | 
 |             import os | 
 |             import signal | 
 |             import sys | 
 |  | 
 |             interrupt = %r | 
 |             r, w = os.pipe() | 
 |  | 
 |             def handler(signum, frame): | 
 |                 pass | 
 |  | 
 |             signal.signal(signal.SIGALRM, handler) | 
 |             if interrupt is not None: | 
 |                 signal.siginterrupt(signal.SIGALRM, interrupt) | 
 |  | 
 |             print("ready") | 
 |             sys.stdout.flush() | 
 |  | 
 |             # run the test twice | 
 |             for loop in range(2): | 
 |                 # send a SIGALRM in a second (during the read) | 
 |                 signal.alarm(1) | 
 |                 try: | 
 |                     # blocking call: read from a pipe without data | 
 |                     os.read(r, 1) | 
 |                 except OSError as err: | 
 |                     if err.errno != errno.EINTR: | 
 |                         raise | 
 |                 else: | 
 |                     sys.exit(2) | 
 |             sys.exit(3) | 
 |         """ % (interrupt,) | 
 |         with spawn_python('-c', code) as process: | 
 |             try: | 
 |                 # wait until the child process is loaded and has started | 
 |                 first_line = process.stdout.readline() | 
 |  | 
 |                 stdout, stderr = process.communicate(timeout=5.0) | 
 |             except subprocess.TimeoutExpired: | 
 |                 process.kill() | 
 |                 return False | 
 |             else: | 
 |                 stdout = first_line + stdout | 
 |                 exitcode = process.wait() | 
 |                 if exitcode not in (2, 3): | 
 |                     raise Exception("Child error (exit code %s): %s" | 
 |                                     % (exitcode, stdout)) | 
 |                 return (exitcode == 3) | 
 |  | 
 |     def test_without_siginterrupt(self): | 
 |         # If a signal handler is installed and siginterrupt is not called | 
 |         # at all, when that signal arrives, it interrupts a syscall that's in | 
 |         # progress. | 
 |         interrupted = self.readpipe_interrupted(None) | 
 |         self.assertTrue(interrupted) | 
 |  | 
 |     def test_siginterrupt_on(self): | 
 |         # If a signal handler is installed and siginterrupt is called with | 
 |         # a true value for the second argument, when that signal arrives, it | 
 |         # interrupts a syscall that's in progress. | 
 |         interrupted = self.readpipe_interrupted(True) | 
 |         self.assertTrue(interrupted) | 
 |  | 
 |     def test_siginterrupt_off(self): | 
 |         # If a signal handler is installed and siginterrupt is called with | 
 |         # a false value for the second argument, when that signal arrives, it | 
 |         # does not interrupt a syscall that's in progress. | 
 |         interrupted = self.readpipe_interrupted(False) | 
 |         self.assertFalse(interrupted) | 
 |  | 
 |  | 
 | @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") | 
 | class ItimerTest(unittest.TestCase): | 
 |     def setUp(self): | 
 |         self.hndl_called = False | 
 |         self.hndl_count = 0 | 
 |         self.itimer = None | 
 |         self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) | 
 |  | 
 |     def tearDown(self): | 
 |         signal.signal(signal.SIGALRM, self.old_alarm) | 
 |         if self.itimer is not None: # test_itimer_exc doesn't change this attr | 
 |             # just ensure that itimer is stopped | 
 |             signal.setitimer(self.itimer, 0) | 
 |  | 
 |     def sig_alrm(self, *args): | 
 |         self.hndl_called = True | 
 |  | 
 |     def sig_vtalrm(self, *args): | 
 |         self.hndl_called = True | 
 |  | 
 |         if self.hndl_count > 3: | 
 |             # it shouldn't be here, because it should have been disabled. | 
 |             raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " | 
 |                 "timer.") | 
 |         elif self.hndl_count == 3: | 
 |             # disable ITIMER_VIRTUAL, this function shouldn't be called anymore | 
 |             signal.setitimer(signal.ITIMER_VIRTUAL, 0) | 
 |  | 
 |         self.hndl_count += 1 | 
 |  | 
 |     def sig_prof(self, *args): | 
 |         self.hndl_called = True | 
 |         signal.setitimer(signal.ITIMER_PROF, 0) | 
 |  | 
 |     def test_itimer_exc(self): | 
 |         # XXX I'm assuming -1 is an invalid itimer, but maybe some platform | 
 |         # defines it ? | 
 |         self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) | 
 |         # Negative times are treated as zero on some platforms. | 
 |         if 0: | 
 |             self.assertRaises(signal.ItimerError, | 
 |                               signal.setitimer, signal.ITIMER_REAL, -1) | 
 |  | 
 |     def test_itimer_real(self): | 
 |         self.itimer = signal.ITIMER_REAL | 
 |         signal.setitimer(self.itimer, 1.0) | 
 |         signal.pause() | 
 |         self.assertEqual(self.hndl_called, True) | 
 |  | 
 |     # Issue 3864, unknown if this affects earlier versions of freebsd also | 
 |     @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'), | 
 |         'itimer not reliable (does not mix well with threading) on some BSDs.') | 
 |     def test_itimer_virtual(self): | 
 |         self.itimer = signal.ITIMER_VIRTUAL | 
 |         signal.signal(signal.SIGVTALRM, self.sig_vtalrm) | 
 |         signal.setitimer(self.itimer, 0.3, 0.2) | 
 |  | 
 |         start_time = time.time() | 
 |         while time.time() - start_time < 60.0: | 
 |             # use up some virtual time by doing real work | 
 |             _ = pow(12345, 67890, 10000019) | 
 |             if signal.getitimer(self.itimer) == (0.0, 0.0): | 
 |                 break # sig_vtalrm handler stopped this itimer | 
 |         else: # Issue 8424 | 
 |             self.skipTest("timeout: likely cause: machine too slow or load too " | 
 |                           "high") | 
 |  | 
 |         # virtual itimer should be (0.0, 0.0) now | 
 |         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) | 
 |         # and the handler should have been called | 
 |         self.assertEqual(self.hndl_called, True) | 
 |  | 
 |     # Issue 3864, unknown if this affects earlier versions of freebsd also | 
 |     @unittest.skipIf(sys.platform=='freebsd6', | 
 |         'itimer not reliable (does not mix well with threading) on freebsd6') | 
 |     def test_itimer_prof(self): | 
 |         self.itimer = signal.ITIMER_PROF | 
 |         signal.signal(signal.SIGPROF, self.sig_prof) | 
 |         signal.setitimer(self.itimer, 0.2, 0.2) | 
 |  | 
 |         start_time = time.time() | 
 |         while time.time() - start_time < 60.0: | 
 |             # do some work | 
 |             _ = pow(12345, 67890, 10000019) | 
 |             if signal.getitimer(self.itimer) == (0.0, 0.0): | 
 |                 break # sig_prof handler stopped this itimer | 
 |         else: # Issue 8424 | 
 |             self.skipTest("timeout: likely cause: machine too slow or load too " | 
 |                           "high") | 
 |  | 
 |         # profiling itimer should be (0.0, 0.0) now | 
 |         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) | 
 |         # and the handler should have been called | 
 |         self.assertEqual(self.hndl_called, True) | 
 |  | 
 |  | 
 | class PendingSignalsTests(unittest.TestCase): | 
 |     """ | 
 |     Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() | 
 |     functions. | 
 |     """ | 
 |     @unittest.skipUnless(hasattr(signal, 'sigpending'), | 
 |                          'need signal.sigpending()') | 
 |     def test_sigpending_empty(self): | 
 |         self.assertEqual(signal.sigpending(), set()) | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), | 
 |                          'need signal.pthread_sigmask()') | 
 |     @unittest.skipUnless(hasattr(signal, 'sigpending'), | 
 |                          'need signal.sigpending()') | 
 |     def test_sigpending(self): | 
 |         code = """if 1: | 
 |             import os | 
 |             import signal | 
 |  | 
 |             def handler(signum, frame): | 
 |                 1/0 | 
 |  | 
 |             signum = signal.SIGUSR1 | 
 |             signal.signal(signum, handler) | 
 |  | 
 |             signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) | 
 |             os.kill(os.getpid(), signum) | 
 |             pending = signal.sigpending() | 
 |             if pending != {signum}: | 
 |                 raise Exception('%s != {%s}' % (pending, signum)) | 
 |             try: | 
 |                 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) | 
 |             except ZeroDivisionError: | 
 |                 pass | 
 |             else: | 
 |                 raise Exception("ZeroDivisionError not raised") | 
 |         """ | 
 |         assert_python_ok('-c', code) | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'pthread_kill'), | 
 |                          'need signal.pthread_kill()') | 
 |     def test_pthread_kill(self): | 
 |         code = """if 1: | 
 |             import signal | 
 |             import threading | 
 |             import sys | 
 |  | 
 |             signum = signal.SIGUSR1 | 
 |  | 
 |             def handler(signum, frame): | 
 |                 1/0 | 
 |  | 
 |             signal.signal(signum, handler) | 
 |  | 
 |             if sys.platform == 'freebsd6': | 
 |                 # Issue #12392 and #12469: send a signal to the main thread | 
 |                 # doesn't work before the creation of the first thread on | 
 |                 # FreeBSD 6 | 
 |                 def noop(): | 
 |                     pass | 
 |                 thread = threading.Thread(target=noop) | 
 |                 thread.start() | 
 |                 thread.join() | 
 |  | 
 |             tid = threading.get_ident() | 
 |             try: | 
 |                 signal.pthread_kill(tid, signum) | 
 |             except ZeroDivisionError: | 
 |                 pass | 
 |             else: | 
 |                 raise Exception("ZeroDivisionError not raised") | 
 |         """ | 
 |         assert_python_ok('-c', code) | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), | 
 |                          'need signal.pthread_sigmask()') | 
 |     def wait_helper(self, blocked, test): | 
 |         """ | 
 |         test: body of the "def test(signum):" function. | 
 |         blocked: number of the blocked signal | 
 |         """ | 
 |         code = '''if 1: | 
 |         import signal | 
 |         import sys | 
 |  | 
 |         def handler(signum, frame): | 
 |             1/0 | 
 |  | 
 |         %s | 
 |  | 
 |         blocked = %s | 
 |         signum = signal.SIGALRM | 
 |  | 
 |         # child: block and wait the signal | 
 |         try: | 
 |             signal.signal(signum, handler) | 
 |             signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) | 
 |  | 
 |             # Do the tests | 
 |             test(signum) | 
 |  | 
 |             # The handler must not be called on unblock | 
 |             try: | 
 |                 signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) | 
 |             except ZeroDivisionError: | 
 |                 print("the signal handler has been called", | 
 |                       file=sys.stderr) | 
 |                 sys.exit(1) | 
 |         except BaseException as err: | 
 |             print("error: {}".format(err), file=sys.stderr) | 
 |             sys.stderr.flush() | 
 |             sys.exit(1) | 
 |         ''' % (test.strip(), blocked) | 
 |  | 
 |         # sig*wait* must be called with the signal blocked: since the current | 
 |         # process might have several threads running, use a subprocess to have | 
 |         # a single thread. | 
 |         assert_python_ok('-c', code) | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'sigwait'), | 
 |                          'need signal.sigwait()') | 
 |     def test_sigwait(self): | 
 |         self.wait_helper(signal.SIGALRM, ''' | 
 |         def test(signum): | 
 |             signal.alarm(1) | 
 |             received = signal.sigwait([signum]) | 
 |             if received != signum: | 
 |                 raise Exception('received %s, not %s' % (received, signum)) | 
 |         ''') | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), | 
 |                          'need signal.sigwaitinfo()') | 
 |     def test_sigwaitinfo(self): | 
 |         self.wait_helper(signal.SIGALRM, ''' | 
 |         def test(signum): | 
 |             signal.alarm(1) | 
 |             info = signal.sigwaitinfo([signum]) | 
 |             if info.si_signo != signum: | 
 |                 raise Exception("info.si_signo != %s" % signum) | 
 |         ''') | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), | 
 |                          'need signal.sigtimedwait()') | 
 |     def test_sigtimedwait(self): | 
 |         self.wait_helper(signal.SIGALRM, ''' | 
 |         def test(signum): | 
 |             signal.alarm(1) | 
 |             info = signal.sigtimedwait([signum], 10.1000) | 
 |             if info.si_signo != signum: | 
 |                 raise Exception('info.si_signo != %s' % signum) | 
 |         ''') | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), | 
 |                          'need signal.sigtimedwait()') | 
 |     def test_sigtimedwait_poll(self): | 
 |         # check that polling with sigtimedwait works | 
 |         self.wait_helper(signal.SIGALRM, ''' | 
 |         def test(signum): | 
 |             import os | 
 |             os.kill(os.getpid(), signum) | 
 |             info = signal.sigtimedwait([signum], 0) | 
 |             if info.si_signo != signum: | 
 |                 raise Exception('info.si_signo != %s' % signum) | 
 |         ''') | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), | 
 |                          'need signal.sigtimedwait()') | 
 |     def test_sigtimedwait_timeout(self): | 
 |         self.wait_helper(signal.SIGALRM, ''' | 
 |         def test(signum): | 
 |             received = signal.sigtimedwait([signum], 1.0) | 
 |             if received is not None: | 
 |                 raise Exception("received=%r" % (received,)) | 
 |         ''') | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), | 
 |                          'need signal.sigtimedwait()') | 
 |     def test_sigtimedwait_negative_timeout(self): | 
 |         signum = signal.SIGALRM | 
 |         self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0) | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), | 
 |                          'need signal.sigwaitinfo()') | 
 |     def test_sigwaitinfo_interrupted(self): | 
 |         self.wait_helper(signal.SIGUSR1, ''' | 
 |         def test(signum): | 
 |             import errno | 
 |  | 
 |             hndl_called = True | 
 |             def alarm_handler(signum, frame): | 
 |                 hndl_called = False | 
 |  | 
 |             signal.signal(signal.SIGALRM, alarm_handler) | 
 |             signal.alarm(1) | 
 |             try: | 
 |                 signal.sigwaitinfo([signal.SIGUSR1]) | 
 |             except OSError as e: | 
 |                 if e.errno == errno.EINTR: | 
 |                     if not hndl_called: | 
 |                         raise Exception("SIGALRM handler not called") | 
 |                 else: | 
 |                     raise Exception("Expected EINTR to be raised by sigwaitinfo") | 
 |             else: | 
 |                 raise Exception("Expected EINTR to be raised by sigwaitinfo") | 
 |         ''') | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'sigwait'), | 
 |                          'need signal.sigwait()') | 
 |     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), | 
 |                          'need signal.pthread_sigmask()') | 
 |     @unittest.skipIf(threading is None, "test needs threading module") | 
 |     def test_sigwait_thread(self): | 
 |         # Check that calling sigwait() from a thread doesn't suspend the whole | 
 |         # process. A new interpreter is spawned to avoid problems when mixing | 
 |         # threads and fork(): only async-safe functions are allowed between | 
 |         # fork() and exec(). | 
 |         assert_python_ok("-c", """if True: | 
 |             import os, threading, sys, time, signal | 
 |  | 
 |             # the default handler terminates the process | 
 |             signum = signal.SIGUSR1 | 
 |  | 
 |             def kill_later(): | 
 |                 # wait until the main thread is waiting in sigwait() | 
 |                 time.sleep(1) | 
 |                 os.kill(os.getpid(), signum) | 
 |  | 
 |             # the signal must be blocked by all the threads | 
 |             signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) | 
 |             killer = threading.Thread(target=kill_later) | 
 |             killer.start() | 
 |             received = signal.sigwait([signum]) | 
 |             if received != signum: | 
 |                 print("sigwait() received %s, not %s" % (received, signum), | 
 |                       file=sys.stderr) | 
 |                 sys.exit(1) | 
 |             killer.join() | 
 |             # unblock the signal, which should have been cleared by sigwait() | 
 |             signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) | 
 |         """) | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), | 
 |                          'need signal.pthread_sigmask()') | 
 |     def test_pthread_sigmask_arguments(self): | 
 |         self.assertRaises(TypeError, signal.pthread_sigmask) | 
 |         self.assertRaises(TypeError, signal.pthread_sigmask, 1) | 
 |         self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) | 
 |         self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) | 
 |  | 
 |     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), | 
 |                          'need signal.pthread_sigmask()') | 
 |     def test_pthread_sigmask(self): | 
 |         code = """if 1: | 
 |         import signal | 
 |         import os; import threading | 
 |  | 
 |         def handler(signum, frame): | 
 |             1/0 | 
 |  | 
 |         def kill(signum): | 
 |             os.kill(os.getpid(), signum) | 
 |  | 
 |         def read_sigmask(): | 
 |             return signal.pthread_sigmask(signal.SIG_BLOCK, []) | 
 |  | 
 |         signum = signal.SIGUSR1 | 
 |  | 
 |         # Install our signal handler | 
 |         old_handler = signal.signal(signum, handler) | 
 |  | 
 |         # Unblock SIGUSR1 (and copy the old mask) to test our signal handler | 
 |         old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) | 
 |         try: | 
 |             kill(signum) | 
 |         except ZeroDivisionError: | 
 |             pass | 
 |         else: | 
 |             raise Exception("ZeroDivisionError not raised") | 
 |  | 
 |         # Block and then raise SIGUSR1. The signal is blocked: the signal | 
 |         # handler is not called, and the signal is now pending | 
 |         signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) | 
 |         kill(signum) | 
 |  | 
 |         # Check the new mask | 
 |         blocked = read_sigmask() | 
 |         if signum not in blocked: | 
 |             raise Exception("%s not in %s" % (signum, blocked)) | 
 |         if old_mask ^ blocked != {signum}: | 
 |             raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) | 
 |  | 
 |         # Unblock SIGUSR1 | 
 |         try: | 
 |             # unblock the pending signal calls immediatly the signal handler | 
 |             signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) | 
 |         except ZeroDivisionError: | 
 |             pass | 
 |         else: | 
 |             raise Exception("ZeroDivisionError not raised") | 
 |         try: | 
 |             kill(signum) | 
 |         except ZeroDivisionError: | 
 |             pass | 
 |         else: | 
 |             raise Exception("ZeroDivisionError not raised") | 
 |  | 
 |         # Check the new mask | 
 |         unblocked = read_sigmask() | 
 |         if signum in unblocked: | 
 |             raise Exception("%s in %s" % (signum, unblocked)) | 
 |         if blocked ^ unblocked != {signum}: | 
 |             raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) | 
 |         if old_mask != unblocked: | 
 |             raise Exception("%s != %s" % (old_mask, unblocked)) | 
 |         """ | 
 |         assert_python_ok('-c', code) | 
 |  | 
 |     @unittest.skipIf(sys.platform == 'freebsd6', | 
 |         "issue #12392: send a signal to the main thread doesn't work " | 
 |         "before the creation of the first thread on FreeBSD 6") | 
 |     @unittest.skipUnless(hasattr(signal, 'pthread_kill'), | 
 |                          'need signal.pthread_kill()') | 
 |     def test_pthread_kill_main_thread(self): | 
 |         # Test that a signal can be sent to the main thread with pthread_kill() | 
 |         # before any other thread has been created (see issue #12392). | 
 |         code = """if True: | 
 |             import threading | 
 |             import signal | 
 |             import sys | 
 |  | 
 |             def handler(signum, frame): | 
 |                 sys.exit(3) | 
 |  | 
 |             signal.signal(signal.SIGUSR1, handler) | 
 |             signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) | 
 |             sys.exit(2) | 
 |         """ | 
 |  | 
 |         with spawn_python('-c', code) as process: | 
 |             stdout, stderr = process.communicate() | 
 |             exitcode = process.wait() | 
 |             if exitcode != 3: | 
 |                 raise Exception("Child error (exit code %s): %s" % | 
 |                                 (exitcode, stdout)) | 
 |  | 
 |  | 
 | def test_main(): | 
 |     try: | 
 |         support.run_unittest(PosixTests, InterProcessSignalTests, | 
 |                              WakeupSignalTests, SiginterruptTest, | 
 |                              ItimerTest, WindowsSignalTests, | 
 |                              PendingSignalsTests) | 
 |     finally: | 
 |         support.reap_children() | 
 |  | 
 |  | 
 | if __name__ == "__main__": | 
 |     test_main() |