Revert 212948 "Made MessagePump a non-thread safe class."

r212948 broke Mac Builder: http://build.chromium.org/p/chromium.memory/builders/Mac%20ASAN%20Builder/builds/25623

> Made MessagePump a non-thread safe class.
> 
> This CL makes MessagePump a non-thread safe class to make sure thread-bound resources (such as the UI window used for pumping messages on Windows) are freed on the correct thread.
> 
> Handling of incoming tasks and synchronization between different threads was moved out to a separate class - IncomingTaskQueue reducing the number of locks to be taken while posting a task to one. Posting tasks via both MessageLoop and MessageLoopProxyImpl is now routed via IncomingTaskQueue.
> 
> BUG=241939
> 
> Review URL: https://chromiumcodereview.appspot.com/17567007

TBR=alexeypa@chromium.org

Review URL: https://codereview.chromium.org/19737005

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@212952 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: c9904b429860b83606492d4fe8f1b232c04753b5
diff --git a/base/message_loop/incoming_task_queue.cc b/base/message_loop/incoming_task_queue.cc
deleted file mode 100644
index db99d87..0000000
--- a/base/message_loop/incoming_task_queue.cc
+++ /dev/null
@@ -1,169 +0,0 @@
-// Copyright 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "base/message_loop/incoming_task_queue.h"
-
-#include "base/debug/trace_event.h"
-#include "base/location.h"
-#include "base/message_loop/message_loop.h"
-#include "base/synchronization/waitable_event.h"
-
-namespace base {
-namespace internal {
-
-IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop)
-    : message_loop_(message_loop),
-      next_sequence_num_(0) {
-}
-
-bool IncomingTaskQueue::AddToIncomingQueue(
-    const tracked_objects::Location& from_here,
-    const Closure& task,
-    TimeDelta delay,
-    bool nestable) {
-  AutoLock locked(incoming_queue_lock_);
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(delay), nestable);
-  return PostPendingTask(&pending_task);
-}
-
-bool IncomingTaskQueue::TryAddToIncomingQueue(
-    const tracked_objects::Location& from_here,
-    const Closure& task) {
-  if (!incoming_queue_lock_.Try()) {
-    // Reset |task|.
-    Closure local_task = task;
-    return false;
-  }
-
-  AutoLock locked(incoming_queue_lock_, AutoLock::AlreadyAcquired());
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
-  return PostPendingTask(&pending_task);
-}
-
-bool IncomingTaskQueue::IsHighResolutionTimerEnabledForTesting() {
-#if defined(OS_WIN)
-  return !high_resolution_timer_expiration_.is_null();
-#else
-  return true;
-#endif
-}
-
-bool IncomingTaskQueue::IsIdleForTesting() {
-  AutoLock lock(incoming_queue_lock_);
-  return incoming_queue_.empty();
-}
-
-void IncomingTaskQueue::LockWaitUnLockForTesting(WaitableEvent* caller_wait,
-                                                 WaitableEvent* caller_signal) {
-  AutoLock lock(incoming_queue_lock_);
-  caller_wait->Signal();
-  caller_signal->Wait();
-}
-
-void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
-  // Make sure no tasks are lost.
-  DCHECK(work_queue->empty());
-
-  // Acquire all we can from the inter-thread queue with one lock acquisition.
-  AutoLock lock(incoming_queue_lock_);
-  if (!incoming_queue_.empty())
-    incoming_queue_.Swap(work_queue);  // Constant time
-
-  DCHECK(incoming_queue_.empty());
-}
-
-void IncomingTaskQueue::WillDestroyCurrentMessageLoop() {
-#if defined(OS_WIN)
-  // If we left the high-resolution timer activated, deactivate it now.
-  // Doing this is not-critical, it is mainly to make sure we track
-  // the high resolution timer activations properly in our unit tests.
-  if (!high_resolution_timer_expiration_.is_null()) {
-    Time::ActivateHighResolutionTimer(false);
-    high_resolution_timer_expiration_ = TimeTicks();
-  }
-#endif
-
-  AutoLock lock(incoming_queue_lock_);
-  message_loop_ = NULL;
-}
-
-IncomingTaskQueue::~IncomingTaskQueue() {
-  // Verify that WillDestroyCurrentMessageLoop() has been called.
-  DCHECK(!message_loop_);
-}
-
-TimeTicks IncomingTaskQueue::CalculateDelayedRuntime(TimeDelta delay) {
-  TimeTicks delayed_run_time;
-  if (delay > TimeDelta()) {
-    delayed_run_time = TimeTicks::Now() + delay;
-
-#if defined(OS_WIN)
-    if (high_resolution_timer_expiration_.is_null()) {
-      // Windows timers are granular to 15.6ms.  If we only set high-res
-      // timers for those under 15.6ms, then a 18ms timer ticks at ~32ms,
-      // which as a percentage is pretty inaccurate.  So enable high
-      // res timers for any timer which is within 2x of the granularity.
-      // This is a tradeoff between accuracy and power management.
-      bool needs_high_res_timers = delay.InMilliseconds() <
-          (2 * Time::kMinLowResolutionThresholdMs);
-      if (needs_high_res_timers) {
-        if (Time::ActivateHighResolutionTimer(true)) {
-          high_resolution_timer_expiration_ = TimeTicks::Now() +
-              TimeDelta::FromMilliseconds(
-                  MessageLoop::kHighResolutionTimerModeLeaseTimeMs);
-        }
-      }
-    }
-#endif
-  } else {
-    DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
-  }
-
-#if defined(OS_WIN)
-  if (!high_resolution_timer_expiration_.is_null()) {
-    if (TimeTicks::Now() > high_resolution_timer_expiration_) {
-      Time::ActivateHighResolutionTimer(false);
-      high_resolution_timer_expiration_ = TimeTicks();
-    }
-  }
-#endif
-
-  return delayed_run_time;
-}
-
-bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
-  // Warning: Don't try to short-circuit, and handle this thread's tasks more
-  // directly, as it could starve handling of foreign threads.  Put every task
-  // into this queue.
-
-  // This should only be called while the lock is taken.
-  incoming_queue_lock_.AssertAcquired();
-
-  if (!message_loop_) {
-    pending_task->task.Reset();
-    return false;
-  }
-
-  // Initialize the sequence number. The sequence number is used for delayed
-  // tasks (to faciliate FIFO sorting when two tasks have the same
-  // delayed_run_time value) and for identifying the task in about:tracing.
-  pending_task->sequence_num = next_sequence_num_++;
-
-  TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask",
-      TRACE_ID_MANGLE(message_loop_->GetTaskTraceID(*pending_task)));
-
-  bool was_empty = incoming_queue_.empty();
-  incoming_queue_.push(*pending_task);
-  pending_task->task.Reset();
-
-  // Wake up the pump.
-  message_loop_->ScheduleWork(was_empty);
-
-  return true;
-}
-
-}  // namespace internal
-}  // namespace base
diff --git a/base/message_loop/incoming_task_queue.h b/base/message_loop/incoming_task_queue.h
deleted file mode 100644
index d831a71..0000000
--- a/base/message_loop/incoming_task_queue.h
+++ /dev/null
@@ -1,104 +0,0 @@
-// Copyright 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
-#define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
-
-#include "base/base_export.h"
-#include "base/memory/ref_counted.h"
-#include "base/pending_task.h"
-#include "base/synchronization/lock.h"
-#include "base/time/time.h"
-
-namespace base {
-
-class MessageLoop;
-class WaitableEvent;
-
-namespace internal {
-
-// Implements a queue of tasks posted to the message loop running on the current
-// thread. This class takes care of synchronizing posting tasks from different
-// threads and together with MessageLoop ensures clean shutdown.
-class BASE_EXPORT IncomingTaskQueue
-    : public RefCountedThreadSafe<IncomingTaskQueue> {
- public:
-  explicit IncomingTaskQueue(MessageLoop* message_loop);
-
-  // Appends a task to the incoming queue. Posting of all tasks is routed though
-  // AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting
-  // task is properly synchronized between different threads.
-  //
-  // Returns true if the task was successfully added to the queue, otherwise
-  // returns false. In all cases, the ownership of |task| is transferred to the
-  // called method.
-  bool AddToIncomingQueue(const tracked_objects::Location& from_here,
-                          const Closure& task,
-                          TimeDelta delay,
-                          bool nestable);
-
-  // Same as AddToIncomingQueue() except that it will avoid blocking if the lock
-  // is already held, and will in that case (when the lock is contended) fail to
-  // add the task, and will return false.
-  bool TryAddToIncomingQueue(const tracked_objects::Location& from_here,
-                             const Closure& task);
-
-  // Returns true if the message loop has high resolution timers enabled.
-  // Provided for testing.
-  bool IsHighResolutionTimerEnabledForTesting();
-
-  // Returns true if the message loop is "idle". Provided for testing.
-  bool IsIdleForTesting();
-
-  // Takes the incoming queue lock, signals |caller_wait| and waits until
-  // |caller_signal| is signalled.
-  void LockWaitUnLockForTesting(WaitableEvent* caller_wait,
-                                WaitableEvent* caller_signal);
-
-  // Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called
-  // from the thread that is running the loop.
-  void ReloadWorkQueue(TaskQueue* work_queue);
-
-  // Disconnects |this| from the parent message loop.
-  void WillDestroyCurrentMessageLoop();
-
- private:
-  friend class RefCountedThreadSafe<IncomingTaskQueue>;
-  virtual ~IncomingTaskQueue();
-
-  // Calculates the time at which a PendingTask should run.
-  TimeTicks CalculateDelayedRuntime(TimeDelta delay);
-
-  // Adds a task to |incoming_queue_|. The caller retains ownership of
-  // |pending_task|, but this function will reset the value of
-  // |pending_task->task|. This is needed to ensure that the posting call stack
-  // does not retain |pending_task->task| beyond this function call.
-  bool PostPendingTask(PendingTask* pending_task);
-
-#if defined(OS_WIN)
-  TimeTicks high_resolution_timer_expiration_;
-#endif
-
-  // The lock that protects access to |incoming_queue_|, |message_loop_| and
-  // |next_sequence_num_|.
-  base::Lock incoming_queue_lock_;
-
-  // An incoming queue of tasks that are acquired under a mutex for processing
-  // on this instance's thread. These tasks have not yet been been pushed to
-  // |message_loop_|.
-  TaskQueue incoming_queue_;
-
-  // Points to the message loop that owns |this|.
-  MessageLoop* message_loop_;
-
-  // The next sequence number to use for delayed tasks.
-  int next_sequence_num_;
-
-  DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
-};
-
-}  // namespace internal
-}  // namespace base
-
-#endif  // BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
diff --git a/base/message_loop/message_loop.cc b/base/message_loop/message_loop.cc
index d2eafbd..bd3542a 100644
--- a/base/message_loop/message_loop.cc
+++ b/base/message_loop/message_loop.cc
@@ -13,6 +13,7 @@
 #include "base/lazy_instance.h"
 #include "base/logging.h"
 #include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop_proxy_impl.h"
 #include "base/message_loop/message_pump_default.h"
 #include "base/metrics/histogram.h"
 #include "base/metrics/statistics_recorder.h"
