blob: 02ae4094fa5a0ce5d11fdabeae0c00213f714a6b [file] [log] [blame]
henrike@webrtc.org0e118e72013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004 Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#if defined(POSIX)
29#include <sys/file.h>
30#endif // POSIX
31#include <sys/types.h>
32#include <sys/stat.h>
33#include <errno.h>
34#include <string>
35#include "talk/base/basictypes.h"
36#include "talk/base/common.h"
37#include "talk/base/logging.h"
38#include "talk/base/messagequeue.h"
39#include "talk/base/stream.h"
40#include "talk/base/stringencode.h"
41#include "talk/base/stringutils.h"
42#include "talk/base/thread.h"
43#include "talk/base/timeutils.h"
44
45#ifdef WIN32
46#include "talk/base/win32.h"
47#define fileno _fileno
48#endif
49
50namespace talk_base {
51
52///////////////////////////////////////////////////////////////////////////////
53// StreamInterface
54///////////////////////////////////////////////////////////////////////////////
55StreamInterface::~StreamInterface() {
56}
57
58StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
59 size_t* written, int* error) {
60 StreamResult result = SR_SUCCESS;
61 size_t total_written = 0, current_written;
62 while (total_written < data_len) {
63 result = Write(static_cast<const char*>(data) + total_written,
64 data_len - total_written, &current_written, error);
65 if (result != SR_SUCCESS)
66 break;
67 total_written += current_written;
68 }
69 if (written)
70 *written = total_written;
71 return result;
72}
73
74StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
75 size_t* read, int* error) {
76 StreamResult result = SR_SUCCESS;
77 size_t total_read = 0, current_read;
78 while (total_read < buffer_len) {
79 result = Read(static_cast<char*>(buffer) + total_read,
80 buffer_len - total_read, &current_read, error);
81 if (result != SR_SUCCESS)
82 break;
83 total_read += current_read;
84 }
85 if (read)
86 *read = total_read;
87 return result;
88}
89
90StreamResult StreamInterface::ReadLine(std::string* line) {
91 line->clear();
92 StreamResult result = SR_SUCCESS;
93 while (true) {
94 char ch;
95 result = Read(&ch, sizeof(ch), NULL, NULL);
96 if (result != SR_SUCCESS) {
97 break;
98 }
99 if (ch == '\n') {
100 break;
101 }
102 line->push_back(ch);
103 }
104 if (!line->empty()) { // give back the line we've collected so far with
105 result = SR_SUCCESS; // a success code. Otherwise return the last code
106 }
107 return result;
108}
109
110void StreamInterface::PostEvent(Thread* t, int events, int err) {
111 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
112}
113
114void StreamInterface::PostEvent(int events, int err) {
115 PostEvent(Thread::Current(), events, err);
116}
117
118StreamInterface::StreamInterface() {
119}
120
121void StreamInterface::OnMessage(Message* msg) {
122 if (MSG_POST_EVENT == msg->message_id) {
123 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
124 SignalEvent(this, pe->events, pe->error);
125 delete msg->pdata;
126 }
127}
128
129///////////////////////////////////////////////////////////////////////////////
130// StreamAdapterInterface
131///////////////////////////////////////////////////////////////////////////////
132
133StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
134 bool owned)
135 : stream_(stream), owned_(owned) {
136 if (NULL != stream_)
137 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
138}
139
140void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
141 if (NULL != stream_)
142 stream_->SignalEvent.disconnect(this);
143 if (owned_)
144 delete stream_;
145 stream_ = stream;
146 owned_ = owned;
147 if (NULL != stream_)
148 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
149}
150
151StreamInterface* StreamAdapterInterface::Detach() {
152 if (NULL != stream_)
153 stream_->SignalEvent.disconnect(this);
154 StreamInterface* stream = stream_;
155 stream_ = NULL;
156 return stream;
157}
158
159StreamAdapterInterface::~StreamAdapterInterface() {
160 if (owned_)
161 delete stream_;
162}
163
164///////////////////////////////////////////////////////////////////////////////
165// StreamTap
166///////////////////////////////////////////////////////////////////////////////
167
168StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
wu@webrtc.org5c9dd592013-10-25 21:18:33 +0000169 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000170 tap_error_(0) {
171 AttachTap(tap);
172}
173
174void StreamTap::AttachTap(StreamInterface* tap) {
175 tap_.reset(tap);
176}
177
178StreamInterface* StreamTap::DetachTap() {
179 return tap_.release();
180}
181
182StreamResult StreamTap::GetTapResult(int* error) {
183 if (error) {
184 *error = tap_error_;
185 }
186 return tap_result_;
187}
188
189StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
190 size_t* read, int* error) {
191 size_t backup_read;
192 if (!read) {
193 read = &backup_read;
194 }
195 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
196 read, error);
197 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
198 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
199 }
200 return res;
201}
202
203StreamResult StreamTap::Write(const void* data, size_t data_len,
204 size_t* written, int* error) {
205 size_t backup_written;
206 if (!written) {
207 written = &backup_written;
208 }
209 StreamResult res = StreamAdapterInterface::Write(data, data_len,
210 written, error);
211 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
212 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
213 }
214 return res;
215}
216
217///////////////////////////////////////////////////////////////////////////////
218// StreamSegment
219///////////////////////////////////////////////////////////////////////////////
220
221StreamSegment::StreamSegment(StreamInterface* stream)
222 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
223 length_(SIZE_UNKNOWN) {
224 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
225 stream->GetPosition(&start_);
226}
227
228StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
229 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
230 length_(length) {
231 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
232 stream->GetPosition(&start_);
233}
234
235StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
236 size_t* read, int* error) {
237 if (SIZE_UNKNOWN != length_) {
238 if (pos_ >= length_)
239 return SR_EOS;
240 buffer_len = _min(buffer_len, length_ - pos_);
241 }
242 size_t backup_read;
243 if (!read) {
244 read = &backup_read;
245 }
246 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
247 read, error);
248 if (SR_SUCCESS == result) {
249 pos_ += *read;
250 }
251 return result;
252}
253
254bool StreamSegment::SetPosition(size_t position) {
255 if (SIZE_UNKNOWN == start_)
256 return false; // Not seekable
257 if ((SIZE_UNKNOWN != length_) && (position > length_))
258 return false; // Seek past end of segment
259 if (!StreamAdapterInterface::SetPosition(start_ + position))
260 return false;
261 pos_ = position;
262 return true;
263}
264
265bool StreamSegment::GetPosition(size_t* position) const {
266 if (SIZE_UNKNOWN == start_)
267 return false; // Not seekable
268 if (!StreamAdapterInterface::GetPosition(position))
269 return false;
270 if (position) {
271 ASSERT(*position >= start_);
272 *position -= start_;
273 }
274 return true;
275}
276
277bool StreamSegment::GetSize(size_t* size) const {
278 if (!StreamAdapterInterface::GetSize(size))
279 return false;
280 if (size) {
281 if (SIZE_UNKNOWN != start_) {
282 ASSERT(*size >= start_);
283 *size -= start_;
284 }
285 if (SIZE_UNKNOWN != length_) {
286 *size = _min(*size, length_);
287 }
288 }
289 return true;
290}
291
292bool StreamSegment::GetAvailable(size_t* size) const {
293 if (!StreamAdapterInterface::GetAvailable(size))
294 return false;
295 if (size && (SIZE_UNKNOWN != length_))
296 *size = _min(*size, length_ - pos_);
297 return true;
298}
299
300///////////////////////////////////////////////////////////////////////////////
301// NullStream
302///////////////////////////////////////////////////////////////////////////////
303
304NullStream::NullStream() {
305}
306
307NullStream::~NullStream() {
308}
309
310StreamState NullStream::GetState() const {
311 return SS_OPEN;
312}
313
314StreamResult NullStream::Read(void* buffer, size_t buffer_len,
315 size_t* read, int* error) {
316 if (error) *error = -1;
317 return SR_ERROR;
318}
319
320StreamResult NullStream::Write(const void* data, size_t data_len,
321 size_t* written, int* error) {
322 if (written) *written = data_len;
323 return SR_SUCCESS;
324}
325
326void NullStream::Close() {
327}
328
329///////////////////////////////////////////////////////////////////////////////
330// FileStream
331///////////////////////////////////////////////////////////////////////////////
332
333FileStream::FileStream() : file_(NULL) {
334}
335
336FileStream::~FileStream() {
337 FileStream::Close();
338}
339
340bool FileStream::Open(const std::string& filename, const char* mode,
341 int* error) {
342 Close();
343#ifdef WIN32
344 std::wstring wfilename;
345 if (Utf8ToWindowsFilename(filename, &wfilename)) {
346 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
347 } else {
348 if (error) {
349 *error = -1;
350 return false;
351 }
352 }
353#else
354 file_ = fopen(filename.c_str(), mode);
355#endif
356 if (!file_ && error) {
357 *error = errno;
358 }
359 return (file_ != NULL);
360}
361
362bool FileStream::OpenShare(const std::string& filename, const char* mode,
363 int shflag, int* error) {
364 Close();
365#ifdef WIN32
366 std::wstring wfilename;
367 if (Utf8ToWindowsFilename(filename, &wfilename)) {
368 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
369 if (!file_ && error) {
370 *error = errno;
371 return false;
372 }
373 return file_ != NULL;
374 } else {
375 if (error) {
376 *error = -1;
377 }
378 return false;
379 }
380#else
381 return Open(filename, mode, error);
382#endif
383}
384
385bool FileStream::DisableBuffering() {
386 if (!file_)
387 return false;
388 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
389}
390
391StreamState FileStream::GetState() const {
392 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
393}
394
395StreamResult FileStream::Read(void* buffer, size_t buffer_len,
396 size_t* read, int* error) {
397 if (!file_)
398 return SR_EOS;
399 size_t result = fread(buffer, 1, buffer_len, file_);
400 if ((result == 0) && (buffer_len > 0)) {
401 if (feof(file_))
402 return SR_EOS;
403 if (error)
404 *error = errno;
405 return SR_ERROR;
406 }
407 if (read)
408 *read = result;
409 return SR_SUCCESS;
410}
411
412StreamResult FileStream::Write(const void* data, size_t data_len,
413 size_t* written, int* error) {
414 if (!file_)
415 return SR_EOS;
416 size_t result = fwrite(data, 1, data_len, file_);
417 if ((result == 0) && (data_len > 0)) {
418 if (error)
419 *error = errno;
420 return SR_ERROR;
421 }
422 if (written)
423 *written = result;
424 return SR_SUCCESS;
425}
426
427void FileStream::Close() {
428 if (file_) {
429 DoClose();
430 file_ = NULL;
431 }
432}
433
434bool FileStream::SetPosition(size_t position) {
435 if (!file_)
436 return false;
437 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
438}
439
440bool FileStream::GetPosition(size_t* position) const {
441 ASSERT(NULL != position);
442 if (!file_)
443 return false;
444 long result = ftell(file_);
445 if (result < 0)
446 return false;
447 if (position)
448 *position = result;
449 return true;
450}
451
452bool FileStream::GetSize(size_t* size) const {
453 ASSERT(NULL != size);
454 if (!file_)
455 return false;
456 struct stat file_stats;
457 if (fstat(fileno(file_), &file_stats) != 0)
458 return false;
459 if (size)
460 *size = file_stats.st_size;
461 return true;
462}
463
464bool FileStream::GetAvailable(size_t* size) const {
465 ASSERT(NULL != size);
466 if (!GetSize(size))
467 return false;
468 long result = ftell(file_);
469 if (result < 0)
470 return false;
471 if (size)
472 *size -= result;
473 return true;
474}
475
476bool FileStream::ReserveSize(size_t size) {
477 // TODO: extend the file to the proper length
478 return true;
479}
480
481bool FileStream::GetSize(const std::string& filename, size_t* size) {
482 struct stat file_stats;
483 if (stat(filename.c_str(), &file_stats) != 0)
484 return false;
485 *size = file_stats.st_size;
486 return true;
487}
488
489bool FileStream::Flush() {
490 if (file_) {
491 return (0 == fflush(file_));
492 }
493 // try to flush empty file?
494 ASSERT(false);
495 return false;
496}
497
wu@webrtc.org2a81a382014-01-03 22:08:47 +0000498#if defined(POSIX) && !defined(__native_client__)
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000499
500bool FileStream::TryLock() {
501 if (file_ == NULL) {
502 // Stream not open.
503 ASSERT(false);
504 return false;
505 }
506
507 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
508}
509
510bool FileStream::Unlock() {
511 if (file_ == NULL) {
512 // Stream not open.
513 ASSERT(false);
514 return false;
515 }
516
517 return flock(fileno(file_), LOCK_UN) == 0;
518}
519
520#endif
521
522void FileStream::DoClose() {
523 fclose(file_);
524}
525
wu@webrtc.org861d0732013-10-07 23:32:02 +0000526CircularFileStream::CircularFileStream(size_t max_size)
527 : max_write_size_(max_size),
528 position_(0),
529 marked_position_(max_size / 2),
530 last_write_position_(0),
531 read_segment_(READ_LATEST),
532 read_segment_available_(0) {
533}
534
535bool CircularFileStream::Open(
536 const std::string& filename, const char* mode, int* error) {
537 if (!FileStream::Open(filename.c_str(), mode, error))
538 return false;
539
540 if (strchr(mode, "r") != NULL) { // Opened in read mode.
541 // Check if the buffer has been overwritten and determine how to read the
542 // log in time sequence.
543 size_t file_size;
544 GetSize(&file_size);
545 if (file_size == position_) {
546 // The buffer has not been overwritten yet. Read 0 .. file_size
547 read_segment_ = READ_LATEST;
548 read_segment_available_ = file_size;
549 } else {
550 // The buffer has been over written. There are three segments: The first
551 // one is 0 .. marked_position_, which is the marked earliest log. The
552 // second one is position_ .. file_size, which is the middle log. The
553 // last one is marked_position_ .. position_, which is the latest log.
554 read_segment_ = READ_MARKED;
555 read_segment_available_ = marked_position_;
556 last_write_position_ = position_;
557 }
558
559 // Read from the beginning.
560 position_ = 0;
561 SetPosition(position_);
562 }
563
564 return true;
565}
566
567StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
568 size_t* read, int* error) {
569 if (read_segment_available_ == 0) {
570 size_t file_size;
571 switch (read_segment_) {
572 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
573 read_segment_ = READ_MIDDLE;
574 position_ = last_write_position_;
575 SetPosition(position_);
576 GetSize(&file_size);
577 read_segment_available_ = file_size - position_;
578 break;
579
580 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
581 read_segment_ = READ_LATEST;
582 position_ = marked_position_;
583 SetPosition(position_);
584 read_segment_available_ = last_write_position_ - position_;
585 break;
586
587 default: // Finished READ_LATEST and return EOS.
588 return talk_base::SR_EOS;
589 }
590 }
591
592 size_t local_read;
593 if (!read) read = &local_read;
594
595 size_t to_read = talk_base::_min(buffer_len, read_segment_available_);
596 talk_base::StreamResult result
597 = talk_base::FileStream::Read(buffer, to_read, read, error);
598 if (result == talk_base::SR_SUCCESS) {
599 read_segment_available_ -= *read;
600 position_ += *read;
601 }
602 return result;
603}
604
605StreamResult CircularFileStream::Write(const void* data, size_t data_len,
606 size_t* written, int* error) {
607 if (position_ >= max_write_size_) {
608 ASSERT(position_ == max_write_size_);
609 position_ = marked_position_;
610 SetPosition(position_);
611 }
612
613 size_t local_written;
614 if (!written) written = &local_written;
615
616 size_t to_eof = max_write_size_ - position_;
617 size_t to_write = talk_base::_min(data_len, to_eof);
618 talk_base::StreamResult result
619 = talk_base::FileStream::Write(data, to_write, written, error);
620 if (result == talk_base::SR_SUCCESS) {
621 position_ += *written;
622 }
623 return result;
624}
625
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000626AsyncWriteStream::~AsyncWriteStream() {
627 write_thread_->Clear(this, 0, NULL);
628 ClearBufferAndWrite();
629
630 CritScope cs(&crit_stream_);
631 stream_.reset();
632}
633
634// This is needed by some stream writers, such as RtpDumpWriter.
635bool AsyncWriteStream::GetPosition(size_t* position) const {
636 CritScope cs(&crit_stream_);
637 return stream_->GetPosition(position);
638}
639
640// This is needed by some stream writers, such as the plugin log writers.
641StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
642 size_t* read, int* error) {
643 CritScope cs(&crit_stream_);
644 return stream_->Read(buffer, buffer_len, read, error);
645}
646
647void AsyncWriteStream::Close() {
648 if (state_ == SS_CLOSED) {
649 return;
650 }
651
652 write_thread_->Clear(this, 0, NULL);
653 ClearBufferAndWrite();
654
655 CritScope cs(&crit_stream_);
656 stream_->Close();
657 state_ = SS_CLOSED;
658}
659
660StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
661 size_t* written, int* error) {
662 if (state_ == SS_CLOSED) {
663 return SR_ERROR;
664 }
665
666 size_t previous_buffer_length = 0;
667 {
668 CritScope cs(&crit_buffer_);
669 previous_buffer_length = buffer_.length();
670 buffer_.AppendData(data, data_len);
671 }
672
673 if (previous_buffer_length == 0) {
674 // If there's stuff already in the buffer, then we already called
675 // Post and the write_thread_ hasn't pulled it out yet, so we
676 // don't need to re-Post.
677 write_thread_->Post(this, 0, NULL);
678 }
679 // Return immediately, assuming that it works.
680 if (written) {
681 *written = data_len;
682 }
683 return SR_SUCCESS;
684}
685
686void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
687 ClearBufferAndWrite();
688}
689
690bool AsyncWriteStream::Flush() {
691 if (state_ == SS_CLOSED) {
692 return false;
693 }
694
695 ClearBufferAndWrite();
696
697 CritScope cs(&crit_stream_);
698 return stream_->Flush();
699}
700
701void AsyncWriteStream::ClearBufferAndWrite() {
702 Buffer to_write;
703 {
704 CritScope cs_buffer(&crit_buffer_);
705 buffer_.TransferTo(&to_write);
706 }
707
708 if (to_write.length() > 0) {
709 CritScope cs(&crit_stream_);
710 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
711 }
712}
713
henrika@webrtc.org8485ec62014-01-14 10:00:58 +0000714#if defined(POSIX) && !defined(__native_client__)
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000715
716// Have to identically rewrite the FileStream destructor or else it would call
717// the base class's Close() instead of the sub-class's.
718POpenStream::~POpenStream() {
719 POpenStream::Close();
720}
721
722bool POpenStream::Open(const std::string& subcommand,
723 const char* mode,
724 int* error) {
725 Close();
726 file_ = popen(subcommand.c_str(), mode);
727 if (file_ == NULL) {
728 if (error)
729 *error = errno;
730 return false;
731 }
732 return true;
733}
734
735bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
736 int shflag, int* error) {
737 return Open(subcommand, mode, error);
738}
739
740void POpenStream::DoClose() {
741 wait_status_ = pclose(file_);
742}
743
744#endif
745
746///////////////////////////////////////////////////////////////////////////////
747// MemoryStream
748///////////////////////////////////////////////////////////////////////////////
749
750MemoryStreamBase::MemoryStreamBase()
751 : buffer_(NULL), buffer_length_(0), data_length_(0),
752 seek_position_(0) {
753}
754
755StreamState MemoryStreamBase::GetState() const {
756 return SS_OPEN;
757}
758
759StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
760 size_t* bytes_read, int* error) {
761 if (seek_position_ >= data_length_) {
762 return SR_EOS;
763 }
764 size_t available = data_length_ - seek_position_;
765 if (bytes > available) {
766 // Read partial buffer
767 bytes = available;
768 }
769 memcpy(buffer, &buffer_[seek_position_], bytes);
770 seek_position_ += bytes;
771 if (bytes_read) {
772 *bytes_read = bytes;
773 }
774 return SR_SUCCESS;
775}
776
777StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
778 size_t* bytes_written, int* error) {
779 size_t available = buffer_length_ - seek_position_;
780 if (0 == available) {
781 // Increase buffer size to the larger of:
782 // a) new position rounded up to next 256 bytes
783 // b) double the previous length
784 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
785 buffer_length_ * 2);
786 StreamResult result = DoReserve(new_buffer_length, error);
787 if (SR_SUCCESS != result) {
788 return result;
789 }
790 ASSERT(buffer_length_ >= new_buffer_length);
791 available = buffer_length_ - seek_position_;
792 }
793
794 if (bytes > available) {
795 bytes = available;
796 }
797 memcpy(&buffer_[seek_position_], buffer, bytes);
798 seek_position_ += bytes;
799 if (data_length_ < seek_position_) {
800 data_length_ = seek_position_;
801 }
802 if (bytes_written) {
803 *bytes_written = bytes;
804 }
805 return SR_SUCCESS;
806}
807
808void MemoryStreamBase::Close() {
809 // nothing to do
810}
811
812bool MemoryStreamBase::SetPosition(size_t position) {
813 if (position > data_length_)
814 return false;
815 seek_position_ = position;
816 return true;
817}
818
819bool MemoryStreamBase::GetPosition(size_t* position) const {
820 if (position)
821 *position = seek_position_;
822 return true;
823}
824
825bool MemoryStreamBase::GetSize(size_t* size) const {
826 if (size)
827 *size = data_length_;
828 return true;
829}
830
831bool MemoryStreamBase::GetAvailable(size_t* size) const {
832 if (size)
833 *size = data_length_ - seek_position_;
834 return true;
835}
836
837bool MemoryStreamBase::ReserveSize(size_t size) {
838 return (SR_SUCCESS == DoReserve(size, NULL));
839}
840
841StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
842 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
843}
844
845///////////////////////////////////////////////////////////////////////////////
846
847MemoryStream::MemoryStream()
848 : buffer_alloc_(NULL) {
849}
850
851MemoryStream::MemoryStream(const char* data)
852 : buffer_alloc_(NULL) {
853 SetData(data, strlen(data));
854}
855
856MemoryStream::MemoryStream(const void* data, size_t length)
857 : buffer_alloc_(NULL) {
858 SetData(data, length);
859}
860
861MemoryStream::~MemoryStream() {
862 delete [] buffer_alloc_;
863}
864
865void MemoryStream::SetData(const void* data, size_t length) {
866 data_length_ = buffer_length_ = length;
867 delete [] buffer_alloc_;
868 buffer_alloc_ = new char[buffer_length_ + kAlignment];
869 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
870 memcpy(buffer_, data, data_length_);
871 seek_position_ = 0;
872}
873
874StreamResult MemoryStream::DoReserve(size_t size, int* error) {
875 if (buffer_length_ >= size)
876 return SR_SUCCESS;
877
878 if (char* new_buffer_alloc = new char[size + kAlignment]) {
879 char* new_buffer = reinterpret_cast<char*>(
880 ALIGNP(new_buffer_alloc, kAlignment));
881 memcpy(new_buffer, buffer_, data_length_);
882 delete [] buffer_alloc_;
883 buffer_alloc_ = new_buffer_alloc;
884 buffer_ = new_buffer;
885 buffer_length_ = size;
886 return SR_SUCCESS;
887 }
888
889 if (error) {
890 *error = ENOMEM;
891 }
892 return SR_ERROR;
893}
894
895///////////////////////////////////////////////////////////////////////////////
896
897ExternalMemoryStream::ExternalMemoryStream() {
898}
899
900ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
901 SetData(data, length);
902}
903
904ExternalMemoryStream::~ExternalMemoryStream() {
905}
906
907void ExternalMemoryStream::SetData(void* data, size_t length) {
908 data_length_ = buffer_length_ = length;
909 buffer_ = static_cast<char*>(data);
910 seek_position_ = 0;
911}
912
913///////////////////////////////////////////////////////////////////////////////
914// FifoBuffer
915///////////////////////////////////////////////////////////////////////////////
916
917FifoBuffer::FifoBuffer(size_t size)
918 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
919 data_length_(0), read_position_(0), owner_(Thread::Current()) {
920 // all events are done on the owner_ thread
921}
922
923FifoBuffer::FifoBuffer(size_t size, Thread* owner)
924 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
925 data_length_(0), read_position_(0), owner_(owner) {
926 // all events are done on the owner_ thread
927}
928
929FifoBuffer::~FifoBuffer() {
930}
931
932bool FifoBuffer::GetBuffered(size_t* size) const {
933 CritScope cs(&crit_);
934 *size = data_length_;
935 return true;
936}
937
938bool FifoBuffer::SetCapacity(size_t size) {
939 CritScope cs(&crit_);
940 if (data_length_ > size) {
941 return false;
942 }
943
944 if (size != buffer_length_) {
945 char* buffer = new char[size];
946 const size_t copy = data_length_;
947 const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
948 memcpy(buffer, &buffer_[read_position_], tail_copy);
949 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
950 buffer_.reset(buffer);
951 read_position_ = 0;
952 buffer_length_ = size;
953 }
954 return true;
955}
956
957StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
958 size_t offset, size_t* bytes_read) {
959 CritScope cs(&crit_);
960 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
961}
962
963StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
964 size_t offset, size_t* bytes_written) {
965 CritScope cs(&crit_);
966 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
967}
968
969StreamState FifoBuffer::GetState() const {
970 return state_;
971}
972
973StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
974 size_t* bytes_read, int* error) {
975 CritScope cs(&crit_);
976 const bool was_writable = data_length_ < buffer_length_;
977 size_t copy = 0;
978 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
979
980 if (result == SR_SUCCESS) {
981 // If read was successful then adjust the read position and number of
982 // bytes buffered.
983 read_position_ = (read_position_ + copy) % buffer_length_;
984 data_length_ -= copy;
985 if (bytes_read) {
986 *bytes_read = copy;
987 }
988
989 // if we were full before, and now we're not, post an event
990 if (!was_writable && copy > 0) {
991 PostEvent(owner_, SE_WRITE, 0);
992 }
993 }
994 return result;
995}
996
997StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
998 size_t* bytes_written, int* error) {
999 CritScope cs(&crit_);
1000
1001 const bool was_readable = (data_length_ > 0);
1002 size_t copy = 0;
1003 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
1004
1005 if (result == SR_SUCCESS) {
1006 // If write was successful then adjust the number of readable bytes.
1007 data_length_ += copy;
1008 if (bytes_written) {
1009 *bytes_written = copy;
1010 }
1011
1012 // if we didn't have any data to read before, and now we do, post an event
1013 if (!was_readable && copy > 0) {
1014 PostEvent(owner_, SE_READ, 0);
1015 }
1016 }
1017 return result;
1018}
1019
1020void FifoBuffer::Close() {
1021 CritScope cs(&crit_);
1022 state_ = SS_CLOSED;
1023}
1024
1025const void* FifoBuffer::GetReadData(size_t* size) {
1026 CritScope cs(&crit_);
1027 *size = (read_position_ + data_length_ <= buffer_length_) ?
1028 data_length_ : buffer_length_ - read_position_;
1029 return &buffer_[read_position_];
1030}
1031
1032void FifoBuffer::ConsumeReadData(size_t size) {
1033 CritScope cs(&crit_);
1034 ASSERT(size <= data_length_);
1035 const bool was_writable = data_length_ < buffer_length_;
1036 read_position_ = (read_position_ + size) % buffer_length_;
1037 data_length_ -= size;
1038 if (!was_writable && size > 0) {
1039 PostEvent(owner_, SE_WRITE, 0);
1040 }
1041}
1042
1043void* FifoBuffer::GetWriteBuffer(size_t* size) {
1044 CritScope cs(&crit_);
1045 if (state_ == SS_CLOSED) {
1046 return NULL;
1047 }
1048
1049 // if empty, reset the write position to the beginning, so we can get
1050 // the biggest possible block
1051 if (data_length_ == 0) {
1052 read_position_ = 0;
1053 }
1054
1055 const size_t write_position = (read_position_ + data_length_)
1056 % buffer_length_;
1057 *size = (write_position > read_position_ || data_length_ == 0) ?
1058 buffer_length_ - write_position : read_position_ - write_position;
1059 return &buffer_[write_position];
1060}
1061
1062void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1063 CritScope cs(&crit_);
1064 ASSERT(size <= buffer_length_ - data_length_);
1065 const bool was_readable = (data_length_ > 0);
1066 data_length_ += size;
1067 if (!was_readable && size > 0) {
1068 PostEvent(owner_, SE_READ, 0);
1069 }
1070}
1071
1072bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1073 CritScope cs(&crit_);
1074 *size = buffer_length_ - data_length_;
1075 return true;
1076}
1077
1078StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1079 size_t bytes,
1080 size_t offset,
1081 size_t* bytes_read) {
1082 if (offset >= data_length_) {
1083 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1084 }
1085
1086 const size_t available = data_length_ - offset;
1087 const size_t read_position = (read_position_ + offset) % buffer_length_;
1088 const size_t copy = _min(bytes, available);
1089 const size_t tail_copy = _min(copy, buffer_length_ - read_position);
1090 char* const p = static_cast<char*>(buffer);
1091 memcpy(p, &buffer_[read_position], tail_copy);
1092 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1093
1094 if (bytes_read) {
1095 *bytes_read = copy;
1096 }
1097 return SR_SUCCESS;
1098}
1099
1100StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1101 size_t bytes,
1102 size_t offset,
1103 size_t* bytes_written) {
1104 if (state_ == SS_CLOSED) {
1105 return SR_EOS;
1106 }
1107
1108 if (data_length_ + offset >= buffer_length_) {
1109 return SR_BLOCK;
1110 }
1111
1112 const size_t available = buffer_length_ - data_length_ - offset;
1113 const size_t write_position = (read_position_ + data_length_ + offset)
1114 % buffer_length_;
1115 const size_t copy = _min(bytes, available);
1116 const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1117 const char* const p = static_cast<const char*>(buffer);
1118 memcpy(&buffer_[write_position], p, tail_copy);
1119 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1120
1121 if (bytes_written) {
1122 *bytes_written = copy;
1123 }
1124 return SR_SUCCESS;
1125}
1126
1127
1128
1129///////////////////////////////////////////////////////////////////////////////
1130// LoggingAdapter
1131///////////////////////////////////////////////////////////////////////////////
1132
1133LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1134 const std::string& label, bool hex_mode)
1135 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1136 set_label(label);
1137}
1138
1139void LoggingAdapter::set_label(const std::string& label) {
1140 label_.assign("[");
1141 label_.append(label);
1142 label_.append("]");
1143}
1144
1145StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1146 size_t* read, int* error) {
1147 size_t local_read; if (!read) read = &local_read;
1148 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1149 error);
1150 if (result == SR_SUCCESS) {
1151 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1152 }
1153 return result;
1154}
1155
1156StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1157 size_t* written, int* error) {
1158 size_t local_written;
1159 if (!written) written = &local_written;
1160 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1161 error);
1162 if (result == SR_SUCCESS) {
1163 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1164 &lms_);
1165 }
1166 return result;
1167}
1168
1169void LoggingAdapter::Close() {
1170 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1171 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1172 LOG_V(level_) << label_ << " Closed locally";
1173 StreamAdapterInterface::Close();
1174}
1175
1176void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1177 if (events & SE_OPEN) {
1178 LOG_V(level_) << label_ << " Open";
1179 } else if (events & SE_CLOSE) {
1180 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1181 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1182 LOG_V(level_) << label_ << " Closed with error: " << err;
1183 }
1184 StreamAdapterInterface::OnEvent(stream, events, err);
1185}
1186
1187///////////////////////////////////////////////////////////////////////////////
1188// StringStream - Reads/Writes to an external std::string
1189///////////////////////////////////////////////////////////////////////////////
1190
1191StringStream::StringStream(std::string& str)
1192 : str_(str), read_pos_(0), read_only_(false) {
1193}
1194
1195StringStream::StringStream(const std::string& str)
1196 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1197}
1198
1199StreamState StringStream::GetState() const {
1200 return SS_OPEN;
1201}
1202
1203StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1204 size_t* read, int* error) {
1205 size_t available = _min(buffer_len, str_.size() - read_pos_);
1206 if (!available)
1207 return SR_EOS;
1208 memcpy(buffer, str_.data() + read_pos_, available);
1209 read_pos_ += available;
1210 if (read)
1211 *read = available;
1212 return SR_SUCCESS;
1213}
1214
1215StreamResult StringStream::Write(const void* data, size_t data_len,
1216 size_t* written, int* error) {
1217 if (read_only_) {
1218 if (error) {
1219 *error = -1;
1220 }
1221 return SR_ERROR;
1222 }
1223 str_.append(static_cast<const char*>(data),
1224 static_cast<const char*>(data) + data_len);
1225 if (written)
1226 *written = data_len;
1227 return SR_SUCCESS;
1228}
1229
1230void StringStream::Close() {
1231}
1232
1233bool StringStream::SetPosition(size_t position) {
1234 if (position > str_.size())
1235 return false;
1236 read_pos_ = position;
1237 return true;
1238}
1239
1240bool StringStream::GetPosition(size_t* position) const {
1241 if (position)
1242 *position = read_pos_;
1243 return true;
1244}
1245
1246bool StringStream::GetSize(size_t* size) const {
1247 if (size)
1248 *size = str_.size();
1249 return true;
1250}
1251
1252bool StringStream::GetAvailable(size_t* size) const {
1253 if (size)
1254 *size = str_.size() - read_pos_;
1255 return true;
1256}
1257
1258bool StringStream::ReserveSize(size_t size) {
1259 if (read_only_)
1260 return false;
1261 str_.reserve(size);
1262 return true;
1263}
1264
1265///////////////////////////////////////////////////////////////////////////////
1266// StreamReference
1267///////////////////////////////////////////////////////////////////////////////
1268
1269StreamReference::StreamReference(StreamInterface* stream)
1270 : StreamAdapterInterface(stream, false) {
1271 // owner set to false so the destructor does not free the stream.
1272 stream_ref_count_ = new StreamRefCount(stream);
1273}
1274
1275StreamInterface* StreamReference::NewReference() {
1276 stream_ref_count_->AddReference();
1277 return new StreamReference(stream_ref_count_, stream());
1278}
1279
1280StreamReference::~StreamReference() {
1281 stream_ref_count_->Release();
1282}
1283
1284StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1285 StreamInterface* stream)
1286 : StreamAdapterInterface(stream, false),
1287 stream_ref_count_(stream_ref_count) {
1288}
1289
1290///////////////////////////////////////////////////////////////////////////////
1291
1292StreamResult Flow(StreamInterface* source,
1293 char* buffer, size_t buffer_len,
1294 StreamInterface* sink,
1295 size_t* data_len /* = NULL */) {
1296 ASSERT(buffer_len > 0);
1297
1298 StreamResult result;
1299 size_t count, read_pos, write_pos;
1300 if (data_len) {
1301 read_pos = *data_len;
1302 } else {
1303 read_pos = 0;
1304 }
1305
1306 bool end_of_stream = false;
1307 do {
1308 // Read until buffer is full, end of stream, or error
1309 while (!end_of_stream && (read_pos < buffer_len)) {
1310 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1311 &count, NULL);
1312 if (result == SR_EOS) {
1313 end_of_stream = true;
1314 } else if (result != SR_SUCCESS) {
1315 if (data_len) {
1316 *data_len = read_pos;
1317 }
1318 return result;
1319 } else {
1320 read_pos += count;
1321 }
1322 }
1323
1324 // Write until buffer is empty, or error (including end of stream)
1325 write_pos = 0;
1326 while (write_pos < read_pos) {
1327 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1328 &count, NULL);
1329 if (result != SR_SUCCESS) {
1330 if (data_len) {
1331 *data_len = read_pos - write_pos;
1332 if (write_pos > 0) {
1333 memmove(buffer, buffer + write_pos, *data_len);
1334 }
1335 }
1336 return result;
1337 }
1338 write_pos += count;
1339 }
1340
1341 read_pos = 0;
1342 } while (!end_of_stream);
1343
1344 if (data_len) {
1345 *data_len = 0;
1346 }
1347 return SR_SUCCESS;
1348}
1349
1350///////////////////////////////////////////////////////////////////////////////
1351
1352} // namespace talk_base