Implement streaming I/O for response data

Sending response to an HTTP request is now using streaming I/O.
ProtocolHandler::CompleteRequest() now returns a file descriptor
for the write-end of a pipe and clients (request handlers) can
write to this file descriptor data at a rate it becomes available.
When expected amount of data is sent (or the pipe is closed on
the handler's end) the response is finalized.

Both normal (when data size is known) and chunked responses are
supported.

BUG: 24166746
Change-Id: Ie98cfa4262ba82941aaefd4c9bfd9986c784219c
diff --git a/libwebserv/protocol_handler.cc b/libwebserv/protocol_handler.cc
index 544281f..81f7a5c 100644
--- a/libwebserv/protocol_handler.cc
+++ b/libwebserv/protocol_handler.cc
@@ -31,8 +31,51 @@
 
 namespace {
 
-// A dummy callback for async D-Bus errors.
-void IgnoreError(chromeos::Error* error) {}
+// Dummy callbacks for async D-Bus/Stream errors.
+void IgnoreDBusError(chromeos::Error* error) {}
+void IgnoreStreamError(const chromeos::Error* error) {}
+
+// Structure to hold the data needed for asynchronous copying of two streams.
+struct StreamCopyData {
+  chromeos::StreamPtr src_stream;
+  chromeos::StreamPtr dest_stream;
+  std::vector<uint8_t> buffer;
+};
+
+// Forward-declaration.
+void PerformRead(std::unique_ptr<StreamCopyData> data);
+
+// Async callback which writes data to the destination stream after read.
+void PerformWrite(std::unique_ptr<StreamCopyData> data, size_t size) {
+  if (size == 0) // We are all done.
+    return;
+  data->dest_stream->WriteAllAsync(
+      data->buffer.data(), size, base::Bind(&PerformRead, base::Passed(&data)),
+      base::Bind(&IgnoreStreamError), nullptr);
+}
+
+// Reads the data from the source stream into a buffer and invokes PerformWrite
+// when done.
+void PerformRead(std::unique_ptr<StreamCopyData> data) {
+  data->src_stream->ReadAsync(data->buffer.data(), data->buffer.size(),
+                              base::Bind(&PerformWrite, base::Passed(&data)),
+                              base::Bind(&IgnoreStreamError), nullptr);
+}
+
+// Copies the data from |src_stream| to the destination stream represented
+// by a file descriptor |fd|.
+void WriteResponseData(chromeos::StreamPtr src_stream,
+                       const dbus::FileDescriptor& fd) {
+  std::unique_ptr<StreamCopyData> data{new StreamCopyData};
+  int dupfd = dup(fd.value());
+  data->src_stream = std::move(src_stream);
+  data->dest_stream =
+      chromeos::FileStream::FromFileDescriptor(dupfd, true, nullptr);
+  data->buffer.resize(4096);  // Read buffer of 4 KiB.
+  CHECK(data->src_stream);
+  CHECK(data->dest_stream);
+  PerformRead(std::move(data));
+}
 
 }  // anonymous namespace
 
@@ -129,7 +172,7 @@
     pair.first->RemoveRequestHandlerAsync(
         pair.second,
         base::Bind(&base::DoNothing),
-        base::Bind(&IgnoreError));
+        base::Bind(&IgnoreDBusError));
   }
 
   request_handlers_.erase(p);
@@ -209,7 +252,7 @@
     const std::string& request_id,
     int status_code,
     const std::multimap<std::string, std::string>& headers,
