buffet: XmppChannel listens for connectivity changes

ShillClent provides notifications for network changes.
XmppChannel issue XMPP ping on every network change and restart if
failed. Only exception is soon scheduled upcoming reconnect.

BUG=brillo:1139
TEST=register device, disconnect network, wait device is offline in
     GCD dashboard, connect network.
     Dashboard should show device as online in less then 60 seconds.

Change-Id: I73dffc54400777b2325c26fb8aaf259b515174ce
Reviewed-on: https://chromium-review.googlesource.com/281413
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
Tested-by: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/buffet/base_api_handler_unittest.cc b/buffet/base_api_handler_unittest.cc
index cde0c41..aa780be 100644
--- a/buffet/base_api_handler_unittest.cc
+++ b/buffet/base_api_handler_unittest.cc
@@ -53,7 +53,7 @@
         command_manager_, state_manager_,
         std::unique_ptr<BuffetConfig>{new BuffetConfig{
             std::unique_ptr<StorageInterface>{new MemStorage}}},
-        transport_, true));
+        transport_, true, nullptr));
     handler_.reset(new BaseApiHandler{
         dev_reg_->AsWeakPtr(), state_manager_, command_manager_});
   }
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index 5b9e5ee..e27b35d 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -114,12 +114,14 @@
     const std::shared_ptr<StateManager>& state_manager,
     std::unique_ptr<BuffetConfig> config,
     const std::shared_ptr<chromeos::http::Transport>& transport,
-    bool notifications_enabled)
+    bool notifications_enabled,
+    privetd::ShillClient* shill_client)
     : transport_{transport},
       command_manager_{command_manager},
       state_manager_{state_manager},
       config_{std::move(config)},
-      notifications_enabled_{notifications_enabled} {
+      notifications_enabled_{notifications_enabled},
+      shill_client_{shill_client} {
   cloud_backoff_policy_.reset(new chromeos::BackoffEntry::Policy{});
   cloud_backoff_policy_->num_errors_to_ignore = 0;
   cloud_backoff_policy_->initial_delay_ms = 100;
@@ -346,8 +348,8 @@
   }
 
   notification_channel_starting_ = true;
-  primary_notification_channel_.reset(
-      new XmppChannel{config_->robot_account(), access_token_, task_runner});
+  primary_notification_channel_.reset(new XmppChannel{
+      config_->robot_account(), access_token_, task_runner, shill_client_});
   primary_notification_channel_->Start(this);
 }
 
diff --git a/buffet/device_registration_info.h b/buffet/device_registration_info.h
index 860a8fc..ed96db0 100644
--- a/buffet/device_registration_info.h
+++ b/buffet/device_registration_info.h
@@ -37,6 +37,10 @@
 class KeyValueStore;
 }  // namespace chromeos
 
+namespace privetd {
+class ShillClient;
+}
+
 namespace buffet {
 
 class StateManager;
@@ -60,7 +64,8 @@
       const std::shared_ptr<StateManager>& state_manager,
       std::unique_ptr<BuffetConfig> config,
       const std::shared_ptr<chromeos::http::Transport>& transport,
-      bool notifications_enabled);
+      bool notifications_enabled,
+      privetd::ShillClient* shill_client);
 
   ~DeviceRegistrationInfo() override;
 
@@ -300,6 +305,8 @@
   NotificationChannel* current_notification_channel_{nullptr};
   bool notification_channel_starting_{false};
 
+  privetd::ShillClient* shill_client_{nullptr};
+
   // Tracks our current registration status.
   RegistrationStatus registration_status_{RegistrationStatus::kUnconfigured};
 
diff --git a/buffet/device_registration_info_unittest.cc b/buffet/device_registration_info_unittest.cc
index 0bbaeb1..9d70886 100644
--- a/buffet/device_registration_info_unittest.cc
+++ b/buffet/device_registration_info_unittest.cc
@@ -174,9 +174,9 @@
 
     std::unique_ptr<BuffetConfig> config{new BuffetConfig{std::move(storage)}};
     config_ = config.get();
