Issue #8407: Add pthread_kill(), sigpending() and sigwait() functions to the
signal module.
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index c74f001..c1054ed 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -8,6 +8,10 @@
 import subprocess
 import traceback
 import sys, os, time, errno
+try:
+    import threading
+except ImportError:
+    threading = None
 
 if sys.platform in ('os2', 'riscos'):
     raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
@@ -187,7 +191,7 @@
 
 
 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
-class BasicSignalTests(unittest.TestCase):
+class PosixTests(unittest.TestCase):
     def trivial_signal_handler(self, *args):
         pass
 
@@ -484,50 +488,121 @@
         self.assertEqual(self.hndl_called, True)
 
 
-@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
-                     'need signal.pthread_sigmask()')
 class PendingSignalsTests(unittest.TestCase):
     """
-    Tests for the pthread_sigmask() function.
+    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
+    functions.
     """
+    def setUp(self):
+        self.has_pthread_kill = hasattr(signal, 'pthread_kill')
+
     def handler(self, signum, frame):
         1/0
 
     def read_sigmask(self):
         return signal.pthread_sigmask(signal.SIG_BLOCK, [])
 
-    def test_pthread_sigmask_arguments(self):
-        self.assertRaises(TypeError, signal.pthread_sigmask)
-        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
-        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
-        self.assertRaises(RuntimeError, signal.pthread_sigmask, 1700, [])
+    def can_test_blocked_signals(self, skip):
+        """
+        Check if a blocked signal can be raised to the main thread without
+        calling its signal handler. We need pthread_kill() or exactly one
+        thread (the main thread).
 
-    def test_pthread_sigmask(self):
-        import faulthandler
-        pid = os.getpid()
-        signum = signal.SIGUSR1
+        Return True if it's possible. Otherwise, return False and print a
+        warning if skip is False, or raise a SkipTest exception if skip is
+        True.
+        """
+        if self.has_pthread_kill:
+            return True
 
         # The fault handler timeout thread masks all signals. If the main
         # thread masks also SIGUSR1, all threads mask this signal. In this
         # case, if we send SIGUSR1 to the process, the signal is pending in the
         # main or the faulthandler timeout thread.  Unblock SIGUSR1 in the main
         # thread calls the signal handler only if the signal is pending for the
-        # main thread.
-        #
-        # Stop the faulthandler timeout thread to workaround this problem.
-        # Another solution would be to send the signal directly to the main
-        # thread using pthread_kill(), but Python doesn't expose this
-        # function.
+        # main thread. Stop the faulthandler timeout thread to work around this
+        # problem.
+        import faulthandler
         faulthandler.cancel_dump_tracebacks_later()
 
-        # Issue #11998: The _tkinter module loads the Tcl library which creates
-        # a thread waiting events in select(). This thread receives signals
-        # blocked by all other threads. We cannot test blocked signals if the
-        # _tkinter module is loaded.
-        can_test_blocked_signals = ('_tkinter' not in sys.modules)
-        if not can_test_blocked_signals:
-            print("WARNING: _tkinter is loaded, cannot test signals "
-                  "blocked by pthread_sigmask() (issue #11998)")
+        # Issue #11998: The _tkinter module loads the Tcl library which
+        # creates a thread waiting events in select(). This thread receives
+        # signals blocked by all other threads. We cannot test blocked
+        # signals if the _tkinter module is loaded.
+        if '_tkinter' in sys.modules:
+            message = ("_tkinter is loaded and pthread_kill() is missing, "
+                       "cannot test blocked signals (issue #11998)")
+            if skip:
+                self.skipTest(message)
+            else:
+                print("WARNING: %s" % message)
+            return False
+        return True
+
+    def kill(self, signum):
+        if self.has_pthread_kill:
+            tid = threading.current_thread().ident
+            signal.pthread_kill(tid, signum)
+        else:
+            pid = os.getpid()
+            os.kill(pid, signum)
+
+    @unittest.skipUnless(hasattr(signal, 'sigpending'),
+                         'need signal.sigpending()')
+    def test_sigpending_empty(self):
+        self.assertEqual(signal.sigpending(), set())
+
+    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+                         'need signal.pthread_sigmask()')
+    @unittest.skipUnless(hasattr(signal, 'sigpending'),
+                         'need signal.sigpending()')
+    def test_sigpending(self):
+        self.can_test_blocked_signals(True)
+
+        signum = signal.SIGUSR1
+        old_handler = signal.signal(signum, self.handler)
+        self.addCleanup(signal.signal, signum, old_handler)
+        # Block signum so the signal sent below is pending, not handled.
+        signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+        self.kill(signum)
+        self.assertEqual(signal.sigpending(), {signum})
+        with self.assertRaises(ZeroDivisionError):
+            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+
+    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
+                         'need signal.pthread_kill()')
+    def test_pthread_kill(self):
+        signum = signal.SIGUSR1
+        current = threading.current_thread().ident
+
+        old_handler = signal.signal(signum, self.handler)
+        self.addCleanup(signal.signal, signum, old_handler)
+
+        with self.assertRaises(ZeroDivisionError):
+            signal.pthread_kill(current, signum)
+
+    @unittest.skipUnless(hasattr(signal, 'sigwait'),
+                         'need signal.sigwait()')
+    def test_sigwait(self):
+        old_handler = signal.signal(signal.SIGALRM, self.handler)
+        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+        # NOTE(review): POSIX requires SIGALRM to be blocked before sigwait().
+        signal.alarm(1)
+        self.assertEqual(signal.sigwait([signal.SIGALRM]), signal.SIGALRM)
+
+    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+                         'need signal.pthread_sigmask()')
+    def test_pthread_sigmask_arguments(self):
+        self.assertRaises(TypeError, signal.pthread_sigmask)
+        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
+        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
+        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
+
+    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+                         'need signal.pthread_sigmask()')
+    def test_pthread_sigmask(self):
+        test_blocked_signals = self.can_test_blocked_signals(False)
+        signum = signal.SIGUSR1
 
         # Install our signal handler
         old_handler = signal.signal(signum, self.handler)
@@ -537,13 +612,13 @@
         old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
         self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)
         with self.assertRaises(ZeroDivisionError):
-            os.kill(pid, signum)
+            self.kill(signum)
 
         # Block and then raise SIGUSR1. The signal is blocked: the signal
         # handler is not called, and the signal is now pending
         signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
-        if can_test_blocked_signals:
-            os.kill(pid, signum)
+        if test_blocked_signals:
+            self.kill(signum)
 
         # Check the new mask
         blocked = self.read_sigmask()
@@ -551,14 +626,14 @@
         self.assertEqual(old_mask ^ blocked, {signum})
 
         # Unblock SIGUSR1
-        if can_test_blocked_signals:
+        if test_blocked_signals:
             with self.assertRaises(ZeroDivisionError):
                 # unblock the pending signal calls immediatly the signal handler
                 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
         else:
             signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
         with self.assertRaises(ZeroDivisionError):
-            os.kill(pid, signum)
+            self.kill(signum)
 
         # Check the new mask
         unblocked = self.read_sigmask()
@@ -570,7 +645,7 @@
 
 def test_main():
     try:
-        support.run_unittest(BasicSignalTests, InterProcessSignalTests,
+        support.run_unittest(PosixTests, InterProcessSignalTests,
                              WakeupSignalTests, SiginterruptTest,
                              ItimerTest, WindowsSignalTests,
                              PendingSignalsTests)