Implement streaming I/O for response data

Sending a response to an HTTP request now uses streaming I/O.
ProtocolHandler::CompleteRequest() now returns a file descriptor
for the write end of a pipe, and clients (request handlers) can
write data to this file descriptor at the rate it becomes available.
When the expected amount of data has been sent (or the pipe is closed
on the handler's end), the response is finalized.

Both normal (when data size is known) and chunked responses are
supported.

BUG: 24166746
Change-Id: Ie98cfa4262ba82941aaefd4c9bfd9986c784219c
diff --git a/libwebserv/protocol_handler.cc b/libwebserv/protocol_handler.cc
index 544281f..81f7a5c 100644
--- a/libwebserv/protocol_handler.cc
+++ b/libwebserv/protocol_handler.cc
@@ -31,8 +31,51 @@
 
 namespace {
 
-// A dummy callback for async D-Bus errors.
-void IgnoreError(chromeos::Error* error) {}
+// No-op error handlers for async D-Bus and stream calls; the error is dropped.
+void IgnoreDBusError(chromeos::Error* /* error */) {}
+void IgnoreStreamError(const chromeos::Error* /* error */) {}
+
+// Holds the state needed to asynchronously copy one stream into another.
+struct StreamCopyData {
+  chromeos::StreamPtr src_stream;   // Read from by PerformRead().
+  chromeos::StreamPtr dest_stream;  // Written to by PerformWrite().
+  std::vector<uint8_t> buffer;      // Bytes in transit between the two.
+};
+
+// Forward declaration: PerformWrite() below schedules PerformRead().
+void PerformRead(std::unique_ptr<StreamCopyData> data);
+
+// Read-completion callback: writes |size| buffered bytes to the destination.
+void PerformWrite(std::unique_ptr<StreamCopyData> data, size_t size) {
+  if (size == 0) return;  // We are all done.
+  StreamCopyData* raw = data.get();  // base::Passed() below may empty |data|.
+  raw->dest_stream->WriteAllAsync(
+      raw->buffer.data(), size, base::Bind(&PerformRead, base::Passed(&data)),
+      base::Bind(&IgnoreStreamError), nullptr);
+}
+
+// Reads from the source stream into the buffer; PerformWrite runs when done.
+void PerformRead(std::unique_ptr<StreamCopyData> data) {
+  StreamCopyData* raw = data.get();  // base::Passed() below may empty |data|.
+  raw->src_stream->ReadAsync(raw->buffer.data(), raw->buffer.size(),
+                             base::Bind(&PerformWrite, base::Passed(&data)),
+                             base::Bind(&IgnoreStreamError), nullptr);
+}
+
+// Starts an asynchronous copy of |src_stream| into the destination stream
+// created from a dup() of the file descriptor |fd|.
+void WriteResponseData(chromeos::StreamPtr src_stream,
+                       const dbus::FileDescriptor& fd) {
+  std::unique_ptr<StreamCopyData> copy_data{new StreamCopyData};
+  copy_data->buffer.resize(4096);  // Read buffer of 4 KiB.
+  copy_data->src_stream = std::move(src_stream);
+  const int dest_fd = dup(fd.value());
+  copy_data->dest_stream =
+      chromeos::FileStream::FromFileDescriptor(dest_fd, true, nullptr);
+  CHECK(copy_data->src_stream);
+  CHECK(copy_data->dest_stream);
+  PerformRead(std::move(copy_data));
+}
 
 }  // anonymous namespace
 
@@ -129,7 +172,7 @@
     pair.first->RemoveRequestHandlerAsync(
         pair.second,
         base::Bind(&base::DoNothing),
-        base::Bind(&IgnoreError));
+        base::Bind(&IgnoreDBusError));
   }
 
   request_handlers_.erase(p);
@@ -209,7 +252,7 @@
     const std::string& request_id,
     int status_code,
     const std::multimap<std::string, std::string>& headers,
-    const std::vector<uint8_t>& data) {
+    chromeos::StreamPtr data_stream) {
   ProtocolHandlerProxy* proxy = GetRequestProtocolHandlerProxy(request_id);
   if (!proxy)
     return;
@@ -219,9 +262,13 @@
   for (const auto& pair : headers)
     header_list.emplace_back(pair.first, pair.second);
 
-  proxy->CompleteRequestAsync(request_id, status_code, header_list, data,
-                              base::Bind(&base::DoNothing),
-                              base::Bind([](chromeos::Error*) {}));
+  int64_t data_size = -1;
+  if (data_stream->CanGetSize())
+    data_size = data_stream->GetRemainingSize();
+  proxy->CompleteRequestAsync(
+      request_id, status_code, header_list, data_size,
+      base::Bind(&WriteResponseData, base::Passed(&data_stream)),
+      base::Bind(&IgnoreDBusError));
 }
 
 void ProtocolHandler::GetFileData(