shill: Add "IO Ready" handler

For output sockets and for non-traditional "input" events we
need to trigger when an fd is ready, but we don't want GLib
to be in charge of the IO itself.  For this, we create a
"ready handler" which triggers a callback which is passed the
file descriptor which is ready.

BUG=chromium-os:21664
TEST=New unit tests

Change-Id: I4960658e6d0940d8db65ae6f77ff09679b3ef96a
Reviewed-on: http://gerrit.chromium.org/gerrit/10327
Reviewed-by: Darin Petkov <petkov@chromium.org>
Reviewed-by: mukesh agrawal <quiche@chromium.org>
Tested-by: Paul Stewart <pstew@chromium.org>
diff --git a/Makefile b/Makefile
index d784473..927e4d0 100644
--- a/Makefile
+++ b/Makefile
@@ -95,7 +95,8 @@
 	ethernet_service.o \
 	event_dispatcher.o \
 	glib.o \
-	glib_io_handler.o \
+	glib_io_ready_handler.o \
+	glib_io_input_handler.o \
 	ip_address.o \
 	ipconfig.o \
 	ipconfig_dbus_adaptor.o \
diff --git a/event_dispatcher.cc b/event_dispatcher.cc
index 1fc62fb..6c76c1b 100644
--- a/event_dispatcher.cc
+++ b/event_dispatcher.cc
@@ -10,7 +10,8 @@
 #include <base/callback_old.h>
 #include <base/message_loop_proxy.h>
 
-#include "shill/glib_io_handler.h"
+#include "shill/glib_io_input_handler.h"
+#include "shill/glib_io_ready_handler.h"
 
 namespace shill {
 
@@ -40,7 +41,18 @@
 IOHandler *EventDispatcher::CreateInputHandler(
     int fd,
     Callback1<InputData *>::Type *callback) {
-  return new GlibIOInputHandler(fd, callback);
+  IOHandler *handler = new GlibIOInputHandler(fd, callback);
+  handler->Start();
+  return handler;
+}
+
+IOHandler *EventDispatcher::CreateReadyHandler(
+    int fd,
+    IOHandler::ReadyMode mode,
+    Callback1<int>::Type *callback) {
+  IOHandler *handler = new GlibIOReadyHandler(fd, mode, callback);
+  handler->Start();
+  return handler;
 }
 
 }  // namespace shill
diff --git a/event_dispatcher.h b/event_dispatcher.h
index d540fbd..5ca886e 100644
--- a/event_dispatcher.h
+++ b/event_dispatcher.h
@@ -11,6 +11,8 @@
 #include <base/memory/scoped_ptr.h>
 #include <base/message_loop.h>
 
+#include "shill/io_handler.h"
+
 namespace base {
 class MessageLoopProxy;
 }  // namespace base
@@ -19,9 +21,6 @@
 
 namespace shill {
 
-class InputData;
-class IOHandler;
-
 // This is the main event dispatcher.  It contains a central instance, and is
 // the entity responsible for dispatching events out of all queues to their
 // listeners during the idle loop.
@@ -44,6 +43,11 @@
       int fd,
       Callback1<InputData *>::Type *callback);
 
+  virtual IOHandler *CreateReadyHandler(
+      int fd,
+      IOHandler::ReadyMode mode,
+      Callback1<int>::Type *callback);
+
  private:
   scoped_ptr<MessageLoop> dont_use_directly_;
   scoped_refptr<base::MessageLoopProxy> message_loop_proxy_;
diff --git a/glib_io_handler.cc b/glib_io_input_handler.cc
similarity index 60%
rename from glib_io_handler.cc
rename to glib_io_input_handler.cc
index 51dc54a..1f45652 100644
--- a/glib_io_handler.cc
+++ b/glib_io_input_handler.cc
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include "shill/glib_io_handler.h"
+#include "shill/glib_io_input_handler.h"
 
 #include <stdio.h>
 #include <glib.h>
@@ -11,29 +11,9 @@
 
 static gboolean DispatchIOHandler(GIOChannel *chan,
                                   GIOCondition cond,
-                                  gpointer data);
-
-GlibIOInputHandler::GlibIOInputHandler(int fd,
-                                       Callback1<InputData*>::Type *callback)
-    : callback_(callback) {
-  channel_ = g_io_channel_unix_new(fd);
-  g_io_channel_set_close_on_unref(channel_, TRUE);
-  source_id_ = g_io_add_watch(channel_,
-                              static_cast<GIOCondition>(
-                                  G_IO_IN | G_IO_NVAL | G_IO_HUP | G_IO_ERR),
-                              DispatchIOHandler, this);
-}
-
-GlibIOInputHandler::~GlibIOInputHandler() {
-  g_source_remove(source_id_);
-  g_io_channel_shutdown(channel_, TRUE, NULL);
-  g_io_channel_unref(channel_);
-}
-
-static gboolean DispatchIOHandler(GIOChannel *chan,
-                                  GIOCondition cond,
                                   gpointer data) {
-  GlibIOInputHandler *handler = static_cast<GlibIOInputHandler *>(data);
+  Callback1<InputData*>::Type *callback =
+      static_cast<Callback1<InputData*>::Type *>(data);
   unsigned char buf[4096];
   gsize len;
   GIOError err;
@@ -50,9 +30,39 @@
   }
 
   InputData input_data(buf, len);
