Add a base class for objects that want to filter messages on the IO thread.  I'll switch the filters to it in future separate changes.

I've also taken out the special case for an initial filter from the IPC classes.  The reason it existed was that there was a race condition of some messages not being filtered if a filter is added after construction but before launching the peer process.  Taking it out allows us to add more than one filter and makes things a little cleaner.
Review URL: http://codereview.chromium.org/5513001

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@68043 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: 4b580bf3020b1e0eaf5b7efad50896b4c62474c5
diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc
index f05ae48..8450c28 100644
--- a/ipc/ipc_channel_proxy.cc
+++ b/ipc/ipc_channel_proxy.cc
@@ -61,7 +61,6 @@
 //------------------------------------------------------------------------------
 
 ChannelProxy::Context::Context(Channel::Listener* listener,
-                               MessageFilter* filter,
                                MessageLoop* ipc_message_loop)
     : listener_message_loop_(MessageLoop::current()),
       listener_(listener),
@@ -69,8 +68,6 @@
       channel_(NULL),
       peer_pid_(0),
       channel_connected_called_(false) {
-  if (filter)
-    filters_.push_back(make_scoped_refptr(filter));
 }
 
 void ChannelProxy::Context::CreateChannel(const std::string& id,
@@ -118,6 +115,12 @@
 
 // Called on the IPC::Channel thread
 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
+  // Add any pending filters.  This avoids a race condition where someone
+  // creates a ChannelProxy, calls AddFilter, and then right after starts the
+  // peer process.  The IO thread could receive a message before the task to add
+  // the filter is run on the IO thread.
+  OnAddFilter();
+
   peer_pid_ = peer_pid;
   for (size_t i = 0; i < filters_.size(); ++i)
     filters_[i]->OnChannelConnected(peer_pid);
@@ -189,13 +192,24 @@
 }
 
 // Called on the IPC::Channel thread
-void ChannelProxy::Context::OnAddFilter(MessageFilter* filter) {
-  filters_.push_back(make_scoped_refptr(filter));
+void ChannelProxy::Context::OnAddFilter() {
+  std::vector<scoped_refptr<MessageFilter> > filters;
+  {
+    AutoLock auto_lock(pending_filters_lock_);
+    filters.swap(pending_filters_);
+  }
 
-  // If the channel has already been created, then we need to send this message
-  // so that the filter gets access to the Channel.
-  if (channel_)
-    filter->OnFilterAdded(channel_);
+  for (size_t i = 0; i < filters.size(); ++i) {
+    filters_.push_back(filters[i]);
+
+    // If the channel has already been created, then we need to send this
+    // message so that the filter gets access to the Channel.
+    if (channel_)
+      filters[i]->OnFilterAdded(channel_);
+    // Ditto for the peer process id.
+    if (peer_pid_)
+      filters[i]->OnChannelConnected(peer_pid_);
+  }
 }
 
 // Called on the IPC::Channel thread
@@ -212,6 +226,15 @@
 }
 
 // Called on the listener's thread
+void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
+  AutoLock auto_lock(pending_filters_lock_);
+  pending_filters_.push_back(make_scoped_refptr(filter));
+  ipc_message_loop_->PostTask(
+      FROM_HERE,
+      NewRunnableMethod(this, &Context::OnAddFilter));
+}
+
+// Called on the listener's thread
 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
   if (!listener_)
     return;
@@ -255,15 +278,18 @@
 
 //-----------------------------------------------------------------------------
 
