Microbenchmark for the cost of posting tasks to different pump types

This adds microbenchmarks to measure the cost of posting a task to
a message loop running different pump types. They measure the wall time
and (where supported) thread time spent by different numbers of threads
posting to one target. They are not designed to measure the time taken
by the target thread or the fairness between posting threads.

Each test is set up with one target thread and 1, 2 or 4 posting
threads. Each posting thread runs one task that calls ScheduleWork() on
the target message loop in batches until it has been working for at
least 5 seconds of wall time, then posts a single task to the target
thread that adds its call count to a counter. The test runner starts
each posting thread and posts a start task to each, then joins the
posting threads, then joins the target thread and aggregates the stats.
A separate PostTaskTest measures adding tasks to an IncomingTaskQueue
and running them on a loop whose pump does nothing.

BUG=412137

Review URL: https://codereview.chromium.org/551183002

Cr-Commit-Position: refs/heads/master@{#297264}


CrOS-Libchrome-Original-Commit: 2e146d74f78041f91bd43562bd1a0210374a41eb
diff --git a/base/android/java/src/org/chromium/base/JavaHandlerThread.java b/base/android/java/src/org/chromium/base/JavaHandlerThread.java
index 441c509..9643a63 100644
--- a/base/android/java/src/org/chromium/base/JavaHandlerThread.java
+++ b/base/android/java/src/org/chromium/base/JavaHandlerThread.java
@@ -35,5 +35,28 @@
         });
     }
 
+    /**
+     * Called from native JavaHandlerThread::Stop(). Posts a task that calls the native
+     * StopThread() on this thread, then quits the looper once that task has run.
+     */
+    @android.annotation.TargetApi(android.os.Build.VERSION_CODES.JELLY_BEAN_MR2)
+    @CalledByNative
+    private void stop(final long nativeThread, final long nativeEvent) {
+        // HandlerThread.quitSafely() only exists from API level 18 (JB MR2). quit() discards
+        // pending messages, so on older releases call it from inside the stop task instead to
+        // make sure the native side is always told to stop.
+        final boolean quitSafely = android.os.Build.VERSION.SDK_INT
+                >= android.os.Build.VERSION_CODES.JELLY_BEAN_MR2;
+        new Handler(mThread.getLooper()).post(new Runnable() {
+            @Override
+            public void run() {
+                nativeStopThread(nativeThread, nativeEvent);
+                if (!quitSafely) mThread.quit();
+            }
+        });
+        if (quitSafely) mThread.quitSafely();
+    }
+
     private native void nativeInitializeThread(long nativeJavaHandlerThread, long nativeEvent);
+    private native void nativeStopThread(long nativeJavaHandlerThread, long nativeEvent);
 }
diff --git a/base/android/java_handler_thread.cc b/base/android/java_handler_thread.cc
index 18e1440..117971e 100644
--- a/base/android/java_handler_thread.cc
+++ b/base/android/java_handler_thread.cc
@@ -44,6 +44,19 @@
 }
 
 void JavaHandlerThread::Stop() {
+  // Ask the Java thread to run StopThread(), which quits the native message
+  // loop and then signals |shutdown_event|.
+  JNIEnv* env = base::android::AttachCurrentThread();
+  base::WaitableEvent shutdown_event(false, false);
+  Java_JavaHandlerThread_stop(env,
+                              java_thread_.obj(),
+                              reinterpret_cast<intptr_t>(this),
+                              reinterpret_cast<intptr_t>(&shutdown_event));
+  // Block until StopThread() has run on the Java thread. This only means the
+  // quit has been requested: the Java looper may still be winding down when
+  // Stop() returns.
+  base::ThreadRestrictions::ScopedAllowWait wait_allowed;
+  shutdown_event.Wait();
 }
 
 void JavaHandlerThread::InitializeThread(JNIEnv* env, jobject obj,
@@ -54,6 +67,13 @@
   reinterpret_cast<base::WaitableEvent*>(event)->Signal();
 }
 