-  handler->callback()->Run(&input_data);
+  callback->Run(&input_data);
 
   return TRUE;
 }
 
+GlibIOInputHandler::GlibIOInputHandler(int fd,
+                                       Callback1<InputData*>::Type *callback)
+    : channel_(g_io_channel_unix_new(fd)),
+      callback_(callback),
+      source_id_(G_MAXUINT) {
+  g_io_channel_set_close_on_unref(channel_, TRUE);
+}
+
+GlibIOInputHandler::~GlibIOInputHandler() {
+  g_source_remove(source_id_);
+  g_io_channel_shutdown(channel_, TRUE, NULL);
+  g_io_channel_unref(channel_);
+}
+
+void GlibIOInputHandler::Start() {
+  if (source_id_ == G_MAXUINT) {
+    source_id_ = g_io_add_watch(channel_,
+                                static_cast<GIOCondition>(
+                                    G_IO_IN | G_IO_NVAL | G_IO_HUP | G_IO_ERR),
+                                DispatchIOHandler, callback_);
+  }
+}
+
+void GlibIOInputHandler::Stop() {
+  if (source_id_ != G_MAXUINT) {
+    g_source_remove(source_id_);
+    source_id_ = G_MAXUINT;
+  }
+}
+
 }  // namespace shill
diff --git a/glib_io_handler.h b/glib_io_input_handler.h
similarity index 77%
rename from glib_io_handler.h
rename to glib_io_input_handler.h
index a359745..d694586 100644
--- a/glib_io_handler.h
+++ b/glib_io_input_handler.h
@@ -2,8 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#ifndef SHILL_GLIB_IO_HANDLER_
-#define SHILL_GLIB_IO_HANDLER_
+#ifndef SHILL_GLIB_IO_INPUT_HANDLER_
+#define SHILL_GLIB_IO_INPUT_HANDLER_
 
 #include <stdio.h>
 #include <glib.h>
@@ -18,16 +18,17 @@
  public:
   GlibIOInputHandler(int fd, Callback1<InputData*>::Type *callback);
   ~GlibIOInputHandler();
-  Callback1<InputData *>::Type *callback() { return callback_; }
+
+  virtual void Start();
+  virtual void Stop();
 
  private:
   GIOChannel *channel_;
   Callback1<InputData *>::Type *callback_;
   guint source_id_;
-
 };
 
 
 }  // namespace shill
 
