Reland r49188.
It was reverted due to breaking a valgrind test which has since been disabled.

Review URL: http://codereview.chromium.org/2763004

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@49320 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: 9cfb89a8002a2fc6b0ea3fd4eccb33b6f4dd464a
diff --git a/base/message_loop.cc b/base/message_loop.cc
index 7a09f38..3b13617 100644
--- a/base/message_loop.cc
+++ b/base/message_loop.cc
@@ -149,6 +149,16 @@
   destruction_observers_.RemoveObserver(obs);
 }
 
+void MessageLoop::AddTaskObserver(TaskObserver *obs) {
+  DCHECK_EQ(this, current());
+  task_observers_.AddObserver(obs);
+}
+
+void MessageLoop::RemoveTaskObserver(TaskObserver *obs) {
+  DCHECK_EQ(this, current());
+  task_observers_.RemoveObserver(obs);
+}
+
 void MessageLoop::Run() {
   AutoRunState save_state(this);
   RunHandler();
@@ -325,7 +335,10 @@
   nestable_tasks_allowed_ = false;
 
   HistogramEvent(kTaskRunEvent);
+  FOR_EACH_OBSERVER(TaskObserver, task_observers_,
+                    WillProcessTask(task->tracked_birth_time()));
   task->Run();
+  FOR_EACH_OBSERVER(TaskObserver, task_observers_, DidProcessTask());
   delete task;
 
   nestable_tasks_allowed_ = true;
@@ -584,16 +597,9 @@
 // MessageLoopForUI
 
 #if defined(OS_WIN)
-void MessageLoopForUI::WillProcessMessage(const MSG& message) {
-  pump_win()->WillProcessMessage(message);
-}
 void MessageLoopForUI::DidProcessMessage(const MSG& message) {
   pump_win()->DidProcessMessage(message);
 }
-void MessageLoopForUI::PumpOutPendingPaintMessages() {
-  pump_ui()->PumpOutPendingPaintMessages();
-}
-
 #endif  // defined(OS_WIN)
 
 #if !defined(OS_MACOSX)
diff --git a/base/message_loop.h b/base/message_loop.h
index 1a04323..8b4be7c 100644
--- a/base/message_loop.h
+++ b/base/message_loop.h
@@ -8,6 +8,7 @@
 #include <queue>
 #include <string>
 
+#include "base/basictypes.h"
 #include "base/histogram.h"
 #include "base/message_pump.h"
 #include "base/observer_list.h"
@@ -58,6 +59,24 @@
 //
 class MessageLoop : public base::MessagePump::Delegate {
  public:
+  // A TaskObserver is an object that receives task notifications from the
+  // MessageLoop.
+  //
+  // NOTE: A TaskObserver implementation should be extremely fast!
+  class TaskObserver {
+   public:
+    TaskObserver() {}
+
+    // This method is called before processing a task.
+    virtual void WillProcessTask(base::TimeTicks birth_time) = 0;
+
+    // This method is called after processing a task.
+    virtual void DidProcessTask() = 0;
+
+   protected:
+    virtual ~TaskObserver() {}
+  };
+
   static void EnableHistogrammer(bool enable_histogrammer);
 
   // A DestructionObserver is notified when the current MessageLoop is being
@@ -255,9 +274,14 @@
   // Returns true if we are currently running a nested message loop.
   bool IsNested();
 
+  // These functions can only be called on the same thread that |this| is
+  // running on.
+  void AddTaskObserver(TaskObserver* task_observer);
+  void RemoveTaskObserver(TaskObserver* task_observer);
+
 #if defined(OS_WIN)
   typedef base::MessagePumpWin::Dispatcher Dispatcher;
-  typedef base::MessagePumpWin::Observer Observer;
+  typedef base::MessagePumpForUI::Observer Observer;
 #elif !defined(OS_MACOSX)
   typedef base::MessagePumpForUI::Dispatcher Dispatcher;
   typedef base::MessagePumpForUI::Observer Observer;
@@ -433,6 +457,8 @@
   // The next sequence number to use for delayed tasks.
   int next_sequence_num_;
 
+  ObserverList<TaskObserver> task_observers_;
+
   DISALLOW_COPY_AND_ASSIGN(MessageLoop);
 };
 
@@ -456,10 +482,8 @@
   }
 
 #if defined(OS_WIN)
-  void WillProcessMessage(const MSG& message);
   void DidProcessMessage(const MSG& message);
