[IPC 4] Add client-side logic.

Introduces two classes + corresponding impl(s).
Client is the class that talks with the host and forwards
method invocations through protobuf. It is essentially a
multiplexer of invocations coming from the various
autogenerated stubs, which are subclasses of ServiceProxy.

ServiceProxy is the base class for autogenerated stubs. It
keeps track of pending requests for each Service.

So the final picture will be as follows:

ServiceProxy_ForServiceA  \
ServiceProxy_ForServiceB  --> Client(Impl) -> UnixSocket
ServiceProxy_ForServiceC  /

ServiceProxy instances' and Client's lifetimes are decoupled
from each other via weak pointers.

Bug: 68854111
Change-Id: I90d9d90727068afa86aab62779605afa508c6ca0
diff --git a/ipc/src/buffered_frame_deserializer.cc b/ipc/src/buffered_frame_deserializer.cc
index 452a939..a6abecd 100644
--- a/ipc/src/buffered_frame_deserializer.cc
+++ b/ipc/src/buffered_frame_deserializer.cc
@@ -38,6 +38,9 @@
 // Size of the PROT_NONE guard region, adjactent to the end of the buffer.
 // It's a safety net to spot any out-of-bounds writes early.
 constexpr size_t kGuardRegionSize = kPageSize;
+
+// The header is just the number of bytes of the Frame protobuf message.
+constexpr size_t kHeaderSize = sizeof(uint32_t);
 }  // namespace
 
 BufferedFrameDeserializer::BufferedFrameDeserializer(size_t max_capacity)
@@ -104,9 +107,6 @@
   // empty (we drained all the complete frames) or starts with the header of the
   // next, still incomplete, frame.
 
-  // The header is just the number of bytes of the Frame protobuf message.
-  const size_t kHeaderSize = sizeof(uint32_t);
-
   size_t consumed_size = 0;
   for (;;) {
     if (size_ < consumed_size + kHeaderSize)
@@ -193,5 +193,19 @@
     decoded_frames_.push_back(std::move(frame));
 }
 
+// static
+std::string BufferedFrameDeserializer::Serialize(const Frame& frame) {
+  std::string buf;
+  buf.reserve(1024);  // Just an educated guess to avoid trivial expansions.
+  buf.insert(0, kHeaderSize, 0);  // Reserve the space for the header.
+  frame.AppendToString(&buf);
+  const uint32_t payload_size = static_cast<uint32_t>(buf.size() - kHeaderSize);
+  PERFETTO_DCHECK(payload_size == frame.GetCachedSize());
+  char header[kHeaderSize];
+  memcpy(header, base::AssumeLittleEndian(&payload_size), kHeaderSize);
+  buf.replace(0, kHeaderSize, header, kHeaderSize);
+  return buf;
+}
+
 }  // namespace ipc
 }  // namespace perfetto
diff --git a/ipc/src/buffered_frame_deserializer.h b/ipc/src/buffered_frame_deserializer.h
index 0a90ab8..0aca5e7 100644
--- a/ipc/src/buffered_frame_deserializer.h
+++ b/ipc/src/buffered_frame_deserializer.h
@@ -82,6 +82,11 @@
   explicit BufferedFrameDeserializer(size_t max_capacity = 128 * 1024);
   ~BufferedFrameDeserializer();
 
+  // This function doesn't really belong here as it does serialization, unlike
+  // the rest of this class. However, it is so small and shares so many
+  // dependencies that it doesn't justify having its own class.
+  static std::string Serialize(const Frame&);
+
   // Returns a buffer that can be passed to recv(). The buffer is deliberately
   // not initialized.
   ReceiveBuffer BeginReceive();