-#endif  // SHILL_GLIB_IO_HANDLER_
+#endif  // SHILL_GLIB_IO_INPUT_HANDLER_
diff --git a/glib_io_ready_handler.cc b/glib_io_ready_handler.cc
new file mode 100644
index 0000000..3109fc2
--- /dev/null
+++ b/glib_io_ready_handler.cc
@@ -0,0 +1,67 @@
+// Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "shill/glib_io_ready_handler.h"
+
+#include <base/logging.h>
+#include <glib.h>
+#include <stdio.h>
+#include <sys/socket.h>
+
+namespace shill {
+
+static gboolean DispatchIOHandler(GIOChannel *chan,
+                                  GIOCondition cond,
+                                  gpointer data) {
+  Callback1<int>::Type *callback = static_cast<Callback1<int>::Type *>(data);
+
+  if (cond & (G_IO_NVAL | G_IO_HUP | G_IO_ERR))
+    return FALSE;
+
+  callback->Run(g_io_channel_unix_get_fd(chan));
+
+  return TRUE;
+}
+
+GlibIOReadyHandler::GlibIOReadyHandler(int fd,
+                                       IOHandler::ReadyMode mode,
+                                       Callback1<int>::Type *callback)
+    : channel_(NULL),
+      callback_(callback),
+      source_id_(G_MAXUINT) {
+  if (mode == kModeInput) {
+    condition_ = static_cast<GIOCondition>(
+        G_IO_IN | G_IO_NVAL | G_IO_HUP | G_IO_ERR);
+  } else if (mode == kModeOutput) {
+    condition_ = static_cast<GIOCondition>(
+        G_IO_OUT | G_IO_NVAL | G_IO_HUP | G_IO_ERR);
+  } else {
+    NOTREACHED() << "Unknown IO ready mode: " << mode;
+  }
+
+  channel_ = g_io_channel_unix_new(fd);
+  g_io_channel_set_close_on_unref(channel_, FALSE);
+}
+
+GlibIOReadyHandler::~GlibIOReadyHandler() {
+  g_source_remove(source_id_);
+  g_io_channel_shutdown(channel_, TRUE, NULL);
+  g_io_channel_unref(channel_);
+}
+
+void GlibIOReadyHandler::Start() {
+  if (source_id_ == G_MAXUINT) {
+    source_id_ = g_io_add_watch(channel_, condition_, DispatchIOHandler,
+                                callback_);
+  }
+}
+
+void GlibIOReadyHandler::Stop() {
+  if (source_id_ != G_MAXUINT) {
+    g_source_remove(source_id_);
+    source_id_ = G_MAXUINT;
+  }
+}
+
+}  // namespace shill
diff --git a/glib_io_ready_handler.h b/glib_io_ready_handler.h
new file mode 100644
index 0000000..919c8c3
--- /dev/null
+++ b/glib_io_ready_handler.h
@@ -0,0 +1,41 @@
+// Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SHILL_GLIB_IO_READY_HANDLER_
+#define SHILL_GLIB_IO_READY_HANDLER_
+
+#include <stdio.h>
+#include <glib.h>
+
+#include <base/callback_old.h>
+
+#include "shill/io_handler.h"
+
+namespace shill {
+
+// This handler is different from the GlibIOInputHandler
+// in that we don't read/write from the file handle and
+// leave that to the caller.  This is useful in accept()ing
+// sockets and effort to working with peripheral libraries.
+class GlibIOReadyHandler : public IOHandler {
+ public:
+    GlibIOReadyHandler(int fd,
+                       IOHandler::ReadyMode mode,
+                       Callback1<int>::Type *callback);
+  ~GlibIOReadyHandler();
+
+  virtual void Start();
+  virtual void Stop();
+
+ private:
+  GIOChannel *channel_;
+  GIOCondition condition_;
+  Callback1<int>::Type *callback_;
+  guint source_id_;
+};
+
+
+}  // namespace shill
+
+#endif  // SHILL_GLIB_IO_READY_HANDLER_
diff --git a/io_handler.h b/io_handler.h
index 74c34b9..97e0089 100644
--- a/io_handler.h
+++ b/io_handler.h
@@ -17,8 +17,16 @@
 
 class IOHandler {
  public:
+  enum ReadyMode {
+    kModeInput,
+    kModeOutput
+  };
+
   IOHandler() {}
   virtual ~IOHandler() {}
+
+  virtual void Start() {}
+  virtual void Stop() {}
 };
 
 }  // namespace shill
diff --git a/shill_unittest.cc b/shill_unittest.cc
index 81f1cca..564a957 100644
--- a/shill_unittest.cc
+++ b/shill_unittest.cc
@@ -32,20 +32,24 @@
         triggered_(false),
         callback_count_(0),
         got_data_(false),
+        got_ready_(false),
         data_callback_(NULL),
         input_handler_(NULL),
         tester_factory_(this) {
   }
 
+  void ScheduleFailSafe() {
+    // Set up a failsafe, so the test still exits even if something goes
+    // wrong.  The Factory owns the RunnableMethod, but we get a pointer to it.
+    failsafe_ = tester_factory_.NewRunnableMethod(
+        &MockEventDispatchTester::StopDispatcher);
+    dispatcher_->PostDelayedTask(failsafe_, 100);
+  }
+
   void ScheduleTimedTasks() {
     dispatcher_->PostDelayedTask(
         tester_factory_.NewRunnableMethod(&MockEventDispatchTester::Trigger),
         10);
-    // also set up a failsafe, so the test still exits even if something goes
-    // wrong.  The Factory owns the RunnableMethod, but we get a pointer to it.
-    failsafe_ = tester_factory_.NewRunnableMethod(
-        &MockEventDispatchTester::QuitRegardless);
-    dispatcher_->PostDelayedTask(failsafe_, 100);
   }
 
   void RescheduleUnlessTriggered() {
@@ -56,11 +60,11 @@
               &MockEventDispatchTester::RescheduleUnlessTriggered));
     } else {
       failsafe_->Cancel();
-      QuitRegardless();
+      StopDispatcher();
     }
   }
 
-  void QuitRegardless() {
+  void StopDispatcher() {
     dispatcher_->PostTask(new MessageLoop::QuitTask);
   }
 
@@ -76,16 +80,16 @@
                                     inputData->len, inputData->buf);
     got_data_ = true;
     IOComplete(inputData->len);