-    const std::vector<uint8_t>& data) {
+    chromeos::StreamPtr data_stream) {
   ProtocolHandlerProxy* proxy = GetRequestProtocolHandlerProxy(request_id);
   if (!proxy)
     return;
@@ -219,9 +262,13 @@
   for (const auto& pair : headers)
     header_list.emplace_back(pair.first, pair.second);
 
-  proxy->CompleteRequestAsync(request_id, status_code, header_list, data,
-                              base::Bind(&base::DoNothing),
-                              base::Bind([](chromeos::Error*) {}));
+  int64_t data_size = -1;
+  if (data_stream->CanGetSize())
+    data_size = data_stream->GetRemainingSize();
+  proxy->CompleteRequestAsync(
+      request_id, status_code, header_list, data_size,
+      base::Bind(&WriteResponseData, base::Passed(&data_stream)),
+      base::Bind(&IgnoreDBusError));
 }
 
 void ProtocolHandler::GetFileData(
diff --git a/libwebserv/protocol_handler.h b/libwebserv/protocol_handler.h
index 91276d9..797c19a 100644
--- a/libwebserv/protocol_handler.h
+++ b/libwebserv/protocol_handler.h
@@ -167,7 +167,7 @@
       const std::string& request_id,
       int status_code,
       const std::multimap<std::string, std::string>& headers,
-      const std::vector<uint8_t>& data);
+      chromeos::StreamPtr data_stream);
 
   // Makes a call to the (remote) web server request handler over D-Bus to
   // obtain the file content of uploaded file (identified by |file_id|) during
diff --git a/libwebserv/response.cc b/libwebserv/response.cc
index 8fc49f7..9fc61d6 100644
--- a/libwebserv/response.cc
+++ b/libwebserv/response.cc
@@ -21,6 +21,7 @@
 #include <base/values.h>
 #include <chromeos/http/http_request.h>
 #include <chromeos/mime_utils.h>
+#include <chromeos/streams/memory_stream.h>
 #include <chromeos/strings/string_utils.h>
 #include <libwebserv/protocol_handler.h>
 
@@ -48,12 +49,11 @@
 }
 
 void Response::Reply(int status_code,
-                     const void* data,
-                     size_t data_size,
+                     chromeos::StreamPtr data_stream,
                      const std::string& mime_type) {
+  CHECK(data_stream);
   status_code_ = status_code;
-  const uint8_t* byte_ptr = static_cast<const uint8_t*>(data);
-  data_.assign(byte_ptr, byte_ptr + data_size);
+  data_stream_ = std::move(data_stream);
   AddHeader(chromeos::http::response_header::kContentType, mime_type);
   SendResponse();
 }
@@ -61,7 +61,8 @@
 void Response::ReplyWithText(int status_code,
                              const std::string& text,
                              const std::string& mime_type) {
-  Reply(status_code, text.data(), text.size(), mime_type);
+  Reply(status_code, chromeos::MemoryStream::OpenCopyOf(text, nullptr),
+        mime_type);
 }
 
 void Response::ReplyWithJson(int status_code, const base::Value* json) {
@@ -91,7 +92,7 @@
 
 void Response::ReplyWithError(int status_code, const std::string& error_text) {
   status_code_ = status_code;
-  data_.assign(error_text.begin(), error_text.end());
+  data_stream_ = chromeos::MemoryStream::OpenCopyOf(error_text, nullptr);
   SendResponse();
 }
 
@@ -102,7 +103,8 @@
 void Response::SendResponse() {
   CHECK(!reply_sent_) << "Response already sent";
   reply_sent_ = true;
-  handler_->CompleteRequest(request_id_, status_code_, headers_, data_);
+  handler_->CompleteRequest(request_id_, status_code_, headers_,
+                            std::move(data_stream_));
 }
 
 }  // namespace libwebserv