-  void PumpOutPendingPaintMessages();
-#endif
+#endif  // defined(OS_WIN)
 
 #if !defined(OS_MACOSX)
   // Please see message_pump_win/message_pump_glib for definitions of these
@@ -473,7 +497,7 @@
   base::MessagePumpForUI* pump_ui() {
     return static_cast<base::MessagePumpForUI*>(pump_.get());
   }
-#endif  // defined(OS_MACOSX)
+#endif  // !defined(OS_MACOSX)
 };
 
 // Do not add any member variables to MessageLoopForUI!  This is important b/c
@@ -491,6 +515,24 @@
 //
 class MessageLoopForIO : public MessageLoop {
  public:
+#if defined(OS_WIN)
+  typedef base::MessagePumpForIO::IOHandler IOHandler;
+  typedef base::MessagePumpForIO::IOContext IOContext;
+  typedef base::MessagePumpForIO::IOObserver IOObserver;
+#elif defined(OS_POSIX)
+  typedef base::MessagePumpLibevent::Watcher Watcher;
+  typedef base::MessagePumpLibevent::FileDescriptorWatcher
+      FileDescriptorWatcher;
+  typedef base::MessagePumpLibevent::IOObserver IOObserver;
+
+  enum Mode {
+    WATCH_READ = base::MessagePumpLibevent::WATCH_READ,
+    WATCH_WRITE = base::MessagePumpLibevent::WATCH_WRITE,
+    WATCH_READ_WRITE = base::MessagePumpLibevent::WATCH_READ_WRITE
+  };
+
+#endif
+
   MessageLoopForIO() : MessageLoop(TYPE_IO) {
   }
 
@@ -501,10 +543,15 @@
     return static_cast<MessageLoopForIO*>(loop);
   }
 
-#if defined(OS_WIN)
-  typedef base::MessagePumpForIO::IOHandler IOHandler;
-  typedef base::MessagePumpForIO::IOContext IOContext;
+  void AddIOObserver(IOObserver* io_observer) {
+    pump_io()->AddIOObserver(io_observer);
+  }
 
+  void RemoveIOObserver(IOObserver* io_observer) {
+    pump_io()->RemoveIOObserver(io_observer);
+  }
+
+#if defined(OS_WIN)
   // Please see MessagePumpWin for definitions of these methods.
   void RegisterIOHandler(HANDLE file_handle, IOHandler* handler);
   bool WaitForIOCompletion(DWORD timeout, IOHandler* filter);
@@ -516,22 +563,17 @@
   }
 
 #elif defined(OS_POSIX)
-  typedef base::MessagePumpLibevent::Watcher Watcher;
-  typedef base::MessagePumpLibevent::FileDescriptorWatcher
-      FileDescriptorWatcher;
-
-  enum Mode {
-    WATCH_READ = base::MessagePumpLibevent::WATCH_READ,
-    WATCH_WRITE = base::MessagePumpLibevent::WATCH_WRITE,
-    WATCH_READ_WRITE = base::MessagePumpLibevent::WATCH_READ_WRITE
-  };
-
   // Please see MessagePumpLibevent for definition.
   bool WatchFileDescriptor(int fd,
                            bool persistent,
                            Mode mode,
                            FileDescriptorWatcher *controller,
                            Watcher *delegate);
+
+ private:
+  base::MessagePumpLibevent* pump_io() {
+    return static_cast<base::MessagePumpLibevent*>(pump_.get());
+  }
 #endif  // defined(OS_POSIX)
 };
 
diff --git a/base/message_loop_unittest.cc b/base/message_loop_unittest.cc
index de039f2..ea04324 100644
--- a/base/message_loop_unittest.cc
+++ b/base/message_loop_unittest.cc
@@ -2,10 +2,12 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include "base/eintr_wrapper.h"
 #include "base/logging.h"
 #include "base/message_loop.h"
 #include "base/platform_thread.h"
 #include "base/ref_counted.h"
+#include "base/task.h"
 #include "base/thread.h"
 #include "testing/gtest/include/gtest/gtest.h"
 
@@ -1455,6 +1457,70 @@
   RunTest_NonNestableInNestedLoop(MessageLoop::TYPE_IO, true);
 }
 