@@ -88,6 +89,14 @@
 
 MessageLoop::MessagePumpFactory* message_pump_for_ui_factory_ = NULL;
 
+// Create a process-wide unique ID to represent this task in trace events. This
+// will be mangled with a Process ID hash to reduce the likelihood of colliding
+// with MessageLoop pointers on other processes.
+uint64 GetTaskTraceID(const PendingTask& task, MessageLoop* loop) {
+  return (static_cast<uint64>(task.sequence_num) << 32) |
+         static_cast<uint64>(reinterpret_cast<uintptr_t>(loop));
+}
+
 // Returns true if MessagePump::ScheduleWork() must be called one
 // time for every task that is added to the MessageLoop incoming queue.
 bool AlwaysNotifyPump(MessageLoop::Type type) {
@@ -137,19 +146,18 @@
 
 MessageLoop::MessageLoop(Type type)
     : type_(type),
-      exception_restoration_(false),
       nestable_tasks_allowed_(true),
+      exception_restoration_(false),
+      message_histogram_(NULL),
+      run_loop_(NULL),
 #if defined(OS_WIN)
       os_modal_loop_(false),
 #endif  // OS_WIN
-      message_histogram_(NULL),
-      run_loop_(NULL) {
+      next_sequence_num_(0) {
   DCHECK(!current()) << "should only have one message loop per thread";
   lazy_tls_ptr.Pointer()->Set(this);
 
-  incoming_task_queue_ = new internal::IncomingTaskQueue(this);
-  message_loop_proxy_ =
-      new internal::MessageLoopProxyImpl(incoming_task_queue_);
+  message_loop_proxy_ = new MessageLoopProxyImpl();
   thread_task_runner_handle_.reset(
       new ThreadTaskRunnerHandle(message_loop_proxy_));
 
@@ -169,7 +177,7 @@
 #define MESSAGE_PUMP_UI NULL
 // ipc_channel_nacl.cc uses a worker thread to do socket reads currently, and
 // doesn't require extra support for watching file descriptors.
-#define MESSAGE_PUMP_IO new MessagePumpDefault()
+#define MESSAGE_PUMP_IO new MessagePumpDefault()
 #elif defined(OS_POSIX)  // POSIX but not MACOSX.
 #define MESSAGE_PUMP_UI new MessagePumpForUI()
 #define MESSAGE_PUMP_IO new MessagePumpLibevent()
@@ -179,14 +187,14 @@
 
   if (type_ == TYPE_UI) {
     if (message_pump_for_ui_factory_)
-      pump_.reset(message_pump_for_ui_factory_());
+      pump_ = message_pump_for_ui_factory_();
     else
-      pump_.reset(MESSAGE_PUMP_UI);
+      pump_ = MESSAGE_PUMP_UI;
   } else if (type_ == TYPE_IO) {
-    pump_.reset(MESSAGE_PUMP_IO);
+    pump_ = MESSAGE_PUMP_IO;
   } else {
     DCHECK_EQ(TYPE_DEFAULT, type_);
-    pump_.reset(new MessagePumpDefault());
+    pump_ = new MessagePumpDefault();
   }
 }
 
@@ -218,13 +226,23 @@
 
   thread_task_runner_handle_.reset();
 
-  // Tell the incoming queue that we are dying.
-  incoming_task_queue_->WillDestroyCurrentMessageLoop();
-  incoming_task_queue_ = NULL;
+  // Tell the message_loop_proxy that we are dying.
+  static_cast<MessageLoopProxyImpl*>(message_loop_proxy_.get())->
+      WillDestroyCurrentMessageLoop();
   message_loop_proxy_ = NULL;
 
   // OK, now make it so that no one can find us.
   lazy_tls_ptr.Pointer()->Set(NULL);
+
+#if defined(OS_WIN)
+  // If we left the high-resolution timer activated, deactivate it now.
+  // Doing this is not-critical, it is mainly to make sure we track
+  // the high resolution timer activations properly in our unit tests.
+  if (!high_resolution_timer_expiration_.is_null()) {
+    Time::ActivateHighResolutionTimer(false);
+    high_resolution_timer_expiration_ = TimeTicks();
+  }
+#endif
 }
 
 // static
@@ -265,14 +283,18 @@
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), true);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
+  AddToIncomingQueue(&pending_task, false);
 }
 
 bool MessageLoop::TryPostTask(
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  return incoming_task_queue_->TryAddToIncomingQueue(from_here, task);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
+  return AddToIncomingQueue(&pending_task, true);
 }
 
 void MessageLoop::PostDelayedTask(
@@ -280,14 +302,18 @@
     const Closure& task,
     TimeDelta delay) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, true);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(delay), true);
