Use the faulthandler module's infrastructure to write a GIL-less
memory watchdog for timely stats collection.
diff --git a/Lib/test/support.py b/Lib/test/support.py
index 62ad606..cce04aa 100644
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -23,6 +23,7 @@
 import sysconfig
 import fnmatch
 import logging.handlers
+import struct
 
 try:
     import _thread, threading
@@ -34,6 +35,10 @@
 except ImportError:
     multiprocessing = None
 
+try:
+    import faulthandler
+except ImportError:
+    faulthandler = None
 
 try:
     import zlib
@@ -1133,41 +1138,66 @@
         raise ValueError('Memory limit %r too low to be useful' % (limit,))
     max_memuse = memlimit
 
-def _memory_watchdog(start_evt, finish_evt, period=10.0):
-    """A function which periodically watches the process' memory consumption
+class _MemoryWatchdog:
+    """An object which periodically watches the process' memory consumption
     and prints it out.
     """
-    # XXX: because of the GIL, and because the very long operations tested
-    # in most bigmem tests are uninterruptible, the loop below gets woken up
-    # much less often than expected.
-    # The polling code should be rewritten in raw C, without holding the GIL,
-    # and push results onto an anonymous pipe.
-    try:
-        page_size = os.sysconf('SC_PAGESIZE')
-    except (ValueError, AttributeError):
+
+    def __init__(self):
+        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
+        self.started = False
+        self.thread = None
         try:
-            page_size = os.sysconf('SC_PAGE_SIZE')
+            self.page_size = os.sysconf('SC_PAGESIZE')
         except (ValueError, AttributeError):
-            page_size = 4096
-    procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
-    try:
-        f = open(procfile, 'rb')
-    except IOError as e:
-        warnings.warn('/proc not available for stats: {}'.format(e),
-                      RuntimeWarning)
-        sys.stderr.flush()
-        return
-    with f:
-        start_evt.set()
-        old_data = -1
-        while not finish_evt.wait(period):
-            f.seek(0)
-            statm = f.read().decode('ascii')
-            data = int(statm.split()[5])
-            if data != old_data:
-                old_data = data
+            try:
+                self.page_size = os.sysconf('SC_PAGE_SIZE')
+            except (ValueError, AttributeError):
+                self.page_size = 4096
+
+    def consumer(self, fd):
+        HEADER = "l"
+        header_size = struct.calcsize(HEADER)
+        try:
+            while True:
+                header = os.read(fd, header_size)
+                if len(header) < header_size:
+                    # Pipe closed on other end
+                    break
+                data_len, = struct.unpack(HEADER, header)
+                data = os.read(fd, data_len)
+                statm = data.decode('ascii')
+                data = int(statm.split()[5])
                 print(" ... process data size: {data:.1f}G"
-                       .format(data=data * page_size / (1024 ** 3)))
+                       .format(data=data * self.page_size / (1024 ** 3)))
+        finally:
+            os.close(fd)
+
+    def start(self):
+        if not faulthandler or not hasattr(faulthandler, '_file_watchdog'):
+            return
+        try:
+            rfd = os.open(self.procfile, os.O_RDONLY)
+        except OSError as e:
+            warnings.warn('/proc not available for stats: {}'.format(e),
+                          RuntimeWarning)
+            sys.stderr.flush()
+            return
+        pipe_fd, wfd = os.pipe()
+        # _file_watchdog() doesn't take the GIL in its child thread, and
+        # therefore collects statistics timely
+        faulthandler._file_watchdog(rfd, wfd, 3.0)
+        self.started = True
+        self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,))
+        self.thread.daemon = True
+        self.thread.start()
+
+    def stop(self):
+        if not self.started:
+            return
+        faulthandler._cancel_file_watchdog()
+        self.thread.join()
+
 
 def bigmemtest(size, memuse, dry_run=True):
     """Decorator for bigmem tests.
@@ -1194,27 +1224,20 @@
                     "not enough memory: %.1fG minimum needed"
                     % (size * memuse / (1024 ** 3)))
 
-            if real_max_memuse and verbose and threading:
+            if real_max_memuse and verbose and faulthandler and threading:
                 print()
                 print(" ... expected peak memory use: {peak:.1f}G"
                       .format(peak=size * memuse / (1024 ** 3)))