-    dev_reg_ = std::unique_ptr<DeviceRegistrationInfo>(
-        new DeviceRegistrationInfo(command_manager_, state_manager_,
-                                   std::move(config), transport_, true));
+    dev_reg_.reset(new DeviceRegistrationInfo{command_manager_, state_manager_,
+                                              std::move(config), transport_,
+                                              true, nullptr});
 
     ReloadConfig();
   }
diff --git a/buffet/main.cc b/buffet/main.cc
index 128d8e1..e173269 100644
--- a/buffet/main.cc
+++ b/buffet/main.cc
@@ -85,12 +85,12 @@
   options.state_path = base::FilePath{FLAGS_state_path};
   options.test_definitions_path = base::FilePath{FLAGS_test_definitions_path};
   options.xmpp_enabled = FLAGS_enable_xmpp;
+  options.device_whitelist.insert(device_whitelist.begin(),
+                                  device_whitelist.end());
   options.privet.config_path = base::FilePath{FLAGS_config_path};
   options.privet.disable_privet = FLAGS_disable_privet;
   options.privet.disable_security = FLAGS_disable_security;
   options.privet.enable_ping = FLAGS_enable_ping;
-  options.privet.device_whitelist.insert(device_whitelist.begin(),
-                                         device_whitelist.end());
 
   buffet::Daemon daemon{options};
   return daemon.Run();
diff --git a/buffet/manager.cc b/buffet/manager.cc
index 184baa0..eee593f 100644
--- a/buffet/manager.cc
+++ b/buffet/manager.cc
@@ -27,6 +27,7 @@
 #include "buffet/commands/schema_constants.h"
 #include "buffet/privet/constants.h"
 #include "buffet/privet/security_manager.h"
+#include "buffet/privet/shill_client.h"
 #include "buffet/privet/wifi_bootstrap_manager.h"
 #include "buffet/states/state_change_queue.h"
 #include "buffet/states/state_manager.h"
@@ -81,11 +82,14 @@
   transport->SetDefaultTimeout(base::TimeDelta::FromSeconds(
       kRequestTimeoutSeconds));
 
+  shill_client_.reset(new privetd::ShillClient(dbus_object_.GetBus(),
+                                               options.device_whitelist));
+
   // TODO(avakulenko): Figure out security implications of storing
   // device info state data unencrypted.
   device_info_.reset(new DeviceRegistrationInfo(
       command_manager_, state_manager_, std::move(config), transport,
-      options.xmpp_enabled));
+      options.xmpp_enabled, shill_client_.get()));
   device_info_->AddOnRegistrationChangedCallback(base::Bind(
       &Manager::OnRegistrationChanged, weak_ptr_factory_.GetWeakPtr()));
 
