henrike@webrtc.org | 0e118e7 | 2013-07-10 00:45:36 +0000 | [diff] [blame] | 1 | /* |
| 2 | * libjingle |
| 3 | * Copyright 2004--2005, Google Inc. |
| 4 | * |
| 5 | * Redistribution and use in source and binary forms, with or without |
| 6 | * modification, are permitted provided that the following conditions are met: |
| 7 | * |
| 8 | * 1. Redistributions of source code must retain the above copyright notice, |
| 9 | * this list of conditions and the following disclaimer. |
| 10 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
| 11 | * this list of conditions and the following disclaimer in the documentation |
| 12 | * and/or other materials provided with the distribution. |
| 13 | * 3. The name of the author may not be used to endorse or promote products |
| 14 | * derived from this software without specific prior written permission. |
| 15 | * |
| 16 | * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED |
| 17 | * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF |
| 18 | * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO |
| 19 | * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 20 | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| 21 | * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; |
| 22 | * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, |
| 23 | * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR |
| 24 | * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF |
| 25 | * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 26 | */ |
| 27 | |
| 28 | #include <iomanip> |
| 29 | |
| 30 | #include "talk/base/asyncsocket.h" |
| 31 | #include "talk/base/logging.h" |
| 32 | #include "talk/base/socketfactory.h" |
| 33 | #include "talk/base/socketpool.h" |
| 34 | #include "talk/base/socketstream.h" |
| 35 | #include "talk/base/thread.h" |
| 36 | |
| 37 | namespace talk_base { |
| 38 | |
| 39 | /////////////////////////////////////////////////////////////////////////////// |
| 40 | // StreamCache - Caches a set of open streams, defers creation to a separate |
| 41 | // StreamPool. |
| 42 | /////////////////////////////////////////////////////////////////////////////// |
| 43 | |
| 44 | StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { |
| 45 | } |
| 46 | |
| 47 | StreamCache::~StreamCache() { |
| 48 | for (ConnectedList::iterator it = active_.begin(); it != active_.end(); |
| 49 | ++it) { |
| 50 | delete it->second; |
| 51 | } |
| 52 | for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); |
| 53 | ++it) { |
| 54 | delete it->second; |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | StreamInterface* StreamCache::RequestConnectedStream( |
| 59 | const SocketAddress& remote, int* err) { |
| 60 | LOG_F(LS_VERBOSE) << "(" << remote << ")"; |
| 61 | for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); |
| 62 | ++it) { |
| 63 | if (remote == it->first) { |
| 64 | it->second->SignalEvent.disconnect(this); |
| 65 | // Move from cached_ to active_ |
| 66 | active_.push_front(*it); |
| 67 | cached_.erase(it); |
| 68 | if (err) |
| 69 | *err = 0; |
| 70 | LOG_F(LS_VERBOSE) << "Providing cached stream"; |
| 71 | return active_.front().second; |
| 72 | } |
| 73 | } |
| 74 | if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { |
| 75 | // We track active streams so that we can remember their address |
| 76 | active_.push_front(ConnectedStream(remote, stream)); |
| 77 | LOG_F(LS_VERBOSE) << "Providing new stream"; |
| 78 | return active_.front().second; |
| 79 | } |
| 80 | return NULL; |
| 81 | } |
| 82 | |
| 83 | void StreamCache::ReturnConnectedStream(StreamInterface* stream) { |
| 84 | for (ConnectedList::iterator it = active_.begin(); it != active_.end(); |
| 85 | ++it) { |
| 86 | if (stream == it->second) { |
| 87 | LOG_F(LS_VERBOSE) << "(" << it->first << ")"; |
| 88 | if (stream->GetState() == SS_CLOSED) { |
| 89 | // Return closed streams |
| 90 | LOG_F(LS_VERBOSE) << "Returning closed stream"; |
| 91 | pool_->ReturnConnectedStream(it->second); |
| 92 | } else { |
| 93 | // Monitor open streams |
| 94 | stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); |
| 95 | LOG_F(LS_VERBOSE) << "Caching stream"; |
| 96 | cached_.push_front(*it); |
| 97 | } |
| 98 | active_.erase(it); |
| 99 | return; |
| 100 | } |
| 101 | } |
| 102 | ASSERT(false); |
| 103 | } |
| 104 | |
| 105 | void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { |
| 106 | if ((events & SE_CLOSE) == 0) { |
| 107 | LOG_F(LS_WARNING) << "(" << events << ", " << err |
| 108 | << ") received non-close event"; |
| 109 | return; |
| 110 | } |
| 111 | for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); |
| 112 | ++it) { |
| 113 | if (stream == it->second) { |
| 114 | LOG_F(LS_VERBOSE) << "(" << it->first << ")"; |
| 115 | // We don't cache closed streams, so return it. |
| 116 | it->second->SignalEvent.disconnect(this); |
| 117 | LOG_F(LS_VERBOSE) << "Returning closed stream"; |
| 118 | pool_->ReturnConnectedStream(it->second); |
| 119 | cached_.erase(it); |
| 120 | return; |
| 121 | } |
| 122 | } |
| 123 | ASSERT(false); |
| 124 | } |
| 125 | |
| 126 | ////////////////////////////////////////////////////////////////////// |
| 127 | // NewSocketPool |
| 128 | ////////////////////////////////////////////////////////////////////// |
| 129 | |
| 130 | NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { |
| 131 | } |
| 132 | |
| 133 | NewSocketPool::~NewSocketPool() { |
| 134 | } |
| 135 | |
| 136 | StreamInterface* |
| 137 | NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { |
| 138 | AsyncSocket* socket = |
| 139 | factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM); |
| 140 | if (!socket) { |
| 141 | if (err) |
| 142 | *err = -1; |
| 143 | return NULL; |
| 144 | } |
| 145 | if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { |
| 146 | if (err) |
| 147 | *err = socket->GetError(); |
| 148 | delete socket; |
| 149 | return NULL; |
| 150 | } |
| 151 | if (err) |
| 152 | *err = 0; |
| 153 | return new SocketStream(socket); |
| 154 | } |
| 155 | |
| 156 | void |
| 157 | NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { |
| 158 | Thread::Current()->Dispose(stream); |
| 159 | } |
| 160 | |
| 161 | ////////////////////////////////////////////////////////////////////// |
| 162 | // ReuseSocketPool |
| 163 | ////////////////////////////////////////////////////////////////////// |
| 164 | |
| 165 | ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) |
| 166 | : factory_(factory), stream_(NULL), checked_out_(false) { |
| 167 | } |
| 168 | |
| 169 | ReuseSocketPool::~ReuseSocketPool() { |
| 170 | ASSERT(!checked_out_); |
| 171 | delete stream_; |
| 172 | } |
| 173 | |
| 174 | StreamInterface* |
| 175 | ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { |
| 176 | // Only one socket can be used from this "pool" at a time |
| 177 | ASSERT(!checked_out_); |
| 178 | if (!stream_) { |
| 179 | LOG_F(LS_VERBOSE) << "Creating new socket"; |
| 180 | int family = remote.family(); |
| 181 | // TODO: Deal with this when we/I clean up DNS resolution. |
| 182 | if (remote.IsUnresolvedIP()) { |
| 183 | family = AF_INET; |
| 184 | } |
| 185 | AsyncSocket* socket = |
| 186 | factory_->CreateAsyncSocket(family, SOCK_STREAM); |
| 187 | if (!socket) { |
| 188 | if (err) |
| 189 | *err = -1; |
| 190 | return NULL; |
| 191 | } |
| 192 | stream_ = new SocketStream(socket); |
| 193 | } |
| 194 | if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { |
| 195 | LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; |
| 196 | } else { |
| 197 | remote_ = remote; |
| 198 | stream_->Close(); |
| 199 | if ((stream_->GetSocket()->Connect(remote_) != 0) |
| 200 | && !stream_->GetSocket()->IsBlocking()) { |
| 201 | if (err) |
| 202 | *err = stream_->GetSocket()->GetError(); |
| 203 | return NULL; |
| 204 | } else { |
| 205 | LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; |
| 206 | } |
| 207 | } |
| 208 | stream_->SignalEvent.disconnect(this); |
| 209 | checked_out_ = true; |
| 210 | if (err) |
| 211 | *err = 0; |
| 212 | return stream_; |
| 213 | } |
| 214 | |
| 215 | void |
| 216 | ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { |
| 217 | ASSERT(stream == stream_); |
| 218 | ASSERT(checked_out_); |
| 219 | checked_out_ = false; |
| 220 | // Until the socket is reused, monitor it to determine if it closes. |
| 221 | stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); |
| 222 | } |
| 223 | |
| 224 | void |
| 225 | ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { |
| 226 | ASSERT(stream == stream_); |
| 227 | ASSERT(!checked_out_); |
| 228 | |
| 229 | // If the stream was written to and then immediately returned to us then |
| 230 | // we may get a writable notification for it, which we should ignore. |
| 231 | if (events == SE_WRITE) { |
| 232 | LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; |
| 233 | return; |
| 234 | } |
| 235 | |
| 236 | // If the peer sent data, we can't process it, so drop the connection. |
| 237 | // If the socket has closed, clean it up. |
| 238 | // In either case, we'll reconnect it the next time it is used. |
| 239 | ASSERT(0 != (events & (SE_READ|SE_CLOSE))); |
| 240 | if (0 != (events & SE_CLOSE)) { |
| 241 | LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; |
| 242 | } else { |
| 243 | LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; |
| 244 | } |
| 245 | stream_->Close(); |
| 246 | } |
| 247 | |
| 248 | /////////////////////////////////////////////////////////////////////////////// |
| 249 | // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached |
| 250 | // LoggingAdapters. |
| 251 | /////////////////////////////////////////////////////////////////////////////// |
| 252 | |
| 253 | LoggingPoolAdapter::LoggingPoolAdapter( |
| 254 | StreamPool* pool, LoggingSeverity level, const std::string& label, |
| 255 | bool binary_mode) |
| 256 | : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { |
| 257 | } |
| 258 | |
| 259 | LoggingPoolAdapter::~LoggingPoolAdapter() { |
| 260 | for (StreamList::iterator it = recycle_bin_.begin(); |
| 261 | it != recycle_bin_.end(); ++it) { |
| 262 | delete *it; |
| 263 | } |
| 264 | } |
| 265 | |
| 266 | StreamInterface* LoggingPoolAdapter::RequestConnectedStream( |
| 267 | const SocketAddress& remote, int* err) { |
| 268 | if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { |
| 269 | ASSERT(SS_CLOSED != stream->GetState()); |
| 270 | std::stringstream ss; |
| 271 | ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) |
| 272 | << stream << ")"; |
| 273 | LOG_V(level_) << ss.str() |
| 274 | << ((SS_OPEN == stream->GetState()) ? " Connected" |
| 275 | : " Connecting") |
| 276 | << " to " << remote; |
| 277 | if (recycle_bin_.empty()) { |
| 278 | return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); |
| 279 | } |
| 280 | LoggingAdapter* logging = recycle_bin_.front(); |
| 281 | recycle_bin_.pop_front(); |
| 282 | logging->set_label(ss.str()); |
| 283 | logging->Attach(stream); |
| 284 | return logging; |
| 285 | } |
| 286 | return NULL; |
| 287 | } |
| 288 | |
| 289 | void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { |
| 290 | LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); |
| 291 | pool_->ReturnConnectedStream(logging->Detach()); |
| 292 | recycle_bin_.push_back(logging); |
| 293 | } |
| 294 | |
| 295 | /////////////////////////////////////////////////////////////////////////////// |
| 296 | |
| 297 | } // namespace talk_base |