blob: f20f78811fe8416c33c10b4a34a056342c7b2b1d [file] [log] [blame]
agl@chromium.org1c6dcf22009-07-23 08:57:21 +09001// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4//
5// Unit test for SyncChannel.
6
7#include <string>
8#include <vector>
9
10#include "base/basictypes.h"
11#include "base/logging.h"
12#include "base/message_loop.h"
13#include "base/platform_thread.h"
14#include "base/stl_util-inl.h"
15#include "base/string_util.h"
16#include "base/thread.h"
17#include "base/waitable_event.h"
18#include "ipc/ipc_message.h"
19#include "ipc/ipc_sync_channel.h"
20#include "testing/gtest/include/gtest/gtest.h"
21
22
23#define MESSAGES_INTERNAL_FILE "ipc/ipc_sync_message_unittest.h"
24#include "ipc/ipc_message_macros.h"
25
26using namespace IPC;
27using base::WaitableEvent;
28
29namespace {
30
31// Base class for a "process" with listener and IPC threads.
32class Worker : public Channel::Listener, public Message::Sender {
33 public:
34 // Will create a channel without a name.
35 Worker(Channel::Mode mode, const std::string& thread_name)
36 : done_(new WaitableEvent(false, false)),
37 channel_created_(new WaitableEvent(false, false)),
38 mode_(mode),
39 ipc_thread_((thread_name + "_ipc").c_str()),
40 listener_thread_((thread_name + "_listener").c_str()),
41 overrided_thread_(NULL),
42 shutdown_event_(true, false) { }
43
44 // Will create a named channel and use this name for the threads' name.
45 Worker(const std::string& channel_name, Channel::Mode mode)
46 : done_(new WaitableEvent(false, false)),
47 channel_created_(new WaitableEvent(false, false)),
48 channel_name_(channel_name),
49 mode_(mode),
50 ipc_thread_((channel_name + "_ipc").c_str()),
51 listener_thread_((channel_name + "_listener").c_str()),
52 overrided_thread_(NULL),
53 shutdown_event_(true, false) { }
54
55 // The IPC thread needs to outlive SyncChannel, so force the correct order of
56 // destruction.
57 virtual ~Worker() {
58 WaitableEvent listener_done(false, false), ipc_done(false, false);
59 ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
60 this, &Worker::OnListenerThreadShutdown1, &listener_done,
61 &ipc_done));
62 listener_done.Wait();
63 ipc_done.Wait();
64 ipc_thread_.Stop();
65 listener_thread_.Stop();
66 }
67 void AddRef() { }
68 void Release() { }
69 bool Send(Message* msg) { return channel_->Send(msg); }
70 bool SendWithTimeout(Message* msg, int timeout_ms) {
71 return channel_->SendWithTimeout(msg, timeout_ms);
72 }
73 void WaitForChannelCreation() { channel_created_->Wait(); }
74 void CloseChannel() {
75 DCHECK(MessageLoop::current() == ListenerThread()->message_loop());
76 channel_->Close();
77 }
78 void Start() {
79 StartThread(&listener_thread_, MessageLoop::TYPE_DEFAULT);
80 ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
81 this, &Worker::OnStart));
82 }
83 void OverrideThread(base::Thread* overrided_thread) {
84 DCHECK(overrided_thread_ == NULL);
85 overrided_thread_ = overrided_thread;
86 }
87 bool SendAnswerToLife(bool pump, int timeout, bool succeed) {
88 int answer = 0;
89 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
90 if (pump)
91 msg->EnableMessagePumping();
92 bool result = SendWithTimeout(msg, timeout);
93 DCHECK(result == succeed);
94 DCHECK(answer == (succeed ? 42 : 0));
95 return result;
96 }
97 bool SendDouble(bool pump, bool succeed) {
98 int answer = 0;
99 SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
100 if (pump)
101 msg->EnableMessagePumping();
102 bool result = Send(msg);
103 DCHECK(result == succeed);
104 DCHECK(answer == (succeed ? 10 : 0));
105 return result;
106 }
107 Channel::Mode mode() { return mode_; }
108 WaitableEvent* done_event() { return done_.get(); }
109
110 protected:
111 // Derived classes need to call this when they've completed their part of
112 // the test.
113 void Done() { done_->Signal(); }
114 // Functions for dervied classes to implement if they wish.
115 virtual void Run() { }
116 virtual void OnAnswer(int* answer) { NOTREACHED(); }
117 virtual void OnAnswerDelay(Message* reply_msg) {
118 // The message handler map below can only take one entry for
119 // SyncChannelTestMsg_AnswerToLife, so since some classes want
120 // the normal version while other want the delayed reply, we
121 // call the normal version if the derived class didn't override
122 // this function.
123 int answer;
124 OnAnswer(&answer);
125 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
126 Send(reply_msg);
127 }
128 virtual void OnDouble(int in, int* out) { NOTREACHED(); }
129 virtual void OnDoubleDelay(int in, Message* reply_msg) {
130 int result;
131 OnDouble(in, &result);
132 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
133 Send(reply_msg);
134 }
135
136 private:
137 base::Thread* ListenerThread() {
138 return overrided_thread_ ? overrided_thread_ : &listener_thread_;
139 }
140 // Called on the listener thread to create the sync channel.
141 void OnStart() {
142 // Link ipc_thread_, listener_thread_ and channel_ altogether.
143 StartThread(&ipc_thread_, MessageLoop::TYPE_IO);
144 channel_.reset(new SyncChannel(
145 channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true,
146 &shutdown_event_));
147 channel_created_->Signal();
148 Run();
149 }
150
151 void OnListenerThreadShutdown1(WaitableEvent* listener_event,
152 WaitableEvent* ipc_event) {
153 // SyncChannel needs to be destructed on the thread that it was created on.
154 channel_.reset();
155
156 MessageLoop::current()->RunAllPending();
157
158 ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
159 this, &Worker::OnIPCThreadShutdown, listener_event, ipc_event));
160 }
161
162 void OnIPCThreadShutdown(WaitableEvent* listener_event,
163 WaitableEvent* ipc_event) {
164 MessageLoop::current()->RunAllPending();
165 ipc_event->Signal();
166
167 listener_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
168 this, &Worker::OnListenerThreadShutdown2, listener_event));
169 }
170
171 void OnListenerThreadShutdown2(WaitableEvent* listener_event) {
172 MessageLoop::current()->RunAllPending();
173 listener_event->Signal();
174 }
175
176 void OnMessageReceived(const Message& message) {
177 IPC_BEGIN_MESSAGE_MAP(Worker, message)
178 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
179 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
180 OnAnswerDelay)
181 IPC_END_MESSAGE_MAP()
182 }
183
184 void StartThread(base::Thread* thread, MessageLoop::Type type) {
185 base::Thread::Options options;
186 options.message_loop_type = type;
187 thread->StartWithOptions(options);
188 }
189
190 scoped_ptr<WaitableEvent> done_;
191 scoped_ptr<WaitableEvent> channel_created_;
192 std::string channel_name_;
193 Channel::Mode mode_;
194 scoped_ptr<SyncChannel> channel_;
195 base::Thread ipc_thread_;
196 base::Thread listener_thread_;
197 base::Thread* overrided_thread_;
198
199 base::WaitableEvent shutdown_event_;
200
201 DISALLOW_EVIL_CONSTRUCTORS(Worker);
202};
203
204
205// Starts the test with the given workers. This function deletes the workers
206// when it's done.
207void RunTest(std::vector<Worker*> workers) {
208 // First we create the workers that are channel servers, or else the other
209 // workers' channel initialization might fail because the pipe isn't created..
210 for (size_t i = 0; i < workers.size(); ++i) {
211 if (workers[i]->mode() == Channel::MODE_SERVER) {
212 workers[i]->Start();
213 workers[i]->WaitForChannelCreation();
214 }
215 }
216
217 // now create the clients
218 for (size_t i = 0; i < workers.size(); ++i) {
219 if (workers[i]->mode() == Channel::MODE_CLIENT)
220 workers[i]->Start();
221 }
222
223 // wait for all the workers to finish
224 for (size_t i = 0; i < workers.size(); ++i)
225 workers[i]->done_event()->Wait();
226
227 STLDeleteContainerPointers(workers.begin(), workers.end());
228}
229
230} // namespace
231
232class IPCSyncChannelTest : public testing::Test {
233 private:
234 MessageLoop message_loop_;
235};
236
237//-----------------------------------------------------------------------------
238
239namespace {
240
241class SimpleServer : public Worker {
242 public:
243 SimpleServer(bool pump_during_send)
244 : Worker(Channel::MODE_SERVER, "simpler_server"),
245 pump_during_send_(pump_during_send) { }
246 void Run() {
247 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
248 Done();
249 }
250
251 bool pump_during_send_;
252};
253
254class SimpleClient : public Worker {
255 public:
256 SimpleClient() : Worker(Channel::MODE_CLIENT, "simple_client") { }
257
258 void OnAnswer(int* answer) {
259 *answer = 42;
260 Done();
261 }
262};
263
264void Simple(bool pump_during_send) {
265 std::vector<Worker*> workers;
266 workers.push_back(new SimpleServer(pump_during_send));
267 workers.push_back(new SimpleClient());
268 RunTest(workers);
269}
270
271} // namespace
272
273// Tests basic synchronous call
274TEST_F(IPCSyncChannelTest, Simple) {
275 Simple(false);
276 Simple(true);
277}
278
279//-----------------------------------------------------------------------------
280
281namespace {
282
283class DelayClient : public Worker {
284 public:
285 DelayClient() : Worker(Channel::MODE_CLIENT, "delay_client") { }
286
287 void OnAnswerDelay(Message* reply_msg) {
288 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
289 Send(reply_msg);
290 Done();
291 }
292};
293
294void DelayReply(bool pump_during_send) {
295 std::vector<Worker*> workers;
296 workers.push_back(new SimpleServer(pump_during_send));
297 workers.push_back(new DelayClient());
298 RunTest(workers);
299}
300
301} // namespace
302
303// Tests that asynchronous replies work
304TEST_F(IPCSyncChannelTest, DelayReply) {
305 DelayReply(false);
306 DelayReply(true);
307}
308
309//-----------------------------------------------------------------------------
310
311namespace {
312
313class NoHangServer : public Worker {
314 public:
315 explicit NoHangServer(WaitableEvent* got_first_reply, bool pump_during_send)
316 : Worker(Channel::MODE_SERVER, "no_hang_server"),
317 got_first_reply_(got_first_reply),
318 pump_during_send_(pump_during_send) { }
319 void Run() {
320 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
321 got_first_reply_->Signal();
322
323 SendAnswerToLife(pump_during_send_, base::kNoTimeout, false);
324 Done();
325 }
326
327 WaitableEvent* got_first_reply_;
328 bool pump_during_send_;
329};
330
331class NoHangClient : public Worker {
332 public:
333 explicit NoHangClient(WaitableEvent* got_first_reply)
334 : Worker(Channel::MODE_CLIENT, "no_hang_client"),
335 got_first_reply_(got_first_reply) { }
336
337 virtual void OnAnswerDelay(Message* reply_msg) {
338 // Use the DELAY_REPLY macro so that we can force the reply to be sent
339 // before this function returns (when the channel will be reset).
340 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
341 Send(reply_msg);
342 got_first_reply_->Wait();
343 CloseChannel();
344 Done();
345 }
346
347 WaitableEvent* got_first_reply_;
348};
349
350void NoHang(bool pump_during_send) {
351 WaitableEvent got_first_reply(false, false);
352 std::vector<Worker*> workers;
353 workers.push_back(new NoHangServer(&got_first_reply, pump_during_send));
354 workers.push_back(new NoHangClient(&got_first_reply));
355 RunTest(workers);
356}
357
358} // namespace
359
360// Tests that caller doesn't hang if receiver dies
361TEST_F(IPCSyncChannelTest, NoHang) {
362 NoHang(false);
363 NoHang(true);
364}
365
366//-----------------------------------------------------------------------------
367
368namespace {
369
370class UnblockServer : public Worker {
371 public:
372 UnblockServer(bool pump_during_send)
373 : Worker(Channel::MODE_SERVER, "unblock_server"),
374 pump_during_send_(pump_during_send) { }
375 void Run() {
376 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
377 Done();
378 }
379
380 void OnDouble(int in, int* out) {
381 *out = in * 2;
382 }
383
384 bool pump_during_send_;
385};
386
387class UnblockClient : public Worker {
388 public:
389 UnblockClient(bool pump_during_send)
390 : Worker(Channel::MODE_CLIENT, "unblock_client"),
391 pump_during_send_(pump_during_send) { }
392
393 void OnAnswer(int* answer) {
394 SendDouble(pump_during_send_, true);
395 *answer = 42;
396 Done();
397 }
398
399 bool pump_during_send_;
400};
401
402void Unblock(bool server_pump, bool client_pump) {
403 std::vector<Worker*> workers;
404 workers.push_back(new UnblockServer(server_pump));
405 workers.push_back(new UnblockClient(client_pump));
406 RunTest(workers);
407}
408
409} // namespace
410
411// Tests that the caller unblocks to answer a sync message from the receiver.
412TEST_F(IPCSyncChannelTest, Unblock) {
413 Unblock(false, false);
414 Unblock(false, true);
415 Unblock(true, false);
416 Unblock(true, true);
417}
418
419//-----------------------------------------------------------------------------
420
421namespace {
422
423class RecursiveServer : public Worker {
424 public:
425 explicit RecursiveServer(
426 bool expected_send_result, bool pump_first, bool pump_second)
427 : Worker(Channel::MODE_SERVER, "recursive_server"),
428 expected_send_result_(expected_send_result),
429 pump_first_(pump_first), pump_second_(pump_second) { }
430 void Run() {
431 SendDouble(pump_first_, expected_send_result_);
432 Done();
433 }
434
435 void OnDouble(int in, int* out) {
436 *out = in * 2;
437 SendAnswerToLife(pump_second_, base::kNoTimeout, expected_send_result_);
438 }
439
440 bool expected_send_result_, pump_first_, pump_second_;
441};
442
443class RecursiveClient : public Worker {
444 public:
445 explicit RecursiveClient(bool pump_during_send, bool close_channel)
446 : Worker(Channel::MODE_CLIENT, "recursive_client"),
447 pump_during_send_(pump_during_send), close_channel_(close_channel) { }
448
449 void OnDoubleDelay(int in, Message* reply_msg) {
450 SendDouble(pump_during_send_, !close_channel_);
451 if (close_channel_) {
452 delete reply_msg;
453 } else {
454 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
455 Send(reply_msg);
456 }
457 Done();
458 }
459
460 void OnAnswerDelay(Message* reply_msg) {
461 if (close_channel_) {
462 delete reply_msg;
463 CloseChannel();
464 } else {
465 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
466 Send(reply_msg);
467 }
468 }
469
470 bool pump_during_send_, close_channel_;
471};
472
473void Recursive(
474 bool server_pump_first, bool server_pump_second, bool client_pump) {
475 std::vector<Worker*> workers;
476 workers.push_back(
477 new RecursiveServer(true, server_pump_first, server_pump_second));
478 workers.push_back(new RecursiveClient(client_pump, false));
479 RunTest(workers);
480}
481
482} // namespace
483
484// Tests a server calling Send while another Send is pending.
485TEST_F(IPCSyncChannelTest, Recursive) {
486 Recursive(false, false, false);
487 Recursive(false, false, true);
488 Recursive(false, true, false);
489 Recursive(false, true, true);
490 Recursive(true, false, false);
491 Recursive(true, false, true);
492 Recursive(true, true, false);
493 Recursive(true, true, true);
494}
495
496//-----------------------------------------------------------------------------
497
498namespace {
499
500void RecursiveNoHang(
501 bool server_pump_first, bool server_pump_second, bool client_pump) {
502 std::vector<Worker*> workers;
503 workers.push_back(
504 new RecursiveServer(false, server_pump_first, server_pump_second));
505 workers.push_back(new RecursiveClient(client_pump, true));
506 RunTest(workers);
507}
508
509} // namespace
510
511// Tests that if a caller makes a sync call during an existing sync call and
512// the receiver dies, neither of the Send() calls hang.
513TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
514 RecursiveNoHang(false, false, false);
515 RecursiveNoHang(false, false, true);
516 RecursiveNoHang(false, true, false);
517 RecursiveNoHang(false, true, true);
518 RecursiveNoHang(true, false, false);
519 RecursiveNoHang(true, false, true);
520 RecursiveNoHang(true, true, false);
521 RecursiveNoHang(true, true, true);
522}
523
524//-----------------------------------------------------------------------------
525
526namespace {
527
528class MultipleServer1 : public Worker {
529 public:
530 MultipleServer1(bool pump_during_send)
531 : Worker("test_channel1", Channel::MODE_SERVER),
532 pump_during_send_(pump_during_send) { }
533
534 void Run() {
535 SendDouble(pump_during_send_, true);
536 Done();
537 }
538
539 bool pump_during_send_;
540};
541
542class MultipleClient1 : public Worker {
543 public:
544 MultipleClient1(WaitableEvent* client1_msg_received,
545 WaitableEvent* client1_can_reply) :
546 Worker("test_channel1", Channel::MODE_CLIENT),
547 client1_msg_received_(client1_msg_received),
548 client1_can_reply_(client1_can_reply) { }
549
550 void OnDouble(int in, int* out) {
551 client1_msg_received_->Signal();
552 *out = in * 2;
553 client1_can_reply_->Wait();
554 Done();
555 }
556
557 private:
558 WaitableEvent *client1_msg_received_, *client1_can_reply_;
559};
560
561class MultipleServer2 : public Worker {
562 public:
563 MultipleServer2() : Worker("test_channel2", Channel::MODE_SERVER) { }
564
565 void OnAnswer(int* result) {
566 *result = 42;
567 Done();
568 }
569};
570
571class MultipleClient2 : public Worker {
572 public:
573 MultipleClient2(
574 WaitableEvent* client1_msg_received, WaitableEvent* client1_can_reply,
575 bool pump_during_send)
576 : Worker("test_channel2", Channel::MODE_CLIENT),
577 client1_msg_received_(client1_msg_received),
578 client1_can_reply_(client1_can_reply),
579 pump_during_send_(pump_during_send) { }
580
581 void Run() {
582 client1_msg_received_->Wait();
583 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
584 client1_can_reply_->Signal();
585 Done();
586 }
587
588 private:
589 WaitableEvent *client1_msg_received_, *client1_can_reply_;
590 bool pump_during_send_;
591};
592
593void Multiple(bool server_pump, bool client_pump) {
594 std::vector<Worker*> workers;
595
596 // A shared worker thread so that server1 and server2 run on one thread.
597 base::Thread worker_thread("Multiple");
598 ASSERT_TRUE(worker_thread.Start());
599
600 // Server1 sends a sync msg to client1, which blocks the reply until
601 // server2 (which runs on the same worker thread as server1) responds
602 // to a sync msg from client2.
603 WaitableEvent client1_msg_received(false, false);
604 WaitableEvent client1_can_reply(false, false);
605
606 Worker* worker;
607
608 worker = new MultipleServer2();
609 worker->OverrideThread(&worker_thread);
610 workers.push_back(worker);
611
612 worker = new MultipleClient2(
613 &client1_msg_received, &client1_can_reply, client_pump);
614 workers.push_back(worker);
615
616 worker = new MultipleServer1(server_pump);
617 worker->OverrideThread(&worker_thread);
618 workers.push_back(worker);
619
620 worker = new MultipleClient1(
621 &client1_msg_received, &client1_can_reply);
622 workers.push_back(worker);
623
624 RunTest(workers);
625}
626
627} // namespace
628
629// Tests that multiple SyncObjects on the same listener thread can unblock each
630// other.
631TEST_F(IPCSyncChannelTest, Multiple) {
632 Multiple(false, false);
633 Multiple(false, true);
634 Multiple(true, false);
635 Multiple(true, true);
636}
637
638//-----------------------------------------------------------------------------
639
640namespace {
641
642class QueuedReplyServer1 : public Worker {
643 public:
644 QueuedReplyServer1(bool pump_during_send)
645 : Worker("test_channel1", Channel::MODE_SERVER),
646 pump_during_send_(pump_during_send) { }
647 void Run() {
648 SendDouble(pump_during_send_, true);
649 Done();
650 }
651
652 bool pump_during_send_;
653};
654
655class QueuedReplyClient1 : public Worker {
656 public:
657 QueuedReplyClient1(WaitableEvent* client1_msg_received,
658 WaitableEvent* server2_can_reply) :
659 Worker("test_channel1", Channel::MODE_CLIENT),
660 client1_msg_received_(client1_msg_received),
661 server2_can_reply_(server2_can_reply) { }
662
663 void OnDouble(int in, int* out) {
664 client1_msg_received_->Signal();
665 *out = in * 2;
666 server2_can_reply_->Wait();
667 Done();
668 }
669
670 private:
671 WaitableEvent *client1_msg_received_, *server2_can_reply_;
672};
673
674class QueuedReplyServer2 : public Worker {
675 public:
676 explicit QueuedReplyServer2(WaitableEvent* server2_can_reply) :
677 Worker("test_channel2", Channel::MODE_SERVER),
678 server2_can_reply_(server2_can_reply) { }
679
680 void OnAnswer(int* result) {
681 server2_can_reply_->Signal();
682
683 // give client1's reply time to reach the server listener thread
684 PlatformThread::Sleep(200);
685
686 *result = 42;
687 Done();
688 }
689
690 WaitableEvent *server2_can_reply_;
691};
692
693class QueuedReplyClient2 : public Worker {
694 public:
695 explicit QueuedReplyClient2(
696 WaitableEvent* client1_msg_received, bool pump_during_send)
697 : Worker("test_channel2", Channel::MODE_CLIENT),
698 client1_msg_received_(client1_msg_received),
699 pump_during_send_(pump_during_send){ }
700
701 void Run() {
702 client1_msg_received_->Wait();
703 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
704 Done();
705 }
706
707 WaitableEvent *client1_msg_received_;
708 bool pump_during_send_;
709};
710
711void QueuedReply(bool server_pump, bool client_pump) {
712 std::vector<Worker*> workers;
713
714 // A shared worker thread so that server1 and server2 run on one thread.
715 base::Thread worker_thread("QueuedReply");
716 ASSERT_TRUE(worker_thread.Start());
717
718 WaitableEvent client1_msg_received(false, false);
719 WaitableEvent server2_can_reply(false, false);
720
721 Worker* worker;
722
723 worker = new QueuedReplyServer2(&server2_can_reply);
724 worker->OverrideThread(&worker_thread);
725 workers.push_back(worker);
726
727 worker = new QueuedReplyClient2(&client1_msg_received, client_pump);
728 workers.push_back(worker);
729
730 worker = new QueuedReplyServer1(server_pump);
731 worker->OverrideThread(&worker_thread);
732 workers.push_back(worker);
733
734 worker = new QueuedReplyClient1(
735 &client1_msg_received, &server2_can_reply);
736 workers.push_back(worker);
737
738 RunTest(workers);
739}
740
741} // namespace
742
743// While a blocking send is in progress, the listener thread might answer other
744// synchronous messages. This tests that if during the response to another
745// message the reply to the original messages comes, it is queued up correctly
746// and the original Send is unblocked later.
747TEST_F(IPCSyncChannelTest, QueuedReply) {
748 QueuedReply(false, false);
749 QueuedReply(false, true);
750 QueuedReply(true, false);
751 QueuedReply(true, true);
752}
753
754//-----------------------------------------------------------------------------
755
756namespace {
757
758class BadServer : public Worker {
759 public:
760 BadServer(bool pump_during_send)
761 : Worker(Channel::MODE_SERVER, "simpler_server"),
762 pump_during_send_(pump_during_send) { }
763 void Run() {
764 int answer = 0;
765
766 SyncMessage* msg = new SyncMessage(
767 MSG_ROUTING_CONTROL, SyncChannelTestMsg_Double::ID,
768 Message::PRIORITY_NORMAL, NULL);
769 if (pump_during_send_)
770 msg->EnableMessagePumping();
771
772 // Temporarily set the minimum logging very high so that the assertion
773 // in ipc_message_utils doesn't fire.
774 int log_level = logging::GetMinLogLevel();
775 logging::SetMinLogLevel(kint32max);
776 bool result = Send(msg);
777 logging::SetMinLogLevel(log_level);
778 DCHECK(!result);
779
780 // Need to send another message to get the client to call Done().
781 result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
782 DCHECK(result);
783 DCHECK(answer == 42);
784
785 Done();
786 }
787
788 bool pump_during_send_;
789};
790
791void BadMessage(bool pump_during_send) {
792 std::vector<Worker*> workers;
793 workers.push_back(new BadServer(pump_during_send));
794 workers.push_back(new SimpleClient());
795 RunTest(workers);
796}
797
798} // namespace
799
800// Tests that if a message is not serialized correctly, the Send() will fail.
801TEST_F(IPCSyncChannelTest, BadMessage) {
802 BadMessage(false);
803 BadMessage(true);
804}
805
806//-----------------------------------------------------------------------------
807
808namespace {
809
810class ChattyClient : public Worker {
811 public:
812 ChattyClient() :
813 Worker(Channel::MODE_CLIENT, "chatty_client") { }
814
815 void OnAnswer(int* answer) {
816 // The PostMessage limit is 10k. Send 20% more than that.
817 const int kMessageLimit = 10000;
818 const int kMessagesToSend = kMessageLimit * 120 / 100;
819 for (int i = 0; i < kMessagesToSend; ++i) {
820 if (!SendDouble(false, true))
821 break;
822 }
823 *answer = 42;
824 Done();
825 }
826};
827
828void ChattyServer(bool pump_during_send) {
829 std::vector<Worker*> workers;
830 workers.push_back(new UnblockServer(pump_during_send));
831 workers.push_back(new ChattyClient());
832 RunTest(workers);
833}
834
835} // namespace
836
837// Tests http://b/1093251 - that sending lots of sync messages while
838// the receiver is waiting for a sync reply does not overflow the PostMessage
839// queue.
840TEST_F(IPCSyncChannelTest, ChattyServer) {
841 ChattyServer(false);
842 ChattyServer(true);
843}
844
845//------------------------------------------------------------------------------
846
847namespace {
848
849class TimeoutServer : public Worker {
850 public:
851 TimeoutServer(int timeout_ms,
852 std::vector<bool> timeout_seq,
853 bool pump_during_send)
854 : Worker(Channel::MODE_SERVER, "timeout_server"),
855 timeout_ms_(timeout_ms),
856 timeout_seq_(timeout_seq),
857 pump_during_send_(pump_during_send) {
858 }
859
860 void Run() {
861 for (std::vector<bool>::const_iterator iter = timeout_seq_.begin();
862 iter != timeout_seq_.end(); ++iter) {
863 SendAnswerToLife(pump_during_send_, timeout_ms_, !*iter);
864 }
865 Done();
866 }
867
868 private:
869 int timeout_ms_;
870 std::vector<bool> timeout_seq_;
871 bool pump_during_send_;
872};
873
874class UnresponsiveClient : public Worker {
875 public:
876 UnresponsiveClient(std::vector<bool> timeout_seq)
877 : Worker(Channel::MODE_CLIENT, "unresponsive_client"),
878 timeout_seq_(timeout_seq) {
879 }
880
881 void OnAnswerDelay(Message* reply_msg) {
882 DCHECK(!timeout_seq_.empty());
883 if (!timeout_seq_[0]) {
884 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
885 Send(reply_msg);
886 } else {
887 // Don't reply.
888 delete reply_msg;
889 }
890 timeout_seq_.erase(timeout_seq_.begin());
891 if (timeout_seq_.empty())
892 Done();
893 }
894
895 private:
896 // Whether we should time-out or respond to the various messages we receive.
897 std::vector<bool> timeout_seq_;
898};
899
900void SendWithTimeoutOK(bool pump_during_send) {
901 std::vector<Worker*> workers;
902 std::vector<bool> timeout_seq;
903 timeout_seq.push_back(false);
904 timeout_seq.push_back(false);
905 timeout_seq.push_back(false);
906 workers.push_back(new TimeoutServer(5000, timeout_seq, pump_during_send));
907 workers.push_back(new SimpleClient());
908 RunTest(workers);
909}
910
911void SendWithTimeoutTimeout(bool pump_during_send) {
912 std::vector<Worker*> workers;
913 std::vector<bool> timeout_seq;
914 timeout_seq.push_back(true);
915 timeout_seq.push_back(false);
916 timeout_seq.push_back(false);
917 workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send));
918 workers.push_back(new UnresponsiveClient(timeout_seq));
919 RunTest(workers);
920}
921
922void SendWithTimeoutMixedOKAndTimeout(bool pump_during_send) {
923 std::vector<Worker*> workers;
924 std::vector<bool> timeout_seq;
925 timeout_seq.push_back(true);
926 timeout_seq.push_back(false);
927 timeout_seq.push_back(false);
928 timeout_seq.push_back(true);
929 timeout_seq.push_back(false);
930 workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send));
931 workers.push_back(new UnresponsiveClient(timeout_seq));
932 RunTest(workers);
933}
934
935} // namespace
936
937// Tests that SendWithTimeout does not time-out if the response comes back fast
938// enough.
939TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) {
940 SendWithTimeoutOK(false);
941 SendWithTimeoutOK(true);
942}
943
944// Tests that SendWithTimeout does time-out.
945TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) {
946 SendWithTimeoutTimeout(false);
947 SendWithTimeoutTimeout(true);
948}
949
950// Sends some message that time-out and some that succeed.
951TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
952 SendWithTimeoutMixedOKAndTimeout(false);
953 SendWithTimeoutMixedOKAndTimeout(true);
954}
955
956//------------------------------------------------------------------------------
957
958namespace {
959
960class NestedTask : public Task {
961 public:
962 NestedTask(Worker* server) : server_(server) { }
963 void Run() {
964 // Sleep a bit so that we wake up after the reply has been received.
965 PlatformThread::Sleep(250);
966 server_->SendAnswerToLife(true, base::kNoTimeout, true);
967 }
968
969 Worker* server_;
970};
971
972static bool timeout_occured = false;
973
974class TimeoutTask : public Task {
975 public:
976 void Run() {
977 timeout_occured = true;
978 }
979};
980
981class DoneEventRaceServer : public Worker {
982 public:
983 DoneEventRaceServer()
984 : Worker(Channel::MODE_SERVER, "done_event_race_server") { }
985
986 void Run() {
987 MessageLoop::current()->PostTask(FROM_HERE, new NestedTask(this));
988 MessageLoop::current()->PostDelayedTask(FROM_HERE, new TimeoutTask(), 9000);
989 // Even though we have a timeout on the Send, it will succeed since for this
990 // bug, the reply message comes back and is deserialized, however the done
991 // event wasn't set. So we indirectly use the timeout task to notice if a
992 // timeout occurred.
993 SendAnswerToLife(true, 10000, true);
994 DCHECK(!timeout_occured);
995 Done();
996 }
997};
998
999} // namespace
1000
1001// Tests http://b/1474092 - that if after the done_event is set but before
1002// OnObjectSignaled is called another message is sent out, then after its
1003// reply comes back OnObjectSignaled will be called for the first message.
1004TEST_F(IPCSyncChannelTest, DoneEventRace) {
1005 std::vector<Worker*> workers;
1006 workers.push_back(new DoneEventRaceServer());
1007 workers.push_back(new SimpleClient());
1008 RunTest(workers);
1009}