@@ -105,8 +109,9 @@
 void Manager::StartPrivet(const privetd::Manager::Options& options,
                           AsyncEventSequencer* sequencer) {
   privet_.reset(new privetd::Manager{});
-  privet_->Start(options, dbus_object_.GetBus(), device_info_.get(),
-                 command_manager_.get(), state_manager_.get(), sequencer);
+  privet_->Start(options, dbus_object_.GetBus(), shill_client_.get(),
+                 device_info_.get(), command_manager_.get(),
+                 state_manager_.get(), sequencer);
 
   if (privet_->GetWifiBootstrapManager()) {
     privet_->GetWifiBootstrapManager()->RegisterStateListener(base::Bind(
diff --git a/buffet/manager.h b/buffet/manager.h
index 4cee9cc..17613c4 100644
--- a/buffet/manager.h
+++ b/buffet/manager.h
@@ -5,8 +5,8 @@
 #ifndef BUFFET_MANAGER_H_
 #define BUFFET_MANAGER_H_
 
-#include <map>
 #include <memory>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -61,6 +61,7 @@
     base::FilePath state_path;
     base::FilePath test_definitions_path;
     bool xmpp_enabled{true};
+    std::set<std::string> device_whitelist;
     privetd::Manager::Options privet;
   };
 
@@ -137,6 +138,7 @@
   std::shared_ptr<StateManager> state_manager_;
   std::unique_ptr<DeviceRegistrationInfo> device_info_;
   std::unique_ptr<BaseApiHandler> base_api_handler_;
+  std::unique_ptr<privetd::ShillClient> shill_client_;
   std::unique_ptr<privetd::Manager> privet_;
 
   base::WeakPtrFactory<Manager> weak_ptr_factory_{this};
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index c651688..fbe2f95 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -15,6 +15,7 @@
 #include "buffet/notification/notification_delegate.h"
 #include "buffet/notification/notification_parser.h"
 #include "buffet/notification/xml_node.h"
+#include "buffet/privet/shill_client.h"
 #include "buffet/utils.h"
 
 namespace buffet {
@@ -75,21 +76,34 @@
 
 const char kDefaultXmppHost[] = "talk.google.com";
 const uint16_t kDefaultXmppPort = 5222;
-const uint64_t kPingIntervalSeconds = 60;  // 1 minute.
+
+// Used for keeping connection alive.
+const int kRegularPingIntervalSeconds = 60;
+const int kRegularPingTimeoutSeconds = 30;
+
+// Used for diagnostic when connectivity changed.
+const int kAgressivePingIntervalSeconds = 5;
+const int kAgressivePingTimeoutSeconds = 10;
+
+const int kConnectingTimeoutAfterNetChangeSeconds = 30;
 
 }  // namespace
 
 XmppChannel::XmppChannel(
     const std::string& account,
     const std::string& access_token,
-    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
+    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+    privetd::ShillClient* shill)
     : account_{account},
       access_token_{access_token},
       backoff_entry_{&kDefaultBackoffPolicy},
       task_runner_{task_runner},
       iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
   read_socket_data_.resize(4096);
-  ping_timer_.SetTaskRunner(task_runner);
+  if (shill) {
+    shill->RegisterConnectivityListener(base::Bind(
+        &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr()));
+  }
 }
 
 void XmppChannel::OnMessageRead(size_t size) {
@@ -112,9 +126,9 @@
     // However, if the connection has never been established yet (e.g.
     // authorization failed), do not restart right now. Wait till we get
     // new credentials.
-    task_runner_->PostTask(FROM_HERE,
-                           base::Bind(&XmppChannel::Restart,
-                                      weak_ptr_factory_.GetWeakPtr()));
+    task_runner_->PostTask(
+        FROM_HERE,
+        base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
   } else if (delegate_) {
     delegate_->OnPermanentFailure();
   }
@@ -125,17 +139,17 @@
   // from expat XML parser and some stanza could cause the XMPP stream to be
   // reset and the parser to be re-initialized. We don't want to destroy the
   // parser while it is performing a callback invocation.
-  task_runner_->PostTask(FROM_HERE,
-                         base::Bind(&XmppChannel::HandleStanza,
-                                    weak_ptr_factory_.GetWeakPtr(),
-                                    base::Passed(std::move(stanza))));
+  task_runner_->PostTask(
+      FROM_HERE,
+      base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
+                 base::Passed(std::move(stanza))));
 }
 
 void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
   VLOG(2) << "XMPP stanza received: " << stanza->ToString();
 
   switch (state_) {
-    case XmppState::kStarted:
+    case XmppState::kConnected:
       if (stanza->name() == "stream:features" &&
           stanza->FindFirstChild("starttls/required", false)) {
         state_ = XmppState::kTlsStarted;
@@ -180,8 +194,8 @@
         iq_stanza_handler_->SendRequest(
             "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
             base::Bind(&XmppChannel::OnBindCompleted,
-                       weak_ptr_factory_.GetWeakPtr()),
-            base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+                       task_ptr_factory_.GetWeakPtr()),
+            base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
         return;
       }
       break;
@@ -226,8 +240,8 @@
   iq_stanza_handler_->SendRequest(
       "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
       base::Bind(&XmppChannel::OnSessionEstablished,
-                 weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+                 task_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
 }
 
 void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
@@ -240,9 +254,8 @@
                      "<item channel='cloud_devices' from=''/></subscribe>";
   iq_stanza_handler_->SendRequest(
       "set", "", account_, body,
-      base::Bind(&XmppChannel::OnSubscribed,
-                  weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+      base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
 }
 
 void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
@@ -279,9 +292,8 @@
   chromeos::TlsStream::Connect(
       std::move(raw_socket_), host_,
       base::Bind(&XmppChannel::OnTlsHandshakeComplete,
-                 weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::OnTlsError,
-                 weak_ptr_factory_.GetWeakPtr()));
+                 task_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnTlsError, task_ptr_factory_.GetWeakPtr()));
 }
 
 void XmppChannel::OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream) {
@@ -308,10 +320,9 @@
 
   write_pending_ = true;
   bool ok = stream_->WriteAllAsync(
-      write_socket_data_.data(),
-      write_socket_data_.size(),
-      base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::OnWriteError, weak_ptr_factory_.GetWeakPtr()),
+      write_socket_data_.data(), write_socket_data_.size(),
+      base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnWriteError, task_ptr_factory_.GetWeakPtr()),
       &error);
 
   if (!ok)