-                sys.stdout.flush()
-                start_evt = threading.Event()
-                finish_evt = threading.Event()
-                t = threading.Thread(target=_memory_watchdog,
-                                     args=(start_evt, finish_evt, 0.5))
-                t.daemon = True
-                t.start()
-                start_evt.set()
+                watchdog = _MemoryWatchdog()
+                watchdog.start()
             else:
-                t = None
+                watchdog = None
 
             try:
                 return f(self, maxsize)
             finally:
-                if t:
-                    finish_evt.set()
-                    t.join()
+                if watchdog:
+                    watchdog.stop()
 
         wrapper.size = size
         wrapper.memuse = memuse
diff --git a/Modules/faulthandler.c b/Modules/faulthandler.c
index 8ee0630..3002840 100644
--- a/Modules/faulthandler.c
+++ b/Modules/faulthandler.c
@@ -13,6 +13,7 @@
 
 #ifdef WITH_THREAD
 #  define FAULTHANDLER_LATER
+#  define FAULTHANDLER_WATCHDOG
 #endif
 
 #ifndef MS_WINDOWS
@@ -65,6 +66,20 @@
 } thread;
 #endif
 
+#ifdef FAULTHANDLER_WATCHDOG
+static struct {
+    int rfd;
+    int wfd;
+    PY_TIMEOUT_T period_us;   /* period in microseconds */
+    /* The main thread always holds this lock. It is only released when
+       faulthandler_watchdog() is interrupted before this thread exits, or at
+       Python exit. */
+    PyThread_type_lock cancel_event;
+    /* released by child thread when joined */
+    PyThread_type_lock running;
+} watchdog;
+#endif
+
 #ifdef FAULTHANDLER_USER
 typedef struct {
     int enabled;
@@ -587,6 +602,138 @@
 }
 #endif  /* FAULTHANDLER_LATER */
 
+#ifdef FAULTHANDLER_WATCHDOG
+
+static void
+file_watchdog(void *unused)
+{
+    PyLockStatus st;
+    PY_TIMEOUT_T timeout;
+
+    const int MAXDATA = 1024;
+    char buf1[MAXDATA], buf2[MAXDATA];
+    char *data = buf1, *old_data = buf2;
+    Py_ssize_t data_len, old_data_len = -1;
+
+#if defined(HAVE_PTHREAD_SIGMASK) && !defined(HAVE_BROKEN_PTHREAD_SIGMASK)
+    sigset_t set;
+
+    /* we don't want to receive any signal */
+    sigfillset(&set);
+    pthread_sigmask(SIG_SETMASK, &set, NULL);
+#endif
+
+    /* On first pass, feed file contents immediately */
+    timeout = 0;
+    do {
+        st = PyThread_acquire_lock_timed(watchdog.cancel_event,
+                                         timeout, 0);
+        timeout = watchdog.period_us;
+        if (st == PY_LOCK_ACQUIRED) {
+            PyThread_release_lock(watchdog.cancel_event);
+            break;
+        }
+        /* Timeout => read and write data */
+        assert(st == PY_LOCK_FAILURE);
+
+        if (lseek(watchdog.rfd, 0, SEEK_SET) < 0) {
+            break;
+        }
+        data_len = read(watchdog.rfd, data, MAXDATA);
+        if (data_len < 0) {
+            break;
+        }
+        if (data_len != old_data_len || memcmp(data, old_data, data_len)) {
+            char *tdata;
+            Py_ssize_t tlen;
+            /* Contents changed, feed them to wfd */
+            long x = (long) data_len;
+            /* We can't do anything if the consumer is too slow, just bail out */
+            if (write(watchdog.wfd, (void *) &x, sizeof(x)) < sizeof(x))
+                break;
+            if (write(watchdog.wfd, data, data_len) < data_len)
+                break;
+            tdata = data;
+            data = old_data;
+            old_data = tdata;
+            tlen = data_len;
+            data_len = old_data_len;
+            old_data_len = tlen;
+        }
+    } while (1);
+
+    close(watchdog.rfd);
+    close(watchdog.wfd);
+
+    /* The only way out */
+    PyThread_release_lock(watchdog.running);
+}
+
+static void
+cancel_file_watchdog(void)
+{
+    /* Notify cancellation */
+    PyThread_release_lock(watchdog.cancel_event);
+
+    /* Wait for thread to join */
+    PyThread_acquire_lock(watchdog.running, 1);
+    PyThread_release_lock(watchdog.running);
+
+    /* The main thread should always hold the cancel_event lock */
+    PyThread_acquire_lock(watchdog.cancel_event, 1);
+}
+
+static PyObject*
+faulthandler_file_watchdog(PyObject *self,
+                           PyObject *args, PyObject *kwargs)
+{
+    static char *kwlist[] = {"rfd", "wfd", "period", NULL};
+    double period;
+    PY_TIMEOUT_T period_us;
+    int rfd, wfd;
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs,
+        "iid:_file_watchdog", kwlist,
+        &rfd, &wfd, &period))
+        return NULL;
+    if ((period * 1e6) >= (double) PY_TIMEOUT_MAX) {
+        PyErr_SetString(PyExc_OverflowError,  "period value is too large");
+        return NULL;
+    }
+    period_us = (PY_TIMEOUT_T)(period * 1e6);
+    if (period_us <= 0) {
+        PyErr_SetString(PyExc_ValueError, "period must be greater than 0");
+        return NULL;
+    }
+
+    /* Cancel previous thread, if running */
+    cancel_file_watchdog();
+
+    watchdog.rfd = rfd;
+    watchdog.wfd = wfd;
+    watchdog.period_us = period_us;
+
+    /* Arm these locks to serve as events when released */
+    PyThread_acquire_lock(watchdog.running, 1);
+
+    if (PyThread_start_new_thread(file_watchdog, NULL) == -1) {
+        PyThread_release_lock(watchdog.running);
+        PyErr_SetString(PyExc_RuntimeError,
+                        "unable to start file watchdog thread");
+        return NULL;
+    }
+
+    Py_RETURN_NONE;
+}
+
+static PyObject*
+faulthandler_cancel_file_watchdog(PyObject *self)
+{
+    cancel_file_watchdog();
+    Py_RETURN_NONE;
+}
+#endif  /* FAULTHANDLER_WATCHDOG */
+
 #ifdef FAULTHANDLER_USER
 static int
 faulthandler_register(int signum, int chain, _Py_sighandler_t *p_previous)
