blob: f8e8ddeb811ec0f1105ef02177b96a88f6f7689a [file] [log] [blame]
henrike@webrtc.orgf7795df2014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/virtualsocketserver.h"
12
13#include <errno.h>
14#include <math.h>
15
16#include <algorithm>
17#include <map>
18#include <vector>
19
20#include "webrtc/base/common.h"
21#include "webrtc/base/logging.h"
22#include "webrtc/base/physicalsocketserver.h"
23#include "webrtc/base/socketaddresspair.h"
24#include "webrtc/base/thread.h"
25#include "webrtc/base/timeutils.h"
26
27namespace rtc {
28#if defined(WEBRTC_WIN)
29const in_addr kInitialNextIPv4 = { {0x01, 0, 0, 0} };
30#else
31// This value is entirely arbitrary, hence the lack of concern about endianness.
32const in_addr kInitialNextIPv4 = { 0x01000000 };
33#endif
34// Starts at ::2 so as to not cause confusion with ::1.
35const in6_addr kInitialNextIPv6 = { { {
36 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2
37 } } };
38
39const uint16 kFirstEphemeralPort = 49152;
40const uint16 kLastEphemeralPort = 65535;
41const uint16 kEphemeralPortCount = kLastEphemeralPort - kFirstEphemeralPort + 1;
42const uint32 kDefaultNetworkCapacity = 64 * 1024;
43const uint32 kDefaultTcpBufferSize = 32 * 1024;
44
45const uint32 UDP_HEADER_SIZE = 28; // IP + UDP headers
46const uint32 TCP_HEADER_SIZE = 40; // IP + TCP headers
47const uint32 TCP_MSS = 1400; // Maximum segment size
48
49// Note: The current algorithm doesn't work for sample sizes smaller than this.
50const int NUM_SAMPLES = 1000;
51
52enum {
53 MSG_ID_PACKET,
54 MSG_ID_CONNECT,
55 MSG_ID_DISCONNECT,
56};
57
58// Packets are passed between sockets as messages. We copy the data just like
59// the kernel does.
60class Packet : public MessageData {
61 public:
62 Packet(const char* data, size_t size, const SocketAddress& from)
63 : size_(size), consumed_(0), from_(from) {
64 ASSERT(NULL != data);
65 data_ = new char[size_];
66 memcpy(data_, data, size_);
67 }
68
69 virtual ~Packet() {
70 delete[] data_;
71 }
72
73 const char* data() const { return data_ + consumed_; }
74 size_t size() const { return size_ - consumed_; }
75 const SocketAddress& from() const { return from_; }
76
77 // Remove the first size bytes from the data.
78 void Consume(size_t size) {
79 ASSERT(size + consumed_ < size_);
80 consumed_ += size;
81 }
82
83 private:
84 char* data_;
85 size_t size_, consumed_;
86 SocketAddress from_;
87};
88
89struct MessageAddress : public MessageData {
90 explicit MessageAddress(const SocketAddress& a) : addr(a) { }
91 SocketAddress addr;
92};
93
94// Implements the socket interface using the virtual network. Packets are
95// passed as messages using the message queue of the socket server.
96class VirtualSocket : public AsyncSocket, public MessageHandler {
97 public:
98 VirtualSocket(VirtualSocketServer* server, int family, int type, bool async)
99 : server_(server), family_(family), type_(type), async_(async),
100 state_(CS_CLOSED), error_(0), listen_queue_(NULL),
101 write_enabled_(false),
102 network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) {
103 ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
104 ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams
105 }
106
107 virtual ~VirtualSocket() {
108 Close();
109
110 for (RecvBuffer::iterator it = recv_buffer_.begin();
111 it != recv_buffer_.end(); ++it) {
112 delete *it;
113 }
114 }
115
116 virtual SocketAddress GetLocalAddress() const {
117 return local_addr_;
118 }
119
120 virtual SocketAddress GetRemoteAddress() const {
121 return remote_addr_;
122 }
123
124 // Used by server sockets to set the local address without binding.
125 void SetLocalAddress(const SocketAddress& addr) {
126 local_addr_ = addr;
127 }
128
129 virtual int Bind(const SocketAddress& addr) {
130 if (!local_addr_.IsNil()) {
131 error_ = EINVAL;
132 return -1;
133 }
134 local_addr_ = addr;
135 int result = server_->Bind(this, &local_addr_);
136 if (result != 0) {
137 local_addr_.Clear();
138 error_ = EADDRINUSE;
139 } else {
140 bound_ = true;
141 was_any_ = addr.IsAnyIP();
142 }
143 return result;
144 }
145
146 virtual int Connect(const SocketAddress& addr) {
147 return InitiateConnect(addr, true);
148 }
149
150 virtual int Close() {
151 if (!local_addr_.IsNil() && bound_) {
152 // Remove from the binding table.
153 server_->Unbind(local_addr_, this);
154 bound_ = false;
155 }
156
157 if (SOCK_STREAM == type_) {
158 // Cancel pending sockets
159 if (listen_queue_) {
160 while (!listen_queue_->empty()) {
161 SocketAddress addr = listen_queue_->front();
162
163 // Disconnect listening socket.
164 server_->Disconnect(server_->LookupBinding(addr));
165 listen_queue_->pop_front();
166 }
167 delete listen_queue_;
168 listen_queue_ = NULL;
169 }
170 // Disconnect stream sockets
171 if (CS_CONNECTED == state_) {
172 // Disconnect remote socket, check if it is a child of a server socket.
173 VirtualSocket* socket =
174 server_->LookupConnection(local_addr_, remote_addr_);
175 if (!socket) {
176 // Not a server socket child, then see if it is bound.
177 // TODO: If this is indeed a server socket that has no
178 // children this will cause the server socket to be
179 // closed. This might lead to unexpected results, how to fix this?
180 socket = server_->LookupBinding(remote_addr_);
181 }
182 server_->Disconnect(socket);
183
184 // Remove mapping for both directions.
185 server_->RemoveConnection(remote_addr_, local_addr_);
186 server_->RemoveConnection(local_addr_, remote_addr_);
187 }
188 // Cancel potential connects
189 MessageList msgs;
190 if (server_->msg_queue_) {
191 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
192 }
193 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
194 ASSERT(NULL != it->pdata);
195 MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
196
197 // Lookup remote side.
198 VirtualSocket* socket = server_->LookupConnection(local_addr_,
199 data->addr);
200 if (socket) {
201 // Server socket, remote side is a socket retreived by
202 // accept. Accepted sockets are not bound so we will not
203 // find it by looking in the bindings table.
204 server_->Disconnect(socket);
205 server_->RemoveConnection(local_addr_, data->addr);
206 } else {
207 server_->Disconnect(server_->LookupBinding(data->addr));
208 }
209 delete data;
210 }
211 // Clear incoming packets and disconnect messages
212 if (server_->msg_queue_) {
213 server_->msg_queue_->Clear(this);
214 }
215 }
216
217 state_ = CS_CLOSED;
218 local_addr_.Clear();
219 remote_addr_.Clear();
220 return 0;
221 }
222
223 virtual int Send(const void *pv, size_t cb) {
224 if (CS_CONNECTED != state_) {
225 error_ = ENOTCONN;
226 return -1;
227 }
228 if (SOCK_DGRAM == type_) {
229 return SendUdp(pv, cb, remote_addr_);
230 } else {
231 return SendTcp(pv, cb);
232 }
233 }
234
235 virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
236 if (SOCK_DGRAM == type_) {
237 return SendUdp(pv, cb, addr);
238 } else {
239 if (CS_CONNECTED != state_) {
240 error_ = ENOTCONN;
241 return -1;
242 }
243 return SendTcp(pv, cb);
244 }
245 }
246
247 virtual int Recv(void *pv, size_t cb) {
248 SocketAddress addr;
249 return RecvFrom(pv, cb, &addr);
250 }
251
252 virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
253 // If we don't have a packet, then either error or wait for one to arrive.
254 if (recv_buffer_.empty()) {
255 if (async_) {
256 error_ = EAGAIN;
257 return -1;
258 }
259 while (recv_buffer_.empty()) {
260 Message msg;
261 server_->msg_queue_->Get(&msg);
262 server_->msg_queue_->Dispatch(&msg);
263 }
264 }
265
266 // Return the packet at the front of the queue.
267 Packet* packet = recv_buffer_.front();
268 size_t data_read = _min(cb, packet->size());
269 memcpy(pv, packet->data(), data_read);
270 *paddr = packet->from();
271
272 if (data_read < packet->size()) {
273 packet->Consume(data_read);
274 } else {
275 recv_buffer_.pop_front();
276 delete packet;
277 }
278
279 if (SOCK_STREAM == type_) {
280 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
281 recv_buffer_size_ -= data_read;
282 if (was_full) {
283 VirtualSocket* sender = server_->LookupBinding(remote_addr_);
284 ASSERT(NULL != sender);
285 server_->SendTcp(sender);
286 }
287 }
288
289 return static_cast<int>(data_read);
290 }
291
292 virtual int Listen(int backlog) {
293 ASSERT(SOCK_STREAM == type_);
294 ASSERT(CS_CLOSED == state_);
295 if (local_addr_.IsNil()) {
296 error_ = EINVAL;
297 return -1;
298 }
299 ASSERT(NULL == listen_queue_);
300 listen_queue_ = new ListenQueue;
301 state_ = CS_CONNECTING;
302 return 0;
303 }
304
305 virtual VirtualSocket* Accept(SocketAddress *paddr) {
306 if (NULL == listen_queue_) {
307 error_ = EINVAL;
308 return NULL;
309 }
310 while (!listen_queue_->empty()) {
311 VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_,
312 async_);
313
314 // Set the new local address to the same as this server socket.
315 socket->SetLocalAddress(local_addr_);
316 // Sockets made from a socket that 'was Any' need to inherit that.
317 socket->set_was_any(was_any_);
318 SocketAddress remote_addr(listen_queue_->front());
319 int result = socket->InitiateConnect(remote_addr, false);
320 listen_queue_->pop_front();
321 if (result != 0) {
322 delete socket;
323 continue;
324 }
325 socket->CompleteConnect(remote_addr, false);
326 if (paddr) {
327 *paddr = remote_addr;
328 }
329 return socket;
330 }
331 error_ = EWOULDBLOCK;
332 return NULL;
333 }
334
335 virtual int GetError() const {
336 return error_;
337 }
338
339 virtual void SetError(int error) {
340 error_ = error;
341 }
342
343 virtual ConnState GetState() const {
344 return state_;
345 }
346
347 virtual int GetOption(Option opt, int* value) {
348 OptionsMap::const_iterator it = options_map_.find(opt);
349 if (it == options_map_.end()) {
350 return -1;
351 }
352 *value = it->second;
353 return 0; // 0 is success to emulate getsockopt()
354 }
355
356 virtual int SetOption(Option opt, int value) {
357 options_map_[opt] = value;
358 return 0; // 0 is success to emulate setsockopt()
359 }
360
361 virtual int EstimateMTU(uint16* mtu) {
362 if (CS_CONNECTED != state_)
363 return ENOTCONN;
364 else
365 return 65536;
366 }
367
368 void OnMessage(Message *pmsg) {
369 if (pmsg->message_id == MSG_ID_PACKET) {
370 //ASSERT(!local_addr_.IsAny());
371 ASSERT(NULL != pmsg->pdata);
372 Packet* packet = static_cast<Packet*>(pmsg->pdata);
373
374 recv_buffer_.push_back(packet);
375
376 if (async_) {
377 SignalReadEvent(this);
378 }
379 } else if (pmsg->message_id == MSG_ID_CONNECT) {
380 ASSERT(NULL != pmsg->pdata);
381 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
382 if (listen_queue_ != NULL) {
383 listen_queue_->push_back(data->addr);
384 if (async_) {
385 SignalReadEvent(this);
386 }
387 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
388 CompleteConnect(data->addr, true);
389 } else {
390 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening";
391 server_->Disconnect(server_->LookupBinding(data->addr));
392 }
393 delete data;
394 } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
395 ASSERT(SOCK_STREAM == type_);
396 if (CS_CLOSED != state_) {
397 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
398 state_ = CS_CLOSED;
399 remote_addr_.Clear();
400 if (async_) {
401 SignalCloseEvent(this, error);
402 }
403 }
404 } else {
405 ASSERT(false);
406 }
407 }
408
409 bool was_any() { return was_any_; }
410 void set_was_any(bool was_any) { was_any_ = was_any; }
411
412 private:
413 struct NetworkEntry {
414 size_t size;
415 uint32 done_time;
416 };
417
418 typedef std::deque<SocketAddress> ListenQueue;
419 typedef std::deque<NetworkEntry> NetworkQueue;
420 typedef std::vector<char> SendBuffer;
421 typedef std::list<Packet*> RecvBuffer;
422 typedef std::map<Option, int> OptionsMap;
423
424 int InitiateConnect(const SocketAddress& addr, bool use_delay) {
425 if (!remote_addr_.IsNil()) {
426 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
427 return -1;
428 }
429 if (local_addr_.IsNil()) {
430 // If there's no local address set, grab a random one in the correct AF.
431 int result = 0;
432 if (addr.ipaddr().family() == AF_INET) {
433 result = Bind(SocketAddress("0.0.0.0", 0));
434 } else if (addr.ipaddr().family() == AF_INET6) {
435 result = Bind(SocketAddress("::", 0));
436 }
437 if (result != 0) {
438 return result;
439 }
440 }
441 if (type_ == SOCK_DGRAM) {
442 remote_addr_ = addr;
443 state_ = CS_CONNECTED;
444 } else {
445 int result = server_->Connect(this, addr, use_delay);
446 if (result != 0) {
447 error_ = EHOSTUNREACH;
448 return -1;
449 }
450 state_ = CS_CONNECTING;
451 }
452 return 0;
453 }
454
455 void CompleteConnect(const SocketAddress& addr, bool notify) {
456 ASSERT(CS_CONNECTING == state_);
457 remote_addr_ = addr;
458 state_ = CS_CONNECTED;
459 server_->AddConnection(remote_addr_, local_addr_, this);
460 if (async_ && notify) {
461 SignalConnectEvent(this);
462 }
463 }
464
465 int SendUdp(const void* pv, size_t cb, const SocketAddress& addr) {
466 // If we have not been assigned a local port, then get one.
467 if (local_addr_.IsNil()) {
468 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
469 int result = server_->Bind(this, &local_addr_);
470 if (result != 0) {
471 local_addr_.Clear();
472 error_ = EADDRINUSE;
473 return result;
474 }
475 }
476
477 // Send the data in a message to the appropriate socket.
478 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
479 }
480
481 int SendTcp(const void* pv, size_t cb) {
482 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
483 if (0 == capacity) {
484 write_enabled_ = true;
485 error_ = EWOULDBLOCK;
486 return -1;
487 }
488 size_t consumed = _min(cb, capacity);
489 const char* cpv = static_cast<const char*>(pv);
490 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed);
491 server_->SendTcp(this);
492 return static_cast<int>(consumed);
493 }
494
495 VirtualSocketServer* server_;
496 int family_;
497 int type_;
498 bool async_;
499 ConnState state_;
500 int error_;
501 SocketAddress local_addr_;
502 SocketAddress remote_addr_;
503
504 // Pending sockets which can be Accepted
505 ListenQueue* listen_queue_;
506
507 // Data which tcp has buffered for sending
508 SendBuffer send_buffer_;
509 bool write_enabled_;
510
511 // Critical section to protect the recv_buffer and queue_
512 CriticalSection crit_;
513
514 // Network model that enforces bandwidth and capacity constraints
515 NetworkQueue network_;
516 size_t network_size_;
517
518 // Data which has been received from the network
519 RecvBuffer recv_buffer_;
520 // The amount of data which is in flight or in recv_buffer_
521 size_t recv_buffer_size_;
522
523 // Is this socket bound?
524 bool bound_;
525
526 // When we bind a socket to Any, VSS's Bind gives it another address. For
527 // dual-stack sockets, we want to distinguish between sockets that were
528 // explicitly given a particular address and sockets that had one picked
529 // for them by VSS.
530 bool was_any_;
531
532 // Store the options that are set
533 OptionsMap options_map_;
534
535 friend class VirtualSocketServer;
536};
537
538VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
539 : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false),
540 network_delay_(Time()), next_ipv4_(kInitialNextIPv4),
541 next_ipv6_(kInitialNextIPv6), next_port_(kFirstEphemeralPort),
542 bindings_(new AddressMap()), connections_(new ConnectionMap()),
543 bandwidth_(0), network_capacity_(kDefaultNetworkCapacity),
544 send_buffer_capacity_(kDefaultTcpBufferSize),
545 recv_buffer_capacity_(kDefaultTcpBufferSize),
546 delay_mean_(0), delay_stddev_(0), delay_samples_(NUM_SAMPLES),
547 delay_dist_(NULL), drop_prob_(0.0) {
548 if (!server_) {
549 server_ = new PhysicalSocketServer();
550 server_owned_ = true;
551 }
552 UpdateDelayDistribution();
553}
554
555VirtualSocketServer::~VirtualSocketServer() {
556 delete bindings_;
557 delete connections_;
558 delete delay_dist_;
559 if (server_owned_) {
560 delete server_;
561 }
562}
563
564IPAddress VirtualSocketServer::GetNextIP(int family) {
565 if (family == AF_INET) {
566 IPAddress next_ip(next_ipv4_);
567 next_ipv4_.s_addr =
568 HostToNetwork32(NetworkToHost32(next_ipv4_.s_addr) + 1);
569 return next_ip;
570 } else if (family == AF_INET6) {
571 IPAddress next_ip(next_ipv6_);
572 uint32* as_ints = reinterpret_cast<uint32*>(&next_ipv6_.s6_addr);
573 as_ints[3] += 1;
574 return next_ip;
575 }
576 return IPAddress();
577}
578
579uint16 VirtualSocketServer::GetNextPort() {
580 uint16 port = next_port_;
581 if (next_port_ < kLastEphemeralPort) {
582 ++next_port_;
583 } else {
584 next_port_ = kFirstEphemeralPort;
585 }
586 return port;
587}
588
589Socket* VirtualSocketServer::CreateSocket(int type) {
590 return CreateSocket(AF_INET, type);
591}
592
593Socket* VirtualSocketServer::CreateSocket(int family, int type) {
594 return CreateSocketInternal(family, type);
595}
596
597AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
598 return CreateAsyncSocket(AF_INET, type);
599}
600
601AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int family, int type) {
602 return CreateSocketInternal(family, type);
603}
604
605VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
606 return new VirtualSocket(this, family, type, true);
607}
608
609void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
610 msg_queue_ = msg_queue;
611 if (msg_queue_) {
612 msg_queue_->SignalQueueDestroyed.connect(this,
613 &VirtualSocketServer::OnMessageQueueDestroyed);
614 }
615}
616
617bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
618 ASSERT(msg_queue_ == Thread::Current());
619 if (stop_on_idle_ && Thread::Current()->empty()) {
620 return false;
621 }
622 return socketserver()->Wait(cmsWait, process_io);
623}
624
625void VirtualSocketServer::WakeUp() {
626 socketserver()->WakeUp();
627}
628
629bool VirtualSocketServer::ProcessMessagesUntilIdle() {
630 ASSERT(msg_queue_ == Thread::Current());
631 stop_on_idle_ = true;
632 while (!msg_queue_->empty()) {
633 Message msg;
634 if (msg_queue_->Get(&msg, kForever)) {
635 msg_queue_->Dispatch(&msg);
636 }
637 }
638 stop_on_idle_ = false;
639 return !msg_queue_->IsQuitting();
640}
641
642int VirtualSocketServer::Bind(VirtualSocket* socket,
643 const SocketAddress& addr) {
644 ASSERT(NULL != socket);
645 // Address must be completely specified at this point
646 ASSERT(!IPIsUnspec(addr.ipaddr()));
647 ASSERT(addr.port() != 0);
648
649 // Normalize the address (turns v6-mapped addresses into v4-addresses).
650 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
651
652 AddressMap::value_type entry(normalized, socket);
653 return bindings_->insert(entry).second ? 0 : -1;
654}
655
656int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
657 ASSERT(NULL != socket);
658
659 if (IPIsAny(addr->ipaddr())) {
660 addr->SetIP(GetNextIP(addr->ipaddr().family()));
661 } else if (!IPIsUnspec(addr->ipaddr())) {
662 addr->SetIP(addr->ipaddr().Normalized());
663 } else {
664 ASSERT(false);
665 }
666
667 if (addr->port() == 0) {
668 for (int i = 0; i < kEphemeralPortCount; ++i) {
669 addr->SetPort(GetNextPort());
670 if (bindings_->find(*addr) == bindings_->end()) {
671 break;
672 }
673 }
674 }
675
676 return Bind(socket, *addr);
677}
678
679VirtualSocket* VirtualSocketServer::LookupBinding(const SocketAddress& addr) {
680 SocketAddress normalized(addr.ipaddr().Normalized(),
681 addr.port());
682 AddressMap::iterator it = bindings_->find(normalized);
683 return (bindings_->end() != it) ? it->second : NULL;
684}
685
686int VirtualSocketServer::Unbind(const SocketAddress& addr,
687 VirtualSocket* socket) {
688 SocketAddress normalized(addr.ipaddr().Normalized(),
689 addr.port());
690 ASSERT((*bindings_)[normalized] == socket);
691 bindings_->erase(bindings_->find(normalized));
692 return 0;
693}
694
695void VirtualSocketServer::AddConnection(const SocketAddress& local,
696 const SocketAddress& remote,
697 VirtualSocket* remote_socket) {
698 // Add this socket pair to our routing table. This will allow
699 // multiple clients to connect to the same server address.
700 SocketAddress local_normalized(local.ipaddr().Normalized(),
701 local.port());
702 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
703 remote.port());
704 SocketAddressPair address_pair(local_normalized, remote_normalized);
705 connections_->insert(std::pair<SocketAddressPair,
706 VirtualSocket*>(address_pair, remote_socket));
707}
708
709VirtualSocket* VirtualSocketServer::LookupConnection(
710 const SocketAddress& local,
711 const SocketAddress& remote) {
712 SocketAddress local_normalized(local.ipaddr().Normalized(),
713 local.port());
714 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
715 remote.port());
716 SocketAddressPair address_pair(local_normalized, remote_normalized);
717 ConnectionMap::iterator it = connections_->find(address_pair);
718 return (connections_->end() != it) ? it->second : NULL;
719}
720
721void VirtualSocketServer::RemoveConnection(const SocketAddress& local,
722 const SocketAddress& remote) {
723 SocketAddress local_normalized(local.ipaddr().Normalized(),
724 local.port());
725 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
726 remote.port());
727 SocketAddressPair address_pair(local_normalized, remote_normalized);
728 connections_->erase(address_pair);
729}
730
731static double Random() {
732 return static_cast<double>(rand()) / RAND_MAX;
733}
734
735int VirtualSocketServer::Connect(VirtualSocket* socket,
736 const SocketAddress& remote_addr,
737 bool use_delay) {
738 uint32 delay = use_delay ? GetRandomTransitDelay() : 0;
739 VirtualSocket* remote = LookupBinding(remote_addr);
740 if (!CanInteractWith(socket, remote)) {
741 LOG(LS_INFO) << "Address family mismatch between "
742 << socket->GetLocalAddress() << " and " << remote_addr;
743 return -1;
744 }
745 if (remote != NULL) {
746 SocketAddress addr = socket->GetLocalAddress();
747 msg_queue_->PostDelayed(delay, remote, MSG_ID_CONNECT,
748 new MessageAddress(addr));
749 } else {
750 LOG(LS_INFO) << "No one listening at " << remote_addr;
751 msg_queue_->PostDelayed(delay, socket, MSG_ID_DISCONNECT);
752 }
753 return 0;
754}
755
756bool VirtualSocketServer::Disconnect(VirtualSocket* socket) {
757 if (socket) {
758 // Remove the mapping.
759 msg_queue_->Post(socket, MSG_ID_DISCONNECT);
760 return true;
761 }
762 return false;
763}
764
765int VirtualSocketServer::SendUdp(VirtualSocket* socket,
766 const char* data, size_t data_size,
767 const SocketAddress& remote_addr) {
768 // See if we want to drop this packet.
769 if (Random() < drop_prob_) {
770 LOG(LS_VERBOSE) << "Dropping packet: bad luck";
771 return static_cast<int>(data_size);
772 }
773
774 VirtualSocket* recipient = LookupBinding(remote_addr);
775 if (!recipient) {
776 // Make a fake recipient for address family checking.
777 scoped_ptr<VirtualSocket> dummy_socket(
778 CreateSocketInternal(AF_INET, SOCK_DGRAM));
779 dummy_socket->SetLocalAddress(remote_addr);
780 if (!CanInteractWith(socket, dummy_socket.get())) {
781 LOG(LS_VERBOSE) << "Incompatible address families: "
782 << socket->GetLocalAddress() << " and " << remote_addr;
783 return -1;
784 }
785 LOG(LS_VERBOSE) << "No one listening at " << remote_addr;
786 return static_cast<int>(data_size);
787 }
788
789 if (!CanInteractWith(socket, recipient)) {
790 LOG(LS_VERBOSE) << "Incompatible address families: "
791 << socket->GetLocalAddress() << " and " << remote_addr;
792 return -1;
793 }
794
795 CritScope cs(&socket->crit_);
796
797 uint32 cur_time = Time();
798 PurgeNetworkPackets(socket, cur_time);
799
800 // Determine whether we have enough bandwidth to accept this packet. To do
801 // this, we need to update the send queue. Once we know it's current size,
802 // we know whether we can fit this packet.
803 //
804 // NOTE: There are better algorithms for maintaining such a queue (such as
805 // "Derivative Random Drop"); however, this algorithm is a more accurate
806 // simulation of what a normal network would do.
807
808 size_t packet_size = data_size + UDP_HEADER_SIZE;
809 if (socket->network_size_ + packet_size > network_capacity_) {
810 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
811 return static_cast<int>(data_size);
812 }
813
814 AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
815 UDP_HEADER_SIZE, false);
816
817 return static_cast<int>(data_size);
818}
819
820void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
821 // TCP can't send more data than will fill up the receiver's buffer.
822 // We track the data that is in the buffer plus data in flight using the
823 // recipient's recv_buffer_size_. Anything beyond that must be stored in the
824 // sender's buffer. We will trigger the buffered data to be sent when data
825 // is read from the recv_buffer.
826
827 // Lookup the local/remote pair in the connections table.
828 VirtualSocket* recipient = LookupConnection(socket->local_addr_,
829 socket->remote_addr_);
830 if (!recipient) {
831 LOG(LS_VERBOSE) << "Sending data to no one.";
832 return;
833 }
834
835 CritScope cs(&socket->crit_);
836
837 uint32 cur_time = Time();
838 PurgeNetworkPackets(socket, cur_time);
839
840 while (true) {
841 size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size_;
842 size_t max_data_size = _min<size_t>(available, TCP_MSS - TCP_HEADER_SIZE);
843 size_t data_size = _min(socket->send_buffer_.size(), max_data_size);
844 if (0 == data_size)
845 break;
846
847 AddPacketToNetwork(socket, recipient, cur_time, &socket->send_buffer_[0],
848 data_size, TCP_HEADER_SIZE, true);
849 recipient->recv_buffer_size_ += data_size;
850
851 size_t new_buffer_size = socket->send_buffer_.size() - data_size;
852 // Avoid undefined access beyond the last element of the vector.
853 // This only happens when new_buffer_size is 0.
854 if (data_size < socket->send_buffer_.size()) {
855 // memmove is required for potentially overlapping source/destination.
856 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
857 new_buffer_size);
858 }
859 socket->send_buffer_.resize(new_buffer_size);
860 }
861
862 if (socket->write_enabled_
863 && (socket->send_buffer_.size() < send_buffer_capacity_)) {
864 socket->write_enabled_ = false;
865 socket->SignalWriteEvent(socket);
866 }
867}
868
869void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
870 VirtualSocket* recipient,
871 uint32 cur_time,
872 const char* data,
873 size_t data_size,
874 size_t header_size,
875 bool ordered) {
876 VirtualSocket::NetworkEntry entry;
877 entry.size = data_size + header_size;
878
879 sender->network_size_ += entry.size;
880 uint32 send_delay = SendDelay(static_cast<uint32>(sender->network_size_));
881 entry.done_time = cur_time + send_delay;
882 sender->network_.push_back(entry);
883
884 // Find the delay for crossing the many virtual hops of the network.
885 uint32 transit_delay = GetRandomTransitDelay();
886
887 // Post the packet as a message to be delivered (on our own thread)
888 Packet* p = new Packet(data, data_size, sender->local_addr_);
889 uint32 ts = TimeAfter(send_delay + transit_delay);
890 if (ordered) {
891 // Ensure that new packets arrive after previous ones
892 // TODO: consider ordering on a per-socket basis, since this
893 // introduces artifical delay.
894 ts = TimeMax(ts, network_delay_);
895 }
896 msg_queue_->PostAt(ts, recipient, MSG_ID_PACKET, p);
897 network_delay_ = TimeMax(ts, network_delay_);
898}
899
900void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
901 uint32 cur_time) {
902 while (!socket->network_.empty() &&
903 (socket->network_.front().done_time <= cur_time)) {
904 ASSERT(socket->network_size_ >= socket->network_.front().size);
905 socket->network_size_ -= socket->network_.front().size;
906 socket->network_.pop_front();
907 }
908}
909
910uint32 VirtualSocketServer::SendDelay(uint32 size) {
911 if (bandwidth_ == 0)
912 return 0;
913 else
914 return 1000 * size / bandwidth_;
915}
916
917#if 0
918void PrintFunction(std::vector<std::pair<double, double> >* f) {
919 return;
920 double sum = 0;
921 for (uint32 i = 0; i < f->size(); ++i) {
922 std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl;
923 sum += (*f)[i].second;
924 }
925 if (!f->empty()) {
926 const double mean = sum / f->size();
927 double sum_sq_dev = 0;
928 for (uint32 i = 0; i < f->size(); ++i) {
929 double dev = (*f)[i].second - mean;
930 sum_sq_dev += dev * dev;
931 }
932 std::cout << "Mean = " << mean << " StdDev = "
933 << sqrt(sum_sq_dev / f->size()) << std::endl;
934 }
935}
936#endif // <unused>
937
938void VirtualSocketServer::UpdateDelayDistribution() {
939 Function* dist = CreateDistribution(delay_mean_, delay_stddev_,
940 delay_samples_);
941 // We take a lock just to make sure we don't leak memory.
942 {
943 CritScope cs(&delay_crit_);
944 delete delay_dist_;
945 delay_dist_ = dist;
946 }
947}
948
949static double PI = 4 * atan(1.0);
950
951static double Normal(double x, double mean, double stddev) {
952 double a = (x - mean) * (x - mean) / (2 * stddev * stddev);
953 return exp(-a) / (stddev * sqrt(2 * PI));
954}
955
956#if 0 // static unused gives a warning
957static double Pareto(double x, double min, double k) {
958 if (x < min)
959 return 0;
960 else
961 return k * std::pow(min, k) / std::pow(x, k+1);
962}
963#endif
964
965VirtualSocketServer::Function* VirtualSocketServer::CreateDistribution(
966 uint32 mean, uint32 stddev, uint32 samples) {
967 Function* f = new Function();
968
969 if (0 == stddev) {
970 f->push_back(Point(mean, 1.0));
971 } else {
972 double start = 0;
973 if (mean >= 4 * static_cast<double>(stddev))
974 start = mean - 4 * static_cast<double>(stddev);
975 double end = mean + 4 * static_cast<double>(stddev);
976
977 for (uint32 i = 0; i < samples; i++) {
978 double x = start + (end - start) * i / (samples - 1);
979 double y = Normal(x, mean, stddev);
980 f->push_back(Point(x, y));
981 }
982 }
983 return Resample(Invert(Accumulate(f)), 0, 1, samples);
984}
985
986uint32 VirtualSocketServer::GetRandomTransitDelay() {
987 size_t index = rand() % delay_dist_->size();
988 double delay = (*delay_dist_)[index].second;
989 //LOG_F(LS_INFO) << "random[" << index << "] = " << delay;
990 return static_cast<uint32>(delay);
991}
992
993struct FunctionDomainCmp {
994 bool operator()(const VirtualSocketServer::Point& p1,
995 const VirtualSocketServer::Point& p2) {
996 return p1.first < p2.first;
997 }
998 bool operator()(double v1, const VirtualSocketServer::Point& p2) {
999 return v1 < p2.first;
1000 }
1001 bool operator()(const VirtualSocketServer::Point& p1, double v2) {
1002 return p1.first < v2;
1003 }
1004};
1005
1006VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
1007 ASSERT(f->size() >= 1);
1008 double v = 0;
1009 for (Function::size_type i = 0; i < f->size() - 1; ++i) {
1010 double dx = (*f)[i + 1].first - (*f)[i].first;
1011 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2;
1012 (*f)[i].second = v;
1013 v = v + dx * avgy;
1014 }
1015 (*f)[f->size()-1].second = v;
1016 return f;
1017}
1018
1019VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) {
1020 for (Function::size_type i = 0; i < f->size(); ++i)
1021 std::swap((*f)[i].first, (*f)[i].second);
1022
1023 std::sort(f->begin(), f->end(), FunctionDomainCmp());
1024 return f;
1025}
1026
1027VirtualSocketServer::Function* VirtualSocketServer::Resample(
1028 Function* f, double x1, double x2, uint32 samples) {
1029 Function* g = new Function();
1030
1031 for (size_t i = 0; i < samples; i++) {
1032 double x = x1 + (x2 - x1) * i / (samples - 1);
1033 double y = Evaluate(f, x);
1034 g->push_back(Point(x, y));
1035 }
1036
1037 delete f;
1038 return g;
1039}
1040
1041double VirtualSocketServer::Evaluate(Function* f, double x) {
1042 Function::iterator iter =
1043 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
1044 if (iter == f->begin()) {
1045 return (*f)[0].second;
1046 } else if (iter == f->end()) {
1047 ASSERT(f->size() >= 1);
1048 return (*f)[f->size() - 1].second;
1049 } else if (iter->first == x) {
1050 return iter->second;
1051 } else {
1052 double x1 = (iter - 1)->first;
1053 double y1 = (iter - 1)->second;
1054 double x2 = iter->first;
1055 double y2 = iter->second;
1056 return y1 + (y2 - y1) * (x - x1) / (x2 - x1);
1057 }
1058}
1059
1060bool VirtualSocketServer::CanInteractWith(VirtualSocket* local,
1061 VirtualSocket* remote) {
1062 if (!local || !remote) {
1063 return false;
1064 }
1065 IPAddress local_ip = local->GetLocalAddress().ipaddr();
1066 IPAddress remote_ip = remote->GetLocalAddress().ipaddr();
1067 IPAddress local_normalized = local_ip.Normalized();
1068 IPAddress remote_normalized = remote_ip.Normalized();
1069 // Check if the addresses are the same family after Normalization (turns
1070 // mapped IPv6 address into IPv4 addresses).
1071 // This will stop unmapped V6 addresses from talking to mapped V6 addresses.
1072 if (local_normalized.family() == remote_normalized.family()) {
1073 return true;
1074 }
1075
1076 // If ip1 is IPv4 and ip2 is :: and ip2 is not IPV6_V6ONLY.
1077 int remote_v6_only = 0;
1078 remote->GetOption(Socket::OPT_IPV6_V6ONLY, &remote_v6_only);
1079 if (local_ip.family() == AF_INET && !remote_v6_only && IPIsAny(remote_ip)) {
1080 return true;
1081 }
1082 // Same check, backwards.
1083 int local_v6_only = 0;
1084 local->GetOption(Socket::OPT_IPV6_V6ONLY, &local_v6_only);
1085 if (remote_ip.family() == AF_INET && !local_v6_only && IPIsAny(local_ip)) {
1086 return true;
1087 }
1088
1089 // Check to see if either socket was explicitly bound to IPv6-any.
1090 // These sockets can talk with anyone.
1091 if (local_ip.family() == AF_INET6 && local->was_any()) {
1092 return true;
1093 }
1094 if (remote_ip.family() == AF_INET6 && remote->was_any()) {
1095 return true;
1096 }
1097
1098 return false;
1099}
1100
1101} // namespace rtc