@@ -339,10 +350,9 @@
   chromeos::ErrorPtr error;
   read_pending_ = true;
   bool ok = stream_->ReadAsync(
-      read_socket_data_.data(),
-      read_socket_data_.size(),
-      base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::OnReadError, weak_ptr_factory_.GetWeakPtr()),
+      read_socket_data_.data(), read_socket_data_.size(),
+      base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnReadError, task_ptr_factory_.GetWeakPtr()),
       &error);
 
   if (!ok)
@@ -361,6 +371,7 @@
 
 void XmppChannel::Connect(const std::string& host, uint16_t port,
                           const base::Closure& callback) {
+  state_ = XmppState::kConnecting;
   LOG(INFO) << "Starting XMPP connection to " << host << ":" << port;
   int socket_fd = ConnectSocket(host, port);
   if (socket_fd >= 0) {
@@ -379,13 +390,12 @@
     stream_ = raw_socket_.get();
     callback.Run();
   } else {
-    VLOG(2) << "Delaying connection to XMPP server " << host << " for "
-            << backoff_entry_.GetTimeUntilRelease().InMilliseconds()
-            << " milliseconds.";
+    VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+            << backoff_entry_.GetTimeUntilRelease();
     task_runner_->PostDelayedTask(
         FROM_HERE,
-        base::Bind(&XmppChannel::Connect, weak_ptr_factory_.GetWeakPtr(),
-                    host, port, callback),
+        base::Bind(&XmppChannel::Connect, task_ptr_factory_.GetWeakPtr(), host,
+                   port, callback),
         backoff_entry_.GetTimeUntilRelease());
   }
 }
@@ -403,6 +413,7 @@
 }
 
 void XmppChannel::Restart() {
+  VLOG(1) << "Restarting XMPP";
   Stop();
   Start(delegate_);
 }
