Made MessagePump a non-thread-safe class.

This CL makes MessagePump a non-thread-safe class to ensure that thread-bound resources (such as the UI window used for pumping messages on Windows) are freed on the correct thread.

Handling of incoming tasks and synchronization between different threads was moved out to a separate class, IncomingTaskQueue, reducing the number of locks taken while posting a task to one. Posting tasks via both MessageLoop and MessageLoopProxyImpl is now routed through IncomingTaskQueue.

BUG=241939

Review URL: https://chromiumcodereview.appspot.com/17567007

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@212948 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: b908293642bab4631101157c0aac4fb89fd6c287
diff --git a/base/message_loop/message_loop.cc b/base/message_loop/message_loop.cc
index bd3542a..d2eafbd 100644
--- a/base/message_loop/message_loop.cc
+++ b/base/message_loop/message_loop.cc
@@ -13,7 +13,6 @@
 #include "base/lazy_instance.h"
 #include "base/logging.h"
 #include "base/memory/scoped_ptr.h"
-#include "base/message_loop/message_loop_proxy_impl.h"
 #include "base/message_loop/message_pump_default.h"
 #include "base/metrics/histogram.h"
 #include "base/metrics/statistics_recorder.h"
@@ -89,14 +88,6 @@
 
 MessageLoop::MessagePumpFactory* message_pump_for_ui_factory_ = NULL;
 
-// Create a process-wide unique ID to represent this task in trace events. This
-// will be mangled with a Process ID hash to reduce the likelyhood of colliding
-// with MessageLoop pointers on other processes.
-uint64 GetTaskTraceID(const PendingTask& task, MessageLoop* loop) {
-  return (static_cast<uint64>(task.sequence_num) << 32) |
-         static_cast<uint64>(reinterpret_cast<intptr_t>(loop));
-}
-
 // Returns true if MessagePump::ScheduleWork() must be called one
 // time for every task that is added to the MessageLoop incoming queue.
 bool AlwaysNotifyPump(MessageLoop::Type type) {
@@ -146,18 +137,19 @@
 
 MessageLoop::MessageLoop(Type type)
     : type_(type),
-      nestable_tasks_allowed_(true),
       exception_restoration_(false),
-      message_histogram_(NULL),
-      run_loop_(NULL),
+      nestable_tasks_allowed_(true),
 #if defined(OS_WIN)
       os_modal_loop_(false),
 #endif  // OS_WIN
-      next_sequence_num_(0) {
+      message_histogram_(NULL),
+      run_loop_(NULL) {
   DCHECK(!current()) << "should only have one message loop per thread";
   lazy_tls_ptr.Pointer()->Set(this);
 
-  message_loop_proxy_ = new MessageLoopProxyImpl();
+  incoming_task_queue_ = new internal::IncomingTaskQueue(this);
+  message_loop_proxy_ =
+      new internal::MessageLoopProxyImpl(incoming_task_queue_);
   thread_task_runner_handle_.reset(
       new ThreadTaskRunnerHandle(message_loop_proxy_));
 
@@ -177,7 +169,7 @@
 #define MESSAGE_PUMP_UI NULL
 // ipc_channel_nacl.cc uses a worker thread to do socket reads currently, and
 // doesn't require extra support for watching file descriptors.
-#define MESSAGE_PUMP_IO new MessagePumpDefault();
+#define MESSAGE_PUMP_IO new MessagePumpDefault()
 #elif defined(OS_POSIX)  // POSIX but not MACOSX.
 #define MESSAGE_PUMP_UI new MessagePumpForUI()
 #define MESSAGE_PUMP_IO new MessagePumpLibevent()
@@ -187,14 +179,14 @@
 
   if (type_ == TYPE_UI) {
     if (message_pump_for_ui_factory_)
-      pump_ = message_pump_for_ui_factory_();
+      pump_.reset(message_pump_for_ui_factory_());
     else
-      pump_ = MESSAGE_PUMP_UI;
+      pump_.reset(MESSAGE_PUMP_UI);
   } else if (type_ == TYPE_IO) {
-    pump_ = MESSAGE_PUMP_IO;
+    pump_.reset(MESSAGE_PUMP_IO);
   } else {
     DCHECK_EQ(TYPE_DEFAULT, type_);
-    pump_ = new MessagePumpDefault();
+    pump_.reset(new MessagePumpDefault());
   }
 }
 
@@ -226,23 +218,13 @@
 
   thread_task_runner_handle_.reset();
 
-  // Tell the message_loop_proxy that we are dying.
-  static_cast<MessageLoopProxyImpl*>(message_loop_proxy_.get())->
-      WillDestroyCurrentMessageLoop();
+  // Tell the incoming queue that we are dying.
+  incoming_task_queue_->WillDestroyCurrentMessageLoop();
+  incoming_task_queue_ = NULL;
   message_loop_proxy_ = NULL;
 
   // OK, now make it so that no one can find us.
   lazy_tls_ptr.Pointer()->Set(NULL);
-
-#if defined(OS_WIN)
-  // If we left the high-resolution timer activated, deactivate it now.
-  // Doing this is not-critical, it is mainly to make sure we track
-  // the high resolution timer activations properly in our unit tests.
-  if (!high_resolution_timer_expiration_.is_null()) {
-    Time::ActivateHighResolutionTimer(false);
-    high_resolution_timer_expiration_ = TimeTicks();
-  }
-#endif
 }
 
 // static
@@ -283,18 +265,14 @@
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
-  AddToIncomingQueue(&pending_task, false);
+  incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), true);
 }
 
 bool MessageLoop::TryPostTask(
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
-  return AddToIncomingQueue(&pending_task, true);
+  return incoming_task_queue_->TryAddToIncomingQueue(from_here, task);
 }
 
 void MessageLoop::PostDelayedTask(
@@ -302,18 +280,14 @@
     const Closure& task,
     TimeDelta delay) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(delay), true);
