Implement streaming I/O for response data

Responses to HTTP requests are now sent using streaming I/O.
ProtocolHandler::CompleteRequest() now returns a file descriptor
for the write end of a pipe, and clients (request handlers) can
write data to this file descriptor at the rate it becomes available.
When the expected amount of data has been sent (or the pipe is closed
on the handler's end), the response is finalized.

Both normal (when the data size is known) and chunked responses are
supported.

BUG: 24166746
Change-Id: Ie98cfa4262ba82941aaefd4c9bfd9986c784219c
diff --git a/libwebserv/protocol_handler.cc b/libwebserv/protocol_handler.cc
index 544281f..81f7a5c 100644
--- a/libwebserv/protocol_handler.cc
+++ b/libwebserv/protocol_handler.cc
@@ -31,8 +31,51 @@
 
 namespace {
 
-// A dummy callback for async D-Bus errors.
-void IgnoreError(chromeos::Error* error) {}
+// No-op error callbacks for async D-Bus/Stream calls whose errors are ignored.
+void IgnoreDBusError(chromeos::Error* error) {}
+void IgnoreStreamError(const chromeos::Error* error) {}
+
+// State handed between PerformRead/PerformWrite while copying src to dest.
+struct StreamCopyData {
+  chromeos::StreamPtr src_stream;   // Stream the data is read from.
+  chromeos::StreamPtr dest_stream;  // Stream the data is written to.
+  std::vector<uint8_t> buffer;      // Holds one chunk between read and write.
+};
+
+// Forward-declaration.
+void PerformRead(std::unique_ptr<StreamCopyData> data);
+
+// Async callback which writes the |size| bytes just read to the destination.
+void PerformWrite(std::unique_ptr<StreamCopyData> data, size_t size) {
+  if (size == 0) return;  // We are all done.
+  StreamCopyData* state = data.get();  // Passed() below empties |data|.
+  state->dest_stream->WriteAllAsync(
+      state->buffer.data(), size, base::Bind(&PerformRead, base::Passed(&data)),
+      base::Bind(&IgnoreStreamError), nullptr);
+}
+
+// Reads from the source stream into the buffer; PerformWrite runs when done.
+void PerformRead(std::unique_ptr<StreamCopyData> data) {
+  StreamCopyData* state = data.get();  // Passed() below empties |data|.
+  state->src_stream->ReadAsync(state->buffer.data(), state->buffer.size(),
+                               base::Bind(&PerformWrite, base::Passed(&data)),
+                               base::Bind(&IgnoreStreamError), nullptr);
+}
+
+// Copies the data from |src_stream| to a stream opened on a duplicate of the
+// file descriptor |fd|. CHECK-fails if either stream is null.
+void WriteResponseData(chromeos::StreamPtr src_stream,
+                       const dbus::FileDescriptor& fd) {
+  std::unique_ptr<StreamCopyData> data{new StreamCopyData};
+  int dupfd = dup(fd.value());  // NOTE(review): dup() result is not checked.
+  data->src_stream = std::move(src_stream);
+  data->dest_stream =
+      chromeos::FileStream::FromFileDescriptor(dupfd, true, nullptr);
+  data->buffer.resize(4096);  // Read buffer of 4 KiB.
+  CHECK(data->src_stream);
+  CHECK(data->dest_stream);
+  PerformRead(std::move(data));  // Starts the read/write callback chain.
+}
 
 }  // anonymous namespace
 
@@ -129,7 +172,7 @@
     pair.first->RemoveRequestHandlerAsync(
         pair.second,
         base::Bind(&base::DoNothing),
-        base::Bind(&IgnoreError));
+        base::Bind(&IgnoreDBusError));
   }
 
   request_handlers_.erase(p);
@@ -209,7 +252,7 @@
     const std::string& request_id,
     int status_code,
     const std::multimap<std::string, std::string>& headers,
