shill: Make multiple TCP connections in ConnectionHealthChecker and
decide based on the whole sample

BUG=chromium:232136
TEST=(1) Build and run unit-tests
     (2) Test that there aren't many false positives in out-of-credit
         detection.

Change-Id: I3fa9d926536a2085dcdd8e6f7f92363e437a0b9f
Reviewed-on: https://gerrit.chromium.org/gerrit/48860
Commit-Queue: Prathmesh Prabhu <pprabhu@chromium.org>
Reviewed-by: Prathmesh Prabhu <pprabhu@chromium.org>
Tested-by: Prathmesh Prabhu <pprabhu@chromium.org>
diff --git a/connection_health_checker.cc b/connection_health_checker.cc
index 52ba738..c43d9e4 100644
--- a/connection_health_checker.cc
+++ b/connection_health_checker.cc
@@ -28,18 +28,29 @@
 #include "shill/socket_info_reader.h"
 
 using base::Bind;
+using base::Unretained;
 using std::string;
 using std::vector;
 
 namespace shill {
 
-//static
+// static
 const int ConnectionHealthChecker::kDNSTimeoutMilliseconds = 5000;
-//static
-const int ConnectionHealthChecker::kMaxConnectionAttempts = 3;
-//static
+// static
+const int ConnectionHealthChecker::kInvalidSocket = -1;
+// static
+const int ConnectionHealthChecker::kMaxFailedConnectionAttempts = 2;
+// static
+const int ConnectionHealthChecker::kMaxSentDataPollingAttempts = 2;
+// static
+const int ConnectionHealthChecker::kMinCongestedQueueAttempts = 2;
+// static
+const int ConnectionHealthChecker::kMinSuccessfulSendAttempts = 1;
+// static
 const int ConnectionHealthChecker::kNumDNSQueries = 5;
-//static
+// static
+const int ConnectionHealthChecker::kTCPStateUpdateWaitMilliseconds = 5000;
+// static
 const uint16 ConnectionHealthChecker::kRemotePort = 80;
 
 ConnectionHealthChecker::ConnectionHealthChecker(
@@ -49,27 +60,36 @@
     : connection_(connection),
       dispatcher_(dispatcher),
       result_callback_(result_callback),
-      socket_info_reader_(new SocketInfoReader()),
       socket_(new Sockets()),
       weak_ptr_factory_(this),
       connection_complete_callback_(
           Bind(&ConnectionHealthChecker::OnConnectionComplete,
                weak_ptr_factory_.GetWeakPtr())),
-      dns_client_callback_(Bind(&ConnectionHealthChecker::GetDNSResult,
-                                weak_ptr_factory_.GetWeakPtr())),
       tcp_connection_(new AsyncConnection(connection_->interface_name(),
                                           dispatcher_,
                                           socket_.get(),
                                           connection_complete_callback_)),
+      report_result_(
+          Bind(&ConnectionHealthChecker::ReportResult,
+               weak_ptr_factory_.GetWeakPtr())),
+      sock_fd_(kInvalidSocket),
+      socket_info_reader_(new SocketInfoReader()),
       dns_client_factory_(DNSClientFactory::GetInstance()),
-      run_data_test_(true),
+      dns_client_callback_(Bind(&ConnectionHealthChecker::GetDNSResult,
+                                weak_ptr_factory_.GetWeakPtr())),
       health_check_in_progress_(false),
-      num_connection_attempts_(0) {}
+      num_connection_failures_(0),
+      num_congested_queue_detected_(0),
+      num_successful_sends_(0) {}
 
 ConnectionHealthChecker::~ConnectionHealthChecker() {
   Stop();
 }
 
+bool ConnectionHealthChecker::health_check_in_progress() const {
+ return health_check_in_progress_;
+}
+
 void ConnectionHealthChecker::AddRemoteIP(IPAddress ip) {
   remote_ips_.push_back(ip);
 }
@@ -117,9 +137,10 @@
   }
 
   health_check_in_progress_ = true;
-  num_connection_attempts_ = 0;
+  num_connection_failures_ = 0;
+  num_congested_queue_detected_ = 0;
+  num_successful_sends_ = 0;
 
-  // Initiate the first attempt.
   if (remote_ips_.empty()) {
     // Nothing to try.
     Stop();
@@ -127,13 +148,21 @@
     result_callback_.Run(kResultUnknown);
     return;
   }
-  SetupTcpConnection();
+
+  // Initiate the first attempt.
+  NextHealthCheckSample();
 }
 
 void ConnectionHealthChecker::Stop() {
   if (tcp_connection_ != NULL)
     tcp_connection_->Stop();
+  verify_sent_data_callback_.Cancel();
+  ClearSocketDescriptor();
   health_check_in_progress_ = false;
+  num_connection_failures_ = 0;
+  num_congested_queue_detected_ = 0;
+  num_successful_sends_ = 0;
+  num_tx_queue_polling_attempts_ = 0;
 }
 
 const char *ConnectionHealthChecker::ResultToString(
@@ -156,47 +185,6 @@
   }
 }
 