+class DummyTask : public Task {
+ public:
+  DummyTask(int num_tasks) : num_tasks_(num_tasks) {}
+
+  virtual void Run() {
+    if (num_tasks_ > 1) {
+      MessageLoop::current()->PostTask(
+          FROM_HERE,
+          new DummyTask(num_tasks_ - 1));
+    } else {
+      MessageLoop::current()->Quit();
+    }
+  }
+
+ private:
+  const int num_tasks_;
+};
+
+class DummyTaskObserver : public MessageLoop::TaskObserver {
+ public:
+  DummyTaskObserver(int num_tasks)
+      : num_tasks_started_(0),
+        num_tasks_processed_(0),
+        num_tasks_(num_tasks) {}
+
+  virtual ~DummyTaskObserver() {}
+
+  virtual void WillProcessTask(base::TimeTicks /* birth_time */) {
+    num_tasks_started_++;
+    EXPECT_LE(num_tasks_started_, num_tasks_);
+    EXPECT_EQ(num_tasks_started_, num_tasks_processed_ + 1);
+  }
+
+  virtual void DidProcessTask() {
+    num_tasks_processed_++;
+    EXPECT_LE(num_tasks_started_, num_tasks_);
+    EXPECT_EQ(num_tasks_started_, num_tasks_processed_);
+  }
+
+  int num_tasks_started() const { return num_tasks_started_; }
+  int num_tasks_processed() const { return num_tasks_processed_; }
+
+ private:
+  int num_tasks_started_;
+  int num_tasks_processed_;
+  const int num_tasks_;
+
+  DISALLOW_COPY_AND_ASSIGN(DummyTaskObserver);
+};
+
+TEST(MessageLoopTest, TaskObserver) {
+  const int kNumTasks = 6;
+  DummyTaskObserver observer(kNumTasks);
+
+  MessageLoop loop;
+  loop.AddTaskObserver(&observer);
+  loop.PostTask(FROM_HERE, new DummyTask(kNumTasks));
+  loop.Run();
+  loop.RemoveTaskObserver(&observer);
+
+  EXPECT_EQ(kNumTasks, observer.num_tasks_started());
+  EXPECT_EQ(kNumTasks, observer.num_tasks_processed());
+}
+
 #if defined(OS_WIN)
 TEST(MessageLoopTest, Dispatcher) {
   // This test requires a UI loop
@@ -1479,8 +1545,7 @@
 
 namespace {
 
-class QuitDelegate : public
-    base::MessagePumpLibevent::Watcher {
+class QuitDelegate : public base::MessagePumpLibevent::Watcher {
  public:
   virtual void OnFileCanWriteWithoutBlocking(int fd) {
     MessageLoop::current()->Quit();
@@ -1490,8 +1555,6 @@
   }
 };
 
-}  // namespace
-
 TEST(MessageLoopTest, DISABLED_FileDescriptorWatcherOutlivesMessageLoop) {
   // Simulate a MessageLoop that dies before an FileDescriptorWatcher.
   // This could happen when people use the Singleton pattern or atexit.
@@ -1518,8 +1581,8 @@
       // and don't run the message loop, just destroy it.
     }
   }
-  close(pipefds[0]);
-  close(pipefds[1]);
+  HANDLE_EINTR(close(pipefds[0]));
+  HANDLE_EINTR(close(pipefds[1]));
 }
 
 TEST(MessageLoopTest, FileDescriptorWatcherDoubleStop) {
@@ -1541,8 +1604,10 @@
       controller.StopWatchingFileDescriptor();
     }
   }
-  close(pipefds[0]);
-  close(pipefds[1]);
+  HANDLE_EINTR(close(pipefds[0]));
+  HANDLE_EINTR(close(pipefds[1]));
 }
 
+}  // namespace
+
 #endif  // defined(OS_POSIX)
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc
index 2ad1d97..c2390b4 100644
--- a/base/message_pump_libevent.cc
+++ b/base/message_pump_libevent.cc
@@ -7,9 +7,10 @@
 #include <errno.h>
 #include <fcntl.h>
 
-#include "eintr_wrapper.h"
 #include "base/auto_reset.h"
+#include "base/eintr_wrapper.h"
 #include "base/logging.h"
+#include "base/observer_list.h"
 #include "base/scoped_nsautorelease_pool.h"
 #include "base/scoped_ptr.h"
 #include "base/time.h"
@@ -50,7 +51,9 @@
 
 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
     : is_persistent_(false),
-      event_(NULL) {
+      event_(NULL),
+      pump_(NULL),
+      watcher_(NULL) {
 }
 
 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
@@ -82,9 +85,25 @@
   // event_del() is a no-op if the event isn't active.
   int rv = event_del(e);
   delete e;
+  pump_ = NULL;
+  watcher_ = NULL;
   return (rv == 0);
 }
 