-    QuitRegardless();
+    StopDispatcher();
   }
 
   bool GetData() { return got_data_; }
 
   void ListenIO(int fd) {
-    data_callback_.reset(NewCallback(this,
-                                     &MockEventDispatchTester::HandleData));
-    input_handler_.reset(dispatcher_->CreateInputHandler(fd,
-                                                         data_callback_.get()));
+    data_callback_.reset(
+        NewCallback(this, &MockEventDispatchTester::HandleData));
+    input_handler_.reset(
+        dispatcher_->CreateInputHandler(fd, data_callback_.get()));
   }
 
   void StopListenIO() {
@@ -93,6 +97,52 @@
     input_handler_.reset(NULL);
   }
 
+  void HandleReady(int fd) {
+    // Stop event handling after we receive in input-ready event.  We should
+    // no longer be called until events are re-enabled.
+    input_handler_->Stop();
+
+    if (got_ready_) {
+      // If we're still getting events after we have stopped them, something
+      // is really wrong, and we cannot just depend on ASSERT_FALSE() to get
+      // us out of it.  Make sure the dispatcher is also stopped, or else we
+      // could end up never exiting.
+      StopDispatcher();
+      ASSERT_FALSE(got_ready_) << "failed to stop Input Ready events";
+    }
+    got_ready_ = true;
+
+    LOG(INFO) << "MockEventDispatchTester handling ready for fd " << fd;
+    IOComplete(callback_count_);
+
+    if (callback_count_) {
+      StopDispatcher();
+    } else {
+      // Restart Ready events after 10 millisecond delay.
+      callback_count_++;
+      dispatcher_->PostDelayedTask(tester_factory_.NewRunnableMethod(
+          &MockEventDispatchTester::RestartReady), 10);
+    }
+  }
+
+  void RestartReady() {
+    got_ready_ = false;
+    input_handler_->Start();
+  }
+
+  void ListenReady(int fd) {
+    ready_callback_.reset(
+        NewCallback(this, &MockEventDispatchTester::HandleReady));
+    input_handler_.reset(
+        dispatcher_->CreateReadyHandler(fd, IOHandler::kModeInput,
+                                        ready_callback_.get()));
+  }
+
+  void StopListenReady() {
+    got_ready_ = false;
+    input_handler_.reset(NULL);
+  }
+
   MOCK_METHOD1(CallbackComplete, void(int));
   MOCK_METHOD1(IOComplete, void(int));
 
@@ -101,7 +151,9 @@
   bool triggered_;
   int callback_count_;
   bool got_data_;
+  bool got_ready_;
   scoped_ptr<Callback1<InputData*>::Type> data_callback_;
+  scoped_ptr<Callback1<int>::Type> ready_callback_;
   scoped_ptr<IOHandler> input_handler_;
   ScopedRunnableMethodFactory<MockEventDispatchTester> tester_factory_;
   CancelableTask* failsafe_;
@@ -121,6 +173,7 @@
     // Tests initialization done by the daemon's constructor
     ASSERT_NE(reinterpret_cast<Config*>(NULL), daemon_.config_);
     ASSERT_NE(reinterpret_cast<ControlInterface*>(NULL), daemon_.control_);
+    dispatcher_test_.ScheduleFailSafe();
   }
  protected:
   TestConfig config_;
@@ -132,12 +185,14 @@
 };
 
 
-TEST_F(ShillDaemonTest, EventDispatcher) {
+TEST_F(ShillDaemonTest, EventDispatcherTimer) {
   EXPECT_CALL(dispatcher_test_, CallbackComplete(Gt(0)));
   dispatcher_test_.ScheduleTimedTasks();
   dispatcher_test_.RescheduleUnlessTriggered();
   dispatcher_->DispatchForever();
+}
 
+TEST_F(ShillDaemonTest, EventDispatcherIO) {
   EXPECT_CALL(dispatcher_test_, IOComplete(16));
   int pipefd[2];
   ASSERT_EQ(pipe(pipefd), 0);
@@ -149,4 +204,20 @@
   dispatcher_test_.StopListenIO();
 }
 
+TEST_F(ShillDaemonTest, EventDispatcherReady) {
+  EXPECT_CALL(dispatcher_test_, IOComplete(0))
+      .Times(1);
+  EXPECT_CALL(dispatcher_test_, IOComplete(1))
+      .Times(1);
+
+  int pipefd[2];
+  ASSERT_EQ(pipe(pipefd), 0);
+
+  dispatcher_test_.ListenReady(pipefd[0]);
+  ASSERT_EQ(write(pipefd[1], "This is a test?!", 16), 16);
+
+  dispatcher_->DispatchForever();
+  dispatcher_test_.StopListenReady();
+}
+
 }  // namespace shill