@@ -410,17 +421,18 @@
 void XmppChannel::Start(NotificationDelegate* delegate) {
   CHECK(state_ == XmppState::kNotStarted);
   delegate_ = delegate;
-  Connect(kDefaultXmppHost, kDefaultXmppPort,
-          base::Bind(&XmppChannel::OnConnected,
-                     weak_ptr_factory_.GetWeakPtr()));
+
+  Connect(
+      kDefaultXmppHost, kDefaultXmppPort,
+      base::Bind(&XmppChannel::OnConnected, task_ptr_factory_.GetWeakPtr()));
 }
 
 void XmppChannel::Stop() {
   if (IsConnected() && delegate_)
     delegate_->OnDisconnected();
 
-  weak_ptr_factory_.InvalidateWeakPtrs();
-  StopPingTimer();
+  task_ptr_factory_.InvalidateWeakPtrs();
+  ping_ptr_factory_.InvalidateWeakPtrs();
 
   if (tls_stream_) {
     tls_stream_->CloseBlocking(nullptr);
@@ -435,9 +447,10 @@
 }
 
 void XmppChannel::OnConnected() {
-  state_ = XmppState::kStarted;
+  CHECK(XmppState::kConnecting == state_);
+  state_ = XmppState::kConnected;
   RestartXmppStream();
-  StartPingTimer();
+  ScheduleRegularPing();
 }
 
 void XmppChannel::RestartXmppStream() {
@@ -448,34 +461,65 @@
   SendMessage(BuildXmppStartStreamCommand());
 }
 
-void XmppChannel::StartPingTimer() {
-  ping_timer_.Start(FROM_HERE,
-                    base::TimeDelta::FromSeconds(kPingIntervalSeconds),
-                    base::Bind(&XmppChannel::PingServer,
-                               weak_ptr_factory_.GetWeakPtr()));
+void XmppChannel::SchedulePing(base::TimeDelta interval,
+                               base::TimeDelta timeout) {
+  VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
+  ping_ptr_factory_.InvalidateWeakPtrs();
+  task_runner_->PostDelayedTask(
+      FROM_HERE, base::Bind(&XmppChannel::PingServer,
+                            ping_ptr_factory_.GetWeakPtr(), timeout),
+      interval);
 }
 
-void XmppChannel::StopPingTimer() {
-  ping_timer_.Stop();
+void XmppChannel::ScheduleRegularPing() {
+  SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds),
+               base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds));
 }
 
-void XmppChannel::PingServer() {
+void XmppChannel::ScheduleFastPing() {
+  SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds),
+               base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds));
+}
+
+void XmppChannel::PingServer(base::TimeDelta timeout) {
+  VLOG(1) << "Sending XMPP ping";
   // Send an XMPP Ping request as defined in XEP-0199 extension:
   // http://xmpp.org/extensions/xep-0199.html
-  iq_stanza_handler_->SendRequest(
-      "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>",
-      base::Bind(&XmppChannel::OnPingResponse, weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::OnPingTimeout, weak_ptr_factory_.GetWeakPtr()));
+  iq_stanza_handler_->SendRequestWithCustomTimeout(
+      "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout,
+      base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(),
+                 base::Time::Now()),
+      base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(),
+                 base::Time::Now()));
 }
 
-void XmppChannel::OnPingResponse(std::unique_ptr<XmlNode> reply) {
+void XmppChannel::OnPingResponse(base::Time sent_time,
+                                 std::unique_ptr<XmlNode> reply) {
+  VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time);
   // Ping response received from server. Everything seems to be in order.
-  // Nothing else to do.
+  // Reschedule with default intervals.
+  ScheduleRegularPing();
 }
 
-void XmppChannel::OnPingTimeout() {
-  LOG(WARNING) << "XMPP channel seems to be disconnected - ping timed out";
+void XmppChannel::OnPingTimeout(base::Time sent_time) {
+  LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after "
+               << (base::Time::Now() - sent_time);
   Restart();
 }
 
+void XmppChannel::OnConnectivityChanged(bool online) {
+  if (state_ == XmppState::kNotStarted)
+    return;
+
+  if (state_ == XmppState::kConnecting &&
+      backoff_entry_.GetTimeUntilRelease() <
+          base::TimeDelta::FromSeconds(
+              kConnectingTimeoutAfterNetChangeSeconds)) {
+    VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease();
+    return;
+  }
+
+  ScheduleFastPing();
+}
+
 }  // namespace buffet
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
index bdddd91..3d279e4 100644
--- a/buffet/notification/xmpp_channel.h
+++ b/buffet/notification/xmpp_channel.h
@@ -14,7 +14,6 @@
 #include <base/macros.h>
 #include <base/memory/weak_ptr.h>
 #include <base/single_thread_task_runner.h>