-    const std::vector<uint8_t>& data) {
+    chromeos::StreamPtr data_stream) {
   ProtocolHandlerProxy* proxy = GetRequestProtocolHandlerProxy(request_id);
   if (!proxy)
     return;
@@ -219,9 +262,13 @@
   for (const auto& pair : headers)
     header_list.emplace_back(pair.first, pair.second);
 
-  proxy->CompleteRequestAsync(request_id, status_code, header_list, data,
-                              base::Bind(&base::DoNothing),
-                              base::Bind([](chromeos::Error*) {}));
+  int64_t data_size = -1;
+  if (data_stream->CanGetSize())
+    data_size = data_stream->GetRemainingSize();
+  proxy->CompleteRequestAsync(
+      request_id, status_code, header_list, data_size,
+      base::Bind(&WriteResponseData, base::Passed(&data_stream)),
+      base::Bind(&IgnoreDBusError));
 }
 
 void ProtocolHandler::GetFileData(
diff --git a/libwebserv/protocol_handler.h b/libwebserv/protocol_handler.h
index 91276d9..797c19a 100644
--- a/libwebserv/protocol_handler.h
+++ b/libwebserv/protocol_handler.h
@@ -167,7 +167,7 @@
       const std::string& request_id,
       int status_code,
       const std::multimap<std::string, std::string>& headers,
-      const std::vector<uint8_t>& data);
+      chromeos::StreamPtr data_stream);
 
   // Makes a call to the (remote) web server request handler over D-Bus to
   // obtain the file content of uploaded file (identified by |file_id|) during
diff --git a/libwebserv/response.cc b/libwebserv/response.cc
index 8fc49f7..9fc61d6 100644
--- a/libwebserv/response.cc
+++ b/libwebserv/response.cc
@@ -21,6 +21,7 @@
 #include <base/values.h>
 #include <chromeos/http/http_request.h>
 #include <chromeos/mime_utils.h>
+#include <chromeos/streams/memory_stream.h>
 #include <chromeos/strings/string_utils.h>
 #include <libwebserv/protocol_handler.h>
 
@@ -48,12 +49,11 @@
 }
 
 void Response::Reply(int status_code,
-                     const void* data,
-                     size_t data_size,
+                     chromeos::StreamPtr data_stream,
                      const std::string& mime_type) {
+  CHECK(data_stream);
   status_code_ = status_code;
-  const uint8_t* byte_ptr = static_cast<const uint8_t*>(data);
-  data_.assign(byte_ptr, byte_ptr + data_size);
+  data_stream_ = std::move(data_stream);
   AddHeader(chromeos::http::response_header::kContentType, mime_type);
   SendResponse();
 }
@@ -61,7 +61,8 @@
 void Response::ReplyWithText(int status_code,
                              const std::string& text,
                              const std::string& mime_type) {
-  Reply(status_code, text.data(), text.size(), mime_type);
+  Reply(status_code, chromeos::MemoryStream::OpenCopyOf(text, nullptr),
+        mime_type);
 }
 
 void Response::ReplyWithJson(int status_code, const base::Value* json) {
@@ -91,7 +92,7 @@
 
 void Response::ReplyWithError(int status_code, const std::string& error_text) {
   status_code_ = status_code;
-  data_.assign(error_text.begin(), error_text.end());
+  data_stream_ = chromeos::MemoryStream::OpenCopyOf(error_text, nullptr);
   SendResponse();
 }
 
@@ -102,7 +103,8 @@
 void Response::SendResponse() {
   CHECK(!reply_sent_) << "Response already sent";
   reply_sent_ = true;
-  handler_->CompleteRequest(request_id_, status_code_, headers_, data_);
+  handler_->CompleteRequest(request_id_, status_code_, headers_,
+                            std::move(data_stream_));
 }
 
 }  // namespace libwebserv
diff --git a/libwebserv/response.h b/libwebserv/response.h
index f3f2adf..1919c09 100644
--- a/libwebserv/response.h
+++ b/libwebserv/response.h
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include <base/macros.h>
+#include <chromeos/streams/stream.h>
 #include <libwebserv/export.h>
 
 namespace base {
@@ -47,8 +48,7 @@
 
   // Generic reply method for sending arbitrary binary data response.
   void Reply(int status_code,
-             const void* data,
-             size_t data_size,
+             chromeos::StreamPtr data_stream,
              const std::string& mime_type);
 
   // Reply with text body.
@@ -87,7 +87,7 @@
   ProtocolHandler* handler_{nullptr};
   std::string request_id_;
   int status_code_{0};
-  std::vector<uint8_t> data_;
+  chromeos::StreamPtr data_stream_;
   std::multimap<std::string, std::string> headers_;
   bool reply_sent_{false};