Made MessagePump a non-thread-safe class.

This CL makes MessagePump a non-thread-safe class to make sure thread-bound resources (such as the UI window used for pumping messages on Windows) are freed on the correct thread.

Handling of incoming tasks and synchronization between different threads was moved out to a separate class, IncomingTaskQueue, reducing the number of locks taken while posting a task to one. Posting tasks via both MessageLoop and MessageLoopProxyImpl is now routed via IncomingTaskQueue.

BUG=241939

Review URL: https://chromiumcodereview.appspot.com/17567007

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@212948 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: b908293642bab4631101157c0aac4fb89fd6c287
diff --git a/base/message_loop/incoming_task_queue.cc b/base/message_loop/incoming_task_queue.cc
new file mode 100644
index 0000000..db99d87
--- /dev/null
+++ b/base/message_loop/incoming_task_queue.cc
@@ -0,0 +1,169 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_loop/incoming_task_queue.h"
+
+#include "base/debug/trace_event.h"
+#include "base/location.h"
+#include "base/message_loop/message_loop.h"
+#include "base/synchronization/waitable_event.h"
+
+namespace base {
+namespace internal {
+
+IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop)
+    : message_loop_(message_loop),
+      next_sequence_num_(0) {
+}
+
+bool IncomingTaskQueue::AddToIncomingQueue(
+    const tracked_objects::Location& from_here,
+    const Closure& task,
+    TimeDelta delay,
+    bool nestable) {
+  AutoLock locked(incoming_queue_lock_);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(delay), nestable);
+  return PostPendingTask(&pending_task);
+}
+
+bool IncomingTaskQueue::TryAddToIncomingQueue(
+    const tracked_objects::Location& from_here,
+    const Closure& task) {
+  if (!incoming_queue_lock_.Try()) {
+    // Reset |task|.
+    Closure local_task = task;
+    return false;
+  }
+
+  AutoLock locked(incoming_queue_lock_, AutoLock::AlreadyAcquired());
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
+  return PostPendingTask(&pending_task);
+}
+
+bool IncomingTaskQueue::IsHighResolutionTimerEnabledForTesting() {
+#if defined(OS_WIN)
+  return !high_resolution_timer_expiration_.is_null();
+#else
+  return true;
+#endif
+}
+
+bool IncomingTaskQueue::IsIdleForTesting() {
+  AutoLock lock(incoming_queue_lock_);
+  return incoming_queue_.empty();
+}
+
+void IncomingTaskQueue::LockWaitUnLockForTesting(WaitableEvent* caller_wait,
+                                                 WaitableEvent* caller_signal) {
+  AutoLock lock(incoming_queue_lock_);
+  caller_wait->Signal();
+  caller_signal->Wait();
+}
+
+void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
+  // Make sure no tasks are lost.
+  DCHECK(work_queue->empty());
+
+  // Acquire all we can from the inter-thread queue with one lock acquisition.
+  AutoLock lock(incoming_queue_lock_);
+  if (!incoming_queue_.empty())
+    incoming_queue_.Swap(work_queue);  // Constant time
+
+  DCHECK(incoming_queue_.empty());
+}
+
+void IncomingTaskQueue::WillDestroyCurrentMessageLoop() {
+#if defined(OS_WIN)
+  // If we left the high-resolution timer activated, deactivate it now.
+  // Doing this is not critical; it is mainly to make sure we track
+  // the high resolution timer activations properly in our unit tests.
+  if (!high_resolution_timer_expiration_.is_null()) {
+    Time::ActivateHighResolutionTimer(false);
+    high_resolution_timer_expiration_ = TimeTicks();
+  }
+#endif
+
+  AutoLock lock(incoming_queue_lock_);
+  message_loop_ = NULL;
+}
+
+IncomingTaskQueue::~IncomingTaskQueue() {
+  // Verify that WillDestroyCurrentMessageLoop() has been called.
+  DCHECK(!message_loop_);
+}
+
+TimeTicks IncomingTaskQueue::CalculateDelayedRuntime(TimeDelta delay) {
+  TimeTicks delayed_run_time;
+  if (delay > TimeDelta()) {
+    delayed_run_time = TimeTicks::Now() + delay;
+
+#if defined(OS_WIN)
+    if (high_resolution_timer_expiration_.is_null()) {
+      // Windows timers are granular to 15.6ms.  If we only set high-res
+      // timers for those under 15.6ms, then a 18ms timer ticks at ~32ms,
+      // which as a percentage is pretty inaccurate.  So enable high
+      // res timers for any timer which is within 2x of the granularity.
+      // This is a tradeoff between accuracy and power management.
+      bool needs_high_res_timers = delay.InMilliseconds() <
+          (2 * Time::kMinLowResolutionThresholdMs);
+      if (needs_high_res_timers) {
+        if (Time::ActivateHighResolutionTimer(true)) {
+          high_resolution_timer_expiration_ = TimeTicks::Now() +
+              TimeDelta::FromMilliseconds(
+                  MessageLoop::kHighResolutionTimerModeLeaseTimeMs);
+        }
+      }
+    }
+#endif
+  } else {
+    DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
+  }
+
+#if defined(OS_WIN)
+  if (!high_resolution_timer_expiration_.is_null()) {
+    if (TimeTicks::Now() > high_resolution_timer_expiration_) {
+      Time::ActivateHighResolutionTimer(false);
+      high_resolution_timer_expiration_ = TimeTicks();
+    }
+  }
+#endif
+
+  return delayed_run_time;
+}
+
+bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
+  // Warning: Don't try to short-circuit, and handle this thread's tasks more
+  // directly, as it could starve handling of foreign threads.  Put every task
+  // into this queue.
+
+  // This should only be called while the lock is taken.
+  incoming_queue_lock_.AssertAcquired();
+
+  if (!message_loop_) {
+    pending_task->task.Reset();
+    return false;
+  }
+
+  // Initialize the sequence number. The sequence number is used for delayed
+  // tasks (to facilitate FIFO sorting when two tasks have the same
+  // delayed_run_time value) and for identifying the task in about:tracing.
+  pending_task->sequence_num = next_sequence_num_++;
+
+  TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask",
+      TRACE_ID_MANGLE(message_loop_->GetTaskTraceID(*pending_task)));
+
+  bool was_empty = incoming_queue_.empty();
+  incoming_queue_.push(*pending_task);
+  pending_task->task.Reset();
+
+  // Wake up the pump.
+  message_loop_->ScheduleWork(was_empty);
+
+  return true;
+}
+
+}  // namespace internal
+}  // namespace base
diff --git a/base/message_loop/incoming_task_queue.h b/base/message_loop/incoming_task_queue.h
new file mode 100644
index 0000000..d831a71
--- /dev/null
+++ b/base/message_loop/incoming_task_queue.h
@@ -0,0 +1,104 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
+#define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
+
+#include "base/base_export.h"
+#include "base/memory/ref_counted.h"
+#include "base/pending_task.h"
+#include "base/synchronization/lock.h"
+#include "base/time/time.h"
+
+namespace base {
+
+class MessageLoop;
+class WaitableEvent;
+
+namespace internal {
+
+// Implements a queue of tasks posted to the message loop running on the current
+// thread. This class takes care of synchronizing posting tasks from different
+// threads and together with MessageLoop ensures clean shutdown.
+class BASE_EXPORT IncomingTaskQueue
+    : public RefCountedThreadSafe<IncomingTaskQueue> {
+ public:
+  explicit IncomingTaskQueue(MessageLoop* message_loop);
+
+  // Appends a task to the incoming queue. Posting of all tasks is routed
+  // through AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that
+  // posting tasks is properly synchronized between different threads.
+  //
+  // Returns true if the task was successfully added to the queue, otherwise
+  // returns false. In all cases, the ownership of |task| is transferred to the
+  // called method.
+  bool AddToIncomingQueue(const tracked_objects::Location& from_here,
+                          const Closure& task,
+                          TimeDelta delay,
+                          bool nestable);
+
+  // Same as AddToIncomingQueue() except that it will avoid blocking if the lock
+  // is already held, and will in that case (when the lock is contended) fail to
+  // add the task, and will return false.
+  bool TryAddToIncomingQueue(const tracked_objects::Location& from_here,
+                             const Closure& task);
+
+  // Returns true if the message loop has high resolution timers enabled.
+  // Provided for testing.
+  bool IsHighResolutionTimerEnabledForTesting();
+
+  // Returns true if the message loop is "idle". Provided for testing.
+  bool IsIdleForTesting();
+
+  // Takes the incoming queue lock, signals |caller_wait| and waits until
+  // |caller_signal| is signalled.
+  void LockWaitUnLockForTesting(WaitableEvent* caller_wait,
+                                WaitableEvent* caller_signal);
+
+  // Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called
+  // from the thread that is running the loop.
+  void ReloadWorkQueue(TaskQueue* work_queue);
+
+  // Disconnects |this| from the parent message loop.
+  void WillDestroyCurrentMessageLoop();
+
+ private:
+  friend class RefCountedThreadSafe<IncomingTaskQueue>;
+  virtual ~IncomingTaskQueue();
+
+  // Calculates the time at which a PendingTask should run.
+  TimeTicks CalculateDelayedRuntime(TimeDelta delay);
+
+  // Adds a task to |incoming_queue_|. The caller retains ownership of
+  // |pending_task|, but this function will reset the value of
+  // |pending_task->task|. This is needed to ensure that the posting call stack
+  // does not retain |pending_task->task| beyond this function call.
+  bool PostPendingTask(PendingTask* pending_task);
+
+#if defined(OS_WIN)
+  TimeTicks high_resolution_timer_expiration_;
+#endif
+
+  // The lock that protects access to |incoming_queue_|, |message_loop_| and
+  // |next_sequence_num_|.
+  base::Lock incoming_queue_lock_;
+
+  // An incoming queue of tasks that are acquired under a mutex for processing
+  // on this instance's thread. These tasks have not yet been pushed to
+  // |message_loop_|.
+  TaskQueue incoming_queue_;
+
+  // Points to the message loop that owns |this|.
+  MessageLoop* message_loop_;
+
+  // The next sequence number to use for delayed tasks.
+  int next_sequence_num_;
+
+  DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
+};
+
+}  // namespace internal
+}  // namespace base
+
+#endif  // BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
diff --git a/base/message_loop/message_loop.cc b/base/message_loop/message_loop.cc
index bd3542a..d2eafbd 100644
--- a/base/message_loop/message_loop.cc
+++ b/base/message_loop/message_loop.cc
@@ -13,7 +13,6 @@
 #include "base/lazy_instance.h"
 #include "base/logging.h"
 #include "base/memory/scoped_ptr.h"