+void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
+    int fd, MessagePumpLibevent* pump) {
+  pump->WillProcessIOEvent();
+  watcher_->OnFileCanReadWithoutBlocking(fd);
+  pump->DidProcessIOEvent();
+}
+
+void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
+    int fd, MessagePumpLibevent* pump) {
+  pump->WillProcessIOEvent();
+  watcher_->OnFileCanWriteWithoutBlocking(fd);
+  pump->DidProcessIOEvent();
+}
+
 // Called if a byte is received on the wakeup pipe.
 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
   base::MessagePumpLibevent* that =
@@ -142,9 +161,9 @@
   event_del(wakeup_event_);
   delete wakeup_event_;
   if (wakeup_pipe_in_ >= 0)
-    close(wakeup_pipe_in_);
+    HANDLE_EINTR(close(wakeup_pipe_in_));
   if (wakeup_pipe_out_ >= 0)
-    close(wakeup_pipe_out_);
+    HANDLE_EINTR(close(wakeup_pipe_out_));
   event_base_free(event_base_);
 }
 
@@ -190,7 +209,7 @@
   }
 
   // Set current interest mask and message pump for this event.
-  event_set(evt.get(), fd, event_mask, OnLibeventNotification, delegate);
+  event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
 
   // Tell libevent which message pump this socket will belong to when we add it.
   if (event_base_set(event_base_, evt.get()) != 0) {
@@ -204,19 +223,25 @@
 
   // Transfer ownership of evt to controller.
   controller->Init(evt.release(), persistent);
+
+  controller->set_watcher(delegate);
+  controller->set_pump(this);
+
   return true;
 }
 
-
 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
                                                  void* context) {
-  Watcher* watcher = static_cast<Watcher*>(context);
+  FileDescriptorWatcher* controller =
+      static_cast<FileDescriptorWatcher*>(context);
+
+  MessagePumpLibevent* pump = controller->pump();
 
   if (flags & EV_WRITE) {
-    watcher->OnFileCanWriteWithoutBlocking(fd);
+    controller->OnFileCanWriteWithoutBlocking(fd, pump);
   }
   if (flags & EV_READ) {
-    watcher->OnFileCanReadWithoutBlocking(fd);
+    controller->OnFileCanReadWithoutBlocking(fd, pump);
   }
 }
 
@@ -304,4 +329,20 @@
   delayed_work_time_ = delayed_work_time;
 }
 
+void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
+  io_observers_.AddObserver(obs);
+}
+
+void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
+  io_observers_.RemoveObserver(obs);
+}
+
+void MessagePumpLibevent::WillProcessIOEvent() {
+  FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
+}
+
+void MessagePumpLibevent::DidProcessIOEvent() {
+  FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
+}
+
 }  // namespace base
diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h
index 8e2f77c..6516128 100644
--- a/base/message_pump_libevent.h
+++ b/base/message_pump_libevent.h
@@ -5,7 +5,9 @@
 #ifndef BASE_MESSAGE_PUMP_LIBEVENT_H_
 #define BASE_MESSAGE_PUMP_LIBEVENT_H_
 
+#include "base/basictypes.h"
 #include "base/message_pump.h"
+#include "base/observer_list.h"
 #include "base/time.h"
 
 // Declare structs we need from libevent.h rather than including it
@@ -18,33 +20,19 @@
 // TODO(dkegel): add support for background file IO somehow
 class MessagePumpLibevent : public MessagePump {
  public:
+  class IOObserver {
+   public:
+    IOObserver() {}
 
-  // Object returned by WatchFileDescriptor to manage further watching.
-  class FileDescriptorWatcher {
-    public:
-     FileDescriptorWatcher();
-     ~FileDescriptorWatcher();  // Implicitly calls StopWatchingFileDescriptor.
+    // An IOObserver is an object that receives IO notifications from the
+    // MessagePump.
+    //
+    // NOTE: An IOObserver implementation should be extremely fast!
+    virtual void WillProcessIOEvent() = 0;
+    virtual void DidProcessIOEvent() = 0;
 
-     // NOTE: These methods aren't called StartWatching()/StopWatching() to
-     // avoid confusion with the win32 ObjectWatcher class.
-
-     // Stop watching the FD, always safe to call.  No-op if there's nothing
-     // to do.
-     bool StopWatchingFileDescriptor();
-
-    private:
-     // Called by MessagePumpLibevent, ownership of |e| is transferred to this
-     // object.
-     void Init(event* e, bool is_persistent);
-
-     // Used by MessagePumpLibevent to take ownership of event_.
-     event *ReleaseEvent();
-     friend class MessagePumpLibevent;
-
-    private:
-     bool is_persistent_;  // false if this event is one-shot.
-     event* event_;
-     DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcher);
+   protected:
+    virtual ~IOObserver() {}
   };
 
   // Used with WatchFileDescptor to asynchronously monitor the I/O readiness of
@@ -58,6 +46,45 @@
     virtual void OnFileCanWriteWithoutBlocking(int fd) = 0;
   };
 