+  AddToIncomingQueue(&pending_task, false);
 }
 
 void MessageLoop::PostNonNestableTask(
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), false);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(TimeDelta()), false);
+  AddToIncomingQueue(&pending_task, false);
 }
 
 void MessageLoop::PostNonNestableDelayedTask(
@@ -295,7 +321,9 @@
     const Closure& task,
     TimeDelta delay) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, false);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(delay), false);
+  AddToIncomingQueue(&pending_task, false);
 }
 
 void MessageLoop::Run() {
@@ -367,26 +395,17 @@
   task_observers_.RemoveObserver(task_observer);
 }
 
+void MessageLoop::AssertIdle() const {
+  // We only check |incoming_queue_|, since we don't want to lock |work_queue_|.
+  AutoLock lock(incoming_queue_lock_);
+  DCHECK(incoming_queue_.empty());
+}
+
 bool MessageLoop::is_running() const {
   DCHECK_EQ(this, current());
   return run_loop_ != NULL;
 }
 
-bool MessageLoop::IsHighResolutionTimerEnabledForTesting() {
-  return incoming_task_queue_->IsHighResolutionTimerEnabledForTesting();
-}
-
-bool MessageLoop::IsIdleForTesting() {
-  // We only check the imcoming queue|, since we don't want to lock the work
-  // queue.
-  return incoming_task_queue_->IsIdleForTesting();
-}
-
-void MessageLoop::LockWaitUnLockForTesting(WaitableEvent* caller_wait,
-                                           WaitableEvent* caller_signal) {
-  incoming_task_queue_->LockWaitUnLockForTesting(caller_wait, caller_signal);
-}
-
 //------------------------------------------------------------------------------
 
 // Runs the loop in two different SEH modes:
@@ -451,7 +470,7 @@
       tracked_objects::ThreadData::NowForStartOfRun(pending_task.birth_tally);
 
   TRACE_EVENT_FLOW_END1("task", "MessageLoop::PostTask",
-      TRACE_ID_MANGLE(GetTaskTraceID(pending_task)),
+      TRACE_ID_MANGLE(GetTaskTraceID(pending_task, this)),
       "queue_duration",
       (start_time - pending_task.EffectiveTimePosted()).InMilliseconds());
   TRACE_EVENT2("task", "MessageLoop::RunTask",
@@ -504,6 +523,24 @@
   delayed_work_queue_.push(pending_task);
 }
 