diff --git a/libwebserv/response.h b/libwebserv/response.h
index f3f2adf..1919c09 100644
--- a/libwebserv/response.h
+++ b/libwebserv/response.h
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include <base/macros.h>
+#include <chromeos/streams/stream.h>
 #include <libwebserv/export.h>
 
 namespace base {
@@ -47,8 +48,7 @@
 
   // Generic reply method for sending arbitrary binary data response.
   void Reply(int status_code,
-             const void* data,
-             size_t data_size,
+             chromeos::StreamPtr data_stream,
              const std::string& mime_type);
 
   // Reply with text body.
@@ -87,7 +87,7 @@
   ProtocolHandler* handler_{nullptr};
   std::string request_id_;
   int status_code_{0};
-  std::vector<uint8_t> data_;
+  chromeos::StreamPtr data_stream_;
   std::multimap<std::string, std::string> headers_;
   bool reply_sent_{false};
 
diff --git a/webservd/dbus_bindings/org.chromium.WebServer.ProtocolHandler.dbus-xml b/webservd/dbus_bindings/org.chromium.WebServer.ProtocolHandler.dbus-xml
index e6aa434..c191f70 100644
--- a/webservd/dbus_bindings/org.chromium.WebServer.ProtocolHandler.dbus-xml
+++ b/webservd/dbus_bindings/org.chromium.WebServer.ProtocolHandler.dbus-xml
@@ -36,11 +36,15 @@
     <method name="CompleteRequest">
       <tp:docstring>
         Fulfills the request with specified |request_id| and provides response.
+        |data_size| if the size of the data is known. Otherwise should be set to
+        a value of -1. The actual data is to be written to the pipe provided
+        in |response_stream| file descriptor.
       </tp:docstring>
       <arg name="request_id" type="s" direction="in"/>
       <arg name="status_code" type="i" direction="in"/>
       <arg name="headers" type="a(ss)" direction="in"/>
-      <arg name="data" type="ay" direction="in"/>
+      <arg name="data_size" type="x" direction="in"/>
+      <arg name="response_stream" type="h" direction="out"/>
       <annotation name="org.chromium.DBus.Method.Kind" value="normal"/>
     </method>
     <!-- Properties -->
diff --git a/webservd/dbus_protocol_handler.cc b/webservd/dbus_protocol_handler.cc
index acb91a9..28549b2 100644
--- a/webservd/dbus_protocol_handler.cc
+++ b/webservd/dbus_protocol_handler.cc
@@ -151,9 +151,9 @@
   if (!request)
     return false;
 
-  int data_fd = 0;
-  if (request->GetFileData(in_file_id, &data_fd)) {
-    out_contents->PutValue(data_fd);
+  base::File file = request->GetFileData(in_file_id);
+  if (file.IsValid()) {
+    out_contents->PutValue(file.TakePlatformFile());
     out_contents->CheckValidity();
     return true;
   }
@@ -172,12 +172,16 @@
     const std::string& in_request_id,
     int32_t in_status_code,
     const std::vector<std::tuple<std::string, std::string>>& in_headers,
-    const std::vector<uint8_t>& in_data) {
+    int64_t in_data_size,
+    dbus::FileDescriptor* out_response_stream) {
   auto request = GetRequest(in_request_id, error);
   if (!request)
     return false;
 
-  if (request->Complete(in_status_code, in_headers, in_data)) {
+  base::File file = request->Complete(in_status_code, in_headers, in_data_size);
+  if (file.IsValid()) {
+    out_response_stream->PutValue(file.TakePlatformFile());
+    out_response_stream->CheckValidity();
     return true;
   }
   chromeos::Error::AddTo(error, FROM_HERE, chromeos::errors::dbus::kDomain,
diff --git a/webservd/dbus_protocol_handler.h b/webservd/dbus_protocol_handler.h
index 5fbfeeb..66af163 100644
--- a/webservd/dbus_protocol_handler.h
+++ b/webservd/dbus_protocol_handler.h
@@ -78,7 +78,8 @@
       const std::string& in_request_id,
       int32_t in_status_code,
       const std::vector<std::tuple<std::string, std::string>>& in_headers,
-      const std::vector<uint8_t>& in_data) override;
+      int64_t in_data_size,
+      dbus::FileDescriptor* out_response_stream) override;
 
  private:
   using RequestHandlerProxy = org::chromium::WebServer::RequestHandlerProxy;
diff --git a/webservd/log_manager.cc b/webservd/log_manager.cc
index 1cd96ed..ff69cb3 100644
--- a/webservd/log_manager.cc
+++ b/webservd/log_manager.cc
@@ -196,15 +196,13 @@
 
   // Log file entry for one HTTP request looking like this:
   // 127.0.0.1 - - [25/Feb/2015:03:29:12 -0800] "GET /test HTTP/1.1" 200 2326
+  std::string size_string{"-"};
+  if (response_size >= 0)
+    size_string = std::to_string(response_size);
   std::string log_entry = base::StringPrintf(
-      "%s - - [%s] \"%s %s %s\" %d %" PRIu64 "\n",
-      ip_address.c_str(),
-      str_buf,
-      method.c_str(),
-      url.c_str(),
-      version.c_str(),
-      status_code,
-      response_size);
+      "%s - - [%s] \"%s %s %s\" %d %s\n", ip_address.c_str(), str_buf,
+      method.c_str(), url.c_str(), version.c_str(), status_code,
+      size_string.c_str());
   GetInstance()->logger_->Log(timestamp, log_entry);
 }
 