-#include <base/timer/timer.h>
 #include <chromeos/backoff_entry.h>
 #include <chromeos/streams/stream.h>
 
@@ -22,6 +21,10 @@
 #include "buffet/notification/xmpp_iq_stanza_handler.h"
 #include "buffet/notification/xmpp_stream_parser.h"
 
+namespace privetd {
+class ShillClient;
+}
+
 namespace buffet {
 
 // Simple interface to abstract XmppChannel's SendMessage() method.
@@ -42,7 +45,8 @@
   // so you will need to reset the XmppClient every time this happens.
   XmppChannel(const std::string& account,
               const std::string& access_token,
-              const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
+              const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+              privetd::ShillClient* shill);
   ~XmppChannel() override = default;
 
   // Overrides from NotificationChannel.
@@ -57,7 +61,8 @@
   // Internal states for the XMPP stream.
   enum class XmppState {
     kNotStarted,
-    kStarted,
+    kConnecting,
+    kConnected,
     kTlsStarted,
     kTlsCompleted,
     kAuthenticationStarted,
@@ -74,8 +79,9 @@
   // to help provide unit-test-specific functionality.
   virtual void Connect(const std::string& host, uint16_t port,
                        const base::Closure& callback);
-  virtual void StartPingTimer();
-  virtual void StopPingTimer();
+  virtual void SchedulePing(base::TimeDelta interval, base::TimeDelta timeout);
+  void ScheduleRegularPing();
+  void ScheduleFastPing();
 
   XmppState state_{XmppState::kNotStarted};
 
@@ -119,9 +125,11 @@
 
   // Sends a ping request to the server to check if the connection is still
   // valid.
-  void PingServer();
-  void OnPingResponse(std::unique_ptr<XmlNode> reply);
-  void OnPingTimeout();
+  void PingServer(base::TimeDelta timeout);
+  void OnPingResponse(base::Time sent_time, std::unique_ptr<XmlNode> reply);
+  void OnPingTimeout(base::Time sent_time);
+
+  void OnConnectivityChanged(bool online);
 
   // Robot account name for the device.
   std::string account_;
@@ -153,8 +161,8 @@
   bool write_pending_{false};
   std::unique_ptr<IqStanzaHandler> iq_stanza_handler_;
 
-  base::Timer ping_timer_{true, true};
-
+  base::WeakPtrFactory<XmppChannel> ping_ptr_factory_{this};
+  base::WeakPtrFactory<XmppChannel> task_ptr_factory_{this};
   base::WeakPtrFactory<XmppChannel> weak_ptr_factory_{this};
   DISALLOW_COPY_AND_ASSIGN(XmppChannel);
 };
diff --git a/buffet/notification/xmpp_channel_unittest.cc b/buffet/notification/xmpp_channel_unittest.cc
index ee6ea75..d16e81e 100644
--- a/buffet/notification/xmpp_channel_unittest.cc
+++ b/buffet/notification/xmpp_channel_unittest.cc
@@ -103,7 +103,7 @@
   FakeXmppChannel(
       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
       base::Clock* clock)
-      : XmppChannel{kAccountName, kAccessToken, task_runner},
+      : XmppChannel{kAccountName, kAccessToken, task_runner, nullptr},
         fake_stream_{chromeos::Stream::AccessMode::READ_WRITE, task_runner,
                      clock} {}
 
@@ -112,12 +112,13 @@
 
   void Connect(const std::string& host, uint16_t port,
                const base::Closure& callback) override {
+    set_state(XmppState::kConnecting);
     stream_ = &fake_stream_;
     callback.Run();
   }
 
-  void StartPingTimer() override {}
-  void StopPingTimer() override {}
+  void SchedulePing(base::TimeDelta interval,
+                    base::TimeDelta timeout) override {}
 
   chromeos::FakeStream fake_stream_;
 };
@@ -183,7 +184,7 @@
   xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
   xmpp_client_->Start(nullptr);
   RunTasks(1);
