blob: 853ea14ddec56faada739b4939b47e6d2456e0fb [file] [log] [blame]
Andreas Huber7a747b82010-06-07 15:19:40 -07001/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Andreas Huber6e3fa442010-09-21 13:13:15 -070017//#define LOG_NDEBUG 0
18#define LOG_TAG "ARTPConnection"
19#include <utils/Log.h>
20
Andreas Huber7a747b82010-06-07 15:19:40 -070021#include "ARTPConnection.h"
22
23#include "ARTPSource.h"
24#include "ASessionDescription.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/AMessage.h>
29#include <media/stagefright/foundation/AString.h>
Andreas Huber57648e42010-08-04 10:14:30 -070030#include <media/stagefright/foundation/hexdump.h>
Andreas Huber7a747b82010-06-07 15:19:40 -070031
32#include <arpa/inet.h>
33#include <sys/socket.h>
34
Andreas Huber7a747b82010-06-07 15:19:40 -070035namespace android {
36
Andreas Huber57648e42010-08-04 10:14:30 -070037static const size_t kMaxUDPSize = 1500;
38
Andreas Huber7a747b82010-06-07 15:19:40 -070039static uint16_t u16at(const uint8_t *data) {
40 return data[0] << 8 | data[1];
41}
42
43static uint32_t u32at(const uint8_t *data) {
44 return u16at(data) << 16 | u16at(&data[2]);
45}
46
47static uint64_t u64at(const uint8_t *data) {
48 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
49}
50
51// static
52const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
53
54struct ARTPConnection::StreamInfo {
55 int mRTPSocket;
56 int mRTCPSocket;
57 sp<ASessionDescription> mSessionDesc;
58 size_t mIndex;
59 sp<AMessage> mNotifyMsg;
Andreas Huber57648e42010-08-04 10:14:30 -070060 KeyedVector<uint32_t, sp<ARTPSource> > mSources;
61
Andreas Hubercc5fb1d2010-10-13 12:15:03 -070062 int64_t mNumRTCPPacketsReceived;
63 int64_t mNumRTPPacketsReceived;
Andreas Huber57648e42010-08-04 10:14:30 -070064 struct sockaddr_in mRemoteRTCPAddr;
Andreas Huber0416da72010-08-26 11:17:32 -070065
66 bool mIsInjected;
Andreas Huber7a747b82010-06-07 15:19:40 -070067};
68
Andreas Huberf88f8442010-08-10 11:18:36 -070069ARTPConnection::ARTPConnection(uint32_t flags)
70 : mFlags(flags),
71 mPollEventPending(false),
Andreas Huber57648e42010-08-04 10:14:30 -070072 mLastReceiverReportTimeUs(-1) {
Andreas Huber7a747b82010-06-07 15:19:40 -070073}
74
75ARTPConnection::~ARTPConnection() {
76}
77
78void ARTPConnection::addStream(
79 int rtpSocket, int rtcpSocket,
80 const sp<ASessionDescription> &sessionDesc,
81 size_t index,
Andreas Huber0416da72010-08-26 11:17:32 -070082 const sp<AMessage> &notify,
83 bool injected) {
Andreas Huber7a747b82010-06-07 15:19:40 -070084 sp<AMessage> msg = new AMessage(kWhatAddStream, id());
85 msg->setInt32("rtp-socket", rtpSocket);
86 msg->setInt32("rtcp-socket", rtcpSocket);
87 msg->setObject("session-desc", sessionDesc);
88 msg->setSize("index", index);
89 msg->setMessage("notify", notify);
Andreas Huber0416da72010-08-26 11:17:32 -070090 msg->setInt32("injected", injected);
Andreas Huber7a747b82010-06-07 15:19:40 -070091 msg->post();
92}
93
94void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
95 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
96 msg->setInt32("rtp-socket", rtpSocket);
97 msg->setInt32("rtcp-socket", rtcpSocket);
98 msg->post();
99}
100
101static void bumpSocketBufferSize(int s) {
102 int size = 256 * 1024;
103 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
104}
105
106// static
107void ARTPConnection::MakePortPair(
108 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
109 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
110 CHECK_GE(*rtpSocket, 0);
111
112 bumpSocketBufferSize(*rtpSocket);
113
114 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
115 CHECK_GE(*rtcpSocket, 0);
116
117 bumpSocketBufferSize(*rtcpSocket);
118
119 unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
120 start &= ~1;
121
122 for (unsigned port = start; port < 65536; port += 2) {
123 struct sockaddr_in addr;
124 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
125 addr.sin_family = AF_INET;
Andreas Huber04072692011-02-15 10:39:48 -0800126 addr.sin_addr.s_addr = htonl(INADDR_ANY);
Andreas Huber7a747b82010-06-07 15:19:40 -0700127 addr.sin_port = htons(port);
128
129 if (bind(*rtpSocket,
130 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
131 continue;
132 }
133
134 addr.sin_port = htons(port + 1);
135
136 if (bind(*rtcpSocket,
137 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
138 *rtpPort = port;
139 return;
140 }
141 }
142
143 TRESPASS();
144}
145
146void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
147 switch (msg->what()) {
148 case kWhatAddStream:
149 {
150 onAddStream(msg);
151 break;
152 }
153
154 case kWhatRemoveStream:
155 {
156 onRemoveStream(msg);
157 break;
158 }
159
160 case kWhatPollStreams:
161 {
162 onPollStreams();
163 break;
164 }
165
Andreas Huber0416da72010-08-26 11:17:32 -0700166 case kWhatInjectPacket:
167 {
168 onInjectPacket(msg);
169 break;
170 }
171
Andreas Huber7a747b82010-06-07 15:19:40 -0700172 default:
173 {
174 TRESPASS();
175 break;
176 }
177 }
178}
179
180void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
181 mStreams.push_back(StreamInfo());
182 StreamInfo *info = &*--mStreams.end();
183
184 int32_t s;
185 CHECK(msg->findInt32("rtp-socket", &s));
186 info->mRTPSocket = s;
187 CHECK(msg->findInt32("rtcp-socket", &s));
188 info->mRTCPSocket = s;
189
Andreas Huber0416da72010-08-26 11:17:32 -0700190 int32_t injected;
191 CHECK(msg->findInt32("injected", &injected));
192
193 info->mIsInjected = injected;
194
Andreas Huber7a747b82010-06-07 15:19:40 -0700195 sp<RefBase> obj;
196 CHECK(msg->findObject("session-desc", &obj));
197 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
198
199 CHECK(msg->findSize("index", &info->mIndex));
200 CHECK(msg->findMessage("notify", &info->mNotifyMsg));
201
Andreas Huber57648e42010-08-04 10:14:30 -0700202 info->mNumRTCPPacketsReceived = 0;
Andreas Hubercc5fb1d2010-10-13 12:15:03 -0700203 info->mNumRTPPacketsReceived = 0;
Andreas Huber57648e42010-08-04 10:14:30 -0700204 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
205
Andreas Huber0416da72010-08-26 11:17:32 -0700206 if (!injected) {
207 postPollEvent();
208 }
Andreas Huber7a747b82010-06-07 15:19:40 -0700209}
210
211void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
212 int32_t rtpSocket, rtcpSocket;
213 CHECK(msg->findInt32("rtp-socket", &rtpSocket));
214 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
215
216 List<StreamInfo>::iterator it = mStreams.begin();
217 while (it != mStreams.end()
218 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
219 ++it;
220 }
221
222 if (it == mStreams.end()) {
Andreas Huber05079be2011-11-09 14:26:43 -0800223 return;
Andreas Huber7a747b82010-06-07 15:19:40 -0700224 }
225
226 mStreams.erase(it);
227}
228
229void ARTPConnection::postPollEvent() {
230 if (mPollEventPending) {
231 return;
232 }
233
234 sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
235 msg->post();
236
237 mPollEventPending = true;
238}
239
240void ARTPConnection::onPollStreams() {
241 mPollEventPending = false;
242
243 if (mStreams.empty()) {
244 return;
245 }
246
247 struct timeval tv;
248 tv.tv_sec = 0;
249 tv.tv_usec = kSelectTimeoutUs;
250
251 fd_set rs;
252 FD_ZERO(&rs);
253
254 int maxSocket = -1;
255 for (List<StreamInfo>::iterator it = mStreams.begin();
256 it != mStreams.end(); ++it) {
Andreas Huber0416da72010-08-26 11:17:32 -0700257 if ((*it).mIsInjected) {
258 continue;
259 }
260
Andreas Huber7a747b82010-06-07 15:19:40 -0700261 FD_SET(it->mRTPSocket, &rs);
262 FD_SET(it->mRTCPSocket, &rs);
263
264 if (it->mRTPSocket > maxSocket) {
265 maxSocket = it->mRTPSocket;
266 }
267 if (it->mRTCPSocket > maxSocket) {
268 maxSocket = it->mRTCPSocket;
269 }
270 }
271
Andreas Huberf88ca7a02010-08-30 15:25:35 -0700272 if (maxSocket == -1) {
273 return;
274 }
275
Andreas Huber7a747b82010-06-07 15:19:40 -0700276 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
Andreas Huber7a747b82010-06-07 15:19:40 -0700277
278 if (res > 0) {
Andreas Huber05079be2011-11-09 14:26:43 -0800279 List<StreamInfo>::iterator it = mStreams.begin();
280 while (it != mStreams.end()) {
Andreas Huber0416da72010-08-26 11:17:32 -0700281 if ((*it).mIsInjected) {
Andreas Huber05079be2011-11-09 14:26:43 -0800282 ++it;
Andreas Huber0416da72010-08-26 11:17:32 -0700283 continue;
284 }
285
Andreas Huber05079be2011-11-09 14:26:43 -0800286 status_t err = OK;
Andreas Huber7a747b82010-06-07 15:19:40 -0700287 if (FD_ISSET(it->mRTPSocket, &rs)) {
Andreas Huber05079be2011-11-09 14:26:43 -0800288 err = receive(&*it, true);
Andreas Huber7a747b82010-06-07 15:19:40 -0700289 }
Andreas Huber05079be2011-11-09 14:26:43 -0800290 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
291 err = receive(&*it, false);
Andreas Huber7a747b82010-06-07 15:19:40 -0700292 }
Andreas Huber05079be2011-11-09 14:26:43 -0800293
294 if (err == -ECONNRESET) {
295 // socket failure, this stream is dead, Jim.
296
297 LOGW("failed to receive RTP/RTCP datagram.");
298 it = mStreams.erase(it);
299 continue;
300 }
301
302 ++it;
Andreas Huber7a747b82010-06-07 15:19:40 -0700303 }
304 }
305
Andreas Huber57648e42010-08-04 10:14:30 -0700306 int64_t nowUs = ALooper::GetNowUs();
307 if (mLastReceiverReportTimeUs <= 0
308 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
309 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
Andreas Huber05079be2011-11-09 14:26:43 -0800310 List<StreamInfo>::iterator it = mStreams.begin();
311 while (it != mStreams.end()) {
Andreas Huber57648e42010-08-04 10:14:30 -0700312 StreamInfo *s = &*it;
313
Andreas Huberf88ca7a02010-08-30 15:25:35 -0700314 if (s->mIsInjected) {
Andreas Huber05079be2011-11-09 14:26:43 -0800315 ++it;
Andreas Huberf88ca7a02010-08-30 15:25:35 -0700316 continue;
317 }
318
Andreas Huber57648e42010-08-04 10:14:30 -0700319 if (s->mNumRTCPPacketsReceived == 0) {
320 // We have never received any RTCP packets on this stream,
321 // we don't even know where to send a report.
Andreas Huber05079be2011-11-09 14:26:43 -0800322 ++it;
Andreas Huber57648e42010-08-04 10:14:30 -0700323 continue;
324 }
325
326 buffer->setRange(0, 0);
327
328 for (size_t i = 0; i < s->mSources.size(); ++i) {
329 sp<ARTPSource> source = s->mSources.valueAt(i);
330
331 source->addReceiverReport(buffer);
Andreas Huberf88f8442010-08-10 11:18:36 -0700332
333 if (mFlags & kRegularlyRequestFIR) {
334 source->addFIR(buffer);
335 }
Andreas Huber57648e42010-08-04 10:14:30 -0700336 }
337
338 if (buffer->size() > 0) {
Steve Block71f2cf12011-10-20 11:56:00 +0100339 ALOGV("Sending RR...");
Andreas Huber57648e42010-08-04 10:14:30 -0700340
Andreas Huber05079be2011-11-09 14:26:43 -0800341 ssize_t n;
342 do {
343 n = sendto(
Andreas Huber57648e42010-08-04 10:14:30 -0700344 s->mRTCPSocket, buffer->data(), buffer->size(), 0,
345 (const struct sockaddr *)&s->mRemoteRTCPAddr,
346 sizeof(s->mRemoteRTCPAddr));
Andreas Huber05079be2011-11-09 14:26:43 -0800347 } while (n < 0 && errno == EINTR);
348
349 if (n <= 0) {
350 LOGW("failed to send RTCP receiver report (%s).",
351 n == 0 ? "connection gone" : strerror(errno));
352
353 it = mStreams.erase(it);
354 continue;
355 }
356
Andreas Huber57648e42010-08-04 10:14:30 -0700357 CHECK_EQ(n, (ssize_t)buffer->size());
358
359 mLastReceiverReportTimeUs = nowUs;
360 }
Andreas Huber05079be2011-11-09 14:26:43 -0800361
362 ++it;
Andreas Huber57648e42010-08-04 10:14:30 -0700363 }
364 }
Andreas Huber05079be2011-11-09 14:26:43 -0800365
366 if (!mStreams.empty()) {
367 postPollEvent();
368 }
Andreas Huber7a747b82010-06-07 15:19:40 -0700369}
370
371status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
Steve Block71f2cf12011-10-20 11:56:00 +0100372 ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
Andreas Huber04072692011-02-15 10:39:48 -0800373
Andreas Huber0416da72010-08-26 11:17:32 -0700374 CHECK(!s->mIsInjected);
375
Andreas Huber7a747b82010-06-07 15:19:40 -0700376 sp<ABuffer> buffer = new ABuffer(65536);
377
Andreas Huber57648e42010-08-04 10:14:30 -0700378 socklen_t remoteAddrLen =
379 (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
380 ? sizeof(s->mRemoteRTCPAddr) : 0;
Andreas Huber7a747b82010-06-07 15:19:40 -0700381
Andreas Huber05079be2011-11-09 14:26:43 -0800382 ssize_t nbytes;
383 do {
384 nbytes = recvfrom(
Andreas Huber7a747b82010-06-07 15:19:40 -0700385 receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
386 buffer->data(),
387 buffer->capacity(),
388 0,
Andreas Huber57648e42010-08-04 10:14:30 -0700389 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
390 remoteAddrLen > 0 ? &remoteAddrLen : NULL);
Andreas Huber05079be2011-11-09 14:26:43 -0800391 } while (nbytes < 0 && errno == EINTR);
Andreas Huber7a747b82010-06-07 15:19:40 -0700392
Andreas Huber05079be2011-11-09 14:26:43 -0800393 if (nbytes <= 0) {
394 return -ECONNRESET;
Andreas Huber7a747b82010-06-07 15:19:40 -0700395 }
396
397 buffer->setRange(0, nbytes);
398
Steve Block6215d3f2012-01-04 20:05:49 +0000399 // ALOGI("received %d bytes.", buffer->size());
Andreas Hubera979ad62010-08-19 10:56:15 -0700400
Andreas Huber7a747b82010-06-07 15:19:40 -0700401 status_t err;
402 if (receiveRTP) {
403 err = parseRTP(s, buffer);
404 } else {
405 err = parseRTCP(s, buffer);
406 }
407
408 return err;
409}
410
411status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
Andreas Hubercc5fb1d2010-10-13 12:15:03 -0700412 if (s->mNumRTPPacketsReceived++ == 0) {
413 sp<AMessage> notify = s->mNotifyMsg->dup();
414 notify->setInt32("first-rtp", true);
415 notify->post();
416 }
417
Andreas Huber7a747b82010-06-07 15:19:40 -0700418 size_t size = buffer->size();
419
420 if (size < 12) {
421 // Too short to be a valid RTP header.
422 return -1;
423 }
424
425 const uint8_t *data = buffer->data();
426
427 if ((data[0] >> 6) != 2) {
428 // Unsupported version.
429 return -1;
430 }
431
432 if (data[0] & 0x20) {
433 // Padding present.
434
435 size_t paddingLength = data[size - 1];
436
437 if (paddingLength + 12 > size) {
438 // If we removed this much padding we'd end up with something
439 // that's too short to be a valid RTP header.
440 return -1;
441 }
442
443 size -= paddingLength;
444 }
445
446 int numCSRCs = data[0] & 0x0f;
447
448 size_t payloadOffset = 12 + 4 * numCSRCs;
449
450 if (size < payloadOffset) {
451 // Not enough data to fit the basic header and all the CSRC entries.
452 return -1;
453 }
454
455 if (data[0] & 0x10) {
456 // Header eXtension present.
457
458 if (size < payloadOffset + 4) {
459 // Not enough data to fit the basic header, all CSRC entries
460 // and the first 4 bytes of the extension header.
461
462 return -1;
463 }
464
465 const uint8_t *extensionData = &data[payloadOffset];
466
467 size_t extensionLength =
468 4 * (extensionData[2] << 8 | extensionData[3]);
469
470 if (size < payloadOffset + 4 + extensionLength) {
471 return -1;
472 }
473
474 payloadOffset += 4 + extensionLength;
475 }
476
477 uint32_t srcId = u32at(&data[8]);
478
Andreas Huber57648e42010-08-04 10:14:30 -0700479 sp<ARTPSource> source = findSource(s, srcId);
Andreas Huber7a747b82010-06-07 15:19:40 -0700480
481 uint32_t rtpTime = u32at(&data[4]);
482
483 sp<AMessage> meta = buffer->meta();
484 meta->setInt32("ssrc", srcId);
485 meta->setInt32("rtp-time", rtpTime);
486 meta->setInt32("PT", data[1] & 0x7f);
487 meta->setInt32("M", data[1] >> 7);
488
489 buffer->setInt32Data(u16at(&data[2]));
Andreas Huber7a747b82010-06-07 15:19:40 -0700490 buffer->setRange(payloadOffset, size - payloadOffset);
491
492 source->processRTPPacket(buffer);
493
494 return OK;
495}
496
497status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
Andreas Huber3a48d4d2010-08-31 10:43:47 -0700498 if (s->mNumRTCPPacketsReceived++ == 0) {
499 sp<AMessage> notify = s->mNotifyMsg->dup();
500 notify->setInt32("first-rtcp", true);
501 notify->post();
502 }
503
Andreas Huber7a747b82010-06-07 15:19:40 -0700504 const uint8_t *data = buffer->data();
505 size_t size = buffer->size();
506
507 while (size > 0) {
508 if (size < 8) {
509 // Too short to be a valid RTCP header
510 return -1;
511 }
512
513 if ((data[0] >> 6) != 2) {
514 // Unsupported version.
515 return -1;
516 }
517
518 if (data[0] & 0x20) {
519 // Padding present.
520
521 size_t paddingLength = data[size - 1];
522
523 if (paddingLength + 12 > size) {
524 // If we removed this much padding we'd end up with something
525 // that's too short to be a valid RTP header.
526 return -1;
527 }
528
529 size -= paddingLength;
530 }
531
532 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
533
534 if (size < headerLength) {
535 // Only received a partial packet?
536 return -1;
537 }
538
539 switch (data[1]) {
540 case 200:
541 {
542 parseSR(s, data, headerLength);
543 break;
544 }
545
Andreas Huber57648e42010-08-04 10:14:30 -0700546 case 201: // RR
547 case 202: // SDES
548 case 204: // APP
549 break;
550
551 case 205: // TSFB (transport layer specific feedback)
552 case 206: // PSFB (payload specific feedback)
553 // hexdump(data, headerLength);
554 break;
555
556 case 203:
557 {
558 parseBYE(s, data, headerLength);
559 break;
560 }
561
Andreas Huber7a747b82010-06-07 15:19:40 -0700562 default:
563 {
Andreas Huber6e3fa442010-09-21 13:13:15 -0700564 LOGW("Unknown RTCP packet type %u of size %d",
565 (unsigned)data[1], headerLength);
Andreas Huber7a747b82010-06-07 15:19:40 -0700566 break;
567 }
568 }
569
570 data += headerLength;
571 size -= headerLength;
572 }
573
574 return OK;
575}
576
Andreas Huber57648e42010-08-04 10:14:30 -0700577status_t ARTPConnection::parseBYE(
578 StreamInfo *s, const uint8_t *data, size_t size) {
579 size_t SC = data[0] & 0x3f;
580
581 if (SC == 0 || size < (4 + SC * 4)) {
582 // Packet too short for the minimal BYE header.
583 return -1;
584 }
585
586 uint32_t id = u32at(&data[4]);
587
588 sp<ARTPSource> source = findSource(s, id);
589
590 source->byeReceived();
591
592 return OK;
593}
594
Andreas Huber7a747b82010-06-07 15:19:40 -0700595status_t ARTPConnection::parseSR(
596 StreamInfo *s, const uint8_t *data, size_t size) {
597 size_t RC = data[0] & 0x1f;
598
599 if (size < (7 + RC * 6) * 4) {
600 // Packet too short for the minimal SR header.
601 return -1;
602 }
603
604 uint32_t id = u32at(&data[4]);
605 uint64_t ntpTime = u64at(&data[8]);
606 uint32_t rtpTime = u32at(&data[16]);
607
Andreas Huber57648e42010-08-04 10:14:30 -0700608#if 0
Steve Block6215d3f2012-01-04 20:05:49 +0000609 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
Andreas Huber6e3fa442010-09-21 13:13:15 -0700610 id,
611 rtpTime,
612 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
Andreas Huber7a747b82010-06-07 15:19:40 -0700613#endif
614
Andreas Huber57648e42010-08-04 10:14:30 -0700615 sp<ARTPSource> source = findSource(s, id);
Andreas Huber7a747b82010-06-07 15:19:40 -0700616
Andreas Huberb2934b12011-02-08 10:18:41 -0800617 source->timeUpdate(rtpTime, ntpTime);
Andreas Huber7a747b82010-06-07 15:19:40 -0700618
619 return 0;
620}
621
Andreas Huber57648e42010-08-04 10:14:30 -0700622sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
623 sp<ARTPSource> source;
624 ssize_t index = info->mSources.indexOfKey(srcId);
625 if (index < 0) {
626 index = info->mSources.size();
627
628 source = new ARTPSource(
629 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
630
Andreas Huber57648e42010-08-04 10:14:30 -0700631 info->mSources.add(srcId, source);
632 } else {
633 source = info->mSources.valueAt(index);
634 }
635
636 return source;
637}
638
Andreas Huber0416da72010-08-26 11:17:32 -0700639void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
640 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
641 msg->setInt32("index", index);
642 msg->setObject("buffer", buffer);
643 msg->post();
644}
645
646void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
647 int32_t index;
648 CHECK(msg->findInt32("index", &index));
649
650 sp<RefBase> obj;
651 CHECK(msg->findObject("buffer", &obj));
652
653 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
654
655 List<StreamInfo>::iterator it = mStreams.begin();
656 while (it != mStreams.end()
657 && it->mRTPSocket != index && it->mRTCPSocket != index) {
658 ++it;
659 }
660
661 if (it == mStreams.end()) {
662 TRESPASS();
663 }
664
665 StreamInfo *s = &*it;
666
667 status_t err;
668 if (it->mRTPSocket == index) {
669 err = parseRTP(s, buffer);
670 } else {
Andreas Huber0416da72010-08-26 11:17:32 -0700671 err = parseRTCP(s, buffer);
672 }
673}
674
Andreas Huber7a747b82010-06-07 15:19:40 -0700675} // namespace android
676