-#include "base/message_loop/message_loop_proxy_impl.h"
 #include "base/message_loop/message_pump_default.h"
 #include "base/metrics/histogram.h"
 #include "base/metrics/statistics_recorder.h"
@@ -89,14 +88,6 @@
 
 MessageLoop::MessagePumpFactory* message_pump_for_ui_factory_ = NULL;
 
-// Create a process-wide unique ID to represent this task in trace events. This
-// will be mangled with a Process ID hash to reduce the likelyhood of colliding
-// with MessageLoop pointers on other processes.
-uint64 GetTaskTraceID(const PendingTask& task, MessageLoop* loop) {
-  return (static_cast<uint64>(task.sequence_num) << 32) |
-         static_cast<uint64>(reinterpret_cast<intptr_t>(loop));
-}
-
 // Returns true if MessagePump::ScheduleWork() must be called one
 // time for every task that is added to the MessageLoop incoming queue.
 bool AlwaysNotifyPump(MessageLoop::Type type) {
@@ -146,18 +137,19 @@
 
 MessageLoop::MessageLoop(Type type)
     : type_(type),
-      nestable_tasks_allowed_(true),
       exception_restoration_(false),
-      message_histogram_(NULL),
-      run_loop_(NULL),
+      nestable_tasks_allowed_(true),
 #if defined(OS_WIN)
       os_modal_loop_(false),
 #endif  // OS_WIN
-      next_sequence_num_(0) {
+      message_histogram_(NULL),
+      run_loop_(NULL) {
   DCHECK(!current()) << "should only have one message loop per thread";
   lazy_tls_ptr.Pointer()->Set(this);
 
-  message_loop_proxy_ = new MessageLoopProxyImpl();
+  incoming_task_queue_ = new internal::IncomingTaskQueue(this);
+  message_loop_proxy_ =
+      new internal::MessageLoopProxyImpl(incoming_task_queue_);
   thread_task_runner_handle_.reset(
       new ThreadTaskRunnerHandle(message_loop_proxy_));
 
@@ -177,7 +169,7 @@
 #define MESSAGE_PUMP_UI NULL
 // ipc_channel_nacl.cc uses a worker thread to do socket reads currently, and
 // doesn't require extra support for watching file descriptors.
-#define MESSAGE_PUMP_IO new MessagePumpDefault();
+#define MESSAGE_PUMP_IO new MessagePumpDefault()
 #elif defined(OS_POSIX)  // POSIX but not MACOSX.
 #define MESSAGE_PUMP_UI new MessagePumpForUI()
 #define MESSAGE_PUMP_IO new MessagePumpLibevent()
@@ -187,14 +179,14 @@
 
   if (type_ == TYPE_UI) {
     if (message_pump_for_ui_factory_)
-      pump_ = message_pump_for_ui_factory_();
+      pump_.reset(message_pump_for_ui_factory_());
     else
-      pump_ = MESSAGE_PUMP_UI;
+      pump_.reset(MESSAGE_PUMP_UI);
   } else if (type_ == TYPE_IO) {
-    pump_ = MESSAGE_PUMP_IO;
+    pump_.reset(MESSAGE_PUMP_IO);
   } else {
     DCHECK_EQ(TYPE_DEFAULT, type_);
-    pump_ = new MessagePumpDefault();
+    pump_.reset(new MessagePumpDefault());
   }
 }
 
@@ -226,23 +218,13 @@
 
   thread_task_runner_handle_.reset();
 
-  // Tell the message_loop_proxy that we are dying.
-  static_cast<MessageLoopProxyImpl*>(message_loop_proxy_.get())->
-      WillDestroyCurrentMessageLoop();
+  // Tell the incoming queue that we are dying.
+  incoming_task_queue_->WillDestroyCurrentMessageLoop();
+  incoming_task_queue_ = NULL;
   message_loop_proxy_ = NULL;
 
   // OK, now make it so that no one can find us.
   lazy_tls_ptr.Pointer()->Set(NULL);
-
-#if defined(OS_WIN)
-  // If we left the high-resolution timer activated, deactivate it now.
-  // Doing this is not-critical, it is mainly to make sure we track
-  // the high resolution timer activations properly in our unit tests.
-  if (!high_resolution_timer_expiration_.is_null()) {
-    Time::ActivateHighResolutionTimer(false);
-    high_resolution_timer_expiration_ = TimeTicks();
-  }
-#endif
 }
 
 // static
@@ -283,18 +265,14 @@
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
-  AddToIncomingQueue(&pending_task, false);
+  incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), true);
 }
 
 bool MessageLoop::TryPostTask(
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
-  return AddToIncomingQueue(&pending_task, true);
+  return incoming_task_queue_->TryAddToIncomingQueue(from_here, task);
 }
 
 void MessageLoop::PostDelayedTask(
@@ -302,18 +280,14 @@
     const Closure& task,
     TimeDelta delay) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(delay), true);
