blob: b9691c57b755554a3970b3efec66bc957ffa43fe [file] [log] [blame]
/*
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#if defined(WEBRTC_POSIX)
#include <sys/file.h>
#endif // WEBRTC_POSIX
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <algorithm>
#include <new>
#include <string>
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/messagequeue.h"
#include "rtc_base/stream.h"
#include "rtc_base/stringencode.h"
#include "rtc_base/stringutils.h"
#include "rtc_base/thread.h"
#include "rtc_base/timeutils.h"
#if defined(WEBRTC_WIN)
#include <windows.h>
#define fileno _fileno
#endif
namespace rtc {
///////////////////////////////////////////////////////////////////////////////
// StreamInterface
///////////////////////////////////////////////////////////////////////////////
// Destructor; performs no explicit work of its own.
StreamInterface::~StreamInterface() = default;
// Calls Write() repeatedly until |data_len| bytes have been accepted or a
// call returns anything other than SR_SUCCESS.
//
// |written|, when non-null, receives the number of bytes accepted so far,
// regardless of the outcome. |error| is passed straight through to Write().
// Returns the result of the last Write() call (SR_SUCCESS when |data_len| is
// zero, in which case Write() is never called).
StreamResult StreamInterface::WriteAll(const void* data,
                                       size_t data_len,
                                       size_t* written,
                                       int* error) {
  const char* const bytes = static_cast<const char*>(data);
  StreamResult status = SR_SUCCESS;
  size_t done = 0;
  size_t chunk;
  while (done < data_len) {
    status = Write(bytes + done, data_len - done, &chunk, error);
    if (status != SR_SUCCESS) {
      break;
    }
    done += chunk;
  }
  if (written != nullptr) {
    *written = done;
  }
  return status;
}
// Calls Read() repeatedly until |buffer_len| bytes have been stored in
// |buffer| or a call returns anything other than SR_SUCCESS.
//
// |read|, when non-null, receives the number of bytes stored so far,
// regardless of the outcome. |error| is passed straight through to Read().
// Returns the result of the last Read() call (SR_SUCCESS when |buffer_len| is
// zero, in which case Read() is never called).
StreamResult StreamInterface::ReadAll(void* buffer,
                                      size_t buffer_len,
                                      size_t* read,
                                      int* error) {
  char* const bytes = static_cast<char*>(buffer);
  StreamResult status = SR_SUCCESS;
  size_t done = 0;
  size_t chunk;
  while (done < buffer_len) {
    status = Read(bytes + done, buffer_len - done, &chunk, error);
    if (status != SR_SUCCESS) {
      break;
    }
    done += chunk;
  }
  if (read != nullptr) {
    *read = done;
  }
  return status;
}
// Reads one character at a time into |line| (cleared first) until a '\n' is
// seen or Read() stops returning SR_SUCCESS. The '\n' is not stored.
//
// If any characters were collected the call reports SR_SUCCESS, even when the
// final Read() failed; otherwise the result of the last Read() is returned.
StreamResult StreamInterface::ReadLine(std::string* line) {
  line->clear();
  StreamResult status = SR_SUCCESS;
  for (;;) {
    char c;
    status = Read(&c, sizeof(c), nullptr, nullptr);
    if (status != SR_SUCCESS || c == '\n') {
      break;
    }
    line->push_back(c);
  }
  // Hand back whatever was gathered with a success code; an empty line
  // surfaces the last Read() result instead.
  return line->empty() ? status : SR_SUCCESS;
}
// Posts a MSG_POST_EVENT message addressed to this stream on thread |t|,
// carrying |events| and |err| in a heap-allocated StreamEventData.
void StreamInterface::PostEvent(Thread* t, int events, int err) {
  StreamEventData* const payload = new StreamEventData(events, err);
  t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT, payload);
}
void StreamInterface::PostEvent(int events, int err) {
PostEvent(Thread::Current(), events, err);
}
// Default implementation: always returns nullptr and leaves the length
// argument untouched.
const void* StreamInterface::GetReadData(size_t* /*data_len*/) {
  return nullptr;
}
// Default implementation: always returns nullptr and leaves the length
// argument untouched.
void* StreamInterface::GetWriteBuffer(size_t* /*buf_len*/) {
  return nullptr;
}
// Default implementation: always returns false.
bool StreamInterface::SetPosition(size_t /*position*/) {
  return false;
}
// Default implementation: always returns false without writing to the output
// argument.
bool StreamInterface::GetPosition(size_t* /*position*/) const {
  return false;
}
// Default implementation: always returns false without writing to the output
// argument.
bool StreamInterface::GetSize(size_t* /*size*/) const {
  return false;
}
// Default implementation: always returns false without writing to the output
// argument.
bool StreamInterface::GetAvailable(size_t* /*size*/) const {
  return false;
}
// Default implementation: always returns false without writing to the output
// argument.
bool StreamInterface::GetWriteRemaining(size_t* /*size*/) const {
  return false;
}
// Default implementation: always returns false.
bool StreamInterface::Flush() {
return false;
}
// Default implementation: always returns true and ignores the requested size.
bool StreamInterface::ReserveSize(size_t /*size*/) {
  return true;
}
// Constructor; performs no explicit initialization of its own.
StreamInterface::StreamInterface() = default;
// Message handler. For MSG_POST_EVENT, fires SignalEvent with the events and
// error carried in the message's StreamEventData payload, then deletes the
// payload. Any other message id is ignored.
void StreamInterface::OnMessage(Message* msg) {
  if (msg->message_id != MSG_POST_EVENT) {
    return;
  }
  StreamEventData* const event = static_cast<StreamEventData*>(msg->pdata);
  SignalEvent(this, event->events, event->error);
  delete msg->pdata;
}
///////////////////////////////////////////////////////////////////////////////
// StreamAdapterInterface
///////////////////////////////////////////////////////////////////////////////
// Wraps |stream|. |owned| records whether this adapter deletes the stream
// when it is replaced or when the adapter is destroyed. A non-null stream has
// its SignalEvent connected to OnEvent.
StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
                                               bool owned)
    : stream_(stream), owned_(owned) {
  if (stream_ == nullptr) {
    return;
  }
  stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
}
// Forwards to the wrapped stream. stream_ is dereferenced without a null
// check.
StreamState StreamAdapterInterface::GetState() const {
  const StreamState inner_state = stream_->GetState();
  return inner_state;
}
// Forwards all arguments to the wrapped stream's Read() and returns its
// result. stream_ is dereferenced without a null check.
StreamResult StreamAdapterInterface::Read(void* buffer,
                                          size_t buffer_len,
                                          size_t* read,
                                          int* error) {
  const StreamResult inner_result =
      stream_->Read(buffer, buffer_len, read, error);
  return inner_result;
}
// Forwards all arguments to the wrapped stream's Write() and returns its
// result. stream_ is dereferenced without a null check.
StreamResult StreamAdapterInterface::Write(const void* data,
                                           size_t data_len,
                                           size_t* written,
                                           int* error) {
  const StreamResult inner_result =
      stream_->Write(data, data_len, written, error);
  return inner_result;
}
// Forwards to the wrapped stream's Close(). stream_ is dereferenced without a
// null check.
void StreamAdapterInterface::Close() {
stream_->Close();
}
// Forwards to the wrapped stream's SetPosition() and returns its result.
bool StreamAdapterInterface::SetPosition(size_t position) {
  const bool ok = stream_->SetPosition(position);
  return ok;
}
// Forwards to the wrapped stream's GetPosition() and returns its result.
bool StreamAdapterInterface::GetPosition(size_t* position) const {
  const bool ok = stream_->GetPosition(position);
  return ok;
}
// Forwards to the wrapped stream's GetSize() and returns its result.
bool StreamAdapterInterface::GetSize(size_t* size) const {
  const bool ok = stream_->GetSize(size);
  return ok;
}
// Forwards to the wrapped stream's GetAvailable() and returns its result.
bool StreamAdapterInterface::GetAvailable(size_t* size) const {
  const bool ok = stream_->GetAvailable(size);
  return ok;
}
// Forwards to the wrapped stream's GetWriteRemaining() and returns its
// result.
bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
  const bool ok = stream_->GetWriteRemaining(size);
  return ok;
}
// Forwards to the wrapped stream's ReserveSize() and returns its result.
bool StreamAdapterInterface::ReserveSize(size_t size) {
  const bool ok = stream_->ReserveSize(size);
  return ok;
}
// Forwards to the wrapped stream's Flush() and returns its result.
bool StreamAdapterInterface::Flush() {
  const bool ok = stream_->Flush();
  return ok;
}
// Replaces the wrapped stream with |stream|.
//
// The previous stream (if any) is disconnected from this adapter and, when it
// was owned, deleted. |owned| records whether the new stream is to be deleted
// by this adapter. A non-null new stream has its SignalEvent connected to
// OnEvent.
void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
  StreamInterface* const previous = stream_;
  if (previous != nullptr) {
    previous->SignalEvent.disconnect(this);
  }
  if (owned_) {
    delete previous;
  }

  stream_ = stream;
  owned_ = owned;
  if (stream_ != nullptr) {
    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
  }
}
// Stops wrapping the current stream and returns it without deleting it
// (nullptr if there was none). The stream is disconnected from this adapter
// and stream_ is reset to nullptr; owned_ is left unchanged.
StreamInterface* StreamAdapterInterface::Detach() {
  StreamInterface* const released = stream_;
  if (released != nullptr) {
    released->SignalEvent.disconnect(this);
  }
  stream_ = nullptr;
  return released;
}
// Deletes the wrapped stream only when this adapter owns it.
StreamAdapterInterface::~StreamAdapterInterface() {
  if (!owned_) {
    return;
  }
  delete stream_;
}
void StreamAdapterInterface::OnEvent(StreamInterface* stream,
int events,
int err) {
SignalEvent(this, events, err);
}
///////////////////////////////////////////////////////////////////////////////
// FileStream
///////////////////////////////////////////////////////////////////////////////
// Constructs a stream with no file attached (file_ is null).
FileStream::FileStream() : file_(nullptr) {}
// Closes the file, if one is open. The call is class-qualified, so it runs
// FileStream's own Close() rather than dispatching virtually.
FileStream::~FileStream() {
  this->FileStream::Close();
}
// Closes any currently open file, then opens |filename| with the stdio-style
// |mode| string.
//
// On Windows the UTF-8 name is converted with Utf8ToWindowsFilename() and
// opened with _wfopen(); if the conversion fails the call returns false and
// sets |*error| (when non-null) to -1. Elsewhere fopen() is used. When the
// open itself fails and |error| is non-null, |*error| receives errno.
//
// Returns true when a file was opened.
bool FileStream::Open(const std::string& filename,
                      const char* mode,
                      int* error) {
  Close();
#if defined(WEBRTC_WIN)
  std::wstring wfilename;
  if (!Utf8ToWindowsFilename(filename, &wfilename)) {
    // Fail whether or not the caller asked for an error code, matching
    // OpenShare(); previously a null |error| fell through to the errno path.
    if (error) {
      *error = -1;
    }
    return false;
  }
  file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
#else
  file_ = fopen(filename.c_str(), mode);
#endif
  if (!file_ && error) {
    *error = errno;
  }
  return (file_ != nullptr);
}
// Closes any currently open file, then opens |filename| with |mode|.
//
// On Windows the file is opened with _wfsopen() using the sharing flag
// |shflag|. A failed filename conversion returns false with |*error| set to
// -1 (when |error| is non-null); a failed open returns false with |*error|
// set to errno (when |error| is non-null). On other platforms |shflag| is
// ignored and the call is equivalent to Open().
bool FileStream::OpenShare(const std::string& filename,
                           const char* mode,
                           int shflag,
                           int* error) {
  Close();
#if defined(WEBRTC_WIN)
  std::wstring wide_name;
  if (!Utf8ToWindowsFilename(filename, &wide_name)) {
    if (error) {
      *error = -1;
    }
    return false;
  }
  file_ = _wfsopen(wide_name.c_str(), ToUtf16(mode).c_str(), shflag);
  if (file_ != nullptr) {
    return true;
  }
  if (error) {
    *error = errno;
  }
  return false;
#else
  return Open(filename, mode, error);
#endif
}
// Switches the open file to unbuffered mode via setvbuf(_IONBF).
// Returns false when no file is open or setvbuf() reports failure.
bool FileStream::DisableBuffering() {
  if (file_ == nullptr) {
    return false;
  }
  const int rc = setvbuf(file_, nullptr, _IONBF, 0);
  return rc == 0;
}
// Reports SS_OPEN while a file handle is held, SS_CLOSED otherwise.
StreamState FileStream::GetState() const {
  if (file_ != nullptr) {
    return SS_OPEN;
  }
  return SS_CLOSED;
}
// Reads up to |buffer_len| bytes from the file into |buffer| with fread().
//
// Returns SR_EOS when no file is open, or when nothing was read and feof()
// is set. Returns SR_ERROR (storing errno in |*error| when non-null) when
// nothing was read for any other reason. Otherwise returns SR_SUCCESS and
// stores the byte count in |*read| when non-null. |*read| is not written on
// the SR_EOS / SR_ERROR paths.
StreamResult FileStream::Read(void* buffer,
                              size_t buffer_len,
                              size_t* read,
                              int* error) {
  if (file_ == nullptr) {
    return SR_EOS;
  }
  const size_t got = fread(buffer, 1, buffer_len, file_);
  const bool came_up_empty = (got == 0) && (buffer_len > 0);
  if (!came_up_empty) {
    if (read != nullptr) {
      *read = got;
    }
    return SR_SUCCESS;
  }
  if (feof(file_)) {
    return SR_EOS;
  }
  if (error != nullptr) {
    *error = errno;
  }
  return SR_ERROR;
}
// Writes up to |data_len| bytes from |data| to the file with fwrite().
//
// Returns SR_EOS when no file is open. Returns SR_ERROR (storing errno in
// |*error| when non-null) when a non-empty write stored nothing. Otherwise
// returns SR_SUCCESS and stores the byte count in |*written| when non-null.
StreamResult FileStream::Write(const void* data,
                               size_t data_len,
                               size_t* written,
                               int* error) {
  if (file_ == nullptr) {
    return SR_EOS;
  }
  const size_t put = fwrite(data, 1, data_len, file_);
  const bool wrote_nothing = (put == 0) && (data_len > 0);
  if (wrote_nothing) {
    if (error != nullptr) {
      *error = errno;
    }
    return SR_ERROR;
  }
  if (written != nullptr) {
    *written = put;
  }
  return SR_SUCCESS;
}
// Closes the file through DoClose() and forgets the handle. Does nothing when
// no file is open.
void FileStream::Close() {
  if (file_ == nullptr) {
    return;
  }
  DoClose();
  file_ = nullptr;
}
// Moves the file position to |position| bytes from the start of the file.
//
// Returns false when no file is open, when |position| cannot be represented
// as the `long` offset that fseek() takes, or when fseek() fails.
bool FileStream::SetPosition(size_t position) {
  if (!file_)
    return false;
  // fseek() takes a long. The previous cast to int silently truncated large
  // positions (seeking to the wrong place) even where long is wider than int.
  const long offset = static_cast<long>(position);
  if (offset < 0 || static_cast<size_t>(offset) != position)
    return false;  // Does not round-trip: out of range for fseek().
  return (fseek(file_, offset, SEEK_SET) == 0);
}
// Stores the current file position, as reported by ftell(), in |*position|.
// Returns false when no file is open or ftell() fails. |position| is expected
// to be non-null (DCHECKed) but a null pointer is tolerated.
bool FileStream::GetPosition(size_t* position) const {
  RTC_DCHECK(nullptr != position);
  if (file_ == nullptr) {
    return false;
  }
  const long offset = ftell(file_);
  if (offset < 0) {
    return false;
  }
  if (position != nullptr) {
    *position = offset;
  }
  return true;
}
// Stores the file size, taken from fstat()'s st_size, in |*size|.
// Returns false when no file is open or fstat() fails. |size| is expected to
// be non-null (DCHECKed) but a null pointer is tolerated.
bool FileStream::GetSize(size_t* size) const {
  RTC_DCHECK(nullptr != size);
  if (file_ == nullptr) {
    return false;
  }
  struct stat info;
  const int rc = fstat(fileno(file_), &info);
  if (rc != 0) {
    return false;
  }
  if (size != nullptr) {
    *size = info.st_size;
  }
  return true;
}
// Stores in |*size| the number of bytes between the current position
// (ftell()) and the file size reported by GetSize().
//
// Returns false when GetSize() fails (including when no file is open) or
// ftell() fails; |*size| is left untouched in that case. |size| is expected
// to be non-null (DCHECKed) but a null pointer is tolerated.
bool FileStream::GetAvailable(size_t* size) const {
  RTC_DCHECK(nullptr != size);
  size_t total = 0;
  if (!GetSize(&total))
    return false;
  const long result = ftell(file_);
  if (result < 0)
    return false;
  if (size) {
    // The position can exceed the size GetSize() reports (after seeking past
    // the end, or with unflushed buffered writes). Report 0 rather than
    // letting the unsigned subtraction wrap around to a huge value.
    const size_t position = static_cast<size_t>(result);
    *size = (position < total) ? (total - position) : 0;
  }
  return true;
}
// Currently a no-op that always reports success; the requested size is
// ignored.
bool FileStream::ReserveSize(size_t /*size*/) {
  // TODO: extend the file to the proper length
  return true;
}
// Flushes the open file with fflush(); returns true when that succeeds.
// Calling this with no file open is treated as a programming error
// (RTC_NOTREACHED) and returns false.
bool FileStream::Flush() {
  if (file_ == nullptr) {
    // try to flush empty file?
    RTC_NOTREACHED();
    return false;
  }
  const int rc = fflush(file_);
  return rc == 0;
}
// Closes file_ with fclose(). The return value is deliberately ignored, and
// file_ itself is not reset here (Close() does that).
void FileStream::DoClose() {
  static_cast<void>(fclose(file_));
}
///////////////////////////////////////////////////////////////////////////////
// MemoryStream
///////////////////////////////////////////////////////////////////////////////
// Starts with no buffer: null pointer, zero capacity, zero data length and
// the seek position at 0.
MemoryStreamBase::MemoryStreamBase()
: buffer_(nullptr), buffer_length_(0), data_length_(0), seek_position_(0) {}
// A memory stream always reports SS_OPEN.
StreamState MemoryStreamBase::GetState() const {
return SS_OPEN;
}
// Copies up to |bytes| bytes, starting at the seek position, into |buffer|
// and advances the seek position by the amount copied.
//
// Returns SR_EOS when the seek position is at or past the end of the data.
// Otherwise returns SR_SUCCESS (possibly after a short read) and stores the
// count in |*bytes_read| when non-null. |error| is never written.
StreamResult MemoryStreamBase::Read(void* buffer,
                                    size_t bytes,
                                    size_t* bytes_read,
                                    int* error) {
  if (data_length_ <= seek_position_) {
    return SR_EOS;
  }
  // Clamp the request to what is left, yielding a partial read if needed.
  const size_t remaining = data_length_ - seek_position_;
  const size_t to_copy = std::min(bytes, remaining);
  memcpy(buffer, buffer_ + seek_position_, to_copy);
  seek_position_ += to_copy;
  if (bytes_read != nullptr) {
    *bytes_read = to_copy;
  }
  return SR_SUCCESS;
}
// Copies up to |bytes| bytes from |buffer| into the stream at the seek
// position, advancing the seek position and extending the data length when
// the write goes past the previous end.
//
// Growth is attempted (through DoReserve()) only when there is no room at all
// after the seek position; otherwise the write is truncated to the room that
// exists. If DoReserve() fails its result is returned and nothing is written.
// On success the count is stored in |*bytes_written| when non-null.
StreamResult MemoryStreamBase::Write(const void* buffer,
                                     size_t bytes,
                                     size_t* bytes_written,
                                     int* error) {
  size_t room = buffer_length_ - seek_position_;
  if (room == 0) {
    // Grow to the larger of:
    //  a) the new end position rounded up to the next 256 bytes
    //  b) twice the previous capacity
    const size_t rounded_end = ((seek_position_ + bytes) | 0xFF) + 1;
    const size_t doubled = buffer_length_ * 2;
    const size_t new_buffer_length = std::max(rounded_end, doubled);
    const StreamResult reserve_result = DoReserve(new_buffer_length, error);
    if (reserve_result != SR_SUCCESS) {
      return reserve_result;
    }
    RTC_DCHECK(buffer_length_ >= new_buffer_length);
    room = buffer_length_ - seek_position_;
  }

  const size_t to_copy = (bytes > room) ? room : bytes;
  memcpy(buffer_ + seek_position_, buffer, to_copy);
  seek_position_ += to_copy;
  if (seek_position_ > data_length_) {
    data_length_ = seek_position_;
  }
  if (bytes_written != nullptr) {
    *bytes_written = to_copy;
  }
  return SR_SUCCESS;
}
// Closing a memory stream is a no-op; no state is changed.
void MemoryStreamBase::Close() {
// nothing to do
}
// Moves the seek position to |position|. Positions beyond the current data
// length are rejected (returns false); the end position itself is allowed.
bool MemoryStreamBase::SetPosition(size_t position) {
  const bool in_range = !(position > data_length_);
  if (in_range) {
    seek_position_ = position;
  }
  return in_range;
}
// Stores the current seek position in |*position| when non-null.
// Always returns true.
bool MemoryStreamBase::GetPosition(size_t* position) const {
  if (position != nullptr) {
    *position = seek_position_;
  }
  return true;
}
// Stores the current data length in |*size| when non-null.
// Always returns true.
bool MemoryStreamBase::GetSize(size_t* size) const {
  if (size != nullptr) {
    *size = data_length_;
  }
  return true;
}
// Stores the number of bytes between the seek position and the end of the
// data in |*size| when non-null. Always returns true.
bool MemoryStreamBase::GetAvailable(size_t* size) const {
  if (size != nullptr) {
    const size_t unread = data_length_ - seek_position_;
    *size = unread;
  }
  return true;
}
bool MemoryStreamBase::ReserveSize(size_t size) {
return (SR_SUCCESS == DoReserve(size, nullptr));
}
StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
}
///////////////////////////////////////////////////////////////////////////////
// Creates an empty stream; all state comes from the base-class constructor.
MemoryStream::MemoryStream() = default;
// Creates a stream holding a copy of the NUL-terminated string |data|; the
// terminator itself is not copied (length comes from strlen()).
MemoryStream::MemoryStream(const char* data) {
  const size_t text_length = strlen(data);
  SetData(data, text_length);
}
// Creates a stream holding a copy of the |length| bytes at |data|; simply
// delegates to SetData().
MemoryStream::MemoryStream(const void* data, size_t length) {
  this->SetData(data, length);
}
// Releases the buffer, which this class allocates with new[].
MemoryStream::~MemoryStream() {
  char* const owned_buffer = buffer_;
  delete[] owned_buffer;
}
// Replaces the stream's contents with a private copy of the |length| bytes at
// |data| and rewinds the seek position to 0. Capacity and data length both
// become |length|.
void MemoryStream::SetData(const void* data, size_t length) {
  // Allocate and fill the replacement before releasing the old buffer, so
  // that |data| may safely point into the current buffer and the members are
  // only updated once the new buffer is ready.
  char* new_buffer = new char[length];
  if (length > 0) {
    // Skip the copy for an empty payload: memcpy with a null source is
    // undefined even for zero bytes.
    memcpy(new_buffer, data, length);
  }
  delete[] buffer_;
  buffer_ = new_buffer;
  data_length_ = buffer_length_ = length;
  seek_position_ = 0;
}
// Ensures the buffer can hold at least |size| bytes, reallocating and copying
// the existing data when it cannot.
//
// Returns SR_SUCCESS when the capacity is (now) sufficient. If allocation
// fails, returns SR_ERROR and stores ENOMEM in |*error| when non-null; the
// existing buffer is left intact.
StreamResult MemoryStream::DoReserve(size_t size, int* error) {
  if (buffer_length_ >= size)
    return SR_SUCCESS;
  // Plain new[] never yields nullptr, which made the ENOMEM path below
  // unreachable; the nothrow form reports failure as a null pointer.
  char* new_buffer = new (std::nothrow) char[size];
  if (new_buffer == nullptr) {
    if (error) {
      *error = ENOMEM;
    }
    return SR_ERROR;
  }
  if (data_length_ > 0) {
    // buffer_ may still be null for an empty stream; avoid memcpy from null.
    memcpy(new_buffer, buffer_, data_length_);
  }
  delete[] buffer_;
  buffer_ = new_buffer;
  buffer_length_ = size;
  return SR_SUCCESS;
}
///////////////////////////////////////////////////////////////////////////////
// Creates a stream with no external buffer attached; all state comes from the
// base-class constructor.
ExternalMemoryStream::ExternalMemoryStream() = default;
// Creates a stream over the caller-provided |length| bytes at |data|; simply
// delegates to SetData().
ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
  this->SetData(data, length);
}
// Does not free the buffer: nothing is deleted here.
ExternalMemoryStream::~ExternalMemoryStream() = default;
// Points the stream at the caller-provided memory (no copy is made) and
// rewinds the seek position. Capacity and data length both become |length|.
void ExternalMemoryStream::SetData(void* data, size_t length) {
  buffer_ = static_cast<char*>(data);
  buffer_length_ = length;
  data_length_ = length;
  seek_position_ = 0;
}
///////////////////////////////////////////////////////////////////////////////
// FifoBuffer
///////////////////////////////////////////////////////////////////////////////
// Creates an open, empty FIFO with a freshly allocated buffer of |size| bytes.
// The owner thread is whatever Thread::Current() returns at construction.
FifoBuffer::FifoBuffer(size_t size)
: state_(SS_OPEN),
buffer_(new char[size]),
buffer_length_(size),
data_length_(0),
read_position_(0),
owner_(Thread::Current()) {
// all events are done on the owner_ thread
}
// Creates an open, empty FIFO with a freshly allocated buffer of |size| bytes
// whose events are posted to the explicitly supplied |owner| thread.
FifoBuffer::FifoBuffer(size_t size, Thread* owner)
: state_(SS_OPEN),
buffer_(new char[size]),
buffer_length_(size),
data_length_(0),
read_position_(0),
owner_(owner) {
// all events are done on the owner_ thread
}
// Destructor; performs no explicit work of its own.
FifoBuffer::~FifoBuffer() = default;
// Stores the number of bytes currently buffered in |*size| (which must be
// non-null). Guarded by crit_. Always returns true.
bool FifoBuffer::GetBuffered(size_t* size) const {
  CritScope lock(&crit_);
  const size_t buffered = data_length_;
  *size = buffered;
  return true;
}
bool FifoBuffer::SetCapacity(size_t size) {
CritScope cs(&crit_);
if (data_length_ > size) {
return false;
}
if (size != buffer_length_) {
char* buffer = new char[size];
const size_t copy = data_length_;
const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
memcpy(buffer, &buffer_[read_position_], tail_copy);
memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
buffer_.reset(buffer);
read_position_ = 0;
buffer_length_ = size;
}
return true;
}
// Locked wrapper around ReadOffsetLocked(): takes crit_ and forwards all
// arguments unchanged.
StreamResult FifoBuffer::ReadOffset(void* buffer,
                                    size_t bytes,
                                    size_t offset,
                                    size_t* bytes_read) {
  CritScope lock(&crit_);
  const StreamResult status =
      ReadOffsetLocked(buffer, bytes, offset, bytes_read);
  return status;
}
// Locked wrapper around WriteOffsetLocked(): takes crit_ and forwards all
// arguments unchanged.
StreamResult FifoBuffer::WriteOffset(const void* buffer,
                                     size_t bytes,
                                     size_t offset,
                                     size_t* bytes_written) {
  CritScope lock(&crit_);
  const StreamResult status =
      WriteOffsetLocked(buffer, bytes, offset, bytes_written);
  return status;
}
// Returns the current stream state, read while holding crit_.
StreamState FifoBuffer::GetState() const {
  CritScope lock(&crit_);
  const StreamState snapshot = state_;
  return snapshot;
}
// Removes up to |bytes| bytes from the front of the FIFO into |buffer|.
//
// The copy itself is done by ReadOffsetLocked() at offset 0; on SR_SUCCESS
// the read position and buffered length are advanced and the count is stored
// in |*bytes_read| when non-null. If the FIFO was full before the call and at
// least one byte was consumed, an SE_WRITE event is posted to the owner
// thread. Any other result is returned unchanged with no state modified.
// |error| is never written. Guarded by crit_.
StreamResult FifoBuffer::Read(void* buffer,
                              size_t bytes,
                              size_t* bytes_read,
                              int* error) {
  CritScope lock(&crit_);
  const bool was_full = !(data_length_ < buffer_length_);
  size_t consumed = 0;
  const StreamResult status = ReadOffsetLocked(buffer, bytes, 0, &consumed);
  if (status != SR_SUCCESS) {
    return status;
  }

  // Successful read: drop the consumed bytes from the front of the FIFO.
  read_position_ = (read_position_ + consumed) % buffer_length_;
  data_length_ -= consumed;
  if (bytes_read != nullptr) {
    *bytes_read = consumed;
  }
  // Going from full to not-full makes the FIFO writable again.
  if (was_full && consumed > 0) {
    PostEvent(owner_, SE_WRITE, 0);
  }
  return status;
}
// Appends up to |bytes| bytes from |buffer| to the back of the FIFO.
//
// The copy itself is done by WriteOffsetLocked() at offset 0; on SR_SUCCESS
// the buffered length grows by the amount copied and the count is stored in
// |*bytes_written| when non-null. If the FIFO was empty before the call and
// at least one byte was added, an SE_READ event is posted to the owner
// thread. Any other result is returned unchanged with no state modified.
// |error| is never written. Guarded by crit_.
StreamResult FifoBuffer::Write(const void* buffer,
                               size_t bytes,
                               size_t* bytes_written,
                               int* error) {
  CritScope lock(&crit_);
  const bool was_empty = (data_length_ == 0);
  size_t appended = 0;
  const StreamResult status = WriteOffsetLocked(buffer, bytes, 0, &appended);
  if (status != SR_SUCCESS) {
    return status;
  }

  // Successful write: the new bytes become readable.
  data_length_ += appended;
  if (bytes_written != nullptr) {
    *bytes_written = appended;
  }
  // Going from empty to non-empty makes the FIFO readable.
  if (was_empty && appended > 0) {
    PostEvent(owner_, SE_READ, 0);
  }
  return status;
}
// Marks the FIFO as closed (state_ = SS_CLOSED) while holding crit_. Buffered
// data is left in place.
void FifoBuffer::Close() {
  CritScope lock(&crit_);
  state_ = SS_CLOSED;
}
// Returns a pointer to the buffered data at the read position and stores in
// |*size| (which must be non-null) how many bytes can be read contiguously
// from it: all buffered data when it does not wrap, otherwise only the part
// up to the end of the buffer. Guarded by crit_.
const void* FifoBuffer::GetReadData(size_t* size) {
  CritScope lock(&crit_);
  if (read_position_ + data_length_ <= buffer_length_) {
    *size = data_length_;
  } else {
    *size = buffer_length_ - read_position_;
  }
  return &buffer_[read_position_];
}
// Drops |size| bytes from the front of the FIFO by advancing the read
// position and shrinking the buffered length. |size| must not exceed the
// buffered length (DCHECKed). If the FIFO was full beforehand and |size| is
// non-zero, an SE_WRITE event is posted to the owner thread. Guarded by
// crit_.
void FifoBuffer::ConsumeReadData(size_t size) {
  CritScope lock(&crit_);
  RTC_DCHECK(size <= data_length_);
  const bool was_full = !(data_length_ < buffer_length_);
  data_length_ -= size;
  read_position_ = (read_position_ + size) % buffer_length_;
  if (was_full && size > 0) {
    PostEvent(owner_, SE_WRITE, 0);
  }
}
// Returns a pointer to the free space following the buffered data and stores
// in |*size| (which must be non-null) how many bytes can be written
// contiguously there. Returns nullptr, leaving |*size| untouched, when the
// FIFO is closed. Guarded by crit_.
void* FifoBuffer::GetWriteBuffer(size_t* size) {
  CritScope lock(&crit_);
  if (state_ == SS_CLOSED) {
    return nullptr;
  }

  // An empty FIFO can restart at the beginning of the buffer, which yields
  // the largest possible contiguous block.
  if (data_length_ == 0) {
    read_position_ = 0;
  }

  const size_t write_at = (read_position_ + data_length_) % buffer_length_;
  if (write_at > read_position_ || data_length_ == 0) {
    // Free space runs from the write position to the end of the buffer.
    *size = buffer_length_ - write_at;
  } else {
    // Free space runs from the write position up to the read position.
    *size = read_position_ - write_at;
  }
  return &buffer_[write_at];
}
// Commits |size| bytes as buffered data by growing the buffered length.
// |size| must not exceed the free space (DCHECKed). If the FIFO was empty
// beforehand and |size| is non-zero, an SE_READ event is posted to the owner
// thread. Guarded by crit_.
void FifoBuffer::ConsumeWriteBuffer(size_t size) {
  CritScope lock(&crit_);
  RTC_DCHECK(size <= buffer_length_ - data_length_);
  const bool was_empty = (data_length_ == 0);
  data_length_ += size;
  if (was_empty && size > 0) {
    PostEvent(owner_, SE_READ, 0);
  }
}
// Stores the amount of free space (capacity minus buffered bytes) in |*size|,
// which must be non-null. Guarded by crit_. Always returns true.
bool FifoBuffer::GetWriteRemaining(size_t* size) const {
  CritScope lock(&crit_);
  const size_t free_bytes = buffer_length_ - data_length_;
  *size = free_bytes;
  return true;
}
// Copies up to |bytes| bytes of buffered data, starting |offset| bytes past
// the read position, into |buffer| without consuming anything. This function
// takes no lock itself; the callers in this file hold crit_ around it.
//
// When |offset| is at or beyond the buffered length, returns SR_EOS if the
// FIFO is closed and SR_BLOCK otherwise. On success the count is stored in
// |*bytes_read| when non-null.
StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
                                          size_t bytes,
                                          size_t offset,
                                          size_t* bytes_read) {
  if (data_length_ <= offset) {
    if (state_ == SS_CLOSED) {
      return SR_EOS;
    }
    return SR_BLOCK;
  }

  const size_t start = (read_position_ + offset) % buffer_length_;
  const size_t total = std::min(bytes, data_length_ - offset);
  // The requested range may wrap: copy the run up to the end of the buffer
  // first, then the remainder from the buffer's start.
  const size_t first_part = std::min(total, buffer_length_ - start);
  char* const out = static_cast<char*>(buffer);
  memcpy(out, &buffer_[start], first_part);
  memcpy(out + first_part, &buffer_[0], total - first_part);

  if (bytes_read != nullptr) {
    *bytes_read = total;
  }
  return SR_SUCCESS;
}
// Copies up to |bytes| bytes from |buffer| into the free space, starting
// |offset| bytes past the end of the buffered data, without changing the
// buffered length. This function takes no lock itself; the callers in this
// file hold crit_ around it.
//
// Returns SR_EOS when the FIFO is closed and SR_BLOCK when there is no free
// space at |offset|. On success the count is stored in |*bytes_written| when
// non-null.
StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
                                           size_t bytes,
                                           size_t offset,
                                           size_t* bytes_written) {
  if (state_ == SS_CLOSED) {
    return SR_EOS;
  }
  const size_t occupied = data_length_ + offset;
  if (occupied >= buffer_length_) {
    return SR_BLOCK;
  }

  const size_t room = buffer_length_ - data_length_ - offset;
  const size_t start = (read_position_ + data_length_ + offset) % buffer_length_;
  const size_t total = std::min(bytes, room);
  // The destination range may wrap: fill up to the end of the buffer first,
  // then continue from the buffer's start.
  const size_t first_part = std::min(total, buffer_length_ - start);
  const char* const in = static_cast<const char*>(buffer);
  memcpy(&buffer_[start], in, first_part);
  memcpy(&buffer_[0], in + first_part, total - first_part);

  if (bytes_written != nullptr) {
    *bytes_written = total;
  }
  return SR_SUCCESS;
}
///////////////////////////////////////////////////////////////////////////////
// Pumps data from |source| to |sink| through the scratch |buffer| of
// |buffer_len| bytes (must be > 0, DCHECKed) until |source| reports SR_EOS.
//
// |data_len| is optional. On entry, when non-null, it gives the number of
// bytes already sitting at the front of |buffer| that still need to be
// written. On exit it is set to:
//   - 0 when everything was transferred (SR_SUCCESS is returned);
//   - the number of bytes held in |buffer| when a Read() returned something
//     other than SR_SUCCESS / SR_EOS (that result is returned);
//   - the number of unwritten bytes, moved to the front of |buffer|, when a
//     Write() returned something other than SR_SUCCESS (that result is
//     returned).
StreamResult Flow(StreamInterface* source,
                  char* buffer,
                  size_t buffer_len,
                  StreamInterface* sink,
                  size_t* data_len /* = nullptr */) {
  RTC_DCHECK(buffer_len > 0);

  StreamResult result;
  size_t count;
  size_t filled = (data_len != nullptr) ? *data_len : 0;
  bool source_done = false;

  do {
    // Fill phase: read until the buffer is full, the source ends, or an
    // error/block result comes back.
    while (!source_done && filled < buffer_len) {
      result = source->Read(buffer + filled, buffer_len - filled, &count,
                            nullptr);
      if (result == SR_SUCCESS) {
        filled += count;
        continue;
      }
      if (result == SR_EOS) {
        source_done = true;
        continue;
      }
      if (data_len != nullptr) {
        *data_len = filled;
      }
      return result;
    }

    // Drain phase: write until the buffer is empty or the sink returns
    // anything other than success (end of stream included).
    size_t drained = 0;
    while (drained < filled) {
      result = sink->Write(buffer + drained, filled - drained, &count,
                           nullptr);
      if (result != SR_SUCCESS) {
        if (data_len != nullptr) {
          // Keep the unwritten tail at the front of the buffer so the caller
          // can resume with it later.
          *data_len = filled - drained;
          if (drained > 0) {
            memmove(buffer, buffer + drained, *data_len);
          }
        }
        return result;
      }
      drained += count;
    }
    filled = 0;
  } while (!source_done);

  if (data_len != nullptr) {
    *data_len = 0;
  }
  return SR_SUCCESS;
}
///////////////////////////////////////////////////////////////////////////////
} // namespace rtc