+// Called on the Java thread from JavaHandlerThread.stop(): asks the native
+// message loop to quit, then wakes up the caller blocked in Stop().
+void JavaHandlerThread::StopThread(JNIEnv* env, jobject obj, jlong event) {
+  static_cast<MessageLoopForUI*>(message_loop_.get())->Quit();
+  reinterpret_cast<base::WaitableEvent*>(event)->Signal();
+}
+
 // static
 bool JavaHandlerThread::RegisterBindings(JNIEnv* env) {
   return RegisterNativesImpl(env);
diff --git a/base/android/java_handler_thread.h b/base/android/java_handler_thread.h
index 5670f7d..7cd274d 100644
--- a/base/android/java_handler_thread.h
+++ b/base/android/java_handler_thread.h
@@ -34,6 +34,9 @@
   // Called from java on the newly created thread.
   // Start() will not return before this methods has finished.
   void InitializeThread(JNIEnv* env, jobject obj, jlong event);
+  // Called from java on this thread after Stop() has been called.
+  // Stop() will not return before this method has signalled |event|.
+  void StopThread(JNIEnv* env, jobject obj, jlong event);
 
   static bool RegisterBindings(JNIEnv* env);
 
diff --git a/base/base.gyp b/base/base.gyp
index 59f6b2f..c45ccc7 100644
--- a/base/base.gyp
+++ b/base/base.gyp
@@ -816,6 +816,7 @@
       ],
       'sources': [
+        'message_loop/message_pump_perftest.cc',
         'threading/thread_perftest.cc',
         'test/run_all_unittests.cc',
         '../testing/perf/perf_test.cc'
       ],
diff --git a/base/message_loop/message_loop.h b/base/message_loop/message_loop.h
index 8db1e77..3cec37e 100644
--- a/base/message_loop/message_loop.h
+++ b/base/message_loop/message_loop.h
@@ -391,12 +391,22 @@
   // Returns true if the message loop is "idle". Provided for testing.
   bool IsIdleForTesting();
 
+  // Wakes up the message pump. Can be called on any thread. The caller is
+  // responsible for synchronizing ScheduleWork() calls.
+  void ScheduleWork(bool was_empty);
+
+  // Returns the TaskAnnotator which is used to add debug information to posted
+  // tasks.
+  debug::TaskAnnotator* task_annotator() { return &task_annotator_; }
+
+  // Runs the specified PendingTask. Public so tests can run tasks directly.
+  void RunTask(const PendingTask& pending_task);
+
   //----------------------------------------------------------------------------
  protected:
   scoped_ptr<MessagePump> pump_;
 
  private:
-  friend class internal::IncomingTaskQueue;
   friend class RunLoop;
 
   // Configures various members for the two constructors.
@@ -408,9 +418,6 @@
   // Called to process any delayed non-nestable tasks.
   bool ProcessNextDelayedNonNestableTask();
 
-  // Runs the specified PendingTask.
-  void RunTask(const PendingTask& pending_task);
-
   // Calls RunTask or queues the pending_task on the deferred task list if it
   // cannot be run right now.  Returns true if the task was run.
   bool DeferOrRunPendingTask(const PendingTask& pending_task);
@@ -423,18 +430,10 @@
   // true if some work was done.
   bool DeletePendingTasks();
 
-  // Returns the TaskAnnotator which is used to add debug information to posted
-  // tasks.
-  debug::TaskAnnotator* task_annotator() { return &task_annotator_; }
-
   // Loads tasks from the incoming queue to |work_queue_| if the latter is
   // empty.
   void ReloadWorkQueue();
 
-  // Wakes up the message pump. Can be called on any thread. The caller is
-  // responsible for synchronizing ScheduleWork() calls.
-  void ScheduleWork(bool was_empty);
-
   // Start recording histogram info about events and action IF it was enabled
   // and IF the statistics recorder can accept a registration of our histogram.
   void StartHistogrammer();