-  AddToIncomingQueue(&pending_task, false);
+  incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, true);
 }
 
 void MessageLoop::PostNonNestableTask(
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(TimeDelta()), false);
-  AddToIncomingQueue(&pending_task, false);
+  incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), false);
 }
 
 void MessageLoop::PostNonNestableDelayedTask(
@@ -321,9 +295,7 @@
     const Closure& task,
     TimeDelta delay) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(delay), false);
-  AddToIncomingQueue(&pending_task, false);
+  incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, false);
 }
 
 void MessageLoop::Run() {
@@ -395,17 +367,26 @@
   task_observers_.RemoveObserver(task_observer);
 }
 
-void MessageLoop::AssertIdle() const {
-  // We only check |incoming_queue_|, since we don't want to lock |work_queue_|.
-  AutoLock lock(incoming_queue_lock_);
-  DCHECK(incoming_queue_.empty());
-}
-
 bool MessageLoop::is_running() const {
   DCHECK_EQ(this, current());
   return run_loop_ != NULL;
 }
 
+bool MessageLoop::IsHighResolutionTimerEnabledForTesting() {
+  return incoming_task_queue_->IsHighResolutionTimerEnabledForTesting();
+}
+
+bool MessageLoop::IsIdleForTesting() {
+  // We only check the incoming queue, since we don't want to lock the work
+  // queue.
+  return incoming_task_queue_->IsIdleForTesting();
+}
+
+void MessageLoop::LockWaitUnLockForTesting(WaitableEvent* caller_wait,
+                                           WaitableEvent* caller_signal) {
+  incoming_task_queue_->LockWaitUnLockForTesting(caller_wait, caller_signal);
+}
+
 //------------------------------------------------------------------------------
 
 // Runs the loop in two different SEH modes:
@@ -470,7 +451,7 @@
       tracked_objects::ThreadData::NowForStartOfRun(pending_task.birth_tally);
 
   TRACE_EVENT_FLOW_END1("task", "MessageLoop::PostTask",
-      TRACE_ID_MANGLE(GetTaskTraceID(pending_task, this)),
+      TRACE_ID_MANGLE(GetTaskTraceID(pending_task)),
       "queue_duration",
       (start_time - pending_task.EffectiveTimePosted()).InMilliseconds());
   TRACE_EVENT2("task", "MessageLoop::RunTask",
@@ -523,24 +504,6 @@
   delayed_work_queue_.push(pending_task);
 }
 
