blob: 9aa10d7735be7f9190dc19604922683f98410826 [file] [log] [blame]
henrike@webrtc.org47be73b2014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/file.h>
13#endif // WEBRTC_POSIX
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <errno.h>
17#include <string>
18#include "webrtc/base/basictypes.h"
19#include "webrtc/base/common.h"
20#include "webrtc/base/logging.h"
21#include "webrtc/base/messagequeue.h"
22#include "webrtc/base/stream.h"
23#include "webrtc/base/stringencode.h"
24#include "webrtc/base/stringutils.h"
25#include "webrtc/base/thread.h"
26#include "webrtc/base/timeutils.h"
27
28#if defined(WEBRTC_WIN)
29#include "webrtc/base/win32.h"
30#define fileno _fileno
31#endif
32
33namespace rtc {
34
35///////////////////////////////////////////////////////////////////////////////
36// StreamInterface
37///////////////////////////////////////////////////////////////////////////////
38StreamInterface::~StreamInterface() {
39}
40
41StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
42 size_t* written, int* error) {
43 StreamResult result = SR_SUCCESS;
44 size_t total_written = 0, current_written;
45 while (total_written < data_len) {
46 result = Write(static_cast<const char*>(data) + total_written,
47 data_len - total_written, &current_written, error);
48 if (result != SR_SUCCESS)
49 break;
50 total_written += current_written;
51 }
52 if (written)
53 *written = total_written;
54 return result;
55}
56
57StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
58 size_t* read, int* error) {
59 StreamResult result = SR_SUCCESS;
60 size_t total_read = 0, current_read;
61 while (total_read < buffer_len) {
62 result = Read(static_cast<char*>(buffer) + total_read,
63 buffer_len - total_read, &current_read, error);
64 if (result != SR_SUCCESS)
65 break;
66 total_read += current_read;
67 }
68 if (read)
69 *read = total_read;
70 return result;
71}
72
73StreamResult StreamInterface::ReadLine(std::string* line) {
74 line->clear();
75 StreamResult result = SR_SUCCESS;
76 while (true) {
77 char ch;
78 result = Read(&ch, sizeof(ch), NULL, NULL);
79 if (result != SR_SUCCESS) {
80 break;
81 }
82 if (ch == '\n') {
83 break;
84 }
85 line->push_back(ch);
86 }
87 if (!line->empty()) { // give back the line we've collected so far with
88 result = SR_SUCCESS; // a success code. Otherwise return the last code
89 }
90 return result;
91}
92
93void StreamInterface::PostEvent(Thread* t, int events, int err) {
94 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
95}
96
97void StreamInterface::PostEvent(int events, int err) {
98 PostEvent(Thread::Current(), events, err);
99}
100
101StreamInterface::StreamInterface() {
102}
103
104void StreamInterface::OnMessage(Message* msg) {
105 if (MSG_POST_EVENT == msg->message_id) {
106 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
107 SignalEvent(this, pe->events, pe->error);
108 delete msg->pdata;
109 }
110}
111
112///////////////////////////////////////////////////////////////////////////////
113// StreamAdapterInterface
114///////////////////////////////////////////////////////////////////////////////
115
116StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
117 bool owned)
118 : stream_(stream), owned_(owned) {
119 if (NULL != stream_)
120 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
121}
122
123void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
124 if (NULL != stream_)
125 stream_->SignalEvent.disconnect(this);
126 if (owned_)
127 delete stream_;
128 stream_ = stream;
129 owned_ = owned;
130 if (NULL != stream_)
131 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
132}
133
134StreamInterface* StreamAdapterInterface::Detach() {
135 if (NULL != stream_)
136 stream_->SignalEvent.disconnect(this);
137 StreamInterface* stream = stream_;
138 stream_ = NULL;
139 return stream;
140}
141
142StreamAdapterInterface::~StreamAdapterInterface() {
143 if (owned_)
144 delete stream_;
145}
146
147///////////////////////////////////////////////////////////////////////////////
148// StreamTap
149///////////////////////////////////////////////////////////////////////////////
150
151StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
152 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
153 tap_error_(0) {
154 AttachTap(tap);
155}
156
157void StreamTap::AttachTap(StreamInterface* tap) {
158 tap_.reset(tap);
159}
160
161StreamInterface* StreamTap::DetachTap() {
162 return tap_.release();
163}
164
165StreamResult StreamTap::GetTapResult(int* error) {
166 if (error) {
167 *error = tap_error_;
168 }
169 return tap_result_;
170}
171
172StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
173 size_t* read, int* error) {
174 size_t backup_read;
175 if (!read) {
176 read = &backup_read;
177 }
178 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
179 read, error);
180 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
181 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
182 }
183 return res;
184}
185
186StreamResult StreamTap::Write(const void* data, size_t data_len,
187 size_t* written, int* error) {
188 size_t backup_written;
189 if (!written) {
190 written = &backup_written;
191 }
192 StreamResult res = StreamAdapterInterface::Write(data, data_len,
193 written, error);
194 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
195 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
196 }
197 return res;
198}
199
200///////////////////////////////////////////////////////////////////////////////
201// StreamSegment
202///////////////////////////////////////////////////////////////////////////////
203
204StreamSegment::StreamSegment(StreamInterface* stream)
205 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
206 length_(SIZE_UNKNOWN) {
207 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
208 stream->GetPosition(&start_);
209}
210
211StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
212 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
213 length_(length) {
214 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
215 stream->GetPosition(&start_);
216}
217
218StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
219 size_t* read, int* error) {
220 if (SIZE_UNKNOWN != length_) {
221 if (pos_ >= length_)
222 return SR_EOS;
223 buffer_len = _min(buffer_len, length_ - pos_);
224 }
225 size_t backup_read;
226 if (!read) {
227 read = &backup_read;
228 }
229 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
230 read, error);
231 if (SR_SUCCESS == result) {
232 pos_ += *read;
233 }
234 return result;
235}
236
237bool StreamSegment::SetPosition(size_t position) {
238 if (SIZE_UNKNOWN == start_)
239 return false; // Not seekable
240 if ((SIZE_UNKNOWN != length_) && (position > length_))
241 return false; // Seek past end of segment
242 if (!StreamAdapterInterface::SetPosition(start_ + position))
243 return false;
244 pos_ = position;
245 return true;
246}
247
248bool StreamSegment::GetPosition(size_t* position) const {
249 if (SIZE_UNKNOWN == start_)
250 return false; // Not seekable
251 if (!StreamAdapterInterface::GetPosition(position))
252 return false;
253 if (position) {
254 ASSERT(*position >= start_);
255 *position -= start_;
256 }
257 return true;
258}
259
260bool StreamSegment::GetSize(size_t* size) const {
261 if (!StreamAdapterInterface::GetSize(size))
262 return false;
263 if (size) {
264 if (SIZE_UNKNOWN != start_) {
265 ASSERT(*size >= start_);
266 *size -= start_;
267 }
268 if (SIZE_UNKNOWN != length_) {
269 *size = _min(*size, length_);
270 }
271 }
272 return true;
273}
274
275bool StreamSegment::GetAvailable(size_t* size) const {
276 if (!StreamAdapterInterface::GetAvailable(size))
277 return false;
278 if (size && (SIZE_UNKNOWN != length_))
279 *size = _min(*size, length_ - pos_);
280 return true;
281}
282
283///////////////////////////////////////////////////////////////////////////////
284// NullStream
285///////////////////////////////////////////////////////////////////////////////
286
287NullStream::NullStream() {
288}
289
290NullStream::~NullStream() {
291}
292
293StreamState NullStream::GetState() const {
294 return SS_OPEN;
295}
296
297StreamResult NullStream::Read(void* buffer, size_t buffer_len,
298 size_t* read, int* error) {
299 if (error) *error = -1;
300 return SR_ERROR;
301}
302
303StreamResult NullStream::Write(const void* data, size_t data_len,
304 size_t* written, int* error) {
305 if (written) *written = data_len;
306 return SR_SUCCESS;
307}
308
309void NullStream::Close() {
310}
311
312///////////////////////////////////////////////////////////////////////////////
313// FileStream
314///////////////////////////////////////////////////////////////////////////////
315
316FileStream::FileStream() : file_(NULL) {
317}
318
319FileStream::~FileStream() {
320 FileStream::Close();
321}
322
323bool FileStream::Open(const std::string& filename, const char* mode,
324 int* error) {
325 Close();
326#if defined(WEBRTC_WIN)
327 std::wstring wfilename;
328 if (Utf8ToWindowsFilename(filename, &wfilename)) {
329 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
330 } else {
331 if (error) {
332 *error = -1;
333 return false;
334 }
335 }
336#else
337 file_ = fopen(filename.c_str(), mode);
338#endif
339 if (!file_ && error) {
340 *error = errno;
341 }
342 return (file_ != NULL);
343}
344
345bool FileStream::OpenShare(const std::string& filename, const char* mode,
346 int shflag, int* error) {
347 Close();
348#if defined(WEBRTC_WIN)
349 std::wstring wfilename;
350 if (Utf8ToWindowsFilename(filename, &wfilename)) {
351 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
352 if (!file_ && error) {
353 *error = errno;
354 return false;
355 }
356 return file_ != NULL;
357 } else {
358 if (error) {
359 *error = -1;
360 }
361 return false;
362 }
363#else
364 return Open(filename, mode, error);
365#endif
366}
367
368bool FileStream::DisableBuffering() {
369 if (!file_)
370 return false;
371 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
372}
373
374StreamState FileStream::GetState() const {
375 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
376}
377
378StreamResult FileStream::Read(void* buffer, size_t buffer_len,
379 size_t* read, int* error) {
380 if (!file_)
381 return SR_EOS;
382 size_t result = fread(buffer, 1, buffer_len, file_);
383 if ((result == 0) && (buffer_len > 0)) {
384 if (feof(file_))
385 return SR_EOS;
386 if (error)
387 *error = errno;
388 return SR_ERROR;
389 }
390 if (read)
391 *read = result;
392 return SR_SUCCESS;
393}
394
395StreamResult FileStream::Write(const void* data, size_t data_len,
396 size_t* written, int* error) {
397 if (!file_)
398 return SR_EOS;
399 size_t result = fwrite(data, 1, data_len, file_);
400 if ((result == 0) && (data_len > 0)) {
401 if (error)
402 *error = errno;
403 return SR_ERROR;
404 }
405 if (written)
406 *written = result;
407 return SR_SUCCESS;
408}
409
410void FileStream::Close() {
411 if (file_) {
412 DoClose();
413 file_ = NULL;
414 }
415}
416
417bool FileStream::SetPosition(size_t position) {
418 if (!file_)
419 return false;
420 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
421}
422
423bool FileStream::GetPosition(size_t* position) const {
424 ASSERT(NULL != position);
425 if (!file_)
426 return false;
427 long result = ftell(file_);
428 if (result < 0)
429 return false;
430 if (position)
431 *position = result;
432 return true;
433}
434
435bool FileStream::GetSize(size_t* size) const {
436 ASSERT(NULL != size);
437 if (!file_)
438 return false;
439 struct stat file_stats;
440 if (fstat(fileno(file_), &file_stats) != 0)
441 return false;
442 if (size)
443 *size = file_stats.st_size;
444 return true;
445}
446
447bool FileStream::GetAvailable(size_t* size) const {
448 ASSERT(NULL != size);
449 if (!GetSize(size))
450 return false;
451 long result = ftell(file_);
452 if (result < 0)
453 return false;
454 if (size)
455 *size -= result;
456 return true;
457}
458
459bool FileStream::ReserveSize(size_t size) {
460 // TODO: extend the file to the proper length
461 return true;
462}
463
464bool FileStream::GetSize(const std::string& filename, size_t* size) {
465 struct stat file_stats;
466 if (stat(filename.c_str(), &file_stats) != 0)
467 return false;
468 *size = file_stats.st_size;
469 return true;
470}
471
472bool FileStream::Flush() {
473 if (file_) {
474 return (0 == fflush(file_));
475 }
476 // try to flush empty file?
477 ASSERT(false);
478 return false;
479}
480
481#if defined(WEBRTC_POSIX) && !defined(__native_client__)
482
483bool FileStream::TryLock() {
484 if (file_ == NULL) {
485 // Stream not open.
486 ASSERT(false);
487 return false;
488 }
489
490 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
491}
492
493bool FileStream::Unlock() {
494 if (file_ == NULL) {
495 // Stream not open.
496 ASSERT(false);
497 return false;
498 }
499
500 return flock(fileno(file_), LOCK_UN) == 0;
501}
502
503#endif
504
505void FileStream::DoClose() {
506 fclose(file_);
507}
508
509CircularFileStream::CircularFileStream(size_t max_size)
510 : max_write_size_(max_size),
511 position_(0),
512 marked_position_(max_size / 2),
513 last_write_position_(0),
514 read_segment_(READ_LATEST),
515 read_segment_available_(0) {
516}
517
518bool CircularFileStream::Open(
519 const std::string& filename, const char* mode, int* error) {
520 if (!FileStream::Open(filename.c_str(), mode, error))
521 return false;
522
523 if (strchr(mode, "r") != NULL) { // Opened in read mode.
524 // Check if the buffer has been overwritten and determine how to read the
525 // log in time sequence.
526 size_t file_size;
527 GetSize(&file_size);
528 if (file_size == position_) {
529 // The buffer has not been overwritten yet. Read 0 .. file_size
530 read_segment_ = READ_LATEST;
531 read_segment_available_ = file_size;
532 } else {
533 // The buffer has been over written. There are three segments: The first
534 // one is 0 .. marked_position_, which is the marked earliest log. The
535 // second one is position_ .. file_size, which is the middle log. The
536 // last one is marked_position_ .. position_, which is the latest log.
537 read_segment_ = READ_MARKED;
538 read_segment_available_ = marked_position_;
539 last_write_position_ = position_;
540 }
541
542 // Read from the beginning.
543 position_ = 0;
544 SetPosition(position_);
545 }
546
547 return true;
548}
549
550StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
551 size_t* read, int* error) {
552 if (read_segment_available_ == 0) {
553 size_t file_size;
554 switch (read_segment_) {
555 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
556 read_segment_ = READ_MIDDLE;
557 position_ = last_write_position_;
558 SetPosition(position_);
559 GetSize(&file_size);
560 read_segment_available_ = file_size - position_;
561 break;
562
563 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
564 read_segment_ = READ_LATEST;
565 position_ = marked_position_;
566 SetPosition(position_);
567 read_segment_available_ = last_write_position_ - position_;
568 break;
569
570 default: // Finished READ_LATEST and return EOS.
571 return rtc::SR_EOS;
572 }
573 }
574
575 size_t local_read;
576 if (!read) read = &local_read;
577
578 size_t to_read = rtc::_min(buffer_len, read_segment_available_);
579 rtc::StreamResult result
580 = rtc::FileStream::Read(buffer, to_read, read, error);
581 if (result == rtc::SR_SUCCESS) {
582 read_segment_available_ -= *read;
583 position_ += *read;
584 }
585 return result;
586}
587
588StreamResult CircularFileStream::Write(const void* data, size_t data_len,
589 size_t* written, int* error) {
590 if (position_ >= max_write_size_) {
591 ASSERT(position_ == max_write_size_);
592 position_ = marked_position_;
593 SetPosition(position_);
594 }
595
596 size_t local_written;
597 if (!written) written = &local_written;
598
599 size_t to_eof = max_write_size_ - position_;
600 size_t to_write = rtc::_min(data_len, to_eof);
601 rtc::StreamResult result
602 = rtc::FileStream::Write(data, to_write, written, error);
603 if (result == rtc::SR_SUCCESS) {
604 position_ += *written;
605 }
606 return result;
607}
608
609AsyncWriteStream::~AsyncWriteStream() {
610 write_thread_->Clear(this, 0, NULL);
611 ClearBufferAndWrite();
612
613 CritScope cs(&crit_stream_);
614 stream_.reset();
615}
616
617// This is needed by some stream writers, such as RtpDumpWriter.
618bool AsyncWriteStream::GetPosition(size_t* position) const {
619 CritScope cs(&crit_stream_);
620 return stream_->GetPosition(position);
621}
622
623// This is needed by some stream writers, such as the plugin log writers.
624StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
625 size_t* read, int* error) {
626 CritScope cs(&crit_stream_);
627 return stream_->Read(buffer, buffer_len, read, error);
628}
629
630void AsyncWriteStream::Close() {
631 if (state_ == SS_CLOSED) {
632 return;
633 }
634
635 write_thread_->Clear(this, 0, NULL);
636 ClearBufferAndWrite();
637
638 CritScope cs(&crit_stream_);
639 stream_->Close();
640 state_ = SS_CLOSED;
641}
642
643StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
644 size_t* written, int* error) {
645 if (state_ == SS_CLOSED) {
646 return SR_ERROR;
647 }
648
649 size_t previous_buffer_length = 0;
650 {
651 CritScope cs(&crit_buffer_);
652 previous_buffer_length = buffer_.length();
653 buffer_.AppendData(data, data_len);
654 }
655
656 if (previous_buffer_length == 0) {
657 // If there's stuff already in the buffer, then we already called
658 // Post and the write_thread_ hasn't pulled it out yet, so we
659 // don't need to re-Post.
660 write_thread_->Post(this, 0, NULL);
661 }
662 // Return immediately, assuming that it works.
663 if (written) {
664 *written = data_len;
665 }
666 return SR_SUCCESS;
667}
668
669void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
670 ClearBufferAndWrite();
671}
672
673bool AsyncWriteStream::Flush() {
674 if (state_ == SS_CLOSED) {
675 return false;
676 }
677
678 ClearBufferAndWrite();
679
680 CritScope cs(&crit_stream_);
681 return stream_->Flush();
682}
683
684void AsyncWriteStream::ClearBufferAndWrite() {
685 Buffer to_write;
686 {
687 CritScope cs_buffer(&crit_buffer_);
688 buffer_.TransferTo(&to_write);
689 }
690
691 if (to_write.length() > 0) {
692 CritScope cs(&crit_stream_);
693 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
694 }
695}
696
697#if defined(WEBRTC_POSIX) && !defined(__native_client__)
698
699// Have to identically rewrite the FileStream destructor or else it would call
700// the base class's Close() instead of the sub-class's.
701POpenStream::~POpenStream() {
702 POpenStream::Close();
703}
704
705bool POpenStream::Open(const std::string& subcommand,
706 const char* mode,
707 int* error) {
708 Close();
709 file_ = popen(subcommand.c_str(), mode);
710 if (file_ == NULL) {
711 if (error)
712 *error = errno;
713 return false;
714 }
715 return true;
716}
717
718bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
719 int shflag, int* error) {
720 return Open(subcommand, mode, error);
721}
722
723void POpenStream::DoClose() {
724 wait_status_ = pclose(file_);
725}
726
727#endif
728
729///////////////////////////////////////////////////////////////////////////////
730// MemoryStream
731///////////////////////////////////////////////////////////////////////////////
732
733MemoryStreamBase::MemoryStreamBase()
734 : buffer_(NULL), buffer_length_(0), data_length_(0),
735 seek_position_(0) {
736}
737
738StreamState MemoryStreamBase::GetState() const {
739 return SS_OPEN;
740}
741
742StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
743 size_t* bytes_read, int* error) {
744 if (seek_position_ >= data_length_) {
745 return SR_EOS;
746 }
747 size_t available = data_length_ - seek_position_;
748 if (bytes > available) {
749 // Read partial buffer
750 bytes = available;
751 }
752 memcpy(buffer, &buffer_[seek_position_], bytes);
753 seek_position_ += bytes;
754 if (bytes_read) {
755 *bytes_read = bytes;
756 }
757 return SR_SUCCESS;
758}
759
760StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
761 size_t* bytes_written, int* error) {
762 size_t available = buffer_length_ - seek_position_;
763 if (0 == available) {
764 // Increase buffer size to the larger of:
765 // a) new position rounded up to next 256 bytes
766 // b) double the previous length
767 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
768 buffer_length_ * 2);
769 StreamResult result = DoReserve(new_buffer_length, error);
770 if (SR_SUCCESS != result) {
771 return result;
772 }
773 ASSERT(buffer_length_ >= new_buffer_length);
774 available = buffer_length_ - seek_position_;
775 }
776
777 if (bytes > available) {
778 bytes = available;
779 }
780 memcpy(&buffer_[seek_position_], buffer, bytes);
781 seek_position_ += bytes;
782 if (data_length_ < seek_position_) {
783 data_length_ = seek_position_;
784 }
785 if (bytes_written) {
786 *bytes_written = bytes;
787 }
788 return SR_SUCCESS;
789}
790
791void MemoryStreamBase::Close() {
792 // nothing to do
793}
794
795bool MemoryStreamBase::SetPosition(size_t position) {
796 if (position > data_length_)
797 return false;
798 seek_position_ = position;
799 return true;
800}
801
802bool MemoryStreamBase::GetPosition(size_t* position) const {
803 if (position)
804 *position = seek_position_;
805 return true;
806}
807
808bool MemoryStreamBase::GetSize(size_t* size) const {
809 if (size)
810 *size = data_length_;
811 return true;
812}
813
814bool MemoryStreamBase::GetAvailable(size_t* size) const {
815 if (size)
816 *size = data_length_ - seek_position_;
817 return true;
818}
819
820bool MemoryStreamBase::ReserveSize(size_t size) {
821 return (SR_SUCCESS == DoReserve(size, NULL));
822}
823
824StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
825 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
826}
827
828///////////////////////////////////////////////////////////////////////////////
829
830MemoryStream::MemoryStream()
831 : buffer_alloc_(NULL) {
832}
833
834MemoryStream::MemoryStream(const char* data)
835 : buffer_alloc_(NULL) {
836 SetData(data, strlen(data));
837}
838
839MemoryStream::MemoryStream(const void* data, size_t length)
840 : buffer_alloc_(NULL) {
841 SetData(data, length);
842}
843
844MemoryStream::~MemoryStream() {
845 delete [] buffer_alloc_;
846}
847
848void MemoryStream::SetData(const void* data, size_t length) {
849 data_length_ = buffer_length_ = length;
850 delete [] buffer_alloc_;
851 buffer_alloc_ = new char[buffer_length_ + kAlignment];
852 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
853 memcpy(buffer_, data, data_length_);
854 seek_position_ = 0;
855}
856
857StreamResult MemoryStream::DoReserve(size_t size, int* error) {
858 if (buffer_length_ >= size)
859 return SR_SUCCESS;
860
861 if (char* new_buffer_alloc = new char[size + kAlignment]) {
862 char* new_buffer = reinterpret_cast<char*>(
863 ALIGNP(new_buffer_alloc, kAlignment));
864 memcpy(new_buffer, buffer_, data_length_);
865 delete [] buffer_alloc_;
866 buffer_alloc_ = new_buffer_alloc;
867 buffer_ = new_buffer;
868 buffer_length_ = size;
869 return SR_SUCCESS;
870 }
871
872 if (error) {
873 *error = ENOMEM;
874 }
875 return SR_ERROR;
876}
877
878///////////////////////////////////////////////////////////////////////////////
879
880ExternalMemoryStream::ExternalMemoryStream() {
881}
882
883ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
884 SetData(data, length);
885}
886
887ExternalMemoryStream::~ExternalMemoryStream() {
888}
889
890void ExternalMemoryStream::SetData(void* data, size_t length) {
891 data_length_ = buffer_length_ = length;
892 buffer_ = static_cast<char*>(data);
893 seek_position_ = 0;
894}
895
896///////////////////////////////////////////////////////////////////////////////
897// FifoBuffer
898///////////////////////////////////////////////////////////////////////////////
899
900FifoBuffer::FifoBuffer(size_t size)
901 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
902 data_length_(0), read_position_(0), owner_(Thread::Current()) {
903 // all events are done on the owner_ thread
904}
905
906FifoBuffer::FifoBuffer(size_t size, Thread* owner)
907 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
908 data_length_(0), read_position_(0), owner_(owner) {
909 // all events are done on the owner_ thread
910}
911
912FifoBuffer::~FifoBuffer() {
913}
914
915bool FifoBuffer::GetBuffered(size_t* size) const {
916 CritScope cs(&crit_);
917 *size = data_length_;
918 return true;
919}
920
921bool FifoBuffer::SetCapacity(size_t size) {
922 CritScope cs(&crit_);
923 if (data_length_ > size) {
924 return false;
925 }
926
927 if (size != buffer_length_) {
928 char* buffer = new char[size];
929 const size_t copy = data_length_;
930 const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
931 memcpy(buffer, &buffer_[read_position_], tail_copy);
932 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
933 buffer_.reset(buffer);
934 read_position_ = 0;
935 buffer_length_ = size;
936 }
937 return true;
938}
939
940StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
941 size_t offset, size_t* bytes_read) {
942 CritScope cs(&crit_);
943 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
944}
945
946StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
947 size_t offset, size_t* bytes_written) {
948 CritScope cs(&crit_);
949 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
950}
951
952StreamState FifoBuffer::GetState() const {
953 return state_;
954}
955
956StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
957 size_t* bytes_read, int* error) {
958 CritScope cs(&crit_);
959 const bool was_writable = data_length_ < buffer_length_;
960 size_t copy = 0;
961 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
962
963 if (result == SR_SUCCESS) {
964 // If read was successful then adjust the read position and number of
965 // bytes buffered.
966 read_position_ = (read_position_ + copy) % buffer_length_;
967 data_length_ -= copy;
968 if (bytes_read) {
969 *bytes_read = copy;
970 }
971
972 // if we were full before, and now we're not, post an event
973 if (!was_writable && copy > 0) {
974 PostEvent(owner_, SE_WRITE, 0);
975 }
976 }
977 return result;
978}
979
980StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
981 size_t* bytes_written, int* error) {
982 CritScope cs(&crit_);
983
984 const bool was_readable = (data_length_ > 0);
985 size_t copy = 0;
986 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
987
988 if (result == SR_SUCCESS) {
989 // If write was successful then adjust the number of readable bytes.
990 data_length_ += copy;
991 if (bytes_written) {
992 *bytes_written = copy;
993 }
994
995 // if we didn't have any data to read before, and now we do, post an event
996 if (!was_readable && copy > 0) {
997 PostEvent(owner_, SE_READ, 0);
998 }
999 }
1000 return result;
1001}
1002
1003void FifoBuffer::Close() {
1004 CritScope cs(&crit_);
1005 state_ = SS_CLOSED;
1006}
1007
1008const void* FifoBuffer::GetReadData(size_t* size) {
1009 CritScope cs(&crit_);
1010 *size = (read_position_ + data_length_ <= buffer_length_) ?
1011 data_length_ : buffer_length_ - read_position_;
1012 return &buffer_[read_position_];
1013}
1014
1015void FifoBuffer::ConsumeReadData(size_t size) {
1016 CritScope cs(&crit_);
1017 ASSERT(size <= data_length_);
1018 const bool was_writable = data_length_ < buffer_length_;
1019 read_position_ = (read_position_ + size) % buffer_length_;
1020 data_length_ -= size;
1021 if (!was_writable && size > 0) {
1022 PostEvent(owner_, SE_WRITE, 0);
1023 }
1024}
1025
1026void* FifoBuffer::GetWriteBuffer(size_t* size) {
1027 CritScope cs(&crit_);
1028 if (state_ == SS_CLOSED) {
1029 return NULL;
1030 }
1031
1032 // if empty, reset the write position to the beginning, so we can get
1033 // the biggest possible block
1034 if (data_length_ == 0) {
1035 read_position_ = 0;
1036 }
1037
1038 const size_t write_position = (read_position_ + data_length_)
1039 % buffer_length_;
1040 *size = (write_position > read_position_ || data_length_ == 0) ?
1041 buffer_length_ - write_position : read_position_ - write_position;
1042 return &buffer_[write_position];
1043}
1044
1045void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1046 CritScope cs(&crit_);
1047 ASSERT(size <= buffer_length_ - data_length_);
1048 const bool was_readable = (data_length_ > 0);
1049 data_length_ += size;
1050 if (!was_readable && size > 0) {
1051 PostEvent(owner_, SE_READ, 0);
1052 }
1053}
1054
1055bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1056 CritScope cs(&crit_);
1057 *size = buffer_length_ - data_length_;
1058 return true;
1059}
1060
1061StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1062 size_t bytes,
1063 size_t offset,
1064 size_t* bytes_read) {
1065 if (offset >= data_length_) {
1066 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1067 }
1068
1069 const size_t available = data_length_ - offset;
1070 const size_t read_position = (read_position_ + offset) % buffer_length_;
1071 const size_t copy = _min(bytes, available);
1072 const size_t tail_copy = _min(copy, buffer_length_ - read_position);
1073 char* const p = static_cast<char*>(buffer);
1074 memcpy(p, &buffer_[read_position], tail_copy);
1075 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1076
1077 if (bytes_read) {
1078 *bytes_read = copy;
1079 }
1080 return SR_SUCCESS;
1081}
1082
1083StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1084 size_t bytes,
1085 size_t offset,
1086 size_t* bytes_written) {
1087 if (state_ == SS_CLOSED) {
1088 return SR_EOS;
1089 }
1090
1091 if (data_length_ + offset >= buffer_length_) {
1092 return SR_BLOCK;
1093 }
1094
1095 const size_t available = buffer_length_ - data_length_ - offset;
1096 const size_t write_position = (read_position_ + data_length_ + offset)
1097 % buffer_length_;
1098 const size_t copy = _min(bytes, available);
1099 const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1100 const char* const p = static_cast<const char*>(buffer);
1101 memcpy(&buffer_[write_position], p, tail_copy);
1102 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1103
1104 if (bytes_written) {
1105 *bytes_written = copy;
1106 }
1107 return SR_SUCCESS;
1108}
1109
1110
1111
1112///////////////////////////////////////////////////////////////////////////////
1113// LoggingAdapter
1114///////////////////////////////////////////////////////////////////////////////
1115
1116LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1117 const std::string& label, bool hex_mode)
1118 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1119 set_label(label);
1120}
1121
1122void LoggingAdapter::set_label(const std::string& label) {
1123 label_.assign("[");
1124 label_.append(label);
1125 label_.append("]");
1126}
1127
1128StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1129 size_t* read, int* error) {
1130 size_t local_read; if (!read) read = &local_read;
1131 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1132 error);
1133 if (result == SR_SUCCESS) {
1134 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1135 }
1136 return result;
1137}
1138
1139StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1140 size_t* written, int* error) {
1141 size_t local_written;
1142 if (!written) written = &local_written;
1143 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1144 error);
1145 if (result == SR_SUCCESS) {
1146 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1147 &lms_);
1148 }
1149 return result;
1150}
1151
1152void LoggingAdapter::Close() {
1153 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1154 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1155 LOG_V(level_) << label_ << " Closed locally";
1156 StreamAdapterInterface::Close();
1157}
1158
1159void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1160 if (events & SE_OPEN) {
1161 LOG_V(level_) << label_ << " Open";
1162 } else if (events & SE_CLOSE) {
1163 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1164 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1165 LOG_V(level_) << label_ << " Closed with error: " << err;
1166 }
1167 StreamAdapterInterface::OnEvent(stream, events, err);
1168}
1169
1170///////////////////////////////////////////////////////////////////////////////
1171// StringStream - Reads/Writes to an external std::string
1172///////////////////////////////////////////////////////////////////////////////
1173
1174StringStream::StringStream(std::string& str)
1175 : str_(str), read_pos_(0), read_only_(false) {
1176}
1177
1178StringStream::StringStream(const std::string& str)
1179 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1180}
1181
1182StreamState StringStream::GetState() const {
1183 return SS_OPEN;
1184}
1185
1186StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1187 size_t* read, int* error) {
1188 size_t available = _min(buffer_len, str_.size() - read_pos_);
1189 if (!available)
1190 return SR_EOS;
1191 memcpy(buffer, str_.data() + read_pos_, available);
1192 read_pos_ += available;
1193 if (read)
1194 *read = available;
1195 return SR_SUCCESS;
1196}
1197
1198StreamResult StringStream::Write(const void* data, size_t data_len,
1199 size_t* written, int* error) {
1200 if (read_only_) {
1201 if (error) {
1202 *error = -1;
1203 }
1204 return SR_ERROR;
1205 }
1206 str_.append(static_cast<const char*>(data),
1207 static_cast<const char*>(data) + data_len);
1208 if (written)
1209 *written = data_len;
1210 return SR_SUCCESS;
1211}
1212
1213void StringStream::Close() {
1214}
1215
1216bool StringStream::SetPosition(size_t position) {
1217 if (position > str_.size())
1218 return false;
1219 read_pos_ = position;
1220 return true;
1221}
1222
1223bool StringStream::GetPosition(size_t* position) const {
1224 if (position)
1225 *position = read_pos_;
1226 return true;
1227}
1228
1229bool StringStream::GetSize(size_t* size) const {
1230 if (size)
1231 *size = str_.size();
1232 return true;
1233}
1234
1235bool StringStream::GetAvailable(size_t* size) const {
1236 if (size)
1237 *size = str_.size() - read_pos_;
1238 return true;
1239}
1240
1241bool StringStream::ReserveSize(size_t size) {
1242 if (read_only_)
1243 return false;
1244 str_.reserve(size);
1245 return true;
1246}
1247
1248///////////////////////////////////////////////////////////////////////////////
1249// StreamReference
1250///////////////////////////////////////////////////////////////////////////////
1251
1252StreamReference::StreamReference(StreamInterface* stream)
1253 : StreamAdapterInterface(stream, false) {
1254 // owner set to false so the destructor does not free the stream.
1255 stream_ref_count_ = new StreamRefCount(stream);
1256}
1257
1258StreamInterface* StreamReference::NewReference() {
1259 stream_ref_count_->AddReference();
1260 return new StreamReference(stream_ref_count_, stream());
1261}
1262
1263StreamReference::~StreamReference() {
1264 stream_ref_count_->Release();
1265}
1266
1267StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1268 StreamInterface* stream)
1269 : StreamAdapterInterface(stream, false),
1270 stream_ref_count_(stream_ref_count) {
1271}
1272
1273///////////////////////////////////////////////////////////////////////////////
1274
1275StreamResult Flow(StreamInterface* source,
1276 char* buffer, size_t buffer_len,
1277 StreamInterface* sink,
1278 size_t* data_len /* = NULL */) {
1279 ASSERT(buffer_len > 0);
1280
1281 StreamResult result;
1282 size_t count, read_pos, write_pos;
1283 if (data_len) {
1284 read_pos = *data_len;
1285 } else {
1286 read_pos = 0;
1287 }
1288
1289 bool end_of_stream = false;
1290 do {
1291 // Read until buffer is full, end of stream, or error
1292 while (!end_of_stream && (read_pos < buffer_len)) {
1293 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1294 &count, NULL);
1295 if (result == SR_EOS) {
1296 end_of_stream = true;
1297 } else if (result != SR_SUCCESS) {
1298 if (data_len) {
1299 *data_len = read_pos;
1300 }
1301 return result;
1302 } else {
1303 read_pos += count;
1304 }
1305 }
1306
1307 // Write until buffer is empty, or error (including end of stream)
1308 write_pos = 0;
1309 while (write_pos < read_pos) {
1310 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1311 &count, NULL);
1312 if (result != SR_SUCCESS) {
1313 if (data_len) {
1314 *data_len = read_pos - write_pos;
1315 if (write_pos > 0) {
1316 memmove(buffer, buffer + write_pos, *data_len);
1317 }
1318 }
1319 return result;
1320 }
1321 write_pos += count;
1322 }
1323
1324 read_pos = 0;
1325 } while (!end_of_stream);
1326
1327 if (data_len) {
1328 *data_len = 0;
1329 }
1330 return SR_SUCCESS;
1331}
1332
1333///////////////////////////////////////////////////////////////////////////////
1334
1335} // namespace rtc