diff --git a/base/message_loop/message_pump_perftest.cc b/base/message_loop/message_pump_perftest.cc
new file mode 100644
index 0000000..7c7b400
--- /dev/null
+++ b/base/message_loop/message_pump_perftest.cc
@@ -0,0 +1,342 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <algorithm>
+
+#include "base/bind.h"
+#include "base/format_macros.h"
+#include "base/memory/scoped_vector.h"
+#include "base/message_loop/message_loop.h"
+#include "base/strings/stringprintf.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/threading/thread.h"
+#include "base/time/time.h"
+#include "build/build_config.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "testing/perf/perf_test.h"
+
+#if defined(OS_ANDROID)
+#include "base/android/java_handler_thread.h"
+#endif
+
+namespace base {
+namespace {
+
+// Minimum wall time, in seconds, that each benchmark loop keeps running for.
+const int kTargetTimeSec = 5;
+
+// Returns the pump name used in perf trace names for |type|. Every pump type
+// exercised below must map to a distinct name or their results collide.
+const char* PumpName(MessageLoop::Type type) {
+  switch (type) {
+    case MessageLoop::TYPE_IO:
+      return "io";
+    case MessageLoop::TYPE_UI:
+      return "ui";
+#if defined(OS_ANDROID)
+    case MessageLoop::TYPE_JAVA:
+      return "java";
+#endif
+    default:
+      return "default";
+  }
+}
+
+// Measures the time the calling threads spend in MessageLoop::ScheduleWork()
+// when one or more threads repeatedly wake up a single target message loop.
+class ScheduleWorkTest : public testing::Test {
+ public:
+  ScheduleWorkTest() : counter_(0) {}
+
+  // Runs on the target thread: adds one posting thread's ScheduleWork() call
+  // count to the total.
+  void Increment(uint64_t amount) { counter_ += amount; }
+
+  // Runs on posting thread |index|. Calls ScheduleWork() on the target loop in
+  // batches of kBatchSize for at least kTargetTimeSec seconds of wall time,
+  // records wall/thread time and min/max batch times in slot |index|, then
+  // posts the number of calls made to the target thread.
+  void Schedule(int index) {
+    base::TimeTicks start = base::TimeTicks::HighResNow();
+    base::TimeTicks thread_start;
+    if (TimeTicks::IsThreadNowSupported())
+      thread_start = base::TimeTicks::ThreadNow();
+    base::TimeDelta minimum = base::TimeDelta::Max();
+    base::TimeDelta maximum = base::TimeDelta();
+    base::TimeTicks now, lastnow = start;
+    uint64_t schedule_calls = 0u;
+    do {
+      for (size_t i = 0; i < kBatchSize; ++i) {
+        target_message_loop()->ScheduleWork(true);
+        schedule_calls++;
+      }
+      now = base::TimeTicks::HighResNow();
+      base::TimeDelta laptime = now - lastnow;
+      lastnow = now;
+      minimum = std::min(minimum, laptime);
+      maximum = std::max(maximum, laptime);
+    } while (now - start < base::TimeDelta::FromSeconds(kTargetTimeSec));
+
+    scheduling_times_[index] = now - start;
+    if (TimeTicks::IsThreadNowSupported())
+      scheduling_thread_times_[index] =
+          base::TimeTicks::ThreadNow() - thread_start;
+    min_batch_times_[index] = minimum;
+    max_batch_times_[index] = maximum;
+    target_message_loop()->PostTask(FROM_HERE,
+                                    base::Bind(&ScheduleWorkTest::Increment,
+                                               base::Unretained(this),
+                                               schedule_calls));
+  }
+
+  // Starts a target thread running a |target_type| pump and
+  // |num_scheduling_threads| posting threads, runs Schedule() once on each
+  // posting thread, stops all threads and prints the aggregated results.
+  void ScheduleWork(MessageLoop::Type target_type, int num_scheduling_threads) {
+#if defined(OS_ANDROID)
+    if (target_type == MessageLoop::TYPE_JAVA) {
+      java_thread_.reset(new android::JavaHandlerThread("target"));
+      java_thread_->Start();
+    } else
+#endif
+    {
+      target_.reset(new Thread("target"));
+      target_->StartWithOptions(Thread::Options(target_type, 0u));
+    }
+
+    ScopedVector<Thread> scheduling_threads;
+    scheduling_times_.reset(new base::TimeDelta[num_scheduling_threads]);
+    scheduling_thread_times_.reset(new base::TimeDelta[num_scheduling_threads]);
+    min_batch_times_.reset(new base::TimeDelta[num_scheduling_threads]);
+    max_batch_times_.reset(new base::TimeDelta[num_scheduling_threads]);
+
+    for (int i = 0; i < num_scheduling_threads; ++i) {
+      scheduling_threads.push_back(new Thread("posting thread"));
+      scheduling_threads[i]->Start();
+    }
+
+    for (int i = 0; i < num_scheduling_threads; ++i) {
+      scheduling_threads[i]->message_loop()->PostTask(
+          FROM_HERE,
+          base::Bind(&ScheduleWorkTest::Schedule, base::Unretained(this), i));
+    }
+
+    // Stop the posting threads before the target thread so that every
+    // Increment() task has been posted before the target is shut down.
+    for (int i = 0; i < num_scheduling_threads; ++i) {
+      scheduling_threads[i]->Stop();
+    }
+#if defined(OS_ANDROID)
+    if (target_type == MessageLoop::TYPE_JAVA) {
+      java_thread_->Stop();
+      java_thread_.reset();
+    } else
+#endif
+    {
+      target_->Stop();
+      target_.reset();
+    }
+    base::TimeDelta total_time;
+    base::TimeDelta total_thread_time;
+    base::TimeDelta min_batch_time = base::TimeDelta::Max();
+    base::TimeDelta max_batch_time = base::TimeDelta();
+    for (int i = 0; i < num_scheduling_threads; ++i) {
+      total_time += scheduling_times_[i];
+      total_thread_time += scheduling_thread_times_[i];
+      min_batch_time = std::min(min_batch_time, min_batch_times_[i]);
+      max_batch_time = std::max(max_batch_time, max_batch_times_[i]);
+    }
+    std::string trace = StringPrintf("%d_threads_scheduling_to_%s_pump",
+                                     num_scheduling_threads,
+                                     PumpName(target_type));
+    perf_test::PrintResult(
+        "task",
+        "",
+        trace,
+        total_time.InMicroseconds() / static_cast<double>(counter_),
+        "us/task",
+        true);
+    perf_test::PrintResult(
+        "task",
+        "_min_batch_time",
+        trace,
+        min_batch_time.InMicroseconds() / static_cast<double>(kBatchSize),
+        "us/task",
+        false);
+    perf_test::PrintResult(
+        "task",
+        "_max_batch_time",
+        trace,
+        max_batch_time.InMicroseconds() / static_cast<double>(kBatchSize),
+        "us/task",
+        false);
+    if (TimeTicks::IsThreadNowSupported()) {
+      perf_test::PrintResult(
+          "task",
+          "_thread_time",
+          trace,
+          total_thread_time.InMicroseconds() / static_cast<double>(counter_),
+          "us/task",
+          true);
+    }
+  }
+
+  // Returns the message loop of whichever target thread is currently running.
+  MessageLoop* target_message_loop() {
+#if defined(OS_ANDROID)
+    if (java_thread_)
+      return java_thread_->message_loop();
+#endif
+    return target_->message_loop();
+  }
+
+ private:
+  scoped_ptr<Thread> target_;
+#if defined(OS_ANDROID)
+  scoped_ptr<android::JavaHandlerThread> java_thread_;
+#endif
+  // Per-posting-thread results, indexed by the |index| passed to Schedule().
+  scoped_ptr<base::TimeDelta[]> scheduling_times_;
+  scoped_ptr<base::TimeDelta[]> scheduling_thread_times_;
+  scoped_ptr<base::TimeDelta[]> min_batch_times_;
+  scoped_ptr<base::TimeDelta[]> max_batch_times_;
+  // Total ScheduleWork() calls made; only touched on the target thread until
+  // that thread has been stopped.
+  uint64_t counter_;
+
+  // Number of ScheduleWork() calls between two timestamps in Schedule().
+  static const size_t kBatchSize = 1000;
+};
+
+TEST_F(ScheduleWorkTest, ThreadTimeToIOFromOneThread) {
+  ScheduleWork(MessageLoop::TYPE_IO, 1);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToIOFromTwoThreads) {
+  ScheduleWork(MessageLoop::TYPE_IO, 2);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToIOFromFourThreads) {
+  ScheduleWork(MessageLoop::TYPE_IO, 4);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToUIFromOneThread) {
+  ScheduleWork(MessageLoop::TYPE_UI, 1);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToUIFromTwoThreads) {
+  ScheduleWork(MessageLoop::TYPE_UI, 2);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToUIFromFourThreads) {
+  ScheduleWork(MessageLoop::TYPE_UI, 4);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToDefaultFromOneThread) {
+  ScheduleWork(MessageLoop::TYPE_DEFAULT, 1);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToDefaultFromTwoThreads) {
+  ScheduleWork(MessageLoop::TYPE_DEFAULT, 2);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToDefaultFromFourThreads) {
+  ScheduleWork(MessageLoop::TYPE_DEFAULT, 4);
+}
+
+#if defined(OS_ANDROID)
+TEST_F(ScheduleWorkTest, ThreadTimeToJavaFromOneThread) {
+  ScheduleWork(MessageLoop::TYPE_JAVA, 1);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToJavaFromTwoThreads) {
+  ScheduleWork(MessageLoop::TYPE_JAVA, 2);
+}
+
+TEST_F(ScheduleWorkTest, ThreadTimeToJavaFromFourThreads) {
+  ScheduleWork(MessageLoop::TYPE_JAVA, 4);
+}
+#endif
+
+// Task body for PostTaskTest; does no work, so only queueing and dispatch
+// overhead is measured.
+static void DoNothing() {
+}
+
+// A message pump whose methods are all no-ops. PostTaskTest reloads the queue
+// and runs tasks itself, so none of the pump's methods need to do anything.
+class FakeMessagePump : public MessagePump {
+ public:
+  FakeMessagePump() {}
+  virtual ~FakeMessagePump() {}
+
+  virtual void Run(Delegate* delegate) OVERRIDE {}
+
+  virtual void Quit() OVERRIDE {}
+  virtual void ScheduleWork() OVERRIDE {}
+  virtual void ScheduleDelayedWork(
+      const TimeTicks& delayed_work_time) OVERRIDE {}
+};
+
+// Measures the cost of adding tasks to an IncomingTaskQueue, reloading them
+// into a local work queue and running them, with one reload after every
+// |tasks_per_reload| posts.
+class PostTaskTest : public testing::Test {
+ public:
+  void Run(int batch_size, int tasks_per_reload) {
+    base::TimeTicks start = base::TimeTicks::HighResNow();
+    base::TimeTicks now;
+    MessageLoop loop(scoped_ptr<MessagePump>(new FakeMessagePump));
+    scoped_refptr<internal::IncomingTaskQueue> queue(
+        new internal::IncomingTaskQueue(&loop));
+    uint64_t num_posted = 0;
+    do {
+      for (int i = 0; i < batch_size; ++i) {
+        for (int j = 0; j < tasks_per_reload; ++j) {
+          queue->AddToIncomingQueue(
+              FROM_HERE, base::Bind(&DoNothing), base::TimeDelta(), false);
+          num_posted++;
+        }
+        TaskQueue loop_local_queue;
+        queue->ReloadWorkQueue(&loop_local_queue);
+        while (!loop_local_queue.empty()) {
+          PendingTask t = loop_local_queue.front();
+          loop_local_queue.pop();
+          loop.RunTask(t);
+        }
+      }
+
+      now = base::TimeTicks::HighResNow();
+    } while (now - start < base::TimeDelta::FromSeconds(kTargetTimeSec));
+    std::string trace = StringPrintf("%d_tasks_per_reload", tasks_per_reload);
+    perf_test::PrintResult(
+        "task",
+        "",
+        trace,
+        (now - start).InMicroseconds() / static_cast<double>(num_posted),
+        "us/task",
+        true);
+
+    // |queue| was created with a pointer to |loop|; detach it before both go
+    // out of scope.
+    queue->WillDestroyCurrentMessageLoop();
+  }
+};
+
+TEST_F(PostTaskTest, OneTaskPerReload) {
+  Run(10000, 1);
+}
+
+TEST_F(PostTaskTest, TenTasksPerReload) {
+  Run(10000, 10);
+}
+
+TEST_F(PostTaskTest, OneHundredTasksPerReload) {
+  Run(1000, 100);
+}
+
+}  // namespace
+}  // namespace base
diff --git a/base/threading/thread_perftest.cc b/base/threading/thread_perftest.cc
index eaeddf9..088f629 100644
--- a/base/threading/thread_perftest.cc
+++ b/base/threading/thread_perftest.cc
@@ -6,6 +6,7 @@
 #include "base/bind.h"
 #include "base/command_line.h"
 #include "base/memory/scoped_vector.h"
+#include "base/strings/stringprintf.h"
 #include "base/synchronization/condition_variable.h"
 #include "base/synchronization/lock.h"
 #include "base/synchronization/waitable_event.h"