Reland (3rd try): Lazily initialize MessageLoop for faster thread startup

Original review: https://codereview.chromium.org/1011683002/
2nd try: https://codereview.chromium.org/1129953004/

2nd try reverted due to race reports on Linux:
https://crbug.com/489263 Data races on valid_thread_id_ after r330329

This fixes:
- Race in MessageLoopProxyImpl by introducing lock
- Race in BrowserMainLoop/BrowserThreadImpl, where BrowserThread::CurrentlyOn()
  called on one of BrowserThreads tries to touch other thread's message_loop()
  via global thread table.

Reg: the latter race, the code flow that causes this race is like following:

  // On the main thread, we create all known browser threads:
  for (...) {
    {
      AutoLock lock(g_lock);
      g_threads[id] = new BrowserProcessSubThread();
    }
    // [A] This initializes the thread's message_loop, which causes a race
    // against [B] in the new code because new threads can start running
    // immediately.
    thread->StartWithOptions();
  }

  // On the new thread's main function, it calls CurrentlyOn() which does:
  {
    // [B] This touches other thread's Thread::message_loop.
    AutoLock lock(g_lock);
    return g_threads[other_thread_id] &&
           g_threads[other_thread_id]->message_loop() == MessageLoop::current();
  }

This was safe before because both message_loop initialization and the first
call to CurrentlyOn() on the new thread was done synchronously in
StartWithOptions() while the main thread was blocked. In the new code
new threads can start accessing message_loop() asynchronously while
the main thread's for loop is running.

PS1 is the original patch (2nd try) that got reverted.

BUG=465458, 489263

Review URL: https://codereview.chromium.org/1131513007

Cr-Commit-Position: refs/heads/master@{#331235}


CrOS-Libchrome-Original-Commit: 7f68f8735355c1c73557c3cfb294b441901fc31d
diff --git a/base/message_loop/incoming_task_queue.cc b/base/message_loop/incoming_task_queue.cc
index c1ce939..5e9a461 100644
--- a/base/message_loop/incoming_task_queue.cc
+++ b/base/message_loop/incoming_task_queue.cc
@@ -44,7 +44,8 @@
       message_loop_(message_loop),
       next_sequence_num_(0),
       message_loop_scheduled_(false),
-      always_schedule_work_(AlwaysNotifyPump(message_loop_->type())) {
+      always_schedule_work_(AlwaysNotifyPump(message_loop_->type())),
+      is_ready_for_scheduling_(false) {
 }
 
 bool IncomingTaskQueue::AddToIncomingQueue(
@@ -109,6 +110,15 @@
   message_loop_ = NULL;
 }
 
+void IncomingTaskQueue::StartScheduling() {
+  AutoLock lock(incoming_queue_lock_);
+  DCHECK(!is_ready_for_scheduling_);
+  DCHECK(!message_loop_scheduled_);
+  is_ready_for_scheduling_ = true;
+  if (!incoming_queue_.empty())
+    ScheduleWork();
+}
+
 IncomingTaskQueue::~IncomingTaskQueue() {
   // Verify that WillDestroyCurrentMessageLoop() has been called.
   DCHECK(!message_loop_);
@@ -148,19 +158,25 @@
   incoming_queue_.push(*pending_task);
   pending_task->task.Reset();
 
-  if (always_schedule_work_ || (!message_loop_scheduled_ && was_empty)) {
-    // Wake up the message loop.
-    message_loop_->ScheduleWork();
-    // After we've scheduled the message loop, we do not need to do so again
-    // until we know it has processed all of the work in our queue and is
-    // waiting for more work again. The message loop will always attempt to
-    // reload from the incoming queue before waiting again so we clear this flag
-    // in ReloadWorkQueue().
-    message_loop_scheduled_ = true;
+  if (is_ready_for_scheduling_ &&
+      (always_schedule_work_ || (!message_loop_scheduled_ && was_empty))) {
+    ScheduleWork();
   }
 
   return true;
 }
 
+void IncomingTaskQueue::ScheduleWork() {
+  DCHECK(is_ready_for_scheduling_);
+  // Wake up the message loop.
+  message_loop_->ScheduleWork();
+  // After we've scheduled the message loop, we do not need to do so again
+  // until we know it has processed all of the work in our queue and is
+  // waiting for more work again. The message loop will always attempt to
+  // reload from the incoming queue before waiting again so we clear this flag
+  // in ReloadWorkQueue().
+  message_loop_scheduled_ = true;
+}
+
 }  // namespace internal
 }  // namespace base
diff --git a/base/message_loop/incoming_task_queue.h b/base/message_loop/incoming_task_queue.h
index 72e1f30..7dd1e82 100644
--- a/base/message_loop/incoming_task_queue.h
+++ b/base/message_loop/incoming_task_queue.h
@@ -53,6 +53,10 @@
   // Disconnects |this| from the parent message loop.
   void WillDestroyCurrentMessageLoop();
 
+  // This should be called when the message loop becomes ready for
+  // scheduling work.
+  void StartScheduling();
+
  private:
   friend class RefCountedThreadSafe<IncomingTaskQueue>;
   virtual ~IncomingTaskQueue();
@@ -66,6 +70,9 @@
   // does not retain |pending_task->task| beyond this function call.
   bool PostPendingTask(PendingTask* pending_task);
 
+  // Wakes up the message loop and schedules work.
+  void ScheduleWork();
+
   // Number of tasks that require high resolution timing. This value is kept
   // so that ReloadWorkQueue() completes in constant time.
   int high_res_task_count_;
@@ -92,6 +99,9 @@
   // if the incoming queue was not empty.
   const bool always_schedule_work_;
 
+  // False until StartScheduling() is called.
+  bool is_ready_for_scheduling_;
+
   DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
 };
 
