Revert 212948 "Made MessagePump a non-thread safe class."

r212948 broke Mac Builder: http://build.chromium.org/p/chromium.memory/builders/Mac%20ASAN%20Builder/builds/25623

> Made MessagePump a non-thread safe class.
> 
> This CL makes MessagePump a non-thread safe class to make sure thread-bound resources (such as the UI window used for pumping messages on Windows) are freed on the correct thread.
> 
> Handling of incoming tasks and synchronization between different threads was moved out to a separate class - IncomingTaskQueue reducing the number of locks to be taken while posting a task to one. Posting tasks via both MessageLoop and MessageLoopProxyImpl is now routed via IncomingTaskQueue.
> 
> BUG=241939
> 
> Review URL: https://chromiumcodereview.appspot.com/17567007

TBR=alexeypa@chromium.org

Review URL: https://codereview.chromium.org/19737005

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@212952 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: c9904b429860b83606492d4fe8f1b232c04753b5
diff --git a/base/message_loop/message_loop.cc b/base/message_loop/message_loop.cc
index d2eafbd..bd3542a 100644
--- a/base/message_loop/message_loop.cc
+++ b/base/message_loop/message_loop.cc
@@ -13,6 +13,7 @@
 #include "base/lazy_instance.h"
 #include "base/logging.h"
 #include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop_proxy_impl.h"
 #include "base/message_loop/message_pump_default.h"
 #include "base/metrics/histogram.h"
 #include "base/metrics/statistics_recorder.h"
@@ -88,6 +89,14 @@
 
 MessageLoop::MessagePumpFactory* message_pump_for_ui_factory_ = NULL;
 
+// Create a process-wide unique ID to represent this task in trace events. This
+// will be mangled with a Process ID hash to reduce the likelyhood of colliding
+// with MessageLoop pointers on other processes.
+uint64 GetTaskTraceID(const PendingTask& task, MessageLoop* loop) {
+  return (static_cast<uint64>(task.sequence_num) << 32) |
+         static_cast<uint64>(reinterpret_cast<intptr_t>(loop));
+}
+
 // Returns true if MessagePump::ScheduleWork() must be called one
 // time for every task that is added to the MessageLoop incoming queue.
 bool AlwaysNotifyPump(MessageLoop::Type type) {
@@ -137,19 +146,18 @@
 
 MessageLoop::MessageLoop(Type type)
     : type_(type),
-      exception_restoration_(false),
       nestable_tasks_allowed_(true),
+      exception_restoration_(false),
+      message_histogram_(NULL),
+      run_loop_(NULL),
 #if defined(OS_WIN)
       os_modal_loop_(false),
 #endif  // OS_WIN
-      message_histogram_(NULL),
-      run_loop_(NULL) {
+      next_sequence_num_(0) {
   DCHECK(!current()) << "should only have one message loop per thread";
   lazy_tls_ptr.Pointer()->Set(this);
 
-  incoming_task_queue_ = new internal::IncomingTaskQueue(this);
-  message_loop_proxy_ =
-      new internal::MessageLoopProxyImpl(incoming_task_queue_);
+  message_loop_proxy_ = new MessageLoopProxyImpl();
   thread_task_runner_handle_.reset(
       new ThreadTaskRunnerHandle(message_loop_proxy_));
 
@@ -169,7 +177,7 @@
 #define MESSAGE_PUMP_UI NULL
 // ipc_channel_nacl.cc uses a worker thread to do socket reads currently, and
 // doesn't require extra support for watching file descriptors.
-#define MESSAGE_PUMP_IO new MessagePumpDefault()
+#define MESSAGE_PUMP_IO new MessagePumpDefault();
 #elif defined(OS_POSIX)  // POSIX but not MACOSX.
 #define MESSAGE_PUMP_UI new MessagePumpForUI()
 #define MESSAGE_PUMP_IO new MessagePumpLibevent()
@@ -179,14 +187,14 @@
 
   if (type_ == TYPE_UI) {
     if (message_pump_for_ui_factory_)
-      pump_.reset(message_pump_for_ui_factory_());
+      pump_ = message_pump_for_ui_factory_();
     else
-      pump_.reset(MESSAGE_PUMP_UI);
+      pump_ = MESSAGE_PUMP_UI;
   } else if (type_ == TYPE_IO) {
-    pump_.reset(MESSAGE_PUMP_IO);
+    pump_ = MESSAGE_PUMP_IO;
   } else {
     DCHECK_EQ(TYPE_DEFAULT, type_);
-    pump_.reset(new MessagePumpDefault());
+    pump_ = new MessagePumpDefault();
   }
 }
 
@@ -218,13 +226,23 @@
 
   thread_task_runner_handle_.reset();
 
-  // Tell the incoming queue that we are dying.
-  incoming_task_queue_->WillDestroyCurrentMessageLoop();
-  incoming_task_queue_ = NULL;
+  // Tell the message_loop_proxy that we are dying.
+  static_cast<MessageLoopProxyImpl*>(message_loop_proxy_.get())->
+      WillDestroyCurrentMessageLoop();
   message_loop_proxy_ = NULL;
 
   // OK, now make it so that no one can find us.
   lazy_tls_ptr.Pointer()->Set(NULL);
+
+#if defined(OS_WIN)
+  // If we left the high-resolution timer activated, deactivate it now.
+  // Doing this is not-critical, it is mainly to make sure we track
+  // the high resolution timer activations properly in our unit tests.
+  if (!high_resolution_timer_expiration_.is_null()) {
+    Time::ActivateHighResolutionTimer(false);
+    high_resolution_timer_expiration_ = TimeTicks();
+  }
+#endif
 }
 
 // static
@@ -265,14 +283,18 @@
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), true);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
+  AddToIncomingQueue(&pending_task, false);
 }
 
 bool MessageLoop::TryPostTask(
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  return incoming_task_queue_->TryAddToIncomingQueue(from_here, task);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(TimeDelta()), true);
+  return AddToIncomingQueue(&pending_task, true);
 }
 
 void MessageLoop::PostDelayedTask(
@@ -280,14 +302,18 @@
     const Closure& task,
     TimeDelta delay) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, true);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(delay), true);