-  EXPECT_EQ(XmppChannel::XmppState::kStarted, xmpp_client_->state());
+  EXPECT_EQ(XmppChannel::XmppState::kConnected, xmpp_client_->state());
 }
 
 TEST_F(XmppChannelTest, HandleStartedResponse) {
diff --git a/buffet/notification/xmpp_iq_stanza_handler.cc b/buffet/notification/xmpp_iq_stanza_handler.cc
index e3a14c0..fada014 100644
--- a/buffet/notification/xmpp_iq_stanza_handler.cc
+++ b/buffet/notification/xmpp_iq_stanza_handler.cc
@@ -58,16 +58,30 @@
     const std::string& body,
     const ResponseCallback& response_callback,
     const TimeoutCallback& timeout_callback) {
+  return SendRequestWithCustomTimeout(
+      type, from, to, body,
+      base::TimeDelta::FromSeconds(kTimeoutIntervalSeconds), response_callback,
+      timeout_callback);
+}
+
+void IqStanzaHandler::SendRequestWithCustomTimeout(
+    const std::string& type,
+    const std::string& from,
+    const std::string& to,
+    const std::string& body,
+    base::TimeDelta timeout,
+    const ResponseCallback& response_callback,
+    const TimeoutCallback& timeout_callback) {
   // Remember the response callback to call later.
   requests_.emplace(++last_request_id_, response_callback);
   // Schedule a time-out callback for this request.
-  task_runner_->PostDelayedTask(
-      FROM_HERE,
-      base::Bind(&IqStanzaHandler::OnTimeOut,
-                 weak_ptr_factory_.GetWeakPtr(),
-                 last_request_id_,
-                 timeout_callback),
-      base::TimeDelta::FromSeconds(kTimeoutIntervalSeconds));
+  if (timeout < base::TimeDelta::Max()) {
+    task_runner_->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&IqStanzaHandler::OnTimeOut, weak_ptr_factory_.GetWeakPtr(),
+                   last_request_id_, timeout_callback),
+        timeout);
+  }
 
   std::string message = BuildIqStanza(std::to_string(last_request_id_),
                                       type, to, from, body);
diff --git a/buffet/notification/xmpp_iq_stanza_handler.h b/buffet/notification/xmpp_iq_stanza_handler.h
index 0eb0900..2583e90 100644
--- a/buffet/notification/xmpp_iq_stanza_handler.h
+++ b/buffet/notification/xmpp_iq_stanza_handler.h
@@ -45,6 +45,16 @@
                    const ResponseCallback& response_callback,
                    const TimeoutCallback& timeout_callback);
 
+  // |timeout| is the custom time interval after which requests should be
+  // considered failed.
+  void SendRequestWithCustomTimeout(const std::string& type,
+                                    const std::string& from,
+                                    const std::string& to,
+                                    const std::string& body,
+                                    base::TimeDelta timeout,
+                                    const ResponseCallback& response_callback,
+                                    const TimeoutCallback& timeout_callback);
+
   // Processes an <iq> stanza is received from the server. This will match the
   // stanza's 'id' attribute with pending request ID and if found, will
   // call the |response_callback|, or if the request is not found, an error