diff --git a/ipc/src/client_impl.cc b/ipc/src/client_impl.cc
new file mode 100644
index 0000000..b99e379
--- /dev/null
+++ b/ipc/src/client_impl.cc
@@ -0,0 +1,229 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ipc/src/client_impl.h"
+
+#include <inttypes.h>
+
+#include "base/task_runner.h"
+#include "base/utils.h"
+#include "ipc/service_descriptor.h"
+#include "ipc/service_proxy.h"
+
+// TODO(primiano): Add ThreadChecker everywhere.
+
+// TODO(primiano): Add timeouts.
+
+namespace perfetto {
+namespace ipc {
+
+// static
+std::unique_ptr<Client> Client::CreateInstance(const char* socket_name,
+                                               base::TaskRunner* task_runner) {
+  std::unique_ptr<Client> client(new ClientImpl(socket_name, task_runner));
+  return client;
+}
+
+ClientImpl::ClientImpl(const char* socket_name, base::TaskRunner* task_runner)
+    : task_runner_(task_runner), weak_ptr_factory_(this) {
+  GOOGLE_PROTOBUF_VERIFY_VERSION;
+  sock_ = UnixSocket::Connect(socket_name, this, task_runner);
+}
+
+ClientImpl::~ClientImpl() {
+  OnDisconnect(nullptr);  // The UnixSocket* ptr is not used in OnDisconnect().
+}
+
+void ClientImpl::BindService(base::WeakPtr<ServiceProxy> service_proxy) {
+  if (!service_proxy)  // The proxy is already gone: nothing to bind.
+    return;
+  if (!sock_->is_connected())  // Drained by OnConnect() once connected.
+    return queued_bindings_.push_back(service_proxy);  // void in C++17 too.
+  RequestID request_id = ++last_request_id_;
+  Frame frame;
+  frame.set_request_id(request_id);
+  Frame::BindService* req = frame.mutable_msg_bind_service();
+  const char* const service_name = service_proxy->GetDescriptor().service_name;
+  req->set_service_name(service_name);
+  if (!SendFrame(frame)) {  // Notify the proxy of the failure right away.
+    PERFETTO_DLOG("BindService(%s) failed", service_name);
+    return service_proxy->OnConnect(false /* success */);
+  }
+  QueuedRequest qr;  // Matched with the host reply in OnFrameReceived().
+  qr.type = Frame::kMsgBindService;
+  qr.request_id = request_id;
+  qr.service_proxy = service_proxy;
+  queued_requests_.emplace(request_id, std::move(qr));
+}
+
+void ClientImpl::UnbindService(ServiceID service_id) {
+  service_bindings_.erase(service_id);
+}
+
+RequestID ClientImpl::BeginInvoke(ServiceID service_id,
+                                  const std::string& method_name,
+                                  MethodID remote_method_id,
+                                  const ProtoMessage& method_args,
+                                  base::WeakPtr<ServiceProxy> service_proxy) {
+  std::string args_proto;
+  RequestID request_id = ++last_request_id_;
+  Frame frame;
+  frame.set_request_id(request_id);
+  Frame::InvokeMethod* req = frame.mutable_msg_invoke_method();
+  req->set_service_id(service_id);
+  req->set_method_id(remote_method_id);
+  bool did_serialize = method_args.SerializeToString(&args_proto);
+  req->set_args_proto(args_proto);
+  if (!did_serialize || !SendFrame(frame)) {
+    PERFETTO_DLOG("BeginInvoke() failed while sending the frame");
+    return 0;
+  }
+  QueuedRequest qr;
+  qr.type = Frame::kMsgInvokeMethod;
+  qr.request_id = request_id;
+  qr.method_name = method_name;
+  qr.service_proxy = service_proxy;
+  queued_requests_.emplace(request_id, std::move(qr));
+  return request_id;
+}
+
+bool ClientImpl::SendFrame(const Frame& frame) {
+  // Serialize the frame into protobuf, add the size header, and send it.
+  std::string buf = BufferedFrameDeserializer::Serialize(frame);
+
+  // TODO(primiano): remember that this is doing non-blocking I/O. What if the
+  // socket buffer is full? Maybe we just want to drop this on the floor? Or
+  // maybe throttle the send and PostTask the reply later?
+  return sock_->Send(buf.data(), buf.size());
+}
+
+void ClientImpl::OnConnect(UnixSocket*, bool connected) {
+  // Drain the BindService() calls that were queued before establishing the
+  // connection with the host.
+  if (connected) {
+    for (base::WeakPtr<ServiceProxy>& service_proxy : queued_bindings_)
+      BindService(service_proxy);
+  }
+  queued_bindings_.clear();
+}
+
+void ClientImpl::OnDisconnect(UnixSocket*) {
+  for (auto it : service_bindings_) {
+    base::WeakPtr<ServiceProxy>& service_proxy = it.second;
+    task_runner_->PostTask([service_proxy] {
+      if (service_proxy)
+        service_proxy->OnDisconnect();
+    });
+  }
+  service_bindings_.clear();
+  queued_bindings_.clear();
+}
+
+void ClientImpl::OnDataAvailable(UnixSocket*) {
+  size_t rsize;
+  do {
+    auto buf = frame_deserializer_.BeginReceive();
+    rsize = sock_->Receive(buf.data, buf.size);
+    if (!frame_deserializer_.EndReceive(rsize)) {
+      // The endpoint tried to send a frame that is way too large.
+      return sock_->Shutdown();  // In turn will trigger an OnDisconnect().
+      // TODO check this.
+    }
+  } while (rsize > 0);
+
+  while (std::unique_ptr<Frame> frame = frame_deserializer_.PopNextFrame())
+    OnFrameReceived(*frame);
+}
+
+void ClientImpl::OnFrameReceived(const Frame& frame) {
+  auto queued_requests_it = queued_requests_.find(frame.request_id());
+  if (queued_requests_it == queued_requests_.end()) {  // Unknown reply.
+    PERFETTO_DLOG("OnFrameReceived(): got invalid request_id=%" PRIu64,
+                  static_cast<uint64_t>(frame.request_id()));
+    return;
+  }
+  QueuedRequest req = std::move(queued_requests_it->second);
+  queued_requests_.erase(queued_requests_it);
+  // NOTE(review): erased on first reply; verify has_more follow-ups match.
+  if (req.type == Frame::kMsgBindService &&
+      frame.msg_case() == Frame::kMsgBindServiceReply) {
+    return OnBindServiceReply(std::move(req), frame.msg_bind_service_reply());
+  }
+  if (req.type == Frame::kMsgInvokeMethod &&
+      frame.msg_case() == Frame::kMsgInvokeMethodReply) {
+    return OnInvokeMethodReply(std::move(req), frame.msg_invoke_method_reply());
+  }
+  if (frame.msg_case() == Frame::kMsgRequestError) {  // %s needs a C string.
+    PERFETTO_DLOG("Host error: %s", frame.msg_request_error().error().c_str());
+    return;
+  }
+
+  PERFETTO_DLOG(
+      "OnFrameReceived() request msg_type=%d, received msg_type=%d in reply to "
+      "request_id=%" PRIu64,
+      req.type, frame.msg_case(), static_cast<uint64_t>(frame.request_id()));
+}
+
+void ClientImpl::OnBindServiceReply(QueuedRequest req,
+                                    const Frame::BindServiceReply& reply) {
+  base::WeakPtr<ServiceProxy>& service_proxy = req.service_proxy;
+  if (!service_proxy)
+    return;
+  if (!reply.success()) {
+    PERFETTO_DLOG("BindService(): unknown service_name=\"%s\"",
+                  service_proxy->GetDescriptor().service_name);
+    return service_proxy->OnConnect(false /* success */);
+  }
+
+  // Build the method [name] -> [remote_id] map.
+  std::map<std::string, MethodID> methods;
+  for (const auto& method : reply.methods()) {
+    if (method.name().empty() || method.id() <= 0) {
+      PERFETTO_DLOG("OnBindServiceReply(): invalid method \"%s\" -> %" PRIu32,
+                    method.name().c_str(), method.id());
+      continue;
+    }
+    methods[method.name()] = method.id();
+  }
+  service_proxy->InitializeBinding(weak_ptr_factory_.GetWeakPtr(),
+                                   reply.service_id(), std::move(methods));
+  service_bindings_[reply.service_id()] = service_proxy;
+  service_proxy->OnConnect(true /* success */);
+}
+
+void ClientImpl::OnInvokeMethodReply(QueuedRequest req,
+                                     const Frame::InvokeMethodReply& reply) {
+  base::WeakPtr<ServiceProxy>& service_proxy = req.service_proxy;
+  if (!service_proxy)
+    return;
+  std::unique_ptr<ProtoMessage> decoded_reply;
+  if (reply.success()) {
+    // TODO this could be optimized, stop doing method name string lookups.
+    for (const auto& method : service_proxy->GetDescriptor().methods) {
+      if (req.method_name == method.name) {
+        decoded_reply = method.reply_proto_decoder(reply.reply_proto());
+        break;
+      }
+    }
+  }
+  service_proxy->EndInvoke(req.request_id, std::move(decoded_reply),
+                           reply.has_more());
+}
+
+ClientImpl::QueuedRequest::QueuedRequest() = default;
+
+}  // namespace ipc
+}  // namespace perfetto
diff --git a/ipc/src/client_impl.h b/ipc/src/client_impl.h
new file mode 100644
index 0000000..b50ee78
--- /dev/null
+++ b/ipc/src/client_impl.h
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef IPC_SRC_CLIENT_IMPL_H_
+#define IPC_SRC_CLIENT_IMPL_H_
+
+#include "base/task_runner.h"
+#include "ipc/client.h"
+#include "ipc/src/buffered_frame_deserializer.h"
+#include "ipc/src/unix_socket.h"
+
+#include "wire_protocol.pb.h"
+
+#include <list>
+#include <map>
+#include <memory>
+
+namespace perfetto {
+
+namespace base {
+class TaskRunner;
+}  // namespace base
+
+namespace ipc {
+
+class ServiceDescriptor;
+
+class ClientImpl : public Client, public UnixSocket::EventListener {
+ public:
+  ClientImpl(const char* socket_name, base::TaskRunner*);
+  ~ClientImpl() override;
+
+  // Client implementation.
+  void BindService(base::WeakPtr<ServiceProxy>) override;
+  void UnbindService(ServiceID) override;
+
+  // UnixSocket::EventListener implementation.
+  void OnConnect(UnixSocket*, bool connected) override;
+  void OnDisconnect(UnixSocket*) override;
+  void OnDataAvailable(UnixSocket*) override;
+
+  RequestID BeginInvoke(ServiceID,
+                        const std::string& method_name,
+                        MethodID remote_method_id,
+                        const ProtoMessage& method_args,
+                        base::WeakPtr<ServiceProxy>);
+
+ private:
+  struct QueuedRequest {
+    QueuedRequest();
+    int type = 0;  // From Frame::msg_case(), see wire_protocol.proto.
+    RequestID request_id = 0;
+    base::WeakPtr<ServiceProxy> service_proxy;
+
+    // Only for type == kMsgInvokeMethod.
+    std::string method_name;
+  };
+
+  ClientImpl(const ClientImpl&) = delete;
+  ClientImpl& operator=(const ClientImpl&) = delete;
+
+  bool SendFrame(const Frame&);
+  void OnFrameReceived(const Frame&);
+  void OnBindServiceReply(QueuedRequest, const Frame::BindServiceReply&);
+  void OnInvokeMethodReply(QueuedRequest, const Frame::InvokeMethodReply&);
+
+  std::unique_ptr<UnixSocket> sock_;
+  base::TaskRunner* const task_runner_;
+  RequestID last_request_id_ = 0;
+  BufferedFrameDeserializer frame_deserializer_;
+  std::map<RequestID, QueuedRequest> queued_requests_;
+  std::map<ServiceID, base::WeakPtr<ServiceProxy>> service_bindings_;
+  base::WeakPtrFactory<Client> weak_ptr_factory_;
+
+  // Queue of calls to BindService() that happened before the socket connected.
+  std::list<base::WeakPtr<ServiceProxy>> queued_bindings_;
+};
+
+}  // namespace ipc
+}  // namespace perfetto
+
+#endif  // IPC_SRC_CLIENT_IMPL_H_
diff --git a/ipc/src/client_impl_unittest.cc b/ipc/src/client_impl_unittest.cc
new file mode 100644
index 0000000..0df5900
--- /dev/null
+++ b/ipc/src/client_impl_unittest.cc
@@ -0,0 +1,202 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ipc/src/client_impl.h"
+
+#include "base/test/test_task_runner.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "ipc/service_descriptor.h"
+#include "ipc/service_proxy.h"
+#include "ipc/src/buffered_frame_deserializer.h"
+#include "ipc/src/unix_socket.h"
+
+#include "client_unittest_messages.pb.h"
+
+namespace perfetto {
+namespace ipc {
+namespace {
+
+using ::testing::_;
+using ::testing::Invoke;
+
+constexpr char kSockName[] = "/tmp/perfetto_client_impl_unittest.sock";
+
+class FakeProxy : public ServiceProxy {
+ public:
+  static std::unique_ptr<ProtoMessage> ReplyDecoder(const std::string& proto) {
+    std::unique_ptr<ProtoMessage> reply(new ReplyProto());
+    EXPECT_TRUE(reply->ParseFromString(proto));
+    return reply;
+  }
+
+  FakeProxy(const char* service_name, ServiceProxy::EventListener* el)
+      : ServiceProxy(el), service_name_(service_name) {}
+
+  const ServiceDescriptor& GetDescriptor() override {
+    if (!descriptor_.service_name) {
+      descriptor_.service_name = service_name_;
+      descriptor_.methods.push_back({"FakeMethod1", nullptr, &ReplyDecoder});
+    }
+    return descriptor_;
+  }
+
+  const char* service_name_;
+  ServiceDescriptor descriptor_;
+};
+
+class MockEventListener : public ServiceProxy::EventListener {
+ public:
+  MOCK_METHOD1(OnConnect, void(bool));
+  MOCK_METHOD0(OnDisconnect, void());
+};
+
+class FakeHost : public UnixSocket::EventListener {
+ public:
+  MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD1(OnFrameReceived, std::unique_ptr<Frame>(const Frame&));
+
+  explicit FakeHost(base::TaskRunner* task_runner) {
+    unlink(kSockName);
+    listening_sock = UnixSocket::Listen(kSockName, this, task_runner);
+    EXPECT_TRUE(listening_sock->is_listening());
+  }
+
+  ~FakeHost() override { unlink(kSockName); }
+
+  // UnixSocket::EventListener implementation.
+  void OnNewIncomingConnection(
+      UnixSocket*,
+      std::unique_ptr<UnixSocket> new_connection) override {
+    ASSERT_FALSE(client_sock);
+    client_sock = std::move(new_connection);
+  }
+
+  void OnDisconnect(UnixSocket*) override { OnDisconnect(); }
+
+  void OnDataAvailable(UnixSocket* sock) override {
+    if (sock != client_sock.get())
+      return;
+    auto buf = frame_deserializer.BeginReceive();
+    size_t rsize = client_sock->Receive(buf.data, buf.size);
+    EXPECT_TRUE(frame_deserializer.EndReceive(rsize));
+    while (std::unique_ptr<Frame> frame = frame_deserializer.PopNextFrame()) {
+      std::unique_ptr<Frame> reply = OnFrameReceived(*frame);
+      Reply(*reply);
+    }
+  }
+
+  void Reply(const Frame& frame) {
+    auto buf = BufferedFrameDeserializer::Serialize(frame);
+    ASSERT_TRUE(client_sock->is_connected());
+    EXPECT_TRUE(client_sock->Send(buf.data(), buf.size()));
+  }
+
+  BufferedFrameDeserializer frame_deserializer;
+  std::unique_ptr<UnixSocket> listening_sock;
+  std::unique_ptr<UnixSocket> client_sock;
+};
+
+TEST(ClientImplTest, BindAndInvokeMethod) {
+  static constexpr ServiceID kServiceID = 42;
+  static constexpr MethodID kMethodID = 13;
+
+  base::TestTaskRunner task_runner;
+  FakeHost host(&task_runner);
+  std::unique_ptr<Client> cli = Client::CreateInstance(kSockName, &task_runner);
+  MockEventListener event_listener;
+  std::unique_ptr<FakeProxy> proxy(new FakeProxy("FakeSvc", &event_listener));
+
+  // Bind to the host.
+  EXPECT_CALL(host, OnFrameReceived(_)).WillOnce(Invoke([](const Frame& req) {
+    EXPECT_EQ(Frame::kMsgBindService, req.msg_case());
+    EXPECT_EQ("FakeSvc", req.msg_bind_service().service_name());
+    std::unique_ptr<Frame> reply(new Frame());
+    reply->set_request_id(req.request_id());
+    reply->mutable_msg_bind_service_reply()->set_success(true);
+    reply->mutable_msg_bind_service_reply()->set_service_id(kServiceID);
+    auto* method = reply->mutable_msg_bind_service_reply()->add_methods();
+    method->set_name("FakeMethod1");
+    method->set_id(kMethodID);
+    return reply;
+  }));
+  cli->BindService(proxy->GetWeakPtr());
+  auto on_connect = task_runner.CreateCheckpoint("on_connect");
+  EXPECT_CALL(event_listener, OnConnect(true))
+      .WillOnce(Invoke([on_connect](bool) { on_connect(); }));
+  task_runner.RunUntilCheckpoint("on_connect");
+
+  // Invoke a valid method.
+  EXPECT_CALL(host, OnFrameReceived(_)).WillOnce(Invoke([](const Frame& req) {
+    EXPECT_EQ(Frame::kMsgInvokeMethod, req.msg_case());
+    EXPECT_EQ(kServiceID, req.msg_invoke_method().service_id());
+    EXPECT_EQ(kMethodID, req.msg_invoke_method().method_id());
+    RequestProto req_args;
+    EXPECT_TRUE(req_args.ParseFromString(req.msg_invoke_method().args_proto()));
+    EXPECT_EQ("req_data", req_args.data());
+
+    std::unique_ptr<Frame> reply(new Frame());
+    reply->set_request_id(req.request_id());
+    ReplyProto reply_args;
+    reply->mutable_msg_invoke_method_reply()->set_reply_proto(
+        reply_args.SerializeAsString());
+    reply->mutable_msg_invoke_method_reply()->set_success(true);
+    return reply;
+  }));
+
+  RequestProto req;
+  req.set_data("req_data");
+  auto on_invoke_reply = task_runner.CreateCheckpoint("on_invoke_reply");
+  DeferredBase deferred_reply(
+      [on_invoke_reply](AsyncResult<ProtoMessage> reply) {
+        EXPECT_TRUE(reply.success());
+        on_invoke_reply();
+      });
+  proxy->BeginInvoke("FakeMethod1", req, std::move(deferred_reply));
+  task_runner.RunUntilCheckpoint("on_invoke_reply");
+
+  // Invoke an invalid method.
+  EXPECT_CALL(host, OnFrameReceived(_)).WillOnce(Invoke([](const Frame& frame) {
+    EXPECT_EQ(Frame::kMsgInvokeMethod, frame.msg_case());
+    std::unique_ptr<Frame> reply(new Frame());
+    reply->set_request_id(frame.request_id());
+    reply->mutable_msg_invoke_method_reply()->set_success(false);
+    return reply;
+  }));
+
+  auto on_invalid_invoke = task_runner.CreateCheckpoint("on_invalid_invoke");
+  DeferredBase deferred_reply2(
+      [on_invalid_invoke](AsyncResult<ProtoMessage> reply) {
+        EXPECT_FALSE(reply.success());
+        on_invalid_invoke();
+      });
+  RequestProto empty_req;
+  proxy->BeginInvoke("FakeMethod1", empty_req, std::move(deferred_reply2));
+  task_runner.RunUntilCheckpoint("on_invalid_invoke");
+}
+
+// TODO(primiano): add the tests below in next CLs.
+// TEST(ClientImplTest, UnbindService) {}
+// TEST(ClientImplTest, BindAndInvokeStreamingMethod) {}
+// TEST(ClientImplTest, HostConnectionFailure) {}
+// TEST(ClientImplTest, HostPrematureDisconnect) {}
+// TEST(ClientImplTest, UnparsableReply) {}
+// TEST(ClientImplTest, ProxyDestroyedBeforeClient) {}
+// TEST(ClientImplTest, ClientDestroyedBeforeProxy) {}
+
+}  // namespace
+}  // namespace ipc
+}  // namespace perfetto
diff --git a/ipc/src/service_proxy.cc b/ipc/src/service_proxy.cc
new file mode 100644
index 0000000..9f4b53d
--- /dev/null
+++ b/ipc/src/service_proxy.cc
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ipc/service_proxy.h"
+
+#include <utility>
+
+#include "base/logging.h"
+#include "base/weak_ptr.h"
+#include "google/protobuf/message_lite.h"
+#include "ipc/service_descriptor.h"
+#include "ipc/src/client_impl.h"
+
+namespace perfetto {
+namespace ipc {
+
+ServiceProxy::ServiceProxy(EventListener* event_listener)
+    : weak_ptr_factory_(this), event_listener_(event_listener) {}
+
+ServiceProxy::~ServiceProxy() {
+  if (client_ && connected())  // client_ is a WeakPtr: it may be gone.
+    client_->UnbindService(service_id_);
+}
+
+void ServiceProxy::InitializeBinding(
+    base::WeakPtr<Client> client,
+    ServiceID service_id,
+    std::map<std::string, MethodID> remote_method_ids) {
+  client_ = client;
+  service_id_ = service_id;
+  remote_method_ids_ = std::move(remote_method_ids);
+}
+
+void ServiceProxy::BeginInvoke(const std::string& method_name,
+                               const ProtoMessage& request,
+                               DeferredBase reply) {
+  // |reply| will auto-resolve if it gets out of scope early.
+  if (!connected()) {
+    PERFETTO_DCHECK(false);
+    return;
+  }
+  if (!client_)
+    return;  // The Client object has been destroyed in the meantime.
+
+  auto remote_method_it = remote_method_ids_.find(method_name);
+  RequestID request_id = 0;
+  if (remote_method_it != remote_method_ids_.end()) {
+    request_id =
+        static_cast<ClientImpl*>(client_.get())
+            ->BeginInvoke(service_id_, method_name, remote_method_it->second,
+                          request, weak_ptr_factory_.GetWeakPtr());
+  }
+  if (!request_id)
+    return;
+  PERFETTO_DCHECK(pending_callbacks_.count(request_id) == 0);
+  pending_callbacks_.emplace(request_id, std::move(reply));
+}
+
+void ServiceProxy::EndInvoke(RequestID request_id,
+                             std::unique_ptr<ProtoMessage> result,
+                             bool has_more) {
+  auto callback_it = pending_callbacks_.find(request_id);
+  if (callback_it == pending_callbacks_.end()) {
+    PERFETTO_DCHECK(false);
+    return;
+  }
+  DeferredBase& reply_callback = callback_it->second;
+  AsyncResult<ProtoMessage> reply(std::move(result), has_more);
+  reply_callback.Resolve(std::move(reply));
+  if (!has_more)
+    pending_callbacks_.erase(callback_it);
+}
+
+void ServiceProxy::OnConnect(bool success) {
+  event_listener_->OnConnect(success);
+}
+
+void ServiceProxy::OnDisconnect() {
+  pending_callbacks_.clear();  // Will Reject() all the pending callbacks.
+  event_listener_->OnDisconnect();
+}
+
+base::WeakPtr<ServiceProxy> ServiceProxy::GetWeakPtr() const {
+  return weak_ptr_factory_.GetWeakPtr();
+}
+
+}  // namespace ipc
+}  // namespace perfetto
diff --git a/ipc/src/test/client_unittest_messages.proto b/ipc/src/test/client_unittest_messages.proto
new file mode 100644
index 0000000..6ff32e8
--- /dev/null
+++ b/ipc/src/test/client_unittest_messages.proto
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+option optimize_for = LITE_RUNTIME;
+
+package perfetto.ipc;
+
+message RequestProto {
+  string data = 1;
+}
+
+message ReplyProto {
+  string data = 2;  // 2 here is deliberately != RequestProto.data ID (1).
+}
diff --git a/ipc/src/test/greeter_service-gen.cc b/ipc/src/test/greeter_service-gen.cc
new file mode 100644
index 0000000..53e14d7
--- /dev/null
+++ b/ipc/src/test/greeter_service-gen.cc
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TODO(primiano): this file should be autogenerated by our protobuf plugin.
+
+#include "greeter_service.pb.h"
+
+#include "ipc/basic_types.h"
+#include "ipc/service_descriptor.h"
+#include "ipc/src/test/greeter_service-gen.h"
+
+#include <memory>
+
+using ::perfetto::ipc::ServiceDescriptor;
+
+namespace ipc_test {
+
+namespace {
+
+template <typename T>
+using _AsyncResult = ::perfetto::ipc::AsyncResult<T>;
+
+template <typename T>
+using _Deferred = ::perfetto::ipc::Deferred<T>;
+using _DeferredBase = ::perfetto::ipc::DeferredBase;
+using _ProtoMessage = ::perfetto::ipc::ProtoMessage;
+
+// A templated protobuf message decoder. Returns nullptr in case of failure.
+template <typename T>
+std::unique_ptr<_ProtoMessage> Decoder(const std::string& proto_data) {
+  std::unique_ptr<_ProtoMessage> msg(new T());
+  if (msg->ParseFromString(proto_data))
+    return msg;
+  return nullptr;
+}
+
+ServiceDescriptor* CreateDescriptor() {
+  ServiceDescriptor* desc = new ServiceDescriptor();
+  desc->service_name = "Greeter";
+
+  // rpc SayHello(GreeterRequestMsg) returns (GreeterReplyMsg) {}
+  desc->methods.emplace_back(ServiceDescriptor::Method{
+      "SayHello", &Decoder<::ipc_test::GreeterRequestMsg>,
+      &Decoder<::ipc_test::GreeterReplyMsg>});
+
+  // rpc WaveGoodbye(GreeterRequestMsg) returns (GreeterReplyMsg) {}
+  desc->methods.emplace_back(ServiceDescriptor::Method{
+      "WaveGoodbye", &Decoder<::ipc_test::GreeterRequestMsg>,
+      &Decoder<::ipc_test::GreeterReplyMsg>});
+
+  desc->methods.shrink_to_fit();
+  return desc;
+}
+
+const ServiceDescriptor& GetDescriptorLazy() {
+  static ServiceDescriptor* lazily_initialized_descriptor = CreateDescriptor();
+  return *lazily_initialized_descriptor;
+}
+
+}  // namespace
+
+GreeterProxy::GreeterProxy(
+    ::perfetto::ipc::ServiceProxy::EventListener* event_listener)
+    : ::perfetto::ipc::ServiceProxy(event_listener) {}
+GreeterProxy::~GreeterProxy() = default;
+
+const ServiceDescriptor& GreeterProxy::GetDescriptor() {
+  return GetDescriptorLazy();
+}
+
+void GreeterProxy::SayHello(const GreeterRequestMsg& request,
+                            DeferredGreeterReply reply) {
+  BeginInvoke("SayHello", request, _DeferredBase(std::move(reply)));
+}
+
+void GreeterProxy::WaveGoodbye(const GreeterRequestMsg& request,
+                               DeferredGreeterReply reply) {
+  BeginInvoke("WaveGoodbye", request, _DeferredBase(std::move(reply)));
+}
+
+}  // namespace ipc_test
diff --git a/ipc/src/test/greeter_service-gen.h b/ipc/src/test/greeter_service-gen.h
new file mode 100644
index 0000000..1b8387b
--- /dev/null
+++ b/ipc/src/test/greeter_service-gen.h
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef IPC_SRC_TEST_GREETER_SERVICE_GEN_H_
+#define IPC_SRC_TEST_GREETER_SERVICE_GEN_H_
+
+// TODO(primiano): this file should be autogenerated by our protobuf plugin.
+
+#include "ipc/deferred.h"
+#include "ipc/service_descriptor.h"
+#include "ipc/service_proxy.h"
+
+namespace ipc_test {
+
+// The request/response proto object in the .pb.h file.
+class GreeterRequestMsg;
+class GreeterReplyMsg;
+
+using DeferredGreeterReply = ::perfetto::ipc::Deferred<GreeterReplyMsg>;
+
+// TODO(primiano): introduce host-side sample service in next CL.
+// class Greeter : public ::perfetto::ipc::Service {};
+
+class GreeterProxy : public ::perfetto::ipc::ServiceProxy {
+ public:
+  GreeterProxy(::perfetto::ipc::ServiceProxy::EventListener*);
+  ~GreeterProxy() override;
+
+  // ServiceProxy implementation.
+  const ::perfetto::ipc::ServiceDescriptor& GetDescriptor() override;
+
+  // Greeter implementation.
+  void SayHello(const GreeterRequestMsg&, DeferredGreeterReply);
+  void WaveGoodbye(const GreeterRequestMsg&, DeferredGreeterReply);
+};
+
+}  // namespace ipc_test
+
+#endif  // IPC_SRC_TEST_GREETER_SERVICE_GEN_H_
diff --git a/ipc/src/test/greeter_service.proto b/ipc/src/test/greeter_service.proto
new file mode 100644
index 0000000..1ac1167
--- /dev/null
+++ b/ipc/src/test/greeter_service.proto
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+option optimize_for = LITE_RUNTIME;
+
+// Deliberately a namespace other than perfetto.* to spot namespace dependency
+// bugs in the autogenerated headers.
+package ipc_test;
+
+service Greeter {
+  rpc SayHello(GreeterRequestMsg) returns (GreeterReplyMsg) {}
+  rpc WaveGoodbye(GreeterRequestMsg) returns (GreeterReplyMsg) {}
+}
+
+message GreeterRequestMsg {
+  string name = 1;
+}
+
+message GreeterReplyMsg {
+  string message = 1;
+}
diff --git a/ipc/src/unix_socket.cc b/ipc/src/unix_socket.cc
index d8d122a..71ad532 100644
--- a/ipc/src/unix_socket.cc
+++ b/ipc/src/unix_socket.cc
@@ -264,7 +264,7 @@
 
 bool UnixSocket::Send(const void* msg, size_t len, int send_fd) {
   if (state_ != State::kConnected) {
-    last_error_ = ENOTCONN;
+    errno = last_error_ = ENOTCONN;
     return false;
   }
 
diff --git a/ipc/src/wire_protocol.proto b/ipc/src/wire_protocol.proto
index d51f5ec..9a26042 100644
--- a/ipc/src/wire_protocol.proto
+++ b/ipc/src/wire_protocol.proto
@@ -48,6 +48,9 @@
     bytes reply_proto = 3;  // proto-encoded response value.
   }
 
+  // Host -> Client.
+  message RequestError { string error = 1; }
+
   // The client is expected to send requests with monotonically increasing
   // request_id. The host will match the request_id sent from the client.
   // In the case of a Streaming response (has_more = true) the host will send
@@ -59,6 +62,7 @@
     BindServiceReply msg_bind_service_reply = 4;
     InvokeMethod msg_invoke_method = 5;
     InvokeMethodReply msg_invoke_method_reply = 6;
+    RequestError msg_request_error = 7;
   }
 
   // Used only in unittests to generate a parsable message of arbitrary size.