+  AddToIncomingQueue(&pending_task, false);
 }
 
 void MessageLoop::PostNonNestableTask(
     const tracked_objects::Location& from_here,
     const Closure& task) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), false);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(TimeDelta()), false);
+  AddToIncomingQueue(&pending_task, false);
 }
 
 void MessageLoop::PostNonNestableDelayedTask(
@@ -295,7 +321,9 @@
     const Closure& task,
     TimeDelta delay) {
   DCHECK(!task.is_null()) << from_here.ToString();
-  incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, false);
+  PendingTask pending_task(
+      from_here, task, CalculateDelayedRuntime(delay), false);
+  AddToIncomingQueue(&pending_task, false);
 }
 
 void MessageLoop::Run() {
@@ -367,26 +395,17 @@
   task_observers_.RemoveObserver(task_observer);
 }
 
+void MessageLoop::AssertIdle() const {
+  // We only check |incoming_queue_|, since we don't want to lock |work_queue_|.
+  AutoLock lock(incoming_queue_lock_);
+  DCHECK(incoming_queue_.empty());
+}
+
 bool MessageLoop::is_running() const {
   DCHECK_EQ(this, current());
   return run_loop_ != NULL;
 }
 
-bool MessageLoop::IsHighResolutionTimerEnabledForTesting() {
-  return incoming_task_queue_->IsHighResolutionTimerEnabledForTesting();
-}
-
-bool MessageLoop::IsIdleForTesting() {
-  // We only check the imcoming queue|, since we don't want to lock the work
-  // queue.
-  return incoming_task_queue_->IsIdleForTesting();
-}
-
-void MessageLoop::LockWaitUnLockForTesting(WaitableEvent* caller_wait,
-                                           WaitableEvent* caller_signal) {
-  incoming_task_queue_->LockWaitUnLockForTesting(caller_wait, caller_signal);
-}
-
 //------------------------------------------------------------------------------
 
 // Runs the loop in two different SEH modes:
@@ -451,7 +470,7 @@
       tracked_objects::ThreadData::NowForStartOfRun(pending_task.birth_tally);
 
   TRACE_EVENT_FLOW_END1("task", "MessageLoop::PostTask",
-      TRACE_ID_MANGLE(GetTaskTraceID(pending_task)),
+      TRACE_ID_MANGLE(GetTaskTraceID(pending_task, this)),
       "queue_duration",
       (start_time - pending_task.EffectiveTimePosted()).InMilliseconds());
   TRACE_EVENT2("task", "MessageLoop::RunTask",
@@ -504,6 +523,24 @@
   delayed_work_queue_.push(pending_task);
 }
 