-ChannelProxy::ChannelProxy(const std::string& channel_id, Channel::Mode mode,
-                           Channel::Listener* listener, MessageFilter* filter,
+ChannelProxy::ChannelProxy(const std::string& channel_id,
+                           Channel::Mode mode,
+                           Channel::Listener* listener,
                            MessageLoop* ipc_thread)
-    : context_(new Context(listener, filter, ipc_thread)) {
+    : context_(new Context(listener, ipc_thread)) {
   Init(channel_id, mode, ipc_thread, true);
 }
 
-ChannelProxy::ChannelProxy(const std::string& channel_id, Channel::Mode mode,
-                           MessageLoop* ipc_thread, Context* context,
+ChannelProxy::ChannelProxy(const std::string& channel_id,
+                           Channel::Mode mode,
+                           MessageLoop* ipc_thread,
+                           Context* context,
                            bool create_pipe_now)
     : context_(context) {
   Init(channel_id, mode, ipc_thread, create_pipe_now);
@@ -314,12 +340,7 @@
 }
 
 void ChannelProxy::AddFilter(MessageFilter* filter) {
-  context_->ipc_message_loop()->PostTask(
-      FROM_HERE,
-      NewRunnableMethod(
-          context_.get(),
-          &Context::OnAddFilter,
-          make_scoped_refptr(filter)));
+  context_->AddFilter(filter);
 }
 
 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h
index 53a39b4..a85841d 100644
--- a/ipc/ipc_channel_proxy.h
+++ b/ipc/ipc_channel_proxy.h
@@ -8,6 +8,7 @@
 
 #include <vector>
 
+#include "base/lock.h"
 #include "base/ref_counted.h"
 #include "ipc/ipc_channel.h"
 
@@ -104,8 +105,9 @@
   // on the background thread.  Any message not handled by the filter will be
   // dispatched to the listener.  The given message loop indicates where the
   // IPC::Channel should be created.
-  ChannelProxy(const std::string& channel_id, Channel::Mode mode,
-               Channel::Listener* listener, MessageFilter* filter,
+  ChannelProxy(const std::string& channel_id,
+               Channel::Mode mode,
+               Channel::Listener* listener,
                MessageLoop* ipc_thread_loop);
 
   virtual ~ChannelProxy();
@@ -129,6 +131,10 @@
   // Ordinarily, messages sent to the ChannelProxy are routed to the matching
   // listener on the worker thread.  This API allows code to intercept messages
   // before they are sent to the worker thread.
+  // If you call this before the target process is launched, then you're
+  // guaranteed to not miss any messages.  But if you call this anytime after,
+  // then some messages might be missed since the filter is added internally on
+  // the IO thread.
   void AddFilter(MessageFilter* filter);
   void RemoveFilter(MessageFilter* filter);
 
@@ -147,16 +153,17 @@
   // A subclass uses this constructor if it needs to add more information
   // to the internal state.  If create_pipe_now is true, the pipe is created
   // immediately.  Otherwise it's created on the IO thread.
-  ChannelProxy(const std::string& channel_id, Channel::Mode mode,
-               MessageLoop* ipc_thread_loop, Context* context,
+  ChannelProxy(const std::string& channel_id,
+               Channel::Mode mode,
+               MessageLoop* ipc_thread_loop,
+               Context* context,
                bool create_pipe_now);
 
   // Used internally to hold state that is referenced on the IPC thread.
   class Context : public base::RefCountedThreadSafe<Context>,
                   public Channel::Listener {
    public:
-    Context(Channel::Listener* listener, MessageFilter* filter,
-            MessageLoop* ipc_thread);
+    Context(Channel::Listener* listener, MessageLoop* ipc_thread);
     void ClearIPCMessageLoop() { ipc_message_loop_ = NULL; }
     MessageLoop* ipc_message_loop() const { return ipc_message_loop_; }
     const std::string& channel_id() const { return channel_id_; }
@@ -196,10 +203,13 @@
     // Create the Channel
     void CreateChannel(const std::string& id, const Channel::Mode& mode);
 
-    // Methods called via InvokeLater:
+    // Methods called on the IO thread.
     void OnSendMessage(Message* message_ptr);
-    void OnAddFilter(MessageFilter* filter);
+    void OnAddFilter();
     void OnRemoveFilter(MessageFilter* filter);
+
+    // Methods called on the listener thread.
+    void AddFilter(MessageFilter* filter);
     void OnDispatchConnected();
     void OnDispatchError();
 
@@ -213,6 +223,12 @@
     std::string channel_id_;
     int peer_pid_;
     bool channel_connected_called_;
+
+    // Holds filters between the AddFilter call on the listerner thread and the
+    // IPC thread when they're added to filters_.
+    std::vector<scoped_refptr<MessageFilter> > pending_filters_;
+    // Lock for pending_filters_.
+    Lock pending_filters_lock_;
   };
 
   Context* context() { return context_; }
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
index 8598c9c..e77846c 100644
--- a/ipc/ipc_sync_channel.cc
+++ b/ipc/ipc_sync_channel.cc
@@ -200,10 +200,9 @@
 
 SyncChannel::SyncContext::SyncContext(
     Channel::Listener* listener,
-    MessageFilter* filter,
     MessageLoop* ipc_thread,
     WaitableEvent* shutdown_event)
-    : ChannelProxy::Context(listener, filter, ipc_thread),
+    : ChannelProxy::Context(listener, ipc_thread),
       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
       shutdown_event_(shutdown_event) {
 }
@@ -356,13 +355,15 @@
 
 
 SyncChannel::SyncChannel(
-    const std::string& channel_id, Channel::Mode mode,
-    Channel::Listener* listener, MessageFilter* filter,
-    MessageLoop* ipc_message_loop, bool create_pipe_now,
+    const std::string& channel_id,
+    Channel::Mode mode,
+    Channel::Listener* listener,
+    MessageLoop* ipc_message_loop,
+    bool create_pipe_now,
     WaitableEvent* shutdown_event)
     : ChannelProxy(
           channel_id, mode, ipc_message_loop,
-          new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
+          new SyncContext(listener, ipc_message_loop, shutdown_event),
           create_pipe_now),
       sync_messages_with_no_timeout_allowed_(true) {
   // Ideally we only want to watch this object when running a nested message
diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h
index 713b868..3435042 100644
--- a/ipc/ipc_sync_channel.h
+++ b/ipc/ipc_sync_channel.h
@@ -34,9 +34,11 @@
 class SyncChannel : public ChannelProxy,
                     public base::WaitableEventWatcher::Delegate {
  public:
-  SyncChannel(const std::string& channel_id, Channel::Mode mode,
-              Channel::Listener* listener, MessageFilter* filter,
-              MessageLoop* ipc_message_loop, bool create_pipe_now,
+  SyncChannel(const std::string& channel_id,
+              Channel::Mode mode,
+              Channel::Listener* listener,
+              MessageLoop* ipc_message_loop,
+              bool create_pipe_now,
               base::WaitableEvent* shutdown_event);
   virtual ~SyncChannel();
 
@@ -59,7 +61,6 @@
                       public base::WaitableEventWatcher::Delegate {
    public:
     SyncContext(Channel::Listener* listener,
-                MessageFilter* filter,
                 MessageLoop* ipc_thread,
                 base::WaitableEvent* shutdown_event);
 
diff --git a/ipc/ipc_sync_channel_unittest.cc b/ipc/ipc_sync_channel_unittest.cc
index 15d5a38..bf21917 100644
--- a/ipc/ipc_sync_channel_unittest.cc
+++ b/ipc/ipc_sync_channel_unittest.cc
@@ -170,7 +170,7 @@
     // Link ipc_thread_, listener_thread_ and channel_ altogether.
     StartThread(&ipc_thread_, MessageLoop::TYPE_IO);
     channel_.reset(new SyncChannel(
-        channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true,
+        channel_name_, mode_, this, ipc_thread_.message_loop(), true,
         &shutdown_event_));
     channel_created_->Signal();
     Run();
diff --git a/ipc/ipc_tests.cc b/ipc/ipc_tests.cc
index aa30182..aee01f5 100644
--- a/ipc/ipc_tests.cc
+++ b/ipc/ipc_tests.cc
@@ -250,7 +250,7 @@
   {
     // setup IPC channel proxy
     IPC::ChannelProxy chan(kTestClientChannel, IPC::Channel::MODE_SERVER,
-                           &channel_listener, NULL, thread.message_loop());
+                           &channel_listener, thread.message_loop());
 
     channel_listener.Init(&chan);