Reimplement ReceivedSyncMsgQueue::DispatchMessages

Implementation of IPC::SyncChannel::ReceivedSyncMsgQueue::DispatchMessages that
does not hold any messages in a local stack-frame's delayed_queue, which was
causing me to see an inbound sync message from a plugin not dispatched while
the renderer was waiting for replies from the plugin. This was causing the
plugin and renderer to deadlock waiting for each other.

BUG=108491
TEST=Run Pepperized O3D and observe for tab hangs
TEST=Run ipc_tests unittests


Review URL: http://codereview.chromium.org/9022038

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@117309 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: 522cc10d714c4a2e519176a8f24d2549820f63dc
diff --git a/ipc/ipc_sync_channel_unittest.cc b/ipc/ipc_sync_channel_unittest.cc
index e7903f4..f396843 100644
--- a/ipc/ipc_sync_channel_unittest.cc
+++ b/ipc/ipc_sync_channel_unittest.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 //
@@ -1334,4 +1334,262 @@
   EXPECT_EQ(3, success);
 }
 
+//-----------------------------------------------------------------------------
+
+// This test case inspired by crbug.com/108491
+// We create two servers that use the same ListenerThread but have
+// SetRestrictDispatchToSameChannel set to true.
+// We create clients, then use some specific WaitableEvent wait/signalling to
+// ensure that messages get dispatched in a way that causes a deadlock due to
+// a nested dispatch and an eligible message in a higher-level dispatch's
+// delayed_queue. Specifically, we start with client1 about so send an
+// unblocking message to server1, while the shared listener thread for the
+// servers server1 and server2 is about to send a non-unblocking message to
+// client1. At the same time, client2 will be about to send an unblocking
+// message to server2. Server1 will handle the client1->server1 message by
+// telling server2 to send a non-unblocking message to client2.
+// What should happen is that the send to server2 should find the pending,
+// same-context client2->server2 message to dispatch, causing client2 to
+// unblock then handle the server2->client2 message, so that the shared
+// servers' listener thread can then respond to the client1->server1 message.
+// Then client1 can handle the non-unblocking server1->client1 message.
+// The old code would end up in a state where the server2->client2 message is
+// sent, but the client2->server2 message (which is eligible for dispatch, and
+// which is what client2 is waiting for) is stashed in a local delayed_queue
+// that has server1's channel context, causing a deadlock.
+// WaitableEvents in the events array are used to:
+//   event 0: indicate to client1 that server listener is in OnDoServerTask
+//   event 1: indicate to client1 that client2 listener is in OnDoClient2Task
+//   event 2: indicate to server1 that client2 listener is in OnDoClient2Task
+//   event 3: indicate to client2 that server listener is in OnDoServerTask
+
+namespace {
+
+class RestrictedDispatchDeadlockServer : public Worker {
+ public:
+  RestrictedDispatchDeadlockServer(int server_num,
+                                   WaitableEvent* server_ready_event,
+                                   WaitableEvent** events,
+                                   RestrictedDispatchDeadlockServer* peer)
+      : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER),
+        server_num_(server_num),
+        server_ready_event_(server_ready_event),
+        events_(events),
+        peer_(peer),
+        client_kicked_(false) { }
+
+  void OnDoServerTask() {
+    events_[3]->Signal();
+    events_[2]->Wait();
+    events_[0]->Signal();
+    SendMessageToClient();
+  }
+
+  void Run() {
+    channel()->SetRestrictDispatchToSameChannel(true);
+    server_ready_event_->Signal();
+  }
+
+  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
+
+ private:
+  bool OnMessageReceived(const Message& message) {
+    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message)
+     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
+     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
+    IPC_END_MESSAGE_MAP()
+    return true;
+  }
+
+  void OnNoArgs() {
+    if (server_num_ == 1) {
+      DCHECK(peer_ != NULL);
+      peer_->SendMessageToClient();
+    }
+  }
+
+  void SendMessageToClient() {
+    Message* msg = new SyncChannelTestMsg_NoArgs;
+    msg->set_unblock(false);
+    DCHECK(!msg->should_unblock());
+    Send(msg);
+  }
+
+  int server_num_;
+  WaitableEvent* server_ready_event_;
+  WaitableEvent** events_;
+  RestrictedDispatchDeadlockServer* peer_;
+  bool client_kicked_;
+};
+
+class RestrictedDispatchDeadlockClient2 : public Worker {
+ public:
+  RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server,
+                                    WaitableEvent* server_ready_event,
+                                    WaitableEvent** events)
+      : Worker("channel2", Channel::MODE_CLIENT),
+        server_(server),
+        server_ready_event_(server_ready_event),
+        events_(events),
+        received_msg_(false),
+        received_noarg_reply_(false),
+        done_issued_(false) {}
+
+  void Run() {
+    server_ready_event_->Wait();
+  }
+
+  void OnDoClient2Task() {
+    events_[3]->Wait();
+    events_[1]->Signal();
+    events_[2]->Signal();
+    DCHECK(received_msg_ == false);
+
+    Message* message = new SyncChannelTestMsg_NoArgs;
+    message->set_unblock(true);
+    Send(message);
+    received_noarg_reply_ = true;
+  }
+
+  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
+ private:
+  bool OnMessageReceived(const Message& message) {
+    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message)
+     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
+    IPC_END_MESSAGE_MAP()
+    return true;
+  }
+
+  void OnNoArgs() {
+    received_msg_ = true;
+    PossiblyDone();
+  }
+
+  void PossiblyDone() {
+    if (received_noarg_reply_ && received_msg_) {
+      DCHECK(done_issued_ == false);
+      done_issued_ = true;
+      Send(new SyncChannelTestMsg_Done);
+      Done();
+    }
+  }
+
+  RestrictedDispatchDeadlockServer* server_;
+  WaitableEvent* server_ready_event_;
+  WaitableEvent** events_;
+  bool received_msg_;
+  bool received_noarg_reply_;
+  bool done_issued_;
+};
+
+class RestrictedDispatchDeadlockClient1 : public Worker {
+ public:
+  RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server,
+                                    RestrictedDispatchDeadlockClient2* peer,
+                                    WaitableEvent* server_ready_event,
+                                    WaitableEvent** events)
+      : Worker("channel1", Channel::MODE_CLIENT),
+        server_(server),
+        peer_(peer),
+        server_ready_event_(server_ready_event),
+        events_(events),
+        received_msg_(false),
+        received_noarg_reply_(false),
+        done_issued_(false) {}
+
+  void Run() {
+    server_ready_event_->Wait();
+    server_->ListenerThread()->message_loop()->PostTask(
+        FROM_HERE,
+        base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_));
+    peer_->ListenerThread()->message_loop()->PostTask(
+        FROM_HERE,
+        base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_));
+    events_[0]->Wait();
+    events_[1]->Wait();
+    DCHECK(received_msg_ == false);
+
+    Message* message = new SyncChannelTestMsg_NoArgs;
+    message->set_unblock(true);
+    Send(message);
+    received_noarg_reply_ = true;
+    PossiblyDone();
+  }
+
+  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
+ private:
+  bool OnMessageReceived(const Message& message) {
+    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message)
+     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
+    IPC_END_MESSAGE_MAP()
+    return true;
+  }
+
+  void OnNoArgs() {
+    received_msg_ = true;
+    PossiblyDone();
+  }
+
+  void PossiblyDone() {
+    if (received_noarg_reply_ && received_msg_) {
+      DCHECK(done_issued_ == false);
+      done_issued_ = true;
+      Send(new SyncChannelTestMsg_Done);
+      Done();
+    }
+  }
+
+  RestrictedDispatchDeadlockServer* server_;
+  RestrictedDispatchDeadlockClient2* peer_;
+  WaitableEvent* server_ready_event_;
+  WaitableEvent** events_;
+  bool received_msg_;
+  bool received_noarg_reply_;
+  bool done_issued_;
+};
+
+}  // namespace
+
+TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) {
+  std::vector<Worker*> workers;
+
+  // A shared worker thread so that server1 and server2 run on one thread.
+  base::Thread worker_thread("RestrictedDispatchDeadlock");
+  ASSERT_TRUE(worker_thread.Start());
+
+  WaitableEvent server1_ready(false, false);
+  WaitableEvent server2_ready(false, false);
+
+  WaitableEvent event0(false, false);
+  WaitableEvent event1(false, false);
+  WaitableEvent event2(false, false);
+  WaitableEvent event3(false, false);
+  WaitableEvent* events[4] = {&event0, &event1, &event2, &event3};
+
+  RestrictedDispatchDeadlockServer* server1;
+  RestrictedDispatchDeadlockServer* server2;
+  RestrictedDispatchDeadlockClient1* client1;
+  RestrictedDispatchDeadlockClient2* client2;
+
+  server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events,
+                                                 NULL);
+  server2->OverrideThread(&worker_thread);
+  workers.push_back(server2);
+
+  client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready,
+                                                  events);
+  workers.push_back(client2);
+
+  server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events,
+                                                 server2);
+  server1->OverrideThread(&worker_thread);
+  workers.push_back(server1);
+
+  client1 = new RestrictedDispatchDeadlockClient1(server1, client2,
+                                                  &server1_ready, events);
+  workers.push_back(client1);
+
+  RunTest(workers);
+}
+
 }  // namespace IPC