diff --git a/webservd/request.cc b/webservd/request.cc
index a794c40..bfc6822 100644
--- a/webservd/request.cc
+++ b/webservd/request.cc
@@ -100,13 +100,11 @@
   // web server to the remote request handler.
   int pipe_fds[2] = {-1, -1};
   CHECK_EQ(0, pipe(pipe_fds));
-  raw_data_pipe_in_ = base::File{pipe_fds[1]};
-  CHECK(raw_data_pipe_in_.IsValid());
-  raw_data_pipe_out_ = base::File{pipe_fds[0]};
-  CHECK(raw_data_pipe_out_.IsValid());
-  raw_data_stream_in_ = chromeos::FileStream::FromFileDescriptor(
-      raw_data_pipe_in_.GetPlatformFile(), false, nullptr);
-  CHECK(raw_data_stream_in_);
+  request_data_pipe_out_ = base::File{pipe_fds[0]};
+  CHECK(request_data_pipe_out_.IsValid());
+  request_data_stream_ = chromeos::FileStream::FromFileDescriptor(
+      pipe_fds[1], true, nullptr);
+  CHECK(request_data_stream_);
 
   // POST request processor.
   post_processor_ = MHD_create_post_processor(
@@ -120,39 +118,48 @@
   protocol_handler_->RemoveRequest(this);
 }
 
