Allow ObserverListThreadSafe to be used from sequenced tasks.

This CL is identical to https://codereview.chromium.org/2592143003,
except that RemoveObserver() works when called from any sequence
(existing code rely on that).

Previously, observers could only be added to an ObserverListThreadSafe
from single-threaded tasks. Observers were notified on the registration
thread.

With this CL, observers can also be added to an ObserverListThreadSafe
from sequenced tasks. They are notified on the registration sequence.

ObserverListThreadSafe behaves almost the same way as before when used
from single-threaded tasks. The following things changed:
- The order in which observers registered from the same thread
  are notified is no longer deterministic.
- One notification task is posted for each observer rather than one
  notification task per thread.
- If an observer is added to a NOTIFY_ALL ObserverListThreadSafe from
  a stack of notification dispatches, only the notification on top of
  the stack is sent to the newly added observer.

BUG=675631

Review-Url: https://codereview.chromium.org/2805933002
Cr-Commit-Position: refs/heads/master@{#463345}


CrOS-Libchrome-Original-Commit: 6aad314383abc65e773b58a5aa697a84bf42e176
diff --git a/base/observer_list_threadsafe.h b/base/observer_list_threadsafe.h
index afb1010..9e6e347 100644
--- a/base/observer_list_threadsafe.h
+++ b/base/observer_list_threadsafe.h
@@ -5,52 +5,49 @@
 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
 #define BASE_OBSERVER_LIST_THREADSAFE_H_
 
-#include <algorithm>
-#include <map>
-#include <memory>
-#include <tuple>
+#include <unordered_map>
 
 #include "base/bind.h"
 #include "base/location.h"
 #include "base/logging.h"
 #include "base/macros.h"
-#include "base/memory/ptr_util.h"
 #include "base/memory/ref_counted.h"
 #include "base/observer_list.h"
+#include "base/sequenced_task_runner.h"
+#include "base/stl_util.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/sequenced_task_runner_handle.h"
+#include "base/threading/thread_local.h"
+#include "build/build_config.h"
+
+// TODO(fdoray): Removing these includes causes IWYU failures in other headers,
+// remove them in a follow- up CL.
+#include "base/memory/ptr_util.h"
 #include "base/single_thread_task_runner.h"
-#include "base/threading/platform_thread.h"
 #include "base/threading/thread_task_runner_handle.h"
 
 ///////////////////////////////////////////////////////////////////////////////
 //
 // OVERVIEW:
 //
-//   A thread-safe container for a list of observers.
-//   This is similar to the observer_list (see observer_list.h), but it
-//   is more robust for multi-threaded situations.
+//   A thread-safe container for a list of observers. This is similar to the
+//   observer_list (see observer_list.h), but it is more robust for multi-
+//   threaded situations.
 //
 //   The following use cases are supported:
-//    * Observers can register for notifications from any thread.
-//      Callbacks to the observer will occur on the same thread where
-//      the observer initially called AddObserver() from.
-//    * Any thread may trigger a notification via Notify().
-//    * Observers can remove themselves from the observer list inside
-//      of a callback.
-//    * If one thread is notifying observers concurrently with an observer
-//      removing itself from the observer list, the notifications will
-//      be silently dropped.
+//    * Observers can register for notifications from any sequence. They are
+//      always notified on the sequence from which they were registered.
+//    * Any sequence may trigger a notification via Notify().
+//    * Observers can remove themselves from the observer list inside of a
+//      callback.
+//    * If one sequence is notifying observers concurrently with an observer
+//      removing itself from the observer list, the notifications will be
+//      silently dropped.
 //
-//   The drawback of the threadsafe observer list is that notifications
-//   are not as real-time as the non-threadsafe version of this class.
-//   Notifications will always be done via PostTask() to another thread,
-//   whereas with the non-thread-safe observer_list, notifications happen
-//   synchronously and immediately.
-//
-//   IMPLEMENTATION NOTES
-//   The ObserverListThreadSafe maintains an ObserverList for each thread
-//   which uses the ThreadSafeObserver.  When Notifying the observers,
-//   we simply call PostTask to each registered thread, and then each thread
-//   will notify its regular ObserverList.
+//   The drawback of the threadsafe observer list is that notifications are not
+//   as real-time as the non-threadsafe version of this class. Notifications
+//   will always be done via PostTask() to another sequence, whereas with the
+//   non-thread-safe observer_list, notifications happen synchronously.
 //
 ///////////////////////////////////////////////////////////////////////////////
 
@@ -77,68 +74,63 @@
   using NotificationType =
       typename ObserverList<ObserverType>::NotificationType;
 
-  ObserverListThreadSafe()
-      : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
+  ObserverListThreadSafe() = default;
   explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
 
-  // Add an observer to the list.  An observer should not be added to
-  // the same list more than once.
-  void AddObserver(ObserverType* obs) {
-    // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it,
-    // so do not add the observer.
-    if (!ThreadTaskRunnerHandle::IsSet())
+  // Adds |observer| to the list. |observer| must not already be in the list.
+  void AddObserver(ObserverType* observer) {
+    // TODO(fdoray): Change this to a DCHECK once all call sites have a
+    // SequencedTaskRunnerHandle.
+    if (!SequencedTaskRunnerHandle::IsSet())
       return;
 
-    ObserverList<ObserverType>* list = nullptr;
-    PlatformThreadId thread_id = PlatformThread::CurrentId();
-    {
-      AutoLock lock(list_lock_);
-      if (observer_lists_.find(thread_id) == observer_lists_.end()) {
-        observer_lists_[thread_id] =
-            base::MakeUnique<ObserverListContext>(type_);
+    AutoLock auto_lock(lock_);
+
+    // Add |observer| to the list of observers.
+    DCHECK(!ContainsKey(observers_, observer));
+    const scoped_refptr<SequencedTaskRunner> task_runner =
+        SequencedTaskRunnerHandle::Get();
+    observers_[observer] = task_runner;
+
+    // If this is called while a notification is being dispatched on this thread
+    // and |type_| is NOTIFY_ALL, |observer| must be notified (if a notification
+    // is being dispatched on another thread in parallel, the notification may
+    // or may not make it to |observer| depending on the outcome of the race to
+    // |lock_|).
+    if (type_ == NotificationType::NOTIFY_ALL) {
+      const NotificationData* current_notification =
+          tls_current_notification_.Get();
+      if (current_notification) {
+        task_runner->PostTask(
+            current_notification->from_here,
+            Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
+                 observer, *current_notification));
       }
-      list = &(observer_lists_[thread_id]->list);
     }
-    list->AddObserver(obs);
   }
 
   // Remove an observer from the list if it is in the list.
-  // If there are pending notifications in-transit to the observer, they will
-  // be aborted.
-  // If the observer to be removed is in the list, RemoveObserver MUST
-  // be called from the same thread which called AddObserver.
-  void RemoveObserver(ObserverType* obs) {
-    PlatformThreadId thread_id = PlatformThread::CurrentId();
-    {
-      AutoLock lock(list_lock_);
-      auto it = observer_lists_.find(thread_id);
-      if (it == observer_lists_.end()) {
-        // This will happen if we try to remove an observer on a thread
-        // we never added an observer for.
-        return;
-      }
-      ObserverList<ObserverType>& list = it->second->list;
-
-      list.RemoveObserver(obs);
-
-      // If that was the last observer in the list, remove the ObserverList
-      // entirely.
-      if (list.size() == 0)
-        observer_lists_.erase(it);
-    }
+  //
+  // If a notification was sent to the observer but hasn't started to run yet,
+  // it will be aborted. If a notification has started to run, removing the
+  // observer won't stop it.
+  void RemoveObserver(ObserverType* observer) {
+    AutoLock auto_lock(lock_);
+    observers_.erase(observer);
   }
 
   // Verifies that the list is currently empty (i.e. there are no observers).
   void AssertEmpty() const {
-    AutoLock lock(list_lock_);
-    DCHECK(observer_lists_.empty());
+#if DCHECK_IS_ON()
+    AutoLock auto_lock(lock_);
+    DCHECK(observers_.empty());
+#endif
   }
 
-  // Notify methods.
-  // Make a thread-safe callback to each Observer in the list.
-  // Note, these calls are effectively asynchronous.  You cannot assume
-  // that at the completion of the Notify call that all Observers have
-  // been Notified.  The notification may still be pending delivery.
+  // Asynchronously invokes a callback on all observers, on their registration
+  // sequence. You cannot assume that at the completion of the Notify call that
+  // all Observers have been Notified. The notification may still be pending
+  // delivery.
   template <typename Method, typename... Params>
   void Notify(const tracked_objects::Location& from_here,
               Method m, Params&&... params) {
@@ -146,79 +138,71 @@
         Bind(&internal::Dispatcher<ObserverType, Method>::Run,
              m, std::forward<Params>(params)...);
 
-    AutoLock lock(list_lock_);
-    for (const auto& entry : observer_lists_) {
-      ObserverListContext* context = entry.second.get();
-      context->task_runner->PostTask(
+    AutoLock lock(lock_);
+    for (const auto& observer : observers_) {
+      observer.second->PostTask(
           from_here,
-          Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
-               this, context, method));
+          Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
+               observer.first, NotificationData(from_here, method)));
     }
   }
 
  private:
   friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
 
-  struct ObserverListContext {
-    explicit ObserverListContext(NotificationType type)
-        : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {}
+  struct NotificationData {
+    NotificationData(const tracked_objects::Location& from_here_in,
+                     const Callback<void(ObserverType*)>& method_in)
+        : from_here(from_here_in), method(method_in) {}
 
-    scoped_refptr<SingleThreadTaskRunner> task_runner;
-    ObserverList<ObserverType> list;
-
-   private:
-    DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
+    tracked_objects::Location from_here;
+    Callback<void(ObserverType*)> method;
   };
 
-  ~ObserverListThreadSafe() {
-  }
+  ~ObserverListThreadSafe() = default;
 
-  // Wrapper which is called to fire the notifications for each thread's
-  // ObserverList.  This function MUST be called on the thread which owns
-  // the unsafe ObserverList.
-  void NotifyWrapper(ObserverListContext* context,
-                     const Callback<void(ObserverType*)>& method) {
-    // Check that this list still needs notifications.
+  void NotifyWrapper(ObserverType* observer,
+                     const NotificationData& notification) {
     {
-      AutoLock lock(list_lock_);
-      auto it = observer_lists_.find(PlatformThread::CurrentId());
+      AutoLock auto_lock(lock_);
 
-      // The ObserverList could have been removed already.  In fact, it could
-      // have been removed and then re-added!  If the master list's loop
-      // does not match this one, then we do not need to finish this
-      // notification.
-      if (it == observer_lists_.end() || it->second.get() != context)
+      // Check whether the observer still needs a notification.
+      auto it = observers_.find(observer);
+      if (it == observers_.end())
         return;
+      DCHECK(it->second->RunsTasksOnCurrentThread());
     }
 
-    for (auto& observer : context->list) {
-      method.Run(&observer);
-    }
+    // Keep track of the notification being dispatched on the current thread.
+    // This will be used if the callback below calls AddObserver().
+    //
+    // Note: |tls_current_notification_| may not be nullptr if this runs in a
+    // nested loop started by a notification callback. In that case, it is
+    // important to save the previous value to restore it later.
+    const NotificationData* const previous_notification =
+        tls_current_notification_.Get();
+    tls_current_notification_.Set(&notification);
 
-    // If there are no more observers on the list, we can now delete it.
-    if (context->list.size() == 0) {
-      {
-        AutoLock lock(list_lock_);
-        // Remove |list| if it's not already removed.
-        // This can happen if multiple observers got removed in a notification.
-        // See http://crbug.com/55725.
-        auto it = observer_lists_.find(PlatformThread::CurrentId());
-        if (it != observer_lists_.end() && it->second.get() == context)
-          observer_lists_.erase(it);
-      }
-    }
+    // Invoke the callback.
+    notification.method.Run(observer);
+
+    // Reset the notification being dispatched on the current thread to its
+    // previous value.
+    tls_current_notification_.Set(previous_notification);
   }
 
-  mutable Lock list_lock_;  // Protects the observer_lists_.
+  const NotificationType type_ = NotificationType::NOTIFY_ALL;
 
-  // Key by PlatformThreadId because in tests, clients can attempt to remove
-  // observers without a SingleThreadTaskRunner. If this were keyed by
-  // SingleThreadTaskRunner, that operation would be silently ignored, leaving
-  // garbage in the ObserverList.
-  std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>>
-      observer_lists_;
+  // Synchronizes access to |observers_|.
+  mutable Lock lock_;
 
-  const NotificationType type_;
+  // Keys are observers. Values are the SequencedTaskRunners on which they must
+  // be notified.
+  std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
+      observers_;
+
+  // Notification being dispatched on the current thread.
+  ThreadLocalPointer<const NotificationData> tls_current_notification_;
 
   DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
 };
diff --git a/base/observer_list_unittest.cc b/base/observer_list_unittest.cc
index c5e556b..cc7889f 100644
--- a/base/observer_list_unittest.cc
+++ b/base/observer_list_unittest.cc
@@ -5,13 +5,21 @@
 #include "base/observer_list.h"
 #include "base/observer_list_threadsafe.h"
 
+#include <utility>
 #include <vector>
 
+#include "base/bind.h"
 #include "base/compiler_specific.h"
 #include "base/location.h"
 #include "base/memory/weak_ptr.h"
 #include "base/run_loop.h"
+#include "base/sequenced_task_runner.h"
 #include "base/single_thread_task_runner.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/task_scheduler/post_task.h"
+#include "base/task_scheduler/task_scheduler.h"
+#include "base/test/scoped_task_environment.h"
+#include "base/test/scoped_task_scheduler.h"
 #include "base/threading/platform_thread.h"
 #include "testing/gtest/include/gtest/gtest.h"
 
@@ -65,20 +73,6 @@
   bool remove_self_;
 };
 
-class ThreadSafeDisrupter : public Foo {
- public:
-  ThreadSafeDisrupter(ObserverListThreadSafe<Foo>* list, Foo* doomed)
-      : list_(list),
-        doomed_(doomed) {
-  }
-  ~ThreadSafeDisrupter() override {}
-  void Observe(int x) override { list_->RemoveObserver(doomed_); }
-
- private:
-  ObserverListThreadSafe<Foo>* list_;
-  Foo* doomed_;
-};
-
 template <typename ObserverListType>
 class AddInObserve : public Foo {
  public:
@@ -276,7 +270,6 @@
   Adder b(-1);
   Adder c(1);
   Adder d(-1);
-  ThreadSafeDisrupter evil(observer_list.get(), &c);
 
   observer_list->AddObserver(&a);
   observer_list->AddObserver(&b);
@@ -284,11 +277,11 @@
   observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
   RunLoop().RunUntilIdle();
 
-  observer_list->AddObserver(&evil);
   observer_list->AddObserver(&c);
   observer_list->AddObserver(&d);
 
   observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
+  observer_list->RemoveObserver(&c);
   RunLoop().RunUntilIdle();
 
   EXPECT_EQ(20, a.total);
@@ -329,18 +322,18 @@
   EXPECT_EQ(0, b.total);
 }
 
-TEST(ObserverListThreadSafeTest, WithoutMessageLoop) {
+TEST(ObserverListThreadSafeTest, WithoutSequence) {
   scoped_refptr<ObserverListThreadSafe<Foo> > observer_list(
       new ObserverListThreadSafe<Foo>);
 
   Adder a(1), b(1), c(1);
 
-  // No MessageLoop, so these should not be added.
+  // No sequence, so these should not be added.
   observer_list->AddObserver(&a);
   observer_list->AddObserver(&b);
 
   {
-    // Add c when there's a loop.
+    // Add c when there's a sequence.
     MessageLoop loop;
     observer_list->AddObserver(&c);
 
@@ -351,10 +344,10 @@
     EXPECT_EQ(0, b.total);
     EXPECT_EQ(10, c.total);
 
-    // Now add a when there's a loop.
+    // Now add a when there's a sequence.
     observer_list->AddObserver(&a);
 
-    // Remove c when there's a loop.
+    // Remove c when there's a sequence.
     observer_list->RemoveObserver(&c);
 
     // Notify again.
@@ -366,7 +359,7 @@
     EXPECT_EQ(10, c.total);
   }
 
-  // Removing should always succeed with or without a loop.
+  // Removing should always succeed with or without a sequence.
   observer_list->RemoveObserver(&a);
 
   // Notifying should not fail but should also be a no-op.
@@ -491,6 +484,135 @@
   observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
 }
 
+namespace {
+
+class SequenceVerificationObserver : public Foo {
+ public:
+  explicit SequenceVerificationObserver(
+      scoped_refptr<SequencedTaskRunner> task_runner)
+      : task_runner_(std::move(task_runner)) {}
+  ~SequenceVerificationObserver() override = default;
+
+  void Observe(int x) override {
+    called_on_valid_sequence_ = task_runner_->RunsTasksOnCurrentThread();
+  }
+
+  bool called_on_valid_sequence() const { return called_on_valid_sequence_; }
+
+ private:
+  const scoped_refptr<SequencedTaskRunner> task_runner_;
+  bool called_on_valid_sequence_ = false;
+
+  DISALLOW_COPY_AND_ASSIGN(SequenceVerificationObserver);
+};
+
+}  // namespace
+
+// Verify that observers are notified on the correct sequence.
+TEST(ObserverListThreadSafeTest, NotificationOnValidSequence) {
+  test::ScopedTaskEnvironment scoped_task_environment;
+
+  auto task_runner_1 = CreateSequencedTaskRunnerWithTraits(TaskTraits());
+  auto task_runner_2 = CreateSequencedTaskRunnerWithTraits(TaskTraits());
+
+  auto observer_list = make_scoped_refptr(new ObserverListThreadSafe<Foo>());
+
+  SequenceVerificationObserver observer_1(task_runner_1);
+  SequenceVerificationObserver observer_2(task_runner_2);
+
+  task_runner_1->PostTask(
+      FROM_HERE, Bind(&ObserverListThreadSafe<Foo>::AddObserver, observer_list,
+                      Unretained(&observer_1)));
+  task_runner_2->PostTask(
+      FROM_HERE, Bind(&ObserverListThreadSafe<Foo>::AddObserver, observer_list,
+                      Unretained(&observer_2)));
+
+  TaskScheduler::GetInstance()->FlushForTesting();
+
+  observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
+
+  TaskScheduler::GetInstance()->FlushForTesting();
+
+  EXPECT_TRUE(observer_1.called_on_valid_sequence());
+  EXPECT_TRUE(observer_2.called_on_valid_sequence());
+}
+
+// Verify that when an observer is added to a NOTIFY_ALL ObserverListThreadSafe
+// from a notification, it is itself notified.
+TEST(ObserverListThreadSafeTest, AddObserverFromNotificationNotifyAll) {
+  test::ScopedTaskEnvironment scoped_task_environment;
+  auto observer_list = make_scoped_refptr(new ObserverListThreadSafe<Foo>());
+
+  Adder observer_added_from_notification(1);
+
+  AddInObserve<ObserverListThreadSafe<Foo>> initial_observer(
+      observer_list.get());
+  initial_observer.SetToAdd(&observer_added_from_notification);
+  observer_list->AddObserver(&initial_observer);
+
+  observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
+
+  base::RunLoop().RunUntilIdle();
+
+  EXPECT_EQ(1, observer_added_from_notification.GetValue());
+}
+
+namespace {
+
+class RemoveWhileNotificationIsRunningObserver : public Foo {
+ public:
+  RemoveWhileNotificationIsRunningObserver()
+      : notification_running_(WaitableEvent::ResetPolicy::AUTOMATIC,
+                              WaitableEvent::InitialState::NOT_SIGNALED),
+        barrier_(WaitableEvent::ResetPolicy::AUTOMATIC,
+                 WaitableEvent::InitialState::NOT_SIGNALED) {}
+  ~RemoveWhileNotificationIsRunningObserver() override = default;
+
+  void Observe(int x) override {
+    notification_running_.Signal();
+    barrier_.Wait();
+  }
+
+  void WaitForNotificationRunning() { notification_running_.Wait(); }
+  void Unblock() { barrier_.Signal(); }
+
+ private:
+  WaitableEvent notification_running_;
+  WaitableEvent barrier_;
+
+  DISALLOW_COPY_AND_ASSIGN(RemoveWhileNotificationIsRunningObserver);
+};
+
+}  // namespace
+
+// Verify that there is no crash when an observer is removed while it is being
+// notified.
+TEST(ObserverListThreadSafeTest, RemoveWhileNotificationIsRunning) {
+  auto observer_list = make_scoped_refptr(new ObserverListThreadSafe<Foo>());
+  RemoveWhileNotificationIsRunningObserver observer;
+
+  WaitableEvent task_running(WaitableEvent::ResetPolicy::AUTOMATIC,
+                             WaitableEvent::InitialState::NOT_SIGNALED);
+  WaitableEvent barrier(WaitableEvent::ResetPolicy::AUTOMATIC,
+                        WaitableEvent::InitialState::NOT_SIGNALED);
+
+  // This must be after the declaration of |barrier| so that tasks posted to
+  // TaskScheduler can safely use |barrier|.
+  test::ScopedTaskEnvironment scoped_task_environment;
+
+  CreateSequencedTaskRunnerWithTraits(TaskTraits().WithBaseSyncPrimitives())
+      ->PostTask(FROM_HERE,
+                 base::Bind(&ObserverListThreadSafe<Foo>::AddObserver,
+                            observer_list, Unretained(&observer)));
+  TaskScheduler::GetInstance()->FlushForTesting();
+
+  observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
+  observer.WaitForNotificationRunning();
+  observer_list->RemoveObserver(&observer);
+
+  observer.Unblock();
+}
+
 TEST(ObserverListTest, Existing) {
   ObserverList<Foo> observer_list(ObserverList<Foo>::NOTIFY_EXISTING_ONLY);
   Adder a(1);