-  AddToIncomingQueue(&pending_task, false);
+  incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, true);
 }
 
 void MessageLoop::PostNonNestableTask(
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(TimeDelta()), false);
-  AddToIncomingQueue(&pending_task, false);
+  incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), false);
 }
 
 void MessageLoop::PostNonNestableDelayedTask(
@@ -321,9 +295,7 @@
     const Closure& task,
     TimeDelta delay) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  PendingTask pending_task(
-      from_here, task, CalculateDelayedRuntime(delay), false);
-  AddToIncomingQueue(&pending_task, false);
+  incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, false);
 }
 
 void MessageLoop::Run() {
@@ -395,17 +367,26 @@
   task_observers_.RemoveObserver(task_observer);
 }
 
-void MessageLoop::AssertIdle() const {
-  // We only check |incoming_queue_|, since we don't want to lock |work_queue_|.
-  AutoLock lock(incoming_queue_lock_);
-  DCHECK(incoming_queue_.empty());
-}
-
 bool MessageLoop::is_running() const {
   DCHECK_EQ(this, current());
   return run_loop_ != NULL;
 }
 
+bool MessageLoop::IsHighResolutionTimerEnabledForTesting() {
+  return incoming_task_queue_->IsHighResolutionTimerEnabledForTesting();
+}
+
+bool MessageLoop::IsIdleForTesting() {
+  // We only check the incoming queue, since we don't want to lock the work
+  // queue.
+  return incoming_task_queue_->IsIdleForTesting();
+}
+
+void MessageLoop::LockWaitUnLockForTesting(WaitableEvent* caller_wait,
+                                           WaitableEvent* caller_signal) {
+  incoming_task_queue_->LockWaitUnLockForTesting(caller_wait, caller_signal);
+}
+
 //------------------------------------------------------------------------------
 
 // Runs the loop in two different SEH modes:
@@ -470,7 +451,7 @@
       tracked_objects::ThreadData::NowForStartOfRun(pending_task.birth_tally);
 
   TRACE_EVENT_FLOW_END1("task", "MessageLoop::PostTask",
-      TRACE_ID_MANGLE(GetTaskTraceID(pending_task, this)),
+      TRACE_ID_MANGLE(GetTaskTraceID(pending_task)),
       "queue_duration",
       (start_time - pending_task.EffectiveTimePosted()).InMilliseconds());
   TRACE_EVENT2("task", "MessageLoop::RunTask",
@@ -523,24 +504,6 @@
   delayed_work_queue_.push(pending_task);
 }
 