diff --git a/base/message_loop/message_loop.cc b/base/message_loop/message_loop.cc
index eb0f968..4222c77 100644
--- a/base/message_loop/message_loop.cc
+++ b/base/message_loop/message_loop.cc
@@ -100,6 +100,10 @@
 }
 #endif  // !defined(OS_NACL_SFI)
 
+scoped_ptr<MessagePump> ReturnPump(scoped_ptr<MessagePump> pump) {
+  return pump;
+}
+
 }  // namespace
 
 //------------------------------------------------------------------------------
@@ -116,41 +120,19 @@
 //------------------------------------------------------------------------------
 
 MessageLoop::MessageLoop(Type type)
-    : type_(type),
-#if defined(OS_WIN)
-      pending_high_res_tasks_(0),
-      in_high_res_mode_(false),
-#endif
-      nestable_tasks_allowed_(true),
-#if defined(OS_WIN)
-      os_modal_loop_(false),
-#endif  // OS_WIN
-      message_histogram_(NULL),
-      run_loop_(NULL) {
-  Init();
-
-  pump_ = CreateMessagePumpForType(type).Pass();
+    : MessageLoop(type, MessagePumpFactoryCallback()) {
+  BindToCurrentThread();
 }
 
 MessageLoop::MessageLoop(scoped_ptr<MessagePump> pump)
-    : pump_(pump.Pass()),
-      type_(TYPE_CUSTOM),
-#if defined(OS_WIN)
-      pending_high_res_tasks_(0),
-      in_high_res_mode_(false),
-#endif
-      nestable_tasks_allowed_(true),
-#if defined(OS_WIN)
-      os_modal_loop_(false),
-#endif  // OS_WIN
-      message_histogram_(NULL),
-      run_loop_(NULL) {
-  DCHECK(pump_.get());
-  Init();
+    : MessageLoop(TYPE_CUSTOM, Bind(&ReturnPump, Passed(&pump))) {
+  BindToCurrentThread();
 }
 
 MessageLoop::~MessageLoop() {
-  DCHECK_EQ(this, current());
+  // current() could be NULL if this message loop is destructed before it is
+  // bound to a thread.
+  DCHECK(current() == this || !current());
 
   // iOS just attaches to the loop, it doesn't Run it.
   // TODO(stuartmorgan): Consider wiring up a Detach().
@@ -299,11 +281,13 @@
 }
 
 void MessageLoop::Run() {
+  DCHECK(pump_);
   RunLoop run_loop;
   run_loop.Run();
 }
 
 void MessageLoop::RunUntilIdle() {
+  DCHECK(pump_);
   RunLoop run_loop;
   run_loop.RunUntilIdle();
 }
@@ -383,13 +367,43 @@
 
 //------------------------------------------------------------------------------
 
-void MessageLoop::Init() {
+scoped_ptr<MessageLoop> MessageLoop::CreateUnbound(
+    Type type, MessagePumpFactoryCallback pump_factory) {
+  return make_scoped_ptr(new MessageLoop(type, pump_factory));
+}
+
+MessageLoop::MessageLoop(Type type, MessagePumpFactoryCallback pump_factory)
+    : type_(type),
+#if defined(OS_WIN)
+      pending_high_res_tasks_(0),
+      in_high_res_mode_(false),
+#endif
+      nestable_tasks_allowed_(true),
+#if defined(OS_WIN)
+      os_modal_loop_(false),
+#endif  // OS_WIN
+      pump_factory_(pump_factory),
+      message_histogram_(NULL),
+      run_loop_(NULL),
+      incoming_task_queue_(new internal::IncomingTaskQueue(this)),
+      message_loop_proxy_(
+          new internal::MessageLoopProxyImpl(incoming_task_queue_)) {
+  // If type is TYPE_CUSTOM non-null pump_factory must be given.
+  DCHECK_EQ(type_ == TYPE_CUSTOM, !pump_factory_.is_null());
+}
+
+void MessageLoop::BindToCurrentThread() {
+  DCHECK(!pump_);
+  if (!pump_factory_.is_null())
+    pump_ = pump_factory_.Run();
+  else
+    pump_ = CreateMessagePumpForType(type_);
+
   DCHECK(!current()) << "should only have one message loop per thread";
   lazy_tls_ptr.Pointer()->Set(this);
 
-  incoming_task_queue_ = new internal::IncomingTaskQueue(this);
-  message_loop_proxy_ =
-      new internal::MessageLoopProxyImpl(incoming_task_queue_);
+  incoming_task_queue_->StartScheduling();
+  message_loop_proxy_->BindToCurrentThread();
   thread_task_runner_handle_.reset(
       new ThreadTaskRunnerHandle(message_loop_proxy_));
 }
diff --git a/base/message_loop/message_loop.h b/base/message_loop/message_loop.h
index fd7596a..f2f89d0 100644
--- a/base/message_loop/message_loop.h
+++ b/base/message_loop/message_loop.h
@@ -114,7 +114,8 @@
   explicit MessageLoop(Type type = TYPE_DEFAULT);
   // Creates a TYPE_CUSTOM MessageLoop with the supplied MessagePump, which must
   // be non-NULL.
-  explicit MessageLoop(scoped_ptr<base::MessagePump> pump);
+  explicit MessageLoop(scoped_ptr<MessagePump> pump);
+
   ~MessageLoop() override;
 
   // Returns the MessageLoop object for the current thread, or null if none.
@@ -394,10 +395,6 @@
   // Returns true if the message loop is "idle". Provided for testing.
   bool IsIdleForTesting();
 
-  // Wakes up the message pump. Can be called on any thread. The caller is
-  // responsible for synchronizing ScheduleWork() calls.
-  void ScheduleWork();
-
   // Returns the TaskAnnotator which is used to add debug information to posted
   // tasks.
   debug::TaskAnnotator* task_annotator() { return &task_annotator_; }
@@ -411,9 +408,33 @@
 
  private:
   friend class RunLoop;
+  friend class internal::IncomingTaskQueue;
+  friend class ScheduleWorkTest;
+  friend class Thread;
 
-  // Configures various members for the two constructors.
-  void Init();
+  using MessagePumpFactoryCallback = Callback<scoped_ptr<MessagePump>()>;
+
+  // Creates a MessageLoop without binding to a thread.
+  // If |type| is TYPE_CUSTOM non-null |pump_factory| must be also given
+  // to create a message pump for this message loop.  Otherwise a default
+  // message pump for the |type| is created.
+  //
+  // It is valid to call this to create a new message loop on one thread,
+  // and then pass it to the thread where the message loop actually runs.
+  // The message loop's BindToCurrentThread() method must be called on the
+  // thread the message loop runs on, before calling Run().
+  // Before BindToCurrentThread() is called only Post*Task() functions can
+  // be called on the message loop.
+  scoped_ptr<MessageLoop> CreateUnbound(
+      Type type,
+      MessagePumpFactoryCallback pump_factory);
+
+  // Common private constructor. Other constructors delegate the initialization
+  // to this constructor.
+  MessageLoop(Type type, MessagePumpFactoryCallback pump_factory);
+
+  // Configure various members and bind this message loop to the current thread.
+  void BindToCurrentThread();
 
   // Invokes the actual run loop using the message pump.
   void RunHandler();
@@ -437,6 +458,10 @@
   // empty.
   void ReloadWorkQueue();
 
+  // Wakes up the message pump. Can be called on any thread. The caller is
+  // responsible for synchronizing ScheduleWork() calls.
+  void ScheduleWork();
+
   // Start recording histogram info about events and action IF it was enabled
   // and IF the statistics recorder can accept a registration of our histogram.
   void StartHistogrammer();
@@ -490,6 +515,10 @@
   bool os_modal_loop_;
 #endif
 
+  // pump_factory_.Run() is called to create a message pump for this loop
+  // if type_ is TYPE_CUSTOM and pump_ is null.
+  MessagePumpFactoryCallback pump_factory_;
+
   std::string thread_name_;
   // A profiling histogram showing the counts of various messages and events.
   HistogramBase* message_histogram_;
diff --git a/base/message_loop/message_loop_proxy_impl.cc b/base/message_loop/message_loop_proxy_impl.cc
index b7abca3..580620d 100644
--- a/base/message_loop/message_loop_proxy_impl.cc
+++ b/base/message_loop/message_loop_proxy_impl.cc
@@ -15,7 +15,13 @@
 MessageLoopProxyImpl::MessageLoopProxyImpl(
     scoped_refptr<IncomingTaskQueue> incoming_queue)
     : incoming_queue_(incoming_queue),
-      valid_thread_id_(PlatformThread::CurrentId()) {
+      valid_thread_id_(kInvalidThreadId) {
+}
+
+void MessageLoopProxyImpl::BindToCurrentThread() {
+  AutoLock lock(valid_thread_id_lock_);
+  DCHECK_EQ(kInvalidThreadId, valid_thread_id_);
+  valid_thread_id_ = PlatformThread::CurrentId();
 }
 
 bool MessageLoopProxyImpl::PostDelayedTask(
@@ -35,6 +41,7 @@
 }
 
 bool MessageLoopProxyImpl::RunsTasksOnCurrentThread() const {
+  AutoLock lock(valid_thread_id_lock_);
   return valid_thread_id_ == PlatformThread::CurrentId();
 }
 
diff --git a/base/message_loop/message_loop_proxy_impl.h b/base/message_loop/message_loop_proxy_impl.h
index 0fe629f..fa611c2 100644
--- a/base/message_loop/message_loop_proxy_impl.h
+++ b/base/message_loop/message_loop_proxy_impl.h
@@ -9,6 +9,7 @@
 #include "base/memory/ref_counted.h"
 #include "base/message_loop/message_loop_proxy.h"
 #include "base/pending_task.h"
+#include "base/synchronization/lock.h"
 #include "base/threading/platform_thread.h"
 
 namespace base {
@@ -24,6 +25,9 @@
   explicit MessageLoopProxyImpl(
       scoped_refptr<IncomingTaskQueue> incoming_queue);
 
+  // Initialize this message loop proxy on the current thread.
+  void BindToCurrentThread();
+
   // MessageLoopProxy implementation
   bool PostDelayedTask(const tracked_objects::Location& from_here,
                        const base::Closure& task,
@@ -40,8 +44,10 @@
   // THe incoming queue receiving all posted tasks.
   scoped_refptr<IncomingTaskQueue> incoming_queue_;
 
-  // ID of the thread |this| was created on.
+  // ID of the thread |this| was created on.  Could be accessed on multiple
+  // threads, protected by |valid_thread_id_lock_|.
   PlatformThreadId valid_thread_id_;
+  mutable Lock valid_thread_id_lock_;
 
   DISALLOW_COPY_AND_ASSIGN(MessageLoopProxyImpl);
 };
diff --git a/base/message_loop/message_pump_perftest.cc b/base/message_loop/message_pump_perftest.cc
index b3e5604..0bf3d0c 100644
--- a/base/message_loop/message_pump_perftest.cc
+++ b/base/message_loop/message_pump_perftest.cc
@@ -20,7 +20,6 @@
 #endif
 
 namespace base {
-namespace {
 
 class ScheduleWorkTest : public testing::Test {
  public:
@@ -224,9 +223,6 @@
 }
 #endif
 
-static void DoNothing() {
-}
-
 class FakeMessagePump : public MessagePump {
  public:
   FakeMessagePump() {}
@@ -289,5 +285,4 @@
   Run(1000, 100);
 }
 
-}  // namespace
 }  // namespace base