+void MessageLoop::ReloadWorkQueue() {
+  // We can improve performance of our loading tasks from incoming_queue_ to
+  // work_queue_ by waiting until the last minute (work_queue_ is empty) to
+  // load.  That reduces the number of locks-per-task significantly when our
+  // queues get large.
+  if (!work_queue_.empty())
+    return;  // Wait till we *really* need to lock and load.
+
+  // Acquire all we can from the inter-thread queue with one lock acquisition.
+  {
+    AutoLock lock(incoming_queue_lock_);
+    if (incoming_queue_.empty())
+      return;
+    incoming_queue_.Swap(&work_queue_);  // Constant time
+    DCHECK(incoming_queue_.empty());
+  }
+}
+
 bool MessageLoop::DeletePendingTasks() {
   bool did_work = !work_queue_.empty();
   while (!work_queue_.empty()) {
@@ -533,25 +570,87 @@
   return did_work;
 }
 
-uint64 MessageLoop::GetTaskTraceID(const PendingTask& task) {
-  return (static_cast<uint64>(task.sequence_num) << 32) |
-         static_cast<uint64>(reinterpret_cast<intptr_t>(this));
+TimeTicks MessageLoop::CalculateDelayedRuntime(TimeDelta delay) {
+  TimeTicks delayed_run_time;
+  if (delay > TimeDelta()) {
+    delayed_run_time = TimeTicks::Now() + delay;
+
+#if defined(OS_WIN)
+    if (high_resolution_timer_expiration_.is_null()) {
+      // Windows timers are granular to 15.6ms.  If we only set high-res
+      // timers for those under 15.6ms, then a 18ms timer ticks at ~32ms,
+      // which as a percentage is pretty inaccurate.  So enable high
+      // res timers for any timer which is within 2x of the granularity.
+      // This is a tradeoff between accuracy and power management.
+      bool needs_high_res_timers = delay.InMilliseconds() <
+          (2 * Time::kMinLowResolutionThresholdMs);
+      if (needs_high_res_timers) {
+        if (Time::ActivateHighResolutionTimer(true)) {
+          high_resolution_timer_expiration_ = TimeTicks::Now() +
+              TimeDelta::FromMilliseconds(kHighResolutionTimerModeLeaseTimeMs);
+        }
+      }
+    }
+#endif
+  } else {
+    DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
+  }
+
+#if defined(OS_WIN)
+  if (!high_resolution_timer_expiration_.is_null()) {
+    if (TimeTicks::Now() > high_resolution_timer_expiration_) {
+      Time::ActivateHighResolutionTimer(false);
+      high_resolution_timer_expiration_ = TimeTicks();
+    }
+  }
+#endif
+
+  return delayed_run_time;
 }
 
-void MessageLoop::ReloadWorkQueue() {
-  // We can improve performance of our loading tasks from the incoming queue to
-  // |*work_queue| by waiting until the last minute (|*work_queue| is empty) to
-  // load. That reduces the number of locks-per-task significantly when our
-  // queues get large.
-  if (work_queue_.empty())
-    incoming_task_queue_->ReloadWorkQueue(&work_queue_);
-}
+// Possibly called on a background thread!
+bool MessageLoop::AddToIncomingQueue(PendingTask* pending_task,
+                                     bool use_try_lock) {
+  // Warning: Don't try to short-circuit, and handle this thread's tasks more
+  // directly, as it could starve handling of foreign threads.  Put every task
+  // into this queue.
 
-void MessageLoop::ScheduleWork(bool was_empty) {
-  // The Android UI message loop needs to get notified each time
-  // a task is added to the incoming queue.
-  if (was_empty || AlwaysNotifyPump(type_))
-    pump_->ScheduleWork();
+  scoped_refptr<MessagePump> pump;
+  {
+    if (use_try_lock) {
+      if (!incoming_queue_lock_.Try()) {
+        pending_task->task.Reset();
+        return false;
+      }
+    } else {
+      incoming_queue_lock_.Acquire();
+    }
+    AutoLock locked(incoming_queue_lock_, AutoLock::AlreadyAcquired());
+    // Initialize the sequence number. The sequence number is used for delayed
+    // tasks (to faciliate FIFO sorting when two tasks have the same
+    // delayed_run_time value) and for identifying the task in about:tracing.
+    pending_task->sequence_num = next_sequence_num_++;
+
+    TRACE_EVENT_FLOW_BEGIN0("task", "MessageLoop::PostTask",
+        TRACE_ID_MANGLE(GetTaskTraceID(*pending_task, this)));
+
+    bool was_empty = incoming_queue_.empty();
+    incoming_queue_.push(*pending_task);
+    pending_task->task.Reset();
+    // The Android UI message loop needs to get notified each time
+    // a task is added to the incoming queue.
+    if (!was_empty && !AlwaysNotifyPump(type_))
+      return true;  // Someone else should have started the sub-pump.
+
+    pump = pump_;
+  }
+  // Since the incoming_queue_ may contain a task that destroys this message
+  // loop, we cannot exit incoming_queue_lock_ until we are done with |this|.
+  // We use a stack-based reference to the message pump so that we can call
+  // ScheduleWork outside of incoming_queue_lock_.
+
+  pump->ScheduleWork();
+  return true;
 }
 
 //------------------------------------------------------------------------------