blob: b70fe2bc93f6787d5ea29e605ed7d95adc9f8b5d [file] [log] [blame]
Primiano Tuccied0ce252017-11-09 19:35:10 +00001/*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "ipc/src/client_impl.h"
18
19#include <inttypes.h>
20
21#include "base/task_runner.h"
22#include "base/utils.h"
23#include "ipc/service_descriptor.h"
24#include "ipc/service_proxy.h"
25
26// TODO(primiano): Add ThreadChecker everywhere.
27
28// TODO(primiano): Add timeouts.
29
30namespace perfetto {
31namespace ipc {
32
33// static
34std::unique_ptr<Client> Client::CreateInstance(const char* socket_name,
35 base::TaskRunner* task_runner) {
36 std::unique_ptr<Client> client(new ClientImpl(socket_name, task_runner));
37 return client;
38}
39
40ClientImpl::ClientImpl(const char* socket_name, base::TaskRunner* task_runner)
41 : task_runner_(task_runner), weak_ptr_factory_(this) {
42 GOOGLE_PROTOBUF_VERIFY_VERSION;
43 sock_ = UnixSocket::Connect(socket_name, this, task_runner);
44}
45
46ClientImpl::~ClientImpl() {
47 OnDisconnect(nullptr); // The UnixSocket* ptr is not used in OnDisconnect().
48}
49
50void ClientImpl::BindService(base::WeakPtr<ServiceProxy> service_proxy) {
51 if (!service_proxy)
52 return;
53 if (!sock_->is_connected())
54 return queued_bindings_.emplace_back(service_proxy);
55 RequestID request_id = ++last_request_id_;
56 Frame frame;
57 frame.set_request_id(request_id);
58 Frame::BindService* req = frame.mutable_msg_bind_service();
59 const char* const service_name = service_proxy->GetDescriptor().service_name;
60 req->set_service_name(service_name);
61 if (!SendFrame(frame)) {
62 PERFETTO_DLOG("BindService(%s) failed", service_name);
63 return service_proxy->OnConnect(false /* success */);
64 }
65 QueuedRequest qr;
66 qr.type = Frame::kMsgBindService;
67 qr.request_id = request_id;
68 qr.service_proxy = service_proxy;
69 queued_requests_.emplace(request_id, std::move(qr));
70}
71
72void ClientImpl::UnbindService(ServiceID service_id) {
73 service_bindings_.erase(service_id);
74}
75
76RequestID ClientImpl::BeginInvoke(ServiceID service_id,
77 const std::string& method_name,
78 MethodID remote_method_id,
79 const ProtoMessage& method_args,
80 base::WeakPtr<ServiceProxy> service_proxy) {
81 std::string args_proto;
82 RequestID request_id = ++last_request_id_;
83 Frame frame;
84 frame.set_request_id(request_id);
85 Frame::InvokeMethod* req = frame.mutable_msg_invoke_method();
86 req->set_service_id(service_id);
87 req->set_method_id(remote_method_id);
88 bool did_serialize = method_args.SerializeToString(&args_proto);
89 req->set_args_proto(args_proto);
90 if (!did_serialize || !SendFrame(frame)) {
91 PERFETTO_DLOG("BeginInvoke() failed while sending the frame");
92 return 0;
93 }
94 QueuedRequest qr;
95 qr.type = Frame::kMsgInvokeMethod;
96 qr.request_id = request_id;
97 qr.method_name = method_name;
98 qr.service_proxy = service_proxy;
99 queued_requests_.emplace(request_id, std::move(qr));
100 return request_id;
101}
102
103bool ClientImpl::SendFrame(const Frame& frame) {
104 // Serialize the frame into protobuf, add the size header, and send it.
105 std::string buf = BufferedFrameDeserializer::Serialize(frame);
106
107 // TODO(primiano): remember that this is doing non-blocking I/O. What if the
108 // socket buffer is full? Maybe we just want to drop this on the floor? Or
109 // maybe throttle the send and PostTask the reply later?
Primiano Tucci420e47e2017-11-14 11:31:49 +0000110 bool res = sock_->Send(buf.data(), buf.size());
111 PERFETTO_CHECK(!sock_->is_connected() || res);
112 return res;
Primiano Tuccied0ce252017-11-09 19:35:10 +0000113}
114
115void ClientImpl::OnConnect(UnixSocket*, bool connected) {
116 // Drain the BindService() calls that were queued before establishig the
117 // connection with the host.
Primiano Tucci3052b1a2017-11-14 10:51:01 +0000118 for (base::WeakPtr<ServiceProxy>& service_proxy : queued_bindings_) {
119 if (connected) {
Primiano Tuccied0ce252017-11-09 19:35:10 +0000120 BindService(service_proxy);
Primiano Tucci3052b1a2017-11-14 10:51:01 +0000121 } else if (service_proxy) {
122 service_proxy->OnConnect(false /* success */);
123 }
Primiano Tuccied0ce252017-11-09 19:35:10 +0000124 }
125 queued_bindings_.clear();
126}
127
128void ClientImpl::OnDisconnect(UnixSocket*) {
129 for (auto it : service_bindings_) {
130 base::WeakPtr<ServiceProxy>& service_proxy = it.second;
131 task_runner_->PostTask([service_proxy] {
132 if (service_proxy)
133 service_proxy->OnDisconnect();
134 });
135 }
136 service_bindings_.clear();
137 queued_bindings_.clear();
138}
139
140void ClientImpl::OnDataAvailable(UnixSocket*) {
141 size_t rsize;
142 do {
143 auto buf = frame_deserializer_.BeginReceive();
Primiano Tuccif54cae42017-11-21 19:37:13 +0000144 base::ScopedFile fd;
145 rsize = sock_->Receive(buf.data, buf.size, &fd);
146 if (fd) {
147 PERFETTO_DCHECK(!received_fd_);
148 received_fd_ = std::move(fd);
149 }
Primiano Tuccied0ce252017-11-09 19:35:10 +0000150 if (!frame_deserializer_.EndReceive(rsize)) {
151 // The endpoint tried to send a frame that is way too large.
152 return sock_->Shutdown(); // In turn will trigger an OnDisconnect().
153 // TODO check this.
154 }
155 } while (rsize > 0);
156
157 while (std::unique_ptr<Frame> frame = frame_deserializer_.PopNextFrame())
158 OnFrameReceived(*frame);
159}
160
161void ClientImpl::OnFrameReceived(const Frame& frame) {
162 auto queued_requests_it = queued_requests_.find(frame.request_id());
163 if (queued_requests_it == queued_requests_.end()) {
164 PERFETTO_DLOG("OnFrameReceived(): got invalid request_id=%" PRIu64,
165 static_cast<uint64_t>(frame.request_id()));
166 return;
167 }
168 QueuedRequest req = std::move(queued_requests_it->second);
169 queued_requests_.erase(queued_requests_it);
170
171 if (req.type == Frame::kMsgBindService &&
172 frame.msg_case() == Frame::kMsgBindServiceReply) {
173 return OnBindServiceReply(std::move(req), frame.msg_bind_service_reply());
174 }
175 if (req.type == Frame::kMsgInvokeMethod &&
176 frame.msg_case() == Frame::kMsgInvokeMethodReply) {
177 return OnInvokeMethodReply(std::move(req), frame.msg_invoke_method_reply());
178 }
179 if (frame.msg_case() == Frame::kMsgRequestError) {
Sami Kyostilaf95fbaa2017-11-10 10:21:51 +0000180 PERFETTO_DLOG("Host error: %s", frame.msg_request_error().error().c_str());
Primiano Tuccied0ce252017-11-09 19:35:10 +0000181 return;
182 }
183
184 PERFETTO_DLOG(
185 "OnFrameReceived() request msg_type=%d, received msg_type=%d in reply to "
186 "request_id=%" PRIu64,
187 req.type, frame.msg_case(), static_cast<uint64_t>(frame.request_id()));
188}
189
190void ClientImpl::OnBindServiceReply(QueuedRequest req,
191 const Frame::BindServiceReply& reply) {
192 base::WeakPtr<ServiceProxy>& service_proxy = req.service_proxy;
193 if (!service_proxy)
194 return;
Primiano Tucci3052b1a2017-11-14 10:51:01 +0000195 const char* svc_name = service_proxy->GetDescriptor().service_name;
Primiano Tuccied0ce252017-11-09 19:35:10 +0000196 if (!reply.success()) {
Primiano Tucci3052b1a2017-11-14 10:51:01 +0000197 PERFETTO_DLOG("BindService(): unknown service_name=\"%s\"", svc_name);
198 return service_proxy->OnConnect(false /* success */);
199 }
200
201 auto prev_service = service_bindings_.find(reply.service_id());
202 if (prev_service != service_bindings_.end() && prev_service->second.get()) {
203 PERFETTO_DLOG(
204 "BindService(): Trying to bind service \"%s\" but another service "
205 "named \"%s\" is already bound with the same ID.",
206 svc_name, prev_service->second->GetDescriptor().service_name);
Primiano Tuccied0ce252017-11-09 19:35:10 +0000207 return service_proxy->OnConnect(false /* success */);
208 }
209
210 // Build the method [name] -> [remote_id] map.
211 std::map<std::string, MethodID> methods;
212 for (const auto& method : reply.methods()) {
213 if (method.name().empty() || method.id() <= 0) {
214 PERFETTO_DLOG("OnBindServiceReply(): invalid method \"%s\" -> %" PRIu32,
215 method.name().c_str(), method.id());
216 continue;
217 }
218 methods[method.name()] = method.id();
219 }
220 service_proxy->InitializeBinding(weak_ptr_factory_.GetWeakPtr(),
221 reply.service_id(), std::move(methods));
222 service_bindings_[reply.service_id()] = service_proxy;
223 service_proxy->OnConnect(true /* success */);
224}
225
226void ClientImpl::OnInvokeMethodReply(QueuedRequest req,
227 const Frame::InvokeMethodReply& reply) {
Primiano Tuccic2025af2017-11-20 12:47:49 +0000228 base::WeakPtr<ServiceProxy> service_proxy = req.service_proxy;
Primiano Tuccied0ce252017-11-09 19:35:10 +0000229 if (!service_proxy)
230 return;
231 std::unique_ptr<ProtoMessage> decoded_reply;
232 if (reply.success()) {
233 // TODO this could be optimized, stop doing method name string lookups.
234 for (const auto& method : service_proxy->GetDescriptor().methods) {
235 if (req.method_name == method.name) {
236 decoded_reply = method.reply_proto_decoder(reply.reply_proto());
237 break;
238 }
239 }
240 }
Primiano Tuccic2025af2017-11-20 12:47:49 +0000241 const RequestID request_id = req.request_id;
242 service_proxy->EndInvoke(request_id, std::move(decoded_reply),
Primiano Tuccied0ce252017-11-09 19:35:10 +0000243 reply.has_more());
Primiano Tuccic2025af2017-11-20 12:47:49 +0000244
245 // If this is a streaming method and future replies will be resolved, put back
246 // the |req| with the callback into the set of active requests.
247 if (reply.has_more())
248 queued_requests_.emplace(request_id, std::move(req));
Primiano Tuccied0ce252017-11-09 19:35:10 +0000249}
250
251ClientImpl::QueuedRequest::QueuedRequest() = default;
252
Primiano Tuccif54cae42017-11-21 19:37:13 +0000253base::ScopedFile ClientImpl::TakeReceivedFD() {
254 return std::move(received_fd_);
255}
256
Primiano Tuccied0ce252017-11-09 19:35:10 +0000257} // namespace ipc
258} // namespace perfetto