-void MessageLoop::ReloadWorkQueue() {
-  // We can improve performance of our loading tasks from incoming_queue_ to
-  // work_queue_ by waiting until the last minute (work_queue_ is empty) to
-  // load.  That reduces the number of locks-per-task significantly when our
-  // queues get large.
-  if (!work_queue_.empty())
-    return;  // Wait till we *really* need to lock and load.
-
-  // Acquire all we can from the inter-thread queue with one lock acquisition.
-  {
-    AutoLock lock(incoming_queue_lock_);
-    if (incoming_queue_.empty())
-      return;
-    incoming_queue_.Swap(&work_queue_);  // Constant time
-    DCHECK(incoming_queue_.empty());
-  }
-}
-
 bool MessageLoop::DeletePendingTasks() {
   bool did_work = !work_queue_.empty();
   while (!work_queue_.empty()) {
@@ -570,87 +533,25 @@
   return did_work;
 }
 
-TimeTicks MessageLoop::CalculateDelayedRuntime(TimeDelta delay) {
-  TimeTicks delayed_run_time;
-  if (delay > TimeDelta()) {
-    delayed_run_time = TimeTicks::Now() + delay;
-
-#if defined(OS_WIN)
-    if (high_resolution_timer_expiration_.is_null()) {
-      // Windows timers are granular to 15.6ms.  If we only set high-res
-      // timers for those under 15.6ms, then a 18ms timer ticks at ~32ms,
-      // which as a percentage is pretty inaccurate.  So enable high
-      // res timers for any timer which is within 2x of the granularity.
-      // This is a tradeoff between accuracy and power management.
-      bool needs_high_res_timers = delay.InMilliseconds() <
-          (2 * Time::kMinLowResolutionThresholdMs);
-      if (needs_high_res_timers) {
-        if (Time::ActivateHighResolutionTimer(true)) {
-          high_resolution_timer_expiration_ = TimeTicks::Now() +
-              TimeDelta::FromMilliseconds(kHighResolutionTimerModeLeaseTimeMs);
-        }
-      }
-    }
-#endif
-  } else {
-    DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
-  }
-
-#if defined(OS_WIN)
-  if (!high_resolution_timer_expiration_.is_null()) {
-    if (TimeTicks::Now() > high_resolution_timer_expiration_) {
-      Time::ActivateHighResolutionTimer(false);
-      high_resolution_timer_expiration_ = TimeTicks();
-    }
-  }
-#endif
-
-  return delayed_run_time;
+uint64 MessageLoop::GetTaskTraceID(const PendingTask& task) {
+  return (static_cast<uint64>(task.sequence_num) << 32) |
+         static_cast<uint64>(reinterpret_cast<intptr_t>(this));
 }
 
-// Possibly called on a background thread!
-bool MessageLoop::AddToIncomingQueue(PendingTask* pending_task,
-                                     bool use_try_lock) {
-  // Warning: Don't try to short-circuit, and handle this thread's tasks more
-  // directly, as it could starve handling of foreign threads.  Put every task
-  // into this queue.
+void MessageLoop::ReloadWorkQueue() {
+  // We can improve performance of our loading tasks from the incoming queue to
+  // |work_queue_| by waiting until the last minute (|work_queue_| is empty) to
+  // load. That reduces the number of locks-per-task significantly when our
+  // queues get large.
+  if (work_queue_.empty())
+    incoming_task_queue_->ReloadWorkQueue(&work_queue_);
+}
 
-  scoped_refptr<MessagePump> pump;
-  {
-    if (use_try_lock) {
-      if (!incoming_queue_lock_.Try()) {
-        pending_task->task.Reset();
-        return false;
-      }
-    } else {
-      incoming_queue_lock_.Acquire();
-    }
-    AutoLock locked(incoming_queue_lock_, AutoLock::AlreadyAcquired());
-    // Initialize the sequence number. The sequence number is used for delayed
-    // tasks (to faciliate FIFO sorting when two tasks have the same
-    // delayed_run_time value) and for identifying the task in about:tracing.
-    pending_task->sequence_num = next_sequence_num_++;
-
-    TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask",
-        TRACE_ID_MANGLE(GetTaskTraceID(*pending_task, this)));
-
-    bool was_empty = incoming_queue_.empty();
-    incoming_queue_.push(*pending_task);
-    pending_task->task.Reset();
-    // The Android UI message loop needs to get notified each time
-    // a task is added to the incoming queue.
-    if (!was_empty && !AlwaysNotifyPump(type_))
-      return true;  // Someone else should have started the sub-pump.
-
-    pump = pump_;
-  }
-  // Since the incoming_queue_ may contain a task that destroys this message
-  // loop, we cannot exit incoming_queue_lock_ until we are done with |this|.
-  // We use a stack-based reference to the message pump so that we can call
-  // ScheduleWork outside of incoming_queue_lock_.
-
-  pump->ScheduleWork();
-  return true;
+void MessageLoop::ScheduleWork(bool was_empty) {
+  // The Android UI message loop needs to get notified each time
+  // a task is added to the incoming queue.
+  if (was_empty || AlwaysNotifyPump(type_))
+    pump_->ScheduleWork();
 }
 
 //------------------------------------------------------------------------------