@@ -973,6 +1120,18 @@
                "to dump_tracebacks_later().")},
 #endif
 
+#ifdef FAULTHANDLER_WATCHDOG
+    {"_file_watchdog",
+     (PyCFunction)faulthandler_file_watchdog, METH_VARARGS|METH_KEYWORDS,
+     PyDoc_STR("_file_watchdog(rfd, wfd, period):\n"
+               "feed the contents of 'rfd' to 'wfd', if changed,\n"
+               "every 'period seconds'.")},
+    {"_cancel_file_watchdog",
+     (PyCFunction)faulthandler_cancel_file_watchdog, METH_NOARGS,
+     PyDoc_STR("_cancel_file_watchdog():\ncancel the previous call "
+               "to _file_watchdog().")},
+#endif
+
 #ifdef FAULTHANDLER_USER
     {"register",
      (PyCFunction)faulthandler_register_py, METH_VARARGS|METH_KEYWORDS,
@@ -1097,6 +1256,16 @@
     }
     PyThread_acquire_lock(thread.cancel_event, 1);
 #endif
+#ifdef FAULTHANDLER_WATCHDOG
+    watchdog.cancel_event = PyThread_allocate_lock();
+    watchdog.running = PyThread_allocate_lock();
+    if (!watchdog.cancel_event || !watchdog.running) {
+        PyErr_SetString(PyExc_RuntimeError,
+                        "could not allocate locks for faulthandler");
+        return -1;
+    }
+    PyThread_acquire_lock(watchdog.cancel_event, 1);
+#endif
 
     return faulthandler_env_options();
 }
@@ -1121,6 +1290,20 @@
     }
 #endif
 
+#ifdef FAULTHANDLER_WATCHDOG
+    /* file watchdog */
+    cancel_file_watchdog();
+    if (watchdog.cancel_event) {
+        PyThread_release_lock(watchdog.cancel_event);
+        PyThread_free_lock(watchdog.cancel_event);
+        watchdog.cancel_event = NULL;
+    }
+    if (watchdog.running) {
+        PyThread_free_lock(watchdog.running);
+        watchdog.running = NULL;
+    }
+#endif
+
 #ifdef FAULTHANDLER_USER
     /* user */
     if (user_signals != NULL) {