blob: 6943ae0f4ea8f4fe754359fdad148623f18c1367 [file] [log] [blame]
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -08001#include <webrtc/RTPSocketHandler.h>
2
3#include <webrtc/MyWebSocketHandler.h>
4#include <webrtc/STUNMessage.h>
Jorge E. Moreira2a6ab082019-12-11 18:41:58 -08005#include <Utils.h>
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -08006
7#include <https/PlainSocket.h>
8#include <https/SafeCallbackable.h>
9#include <https/Support.h>
Jorge E. Moreira2a6ab082019-12-11 18:41:58 -080010#include <android-base/logging.h>
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -080011
12#include <netdb.h>
13#include <netinet/in.h>
14
15#include <cstring>
16#include <iostream>
17#include <set>
18
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -080019#include <gflags/gflags.h>
20
21DECLARE_string(public_ip);
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -080022
23static socklen_t getSockAddrLen(const sockaddr_storage &addr) {
24 switch (addr.ss_family) {
25 case AF_INET:
26 return sizeof(sockaddr_in);
27 case AF_INET6:
28 return sizeof(sockaddr_in6);
29 default:
30 CHECK(!"Should not be here.");
31 return 0;
32 }
33}
34
35RTPSocketHandler::RTPSocketHandler(
36 std::shared_ptr<RunLoop> runLoop,
37 std::shared_ptr<ServerState> serverState,
38 int domain,
39 uint16_t port,
40 uint32_t trackMask,
41 std::shared_ptr<RTPSession> session)
42 : mRunLoop(runLoop),
43 mServerState(serverState),
44 mLocalPort(port),
45 mTrackMask(trackMask),
46 mSession(session),
47 mSendPending(false),
48 mDTLSConnected(false) {
49 int sock = socket(domain, SOCK_DGRAM, 0);
50
51 makeFdNonblocking(sock);
52 mSocket = std::make_shared<PlainSocket>(mRunLoop, sock);
53
54 sockaddr_storage addr;
55
56 if (domain == PF_INET) {
57 sockaddr_in addrV4;
58 memset(addrV4.sin_zero, 0, sizeof(addrV4.sin_zero));
59 addrV4.sin_family = AF_INET;
60 addrV4.sin_port = htons(port);
61 addrV4.sin_addr.s_addr = INADDR_ANY;
62 memcpy(&addr, &addrV4, sizeof(addrV4));
63 } else {
64 CHECK_EQ(domain, PF_INET6);
65
66 sockaddr_in6 addrV6;
67 addrV6.sin6_family = AF_INET6;
68 addrV6.sin6_port = htons(port);
69 addrV6.sin6_addr = in6addr_any;
70 addrV6.sin6_scope_id = 0;
71 memcpy(&addr, &addrV6, sizeof(addrV6));
72 }
73
74 int res = bind(
75 sock,
76 reinterpret_cast<const sockaddr *>(&addr),
77 getSockAddrLen(addr));
78
79 CHECK(!res);
80
81 auto videoPacketizer =
82 (trackMask & TRACK_VIDEO)
83 ? mServerState->getVideoPacketizer() : nullptr;
84
85 auto audioPacketizer =
86 (trackMask & TRACK_AUDIO)
87 ? mServerState->getAudioPacketizer() : nullptr;
88
89 mRTPSender = std::make_shared<RTPSender>(
90 mRunLoop,
91 this,
92 videoPacketizer,
93 audioPacketizer);
94
95 if (trackMask & TRACK_VIDEO) {
96 mRTPSender->addSource(0xdeadbeef);
97 mRTPSender->addSource(0xcafeb0b0);
98
99 mRTPSender->addRetransInfo(0xdeadbeef, 96, 0xcafeb0b0, 97);
100
101 videoPacketizer->addSender(mRTPSender);
102 }
103
104 if (trackMask & TRACK_AUDIO) {
105 mRTPSender->addSource(0x8badf00d);
106
107 audioPacketizer->addSender(mRTPSender);
108 }
109}
110
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -0800111uint16_t RTPSocketHandler::getLocalPort() const {
112 return mLocalPort;
113}
114
115std::string RTPSocketHandler::getLocalUFrag() const {
116 return mSession->localUFrag();
117}
118
119std::string RTPSocketHandler::getLocalIPString() const {
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -0800120 return FLAGS_public_ip;
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -0800121}
122
123void RTPSocketHandler::run() {
124 mSocket->postRecv(makeSafeCallback(this, &RTPSocketHandler::onReceive));
125}
126
127void RTPSocketHandler::onReceive() {
128 std::vector<uint8_t> buffer(kMaxUDPPayloadSize);
129
130 uint8_t *data = buffer.data();
131
132 sockaddr_storage addr;
133 socklen_t addrLen = sizeof(addr);
134
135 auto n = mSocket->recvfrom(
136 data, buffer.size(), reinterpret_cast<sockaddr *>(&addr), &addrLen);
137
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -0800138 STUNMessage msg(data, n);
139 if (!msg.isValid()) {
140 if (mDTLSConnected) {
141 int err = -EINVAL;
142 if (mRTPSender) {
143 err = onSRTPReceive(data, static_cast<size_t>(n));
144 }
145
146 if (err == -EINVAL) {
147 LOG(VERBOSE) << "Sending to DTLS instead:";
148 // hexdump(data, n);
149
150 onDTLSReceive(data, static_cast<size_t>(n));
151
152 if (mTrackMask & TRACK_DATA) {
153 ssize_t n;
154
155 do {
156 uint8_t buf[kMaxUDPPayloadSize];
157 n = mDTLS->readApplicationData(buf, sizeof(buf));
158
159 if (n > 0) {
160 auto err = mSCTPHandler->inject(
161 buf, static_cast<size_t>(n));
162
163 if (err) {
164 LOG(WARNING)
165 << "SCTPHandler::inject returned error "
166 << err;
167 }
168 }
169 } while (n > 0);
170 }
171 }
172 } else {
173 onDTLSReceive(data, static_cast<size_t>(n));
174 }
175
176 run();
177 return;
178 }
179
180 if (msg.type() == 0x0001 /* Binding Request */) {
181 STUNMessage response(0x0101 /* Binding Response */, msg.data() + 8);
182
183 if (!matchesSession(msg)) {
184 LOG(WARNING) << "Unknown session or no USERNAME.";
185 run();
186 return;
187 }
188
189 const auto &answerPassword = mSession->localPassword();
190
191 // msg.dump(answerPassword);
192
193 if (addr.ss_family == AF_INET) {
194 uint8_t attr[8];
195 attr[0] = 0x00;
196
197 sockaddr_in addrV4;
198 CHECK_EQ(addrLen, sizeof(addrV4));
199
200 memcpy(&addrV4, &addr, addrLen);
201
202 attr[1] = 0x01; // IPv4
203
204 static constexpr uint32_t kMagicCookie = 0x2112a442;
205
206 uint16_t portHost = ntohs(addrV4.sin_port);
207 portHost ^= (kMagicCookie >> 16);
208
209 uint32_t ipHost = ntohl(addrV4.sin_addr.s_addr);
210 ipHost ^= kMagicCookie;
211
212 attr[2] = portHost >> 8;
213 attr[3] = portHost & 0xff;
214 attr[4] = ipHost >> 24;
215 attr[5] = (ipHost >> 16) & 0xff;
216 attr[6] = (ipHost >> 8) & 0xff;
217 attr[7] = ipHost & 0xff;
218
219 response.addAttribute(
220 0x0020 /* XOR-MAPPED-ADDRESS */, attr, sizeof(attr));
221 } else {
222 uint8_t attr[20];
223 attr[0] = 0x00;
224
225 CHECK_EQ(addr.ss_family, AF_INET6);
226
227 sockaddr_in6 addrV6;
228 CHECK_EQ(addrLen, sizeof(addrV6));
229
230 memcpy(&addrV6, &addr, addrLen);
231
232 attr[1] = 0x02; // IPv6
233
234 static constexpr uint32_t kMagicCookie = 0x2112a442;
235
236 uint16_t portHost = ntohs(addrV6.sin6_port);
237 portHost ^= (kMagicCookie >> 16);
238
239 attr[2] = portHost >> 8;
240 attr[3] = portHost & 0xff;
241
242 uint8_t ipHost[16];
243
244 std::string out;
245
246 for (size_t i = 0; i < 16; ++i) {
247 ipHost[i] = addrV6.sin6_addr.s6_addr[15 - i];
248
249 if (!out.empty()) {
250 out += ":";
251 }
Jorge E. Moreira2a6ab082019-12-11 18:41:58 -0800252 out += StringPrintf("%02x", ipHost[i]);
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -0800253
254 ipHost[i] ^= response.data()[4 + i];
255 }
256
257 // LOG(INFO) << "IP6 = " << out;
258
259 for (size_t i = 0; i < 16; ++i) {
260 attr[4 + i] = ipHost[15 - i];
261 }
262
263 response.addAttribute(
264 0x0020 /* XOR-MAPPED-ADDRESS */, attr, sizeof(attr));
265 }
266
267 response.addMessageIntegrityAttribute(answerPassword);
268 response.addFingerprint();
269
270 // response.dump(answerPassword);
271
272 auto res =
273 mSocket->sendto(
274 response.data(),
275 response.size(),
276 reinterpret_cast<const sockaddr *>(&addr),
277 addrLen);
278
279 CHECK_GT(res, 0);
280 CHECK_EQ(static_cast<size_t>(res), response.size());
281
282 if (!mSession->isActive()) {
283 mSession->setRemoteAddress(addr);
284
285 mSession->setIsActive();
286
287 mSession->schedulePing(
288 mRunLoop,
289 makeSafeCallback(
290 this, &RTPSocketHandler::pingRemote, mSession),
291 std::chrono::seconds(0));
292 }
293
294 } else {
295 // msg.dump();
296
297 if (msg.type() == 0x0101 && !mDTLS) {
298 mDTLS = std::make_shared<DTLS>(
299 shared_from_this(),
300 DTLS::Mode::ACCEPT,
301 mSession->localCertificate(),
302 mSession->localKey(),
303 mSession->remoteFingerprint(),
304 (mTrackMask != TRACK_DATA) /* useSRTP */);
305
306 mDTLS->connect(mSession->remoteAddress());
307 }
308 }
309
310 run();
311}
312
313bool RTPSocketHandler::matchesSession(const STUNMessage &msg) const {
314 const void *attrData;
315 size_t attrSize;
316 if (!msg.findAttribute(0x0006 /* USERNAME */, &attrData, &attrSize)) {
317 return false;
318 }
319
320 std::string uFragPair(static_cast<const char *>(attrData), attrSize);
321 auto colonPos = uFragPair.find(':');
322
323 if (colonPos == std::string::npos) {
324 return false;
325 }
326
327 std::string localUFrag(uFragPair, 0, colonPos);
328 std::string remoteUFrag(uFragPair, colonPos + 1);
329
330 if (mSession->localUFrag() != localUFrag
331 || mSession->remoteUFrag() != remoteUFrag) {
332
333 LOG(WARNING)
334 << "Unable to find session localUFrag='"
335 << localUFrag
336 << "', remoteUFrag='"
337 << remoteUFrag
338 << "'";
339
340 return false;
341 }
342
343 return true;
344}
345
346void RTPSocketHandler::pingRemote(std::shared_ptr<RTPSession> session) {
347 std::vector<uint8_t> transactionID { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
348
349 STUNMessage msg(
350 0x0001 /* Binding Request */,
351 transactionID.data());
352
353 std::string uFragPair =
354 session->remoteUFrag() + ":" + session->localUFrag();
355
356 msg.addAttribute(
357 0x0006 /* USERNAME */,
358 uFragPair.c_str(),
359 uFragPair.size());
360
361 uint64_t tieBreaker = 0xdeadbeefcafeb0b0; // XXX
362 msg.addAttribute(
363 0x802a /* ICE-CONTROLLING */,
364 &tieBreaker,
365 sizeof(tieBreaker));
366
367 uint32_t priority = 0xdeadbeef;
368 msg.addAttribute(
369 0x0024 /* PRIORITY */, &priority, sizeof(priority));
370
371 // We're the controlling agent and including the "USE-CANDIDATE" attribute
372 // below nominates this candidate.
373 msg.addAttribute(0x0025 /* USE_CANDIDATE */);
374
375 msg.addMessageIntegrityAttribute(session->remotePassword());
376 msg.addFingerprint();
377
378 queueDatagram(session->remoteAddress(), msg.data(), msg.size());
379
380 session->schedulePing(
381 mRunLoop,
382 makeSafeCallback(this, &RTPSocketHandler::pingRemote, session),
383 std::chrono::seconds(1));
384}
385
386RTPSocketHandler::Datagram::Datagram(
387 const sockaddr_storage &addr, const void *data, size_t size)
388 : mData(size),
389 mAddr(addr) {
390 memcpy(mData.data(), data, size);
391}
392
393const void *RTPSocketHandler::Datagram::data() const {
394 return mData.data();
395}
396
397size_t RTPSocketHandler::Datagram::size() const {
398 return mData.size();
399}
400
401const sockaddr_storage &RTPSocketHandler::Datagram::remoteAddress() const {
402 return mAddr;
403}
404
405void RTPSocketHandler::queueDatagram(
406 const sockaddr_storage &addr, const void *data, size_t size) {
407 auto datagram = std::make_shared<Datagram>(addr, data, size);
408
409 CHECK_LE(size, RTPSocketHandler::kMaxUDPPayloadSize);
410
411 mRunLoop->post(
412 makeSafeCallback<RTPSocketHandler>(
413 this,
414 [datagram](RTPSocketHandler *me) {
415 me->mOutQueue.push_back(datagram);
416
417 if (!me->mSendPending) {
418 me->scheduleDrainOutQueue();
419 }
420 }));
421}
422
423void RTPSocketHandler::scheduleDrainOutQueue() {
424 CHECK(!mSendPending);
425
426 mSendPending = true;
427 mSocket->postSend(
428 makeSafeCallback(
429 this, &RTPSocketHandler::drainOutQueue));
430}
431
432void RTPSocketHandler::drainOutQueue() {
433 mSendPending = false;
434
435 CHECK(!mOutQueue.empty());
436
437 do {
438 auto datagram = mOutQueue.front();
439
440 ssize_t n;
441 do {
442 const sockaddr_storage &remoteAddr = datagram->remoteAddress();
443
444 n = mSocket->sendto(
445 datagram->data(),
446 datagram->size(),
447 reinterpret_cast<const sockaddr *>(&remoteAddr),
448 getSockAddrLen(remoteAddr));
449 } while (n < 0 && errno == EINTR);
450
451 if (n < 0) {
452 if (errno == EAGAIN || errno == EWOULDBLOCK) {
453 break;
454 }
455
456 CHECK(!"Should not be here");
457 }
458
459 mOutQueue.pop_front();
460
461 } while (!mOutQueue.empty());
462
463 if (!mOutQueue.empty()) {
464 scheduleDrainOutQueue();
465 }
466}
467
468void RTPSocketHandler::onDTLSReceive(const uint8_t *data, size_t size) {
469 if (mDTLS) {
470 mDTLS->inject(data, size);
471 }
472}
473
474void RTPSocketHandler::notifyDTLSConnected() {
475 LOG(INFO) << "TDLS says that it's now connected.";
476
477 mDTLSConnected = true;
478
479 if (mTrackMask & TRACK_DATA) {
480 mSCTPHandler = std::make_shared<SCTPHandler>(mRunLoop, mDTLS);
481 mSCTPHandler->run();
482 }
483
484 mRTPSender->run();
485}
486
487int RTPSocketHandler::onSRTPReceive(uint8_t *data, size_t size) {
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -0800488 if (size < 2) {
489 return -EINVAL;
490 }
491
492 auto version = data[0] >> 6;
493 if (version != 2) {
494 return -EINVAL;
495 }
496
497 auto outSize = mDTLS->unprotect(data, size, false /* isRTP */);
498
Jorge E. Moreiraa18ff1a2019-12-17 18:20:56 -0800499 auto err = mRTPSender->injectRTCP(data, outSize);
500 if (err) {
501 LOG(WARNING) << "RTPSender::injectRTCP returned " << err;
502 }
503
504 return err;
505}
506
507void RTPSocketHandler::queueRTCPDatagram(const void *data, size_t size) {
508 if (!mDTLSConnected) {
509 return;
510 }
511
512 std::vector<uint8_t> copy(size + SRTP_MAX_TRAILER_LEN);
513 memcpy(copy.data(), data, size);
514
515 auto outSize = mDTLS->protect(copy.data(), size, false /* isRTP */);
516 CHECK_LE(outSize, copy.size());
517
518 queueDatagram(mSession->remoteAddress(), copy.data(), outSize);
519}
520
521void RTPSocketHandler::queueRTPDatagram(const void *data, size_t size) {
522 if (!mDTLSConnected) {
523 return;
524 }
525
526 std::vector<uint8_t> copy(size + SRTP_MAX_TRAILER_LEN);
527 memcpy(copy.data(), data, size);
528
529 auto outSize = mDTLS->protect(copy.data(), size, true /* isRTP */);
530 CHECK_LE(outSize, copy.size());
531
532 queueDatagram(mSession->remoteAddress(), copy.data(), outSize);
533}