blob: f5ab1b9de06656a7d13c4435d3241fa57fa7fdff [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipc/mojo/ipc_message_pipe_reader.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop_proxy.h"
#include "ipc/mojo/async_handle_waiter.h"
#include "ipc/mojo/ipc_channel_mojo.h"
namespace IPC {
namespace internal {
MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
MessagePipeReader::Delegate* delegate)
: pipe_(handle.Pass()),
delegate_(delegate),
async_waiter_(
new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
base::Unretained(this)))) {
}
MessagePipeReader::~MessagePipeReader() {
CHECK(!IsValid());
}
void MessagePipeReader::Close() {
async_waiter_.reset();
pipe_.reset();
OnPipeClosed();
}
void MessagePipeReader::CloseWithError(MojoResult error) {
OnPipeError(error);
Close();
}
bool MessagePipeReader::Send(scoped_ptr<Message> message) {
DCHECK(IsValid());
message->TraceMessageBegin();
std::vector<MojoHandle> handles;
MojoResult result = MOJO_RESULT_OK;
#if defined(OS_POSIX) && !defined(OS_NACL)
result = ChannelMojo::ReadFromFileDescriptorSet(message.get(), &handles);
#endif
if (result == MOJO_RESULT_OK) {
result = MojoWriteMessage(handle(),
message->data(),
static_cast<uint32>(message->size()),
handles.empty() ? nullptr : &handles[0],
static_cast<uint32>(handles.size()),
MOJO_WRITE_MESSAGE_FLAG_NONE);
}
if (result != MOJO_RESULT_OK) {
std::for_each(handles.begin(), handles.end(), &MojoClose);
CloseWithError(result);
return false;
}
return true;
}
void MessagePipeReader::OnMessageReceived() {
Message message(data_buffer().empty() ? "" : &data_buffer()[0],
static_cast<uint32>(data_buffer().size()));
std::vector<MojoHandle> handle_buffer;
TakeHandleBuffer(&handle_buffer);
#if defined(OS_POSIX) && !defined(OS_NACL)
MojoResult write_result =
ChannelMojo::WriteToFileDescriptorSet(handle_buffer, &message);
if (write_result != MOJO_RESULT_OK) {
CloseWithError(write_result);
return;
}
#else
DCHECK(handle_buffer.empty());
#endif
message.TraceMessageEnd();
delegate_->OnMessageReceived(message);
}
void MessagePipeReader::OnPipeClosed() {
if (!delegate_)
return;
delegate_->OnPipeClosed(this);
delegate_ = nullptr;
}
void MessagePipeReader::OnPipeError(MojoResult error) {
if (!delegate_)
return;
delegate_->OnPipeError(this);
}
MojoResult MessagePipeReader::ReadMessageBytes() {
DCHECK(handle_buffer_.empty());
uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
uint32_t num_handles = 0;
MojoResult result = MojoReadMessage(pipe_.get().value(),
num_bytes ? &data_buffer_[0] : nullptr,
&num_bytes,
nullptr,
&num_handles,
MOJO_READ_MESSAGE_FLAG_NONE);
data_buffer_.resize(num_bytes);
handle_buffer_.resize(num_handles);
if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
// MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
// it needs more bufer. So we re-read it with resized buffers.
result = MojoReadMessage(pipe_.get().value(),
num_bytes ? &data_buffer_[0] : nullptr,
&num_bytes,
num_handles ? &handle_buffer_[0] : nullptr,
&num_handles,
MOJO_READ_MESSAGE_FLAG_NONE);
}
DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
return result;
}
void MessagePipeReader::ReadAvailableMessages() {
while (pipe_.is_valid()) {
MojoResult read_result = ReadMessageBytes();
if (read_result == MOJO_RESULT_SHOULD_WAIT)
break;
if (read_result != MOJO_RESULT_OK) {
// FAILED_PRECONDITION means that all the received messages
// got consumed and the peer is already closed.
if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
DLOG(WARNING)
<< "Pipe got error from ReadMessage(). Closing: " << read_result;
OnPipeError(read_result);
}
Close();
break;
}
OnMessageReceived();
}
}
void MessagePipeReader::ReadMessagesThenWait() {
while (true) {
ReadAvailableMessages();
if (!pipe_.is_valid())
break;
// |Wait()| is safe to call only after all messages are read.
// If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
// Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
// MessagePipe.
MojoResult result =
async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
// If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
// that have been arrived after the last |ReadAvailableMessages()|.
// We have to consume then and retry in that case.
if (result != MOJO_RESULT_ALREADY_EXISTS) {
if (result != MOJO_RESULT_OK) {
DLOG(ERROR) << "Result is " << result;
OnPipeError(result);
Close();
}
break;
}
}
}
void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
if (wait_result != MOJO_RESULT_OK) {
if (wait_result != MOJO_RESULT_ABORTED) {
// FAILED_PRECONDITION happens every time the peer is dead so
// it isn't worth polluting the log message.
DLOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
<< "Pipe got error from the waiter. Closing: " << wait_result;
OnPipeError(wait_result);
}
Close();
return;
}
ReadMessagesThenWait();
}
void MessagePipeReader::DelayedDeleter::operator()(
MessagePipeReader* ptr) const {
ptr->Close();
base::MessageLoopProxy::current()->PostTask(
FROM_HERE, base::Bind(&DeleteNow, ptr));
}
} // namespace internal
} // namespace IPC