diff --git a/buffet/privet/privet_manager.cc b/buffet/privet/privet_manager.cc
index 2f0dd37..bdea47a 100644
--- a/buffet/privet/privet_manager.cc
+++ b/buffet/privet/privet_manager.cc
@@ -63,6 +63,7 @@
 
 void Manager::Start(const Options& options,
                     const scoped_refptr<dbus::Bus>& bus,
+                    ShillClient* shill_client,
                     buffet::DeviceRegistrationInfo* device,
                     buffet::CommandManager* command_manager,
                     buffet::StateManager* state_manager,
@@ -75,15 +76,14 @@
   security_.reset(new SecurityManager(device->GetConfig().pairing_modes(),
                                       device->GetConfig().embedded_code_path(),
                                       disable_security_));
-  shill_client_.reset(new ShillClient(bus, options.device_whitelist));
-  shill_client_->RegisterConnectivityListener(
+  shill_client->RegisterConnectivityListener(
       base::Bind(&Manager::OnConnectivityChanged, base::Unretained(this)));
   ap_manager_client_.reset(new ApManagerClient(bus));
 
   if (device->GetConfig().wifi_auto_setup_enabled()) {
     VLOG(1) << "Enabling WiFi bootstrapping.";
     wifi_bootstrap_manager_.reset(new WifiBootstrapManager(
-        device->GetConfig().last_configured_ssid(), shill_client_.get(),
+        device->GetConfig().last_configured_ssid(), shill_client,
         ap_manager_client_.get(), cloud_.get()));
     wifi_bootstrap_manager_->Init();
   }
diff --git a/buffet/privet/privet_manager.h b/buffet/privet/privet_manager.h
index 72189c0..2af19d1 100644
--- a/buffet/privet/privet_manager.h
+++ b/buffet/privet/privet_manager.h
@@ -52,7 +52,6 @@
     bool disable_privet{false};
     bool disable_security{false};
     bool enable_ping{false};
-    std::set<std::string> device_whitelist;
     base::FilePath config_path;
   };
 
@@ -61,6 +60,7 @@
 
   void Start(const Options& options,
              const scoped_refptr<dbus::Bus>& bus,
+             ShillClient* shill_client,
              buffet::DeviceRegistrationInfo* device,
              buffet::CommandManager* command_manager,
              buffet::StateManager* state_manager,
@@ -101,7 +101,6 @@
   std::unique_ptr<CloudDelegate> cloud_;
   std::unique_ptr<DeviceDelegate> device_;
   std::unique_ptr<SecurityManager> security_;
-  std::unique_ptr<ShillClient> shill_client_;
   std::unique_ptr<ApManagerClient> ap_manager_client_;
   std::unique_ptr<WifiBootstrapManager> wifi_bootstrap_manager_;
   std::unique_ptr<PeerdClient> peerd_client_;
diff --git a/buffet/privet/shill_client.cc b/buffet/privet/shill_client.cc
index 0a1b2e2..48d0cef 100644
--- a/buffet/privet/shill_client.cc
+++ b/buffet/privet/shill_client.cc
@@ -464,19 +464,20 @@
       new_connectivity_state = kv.second.service_state;
     }
   }
-  VLOG(3) << "New connectivity state is "
+  VLOG(1) << "Connectivity changed: "
+          << ServiceStateToString(connectivity_state_) << " -> "
           << ServiceStateToString(new_connectivity_state);
-  if (new_connectivity_state != connectivity_state_) {
-    connectivity_state_ = new_connectivity_state;
-    // We may call UpdateConnectivityState whenever we mutate a data structure
-    // such that our connectivity status could change.  However, we don't want
-    // to allow people to call into ShillClient while some other operation is
-    // underway.  Therefore, call our callbacks later, when we're in a good
-    // state.
-    base::MessageLoop::current()->PostTask(
-        FROM_HERE, base::Bind(&ShillClient::NotifyConnectivityListeners,
-                              weak_factory_.GetWeakPtr(), AmOnline()));
-  }
+  // Notify listeners even if state changed to the same value. Listeners may
+  // want to handle this event.
+  connectivity_state_ = new_connectivity_state;
+  // We may call UpdateConnectivityState whenever we mutate a data structure
+  // such that our connectivity status could change.  However, we don't want
+  // to allow people to call into ShillClient while some other operation is
+  // underway.  Therefore, call our callbacks later, when we're in a good
+  // state.
+  base::MessageLoop::current()->PostTask(
+      FROM_HERE, base::Bind(&ShillClient::NotifyConnectivityListeners,
+                            weak_factory_.GetWeakPtr(), AmOnline()));
 }
 
 void ShillClient::NotifyConnectivityListeners(bool am_online) {