-void MessageLoop::ReloadWorkQueue() {
-  // We can improve performance of our loading tasks from incoming_queue_ to
-  // work_queue_ by waiting until the last minute (work_queue_ is empty) to
-  // load.  That reduces the number of locks-per-task significantly when our
-  // queues get large.
-  if (!work_queue_.empty())
-    return;  // Wait till we *really* need to lock and load.
-
-  // Acquire all we can from the inter-thread queue with one lock acquisition.
-  {
-    AutoLock lock(incoming_queue_lock_);
-    if (incoming_queue_.empty())
-      return;
-    incoming_queue_.Swap(&work_queue_);  // Constant time
-    DCHECK(incoming_queue_.empty());
-  }
-}
-
 bool MessageLoop::DeletePendingTasks() {
   bool did_work = !work_queue_.empty();
   while (!work_queue_.empty()) {
@@ -570,87 +533,25 @@
   return did_work;
 }
 
-TimeTicks MessageLoop::CalculateDelayedRuntime(TimeDelta delay) {
-  TimeTicks delayed_run_time;
-  if (delay > TimeDelta()) {
-    delayed_run_time = TimeTicks::Now() + delay;
-
-#if defined(OS_WIN)
-    if (high_resolution_timer_expiration_.is_null()) {
-      // Windows timers are granular to 15.6ms.  If we only set high-res
-      // timers for those under 15.6ms, then a 18ms timer ticks at ~32ms,
-      // which as a percentage is pretty inaccurate.  So enable high
-      // res timers for any timer which is within 2x of the granularity.
-      // This is a tradeoff between accuracy and power management.
-      bool needs_high_res_timers = delay.InMilliseconds() <
-          (2 * Time::kMinLowResolutionThresholdMs);
-      if (needs_high_res_timers) {
-        if (Time::ActivateHighResolutionTimer(true)) {
-          high_resolution_timer_expiration_ = TimeTicks::Now() +
-              TimeDelta::FromMilliseconds(kHighResolutionTimerModeLeaseTimeMs);
-        }
-      }
-    }
-#endif
-  } else {
-    DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
-  }
-
-#if defined(OS_WIN)
-  if (!high_resolution_timer_expiration_.is_null()) {
-    if (TimeTicks::Now() > high_resolution_timer_expiration_) {
-      Time::ActivateHighResolutionTimer(false);
-      high_resolution_timer_expiration_ = TimeTicks();
-    }
-  }
-#endif
-
-  return delayed_run_time;
+uint64 MessageLoop::GetTaskTraceID(const PendingTask& task) {
+  return (static_cast<uint64>(task.sequence_num) << 32) |
+         static_cast<uint64>(reinterpret_cast<intptr_t>(this));
 }
 
-// Possibly called on a background thread!
-bool MessageLoop::AddToIncomingQueue(PendingTask* pending_task,
-                                     bool use_try_lock) {
-  // Warning: Don't try to short-circuit, and handle this thread's tasks more
-  // directly, as it could starve handling of foreign threads.  Put every task
-  // into this queue.
+void MessageLoop::ReloadWorkQueue() {
+  // We can improve performance of our loading tasks from the incoming queue to
+  // |work_queue_| by waiting until the last minute (|work_queue_| is empty) to
+  // load. That reduces the number of locks-per-task significantly when our
+  // queues get large.
+  if (work_queue_.empty())
+    incoming_task_queue_->ReloadWorkQueue(&work_queue_);
+}
 
-  scoped_refptr<MessagePump> pump;
-  {
-    if (use_try_lock) {
-      if (!incoming_queue_lock_.Try()) {
-        pending_task->task.Reset();
-        return false;
-      }
-    } else {
-      incoming_queue_lock_.Acquire();
-    }
-    AutoLock locked(incoming_queue_lock_, AutoLock::AlreadyAcquired());
-    // Initialize the sequence number. The sequence number is used for delayed
-    // tasks (to faciliate FIFO sorting when two tasks have the same
-    // delayed_run_time value) and for identifying the task in about:tracing.
-    pending_task->sequence_num = next_sequence_num_++;
-
-    TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask",
-        TRACE_ID_MANGLE(GetTaskTraceID(*pending_task, this)));
-
-    bool was_empty = incoming_queue_.empty();
-    incoming_queue_.push(*pending_task);
-    pending_task->task.Reset();
-    // The Android UI message loop needs to get notified each time
-    // a task is added to the incoming queue.
-    if (!was_empty && !AlwaysNotifyPump(type_))
-      return true;  // Someone else should have started the sub-pump.
-
-    pump = pump_;
-  }
-  // Since the incoming_queue_ may contain a task that destroys this message
-  // loop, we cannot exit incoming_queue_lock_ until we are done with |this|.
-  // We use a stack-based reference to the message pump so that we can call
-  // ScheduleWork outside of incoming_queue_lock_.
-
-  pump->ScheduleWork();
-  return true;
+void MessageLoop::ScheduleWork(bool was_empty) {
+  // The Android UI message loop needs to get notified each time
+  // a task is added to the incoming queue.
+  if (was_empty || AlwaysNotifyPump(type_))
+    pump_->ScheduleWork();
 }
 
 //------------------------------------------------------------------------------
diff --git a/base/message_loop/message_loop.h b/base/message_loop/message_loop.h
index 846cc8d..6f71a85 100644
--- a/base/message_loop/message_loop.h
+++ b/base/message_loop/message_loop.h
@@ -13,7 +13,10 @@
 #include "base/callback_forward.h"
 #include "base/location.h"
 #include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop/incoming_task_queue.h"
 #include "base/message_loop/message_loop_proxy.h"
+#include "base/message_loop/message_loop_proxy_impl.h"
 #include "base/message_loop/message_pump.h"
 #include "base/observer_list.h"
 #include "base/pending_task.h"
@@ -46,12 +49,12 @@
 namespace base {
 
 class HistogramBase;
-class MessageLoopLockTest;
 class RunLoop;
 class ThreadTaskRunnerHandle;
 #if defined(OS_ANDROID)
 class MessagePumpForUI;
 #endif
+class WaitableEvent;
 
 // A MessageLoop is used to process events for a particular thread.  There is
 // at most one MessageLoop instance per thread.
@@ -283,7 +286,7 @@
 
   // Gets the message loop proxy associated with this message loop.
   scoped_refptr<MessageLoopProxy> message_loop_proxy() {
-    return message_loop_proxy_.get();
+    return message_loop_proxy_;
   }
 
   // Enables or disables the recursive task processing. This happens in the case
@@ -359,23 +362,10 @@
   void AddTaskObserver(TaskObserver* task_observer);
   void RemoveTaskObserver(TaskObserver* task_observer);
 
-  // Returns true if the message loop has high resolution timers enabled.
-  // Provided for testing.
-  bool high_resolution_timers_enabled() {
-#if defined(OS_WIN)
-    return !high_resolution_timer_expiration_.is_null();
-#else
-    return true;
-#endif
-  }
-
   // When we go into high resolution timer mode, we will stay in hi-res mode
   // for at least 1s.
   static const int kHighResolutionTimerModeLeaseTimeMs = 1000;
 
-  // Asserts that the MessageLoop is "idle".
-  void AssertIdle() const;
-
 #if defined(OS_WIN)
   void set_os_modal_loop(bool os_modal_loop) {
     os_modal_loop_ = os_modal_loop;
@@ -389,6 +379,18 @@
   // Can only be called from the thread that owns the MessageLoop.
   bool is_running() const;
 
+  // Returns true if the message loop has high resolution timers enabled.
+  // Provided for testing.
+  bool IsHighResolutionTimerEnabledForTesting();
+
+  // Returns true if the message loop is "idle". Provided for testing.
+  bool IsIdleForTesting();
+
+  // Takes the incoming queue lock, signals |caller_wait| and waits until
+  // |caller_signal| is signalled.
+  void LockWaitUnLockForTesting(WaitableEvent* caller_wait,
+                                WaitableEvent* caller_signal);
+
   //----------------------------------------------------------------------------
  protected:
 
@@ -402,11 +404,11 @@
   }
 #endif
 
-  scoped_refptr<MessagePump> pump_;
+  scoped_ptr<MessagePump> pump_;
 
  private:
+  friend class internal::IncomingTaskQueue;
   friend class RunLoop;
-  friend class MessageLoopLockTest;
 
   // A function to encapsulate all the exception handling capability in the
   // stacks around the running of a main message loop.  It will run the message
@@ -436,34 +438,23 @@
   // Adds the pending task to delayed_work_queue_.
   void AddToDelayedWorkQueue(const PendingTask& pending_task);
 
-  // This function attempts to add pending task to our incoming_queue_.
-  // The append can only possibly fail when |use_try_lock| is true.
-  //
-  // When |use_try_lock| is true, then this call will avoid blocking if
-  // the related lock is already held, and will in that case (when the
-  // lock is contended) fail to perform the append, and will return false.
-  //
-  // If the call succeeds to append to the queue, then this call
-  // will return true.
-  //
-  // In all cases, the caller retains ownership of |pending_task|, but this
-  // function will reset the value of pending_task->task.  This is needed to
-  // ensure that the posting call stack does not retain pending_task->task
-  // beyond this function call.
-  bool AddToIncomingQueue(PendingTask* pending_task, bool use_try_lock);
-
-  // Load tasks from the incoming_queue_ into work_queue_ if the latter is
-  // empty.  The former requires a lock to access, while the latter is directly
-  // accessible on this thread.
-  void ReloadWorkQueue();
-
   // Delete tasks that haven't run yet without running them.  Used in the
   // destructor to make sure all the task's destructors get called.  Returns
   // true if some work was done.
   bool DeletePendingTasks();
 
-  // Calculates the time at which a PendingTask should run.
-  TimeTicks CalculateDelayedRuntime(TimeDelta delay);
+  // Creates a process-wide unique ID to represent this task in trace events.
+  // This will be mangled with a Process ID hash to reduce the likelihood of
+  // colliding with MessageLoop pointers on other processes.
+  uint64 GetTaskTraceID(const PendingTask& task);
+
+  // Loads tasks from the incoming queue to |work_queue_| if the latter is
+  // empty.
+  void ReloadWorkQueue();
+
+  // Wakes up the message pump. Can be called on any thread. The caller is
+  // responsible for synchronizing ScheduleWork() calls.
+  void ScheduleWork(bool was_empty);
 
   // Start recording histogram info about events and action IF it was enabled
   // and IF the statistics recorder can accept a registration of our histogram.
@@ -498,40 +489,30 @@
 
   ObserverList<DestructionObserver> destruction_observers_;
 
+  bool exception_restoration_;
+
   // A recursion block that prevents accidentally running additional tasks when
   // insider a (accidentally induced?) nested message pump.
   bool nestable_tasks_allowed_;
 
-  bool exception_restoration_;
-
-  std::string thread_name_;
-  // A profiling histogram showing the counts of various messages and events.
-  HistogramBase* message_histogram_;
-
-  // An incoming queue of tasks that are acquired under a mutex for processing
-  // on this instance's thread. These tasks have not yet been sorted out into
-  // items for our work_queue_ vs delayed_work_queue_.
-  TaskQueue incoming_queue_;
-  // Protect access to incoming_queue_.
-  mutable Lock incoming_queue_lock_;
-
-  RunLoop* run_loop_;
-
 #if defined(OS_WIN)
-  TimeTicks high_resolution_timer_expiration_;
   // Should be set to true before calling Windows APIs like TrackPopupMenu, etc
   // which enter a modal message loop.
   bool os_modal_loop_;
 #endif
 
-  // The next sequence number to use for delayed tasks. Updating this counter is
-  // protected by incoming_queue_lock_.
-  int next_sequence_num_;
+  std::string thread_name_;
+  // A profiling histogram showing the counts of various messages and events.
+  HistogramBase* message_histogram_;
+
+  RunLoop* run_loop_;
 
   ObserverList<TaskObserver> task_observers_;
 
-  // The message loop proxy associated with this message loop, if one exists.
-  scoped_refptr<MessageLoopProxy> message_loop_proxy_;
+  scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_;
+
+  // The message loop proxy associated with this message loop.
+  scoped_refptr<internal::MessageLoopProxyImpl> message_loop_proxy_;
   scoped_ptr<ThreadTaskRunnerHandle> thread_task_runner_handle_;
 
   template <class T, class R> friend class base::subtle::DeleteHelperInternal;
diff --git a/base/message_loop/message_loop_proxy_impl.cc b/base/message_loop/message_loop_proxy_impl.cc
index 7dc8caa..b7abca3 100644
--- a/base/message_loop/message_loop_proxy_impl.cc
+++ b/base/message_loop/message_loop_proxy_impl.cc
@@ -5,81 +5,43 @@
 #include "base/message_loop/message_loop_proxy_impl.h"
 
 #include "base/location.h"
-#include "base/threading/thread_restrictions.h"
+#include "base/logging.h"
+#include "base/message_loop/incoming_task_queue.h"
+#include "base/message_loop/message_loop.h"
 
 namespace base {
+namespace internal {
 
-MessageLoopProxyImpl::~MessageLoopProxyImpl() {
+MessageLoopProxyImpl::MessageLoopProxyImpl(
+    scoped_refptr<IncomingTaskQueue> incoming_queue)
+    : incoming_queue_(incoming_queue),
+      valid_thread_id_(PlatformThread::CurrentId()) {
 }
 
 bool MessageLoopProxyImpl::PostDelayedTask(
     const tracked_objects::Location& from_here,
     const base::Closure& task,
     base::TimeDelta delay) {
-  return PostTaskHelper(from_here, task, delay, true);
+  DCHECK(!task.is_null()) << from_here.ToString();
+  return incoming_queue_->AddToIncomingQueue(from_here, task, delay, true);
 }
 
 bool MessageLoopProxyImpl::PostNonNestableDelayedTask(
     const tracked_objects::Location& from_here,
     const base::Closure& task,
     base::TimeDelta delay) {
-  return PostTaskHelper(from_here, task, delay, false);
+  DCHECK(!task.is_null()) << from_here.ToString();
+  return incoming_queue_->AddToIncomingQueue(from_here, task, delay, false);
 }
 
 bool MessageLoopProxyImpl::RunsTasksOnCurrentThread() const {
-  // We shouldn't use MessageLoop::current() since it uses LazyInstance which
-  // may be deleted by ~AtExitManager when a WorkerPool thread calls this
-  // function.
-  // http://crbug.com/63678
-  base::ThreadRestrictions::ScopedAllowSingleton allow_singleton;
-  AutoLock lock(message_loop_lock_);
-  return (target_message_loop_ &&
-          (MessageLoop::current() == target_message_loop_));
+  return valid_thread_id_ == PlatformThread::CurrentId();
 }
 
-// MessageLoop::DestructionObserver implementation
-void MessageLoopProxyImpl::WillDestroyCurrentMessageLoop() {
-  AutoLock lock(message_loop_lock_);
-  target_message_loop_ = NULL;
+MessageLoopProxyImpl::~MessageLoopProxyImpl() {
 }
 
-void MessageLoopProxyImpl::OnDestruct() const {
-  // We shouldn't use MessageLoop::current() since it uses LazyInstance which
-  // may be deleted by ~AtExitManager when a WorkerPool thread calls this
-  // function.
-  // http://crbug.com/63678
-  base::ThreadRestrictions::ScopedAllowSingleton allow_singleton;
-  bool delete_later = false;
-  {
-    AutoLock lock(message_loop_lock_);
-    if (target_message_loop_ &&
-        (MessageLoop::current() != target_message_loop_)) {
-      target_message_loop_->DeleteSoon(FROM_HERE, this);
-      delete_later = true;
-    }
-  }
-  if (!delete_later)
-    delete this;
-}
-
-MessageLoopProxyImpl::MessageLoopProxyImpl()
-    : target_message_loop_(MessageLoop::current()) {
-}
-
-bool MessageLoopProxyImpl::PostTaskHelper(
-    const tracked_objects::Location& from_here, const base::Closure& task,
-    base::TimeDelta delay, bool nestable) {
-  AutoLock lock(message_loop_lock_);
-  if (target_message_loop_) {
-    if (nestable) {
-      target_message_loop_->PostDelayedTask(from_here, task, delay);
-    } else {
-      target_message_loop_->PostNonNestableDelayedTask(from_here, task, delay);
-    }
-    return true;
-  }
-  return false;
-}
+}  // namespace internal
 
 scoped_refptr<MessageLoopProxy>
 MessageLoopProxy::current() {
diff --git a/base/message_loop/message_loop_proxy_impl.h b/base/message_loop/message_loop_proxy_impl.h
index 6d6f0f6..b7f62b9 100644
--- a/base/message_loop/message_loop_proxy_impl.h
+++ b/base/message_loop/message_loop_proxy_impl.h
@@ -6,17 +6,24 @@
 #define BASE_MESSAGE_LOOP_MESSAGE_LOOP_PROXY_IMPL_H_
 
 #include "base/base_export.h"
-#include "base/message_loop/message_loop.h"
+#include "base/memory/ref_counted.h"
 #include "base/message_loop/message_loop_proxy.h"
-#include "base/synchronization/lock.h"
+#include "base/pending_task.h"
+#include "base/threading/platform_thread.h"
 
 namespace base {
+namespace internal {
+
+class IncomingTaskQueue;
 
 // A stock implementation of MessageLoopProxy that is created and managed by a
 // MessageLoop. For now a MessageLoopProxyImpl can only be created as part of a
 // MessageLoop.
 class BASE_EXPORT MessageLoopProxyImpl : public MessageLoopProxy {
  public:
+  explicit MessageLoopProxyImpl(
+      scoped_refptr<IncomingTaskQueue> incoming_queue);
+
   // MessageLoopProxy implementation
   virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
                                const base::Closure& task,
@@ -27,36 +34,20 @@
       base::TimeDelta delay) OVERRIDE;
   virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
 
- protected:
+ private:
+  friend class RefCountedThreadSafe<MessageLoopProxyImpl>;
   virtual ~MessageLoopProxyImpl();
 
-  // Override OnDestruct so that we can delete the object on the target message
-  // loop if it still exists.
-  virtual void OnDestruct() const OVERRIDE;
+  // The incoming queue receiving all posted tasks.
+  scoped_refptr<IncomingTaskQueue> incoming_queue_;
 
- private:
-  // Allow the MessageLoop to create a MessageLoopProxyImpl.
-  friend class MessageLoop;
-  friend class DeleteHelper<MessageLoopProxyImpl>;
-
-  MessageLoopProxyImpl();
-
-  // Called directly by MessageLoop::~MessageLoop.
-  virtual void WillDestroyCurrentMessageLoop();
-
-
-  bool PostTaskHelper(const tracked_objects::Location& from_here,
-                      const base::Closure& task,
-                      base::TimeDelta delay,
-                      bool nestable);
-
-  // The lock that protects access to target_message_loop_.
-  mutable base::Lock message_loop_lock_;
-  MessageLoop* target_message_loop_;
+  // ID of the thread |this| was created on.
+  PlatformThreadId valid_thread_id_;
 
   DISALLOW_COPY_AND_ASSIGN(MessageLoopProxyImpl);
 };
 
+}  // namespace internal
 }  // namespace base
 
 #endif  // BASE_MESSAGE_LOOP_MESSAGE_LOOP_PROXY_IMPL_H_
diff --git a/base/message_loop/message_loop_unittest.cc b/base/message_loop/message_loop_unittest.cc
index 504c8e3..ab05b3c 100644
--- a/base/message_loop/message_loop_unittest.cc
+++ b/base/message_loop/message_loop_unittest.cc
@@ -10,6 +10,7 @@
 #include "base/logging.h"
 #include "base/memory/ref_counted.h"
 #include "base/message_loop/message_loop.h"
+#include "base/message_loop/message_loop_proxy_impl.h"
 #include "base/pending_task.h"
 #include "base/posix/eintr_wrapper.h"
 #include "base/run_loop.h"
@@ -26,19 +27,6 @@
 
 namespace base {
 
-class MessageLoopLockTest {
- public:
-  static void LockWaitUnLock(MessageLoop* loop,
-                             WaitableEvent* caller_wait,
-                             WaitableEvent* caller_signal) {
-
-    loop->incoming_queue_lock_.Acquire();
-    caller_wait->Signal();
-    caller_signal->Wait();
-    loop->incoming_queue_lock_.Release();
-  }
-};
-
 // TODO(darin): Platform-specific MessageLoop tests should be grouped together
 // to avoid chopping this file up with so many #ifdefs.
 
@@ -121,10 +109,10 @@
   thread.Start();
   thread.message_loop()->PostTask(
       FROM_HERE,
-      Bind(&MessageLoopLockTest::LockWaitUnLock,
-      MessageLoop::current(),
-      &wait,
-      &signal));
+      Bind(&MessageLoop::LockWaitUnLockForTesting,
+           base::Unretained(MessageLoop::current()),
+           &wait,
+           &signal));
 
   wait.Wait();
   EXPECT_FALSE(MessageLoop::current()->TryPostTask(FROM_HERE, Bind(
@@ -1895,20 +1883,20 @@
   const TimeDelta kFastTimer = TimeDelta::FromMilliseconds(5);
   const TimeDelta kSlowTimer = TimeDelta::FromMilliseconds(100);
 
-  EXPECT_FALSE(loop.high_resolution_timers_enabled());
+  EXPECT_FALSE(loop.IsHighResolutionTimerEnabledForTesting());
 
   // Post a fast task to enable the high resolution timers.
   loop.PostDelayedTask(FROM_HERE, Bind(&PostNTasksThenQuit, 1),
                        kFastTimer);
   loop.Run();
-  EXPECT_TRUE(loop.high_resolution_timers_enabled());
+  EXPECT_TRUE(loop.IsHighResolutionTimerEnabledForTesting());
 
   // Post a slow task and verify high resolution timers
   // are still enabled.
   loop.PostDelayedTask(FROM_HERE, Bind(&PostNTasksThenQuit, 1),
                        kSlowTimer);
   loop.Run();
-  EXPECT_TRUE(loop.high_resolution_timers_enabled());
+  EXPECT_TRUE(loop.IsHighResolutionTimerEnabledForTesting());
 
   // Wait for a while so that high-resolution mode elapses.
   PlatformThread::Sleep(TimeDelta::FromMilliseconds(
@@ -1918,7 +1906,7 @@
   loop.PostDelayedTask(FROM_HERE, Bind(&PostNTasksThenQuit, 1),
                        kSlowTimer);
   loop.Run();
-  EXPECT_FALSE(loop.high_resolution_timers_enabled());
+  EXPECT_FALSE(loop.IsHighResolutionTimerEnabledForTesting());
 }
 
 #endif  // defined(OS_WIN)
diff --git a/base/message_loop/message_pump.h b/base/message_loop/message_pump.h
index 5b72232..0ebba3a 100644
--- a/base/message_loop/message_pump.h
+++ b/base/message_loop/message_pump.h
@@ -6,13 +6,13 @@
 #define BASE_MESSAGE_LOOP_MESSAGE_PUMP_H_
 
 #include "base/base_export.h"
-#include "base/memory/ref_counted.h"
+#include "base/threading/non_thread_safe.h"
 
 namespace base {
 
 class TimeTicks;
 
-class BASE_EXPORT MessagePump : public RefCountedThreadSafe<MessagePump> {
+class BASE_EXPORT MessagePump : public NonThreadSafe {
  public:
   // Please see the comments above the Run method for an illustration of how
   // these delegate methods are used.
@@ -42,6 +42,7 @@
   };
 
   MessagePump();
+  virtual ~MessagePump();
 
   // The Run method is called to enter the message pump's run loop.
   //
@@ -118,10 +119,6 @@
   // cancelling any pending DoDelayedWork callback.  This method may only be
   // used on the thread that called Run.
   virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) = 0;
-
- protected:
-  virtual ~MessagePump();
-  friend class RefCountedThreadSafe<MessagePump>;
 };
 
 }  // namespace base
diff --git a/base/message_loop/message_pump_android.h b/base/message_loop/message_pump_android.h
index f3a8ded..8a07a0f 100644
--- a/base/message_loop/message_pump_android.h
+++ b/base/message_loop/message_pump_android.h
@@ -22,6 +22,7 @@
 class BASE_EXPORT MessagePumpForUI : public MessagePump {
  public:
   MessagePumpForUI();
+  virtual ~MessagePumpForUI();
 
   virtual void Run(Delegate* delegate) OVERRIDE;
   virtual void Quit() OVERRIDE;
@@ -32,9 +33,6 @@
 
   static bool RegisterBindings(JNIEnv* env);
 
- protected:
-  virtual ~MessagePumpForUI();
-
  private:
   RunLoop* run_loop_;
   base::android::ScopedJavaGlobalRef<jobject> system_message_handler_obj_;
diff --git a/base/message_loop/message_pump_aurax11.cc b/base/message_loop/message_pump_aurax11.cc
index 7f60628..1f91a0e 100644
--- a/base/message_loop/message_pump_aurax11.cc
+++ b/base/message_loop/message_pump_aurax11.cc
@@ -139,6 +139,13 @@
   x_root_window_ = DefaultRootWindow(g_xdisplay);
 }
 
+MessagePumpAuraX11::~MessagePumpAuraX11() {
+  g_source_destroy(x_source_);
+  g_source_unref(x_source_);
+  XCloseDisplay(g_xdisplay);
+  g_xdisplay = NULL;
+}
+
 // static
 Display* MessagePumpAuraX11::GetDefaultXDisplay() {
   if (!g_xdisplay)
@@ -211,13 +218,6 @@
   } while (event.type != MapNotify);
 }
 
-MessagePumpAuraX11::~MessagePumpAuraX11() {
-  g_source_destroy(x_source_);
-  g_source_unref(x_source_);
-  XCloseDisplay(g_xdisplay);
-  g_xdisplay = NULL;
-}
-
 void MessagePumpAuraX11::InitXSource() {
   // CHECKs are to help track down crbug.com/113106.
   CHECK(!x_source_);
diff --git a/base/message_loop/message_pump_aurax11.h b/base/message_loop/message_pump_aurax11.h
index f11b3b4..89089ad 100644
--- a/base/message_loop/message_pump_aurax11.h
+++ b/base/message_loop/message_pump_aurax11.h
@@ -35,6 +35,7 @@
                                        public MessagePumpDispatcher {
  public:
   MessagePumpAuraX11();
+  virtual ~MessagePumpAuraX11();
 
   // Returns default X Display.
   static Display* GetDefaultXDisplay();
@@ -72,9 +73,6 @@
   // functions which require a mapped window.
   void BlockUntilWindowMapped(unsigned long xid);
 
- protected:
-  virtual ~MessagePumpAuraX11();
-
  private:
   typedef std::map<unsigned long, MessagePumpDispatcher*> DispatchersMap;
 
diff --git a/base/message_loop/message_pump_default.cc b/base/message_loop/message_pump_default.cc
index b36ff21..27c19e0 100644
--- a/base/message_loop/message_pump_default.cc
+++ b/base/message_loop/message_pump_default.cc
@@ -18,6 +18,9 @@
       event_(false, false) {
 }
 
+MessagePumpDefault::~MessagePumpDefault() {
+}
+
 void MessagePumpDefault::Run(Delegate* delegate) {
   DCHECK(keep_running_) << "Quit must have been called outside of Run!";
 
diff --git a/base/message_loop/message_pump_default.h b/base/message_loop/message_pump_default.h
index 07a6c70..a9b83e8 100644
--- a/base/message_loop/message_pump_default.h
+++ b/base/message_loop/message_pump_default.h
@@ -14,6 +14,7 @@
 class MessagePumpDefault : public MessagePump {
  public:
   MessagePumpDefault();
+  virtual ~MessagePumpDefault();
 
   // MessagePump methods:
   virtual void Run(Delegate* delegate) OVERRIDE;
@@ -21,9 +22,6 @@
   virtual void ScheduleWork() OVERRIDE;
   virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
 
- protected:
-  virtual ~MessagePumpDefault() {}
-
  private:
   // This flag is set to false when Run should return.
   bool keep_running_;
diff --git a/base/message_loop/message_pump_glib.cc b/base/message_loop/message_pump_glib.cc
index de012fd..cfacb7b 100644
--- a/base/message_loop/message_pump_glib.cc
+++ b/base/message_loop/message_pump_glib.cc
@@ -160,6 +160,13 @@
   g_source_attach(work_source_, context_);
 }
 
+MessagePumpGlib::~MessagePumpGlib() {
+  g_source_destroy(work_source_);
+  g_source_unref(work_source_);
+  close(wakeup_pipe_read_);
+  close(wakeup_pipe_write_);
+}
+
 void MessagePumpGlib::RunWithDispatcher(Delegate* delegate,
                                         MessagePumpDispatcher* dispatcher) {
 #ifndef NDEBUG
@@ -320,13 +327,6 @@
   ScheduleWork();
 }
 
-MessagePumpGlib::~MessagePumpGlib() {
-  g_source_destroy(work_source_);
-  g_source_unref(work_source_);
-  close(wakeup_pipe_read_);
-  close(wakeup_pipe_write_);
-}
-
 MessagePumpDispatcher* MessagePumpGlib::GetDispatcher() {
   return state_ ? state_->dispatcher : NULL;
 }
diff --git a/base/message_loop/message_pump_glib.h b/base/message_loop/message_pump_glib.h
index 775470e..33690d0 100644
--- a/base/message_loop/message_pump_glib.h
+++ b/base/message_loop/message_pump_glib.h
@@ -35,6 +35,7 @@
 class BASE_EXPORT MessagePumpGlib : public MessagePump {
  public:
   MessagePumpGlib();
+  virtual ~MessagePumpGlib();
 
   // Like MessagePump::Run, but events are routed through dispatcher.
   virtual void RunWithDispatcher(Delegate* delegate,
@@ -64,8 +65,6 @@
   virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
 
  protected:
-  virtual ~MessagePumpGlib();
-
   // Returns the dispatcher for the current run state (|state_->dispatcher|).
   MessagePumpDispatcher* GetDispatcher();
 
diff --git a/base/message_loop/message_pump_gtk.cc b/base/message_loop/message_pump_gtk.cc
index 8fa8cf2..ad65113 100644
--- a/base/message_loop/message_pump_gtk.cc
+++ b/base/message_loop/message_pump_gtk.cc
@@ -65,6 +65,11 @@
   gdk_event_handler_set(&EventDispatcher, this, NULL);
 }
 
+MessagePumpGtk::~MessagePumpGtk() {
+  gdk_event_handler_set(reinterpret_cast<GdkEventFunc>(gtk_main_do_event),
+                        this, NULL);
+}
+
 void MessagePumpGtk::DispatchEvents(GdkEvent* event) {
   UNSHIPPED_TRACE_EVENT1("task", "MessagePumpGtk::DispatchEvents",
                          "type", EventToTypeString(event));
@@ -92,11 +97,6 @@
   return GDK_DISPLAY_XDISPLAY(display);
 }
 
-MessagePumpGtk::~MessagePumpGtk() {
-  gdk_event_handler_set(reinterpret_cast<GdkEventFunc>(gtk_main_do_event),
-                        this, NULL);
-}
-
 void MessagePumpGtk::WillProcessEvent(GdkEvent* event) {
   FOR_EACH_OBSERVER(MessagePumpObserver, observers(), WillProcessEvent(event));
 }
diff --git a/base/message_loop/message_pump_gtk.h b/base/message_loop/message_pump_gtk.h
index e22e04f..947ab88 100644
--- a/base/message_loop/message_pump_gtk.h
+++ b/base/message_loop/message_pump_gtk.h
@@ -43,6 +43,7 @@
 class BASE_EXPORT MessagePumpGtk : public MessagePumpGlib {
  public:
   MessagePumpGtk();
+  virtual ~MessagePumpGtk();
 
   // Dispatch an available GdkEvent. Essentially this allows a subclass to do
   // some task before/after calling the default handler (EventDispatcher).
@@ -51,9 +52,6 @@
   // Returns default X Display.
   static Display* GetDefaultXDisplay();
 
- protected:
-  virtual ~MessagePumpGtk();
-
  private:
   // Invoked from EventDispatcher. Notifies all observers we're about to
   // process an event.
diff --git a/base/message_loop/message_pump_libevent.h b/base/message_loop/message_pump_libevent.h
index 3b1d26d..f3a48a9 100644
--- a/base/message_loop/message_pump_libevent.h
+++ b/base/message_loop/message_pump_libevent.h
@@ -98,6 +98,7 @@
   };
 
   MessagePumpLibevent();
+  virtual ~MessagePumpLibevent();
 
   // Have the current thread's message loop watch for a a situation in which
   // reading/writing to the FD can be performed without blocking.
@@ -126,9 +127,6 @@
   virtual void ScheduleWork() OVERRIDE;
   virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
 
- protected:
-  virtual ~MessagePumpLibevent();
-
  private:
   friend class MessagePumpLibeventTest;
 
diff --git a/base/message_loop/message_pump_libevent_unittest.cc b/base/message_loop/message_pump_libevent_unittest.cc
index 657ac7d..52ca95b 100644
--- a/base/message_loop/message_pump_libevent_unittest.cc
+++ b/base/message_loop/message_pump_libevent_unittest.cc
@@ -122,7 +122,7 @@
 };
 
 TEST_F(MessagePumpLibeventTest, DeleteWatcher) {
-  scoped_refptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
+  scoped_ptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
   MessagePumpLibevent::FileDescriptorWatcher* watcher =
       new MessagePumpLibevent::FileDescriptorWatcher;
   DeleteWatcher delegate(watcher);
@@ -147,7 +147,7 @@
 };
 
 TEST_F(MessagePumpLibeventTest, StopWatcher) {
-  scoped_refptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
+  scoped_ptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
   MessagePumpLibevent::FileDescriptorWatcher watcher;
   StopWatcher delegate(&watcher);
   pump->WatchFileDescriptor(pipefds_[1],
diff --git a/base/message_loop/message_pump_ozone.h b/base/message_loop/message_pump_ozone.h
index de75ab4..edcdf2e 100644
--- a/base/message_loop/message_pump_ozone.h
+++ b/base/message_loop/message_pump_ozone.h
@@ -20,6 +20,7 @@
                                      public MessagePumpDispatcher {
  public:
   MessagePumpOzone();
+  virtual ~MessagePumpOzone();
 
   // Returns the UI message pump.
   static MessagePumpOzone* Current();
@@ -39,7 +40,6 @@
   virtual bool Dispatch(const NativeEvent& event) OVERRIDE;
 
  private:
-  virtual ~MessagePumpOzone();
   std::vector<MessagePumpDispatcher*> dispatcher_;
 
   DISALLOW_COPY_AND_ASSIGN(MessagePumpOzone);