+void MessageLoop::ReloadWorkQueue() {
+  // We can improve performance of our loading tasks from incoming_queue_ to
+  // work_queue_ by waiting until the last minute (work_queue_ is empty) to
+  // load.  That reduces the number of locks-per-task significantly when our
+  // queues get large.
+  if (!work_queue_.empty())
+    return;  // Wait till we *really* need to lock and load.
+
+  // Acquire all we can from the inter-thread queue with one lock acquisition.
+  {
+    AutoLock lock(incoming_queue_lock_);
+    if (incoming_queue_.empty())
+      return;
+    incoming_queue_.Swap(&work_queue_);  // Constant time
+    DCHECK(incoming_queue_.empty());
+  }
+}
+
 bool MessageLoop::DeletePendingTasks() {
   bool did_work = !work_queue_.empty();
   while (!work_queue_.empty()) {
@@ -533,25 +570,87 @@
   return did_work;
 }
 
-uint64 MessageLoop::GetTaskTraceID(const PendingTask& task) {
-  return (static_cast<uint64>(task.sequence_num) << 32) |
-         static_cast<uint64>(reinterpret_cast<intptr_t>(this));
+TimeTicks MessageLoop::CalculateDelayedRuntime(TimeDelta delay) {
+  TimeTicks delayed_run_time;
+  if (delay > TimeDelta()) {
+    delayed_run_time = TimeTicks::Now() + delay;
+
+#if defined(OS_WIN)
+    if (high_resolution_timer_expiration_.is_null()) {
+      // Windows timers are granular to 15.6ms.  If we only set high-res
+      // timers for those under 15.6ms, then a 18ms timer ticks at ~32ms,
+      // which as a percentage is pretty inaccurate.  So enable high
+      // res timers for any timer which is within 2x of the granularity.
+      // This is a tradeoff between accuracy and power management.
+      bool needs_high_res_timers = delay.InMilliseconds() <
+          (2 * Time::kMinLowResolutionThresholdMs);
+      if (needs_high_res_timers) {
+        if (Time::ActivateHighResolutionTimer(true)) {
+          high_resolution_timer_expiration_ = TimeTicks::Now() +
+              TimeDelta::FromMilliseconds(kHighResolutionTimerModeLeaseTimeMs);
+        }
+      }
+    }
+#endif
+  } else {
+    DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
+  }
+
+#if defined(OS_WIN)
+  if (!high_resolution_timer_expiration_.is_null()) {
+    if (TimeTicks::Now() > high_resolution_timer_expiration_) {
+      Time::ActivateHighResolutionTimer(false);
+      high_resolution_timer_expiration_ = TimeTicks();
+    }
+  }
+#endif
+
+  return delayed_run_time;
 }
 
-void MessageLoop::ReloadWorkQueue() {
-  // We can improve performance of our loading tasks from the incoming queue to
-  // |*work_queue| by waiting until the last minute (|*work_queue| is empty) to
-  // load. That reduces the number of locks-per-task significantly when our
-  // queues get large.
-  if (work_queue_.empty())
-    incoming_task_queue_->ReloadWorkQueue(&work_queue_);
-}
+// Possibly called on a background thread!
+bool MessageLoop::AddToIncomingQueue(PendingTask* pending_task,
+                                     bool use_try_lock) {
+  // Warning: Don't try to short-circuit, and handle this thread's tasks more
+  // directly, as it could starve handling of foreign threads.  Put every task
+  // into this queue.
 
-void MessageLoop::ScheduleWork(bool was_empty) {
-  // The Android UI message loop needs to get notified each time
-  // a task is added to the incoming queue.
-  if (was_empty || AlwaysNotifyPump(type_))
-    pump_->ScheduleWork();
+  scoped_refptr<MessagePump> pump;
+  {
+    if (use_try_lock) {
+      if (!incoming_queue_lock_.Try()) {
+        pending_task->task.Reset();
+        return false;
+      }
+    } else {
+      incoming_queue_lock_.Acquire();
+    }
+    AutoLock locked(incoming_queue_lock_, AutoLock::AlreadyAcquired());
+    // Initialize the sequence number. The sequence number is used for delayed
+    // tasks (to facilitate FIFO sorting when two tasks have the same
+    // delayed_run_time value) and for identifying the task in about:tracing.
+    pending_task->sequence_num = next_sequence_num_++;
+
+    TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask",
+        TRACE_ID_MANGLE(GetTaskTraceID(*pending_task, this)));
+
+    bool was_empty = incoming_queue_.empty();
+    incoming_queue_.push(*pending_task);
+    pending_task->task.Reset();
+    // The Android UI message loop needs to get notified each time
+    // a task is added to the incoming queue.
+    if (!was_empty && !AlwaysNotifyPump(type_))
+      return true;  // Someone else should have started the sub-pump.
+
+    pump = pump_;
+  }
+  // Since the incoming_queue_ may contain a task that destroys this message
+  // loop, we cannot exit incoming_queue_lock_ until we are done with |this|.
+  // We use a stack-based reference to the message pump so that we can call
+  // ScheduleWork outside of incoming_queue_lock_.
+
+  pump->ScheduleWork();
+  return true;
 }
 
 //------------------------------------------------------------------------------
diff --git a/base/message_loop/message_loop.h b/base/message_loop/message_loop.h
index 6f71a85..846cc8d 100644
--- a/base/message_loop/message_loop.h
+++ b/base/message_loop/message_loop.h
@@ -13,10 +13,7 @@
 #include "base/callback_forward.h"
 #include "base/location.h"
 #include "base/memory/ref_counted.h"
-#include "base/memory/scoped_ptr.h"
-#include "base/message_loop/incoming_task_queue.h"
 #include "base/message_loop/message_loop_proxy.h"
-#include "base/message_loop/message_loop_proxy_impl.h"
 #include "base/message_loop/message_pump.h"
 #include "base/observer_list.h"
 #include "base/pending_task.h"
@@ -49,12 +46,12 @@
 namespace base {
 
 class HistogramBase;
+class MessageLoopLockTest;
 class RunLoop;
 class ThreadTaskRunnerHandle;
 #if defined(OS_ANDROID)
 class MessagePumpForUI;
 #endif
-class WaitableEvent;
 
 // A MessageLoop is used to process events for a particular thread.  There is
 // at most one MessageLoop instance per thread.
@@ -286,7 +283,7 @@
 
   // Gets the message loop proxy associated with this message loop.
   scoped_refptr<MessageLoopProxy> message_loop_proxy() {
-    return message_loop_proxy_;
+    return message_loop_proxy_.get();
   }
 
   // Enables or disables the recursive task processing. This happens in the case
@@ -362,10 +359,23 @@
   void AddTaskObserver(TaskObserver* task_observer);
   void RemoveTaskObserver(TaskObserver* task_observer);
 
+  // Returns true if the message loop has high resolution timers enabled.
+  // Provided for testing.
+  bool high_resolution_timers_enabled() {
+#if defined(OS_WIN)
+    return !high_resolution_timer_expiration_.is_null();
+#else
+    return true;
+#endif
+  }
+
   // When we go into high resolution timer mode, we will stay in hi-res mode
   // for at least 1s.
   static const int kHighResolutionTimerModeLeaseTimeMs = 1000;
 
+  // Asserts that the MessageLoop is "idle".
+  void AssertIdle() const;
+
 #if defined(OS_WIN)
   void set_os_modal_loop(bool os_modal_loop) {
     os_modal_loop_ = os_modal_loop;
@@ -379,18 +389,6 @@
   // Can only be called from the thread that owns the MessageLoop.
   bool is_running() const;
 
-  // Returns true if the message loop has high resolution timers enabled.
-  // Provided for testing.
-  bool IsHighResolutionTimerEnabledForTesting();
-
-  // Returns true if the message loop is "idle". Provided for testing.
-  bool IsIdleForTesting();
-
-  // Takes the incoming queue lock, signals |caller_wait| and waits until
-  // |caller_signal| is signalled.
-  void LockWaitUnLockForTesting(WaitableEvent* caller_wait,
-                                WaitableEvent* caller_signal);
-
   //----------------------------------------------------------------------------
  protected:
 
@@ -404,11 +402,11 @@
   }
 #endif
 
-  scoped_ptr<MessagePump> pump_;
+  scoped_refptr<MessagePump> pump_;
 
  private:
-  friend class internal::IncomingTaskQueue;
   friend class RunLoop;
+  friend class MessageLoopLockTest;
 
   // A function to encapsulate all the exception handling capability in the
   // stacks around the running of a main message loop.  It will run the message
@@ -438,23 +436,34 @@
   // Adds the pending task to delayed_work_queue_.
   void AddToDelayedWorkQueue(const PendingTask& pending_task);
 
+  // This function attempts to add pending task to our incoming_queue_.
+  // The append can only possibly fail when |use_try_lock| is true.
+  //
+  // When |use_try_lock| is true, then this call will avoid blocking if
+  // the related lock is already held, and will in that case (when the
+  // lock is contended) fail to perform the append, and will return false.
+  //
+  // If the call succeeds to append to the queue, then this call
+  // will return true.
+  //
+  // In all cases, the caller retains ownership of |pending_task|, but this
+  // function will reset the value of pending_task->task.  This is needed to
+  // ensure that the posting call stack does not retain pending_task->task
+  // beyond this function call.
+  bool AddToIncomingQueue(PendingTask* pending_task, bool use_try_lock);
+
+  // Load tasks from the incoming_queue_ into work_queue_ if the latter is
+  // empty.  The former requires a lock to access, while the latter is directly
+  // accessible on this thread.
+  void ReloadWorkQueue();
+
   // Delete tasks that haven't run yet without running them.  Used in the
   // destructor to make sure all the task's destructors get called.  Returns
   // true if some work was done.
   bool DeletePendingTasks();
 
-  // Creates a process-wide unique ID to represent this task in trace events.
-  // This will be mangled with a Process ID hash to reduce the likelyhood of
-  // colliding with MessageLoop pointers on other processes.
-  uint64 GetTaskTraceID(const PendingTask& task);
-
-  // Loads tasks from the incoming queue to |work_queue_| if the latter is
-  // empty.
-  void ReloadWorkQueue();
-
-  // Wakes up the message pump. Can be called on any thread. The caller is
-  // responsible for synchronizing ScheduleWork() calls.
-  void ScheduleWork(bool was_empty);
+  // Calculates the time at which a PendingTask should run.
+  TimeTicks CalculateDelayedRuntime(TimeDelta delay);
 
   // Start recording histogram info about events and action IF it was enabled
   // and IF the statistics recorder can accept a registration of our histogram.
@@ -489,30 +498,40 @@
 
   ObserverList<DestructionObserver> destruction_observers_;
 
-  bool exception_restoration_;
-
   // A recursion block that prevents accidentally running additional tasks when
   // insider a (accidentally induced?) nested message pump.
   bool nestable_tasks_allowed_;
 
-#if defined(OS_WIN)
-  // Should be set to true before calling Windows APIs like TrackPopupMenu, etc
-  // which enter a modal message loop.
-  bool os_modal_loop_;
-#endif
+  bool exception_restoration_;
 
   std::string thread_name_;
   // A profiling histogram showing the counts of various messages and events.
   HistogramBase* message_histogram_;
 
+  // An incoming queue of tasks that are acquired under a mutex for processing
+  // on this instance's thread. These tasks have not yet been sorted out into
+  // items for our work_queue_ vs delayed_work_queue_.
+  TaskQueue incoming_queue_;
+  // Protect access to incoming_queue_.
+  mutable Lock incoming_queue_lock_;
+
   RunLoop* run_loop_;
 
+#if defined(OS_WIN)
+  TimeTicks high_resolution_timer_expiration_;
+  // Should be set to true before calling Windows APIs like TrackPopupMenu, etc
+  // which enter a modal message loop.
+  bool os_modal_loop_;
+#endif
+
+  // The next sequence number to use for delayed tasks. Updating this counter is
+  // protected by incoming_queue_lock_.
+  int next_sequence_num_;
+
   ObserverList<TaskObserver> task_observers_;
 
-  scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_;
-
-  // The message loop proxy associated with this message loop.
-  scoped_refptr<internal::MessageLoopProxyImpl> message_loop_proxy_;
+  // The message loop proxy associated with this message loop, if one exists.
+  scoped_refptr<MessageLoopProxy> message_loop_proxy_;
   scoped_ptr<ThreadTaskRunnerHandle> thread_task_runner_handle_;
 
   template <class T, class R> friend class base::subtle::DeleteHelperInternal;
diff --git a/base/message_loop/message_loop_proxy_impl.cc b/base/message_loop/message_loop_proxy_impl.cc
index b7abca3..7dc8caa 100644
--- a/base/message_loop/message_loop_proxy_impl.cc
+++ b/base/message_loop/message_loop_proxy_impl.cc
@@ -5,43 +5,81 @@
 #include "base/message_loop/message_loop_proxy_impl.h"
 
 #include "base/location.h"
-#include "base/logging.h"
-#include "base/message_loop/incoming_task_queue.h"
-#include "base/message_loop/message_loop.h"
+#include "base/threading/thread_restrictions.h"
 
 namespace base {
-namespace internal {
 
-MessageLoopProxyImpl::MessageLoopProxyImpl(
-    scoped_refptr<IncomingTaskQueue> incoming_queue)
-    : incoming_queue_(incoming_queue),
-      valid_thread_id_(PlatformThread::CurrentId()) {
+MessageLoopProxyImpl::~MessageLoopProxyImpl() {
 }
 
 bool MessageLoopProxyImpl::PostDelayedTask(
     const tracked_objects::Location& from_here,
     const base::Closure& task,
     base::TimeDelta delay) {
-  DCHECK(!task.is_null()) << from_here.ToString();
-  return incoming_queue_->AddToIncomingQueue(from_here, task, delay, true);
+  return PostTaskHelper(from_here, task, delay, true);
 }
 
 bool MessageLoopProxyImpl::PostNonNestableDelayedTask(
     const tracked_objects::Location& from_here,
     const base::Closure& task,
     base::TimeDelta delay) {
-  DCHECK(!task.is_null()) << from_here.ToString();
-  return incoming_queue_->AddToIncomingQueue(from_here, task, delay, false);
+  return PostTaskHelper(from_here, task, delay, false);
 }
 
 bool MessageLoopProxyImpl::RunsTasksOnCurrentThread() const {
-  return valid_thread_id_ == PlatformThread::CurrentId();
+  // MessageLoop::current() uses a LazyInstance, which may already have been
+  // deleted by ~AtExitManager when a WorkerPool thread calls this function,
+  // so singleton access is explicitly allowed via |allow_singleton| below.
+  // http://crbug.com/63678
+  base::ThreadRestrictions::ScopedAllowSingleton allow_singleton;
+  AutoLock lock(message_loop_lock_);
+  return (target_message_loop_ &&
+          (MessageLoop::current() == target_message_loop_));
 }
 
-MessageLoopProxyImpl::~MessageLoopProxyImpl() {
+// Called directly by MessageLoop::~MessageLoop; drops the loop pointer.
+void MessageLoopProxyImpl::WillDestroyCurrentMessageLoop() {
+  AutoLock lock(message_loop_lock_);
+  target_message_loop_ = NULL;
 }
 
-}  // namespace internal
+void MessageLoopProxyImpl::OnDestruct() const {
+  // MessageLoop::current() uses a LazyInstance, which may already have been
+  // deleted by ~AtExitManager when a WorkerPool thread calls this function,
+  // so singleton access is explicitly allowed via |allow_singleton| below.
+  // http://crbug.com/63678
+  base::ThreadRestrictions::ScopedAllowSingleton allow_singleton;
+  bool delete_later = false;
+  {
+    AutoLock lock(message_loop_lock_);
+    if (target_message_loop_ &&
+        (MessageLoop::current() != target_message_loop_)) {
+      target_message_loop_->DeleteSoon(FROM_HERE, this);
+      delete_later = true;
+    }
+  }
+  if (!delete_later)
+    delete this;
+}
+
+MessageLoopProxyImpl::MessageLoopProxyImpl()
+    : target_message_loop_(MessageLoop::current()) {
+}
+
+bool MessageLoopProxyImpl::PostTaskHelper(
+    const tracked_objects::Location& from_here, const base::Closure& task,
+    base::TimeDelta delay, bool nestable) {
+  AutoLock lock(message_loop_lock_);
+  if (target_message_loop_) {
+    if (nestable) {
+      target_message_loop_->PostDelayedTask(from_here, task, delay);
+    } else {
+      target_message_loop_->PostNonNestableDelayedTask(from_here, task, delay);
+    }
+    return true;
+  }
+  return false;
+}
 
 scoped_refptr<MessageLoopProxy>
 MessageLoopProxy::current() {
diff --git a/base/message_loop/message_loop_proxy_impl.h b/base/message_loop/message_loop_proxy_impl.h
index b7f62b9..6d6f0f6 100644
--- a/base/message_loop/message_loop_proxy_impl.h
+++ b/base/message_loop/message_loop_proxy_impl.h
@@ -6,24 +6,17 @@
 #define BASE_MESSAGE_LOOP_MESSAGE_LOOP_PROXY_IMPL_H_
 
 #include "base/base_export.h"
-#include "base/memory/ref_counted.h"
+#include "base/message_loop/message_loop.h"
 #include "base/message_loop/message_loop_proxy.h"
-#include "base/pending_task.h"
-#include "base/threading/platform_thread.h"
+#include "base/synchronization/lock.h"
 
 namespace base {
-namespace internal {
-
-class IncomingTaskQueue;
 
 // A stock implementation of MessageLoopProxy that is created and managed by a
 // MessageLoop. For now a MessageLoopProxyImpl can only be created as part of a
 // MessageLoop.
 class BASE_EXPORT MessageLoopProxyImpl : public MessageLoopProxy {
  public:
-  explicit MessageLoopProxyImpl(
-      scoped_refptr<IncomingTaskQueue> incoming_queue);
-
   // MessageLoopProxy implementation
   virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
                                const base::Closure& task,
@@ -34,20 +27,36 @@
       base::TimeDelta delay) OVERRIDE;
   virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
 
- private:
-  friend class RefCountedThreadSafe<MessageLoopProxyImpl>;
+ protected:
   virtual ~MessageLoopProxyImpl();
 
-  // THe incoming queue receiving all posted tasks.
-  scoped_refptr<IncomingTaskQueue> incoming_queue_;
+  // Override OnDestruct so that we can delete the object on the target message
+  // loop if it still exists.
+  virtual void OnDestruct() const OVERRIDE;
 
-  // ID of the thread |this| was created on.
-  PlatformThreadId valid_thread_id_;
+ private:
+  // Allow the MessageLoop to create a MessageLoopProxyImpl.
+  friend class MessageLoop;
+  friend class DeleteHelper<MessageLoopProxyImpl>;
+
+  MessageLoopProxyImpl();
+
+  // Called directly by MessageLoop::~MessageLoop.
+  virtual void WillDestroyCurrentMessageLoop();
+
+
+  bool PostTaskHelper(const tracked_objects::Location& from_here,
+                      const base::Closure& task,
+                      base::TimeDelta delay,
+                      bool nestable);
+
+  // The lock that protects access to target_message_loop_.
+  mutable base::Lock message_loop_lock_;
+  MessageLoop* target_message_loop_;
 
   DISALLOW_COPY_AND_ASSIGN(MessageLoopProxyImpl);
 };
 
-}  // namespace internal
 }  // namespace base
 
 #endif  // BASE_MESSAGE_LOOP_MESSAGE_LOOP_PROXY_IMPL_H_
diff --git a/base/message_loop/message_loop_unittest.cc b/base/message_loop/message_loop_unittest.cc
index ab05b3c..504c8e3 100644
--- a/base/message_loop/message_loop_unittest.cc
+++ b/base/message_loop/message_loop_unittest.cc
@@ -10,7 +10,6 @@
 #include "base/logging.h"
 #include "base/memory/ref_counted.h"
 #include "base/message_loop/message_loop.h"
-#include "base/message_loop/message_loop_proxy_impl.h"
 #include "base/pending_task.h"
 #include "base/posix/eintr_wrapper.h"
 #include "base/run_loop.h"
@@ -27,6 +26,19 @@
 
 namespace base {
 
+class MessageLoopLockTest {
+ public:
+  static void LockWaitUnLock(MessageLoop* loop,
+                             WaitableEvent* caller_wait,
+                             WaitableEvent* caller_signal) {
+
+    loop->incoming_queue_lock_.Acquire();
+    caller_wait->Signal();
+    caller_signal->Wait();
+    loop->incoming_queue_lock_.Release();
+  }
+};
+
 // TODO(darin): Platform-specific MessageLoop tests should be grouped together
 // to avoid chopping this file up with so many #ifdefs.
 
@@ -109,10 +121,10 @@
   thread.Start();
   thread.message_loop()->PostTask(
       FROM_HERE,
-      Bind(&MessageLoop::LockWaitUnLockForTesting,
-           base::Unretained(MessageLoop::current()),
-           &wait,
-           &signal));
+      Bind(&MessageLoopLockTest::LockWaitUnLock,
+      MessageLoop::current(),
+      &wait,
+      &signal));
 
   wait.Wait();
   EXPECT_FALSE(MessageLoop::current()->TryPostTask(FROM_HERE, Bind(
@@ -1883,20 +1895,20 @@
   const TimeDelta kFastTimer = TimeDelta::FromMilliseconds(5);
   const TimeDelta kSlowTimer = TimeDelta::FromMilliseconds(100);
 
-  EXPECT_FALSE(loop.IsHighResolutionTimerEnabledForTesting());
+  EXPECT_FALSE(loop.high_resolution_timers_enabled());
 
   // Post a fast task to enable the high resolution timers.
   loop.PostDelayedTask(FROM_HERE, Bind(&PostNTasksThenQuit, 1),
                        kFastTimer);
   loop.Run();
-  EXPECT_TRUE(loop.IsHighResolutionTimerEnabledForTesting());
+  EXPECT_TRUE(loop.high_resolution_timers_enabled());
 
   // Post a slow task and verify high resolution timers
   // are still enabled.
   loop.PostDelayedTask(FROM_HERE, Bind(&PostNTasksThenQuit, 1),
                        kSlowTimer);
   loop.Run();
-  EXPECT_TRUE(loop.IsHighResolutionTimerEnabledForTesting());
+  EXPECT_TRUE(loop.high_resolution_timers_enabled());
 
   // Wait for a while so that high-resolution mode elapses.
   PlatformThread::Sleep(TimeDelta::FromMilliseconds(
@@ -1906,7 +1918,7 @@
   loop.PostDelayedTask(FROM_HERE, Bind(&PostNTasksThenQuit, 1),
                        kSlowTimer);
   loop.Run();
-  EXPECT_FALSE(loop.IsHighResolutionTimerEnabledForTesting());
+  EXPECT_FALSE(loop.high_resolution_timers_enabled());
 }
 
 #endif  // defined(OS_WIN)
diff --git a/base/message_loop/message_pump.h b/base/message_loop/message_pump.h
index 0ebba3a..5b72232 100644
--- a/base/message_loop/message_pump.h
+++ b/base/message_loop/message_pump.h
@@ -6,13 +6,13 @@
 #define BASE_MESSAGE_LOOP_MESSAGE_PUMP_H_
 
 #include "base/base_export.h"
-#include "base/threading/non_thread_safe.h"
+#include "base/memory/ref_counted.h"
 
 namespace base {
 
 class TimeTicks;
 
-class BASE_EXPORT MessagePump : public NonThreadSafe {
+class BASE_EXPORT MessagePump : public RefCountedThreadSafe<MessagePump> {
  public:
   // Please see the comments above the Run method for an illustration of how
   // these delegate methods are used.
@@ -42,7 +42,6 @@
   };
 
   MessagePump();
-  virtual ~MessagePump();
 
   // The Run method is called to enter the message pump's run loop.
   //
@@ -119,6 +118,10 @@
   // cancelling any pending DoDelayedWork callback.  This method may only be
   // used on the thread that called Run.
   virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) = 0;
+
+ protected:
+  virtual ~MessagePump();
+  friend class RefCountedThreadSafe<MessagePump>;
 };
 
 }  // namespace base
diff --git a/base/message_loop/message_pump_android.h b/base/message_loop/message_pump_android.h
index 8a07a0f..f3a8ded 100644
--- a/base/message_loop/message_pump_android.h
+++ b/base/message_loop/message_pump_android.h
@@ -22,7 +22,6 @@
 class BASE_EXPORT MessagePumpForUI : public MessagePump {
  public:
   MessagePumpForUI();
-  virtual ~MessagePumpForUI();
 
   virtual void Run(Delegate* delegate) OVERRIDE;
   virtual void Quit() OVERRIDE;
@@ -33,6 +32,9 @@
 
   static bool RegisterBindings(JNIEnv* env);
 
+ protected:
+  virtual ~MessagePumpForUI();
+
  private:
   RunLoop* run_loop_;
   base::android::ScopedJavaGlobalRef<jobject> system_message_handler_obj_;
diff --git a/base/message_loop/message_pump_aurax11.cc b/base/message_loop/message_pump_aurax11.cc
index 1f91a0e..7f60628 100644
--- a/base/message_loop/message_pump_aurax11.cc
+++ b/base/message_loop/message_pump_aurax11.cc
@@ -139,13 +139,6 @@
   x_root_window_ = DefaultRootWindow(g_xdisplay);
 }
 
-MessagePumpAuraX11::~MessagePumpAuraX11() {
-  g_source_destroy(x_source_);
-  g_source_unref(x_source_);
-  XCloseDisplay(g_xdisplay);
-  g_xdisplay = NULL;
-}
-
 // static
 Display* MessagePumpAuraX11::GetDefaultXDisplay() {
   if (!g_xdisplay)
@@ -218,6 +211,13 @@
   } while (event.type != MapNotify);
 }
 
+MessagePumpAuraX11::~MessagePumpAuraX11() {
+  g_source_destroy(x_source_);
+  g_source_unref(x_source_);
+  XCloseDisplay(g_xdisplay);
+  g_xdisplay = NULL;
+}
+
 void MessagePumpAuraX11::InitXSource() {
   // CHECKs are to help track down crbug.com/113106.
   CHECK(!x_source_);
diff --git a/base/message_loop/message_pump_aurax11.h b/base/message_loop/message_pump_aurax11.h
index 89089ad..f11b3b4 100644
--- a/base/message_loop/message_pump_aurax11.h
+++ b/base/message_loop/message_pump_aurax11.h
@@ -35,7 +35,6 @@
                                        public MessagePumpDispatcher {
  public:
   MessagePumpAuraX11();
-  virtual ~MessagePumpAuraX11();
 
   // Returns default X Display.
   static Display* GetDefaultXDisplay();
@@ -73,6 +72,9 @@
   // functions which require a mapped window.
   void BlockUntilWindowMapped(unsigned long xid);
 
+ protected:
+  virtual ~MessagePumpAuraX11();
+
  private:
   typedef std::map<unsigned long, MessagePumpDispatcher*> DispatchersMap;
 
diff --git a/base/message_loop/message_pump_default.cc b/base/message_loop/message_pump_default.cc
index 27c19e0..b36ff21 100644
--- a/base/message_loop/message_pump_default.cc
+++ b/base/message_loop/message_pump_default.cc
@@ -18,9 +18,6 @@
       event_(false, false) {
 }
 
-MessagePumpDefault::~MessagePumpDefault() {
-}
-
 void MessagePumpDefault::Run(Delegate* delegate) {
   DCHECK(keep_running_) << "Quit must have been called outside of Run!";
 
diff --git a/base/message_loop/message_pump_default.h b/base/message_loop/message_pump_default.h
index a9b83e8..07a6c70 100644
--- a/base/message_loop/message_pump_default.h
+++ b/base/message_loop/message_pump_default.h
@@ -14,7 +14,6 @@
 class MessagePumpDefault : public MessagePump {
  public:
   MessagePumpDefault();
-  virtual ~MessagePumpDefault();
 
   // MessagePump methods:
   virtual void Run(Delegate* delegate) OVERRIDE;
@@ -22,6 +21,9 @@
   virtual void ScheduleWork() OVERRIDE;
   virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
 
+ protected:
+  virtual ~MessagePumpDefault() {}
+
  private:
   // This flag is set to false when Run should return.
   bool keep_running_;
diff --git a/base/message_loop/message_pump_glib.cc b/base/message_loop/message_pump_glib.cc
index cfacb7b..de012fd 100644
--- a/base/message_loop/message_pump_glib.cc
+++ b/base/message_loop/message_pump_glib.cc
@@ -160,13 +160,6 @@
   g_source_attach(work_source_, context_);
 }
 
-MessagePumpGlib::~MessagePumpGlib() {
-  g_source_destroy(work_source_);
-  g_source_unref(work_source_);
-  close(wakeup_pipe_read_);
-  close(wakeup_pipe_write_);
-}
-
 void MessagePumpGlib::RunWithDispatcher(Delegate* delegate,
                                         MessagePumpDispatcher* dispatcher) {
 #ifndef NDEBUG
@@ -327,6 +320,13 @@
   ScheduleWork();
 }
 
+MessagePumpGlib::~MessagePumpGlib() {
+  g_source_destroy(work_source_);
+  g_source_unref(work_source_);
+  close(wakeup_pipe_read_);
+  close(wakeup_pipe_write_);
+}
+
 MessagePumpDispatcher* MessagePumpGlib::GetDispatcher() {
   return state_ ? state_->dispatcher : NULL;
 }
diff --git a/base/message_loop/message_pump_glib.h b/base/message_loop/message_pump_glib.h
index 33690d0..775470e 100644
--- a/base/message_loop/message_pump_glib.h
+++ b/base/message_loop/message_pump_glib.h
@@ -35,7 +35,6 @@
 class BASE_EXPORT MessagePumpGlib : public MessagePump {
  public:
   MessagePumpGlib();
-  virtual ~MessagePumpGlib();
 
   // Like MessagePump::Run, but events are routed through dispatcher.
   virtual void RunWithDispatcher(Delegate* delegate,
@@ -65,6 +64,8 @@
   virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
 
  protected:
+  virtual ~MessagePumpGlib();
+
   // Returns the dispatcher for the current run state (|state_->dispatcher|).
   MessagePumpDispatcher* GetDispatcher();
 
diff --git a/base/message_loop/message_pump_gtk.cc b/base/message_loop/message_pump_gtk.cc
index ad65113..8fa8cf2 100644
--- a/base/message_loop/message_pump_gtk.cc
+++ b/base/message_loop/message_pump_gtk.cc
@@ -65,11 +65,6 @@
   gdk_event_handler_set(&EventDispatcher, this, NULL);
 }
 
-MessagePumpGtk::~MessagePumpGtk() {
-  gdk_event_handler_set(reinterpret_cast<GdkEventFunc>(gtk_main_do_event),
-                        this, NULL);
-}
-
 void MessagePumpGtk::DispatchEvents(GdkEvent* event) {
   UNSHIPPED_TRACE_EVENT1("task", "MessagePumpGtk::DispatchEvents",
                          "type", EventToTypeString(event));
@@ -97,6 +92,11 @@
   return GDK_DISPLAY_XDISPLAY(display);
 }
 
+MessagePumpGtk::~MessagePumpGtk() {
+  gdk_event_handler_set(reinterpret_cast<GdkEventFunc>(gtk_main_do_event),
+                        this, NULL);
+}
+
 void MessagePumpGtk::WillProcessEvent(GdkEvent* event) {
   FOR_EACH_OBSERVER(MessagePumpObserver, observers(), WillProcessEvent(event));
 }
diff --git a/base/message_loop/message_pump_gtk.h b/base/message_loop/message_pump_gtk.h
index 947ab88..e22e04f 100644
--- a/base/message_loop/message_pump_gtk.h
+++ b/base/message_loop/message_pump_gtk.h
@@ -43,7 +43,6 @@
 class BASE_EXPORT MessagePumpGtk : public MessagePumpGlib {
  public:
   MessagePumpGtk();
-  virtual ~MessagePumpGtk();
 
   // Dispatch an available GdkEvent. Essentially this allows a subclass to do
   // some task before/after calling the default handler (EventDispatcher).
@@ -52,6 +51,9 @@
   // Returns default X Display.
   static Display* GetDefaultXDisplay();
 
+ protected:
+  virtual ~MessagePumpGtk();
+
  private:
   // Invoked from EventDispatcher. Notifies all observers we're about to
   // process an event.
diff --git a/base/message_loop/message_pump_libevent.h b/base/message_loop/message_pump_libevent.h
index f3a48a9..3b1d26d 100644
--- a/base/message_loop/message_pump_libevent.h
+++ b/base/message_loop/message_pump_libevent.h
@@ -98,7 +98,6 @@
   };
 
   MessagePumpLibevent();
-  virtual ~MessagePumpLibevent();
 
   // Have the current thread's message loop watch for a a situation in which
   // reading/writing to the FD can be performed without blocking.
@@ -127,6 +126,9 @@
   virtual void ScheduleWork() OVERRIDE;
   virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
 
+ protected:
+  virtual ~MessagePumpLibevent();
+
  private:
   friend class MessagePumpLibeventTest;
 
diff --git a/base/message_loop/message_pump_libevent_unittest.cc b/base/message_loop/message_pump_libevent_unittest.cc
index 52ca95b..657ac7d 100644
--- a/base/message_loop/message_pump_libevent_unittest.cc
+++ b/base/message_loop/message_pump_libevent_unittest.cc
@@ -122,7 +122,7 @@
 };
 
 TEST_F(MessagePumpLibeventTest, DeleteWatcher) {
-  scoped_ptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
+  scoped_refptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
   MessagePumpLibevent::FileDescriptorWatcher* watcher =
       new MessagePumpLibevent::FileDescriptorWatcher;
   DeleteWatcher delegate(watcher);
@@ -147,7 +147,7 @@
 };
 
 TEST_F(MessagePumpLibeventTest, StopWatcher) {
-  scoped_ptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
+  scoped_refptr<MessagePumpLibevent> pump(new MessagePumpLibevent);
   MessagePumpLibevent::FileDescriptorWatcher watcher;
   StopWatcher delegate(&watcher);
   pump->WatchFileDescriptor(pipefds_[1],
diff --git a/base/message_loop/message_pump_ozone.h b/base/message_loop/message_pump_ozone.h
index edcdf2e..de75ab4 100644
--- a/base/message_loop/message_pump_ozone.h
+++ b/base/message_loop/message_pump_ozone.h
@@ -20,7 +20,6 @@
                                      public MessagePumpDispatcher {
  public:
   MessagePumpOzone();
-  virtual ~MessagePumpOzone();
 
   // Returns the UI message pump.
   static MessagePumpOzone* Current();
@@ -40,6 +39,7 @@
   virtual bool Dispatch(const NativeEvent& event) OVERRIDE;
 
  private:
+  virtual ~MessagePumpOzone();
   std::vector<MessagePumpDispatcher*> dispatcher_;
 
   DISALLOW_COPY_AND_ASSIGN(MessagePumpOzone);