-bool Request::GetFileData(int file_id, int* contents_fd) {
-  if (file_id < 0 || static_cast<size_t>(file_id) >= file_info_.size())
-    return false;
-  base::File file(file_info_[file_id]->temp_file_name,
-                  base::File::FLAG_OPEN | base::File::FLAG_READ);
-  if (!file.IsValid())
-    return false;
-  *contents_fd = file.TakePlatformFile();
-  return true;
+base::File Request::GetFileData(int file_id) {
+  base::File file;
+  if (file_id >= 0 && static_cast<size_t>(file_id) < file_info_.size()) {
+    file.Initialize(file_info_[file_id]->temp_file_name,
+                    base::File::FLAG_OPEN | base::File::FLAG_READ);
+  }
+  return file.Pass();
 }
 
-bool Request::Complete(
+base::File Request::Complete(
     int32_t status_code,
     const std::vector<std::tuple<std::string, std::string>>& headers,
-    const std::vector<uint8_t>& data) {
+    int64_t in_data_size) {
+  base::File file;
   if (response_data_started_)
-    return false;
+    return file.Pass();
 
   response_status_code_ = status_code;
   response_headers_.reserve(headers.size());
   for (const auto& tuple : headers) {
     response_headers_.emplace_back(std::get<0>(tuple), std::get<1>(tuple));
   }
-  response_data_ = data;
+
+  // Create the pipe for response data.
+  int pipe_fds[2] = {-1, -1};
+  CHECK_EQ(0, pipe(pipe_fds));
+  file = base::File{pipe_fds[1]};
+  CHECK(file.IsValid());
+  response_data_stream_ = chromeos::FileStream::FromFileDescriptor(
+      pipe_fds[0], true, nullptr);
+  CHECK(response_data_stream_);
+
+  response_data_size_ = in_data_size;
   response_data_started_ = true;
   const MHD_ConnectionInfo* info =
       MHD_get_connection_info(connection_, MHD_CONNECTION_INFO_CLIENT_ADDRESS);
 
   const sockaddr* client_addr = (info ? info->client_addr : nullptr);
   LogManager::OnRequestCompleted(base::Time::Now(), client_addr, method_, url_,
-                                 version_, status_code, data.size());
+                                 version_, status_code, in_data_size);
   protocol_handler_->ScheduleWork();
-  return true;
+  return file.Pass();
 }
 
 bool Request::Complete(
@@ -163,8 +170,13 @@
   std::vector<std::tuple<std::string, std::string>> headers_copy;
   headers_copy.emplace_back(chromeos::http::response_header::kContentType,
                             mime_type);
-  return Complete(status_code, headers_copy,
-                  chromeos::string_utils::GetStringAsBytes(data));
+  base::File file = Complete(status_code, headers_copy, data.size());
+  bool success = false;
+  if (file.IsValid()) {
+    const int size = data.size();
+    success = (file.WriteAtCurrentPos(data.c_str(), size) == size);
+  }
+  return success;
 }
 
 const std::string& Request::GetProtocolHandlerID() const {
@@ -172,7 +184,7 @@
 }
 
 int Request::GetBodyDataFileDescriptor() const {
-  int fd = dup(raw_data_pipe_out_.GetPlatformFile());
+  int fd = dup(request_data_pipe_out_.GetPlatformFile());
   CHECK_GE(fd, 0);
   return fd;
 }
@@ -206,18 +218,18 @@
 
 void Request::EndRequestData() {
   if (!request_data_finished_) {
-    if (raw_data_stream_in_) {
-      raw_data_stream_in_->CloseBlocking(nullptr);
-      raw_data_pipe_in_.Close();
-    }
+    if (request_data_stream_)
+      request_data_stream_->CloseBlocking(nullptr);
     if (!request_forwarded_)
       ForwardRequestToHandler();
     request_data_finished_ = true;
   }
 
   if (response_data_started_ && !response_data_finished_) {
-    MHD_Response* resp = MHD_create_response_from_buffer(
-        response_data_.size(), response_data_.data(), MHD_RESPMEM_PERSISTENT);
+    MHD_Response* resp = MHD_create_response_from_callback(
+        response_data_size_, 4096, &Request::ResponseDataCallback, this,
+        nullptr);
+    CHECK(resp);
     for (const auto& pair : response_headers_) {
       MHD_add_response_header(resp, pair.first.c_str(), pair.second.c_str());
     }
@@ -265,10 +277,10 @@
 
 bool Request::AddRawRequestData(const void* data, size_t* size) {
   CHECK(*size);
-  CHECK(raw_data_stream_in_) << "Data pipe hasn't been created.";
+  CHECK(request_data_stream_) << "Data pipe hasn't been created.";
 
   size_t written = 0;
-  if (!raw_data_stream_in_->WriteNonBlocking(data, *size, &written, nullptr))
+  if (!request_data_stream_->WriteNonBlocking(data, *size, &written, nullptr))
     return false;
 
   CHECK_LE(written, *size);
@@ -282,7 +294,7 @@
 
   // If written at least some data, we are good. We will be called again if more
   // data is available.
-  if (written > 0)
+  if (written > 0 || waiting_for_data_)
     return true;
 
   // Nothing has been written. The output pipe is full. Need to stop the data
@@ -292,20 +304,54 @@
 
   // Now, just monitor the pipe and figure out when we can resume sending data
   // over it.
-  bool success = raw_data_stream_in_->WaitForData(
+  waiting_for_data_ = request_data_stream_->WaitForData(
       chromeos::Stream::AccessMode::WRITE,
       base::Bind(&Request::OnPipeAvailable, weak_ptr_factory_.GetWeakPtr()),
       nullptr);
 
-  if (!success)
+  if (!waiting_for_data_)
     MHD_resume_connection(connection_);
 
-  return success;
+  return waiting_for_data_;
+}
+
+ssize_t Request::ResponseDataCallback(void *cls, uint64_t pos, char *buf,
+                                      size_t max) {
+  Request* self = static_cast<Request*>(cls);
+  size_t read = 0;
+  bool eos = false;
+  if (!self->response_data_stream_->ReadNonBlocking(buf, max, &read, &eos,
+                                                    nullptr)) {
+    return MHD_CONTENT_READER_END_WITH_ERROR;
+  }
+
+  if (read > 0 || self->waiting_for_data_)
+    return read;
+
+  if (eos)
+    return MHD_CONTENT_READER_END_OF_STREAM;
+
+  // Nothing can be read. The input pipe is empty. Need to stop the data
+  // transfer on the connection and wait till some data is available from the
+  // pipe.
+  MHD_suspend_connection(self->connection_);
+
+  self->waiting_for_data_ = self->response_data_stream_->WaitForData(
+      chromeos::Stream::AccessMode::READ,
+      base::Bind(&Request::OnPipeAvailable,
+                 self->weak_ptr_factory_.GetWeakPtr()),
+      nullptr);
+
+  if (!self->waiting_for_data_) {
+    MHD_resume_connection(self->connection_);
+    return MHD_CONTENT_READER_END_WITH_ERROR;
+  }
+  return 0;
 }
 
 void Request::OnPipeAvailable(chromeos::Stream::AccessMode mode) {
-  CHECK(mode == chromeos::Stream::AccessMode::WRITE);
   MHD_resume_connection(connection_);
+  waiting_for_data_ = false;
   protocol_handler_->ScheduleWork();
 }
 
diff --git a/webservd/request.h b/webservd/request.h
index 2dbba6e..b76fb73 100644
--- a/webservd/request.h
+++ b/webservd/request.h
@@ -77,13 +77,13 @@
 
   // Obtains the file descriptor containing data of uploaded file identified
   // by |file_id|.
-  bool GetFileData(int file_id, int* contents_fd);
+  base::File GetFileData(int file_id);
 
   // Finishes the request and provides the reply data.
-  bool Complete(
+  base::File Complete(
       int32_t status_code,
       const std::vector<std::tuple<std::string, std::string>>& headers,
-      const std::vector<uint8_t>& data);
+      int64_t in_data_size);
 
   // Helper function to provide the string data and mime type.
   bool Complete(
@@ -172,6 +172,10 @@
   // Forwards the request to the request handler.
   void ForwardRequestToHandler();
 
+  // Response data callback for MHD_create_response_from_callback().
+  static ssize_t ResponseDataCallback(void* cls, uint64_t pos, char* buf,
+                                      size_t max);
+
   TempFileManager* GetTempFileManager();
 
   std::string id_;
@@ -181,16 +185,17 @@
   std::string version_;
   MHD_Connection* connection_{nullptr};
   MHD_PostProcessor* post_processor_{nullptr};
-  // Data pipe for request body data (in/write and out/read ends of the pipe).
-  base::File raw_data_pipe_in_;
-  base::File raw_data_pipe_out_;
-  // Data stream for the input end of the data pipe.
-  chromeos::StreamPtr raw_data_stream_in_;
+  // Data pipe for request body data (output/read end of the pipe).
+  base::File request_data_pipe_out_;
+  // Data stream for the input/write end of the request data pipe.
+  chromeos::StreamPtr request_data_stream_;
+
   bool last_posted_data_was_file_{false};
   bool request_forwarded_{false};
   bool request_data_finished_{false};
   bool response_data_started_{false};
   bool response_data_finished_{false};
+  bool waiting_for_data_{false};
 
   std::vector<PairOfStrings> post_data_;
   std::vector<PairOfStrings> get_data_;
@@ -198,7 +203,10 @@
   std::vector<PairOfStrings> headers_;
 
   int response_status_code_{0};
-  std::vector<uint8_t> response_data_;
+  // Data size of response, -1 if unknown.
+  int64_t response_data_size_{-1};
+  // Data stream for the output/read end of the response data pipe.
+  chromeos::StreamPtr response_data_stream_;
   std::vector<PairOfStrings> response_headers_;
   ProtocolHandler* protocol_handler_;