+  // Object returned by WatchFileDescriptor to manage further watching.
+  class FileDescriptorWatcher {
+   public:
+    FileDescriptorWatcher();
+    ~FileDescriptorWatcher();  // Implicitly calls StopWatchingFileDescriptor.
+
+    // NOTE: These methods aren't called StartWatching()/StopWatching() to
+    // avoid confusion with the win32 ObjectWatcher class.
+
+    // Stop watching the FD, always safe to call.  No-op if there's nothing
+    // to do.
+    bool StopWatchingFileDescriptor();
+
+   private:
+    friend class MessagePumpLibevent;
+
+    // Called by MessagePumpLibevent, ownership of |e| is transferred to this
+    // object.
+    void Init(event* e, bool is_persistent);
+
+    // Used by MessagePumpLibevent to take ownership of event_.
+    event *ReleaseEvent();
+
+    void set_pump(MessagePumpLibevent* pump) { pump_ = pump; }
+    MessagePumpLibevent* pump() { return pump_; }
+
+    void set_watcher(Watcher* watcher) { watcher_ = watcher; }
+
+    void OnFileCanReadWithoutBlocking(int fd, MessagePumpLibevent* pump);
+    void OnFileCanWriteWithoutBlocking(int fd, MessagePumpLibevent* pump);
+
+    bool is_persistent_;  // false if this event is one-shot.
+    event* event_;
+    MessagePumpLibevent* pump_;
+    Watcher* watcher_;
+
+    DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcher);
+  };
+
   MessagePumpLibevent();
   virtual ~MessagePumpLibevent();
 
@@ -84,6 +111,9 @@
                            FileDescriptorWatcher *controller,
                            Watcher *delegate);
 
+  void AddIOObserver(IOObserver* obs);
+  void RemoveIOObserver(IOObserver* obs);
+
   // MessagePump methods:
   virtual void Run(Delegate* delegate);
   virtual void Quit();
@@ -91,6 +121,8 @@
   virtual void ScheduleDelayedWork(const Time& delayed_work_time);
 
  private:
+  void WillProcessIOEvent();
+  void DidProcessIOEvent();
 
   // Risky part of constructor.  Returns true on success.
   bool Init();
@@ -122,6 +154,8 @@
   // ... libevent wrapper for read end
   event* wakeup_event_;
 
+  ObserverList<IOObserver> io_observers_;
+
   DISALLOW_COPY_AND_ASSIGN(MessagePumpLibevent);
 };
 
diff --git a/base/tracked.h b/base/tracked.h
index b0be729..3622d1c 100644
--- a/base/tracked.h
+++ b/base/tracked.h
@@ -107,8 +107,14 @@
 
   bool MissingBirthplace() const;
 
+#if defined(TRACK_ALL_TASK_OBJECTS)
+  base::TimeTicks tracked_birth_time() const { return tracked_birth_time_; }
+#else
+  base::TimeTicks tracked_birth_time() const { return base::TimeTicks::Now(); }
+#endif  // defined(TRACK_ALL_TASK_OBJECTS)
+
  private:
-#ifdef TRACK_ALL_TASK_OBJECTS
+#if defined(TRACK_ALL_TASK_OBJECTS)
 
   // Pointer to instance were counts of objects with the same birth location
   // (on the same thread) are stored.
@@ -118,7 +124,7 @@
   // reset before the object begins it active life.
   base::TimeTicks tracked_birth_time_;
 
-#endif  // TRACK_ALL_TASK_OBJECTS
+#endif  // defined(TRACK_ALL_TASK_OBJECTS)
 
   DISALLOW_COPY_AND_ASSIGN(Tracked);
 };