-void ConnectionHealthChecker::SetupTcpConnection() {
-  // Pick a random IP from the set of IPs.
-  // This guards against
-  //   (1) Repeated failed attempts for the same IP at start-up everytime.
-  //   (2) All users attempting to connect to the same IP.
-  int next_ip_index = rand() % remote_ips_.size();
-  const IPAddress &ip = remote_ips_[next_ip_index];
-  SLOG(Connection, 3) << __func__ << ": Starting connection at "
-                      << ip.ToString();
-  if (tcp_connection_->Start(ip, kRemotePort)) {
-    // TCP connection successful, no need to try more.
-    return;
-  }
-  SLOG(Connection, 2) << __func__ << ": Connection attempt failed.";
-  TryNextIP();
-}
-
-void ConnectionHealthChecker::OnConnectionComplete(bool success, int sock_fd) {
-  if (!success) {
-    SLOG(Connection, 2) << __func__
-                        << ": AsyncConnection connection attempt failed.";
-    TryNextIP();  // Make sure TryNextIP() is the last statement.
-    return;
-  }
-  // Transferred owndership of valid sock_fd.
-
-  // Check if the established connection is healthy.
-  Result result = run_data_test_ ? SendData(sock_fd) : ShutDown(sock_fd);
-
-  // The health check routine(s) may further indicate a problem requiring a
-  // reattempt.
-  if (result == kResultConnectionFailure || result == kResultUnknown) {
-    socket_->Close(sock_fd);
-    TryNextIP();  // Make sure TryNextIP() is the last statement.
-  } else {
-    socket_->Close(sock_fd);
-    Stop();
-    result_callback_.Run(result);  // Make sure this is the last statement.
-  }
-}
-
 void ConnectionHealthChecker::GetDNSResult(const Error &error,
                                            const IPAddress& ip) {
   if (!error.IsSuccess()) {
@@ -211,83 +199,137 @@
   remote_ips_.push_back(ip);
 }
 
-void ConnectionHealthChecker::TryNextIP() {
-  ++num_connection_attempts_;
-  // Check if enough attempts have been made already.
-  if (num_connection_attempts_ >= kMaxConnectionAttempts) {
-    LOG(INFO) << __func__
-              << ": multiple failed attempts to established a TCP connection.";
-    // Give up. Clean up and notify client.
-    Stop();
-    result_callback_.Run(kResultConnectionFailure);
-    return;
+void ConnectionHealthChecker::GarbageCollectDNSClients() {
+  ScopedVector<DNSClient> keep;
+  ScopedVector<DNSClient> discard;
+  for (size_t i = 0; i < dns_clients_.size(); ++i) {
+    if (dns_clients_[i]->IsActive())
+      keep.push_back(dns_clients_[i]);
+    else
+      discard.push_back(dns_clients_[i]);
   }
-  SetupTcpConnection();
+  dns_clients_.weak_clear();
+  dns_clients_ = keep.Pass();  // Passes ownership of contents.
+  discard.clear();
 }
 
-// Send data on the connection and observe the TxCount.
-ConnectionHealthChecker::Result ConnectionHealthChecker::SendData(int sock_fd) {
+void ConnectionHealthChecker::NextHealthCheckSample() {
+  // Finish conditions:
+  if (num_connection_failures_ == kMaxFailedConnectionAttempts) {
+    health_check_result_ = kResultConnectionFailure;
+    dispatcher_->PostTask(report_result_);
+    return;
+  }
+  if (num_congested_queue_detected_ == kMinCongestedQueueAttempts) {
+    health_check_result_ = kResultCongestedTxQueue;
+    dispatcher_->PostTask(report_result_);
+    return;
+  }
+  if (num_successful_sends_ == kMinSuccessfulSendAttempts) {
+    health_check_result_ = kResultSuccess;
+    dispatcher_->PostTask(report_result_);
+    return;
+  }
+
+  // Pick a random IP from the set of IPs.
+  // This guards against
+  //   (1) Repeated failed attempts for the same IP at start-up every time.
+  //   (2) All users attempting to connect to the same IP.
+  int next_ip_index = rand() % remote_ips_.size();
+  const IPAddress &ip = remote_ips_[next_ip_index];
+  SLOG(Connection, 3) << __func__ << ": Starting connection at "
+                      << ip.ToString();
+  if (!tcp_connection_->Start(ip, kRemotePort)) {
+    SLOG(Connection, 2) << __func__ << ": Connection attempt failed.";
+    ++num_connection_failures_;
+    NextHealthCheckSample();
+  }
+}
+
+void ConnectionHealthChecker::OnConnectionComplete(bool success, int sock_fd) {
+  if (!success) {
+    SLOG(Connection, 2) << __func__
+                        << ": AsyncConnection connection attempt failed.";
+    ++num_connection_failures_;
+    NextHealthCheckSample();
+    return;
+  }
+
+  SetSocketDescriptor(sock_fd);
+
   SocketInfo sock_info;
-  uint64 old_transmit_queue_value;
-  if (!GetSocketInfo(sock_fd, &sock_info) ||
+  if (!GetSocketInfo(sock_fd_, &sock_info) ||
       sock_info.connection_state() !=
           SocketInfo::kConnectionStateEstablished) {
     SLOG(Connection, 2) << __func__
                         << ": Connection originally not in established state..";
     // Count this as a failed connection attempt.
-    return kResultUnknown;
+    ++num_connection_failures_;
+    ClearSocketDescriptor();
+    NextHealthCheckSample();
+    return;
   }
-  old_transmit_queue_value = sock_info.transmit_queue_value();
 
+  old_transmit_queue_value_ = sock_info.transmit_queue_value();
+  num_tx_queue_polling_attempts_ = 0;
+
+  // Send data on the connection and post a delayed task to check successful
+  // transfer.
   char buf;
-  if (socket_->Send(sock_fd, &buf, sizeof(buf), 0) == -1) {
+  if (socket_->Send(sock_fd_, &buf, sizeof(buf), 0) == -1) {
     SLOG(Connection, 2) << __func__ << ": " << socket_->ErrorString();
     // Count this as a failed connection attempt.
-    return kResultConnectionFailure;
+    ++num_connection_failures_;
+    ClearSocketDescriptor();
+    NextHealthCheckSample();
+    return;
   }
 
-  // Wait to give enough time for the TxCount to be updated.
-  // TODO(pprabhu) Check that this is reliable wrt timing effects.
-  if (!GetSocketInfo(sock_fd, &sock_info) ||
-      sock_info.connection_state() !=
-          SocketInfo::kConnectionStateEstablished) {
-    SLOG(Connection, 2) << __func__
-                        << ": Connection not in established state after send.";
-    // Count this as a failed connection attempt.
-    return kResultUnknown;
-  }
-
-  if (sock_info.transmit_queue_value() > old_transmit_queue_value) {
-    return kResultCongestedTxQueue;
-  }
-
-  return kResultSuccess;
+  verify_sent_data_callback_.Reset(
+      Bind(&ConnectionHealthChecker::VerifySentData, Unretained(this)));
+  dispatcher_->PostDelayedTask(verify_sent_data_callback_.callback(),
+                               kTCPStateUpdateWaitMilliseconds);
 }
 
-// Attempt to shutdown the connection and check if the connection is stuck in
-// the TIME_WAIT tcp state.
-ConnectionHealthChecker::Result ConnectionHealthChecker::ShutDown(int sock_fd) {
-  if (socket_->ShutDown(sock_fd, SHUT_RDWR) == -1) {
-    SLOG(Connection, 2) << __func__
-                        << ": Failed to cleanly shut down the connection.";
-    // Count this as a failed connection attempt.
-    return kResultUnknown;
-  }
-  // Wait to give enough time for a normal TCP shutdown?
-  // TODO(pprabhu) Check that this is reliable wrt timing effects.
-
+void ConnectionHealthChecker::VerifySentData() {
   SocketInfo sock_info;
-  if (!GetSocketInfo(sock_fd, &sock_info)) {
-    // The TCP socket for the connection has been cleaned.
-    // This means ShutDown was successful.
-    return kResultSuccess;
+  bool sock_info_found = GetSocketInfo(sock_fd_, &sock_info);
+  // Acceptable TCP connection states after sending the data:
+  // kConnectionStateEstablished: No change in connection state since the send.
+  // kConnectionStateCloseWait: The remote host received the sent data and
+  //    requested connection close.
+  if (!sock_info_found ||
+      (sock_info.connection_state() !=
+           SocketInfo::kConnectionStateEstablished &&
+      sock_info.connection_state() !=
+           SocketInfo::kConnectionStateCloseWait)) {
+    SLOG(Connection, 2) << __func__
+                        << ": Connection not in acceptable state after send.";
+    if (sock_info_found)
+      SLOG(Connection, 3) << "Found socket info but in state: "
+                          << sock_info.connection_state();
+    ++num_connection_failures_;
+  } else if (sock_info.transmit_queue_value() > old_transmit_queue_value_ &&
+      sock_info.timer_state() ==
+          SocketInfo::kTimerStateRetransmitTimerPending) {
+    if (num_tx_queue_polling_attempts_ < kMaxSentDataPollingAttempts) {
+      SLOG(Connection, 2) << __func__
+                          << ": Polling again.";
+      ++num_tx_queue_polling_attempts_;
+      verify_sent_data_callback_.Reset(
+          Bind(&ConnectionHealthChecker::VerifySentData, Unretained(this)));
+      dispatcher_->PostDelayedTask(verify_sent_data_callback_.callback(),
+                                   kTCPStateUpdateWaitMilliseconds);
+      return;
+    }
+    SLOG(Connection, 2) << __func__ << ": Sampled congested Tx-Queue";
+    ++num_congested_queue_detected_;
+  } else {
+    SLOG(Connection, 2) << __func__ << ": Sampled successful send.";
+    ++num_successful_sends_;
   }
-  if (sock_info.connection_state() == SocketInfo::kConnectionStateFinWait1 ||
-      sock_info.connection_state() == SocketInfo::kConnectionStateFinWait2 ||
-      sock_info.connection_state() == SocketInfo::kConnectionStateTimeWait)
-    return kResultElongatedTimeWait;
-
-  return kResultUnknown;
+  ClearSocketDescriptor();
+  NextHealthCheckSample();
 }
 
 //TODO(pprabhu): Scrub IP address logging.
@@ -336,7 +378,7 @@
        ++info_list_it) {
     const SocketInfo &cur_sock_info = *info_list_it;
 
-    SLOG(Connection, 3)
+    SLOG(Connection, 4)
         << "Testing against IP = "
         << cur_sock_info.local_ip_address().ToString()
         << ":" << cur_sock_info.local_port()
@@ -347,7 +389,7 @@
 
     if (cur_sock_info.local_ip_address().Equals(local_ip_address) &&
         cur_sock_info.local_port() == local_port) {
-      // Copy SocketInfo.
+      SLOG(Connection, 3) << __func__ << ": Found matching TCP socket info.";
       *sock_info = cur_sock_info;
       return true;
     }
@@ -357,18 +399,23 @@
   return false;
 }
 
-void ConnectionHealthChecker::GarbageCollectDNSClients() {
-  ScopedVector<DNSClient> keep;
-  ScopedVector<DNSClient> discard;
-  for (size_t i = 0; i < dns_clients_.size(); ++i) {
-    if (dns_clients_[i]->IsActive())
-      keep.push_back(dns_clients_[i]);
-    else
-      discard.push_back(dns_clients_[i]);
+void ConnectionHealthChecker::ReportResult() {
+  SLOG(Connection, 2) << __func__ << ": Result: "
+                     << ResultToString(health_check_result_);
+  Stop();
+  result_callback_.Run(health_check_result_);
+}
+
+void ConnectionHealthChecker::SetSocketDescriptor(int sock_fd) {
+  if (sock_fd_ != kInvalidSocket) {
+    SLOG(Connection, 4) << "Closing socket";
+    socket_->Close(sock_fd_);
   }
-  dns_clients_.weak_clear();
-  dns_clients_ = keep.Pass();  // Passes ownership of contents.
-  discard.clear();
+  sock_fd_ = sock_fd;
+}
+
+void ConnectionHealthChecker::ClearSocketDescriptor() {
+  SetSocketDescriptor(kInvalidSocket);
 }
 
 }  // namespace shill